From 9215f5188cab48fb10139d48a0781e7ba3f2247e Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Thu, 12 Jan 2023 22:31:39 +0800 Subject: [PATCH] Feat/restore from snapshot (#1699) * feat: snapshot for folder * feat: snapshot for document Co-authored-by: nathan --- .../lib/user/presentation/router.dart | 23 +++-- frontend/rust-lib/flowy-core/src/lib.rs | 1 + .../down.sql | 2 + .../up.sql | 9 ++ .../rust-lib/flowy-database/src/schema.rs | 46 +++++---- .../flowy-document/src/editor/document.rs | 4 + .../rust-lib/flowy-document/src/manager.rs | 12 ++- .../flowy-document/src/old_editor/editor.rs | 5 + .../rev_sqlite/document_snapshot.rs | 96 +++++++++++++++++++ .../services/persistence/rev_sqlite/mod.rs | 2 + frontend/rust-lib/flowy-folder/src/manager.rs | 2 +- .../src/services/folder_editor.rs | 18 +++- .../src/services/persistence/migration.rs | 2 +- .../src/services/persistence/mod.rs | 4 +- ...shot_sqlite_impl.rs => folder_snapshot.rs} | 2 +- .../services/persistence/rev_sqlite/mod.rs | 4 +- frontend/rust-lib/flowy-grid/src/manager.rs | 2 +- .../flowy-grid/src/services/block_editor.rs | 5 + .../flowy-grid/src/services/grid_editor.rs | 4 + ...apshot_sqlite_impl.rs => grid_snapshot.rs} | 2 +- .../services/persistence/rev_sqlite/mod.rs | 4 +- .../src/services/view_editor/trait_impl.rs | 4 + .../flowy-revision/src/rev_manager.rs | 16 +++- .../flowy-revision/src/rev_snapshot.rs | 7 +- .../tests/revision_test/script.rs | 4 + .../src/client_folder/folder_pad.rs | 10 -- frontend/rust-lib/flowy-sync/src/errors.rs | 2 + frontend/rust-lib/flowy-sync/src/util.rs | 56 ++++++----- shared-lib/folder-rev-model/src/folder_rev.rs | 40 +++----- shared-lib/lib-ot/src/core/delta/ops.rs | 10 +- 30 files changed, 287 insertions(+), 111 deletions(-) create mode 100644 frontend/rust-lib/flowy-database/migrations/2023-01-12-131717_document_rev_snapshot/down.sql create mode 100644 frontend/rust-lib/flowy-database/migrations/2023-01-12-131717_document_rev_snapshot/up.sql create mode 100644 frontend/rust-lib/flowy-document/src/services/persistence/rev_sqlite/document_snapshot.rs rename frontend/rust-lib/flowy-folder/src/services/persistence/rev_sqlite/{folder_snapshot_sqlite_impl.rs => folder_snapshot.rs} (98%) rename frontend/rust-lib/flowy-grid/src/services/persistence/rev_sqlite/{grid_snapshot_sqlite_impl.rs => grid_snapshot.rs} (98%) diff --git a/frontend/app_flowy/lib/user/presentation/router.dart b/frontend/app_flowy/lib/user/presentation/router.dart index 4a735f4f37..a59f854ae7 100644 --- a/frontend/app_flowy/lib/user/presentation/router.dart +++ b/frontend/app_flowy/lib/user/presentation/router.dart @@ -5,6 +5,7 @@ import 'package:app_flowy/user/presentation/sign_up_screen.dart'; import 'package:app_flowy/user/presentation/skip_log_in_screen.dart'; import 'package:app_flowy/user/presentation/welcome_screen.dart'; import 'package:app_flowy/workspace/presentation/home/home_screen.dart'; +import 'package:appflowy_backend/dispatch/dispatch.dart'; import 'package:flowy_infra/time/duration.dart'; import 'package:flowy_infra_ui/widget/route/animation.dart'; import 'package:appflowy_backend/protobuf/flowy-user/protobuf.dart' @@ -44,21 +45,31 @@ class AuthRouter { class SplashRoute { Future pushWelcomeScreen( - BuildContext context, UserProfilePB userProfile) async { + BuildContext context, + UserProfilePB userProfile, + ) async { final screen = WelcomeScreen(userProfile: userProfile); - final workspaceId = await Navigator.of(context).push( + await Navigator.of(context).push( PageRoutes.fade( () => screen, RouteDurations.slow.inMilliseconds * .001, ), ); - // ignore: use_build_context_synchronously - pushHomeScreen(context, userProfile, workspaceId); + FolderEventReadCurrentWorkspace().send().then((result) { + result.fold( + (workspaceSettingPB) => + pushHomeScreen(context, userProfile, workspaceSettingPB), + (r) => null, + ); + }); } - void pushHomeScreen(BuildContext context, UserProfilePB userProfile, - WorkspaceSettingPB workspaceSetting) { + void pushHomeScreen( + BuildContext context, + UserProfilePB userProfile, + WorkspaceSettingPB workspaceSetting, + ) { Navigator.push( context, PageRoutes.fade( diff --git a/frontend/rust-lib/flowy-core/src/lib.rs b/frontend/rust-lib/flowy-core/src/lib.rs index dbe0c09601..cec8230819 100644 --- a/frontend/rust-lib/flowy-core/src/lib.rs +++ b/frontend/rust-lib/flowy-core/src/lib.rs @@ -91,6 +91,7 @@ fn crate_log_filter(level: String) -> String { filters.push(format!("lib_infra={}", level)); filters.push(format!("flowy_sync={}", level)); filters.push(format!("flowy_revision={}", level)); + filters.push(format!("flowy_revision_persistence={}", level)); filters.push(format!("flowy_task={}", level)); // filters.push(format!("lib_dispatch={}", level)); diff --git a/frontend/rust-lib/flowy-database/migrations/2023-01-12-131717_document_rev_snapshot/down.sql b/frontend/rust-lib/flowy-database/migrations/2023-01-12-131717_document_rev_snapshot/down.sql new file mode 100644 index 0000000000..5220bab0d3 --- /dev/null +++ b/frontend/rust-lib/flowy-database/migrations/2023-01-12-131717_document_rev_snapshot/down.sql @@ -0,0 +1,2 @@ +-- This file should undo anything in `up.sql` +DROP TABLE document_rev_snapshot; diff --git a/frontend/rust-lib/flowy-database/migrations/2023-01-12-131717_document_rev_snapshot/up.sql b/frontend/rust-lib/flowy-database/migrations/2023-01-12-131717_document_rev_snapshot/up.sql new file mode 100644 index 0000000000..bc6ffb0cee --- /dev/null +++ b/frontend/rust-lib/flowy-database/migrations/2023-01-12-131717_document_rev_snapshot/up.sql @@ -0,0 +1,9 @@ +-- Your SQL goes here +CREATE TABLE document_rev_snapshot ( + snapshot_id TEXT NOT NULL PRIMARY KEY DEFAULT '', + object_id TEXT NOT NULL DEFAULT '', + rev_id BIGINT NOT NULL DEFAULT 0, + base_rev_id BIGINT NOT NULL DEFAULT 0, + timestamp BIGINT NOT NULL DEFAULT 0, + data BLOB NOT NULL DEFAULT (x'') +); \ No newline at end of file diff --git a/frontend/rust-lib/flowy-database/src/schema.rs b/frontend/rust-lib/flowy-database/src/schema.rs index d45d20a938..148dd2556a 100644 --- a/frontend/rust-lib/flowy-database/src/schema.rs +++ b/frontend/rust-lib/flowy-database/src/schema.rs @@ -1,4 +1,6 @@ -table! { +// @generated automatically by Diesel CLI. + +diesel::table! { app_table (id) { id -> Text, workspace_id -> Text, @@ -13,7 +15,18 @@ table! { } } -table! { +diesel::table! { + document_rev_snapshot (snapshot_id) { + snapshot_id -> Text, + object_id -> Text, + rev_id -> BigInt, + base_rev_id -> BigInt, + timestamp -> BigInt, + data -> Binary, + } +} + +diesel::table! { document_rev_table (id) { id -> Integer, document_id -> Text, @@ -24,7 +37,7 @@ table! { } } -table! { +diesel::table! { folder_rev_snapshot (snapshot_id) { snapshot_id -> Text, object_id -> Text, @@ -35,14 +48,14 @@ table! { } } -table! { +diesel::table! { grid_block_index_table (row_id) { row_id -> Text, block_id -> Text, } } -table! { +diesel::table! { grid_meta_rev_table (id) { id -> Integer, object_id -> Text, @@ -53,7 +66,7 @@ table! { } } -table! { +diesel::table! { grid_rev_snapshot (snapshot_id) { snapshot_id -> Text, object_id -> Text, @@ -64,7 +77,7 @@ table! { } } -table! { +diesel::table! { grid_rev_table (id) { id -> Integer, object_id -> Text, @@ -75,7 +88,7 @@ table! { } } -table! { +diesel::table! { grid_view_rev_table (id) { id -> Integer, object_id -> Text, @@ -86,14 +99,14 @@ table! { } } -table! { +diesel::table! { kv_table (key) { key -> Text, value -> Binary, } } -table! { +diesel::table! { rev_snapshot (id) { id -> Integer, object_id -> Text, @@ -102,7 +115,7 @@ table! { } } -table! { +diesel::table! { rev_table (id) { id -> Integer, doc_id -> Text, @@ -114,7 +127,7 @@ table! { } } -table! { +diesel::table! { trash_table (id) { id -> Text, name -> Text, @@ -125,7 +138,7 @@ table! { } } -table! { +diesel::table! { user_table (id) { id -> Text, name -> Text, @@ -136,7 +149,7 @@ table! { } } -table! { +diesel::table! { view_table (id) { id -> Text, belong_to_id -> Text, @@ -152,7 +165,7 @@ table! { } } -table! { +diesel::table! { workspace_table (id) { id -> Text, name -> Text, @@ -164,8 +177,9 @@ table! { } } -allow_tables_to_appear_in_same_query!( +diesel::allow_tables_to_appear_in_same_query!( app_table, + document_rev_snapshot, document_rev_table, folder_rev_snapshot, grid_block_index_table, diff --git a/frontend/rust-lib/flowy-document/src/editor/document.rs b/frontend/rust-lib/flowy-document/src/editor/document.rs index 1642cd9418..026bbb3f27 100644 --- a/frontend/rust-lib/flowy-document/src/editor/document.rs +++ b/frontend/rust-lib/flowy-document/src/editor/document.rs @@ -86,6 +86,10 @@ impl RevisionObjectDeserializer for DocumentRevisionSerde { let document = Document::new(tree); Result::::Ok(document) } + + fn recover_operations_from_revisions(_revisions: Vec) -> Option { + None + } } impl RevisionObjectSerializer for DocumentRevisionSerde { diff --git a/frontend/rust-lib/flowy-document/src/manager.rs b/frontend/rust-lib/flowy-document/src/manager.rs index 7e52228058..a036de01e8 100644 --- a/frontend/rust-lib/flowy-document/src/manager.rs +++ b/frontend/rust-lib/flowy-document/src/manager.rs @@ -1,7 +1,10 @@ use crate::editor::{initial_document_content, AppFlowyDocumentEditor, DocumentRevisionMergeable}; use crate::entities::{DocumentVersionPB, EditParams}; use crate::old_editor::editor::{DeltaDocumentEditor, DeltaDocumentRevisionMergeable}; -use crate::services::rev_sqlite::{SQLiteDeltaDocumentRevisionPersistence, SQLiteDocumentRevisionPersistence}; +use crate::services::rev_sqlite::{ + SQLiteDeltaDocumentRevisionPersistence, SQLiteDocumentRevisionPersistence, + SQLiteDocumentRevisionSnapshotPersistence, +}; use crate::services::DocumentPersistence; use crate::{errors::FlowyError, DocumentCloudService}; use bytes::Bytes; @@ -261,15 +264,16 @@ impl DocumentManager { pool: Arc, ) -> Result>, FlowyError> { let user_id = self.user.user_id()?; - let disk_cache = SQLiteDocumentRevisionPersistence::new(&user_id, pool); - let configuration = RevisionPersistenceConfiguration::new(100, true); + let disk_cache = SQLiteDocumentRevisionPersistence::new(&user_id, pool.clone()); + let configuration = RevisionPersistenceConfiguration::new(200, true); let rev_persistence = RevisionPersistence::new(&user_id, doc_id, disk_cache, configuration); + let snapshot_persistence = SQLiteDocumentRevisionSnapshotPersistence::new(doc_id, pool); Ok(RevisionManager::new( &user_id, doc_id, rev_persistence, DocumentRevisionMergeable(), - PhantomSnapshotPersistence(), + snapshot_persistence, )) } diff --git a/frontend/rust-lib/flowy-document/src/old_editor/editor.rs b/frontend/rust-lib/flowy-document/src/old_editor/editor.rs index b705ebefca..1e16239c5f 100644 --- a/frontend/rust-lib/flowy-document/src/old_editor/editor.rs +++ b/frontend/rust-lib/flowy-document/src/old_editor/editor.rs @@ -1,5 +1,6 @@ #![allow(unused_attributes)] #![allow(unused_attributes)] + use crate::old_editor::queue::{EditDocumentQueue, EditorCommand, EditorCommandSender}; use crate::{errors::FlowyError, DocumentEditor, DocumentUser}; use bytes::Bytes; @@ -260,6 +261,10 @@ impl RevisionObjectDeserializer for DeltaDocumentRevisionSerde { base_rev_id, }) } + + fn recover_operations_from_revisions(_revisions: Vec) -> Option { + None + } } impl RevisionObjectSerializer for DeltaDocumentRevisionSerde { diff --git a/frontend/rust-lib/flowy-document/src/services/persistence/rev_sqlite/document_snapshot.rs b/frontend/rust-lib/flowy-document/src/services/persistence/rev_sqlite/document_snapshot.rs new file mode 100644 index 0000000000..5e7a8d3791 --- /dev/null +++ b/frontend/rust-lib/flowy-document/src/services/persistence/rev_sqlite/document_snapshot.rs @@ -0,0 +1,96 @@ +use bytes::Bytes; +use flowy_database::{ + prelude::*, + schema::{document_rev_snapshot, document_rev_snapshot::dsl}, + ConnectionPool, +}; +use flowy_error::{internal_error, FlowyResult}; +use flowy_revision::{RevisionSnapshot, RevisionSnapshotDiskCache}; +use lib_infra::util::timestamp; +use std::sync::Arc; + +pub struct SQLiteDocumentRevisionSnapshotPersistence { + object_id: String, + pool: Arc, +} + +impl SQLiteDocumentRevisionSnapshotPersistence { + pub fn new(object_id: &str, pool: Arc) -> Self { + Self { + object_id: object_id.to_string(), + pool, + } + } + + fn gen_snapshot_id(&self, rev_id: i64) -> String { + format!("{}:{}", self.object_id, rev_id) + } +} + +impl RevisionSnapshotDiskCache for SQLiteDocumentRevisionSnapshotPersistence { + fn should_generate_snapshot_from_range(&self, start_rev_id: i64, current_rev_id: i64) -> bool { + (current_rev_id - start_rev_id) >= 150 + } + + fn write_snapshot(&self, rev_id: i64, data: Vec) -> FlowyResult<()> { + let conn = self.pool.get().map_err(internal_error)?; + let snapshot_id = self.gen_snapshot_id(rev_id); + let timestamp = timestamp(); + let record = ( + dsl::snapshot_id.eq(&snapshot_id), + dsl::object_id.eq(&self.object_id), + dsl::rev_id.eq(rev_id), + dsl::base_rev_id.eq(rev_id), + dsl::timestamp.eq(timestamp), + dsl::data.eq(data), + ); + let _ = insert_or_ignore_into(dsl::document_rev_snapshot) + .values(record) + .execute(&*conn)?; + Ok(()) + } + + fn read_snapshot(&self, rev_id: i64) -> FlowyResult> { + let conn = self.pool.get().map_err(internal_error)?; + let snapshot_id = self.gen_snapshot_id(rev_id); + let record = dsl::document_rev_snapshot + .filter(dsl::snapshot_id.eq(&snapshot_id)) + .first::(&*conn)?; + + Ok(Some(record.into())) + } + + fn read_last_snapshot(&self) -> FlowyResult> { + let conn = self.pool.get().map_err(internal_error)?; + let latest_record = dsl::document_rev_snapshot + .filter(dsl::object_id.eq(&self.object_id)) + .order(dsl::timestamp.desc()) + // .select(max(dsl::rev_id)) + // .select((dsl::id, dsl::object_id, dsl::rev_id, dsl::data)) + .first::(&*conn)?; + Ok(Some(latest_record.into())) + } +} + +#[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)] +#[table_name = "document_rev_snapshot"] +#[primary_key("snapshot_id")] +struct DocumentSnapshotRecord { + snapshot_id: String, + object_id: String, + rev_id: i64, + base_rev_id: i64, + timestamp: i64, + data: Vec, +} + +impl std::convert::From for RevisionSnapshot { + fn from(record: DocumentSnapshotRecord) -> Self { + RevisionSnapshot { + rev_id: record.rev_id, + base_rev_id: record.base_rev_id, + timestamp: record.timestamp, + data: Bytes::from(record.data), + } + } +} diff --git a/frontend/rust-lib/flowy-document/src/services/persistence/rev_sqlite/mod.rs b/frontend/rust-lib/flowy-document/src/services/persistence/rev_sqlite/mod.rs index e0c1920633..8102291344 100644 --- a/frontend/rust-lib/flowy-document/src/services/persistence/rev_sqlite/mod.rs +++ b/frontend/rust-lib/flowy-document/src/services/persistence/rev_sqlite/mod.rs @@ -1,5 +1,7 @@ mod document_rev_sqlite_v0; mod document_rev_sqlite_v1; +mod document_snapshot; pub use document_rev_sqlite_v0::*; pub use document_rev_sqlite_v1::*; +pub use document_snapshot::*; diff --git a/frontend/rust-lib/flowy-folder/src/manager.rs b/frontend/rust-lib/flowy-folder/src/manager.rs index 707985f356..fa324b16cd 100644 --- a/frontend/rust-lib/flowy-folder/src/manager.rs +++ b/frontend/rust-lib/flowy-folder/src/manager.rs @@ -170,7 +170,7 @@ impl FolderManager { let pool = self.persistence.db_pool()?; let object_id = folder_id.as_ref(); let disk_cache = SQLiteFolderRevisionPersistence::new(user_id, pool.clone()); - let configuration = RevisionPersistenceConfiguration::new(100, false); + let configuration = RevisionPersistenceConfiguration::new(200, false); let rev_persistence = RevisionPersistence::new(user_id, object_id, disk_cache, configuration); let rev_compactor = FolderRevisionMergeable(); diff --git a/frontend/rust-lib/flowy-folder/src/services/folder_editor.rs b/frontend/rust-lib/flowy-folder/src/services/folder_editor.rs index dde27fdd26..4743996cf3 100644 --- a/frontend/rust-lib/flowy-folder/src/services/folder_editor.rs +++ b/frontend/rust-lib/flowy-folder/src/services/folder_editor.rs @@ -9,7 +9,8 @@ use flowy_revision::{ RevisionWebSocket, }; use flowy_sync::client_folder::{FolderChangeset, FolderPad}; -use flowy_sync::util::make_operations_from_revisions; +use flowy_sync::server_folder::FolderOperations; +use flowy_sync::util::{make_operations_from_revisions, recover_operation_from_revisions}; use lib_infra::future::FutureResult; use lib_ot::core::EmptyAttributes; use parking_lot::RwLock; @@ -102,8 +103,19 @@ impl RevisionObjectDeserializer for FolderRevisionSerde { type Output = FolderPad; fn deserialize_revisions(_object_id: &str, revisions: Vec) -> FlowyResult { - let pad = FolderPad::from_revisions(revisions)?; - Ok(pad) + let operations: FolderOperations = make_operations_from_revisions(revisions)?; + Ok(FolderPad::from_operations(operations)?) + } + + fn recover_operations_from_revisions(revisions: Vec) -> Option { + if let Some(operations) = recover_operation_from_revisions(revisions, |operations| { + FolderPad::from_operations(operations.clone()).is_ok() + }) { + if let Ok(pad) = FolderPad::from_operations(operations) { + return Some(pad); + } + } + None } } diff --git a/frontend/rust-lib/flowy-folder/src/services/persistence/migration.rs b/frontend/rust-lib/flowy-folder/src/services/persistence/migration.rs index 8065ab6f1d..dde3f16702 100644 --- a/frontend/rust-lib/flowy-folder/src/services/persistence/migration.rs +++ b/frontend/rust-lib/flowy-folder/src/services/persistence/migration.rs @@ -123,7 +123,7 @@ fn migration_flag_key(user_id: &str, version: &str) -> String { md5(format!("{}{}", user_id, version,)) } -pub struct FolderRevisionResettable { +struct FolderRevisionResettable { folder_id: String, } diff --git a/frontend/rust-lib/flowy-folder/src/services/persistence/mod.rs b/frontend/rust-lib/flowy-folder/src/services/persistence/mod.rs index b4048476c3..b68bf27fe8 100644 --- a/frontend/rust-lib/flowy-folder/src/services/persistence/mod.rs +++ b/frontend/rust-lib/flowy-folder/src/services/persistence/mod.rs @@ -118,12 +118,12 @@ impl FolderPersistence { write_to_disk: true, }; - let disk_cache = mk_text_block_revision_disk_cache(user_id, pool); + let disk_cache = make_folder_revision_disk_cache(user_id, pool); disk_cache.delete_and_insert_records(folder_id.as_ref(), None, vec![record]) } } -pub fn mk_text_block_revision_disk_cache( +pub fn make_folder_revision_disk_cache( user_id: &str, pool: Arc, ) -> Arc, Error = FlowyError>> { diff --git a/frontend/rust-lib/flowy-folder/src/services/persistence/rev_sqlite/folder_snapshot_sqlite_impl.rs b/frontend/rust-lib/flowy-folder/src/services/persistence/rev_sqlite/folder_snapshot.rs similarity index 98% rename from frontend/rust-lib/flowy-folder/src/services/persistence/rev_sqlite/folder_snapshot_sqlite_impl.rs rename to frontend/rust-lib/flowy-folder/src/services/persistence/rev_sqlite/folder_snapshot.rs index 7ca2695a17..f64549c833 100644 --- a/frontend/rust-lib/flowy-folder/src/services/persistence/rev_sqlite/folder_snapshot_sqlite_impl.rs +++ b/frontend/rust-lib/flowy-folder/src/services/persistence/rev_sqlite/folder_snapshot.rs @@ -65,7 +65,7 @@ impl RevisionSnapshotDiskCache for SQLiteFolderRevisionSnapshotPersistence { let conn = self.pool.get().map_err(internal_error)?; let latest_record = dsl::folder_rev_snapshot .filter(dsl::object_id.eq(&self.object_id)) - .order(dsl::rev_id.desc()) + .order(dsl::timestamp.desc()) // .select(max(dsl::rev_id)) // .select((dsl::id, dsl::object_id, dsl::rev_id, dsl::data)) .first::(&*conn)?; diff --git a/frontend/rust-lib/flowy-folder/src/services/persistence/rev_sqlite/mod.rs b/frontend/rust-lib/flowy-folder/src/services/persistence/rev_sqlite/mod.rs index 754848674a..6b185637ba 100644 --- a/frontend/rust-lib/flowy-folder/src/services/persistence/rev_sqlite/mod.rs +++ b/frontend/rust-lib/flowy-folder/src/services/persistence/rev_sqlite/mod.rs @@ -1,5 +1,5 @@ mod folder_rev_sqlite; -mod folder_snapshot_sqlite_impl; +mod folder_snapshot; pub use folder_rev_sqlite::*; -pub use folder_snapshot_sqlite_impl::*; +pub use folder_snapshot::*; diff --git a/frontend/rust-lib/flowy-grid/src/manager.rs b/frontend/rust-lib/flowy-grid/src/manager.rs index c73d8b28e8..2421010121 100644 --- a/frontend/rust-lib/flowy-grid/src/manager.rs +++ b/frontend/rust-lib/flowy-grid/src/manager.rs @@ -162,7 +162,7 @@ impl GridManager { // Create revision persistence let disk_cache = SQLiteGridRevisionPersistence::new(&user_id, pool.clone()); - let configuration = RevisionPersistenceConfiguration::new(4, false); + let configuration = RevisionPersistenceConfiguration::new(6, false); let rev_persistence = RevisionPersistence::new(&user_id, grid_id, disk_cache, configuration); // Create snapshot persistence diff --git a/frontend/rust-lib/flowy-grid/src/services/block_editor.rs b/frontend/rust-lib/flowy-grid/src/services/block_editor.rs index 878f504b87..2201a7bb65 100644 --- a/frontend/rust-lib/flowy-grid/src/services/block_editor.rs +++ b/frontend/rust-lib/flowy-grid/src/services/block_editor.rs @@ -178,10 +178,15 @@ impl RevisionCloudService for GridBlockRevisionCloudService { struct GridBlockRevisionSerde(); impl RevisionObjectDeserializer for GridBlockRevisionSerde { type Output = GridBlockRevisionPad; + fn deserialize_revisions(object_id: &str, revisions: Vec) -> FlowyResult { let pad = GridBlockRevisionPad::from_revisions(object_id, revisions)?; Ok(pad) } + + fn recover_operations_from_revisions(_revisions: Vec) -> Option { + None + } } impl RevisionObjectSerializer for GridBlockRevisionSerde { diff --git a/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs b/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs index 226b6921d0..a5f25e68b4 100644 --- a/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs +++ b/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs @@ -887,6 +887,10 @@ impl RevisionObjectDeserializer for GridRevisionSerde { let pad = GridRevisionPad::from_revisions(revisions)?; Ok(pad) } + + fn recover_operations_from_revisions(_revisions: Vec) -> Option { + None + } } impl RevisionObjectSerializer for GridRevisionSerde { fn combine_revisions(revisions: Vec) -> FlowyResult { diff --git a/frontend/rust-lib/flowy-grid/src/services/persistence/rev_sqlite/grid_snapshot_sqlite_impl.rs b/frontend/rust-lib/flowy-grid/src/services/persistence/rev_sqlite/grid_snapshot.rs similarity index 98% rename from frontend/rust-lib/flowy-grid/src/services/persistence/rev_sqlite/grid_snapshot_sqlite_impl.rs rename to frontend/rust-lib/flowy-grid/src/services/persistence/rev_sqlite/grid_snapshot.rs index d7456c31b8..baacfd3011 100644 --- a/frontend/rust-lib/flowy-grid/src/services/persistence/rev_sqlite/grid_snapshot_sqlite_impl.rs +++ b/frontend/rust-lib/flowy-grid/src/services/persistence/rev_sqlite/grid_snapshot.rs @@ -86,7 +86,7 @@ impl RevisionSnapshotDiskCache for SQLiteGridRevisionSnapshotPersistence { let conn = self.pool.get().map_err(internal_error)?; let latest_record = dsl::grid_rev_snapshot .filter(dsl::object_id.eq(&self.object_id)) - .order(dsl::rev_id.desc()) + .order(dsl::timestamp.desc()) // .select(max(dsl::rev_id)) // .select((dsl::id, dsl::object_id, dsl::rev_id, dsl::data)) .first::(&*conn)?; diff --git a/frontend/rust-lib/flowy-grid/src/services/persistence/rev_sqlite/mod.rs b/frontend/rust-lib/flowy-grid/src/services/persistence/rev_sqlite/mod.rs index 90d7c655e0..bc7fa5de63 100644 --- a/frontend/rust-lib/flowy-grid/src/services/persistence/rev_sqlite/mod.rs +++ b/frontend/rust-lib/flowy-grid/src/services/persistence/rev_sqlite/mod.rs @@ -1,9 +1,9 @@ mod grid_block_sqlite_impl; -mod grid_snapshot_sqlite_impl; +mod grid_snapshot; mod grid_sqlite_impl; mod grid_view_sqlite_impl; pub use grid_block_sqlite_impl::*; -pub use grid_snapshot_sqlite_impl::*; +pub use grid_snapshot::*; pub use grid_sqlite_impl::*; pub use grid_view_sqlite_impl::*; diff --git a/frontend/rust-lib/flowy-grid/src/services/view_editor/trait_impl.rs b/frontend/rust-lib/flowy-grid/src/services/view_editor/trait_impl.rs index 661e8e4597..7ff22e77b0 100644 --- a/frontend/rust-lib/flowy-grid/src/services/view_editor/trait_impl.rs +++ b/frontend/rust-lib/flowy-grid/src/services/view_editor/trait_impl.rs @@ -41,6 +41,10 @@ impl RevisionObjectDeserializer for GridViewRevisionSerde { let pad = GridViewRevisionPad::from_revisions(object_id, revisions)?; Ok(pad) } + + fn recover_operations_from_revisions(_revisions: Vec) -> Option { + None + } } impl RevisionObjectSerializer for GridViewRevisionSerde { diff --git a/frontend/rust-lib/flowy-revision/src/rev_manager.rs b/frontend/rust-lib/flowy-revision/src/rev_manager.rs index b24ac5725e..9e5e145857 100644 --- a/frontend/rust-lib/flowy-revision/src/rev_manager.rs +++ b/frontend/rust-lib/flowy-revision/src/rev_manager.rs @@ -34,6 +34,8 @@ pub trait RevisionObjectDeserializer: Send + Sync { /// * `revisions`: a list of revisions that represent the object /// fn deserialize_revisions(object_id: &str, revisions: Vec) -> FlowyResult; + + fn recover_operations_from_revisions(revisions: Vec) -> Option; } pub trait RevisionObjectSerializer: Send + Sync { @@ -125,24 +127,28 @@ impl RevisionManager { } } - #[tracing::instrument(level = "trace", skip_all, fields(deserializer, object) err)] + #[tracing::instrument(name = "revision_manager_initialize", level = "info", skip_all, fields(deserializer, object_id, deserialize_revisions) err)] pub async fn initialize(&mut self, _cloud: Option>) -> FlowyResult where B: RevisionObjectDeserializer, { let revision_records = self.rev_persistence.load_all_records(&self.object_id)?; - tracing::Span::current().record("object", &self.object_id.as_str()); + tracing::Span::current().record("object_id", &self.object_id.as_str()); tracing::Span::current().record("deserializer", &std::any::type_name::()); let revisions: Vec = revision_records.iter().map(|record| record.revision.clone()).collect(); + tracing::Span::current().record("deserialize_revisions", &revisions.len()); let current_rev_id = revisions.last().as_ref().map(|revision| revision.rev_id).unwrap_or(0); - match B::deserialize_revisions(&self.object_id, revisions) { + match B::deserialize_revisions(&self.object_id, revisions.clone()) { Ok(object) => { self.rev_persistence.sync_revision_records(&revision_records).await?; self.rev_id_counter.set(current_rev_id); Ok(object) } - Err(err) => match self.rev_snapshot.restore_from_snapshot::(current_rev_id) { - None => Err(err), + Err(e) => match self.rev_snapshot.restore_from_snapshot::(current_rev_id) { + None => { + tracing::info!("Restore object from validation revisions"); + B::recover_operations_from_revisions(revisions).ok_or(e) + } Some((object, snapshot_rev)) => { let snapshot_rev_id = snapshot_rev.rev_id; let _ = self.rev_persistence.reset(vec![snapshot_rev]).await; diff --git a/frontend/rust-lib/flowy-revision/src/rev_snapshot.rs b/frontend/rust-lib/flowy-revision/src/rev_snapshot.rs index 6c1df77280..097479ec52 100644 --- a/frontend/rust-lib/flowy-revision/src/rev_snapshot.rs +++ b/frontend/rust-lib/flowy-revision/src/rev_snapshot.rs @@ -87,11 +87,12 @@ where } /// Find the nearest revision base on the passed-in rev_id + #[tracing::instrument(level = "trace", skip_all)] pub fn restore_from_snapshot(&self, rev_id: i64) -> Option<(B::Output, Revision)> where B: RevisionObjectDeserializer, { - tracing::trace!("Try to find if {} has snapshot", self.object_id); + tracing::info!("Try to find if {} has snapshot", self.object_id); let snapshot = self.rev_snapshot_persistence.read_last_snapshot().ok()??; let snapshot_rev_id = snapshot.rev_id; let revision = Revision::new( @@ -101,13 +102,13 @@ where snapshot.data, "".to_owned(), ); - tracing::trace!( + tracing::info!( "Try to restore from snapshot: {}, {}", snapshot.base_rev_id, snapshot.rev_id ); let object = B::deserialize_revisions(&self.object_id, vec![revision.clone()]).ok()?; - tracing::trace!( + tracing::info!( "Restore {} from snapshot with rev_id: {}", self.object_id, snapshot_rev_id diff --git a/frontend/rust-lib/flowy-revision/tests/revision_test/script.rs b/frontend/rust-lib/flowy-revision/tests/revision_test/script.rs index 9680117b4b..738abe3ea0 100644 --- a/frontend/rust-lib/flowy-revision/tests/revision_test/script.rs +++ b/frontend/rust-lib/flowy-revision/tests/revision_test/script.rs @@ -332,4 +332,8 @@ impl RevisionObjectDeserializer for RevisionObjectMockSerde { Ok(object) } + + fn recover_operations_from_revisions(_revisions: Vec) -> Option { + None + } } diff --git a/frontend/rust-lib/flowy-sync/src/client_folder/folder_pad.rs b/frontend/rust-lib/flowy-sync/src/client_folder/folder_pad.rs index 8455494297..ae5a421e5a 100644 --- a/frontend/rust-lib/flowy-sync/src/client_folder/folder_pad.rs +++ b/frontend/rust-lib/flowy-sync/src/client_folder/folder_pad.rs @@ -41,7 +41,6 @@ impl FolderPad { } pub fn from_operations(operations: FolderOperations) -> CollaborateResult { - // TODO: Reconvert from history if delta.to_str() failed. let content = operations.content()?; let mut deserializer = serde_json::Deserializer::from_reader(content.as_bytes()); @@ -443,15 +442,6 @@ pub fn initial_folder_operations(folder_pad: &FolderPad) -> CollaborateResult Self { - FolderPad { - folder_rev: FolderRevision::default(), - operations: default_folder_operations(), - } - } -} - pub struct FolderChangeset { pub operations: FolderOperations, /// md5: the md5 of the FolderPad's operations after applying the change. diff --git a/frontend/rust-lib/flowy-sync/src/errors.rs b/frontend/rust-lib/flowy-sync/src/errors.rs index 241f07d867..42419109d6 100644 --- a/frontend/rust-lib/flowy-sync/src/errors.rs +++ b/frontend/rust-lib/flowy-sync/src/errors.rs @@ -42,6 +42,7 @@ impl CollaborateError { static_error!(record_not_found, ErrorCode::RecordNotFound); static_error!(revision_conflict, ErrorCode::RevisionConflict); static_error!(can_not_delete_primary_field, ErrorCode::CannotDeleteThePrimaryField); + static_error!(unexpected_empty_revision, ErrorCode::UnexpectedEmptyRevision); } impl fmt::Display for CollaborateError { @@ -60,6 +61,7 @@ pub enum ErrorCode { RevisionConflict = 203, RecordNotFound = 300, CannotDeleteThePrimaryField = 301, + UnexpectedEmptyRevision = 302, SerdeError = 999, InternalError = 1000, } diff --git a/frontend/rust-lib/flowy-sync/src/util.rs b/frontend/rust-lib/flowy-sync/src/util.rs index f1aca58208..de7e630f3b 100644 --- a/frontend/rust-lib/flowy-sync/src/util.rs +++ b/frontend/rust-lib/flowy-sync/src/util.rs @@ -39,39 +39,49 @@ where let mut new_operations = DeltaOperations::::new(); for revision in revisions { if revision.bytes.is_empty() { - tracing::warn!("revision delta_data is empty"); - continue; + return Err(CollaborateError::unexpected_empty_revision().context("Unexpected Empty revision")); } - let operations = DeltaOperations::::from_bytes(revision.bytes).map_err(|e| { let err_msg = format!("Deserialize revision failed: {:?}", e); CollaborateError::internal().context(err_msg) })?; - match new_operations.compose(&operations) { - Ok(composed_operations) => { - new_operations = composed_operations; - // if composed_operations.content().is_ok() { - // new_operations = composed_operations; - // } else { - // tracing::error!( - // "Compose operation failed: rev_id: {}, object_id: {} {:?}", - // revision.rev_id, - // revision.object_id, - // operations - // ); - // return Ok(new_operations); - // } - } - Err(e) => { - tracing::error!("Compose operation failed: {}, {:?}", e, operations); - return Ok(new_operations); - } - } + new_operations = new_operations.compose(&operations)?; } Ok(new_operations) } +pub fn recover_operation_from_revisions( + revisions: Vec, + validator: impl Fn(&DeltaOperations) -> bool, +) -> Option> +where + T: OperationAttributes + DeserializeOwned + OperationAttributes, +{ + let mut new_operations = DeltaOperations::::new(); + for revision in revisions { + if let Ok(operations) = DeltaOperations::::from_bytes(revision.bytes) { + match new_operations.compose(&operations) { + Ok(composed_operations) => { + if validator(&composed_operations) { + new_operations = composed_operations; + } else { + break; + } + } + Err(_) => break, + } + } else { + break; + } + } + return if new_operations.is_empty() { + None + } else { + Some(new_operations) + }; +} + pub fn pair_rev_id_from_revision_pbs(revisions: &[Revision]) -> (i64, i64) { let mut rev_id = 0; revisions.iter().for_each(|revision| { diff --git a/shared-lib/folder-rev-model/src/folder_rev.rs b/shared-lib/folder-rev-model/src/folder_rev.rs index f2930ca069..c6378aab4d 100644 --- a/shared-lib/folder-rev-model/src/folder_rev.rs +++ b/shared-lib/folder-rev-model/src/folder_rev.rs @@ -26,36 +26,26 @@ impl<'de> Deserialize<'de> for FolderRevision { where A: MapAccess<'de>, { - let f = |map: &mut A, - workspaces: &mut Option>, - trash: &mut Option>| match map.next_key::() - { - Ok(Some(key)) => { - if key == "workspaces" && workspaces.is_none() { - *workspaces = Some(map.next_value::>().ok()?); - } - if key == "trash" && trash.is_none() { - *trash = Some(map.next_value::>().ok()?); - } - Some(()) - } - Ok(None) => None, - Err(_e) => None, - }; - let mut workspaces: Option> = None; let mut trash: Option> = None; - while f(&mut map, &mut workspaces, &mut trash).is_some() { - if workspaces.is_some() && trash.is_some() { - break; + while let Some(key) = map.next_key::()? { + if key == "workspaces" && workspaces.is_none() { + workspaces = Some(map.next_value::>()?); + } + if key == "trash" && trash.is_none() { + trash = Some(map.next_value::>()?); } } - *self.0 = Some(FolderRevision { - workspaces: workspaces.unwrap_or_default().into_iter().map(Arc::new).collect(), - trash: trash.unwrap_or_default().into_iter().map(Arc::new).collect(), - }); - Ok(()) + if let Some(workspaces) = workspaces { + *self.0 = Some(FolderRevision { + workspaces: workspaces.into_iter().map(Arc::new).collect(), + trash: trash.unwrap_or_default().into_iter().map(Arc::new).collect(), + }); + Ok(()) + } else { + Err(de::Error::missing_field("workspaces")) + } } } diff --git a/shared-lib/lib-ot/src/core/delta/ops.rs b/shared-lib/lib-ot/src/core/delta/ops.rs index 2d41278198..ce25e31497 100644 --- a/shared-lib/lib-ot/src/core/delta/ops.rs +++ b/shared-lib/lib-ot/src/core/delta/ops.rs @@ -293,6 +293,11 @@ where pub fn extend(&mut self, other: Self) { other.ops.into_iter().for_each(|op| self.add(op)); } + + /// Get the content that the [Delta] represents. + pub fn content(&self) -> Result { + self.apply("") + } } impl OperationTransform for DeltaOperations @@ -607,11 +612,6 @@ where serde_json::to_string(self).unwrap_or_else(|_| "".to_owned()) } - /// Get the content that the [Delta] represents. - pub fn content(&self) -> Result { - self.apply("") - } - /// Serial the [Delta] into a String in Bytes format pub fn json_bytes(&self) -> Bytes { let json = self.json_str();