diff --git a/frontend/rust-lib/flowy-document/src/editor/editor.rs b/frontend/rust-lib/flowy-document/src/editor/editor.rs index eee8eaa6de..0dcfd42b04 100644 --- a/frontend/rust-lib/flowy-document/src/editor/editor.rs +++ b/frontend/rust-lib/flowy-document/src/editor/editor.rs @@ -83,7 +83,13 @@ fn spawn_edit_queue( } impl DocumentEditor for Arc { - fn close(&self) {} + #[tracing::instrument(name = "close document editor", level = "trace", skip_all)] + fn close(&self) { + let rev_manager = self.rev_manager.clone(); + tokio::spawn(async move { + rev_manager.close().await; + }); + } fn export(&self) -> FutureResult { let this = self.clone(); diff --git a/frontend/rust-lib/flowy-document/src/manager.rs b/frontend/rust-lib/flowy-document/src/manager.rs index bb56aa71b0..0dd4cc19d4 100644 --- a/frontend/rust-lib/flowy-document/src/manager.rs +++ b/frontend/rust-lib/flowy-document/src/manager.rs @@ -5,7 +5,7 @@ use crate::services::rev_sqlite::{SQLiteDeltaDocumentRevisionPersistence, SQLite use crate::services::DocumentPersistence; use crate::{errors::FlowyError, DocumentCloudService}; use bytes::Bytes; -use dashmap::DashMap; + use flowy_database::ConnectionPool; use flowy_error::FlowyResult; use flowy_revision::{ @@ -16,9 +16,11 @@ use flowy_sync::client_document::initial_delta_document_content; use flowy_sync::entities::{document::DocumentIdPB, revision::Revision, ws_data::ServerRevisionWSData}; use flowy_sync::util::md5; use lib_infra::future::FutureResult; +use lib_infra::ref_map::{RefCountHashMap, RefCountValue}; use lib_ws::WSConnectState; use std::any::Any; use std::{convert::TryInto, sync::Arc}; +use tokio::sync::RwLock; pub trait DocumentUser: Send + Sync { fn user_dir(&self) -> Result; @@ -76,7 +78,7 @@ impl std::default::Default for DocumentConfig { pub struct DocumentManager { cloud_service: Arc, rev_web_socket: Arc, - editor_map: Arc, + editor_map: Arc>>, user: Arc, persistence: Arc, #[allow(dead_code)] @@ -94,7 +96,7 @@ impl DocumentManager { Self { cloud_service, rev_web_socket, - editor_map: Arc::new(DocumentEditorMap::new()), + editor_map: Arc::new(RwLock::new(RefCountHashMap::new())), user: document_user, persistence: Arc::new(DocumentPersistence::new(database)), config, @@ -124,10 +126,10 @@ impl DocumentManager { } #[tracing::instrument(level = "trace", skip(self, editor_id), fields(editor_id), err)] - pub fn close_document_editor>(&self, editor_id: T) -> Result<(), FlowyError> { + pub async fn close_document_editor>(&self, editor_id: T) -> Result<(), FlowyError> { let editor_id = editor_id.as_ref(); tracing::Span::current().record("editor_id", &editor_id); - self.editor_map.remove(editor_id); + self.editor_map.write().await.remove(editor_id); Ok(()) } @@ -149,9 +151,9 @@ impl DocumentManager { pub async fn receive_ws_data(&self, data: Bytes) { let result: Result = data.try_into(); match result { - Ok(data) => match self.editor_map.get(&data.object_id) { + Ok(data) => match self.editor_map.read().await.get(&data.object_id) { None => tracing::error!("Can't find any source handler for {:?}-{:?}", data.object_id, data.ty), - Some(editor) => match editor.receive_ws_data(data).await { + Some(handler) => match handler.0.receive_ws_data(data).await { Ok(_) => {} Err(e) => tracing::error!("{}", e), }, @@ -180,13 +182,13 @@ impl DocumentManager { /// returns: Result, FlowyError> /// async fn get_document_editor(&self, doc_id: &str) -> FlowyResult> { - match self.editor_map.get(doc_id) { + match self.editor_map.read().await.get(doc_id) { None => { // tracing::warn!("Should call init_document_editor first"); self.init_document_editor(doc_id).await } - Some(editor) => Ok(editor), + Some(handler) => Ok(handler.0.clone()), } } @@ -216,14 +218,20 @@ impl DocumentManager { DeltaDocumentEditor::new(doc_id, user, rev_manager, self.rev_web_socket.clone(), cloud_service) .await?, ); - self.editor_map.insert(doc_id, editor.clone()); + self.editor_map + .write() + .await + .insert(doc_id.to_string(), RefCountDocumentHandler(editor.clone())); Ok(editor) } DocumentVersionPB::V1 => { let rev_manager = self.make_document_rev_manager(doc_id, pool.clone())?; let editor: Arc = Arc::new(AppFlowyDocumentEditor::new(doc_id, user, rev_manager, cloud_service).await?); - self.editor_map.insert(doc_id, editor.clone()); + self.editor_map + .write() + .await + .insert(doc_id.to_string(), RefCountDocumentHandler(editor.clone())); Ok(editor) } } @@ -247,7 +255,7 @@ impl DocumentManager { ) -> Result>, FlowyError> { let user_id = self.user.user_id()?; let disk_cache = SQLiteDocumentRevisionPersistence::new(&user_id, pool.clone()); - let configuration = RevisionPersistenceConfiguration::new(100); + let configuration = RevisionPersistenceConfiguration::new(100, true); let rev_persistence = RevisionPersistence::new(&user_id, doc_id, disk_cache, configuration); // let history_persistence = SQLiteRevisionHistoryPersistence::new(doc_id, pool.clone()); let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(doc_id, pool); @@ -268,7 +276,7 @@ impl DocumentManager { ) -> Result>, FlowyError> { let user_id = self.user.user_id()?; let disk_cache = SQLiteDeltaDocumentRevisionPersistence::new(&user_id, pool.clone()); - let configuration = RevisionPersistenceConfiguration::new(100); + let configuration = RevisionPersistenceConfiguration::new(100, true); let rev_persistence = RevisionPersistence::new(&user_id, doc_id, disk_cache, configuration); // let history_persistence = SQLiteRevisionHistoryPersistence::new(doc_id, pool.clone()); let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(doc_id, pool); @@ -309,40 +317,32 @@ impl RevisionCloudService for DocumentRevisionCloudService { } } -pub struct DocumentEditorMap { - inner: DashMap>, +#[derive(Clone)] +struct RefCountDocumentHandler(Arc); + +impl RefCountValue for RefCountDocumentHandler { + fn did_remove(&self) { + self.0.close(); + } } -impl DocumentEditorMap { - fn new() -> Self { - Self { inner: DashMap::new() } - } +impl std::ops::Deref for RefCountDocumentHandler { + type Target = Arc; - pub(crate) fn insert(&self, editor_id: &str, editor: Arc) { - if self.inner.contains_key(editor_id) { - log::warn!("Editor:{} already open", editor_id); - } - self.inner.insert(editor_id.to_string(), editor); - } - - pub(crate) fn get(&self, editor_id: &str) -> Option> { - Some(self.inner.get(editor_id)?.clone()) - } - - pub(crate) fn remove(&self, editor_id: &str) { - if let Some(editor) = self.get(editor_id) { - editor.close() - } - self.inner.remove(editor_id); + fn deref(&self) -> &Self::Target { + &self.0 } } #[tracing::instrument(level = "trace", skip(web_socket, handlers))] -fn listen_ws_state_changed(web_socket: Arc, handlers: Arc) { +fn listen_ws_state_changed( + web_socket: Arc, + handlers: Arc>>, +) { tokio::spawn(async move { let mut notify = web_socket.subscribe_state_changed().await; while let Ok(state) = notify.recv().await { - handlers.inner.iter().for_each(|handler| { + handlers.read().await.values().iter().for_each(|handler| { handler.receive_ws_state(&state); }) } diff --git a/frontend/rust-lib/flowy-folder/src/manager.rs b/frontend/rust-lib/flowy-folder/src/manager.rs index 2c4e5383fb..c9c4c342a0 100644 --- a/frontend/rust-lib/flowy-folder/src/manager.rs +++ b/frontend/rust-lib/flowy-folder/src/manager.rs @@ -168,7 +168,7 @@ impl FolderManager { let pool = self.persistence.db_pool()?; let object_id = folder_id.as_ref(); let disk_cache = SQLiteFolderRevisionPersistence::new(user_id, pool.clone()); - let configuration = RevisionPersistenceConfiguration::new(100); + let configuration = RevisionPersistenceConfiguration::new(100, false); let rev_persistence = RevisionPersistence::new(user_id, object_id, disk_cache, configuration); let rev_compactor = FolderRevisionCompress(); // let history_persistence = SQLiteRevisionHistoryPersistence::new(object_id, pool.clone()); diff --git a/frontend/rust-lib/flowy-folder/tests/workspace/script.rs b/frontend/rust-lib/flowy-folder/tests/workspace/script.rs index 5725258bf2..19e599ff00 100644 --- a/frontend/rust-lib/flowy-folder/tests/workspace/script.rs +++ b/frontend/rust-lib/flowy-folder/tests/workspace/script.rs @@ -70,6 +70,7 @@ pub enum FolderScript { DeleteAllTrash, // Sync + #[allow(dead_code)] AssertCurrentRevId(i64), AssertNextSyncRevId(Option), AssertRevisionState { diff --git a/frontend/rust-lib/flowy-grid/src/manager.rs b/frontend/rust-lib/flowy-grid/src/manager.rs index 68bb61aea9..013b69510f 100644 --- a/frontend/rust-lib/flowy-grid/src/manager.rs +++ b/frontend/rust-lib/flowy-grid/src/manager.rs @@ -1,15 +1,15 @@ use crate::entities::GridLayout; -use crate::services::block_editor::GridBlockRevisionCompress; + use crate::services::grid_editor::{GridRevisionCompress, GridRevisionEditor}; use crate::services::grid_view_manager::make_grid_view_rev_manager; use crate::services::persistence::block_index::BlockIndexCache; use crate::services::persistence::kv::GridKVPersistence; use crate::services::persistence::migration::GridMigration; -use crate::services::persistence::rev_sqlite::{SQLiteGridBlockRevisionPersistence, SQLiteGridRevisionPersistence}; +use crate::services::persistence::rev_sqlite::SQLiteGridRevisionPersistence; use crate::services::persistence::GridDatabase; use crate::services::tasks::GridTaskScheduler; use bytes::Bytes; -use dashmap::DashMap; + use flowy_database::ConnectionPool; use flowy_error::{FlowyError, FlowyResult}; use flowy_grid_data_model::revision::{BuildGridContext, GridRevision, GridViewRevision}; @@ -20,7 +20,8 @@ use flowy_revision::{ use flowy_sync::client_grid::{make_grid_block_operations, make_grid_operations, make_grid_view_operations}; use flowy_sync::entities::revision::Revision; use lib_infra::ref_map::{RefCountHashMap, RefCountValue}; -use std::collections::HashMap; + +use crate::services::block_manager::make_grid_block_rev_manager; use std::sync::Arc; use tokio::sync::RwLock; @@ -92,8 +93,7 @@ impl GridManager { #[tracing::instrument(level = "debug", skip_all, err)] pub async fn create_grid_block>(&self, block_id: T, revisions: Vec) -> FlowyResult<()> { let block_id = block_id.as_ref(); - let db_pool = self.grid_user.db_pool()?; - let rev_manager = self.make_grid_block_rev_manager(block_id, db_pool)?; + let rev_manager = make_grid_block_rev_manager(&self.grid_user, block_id)?; let _ = rev_manager.reset_object(revisions).await?; Ok(()) } @@ -119,13 +119,13 @@ impl GridManager { pub async fn get_grid_editor(&self, grid_id: &str) -> FlowyResult> { match self.grid_editors.read().await.get(grid_id) { None => Err(FlowyError::internal().context("Should call open_grid function first")), - Some(editor) => Ok(editor.clone()), + Some(editor) => Ok(editor), } } async fn get_or_create_grid_editor(&self, grid_id: &str) -> FlowyResult> { if let Some(editor) = self.grid_editors.read().await.get(grid_id) { - return Ok(editor.clone()); + return Ok(editor); } let db_pool = self.grid_user.db_pool()?; @@ -164,29 +164,13 @@ impl GridManager { ) -> FlowyResult>> { let user_id = self.grid_user.user_id()?; let disk_cache = SQLiteGridRevisionPersistence::new(&user_id, pool.clone()); - let configuration = RevisionPersistenceConfiguration::new(2); + let configuration = RevisionPersistenceConfiguration::new(2, false); let rev_persistence = RevisionPersistence::new(&user_id, grid_id, disk_cache, configuration); let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(grid_id, pool); let rev_compactor = GridRevisionCompress(); let rev_manager = RevisionManager::new(&user_id, grid_id, rev_persistence, rev_compactor, snapshot_persistence); Ok(rev_manager) } - - fn make_grid_block_rev_manager( - &self, - block_id: &str, - pool: Arc, - ) -> FlowyResult>> { - let user_id = self.grid_user.user_id()?; - let disk_cache = SQLiteGridBlockRevisionPersistence::new(&user_id, pool.clone()); - let configuration = RevisionPersistenceConfiguration::new(4); - let rev_persistence = RevisionPersistence::new(&user_id, block_id, disk_cache, configuration); - let rev_compactor = GridBlockRevisionCompress(); - let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(block_id, pool); - let rev_manager = - RevisionManager::new(&user_id, block_id, rev_persistence, rev_compactor, snapshot_persistence); - Ok(rev_manager) - } } pub async fn make_grid_view_data( diff --git a/frontend/rust-lib/flowy-grid/src/services/block_manager.rs b/frontend/rust-lib/flowy-grid/src/services/block_manager.rs index 1750a3473b..474e224fd5 100644 --- a/frontend/rust-lib/flowy-grid/src/services/block_manager.rs +++ b/frontend/rust-lib/flowy-grid/src/services/block_manager.rs @@ -6,6 +6,7 @@ use crate::services::persistence::block_index::BlockIndexCache; use crate::services::persistence::rev_sqlite::SQLiteGridBlockRevisionPersistence; use crate::services::row::{block_from_row_orders, make_row_from_row_rev, GridBlockSnapshot}; use dashmap::DashMap; +use flowy_database::ConnectionPool; use flowy_error::FlowyResult; use flowy_grid_data_model::revision::{ GridBlockMetaRevision, GridBlockMetaRevisionChangeset, RowChangeset, RowRevision, @@ -46,7 +47,7 @@ impl GridBlockManager { match self.block_editors.get(block_id) { None => { tracing::error!("This is a fatal error, block with id:{} is not exist", block_id); - let editor = Arc::new(make_block_editor(&self.user, block_id).await?); + let editor = Arc::new(make_grid_block_editor(&self.user, block_id).await?); self.block_editors.insert(block_id.to_owned(), editor.clone()); Ok(editor) } @@ -261,24 +262,32 @@ async fn make_block_editors( ) -> FlowyResult>> { let editor_map = DashMap::new(); for block_meta_rev in block_meta_revs { - let editor = make_block_editor(user, &block_meta_rev.block_id).await?; + let editor = make_grid_block_editor(user, &block_meta_rev.block_id).await?; editor_map.insert(block_meta_rev.block_id.clone(), Arc::new(editor)); } Ok(editor_map) } -async fn make_block_editor(user: &Arc, block_id: &str) -> FlowyResult { +async fn make_grid_block_editor(user: &Arc, block_id: &str) -> FlowyResult { tracing::trace!("Open block:{} editor", block_id); let token = user.token()?; let user_id = user.user_id()?; - let pool = user.db_pool()?; + let rev_manager = make_grid_block_rev_manager(user, block_id)?; + GridBlockRevisionEditor::new(&user_id, &token, block_id, rev_manager).await +} +pub fn make_grid_block_rev_manager( + user: &Arc, + block_id: &str, +) -> FlowyResult>> { + let user_id = user.user_id()?; + let pool = user.db_pool()?; let disk_cache = SQLiteGridBlockRevisionPersistence::new(&user_id, pool.clone()); - let configuration = RevisionPersistenceConfiguration::new(4); + let configuration = RevisionPersistenceConfiguration::new(4, false); let rev_persistence = RevisionPersistence::new(&user_id, block_id, disk_cache, configuration); let rev_compactor = GridBlockRevisionCompress(); let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(block_id, pool); let rev_manager = RevisionManager::new(&user_id, block_id, rev_persistence, rev_compactor, snapshot_persistence); - GridBlockRevisionEditor::new(&user_id, &token, block_id, rev_manager).await + Ok(rev_manager) } diff --git a/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs b/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs index 3a9e7ac16c..d88c7ec0e4 100644 --- a/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs +++ b/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs @@ -94,7 +94,13 @@ impl GridRevisionEditor { Ok(editor) } - pub fn close(&self) {} + #[tracing::instrument(name = "close grid editor", level = "trace", skip_all)] + pub fn close(&self) { + let rev_manager = self.rev_manager.clone(); + tokio::spawn(async move { + rev_manager.close().await; + }); + } /// Save the type-option data to disk and send a `GridNotification::DidUpdateField` notification /// to dart side. diff --git a/frontend/rust-lib/flowy-grid/src/services/grid_view_manager.rs b/frontend/rust-lib/flowy-grid/src/services/grid_view_manager.rs index f7de07bf6d..b902d2d25b 100644 --- a/frontend/rust-lib/flowy-grid/src/services/grid_view_manager.rs +++ b/frontend/rust-lib/flowy-grid/src/services/grid_view_manager.rs @@ -255,7 +255,7 @@ pub async fn make_grid_view_rev_manager( let pool = user.db_pool()?; let disk_cache = SQLiteGridViewRevisionPersistence::new(&user_id, pool.clone()); - let configuration = RevisionPersistenceConfiguration::new(2); + let configuration = RevisionPersistenceConfiguration::new(2, false); let rev_persistence = RevisionPersistence::new(&user_id, view_id, disk_cache, configuration); let rev_compactor = GridViewRevisionCompress(); diff --git a/frontend/rust-lib/flowy-grid/src/services/mod.rs b/frontend/rust-lib/flowy-grid/src/services/mod.rs index ab864c544a..1e759a082a 100644 --- a/frontend/rust-lib/flowy-grid/src/services/mod.rs +++ b/frontend/rust-lib/flowy-grid/src/services/mod.rs @@ -1,7 +1,7 @@ mod util; pub mod block_editor; -mod block_manager; +pub mod block_manager; mod block_manager_trait_impl; pub mod cell; pub mod field; diff --git a/frontend/rust-lib/flowy-grid/src/services/tasks/scheduler.rs b/frontend/rust-lib/flowy-grid/src/services/tasks/scheduler.rs index a4ebe36d2e..e88cd9fd9d 100644 --- a/frontend/rust-lib/flowy-grid/src/services/tasks/scheduler.rs +++ b/frontend/rust-lib/flowy-grid/src/services/tasks/scheduler.rs @@ -1,4 +1,4 @@ -use crate::services::tasks::queue::{GridTaskQueue, TaskHandlerId}; +use crate::services::tasks::queue::GridTaskQueue; use crate::services::tasks::runner::GridTaskRunner; use crate::services::tasks::store::GridTaskStore; use crate::services::tasks::task::Task; @@ -7,22 +7,28 @@ use crate::services::tasks::{TaskContent, TaskId, TaskStatus}; use flowy_error::FlowyError; use lib_infra::future::BoxResultFuture; use lib_infra::ref_map::{RefCountHashMap, RefCountValue}; -use std::collections::HashMap; + use std::sync::Arc; use std::time::Duration; use tokio::sync::{watch, RwLock}; -pub(crate) trait GridTaskHandler: Send + Sync + 'static + RefCountValue { +pub(crate) trait GridTaskHandler: Send + Sync + 'static { fn handler_id(&self) -> &str; fn process_content(&self, content: TaskContent) -> BoxResultFuture<(), FlowyError>; } +#[derive(Clone)] +struct RefCountTaskHandler(Arc); +impl RefCountValue for RefCountTaskHandler { + fn did_remove(&self) {} +} + pub struct GridTaskScheduler { queue: GridTaskQueue, store: GridTaskStore, notifier: watch::Sender, - handlers: RefCountHashMap>, + handlers: RefCountHashMap, } impl GridTaskScheduler { @@ -51,7 +57,7 @@ impl GridTaskScheduler { T: GridTaskHandler, { let handler_id = handler.handler_id().to_owned(); - self.handlers.insert(handler_id, handler); + self.handlers.insert(handler_id, RefCountTaskHandler(handler)); } pub(crate) fn unregister_handler>(&mut self, handler_id: T) { @@ -74,7 +80,7 @@ impl GridTaskScheduler { let content = task.content.take()?; task.set_status(TaskStatus::Processing); - let _ = match handler.process_content(content).await { + let _ = match handler.0.process_content(content).await { Ok(_) => { task.set_status(TaskStatus::Done); let _ = ret.send(task.into()); diff --git a/frontend/rust-lib/flowy-revision/src/cache/reset.rs b/frontend/rust-lib/flowy-revision/src/cache/reset.rs index 4c4c223818..8fa522c033 100644 --- a/frontend/rust-lib/flowy-revision/src/cache/reset.rs +++ b/frontend/rust-lib/flowy-revision/src/cache/reset.rs @@ -60,7 +60,7 @@ where } async fn reset_object(&self) -> FlowyResult<()> { - let configuration = RevisionPersistenceConfiguration::new(2); + let configuration = RevisionPersistenceConfiguration::new(2, false); let rev_persistence = Arc::new(RevisionPersistence::from_disk_cache( &self.user_id, self.target.target_id(), diff --git a/frontend/rust-lib/flowy-revision/src/rev_manager.rs b/frontend/rust-lib/flowy-revision/src/rev_manager.rs index c1bd8f531b..cb613bad65 100644 --- a/frontend/rust-lib/flowy-revision/src/rev_manager.rs +++ b/frontend/rust-lib/flowy-revision/src/rev_manager.rs @@ -123,6 +123,10 @@ impl RevisionManager { B::deserialize_revisions(&self.object_id, revisions) } + pub async fn close(&self) { + let _ = self.rev_persistence.compact_lagging_revisions(&self.rev_compress).await; + } + pub async fn load_revisions(&self) -> FlowyResult> { let revisions = RevisionLoader { object_id: self.object_id.clone(), diff --git a/frontend/rust-lib/flowy-revision/src/rev_persistence.rs b/frontend/rust-lib/flowy-revision/src/rev_persistence.rs index 7b8f4c2b9e..901a0b214f 100644 --- a/frontend/rust-lib/flowy-revision/src/rev_persistence.rs +++ b/frontend/rust-lib/flowy-revision/src/rev_persistence.rs @@ -17,22 +17,32 @@ pub const REVISION_WRITE_INTERVAL_IN_MILLIS: u64 = 600; #[derive(Clone)] pub struct RevisionPersistenceConfiguration { merge_threshold: usize, + merge_lagging: bool, } impl RevisionPersistenceConfiguration { - pub fn new(merge_threshold: usize) -> Self { + pub fn new(merge_threshold: usize, merge_lagging: bool) -> Self { debug_assert!(merge_threshold > 1); if merge_threshold > 1 { - Self { merge_threshold } + Self { + merge_threshold, + merge_lagging, + } } else { - Self { merge_threshold: 100 } + Self { + merge_threshold: 100, + merge_lagging, + } } } } impl std::default::Default for RevisionPersistenceConfiguration { fn default() -> Self { - Self { merge_threshold: 100 } + Self { + merge_threshold: 100, + merge_lagging: false, + } } } @@ -98,6 +108,36 @@ where Ok(()) } + #[tracing::instrument(level = "trace", skip_all, err)] + pub async fn compact_lagging_revisions<'a>( + &'a self, + rev_compress: &Arc, + ) -> FlowyResult<()> { + if !self.configuration.merge_lagging { + return Ok(()); + } + + let mut sync_seq = self.sync_seq.write().await; + let compact_seq = sync_seq.compact(); + if !compact_seq.is_empty() { + let range = RevisionRange { + start: *compact_seq.front().unwrap(), + end: *compact_seq.back().unwrap(), + }; + + let revisions = self.revisions_in_range(&range).await?; + debug_assert_eq!(range.len() as usize, revisions.len()); + // compact multiple revisions into one + let merged_revision = rev_compress.merge_revisions(&self.user_id, &self.object_id, revisions)?; + tracing::Span::current().record("rev_id", &merged_revision.rev_id); + let _ = sync_seq.recv(merged_revision.rev_id)?; + + // replace the revisions in range with compact revision + self.compact(&range, merged_revision).await?; + } + Ok(()) + } + /// Save the revision to disk and append it to the end of the sync sequence. #[tracing::instrument(level = "trace", skip_all, fields(rev_id, compact_range, object_id=%self.object_id), err)] pub(crate) async fn add_sync_revision<'a>( @@ -108,7 +148,7 @@ where let mut sync_seq = self.sync_seq.write().await; let compact_length = sync_seq.compact_length; - // Before the new_revision is pushed into the sync_seq, we check if the current `step` of the + // Before the new_revision is pushed into the sync_seq, we check if the current `compact_length` of the // sync_seq is less equal to or greater than the merge threshold. If yes, it's needs to merged // with the new_revision into one revision. let mut compact_seq = VecDeque::default(); diff --git a/frontend/rust-lib/flowy-revision/tests/revision_test/script.rs b/frontend/rust-lib/flowy-revision/tests/revision_test/script.rs index 76f1833554..864002051b 100644 --- a/frontend/rust-lib/flowy-revision/tests/revision_test/script.rs +++ b/frontend/rust-lib/flowy-revision/tests/revision_test/script.rs @@ -63,7 +63,7 @@ impl RevisionTest { pub async fn new_with_configuration(merge_threshold: i64) -> Self { let user_id = nanoid!(10); let object_id = nanoid!(6); - let configuration = RevisionPersistenceConfiguration::new(merge_threshold as usize); + let configuration = RevisionPersistenceConfiguration::new(merge_threshold as usize, false); let disk_cache = RevisionDiskCacheMock::new(vec![]); let persistence = RevisionPersistence::new(&user_id, &object_id, disk_cache, configuration.clone()); let compress = RevisionCompressMock {}; diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/folder_deps.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/folder_deps.rs index 1fa6804177..35def846a7 100644 --- a/frontend/rust-lib/flowy-sdk/src/deps_resolve/folder_deps.rs +++ b/frontend/rust-lib/flowy-sdk/src/deps_resolve/folder_deps.rs @@ -165,7 +165,7 @@ impl ViewDataProcessor for DocumentViewDataProcessor { let manager = self.0.clone(); let view_id = view_id.to_string(); FutureResult::new(async move { - let _ = manager.close_document_editor(view_id)?; + let _ = manager.close_document_editor(view_id).await?; Ok(()) }) } diff --git a/shared-lib/lib-infra/src/ref_map.rs b/shared-lib/lib-infra/src/ref_map.rs index 1b9e3baad7..8ef4ba8a6b 100644 --- a/shared-lib/lib-infra/src/ref_map.rs +++ b/shared-lib/lib-infra/src/ref_map.rs @@ -22,16 +22,26 @@ impl RefCountHandler { pub struct RefCountHashMap(HashMap>); +impl std::default::Default for RefCountHashMap { + fn default() -> Self { + Self(HashMap::new()) + } +} + impl RefCountHashMap where T: Clone + Send + Sync + RefCountValue, { pub fn new() -> Self { - Self(Default::default()) + Self::default() } pub fn get(&self, key: &str) -> Option { - self.0.get(key).and_then(|handler| Some(handler.inner.clone())) + self.0.get(key).map(|handler| handler.inner.clone()) + } + + pub fn values(&self) -> Vec { + self.0.values().map(|value| value.inner.clone()).collect::>() } pub fn insert(&mut self, key: String, value: T) {