From e77fef3a19aec731b944e69806037a7b61fa3274 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Tue, 7 Feb 2023 14:30:25 +0800 Subject: [PATCH] chore: remove invalid revisions (#1816) --- .gitignore | 1 - frontend/.vscode/launch.json | 2 +- .../rust-lib/flowy-client-sync/src/util.rs | 6 +- .../rust-lib/flowy-database/src/manager.rs | 12 ++- .../src/services/block_editor.rs | 2 +- .../src/services/grid_editor.rs | 18 +++-- .../src/services/view_editor/trait_impl.rs | 2 +- .../flowy-document/src/editor/document.rs | 2 +- .../flowy-document/src/old_editor/editor.rs | 2 +- .../src/services/folder_editor.rs | 6 +- .../flowy-revision/src/rev_manager.rs | 35 ++++++--- .../flowy-revision/src/rev_persistence.rs | 77 +++++++++---------- .../rust-lib/flowy-revision/src/rev_queue.rs | 2 +- .../flowy-revision/src/rev_snapshot.rs | 16 +++- .../revision_test/local_revision_test.rs | 41 +++++----- .../tests/revision_test/script.rs | 6 +- 16 files changed, 132 insertions(+), 98 deletions(-) diff --git a/.gitignore b/.gitignore index 83af459150..4c4b010a7c 100644 --- a/.gitignore +++ b/.gitignore @@ -21,7 +21,6 @@ node_modules **/resources/proto -frontend/.vscode/* !frontend/.vscode/settings.json !frontend/.vscode/tasks.json !frontend/.vscode/launch.json diff --git a/frontend/.vscode/launch.json b/frontend/.vscode/launch.json index 71860a14c7..8abfd479b2 100644 --- a/frontend/.vscode/launch.json +++ b/frontend/.vscode/launch.json @@ -12,7 +12,7 @@ "type": "dart", "preLaunchTask": "AF: Build Appflowy Core", "env": { - "RUST_LOG": "trace" + "RUST_LOG": "trace", // "RUST_LOG": "debug" }, "cwd": "${workspaceRoot}/app_flowy" diff --git a/frontend/rust-lib/flowy-client-sync/src/util.rs b/frontend/rust-lib/flowy-client-sync/src/util.rs index f063db7698..8b5af5bcf9 100644 --- a/frontend/rust-lib/flowy-client-sync/src/util.rs +++ b/frontend/rust-lib/flowy-client-sync/src/util.rs @@ -32,16 +32,18 @@ pub fn contain_newline(s: &str) -> bool { pub fn recover_operation_from_revisions( revisions: Vec, validator: impl Fn(&DeltaOperations) -> bool, -) -> Option> +) -> Option<(DeltaOperations, i64)> where T: OperationAttributes + DeserializeOwned + OperationAttributes, { let mut new_operations = DeltaOperations::::new(); + let mut rev_id = 0; for revision in revisions { if let Ok(operations) = DeltaOperations::::from_bytes(revision.bytes) { match new_operations.compose(&operations) { Ok(composed_operations) => { if validator(&composed_operations) { + rev_id = revision.rev_id; new_operations = composed_operations; } else { break; @@ -56,7 +58,7 @@ where if new_operations.is_empty() { None } else { - Some(new_operations) + Some((new_operations, rev_id)) } } diff --git a/frontend/rust-lib/flowy-database/src/manager.rs b/frontend/rust-lib/flowy-database/src/manager.rs index c1d4014a6e..7709ff9735 100644 --- a/frontend/rust-lib/flowy-database/src/manager.rs +++ b/frontend/rust-lib/flowy-database/src/manager.rs @@ -1,5 +1,7 @@ use crate::entities::DatabaseViewLayout; -use crate::services::grid_editor::{DatabaseRevisionEditor, GridRevisionMergeable}; +use crate::services::grid_editor::{ + DatabaseRevisionEditor, GridRevisionCloudService, GridRevisionMergeable, GridRevisionSerde, +}; use crate::services::persistence::block_index::BlockIndexCache; use crate::services::persistence::kv::DatabaseKVPersistence; use crate::services::persistence::migration::DatabaseMigration; @@ -144,10 +146,16 @@ impl DatabaseManager { pool: Arc, ) -> Result, FlowyError> { let user = self.database_user.clone(); - let rev_manager = self.make_database_rev_manager(database_id, pool.clone())?; + let token = user.token()?; + let cloud = Arc::new(GridRevisionCloudService::new(token)); + let mut rev_manager = self.make_database_rev_manager(database_id, pool.clone())?; + let database_pad = Arc::new(RwLock::new( + rev_manager.initialize::(Some(cloud)).await?, + )); let database_editor = DatabaseRevisionEditor::new( database_id, user, + database_pad, rev_manager, self.block_index_cache.clone(), self.task_scheduler.clone(), diff --git a/frontend/rust-lib/flowy-database/src/services/block_editor.rs b/frontend/rust-lib/flowy-database/src/services/block_editor.rs index ec64d0d0ef..b85b74e4bf 100644 --- a/frontend/rust-lib/flowy-database/src/services/block_editor.rs +++ b/frontend/rust-lib/flowy-database/src/services/block_editor.rs @@ -199,7 +199,7 @@ impl RevisionObjectDeserializer for DatabaseBlockRevisionSerde { Ok(pad) } - fn recover_operations_from_revisions(_revisions: Vec) -> Option { + fn recover_from_revisions(_revisions: Vec) -> Option<(Self::Output, i64)> { None } } diff --git a/frontend/rust-lib/flowy-database/src/services/grid_editor.rs b/frontend/rust-lib/flowy-database/src/services/grid_editor.rs index b18bdeb2a2..d68fb5fc25 100644 --- a/frontend/rust-lib/flowy-database/src/services/grid_editor.rs +++ b/frontend/rust-lib/flowy-database/src/services/grid_editor.rs @@ -57,15 +57,12 @@ impl DatabaseRevisionEditor { pub async fn new( database_id: &str, user: Arc, - mut rev_manager: RevisionManager>, + database_pad: Arc>, + rev_manager: RevisionManager>, persistence: Arc, task_scheduler: Arc>, ) -> FlowyResult> { - let token = user.token()?; - let cloud = Arc::new(GridRevisionCloudService { token }); - let database_pad = rev_manager.initialize::(Some(cloud)).await?; let rev_manager = Arc::new(rev_manager); - let database_pad = Arc::new(RwLock::new(database_pad)); let cell_data_cache = AnyTypeCache::::new(); // Block manager @@ -891,7 +888,7 @@ impl RevisionObjectDeserializer for GridRevisionSerde { Ok(pad) } - fn recover_operations_from_revisions(_revisions: Vec) -> Option { + fn recover_from_revisions(_revisions: Vec) -> Option<(Self::Output, i64)> { None } } @@ -901,11 +898,18 @@ impl RevisionObjectSerializer for GridRevisionSerde { Ok(operations.json_bytes()) } } -struct GridRevisionCloudService { + +pub struct GridRevisionCloudService { #[allow(dead_code)] token: String, } +impl GridRevisionCloudService { + pub fn new(token: String) -> Self { + Self { token } + } +} + impl RevisionCloudService for GridRevisionCloudService { #[tracing::instrument(level = "trace", skip(self))] fn fetch_object(&self, _user_id: &str, _object_id: &str) -> FutureResult, FlowyError> { diff --git a/frontend/rust-lib/flowy-database/src/services/view_editor/trait_impl.rs b/frontend/rust-lib/flowy-database/src/services/view_editor/trait_impl.rs index 8ebbd39e78..4605768caa 100644 --- a/frontend/rust-lib/flowy-database/src/services/view_editor/trait_impl.rs +++ b/frontend/rust-lib/flowy-database/src/services/view_editor/trait_impl.rs @@ -42,7 +42,7 @@ impl RevisionObjectDeserializer for GridViewRevisionSerde { Ok(pad) } - fn recover_operations_from_revisions(_revisions: Vec) -> Option { + fn recover_from_revisions(_revisions: Vec) -> Option<(Self::Output, i64)> { None } } diff --git a/frontend/rust-lib/flowy-document/src/editor/document.rs b/frontend/rust-lib/flowy-document/src/editor/document.rs index 280d96110a..22333051c2 100644 --- a/frontend/rust-lib/flowy-document/src/editor/document.rs +++ b/frontend/rust-lib/flowy-document/src/editor/document.rs @@ -87,7 +87,7 @@ impl RevisionObjectDeserializer for DocumentRevisionSerde { Result::::Ok(document) } - fn recover_operations_from_revisions(_revisions: Vec) -> Option { + fn recover_from_revisions(_revisions: Vec) -> Option<(Self::Output, i64)> { None } } diff --git a/frontend/rust-lib/flowy-document/src/old_editor/editor.rs b/frontend/rust-lib/flowy-document/src/old_editor/editor.rs index ed35a2dbcf..72810649f1 100644 --- a/frontend/rust-lib/flowy-document/src/old_editor/editor.rs +++ b/frontend/rust-lib/flowy-document/src/old_editor/editor.rs @@ -263,7 +263,7 @@ impl RevisionObjectDeserializer for DeltaDocumentRevisionSerde { }) } - fn recover_operations_from_revisions(_revisions: Vec) -> Option { + fn recover_from_revisions(_revisions: Vec) -> Option<(Self::Output, i64)> { None } } diff --git a/frontend/rust-lib/flowy-folder/src/services/folder_editor.rs b/frontend/rust-lib/flowy-folder/src/services/folder_editor.rs index 9c5aa82232..93a158217b 100644 --- a/frontend/rust-lib/flowy-folder/src/services/folder_editor.rs +++ b/frontend/rust-lib/flowy-folder/src/services/folder_editor.rs @@ -107,12 +107,12 @@ impl RevisionObjectDeserializer for FolderRevisionSerde { Ok(FolderPad::from_operations(operations)?) } - fn recover_operations_from_revisions(revisions: Vec) -> Option { - if let Some(operations) = recover_operation_from_revisions(revisions, |operations| { + fn recover_from_revisions(revisions: Vec) -> Option<(Self::Output, i64)> { + if let Some((operations, rev_id)) = recover_operation_from_revisions(revisions, |operations| { FolderPad::from_operations(operations.clone()).is_ok() }) { if let Ok(pad) = FolderPad::from_operations(operations) { - return Some(pad); + return Some((pad, rev_id)); } } None diff --git a/frontend/rust-lib/flowy-revision/src/rev_manager.rs b/frontend/rust-lib/flowy-revision/src/rev_manager.rs index 1eb2ea3d4e..5f5f894d74 100644 --- a/frontend/rust-lib/flowy-revision/src/rev_manager.rs +++ b/frontend/rust-lib/flowy-revision/src/rev_manager.rs @@ -35,7 +35,7 @@ pub trait RevisionObjectDeserializer: Send + Sync { /// fn deserialize_revisions(object_id: &str, revisions: Vec) -> FlowyResult; - fn recover_operations_from_revisions(revisions: Vec) -> Option; + fn recover_from_revisions(revisions: Vec) -> Option<(Self::Output, i64)>; } pub trait RevisionObjectSerializer: Send + Sync { @@ -58,10 +58,9 @@ pub trait RevisionMergeable: Send + Sync { return Ok(revisions.pop().unwrap()); } - let first_revision = revisions.first().unwrap(); + // Select the last version, making sure version numbers don't overlap let last_revision = revisions.last().unwrap(); - - let (base_rev_id, rev_id) = first_revision.pair_rev_id(); + let (base_rev_id, rev_id) = last_revision.pair_rev_id(); let md5 = last_revision.md5.clone(); let bytes = self.combine_revisions(revisions)?; Ok(Revision::new(object_id, base_rev_id, rev_id, bytes, md5)) @@ -137,17 +136,33 @@ impl RevisionManager { tracing::Span::current().record("deserializer", std::any::type_name::()); let revisions: Vec = revision_records.iter().map(|record| record.revision.clone()).collect(); tracing::Span::current().record("deserialize_revisions", revisions.len()); - let current_rev_id = revisions.last().as_ref().map(|revision| revision.rev_id).unwrap_or(0); + let last_rev_id = revisions.last().as_ref().map(|revision| revision.rev_id).unwrap_or(0); match De::deserialize_revisions(&self.object_id, revisions.clone()) { Ok(object) => { self.rev_persistence.sync_revision_records(&revision_records).await?; - self.rev_id_counter.set(current_rev_id); + self.rev_id_counter.set(last_rev_id); Ok(object) } - Err(e) => match self.rev_snapshot.restore_from_snapshot::(current_rev_id) { + Err(e) => match self.rev_snapshot.restore_from_snapshot::(last_rev_id) { None => { - tracing::info!("Restore object from validation revisions"); - De::recover_operations_from_revisions(revisions).ok_or(e) + tracing::info!("[Restore] iterate restore from each revision"); + let (output, recover_rev_id) = De::recover_from_revisions(revisions).ok_or(e)?; + tracing::info!( + "[Restore] last_rev_id:{}, recover_rev_id: {}", + last_rev_id, + recover_rev_id + ); + self.rev_id_counter.set(recover_rev_id); + // delete the revisions whose rev_id is greater than recover_rev_id + if recover_rev_id < last_rev_id { + let range = RevisionRange { + start: recover_rev_id + 1, + end: last_rev_id, + }; + tracing::info!("[Restore] delete revisions in range: {}", range); + let _ = self.rev_persistence.delete_revisions_from_range(range); + } + Ok(output) } Some((object, snapshot_rev)) => { let snapshot_rev_id = snapshot_rev.rev_id; @@ -162,7 +177,7 @@ impl RevisionManager { } pub async fn close(&self) { - let _ = self.rev_persistence.compact_lagging_revisions(&self.rev_compress).await; + let _ = self.rev_persistence.merge_lagging_revisions(&self.rev_compress).await; } pub async fn generate_snapshot(&self) { diff --git a/frontend/rust-lib/flowy-revision/src/rev_persistence.rs b/frontend/rust-lib/flowy-revision/src/rev_persistence.rs index 54581f416f..4ee43922da 100644 --- a/frontend/rust-lib/flowy-revision/src/rev_persistence.rs +++ b/frontend/rust-lib/flowy-revision/src/rev_persistence.rs @@ -14,26 +14,26 @@ pub const REVISION_WRITE_INTERVAL_IN_MILLIS: u64 = 600; #[derive(Clone)] pub struct RevisionPersistenceConfiguration { - // If the number of revisions that didn't sync to the server greater than the merge_threshold + // If the number of revisions that didn't sync to the server greater than the max_merge_len // then these revisions will be merged into one revision. - merge_threshold: usize, + max_merge_len: usize, /// Indicates that the revisions that didn't sync to the server can be merged into one when - /// `compact_lagging_revisions` get called. + /// `merge_lagging_revisions` get called. merge_lagging: bool, } impl RevisionPersistenceConfiguration { - pub fn new(merge_threshold: usize, merge_lagging: bool) -> Self { - debug_assert!(merge_threshold > 1); - if merge_threshold > 1 { + pub fn new(merge_max_length: usize, merge_lagging: bool) -> Self { + debug_assert!(merge_max_length > 1); + if merge_max_length > 1 { Self { - merge_threshold, + max_merge_len: merge_max_length, merge_lagging, } } else { Self { - merge_threshold: 100, + max_merge_len: 100, merge_lagging, } } @@ -43,7 +43,7 @@ impl RevisionPersistenceConfiguration { impl std::default::Default for RevisionPersistenceConfiguration { fn default() -> Self { Self { - merge_threshold: 100, + max_merge_len: 100, merge_lagging: false, } } @@ -106,7 +106,7 @@ where } #[tracing::instrument(level = "trace", skip_all, err)] - pub async fn compact_lagging_revisions<'a>( + pub async fn merge_lagging_revisions<'a>( &'a self, rev_compress: &Arc, ) -> FlowyResult<()> { @@ -115,7 +115,7 @@ where } let mut sync_seq = self.sync_seq.write().await; - let compact_seq = sync_seq.compact(); + let compact_seq = sync_seq.pop_merge_rev_ids(); if !compact_seq.is_empty() { let range = RevisionRange { start: *compact_seq.front().unwrap(), @@ -164,18 +164,18 @@ where ) -> FlowyResult { let mut sync_seq = self.sync_seq.write().await; - // Before the new_revision is pushed into the sync_seq, we check if the current `compact_length` of the - // sync_seq is less equal to or greater than the merge threshold. If yes, it's needs to merged + // Before the new_revision is pushed into the sync_seq, we check if the current `merge_length` of the + // sync_seq is less equal to or greater than the `merge_max_length`. If yes, it's needs to merged // with the new_revision into one revision. - let mut compact_seq = VecDeque::default(); + let mut merge_rev_ids = VecDeque::default(); // tracing::info!("{}", compact_seq) - if sync_seq.compact_length >= self.configuration.merge_threshold - 1 { - compact_seq.extend(sync_seq.compact()); + if sync_seq.merge_length >= self.configuration.max_merge_len - 1 { + merge_rev_ids.extend(sync_seq.pop_merge_rev_ids()); } - if !compact_seq.is_empty() { + if !merge_rev_ids.is_empty() { let range = RevisionRange { - start: *compact_seq.front().unwrap(), - end: *compact_seq.back().unwrap(), + start: *merge_rev_ids.front().unwrap(), + end: *merge_rev_ids.back().unwrap(), }; tracing::Span::current().record("compact_range", format!("{}", range).as_str()); @@ -186,13 +186,13 @@ where // compact multiple revisions into one let merged_revision = rev_compress.merge_revisions(&self.user_id, &self.object_id, revisions)?; - let rev_id = merged_revision.rev_id; + let new_rev_id = merged_revision.rev_id; tracing::Span::current().record("rev_id", merged_revision.rev_id); - sync_seq.recv(merged_revision.rev_id)?; + sync_seq.recv(new_rev_id)?; // replace the revisions in range with compact revision self.compact(&range, merged_revision).await?; - Ok(rev_id) + Ok(new_rev_id) } else { let rev_id = new_revision.rev_id; tracing::Span::current().record("rev_id", rev_id); @@ -333,7 +333,6 @@ where .collect::>()) } - #[allow(dead_code)] pub fn delete_revisions_from_range(&self, range: RevisionRange) -> FlowyResult<()> { self.disk_cache .delete_revision_records(&self.object_id, Some(range.to_rev_ids()))?; @@ -370,8 +369,8 @@ impl RevisionMemoryCacheDelegate for Arc, - compact_index: Option, - compact_length: usize, + merge_start: Option, + merge_length: usize, } impl DeferSyncSequence { @@ -381,14 +380,12 @@ impl DeferSyncSequence { /// Pushes the new_rev_id to the end of the list and marks this new_rev_id is mergeable. /// - /// When calling `compact` method, it will return a list of revision ids started from - /// the `compact_start_pos`, and ends with the `compact_length`. fn merge_recv(&mut self, new_rev_id: i64) -> FlowyResult<()> { self.recv(new_rev_id)?; - self.compact_length += 1; - if self.compact_index.is_none() && !self.rev_ids.is_empty() { - self.compact_index = Some(self.rev_ids.len() - 1); + self.merge_length += 1; + if self.merge_start.is_none() && !self.rev_ids.is_empty() { + self.merge_start = Some(self.rev_ids.len() - 1); } Ok(()) } @@ -419,14 +416,14 @@ impl DeferSyncSequence { } let mut compact_rev_id = None; - if let Some(compact_index) = self.compact_index { + if let Some(compact_index) = self.merge_start { compact_rev_id = self.rev_ids.get(compact_index).cloned(); } let pop_rev_id = self.rev_ids.pop_front(); if let (Some(compact_rev_id), Some(pop_rev_id)) = (compact_rev_id, pop_rev_id) { - if compact_rev_id <= pop_rev_id && self.compact_length > 0 { - self.compact_length -= 1; + if compact_rev_id <= pop_rev_id && self.merge_length > 0 { + self.merge_length -= 1; } } } @@ -438,22 +435,22 @@ impl DeferSyncSequence { } fn clear(&mut self) { - self.compact_index = None; - self.compact_length = 0; + self.merge_start = None; + self.merge_length = 0; self.rev_ids.clear(); } - // Compact the rev_ids into one except the current synchronizing rev_id. - fn compact(&mut self) -> VecDeque { + // Returns the rev_ids into one except the current synchronizing rev_id. + fn pop_merge_rev_ids(&mut self) -> VecDeque { let mut compact_seq = VecDeque::with_capacity(self.rev_ids.len()); - if let Some(start) = self.compact_index { + if let Some(start) = self.merge_start { if start < self.rev_ids.len() { let seq = self.rev_ids.split_off(start); compact_seq.extend(seq); } } - self.compact_index = None; - self.compact_length = 0; + self.merge_start = None; + self.merge_length = 0; compact_seq } } diff --git a/frontend/rust-lib/flowy-revision/src/rev_queue.rs b/frontend/rust-lib/flowy-revision/src/rev_queue.rs index a52dadd59e..64f790af05 100644 --- a/frontend/rust-lib/flowy-revision/src/rev_queue.rs +++ b/frontend/rust-lib/flowy-revision/src/rev_queue.rs @@ -63,7 +63,7 @@ where .for_each(|command| async { match self.handle_command(command).await { Ok(_) => {} - Err(e) => tracing::debug!("[RevQueue]: {}", e), + Err(e) => tracing::error!("[RevQueue]: {}", e), } }) .await; diff --git a/frontend/rust-lib/flowy-revision/src/rev_snapshot.rs b/frontend/rust-lib/flowy-revision/src/rev_snapshot.rs index f236c70bed..70a3c97591 100644 --- a/frontend/rust-lib/flowy-revision/src/rev_snapshot.rs +++ b/frontend/rust-lib/flowy-revision/src/rev_snapshot.rs @@ -19,8 +19,10 @@ pub trait RevisionSnapshotPersistence: Send + Sync { fn read_snapshot(&self, rev_id: i64) -> FlowyResult>; fn read_last_snapshot(&self) -> FlowyResult>; +} - // fn generate_snapshot_data(&self) -> Option; +pub trait RevisionSnapshotDataGenerator: Send + Sync { + fn generate_snapshot_data(&self) -> Option; } const AUTO_GEN_SNAPSHOT_PER_10_REVISION: i64 = 10; @@ -29,6 +31,7 @@ pub struct RevisionSnapshotController { user_id: String, object_id: String, rev_snapshot_persistence: Arc, + rev_snapshot_data: Option>, rev_id_counter: Arc, rev_persistence: Arc>, rev_compress: Arc, @@ -57,11 +60,16 @@ where rev_snapshot_persistence, rev_id_counter, start_rev_id: AtomicI64::new(0), + rev_snapshot_data: None, rev_persistence: revision_persistence, rev_compress: revision_compress, } } + pub async fn set_snapshot_data_generator(&mut self, generator: Arc) { + self.rev_snapshot_data = Some(generator); + } + pub async fn generate_snapshot(&self) { if let Some((rev_id, bytes)) = self.generate_snapshot_data() { if let Err(e) = self.rev_snapshot_persistence.write_snapshot(rev_id, bytes.to_vec()) { @@ -76,7 +84,7 @@ where where B: RevisionObjectDeserializer, { - tracing::info!("Try to find if {} has snapshot", self.object_id); + tracing::info!("[Restore] Try to find if {} has snapshot", self.object_id); let snapshot = self.rev_snapshot_persistence.read_last_snapshot().ok()??; let snapshot_rev_id = snapshot.rev_id; let revision = Revision::new( @@ -87,13 +95,13 @@ where "".to_owned(), ); tracing::info!( - "Try to restore from snapshot: {}, {}", + "[Restore] Try to restore from snapshot: {}, {}", snapshot.base_rev_id, snapshot.rev_id ); let object = B::deserialize_revisions(&self.object_id, vec![revision.clone()]).ok()?; tracing::info!( - "Restore {} from snapshot with rev_id: {}", + "[Restore] Restore {} from snapshot with rev_id: {}", self.object_id, snapshot_rev_id ); diff --git a/frontend/rust-lib/flowy-revision/tests/revision_test/local_revision_test.rs b/frontend/rust-lib/flowy-revision/tests/revision_test/local_revision_test.rs index 15c85277bd..152b0dca43 100644 --- a/frontend/rust-lib/flowy-revision/tests/revision_test/local_revision_test.rs +++ b/frontend/rust-lib/flowy-revision/tests/revision_test/local_revision_test.rs @@ -28,11 +28,11 @@ async fn revision_compress_2_revisions_with_2_threshold_test() { .await; test.run_scripts(vec![ - AssertNextSyncRevisionId { rev_id: Some(1) }, + AssertNextSyncRevisionId { rev_id: Some(2) }, AssertNextSyncRevisionContent { expected: "123456".to_string(), }, - AckRevision { rev_id: 1 }, + AckRevision { rev_id: 2 }, AssertNextSyncRevisionId { rev_id: None }, ]) .await; @@ -41,13 +41,11 @@ async fn revision_compress_2_revisions_with_2_threshold_test() { #[tokio::test] async fn revision_compress_4_revisions_with_threshold_2_test() { let test = RevisionTest::new_with_configuration(2).await; - let rev_id_1 = 1; test.run_script(AddLocalRevision { content: "1".to_string(), }) .await; - let rev_id_2 = 2; test.run_script(AddLocalRevision { content: "2".to_string(), }) @@ -63,15 +61,14 @@ async fn revision_compress_4_revisions_with_threshold_2_test() { }) .await; - // rev_id_2,rev_id_3,rev_id4 will be merged into rev_id_1 test.run_scripts(vec![ AssertNumberOfSyncRevisions { num: 2 }, - AssertNextSyncRevisionId { rev_id: Some(rev_id_1) }, + AssertNextSyncRevisionId { rev_id: Some(2) }, AssertNextSyncRevisionContent { expected: "12".to_string(), }, - AckRevision { rev_id: rev_id_1 }, - AssertNextSyncRevisionId { rev_id: Some(rev_id_2) }, + AckRevision { rev_id: 2 }, + AssertNextSyncRevisionId { rev_id: Some(4) }, AssertNextSyncRevisionContent { expected: "34".to_string(), }, @@ -81,8 +78,8 @@ async fn revision_compress_4_revisions_with_threshold_2_test() { #[tokio::test] async fn revision_compress_8_revisions_with_threshold_4_test() { - let test = RevisionTest::new_with_configuration(4).await; - let rev_id_1 = 1; + let merge_len = 4; + let test = RevisionTest::new_with_configuration(merge_len).await; test.run_script(AddLocalRevision { content: "1".to_string(), }) @@ -103,7 +100,6 @@ async fn revision_compress_8_revisions_with_threshold_4_test() { }) .await; - let rev_id_a = 2; test.run_script(AddLocalRevision { content: "a".to_string(), }) @@ -126,16 +122,20 @@ async fn revision_compress_8_revisions_with_threshold_4_test() { test.run_scripts(vec![ AssertNumberOfSyncRevisions { num: 2 }, - AssertNextSyncRevisionId { rev_id: Some(rev_id_1) }, + AssertNextSyncRevisionId { + rev_id: Some(merge_len), + }, AssertNextSyncRevisionContent { expected: "1234".to_string(), }, - AckRevision { rev_id: rev_id_1 }, - AssertNextSyncRevisionId { rev_id: Some(rev_id_a) }, + AckRevision { rev_id: merge_len }, + AssertNextSyncRevisionId { + rev_id: Some(merge_len * 2), + }, AssertNextSyncRevisionContent { expected: "abcd".to_string(), }, - AckRevision { rev_id: rev_id_a }, + AckRevision { rev_id: merge_len * 2 }, AssertNextSyncRevisionId { rev_id: None }, ]) .await; @@ -143,7 +143,8 @@ async fn revision_compress_8_revisions_with_threshold_4_test() { #[tokio::test] async fn revision_merge_per_5_revision_test() { - let test = RevisionTest::new_with_configuration(5).await; + let merge_len = 5; + let test = RevisionTest::new_with_configuration(merge_len).await; for i in 0..20 { let content = format!("{}", i); test.run_script(AddLocalRevision { content }).await; @@ -154,19 +155,19 @@ async fn revision_merge_per_5_revision_test() { AssertNextSyncRevisionContent { expected: "01234".to_string(), }, - AckRevision { rev_id: 1 }, + AckRevision { rev_id: merge_len }, AssertNextSyncRevisionContent { expected: "56789".to_string(), }, - AckRevision { rev_id: 2 }, + AckRevision { rev_id: merge_len * 2 }, AssertNextSyncRevisionContent { expected: "1011121314".to_string(), }, - AckRevision { rev_id: 3 }, + AckRevision { rev_id: merge_len * 3 }, AssertNextSyncRevisionContent { expected: "1516171819".to_string(), }, - AckRevision { rev_id: 4 }, + AckRevision { rev_id: merge_len * 4 }, AssertNextSyncRevisionId { rev_id: None }, ]) .await; diff --git a/frontend/rust-lib/flowy-revision/tests/revision_test/script.rs b/frontend/rust-lib/flowy-revision/tests/revision_test/script.rs index e2f21829aa..841270eef4 100644 --- a/frontend/rust-lib/flowy-revision/tests/revision_test/script.rs +++ b/frontend/rust-lib/flowy-revision/tests/revision_test/script.rs @@ -39,10 +39,10 @@ impl RevisionTest { Self::new_with_configuration(2).await } - pub async fn new_with_configuration(merge_threshold: i64) -> Self { + pub async fn new_with_configuration(max_merge_len: i64) -> Self { let user_id = nanoid!(10); let object_id = nanoid!(6); - let configuration = RevisionPersistenceConfiguration::new(merge_threshold as usize, false); + let configuration = RevisionPersistenceConfiguration::new(max_merge_len as usize, false); let disk_cache = RevisionDiskCacheMock::new(vec![]); let persistence = RevisionPersistence::new(&user_id, &object_id, disk_cache, configuration.clone()); let compress = RevisionMergeableMock {}; @@ -334,7 +334,7 @@ impl RevisionObjectDeserializer for RevisionObjectMockSerde { Ok(object) } - fn recover_operations_from_revisions(_revisions: Vec) -> Option { + fn recover_from_revisions(_revisions: Vec) -> Option<(Self::Output, i64)> { None } }