From ab63ce7bce2c50270a06e1bbf37d201491b10373 Mon Sep 17 00:00:00 2001 From: appflowy Date: Tue, 7 Jun 2022 14:22:20 +0800 Subject: [PATCH 1/5] chore: rename structs --- frontend/rust-lib/flowy-grid/src/manager.rs | 26 ++------- .../services/{ => block}/block_meta_editor.rs | 28 +++++----- .../{ => block}/block_meta_manager.rs | 45 ++++++++------- .../flowy-grid/src/services/block/mod.rs | 5 ++ .../{grid_editor.rs => grid_meta_editor.rs} | 55 +++++++++++-------- .../rust-lib/flowy-grid/src/services/mod.rs | 5 +- .../flowy-grid/tests/grid/grid_test.rs | 8 +-- .../rust-lib/flowy-grid/tests/grid/script.rs | 15 ++--- .../src/entities/meta.rs | 20 +++---- .../src/client_grid/grid_block_meta_pad.rs | 16 +++--- .../src/client_grid/grid_builder.rs | 10 ++-- .../src/client_grid/grid_meta_pad.rs | 16 +++--- 12 files changed, 125 insertions(+), 124 deletions(-) rename frontend/rust-lib/flowy-grid/src/services/{ => block}/block_meta_editor.rs (86%) rename frontend/rust-lib/flowy-grid/src/services/{ => block}/block_meta_manager.rs (88%) create mode 100644 frontend/rust-lib/flowy-grid/src/services/block/mod.rs rename frontend/rust-lib/flowy-grid/src/services/{grid_editor.rs => grid_meta_editor.rs} (93%) diff --git a/frontend/rust-lib/flowy-grid/src/manager.rs b/frontend/rust-lib/flowy-grid/src/manager.rs index 2965cb5bd9..08d9559d41 100644 --- a/frontend/rust-lib/flowy-grid/src/manager.rs +++ b/frontend/rust-lib/flowy-grid/src/manager.rs @@ -1,4 +1,5 @@ -use crate::services::grid_editor::GridMetaEditor; +use crate::services::block::make_grid_block_meta_rev_manager; +use crate::services::grid_meta_editor::GridMetaEditor; use crate::services::persistence::block_index::BlockIndexCache; use crate::services::persistence::kv::GridKVPersistence; use crate::services::persistence::GridDatabase; @@ -7,7 +8,7 @@ use dashmap::DashMap; use flowy_database::ConnectionPool; use flowy_error::{FlowyError, FlowyResult}; use flowy_grid_data_model::entities::{BuildGridContext, GridMeta}; -use flowy_revision::disk::{SQLiteGridBlockMetaRevisionPersistence, SQLiteGridRevisionPersistence}; +use flowy_revision::disk::SQLiteGridRevisionPersistence; use flowy_revision::{RevisionManager, RevisionPersistence, RevisionWebSocket}; use flowy_sync::client_grid::{make_block_meta_delta, make_grid_delta}; use flowy_sync::entities::revision::{RepeatedRevision, Revision}; @@ -35,12 +36,12 @@ impl GridManager { ) -> Self { let grid_editors = Arc::new(DashMap::new()); let kv_persistence = Arc::new(GridKVPersistence::new(database.clone())); - let block_index_persistence = Arc::new(BlockIndexCache::new(database)); + let block_index_cache = Arc::new(BlockIndexCache::new(database)); Self { editor_map: grid_editors, grid_user, + block_index_cache, kv_persistence, - block_index_cache: block_index_persistence, } } @@ -59,9 +60,7 @@ impl GridManager { block_id: T, revisions: RepeatedRevision, ) -> FlowyResult<()> { - let block_id = block_id.as_ref(); - let db_pool = self.grid_user.db_pool()?; - let rev_manager = self.make_grid_block_meta_rev_manager(block_id, db_pool)?; + let rev_manager = make_grid_block_meta_rev_manager(&self.grid_user, block_id.as_ref())?; let _ = rev_manager.reset_object(revisions).await?; Ok(()) } @@ -134,18 +133,6 @@ impl GridManager { let rev_manager = RevisionManager::new(&user_id, grid_id, rev_persistence); Ok(rev_manager) } - - fn make_grid_block_meta_rev_manager( - &self, - block_d: &str, - pool: Arc, - ) -> FlowyResult { - let user_id = self.grid_user.user_id()?; - let disk_cache = Arc::new(SQLiteGridBlockMetaRevisionPersistence::new(&user_id, pool)); - let rev_persistence = Arc::new(RevisionPersistence::new(&user_id, block_d, disk_cache)); - let rev_manager = RevisionManager::new(&user_id, block_d, rev_persistence); - Ok(rev_manager) - } } pub async fn make_grid_view_data( @@ -166,7 +153,6 @@ pub async fn make_grid_view_data( let repeated_revision: RepeatedRevision = Revision::initial_revision(user_id, view_id, grid_delta_data.clone()).into(); let _ = grid_manager.create_grid(view_id, repeated_revision).await?; - for block_meta_data in build_context.blocks_meta_data { let block_id = block_meta_data.block_id.clone(); diff --git a/frontend/rust-lib/flowy-grid/src/services/block_meta_editor.rs b/frontend/rust-lib/flowy-grid/src/services/block/block_meta_editor.rs similarity index 86% rename from frontend/rust-lib/flowy-grid/src/services/block_meta_editor.rs rename to frontend/rust-lib/flowy-grid/src/services/block/block_meta_editor.rs index d37c869156..81c23f9c69 100644 --- a/frontend/rust-lib/flowy-grid/src/services/block_meta_editor.rs +++ b/frontend/rust-lib/flowy-grid/src/services/block/block_meta_editor.rs @@ -1,8 +1,8 @@ use bytes::Bytes; use flowy_error::{FlowyError, FlowyResult}; -use flowy_grid_data_model::entities::{CellMeta, GridBlockMetaData, RowMeta, RowMetaChangeset, RowOrder}; +use flowy_grid_data_model::entities::{CellMeta, GridBlockMeta, RowMeta, RowMetaChangeset, RowOrder}; use flowy_revision::{RevisionCloudService, RevisionCompactor, RevisionManager, RevisionObjectBuilder}; -use flowy_sync::client_grid::{GridBlockMetaChange, GridBlockMetaPad}; +use flowy_sync::client_grid::{GridBlockMetaDeltaChangeset, GridBlockMetaPad}; use flowy_sync::entities::revision::Revision; use flowy_sync::util::make_delta_from_revisions; use lib_infra::future::FutureResult; @@ -14,7 +14,7 @@ use tokio::sync::RwLock; pub struct GridBlockMetaEditor { user_id: String, pub block_id: String, - pad: Arc>, + block_meta: Arc>, rev_manager: Arc, } @@ -29,20 +29,20 @@ impl GridBlockMetaEditor { token: token.to_owned(), }); let block_meta_pad = rev_manager.load::(Some(cloud)).await?; - let pad = Arc::new(RwLock::new(block_meta_pad)); + let block_meta = Arc::new(RwLock::new(block_meta_pad)); let rev_manager = Arc::new(rev_manager); let user_id = user_id.to_owned(); let block_id = block_id.to_owned(); Ok(Self { user_id, block_id, - pad, + block_meta, rev_manager, }) } - pub async fn duplicate_block_meta_data(&self, duplicated_block_id: &str) -> GridBlockMetaData { - self.pad.read().await.duplicate_data(duplicated_block_id).await + pub async fn duplicate_block_meta(&self, duplicated_block_id: &str) -> GridBlockMeta { + self.block_meta.read().await.duplicate_data(duplicated_block_id).await } /// return current number of rows and the inserted index. The inserted index will be None if the start_row_id is None @@ -109,7 +109,7 @@ impl GridBlockMetaEditor { where T: AsRef + ToOwned + ?Sized, { - let row_metas = self.pad.read().await.get_row_metas(row_ids)?; + let row_metas = self.block_meta.read().await.get_row_metas(row_ids)?; Ok(row_metas) } @@ -118,7 +118,7 @@ impl GridBlockMetaEditor { field_id: &str, row_ids: Option>>, ) -> FlowyResult> { - let cell_metas = self.pad.read().await.get_cell_metas(field_id, row_ids)?; + let cell_metas = self.block_meta.read().await.get_cell_metas(field_id, row_ids)?; Ok(cell_metas) } @@ -132,7 +132,7 @@ impl GridBlockMetaEditor { T: AsRef + ToOwned + ?Sized, { let row_orders = self - .pad + .block_meta .read() .await .get_row_metas(row_ids)? @@ -144,9 +144,9 @@ impl GridBlockMetaEditor { async fn modify(&self, f: F) -> FlowyResult<()> where - F: for<'a> FnOnce(&'a mut GridBlockMetaPad) -> FlowyResult>, + F: for<'a> FnOnce(&'a mut GridBlockMetaPad) -> FlowyResult>, { - let mut write_guard = self.pad.write().await; + let mut write_guard = self.block_meta.write().await; match f(&mut *write_guard)? { None => {} Some(change) => { @@ -156,8 +156,8 @@ impl GridBlockMetaEditor { Ok(()) } - async fn apply_change(&self, change: GridBlockMetaChange) -> FlowyResult<()> { - let GridBlockMetaChange { delta, md5 } = change; + async fn apply_change(&self, change: GridBlockMetaDeltaChangeset) -> FlowyResult<()> { + let GridBlockMetaDeltaChangeset { delta, md5 } = change; let user_id = self.user_id.clone(); let (base_rev_id, rev_id) = self.rev_manager.next_rev_id_pair(); let delta_data = delta.to_delta_bytes(); diff --git a/frontend/rust-lib/flowy-grid/src/services/block_meta_manager.rs b/frontend/rust-lib/flowy-grid/src/services/block/block_meta_manager.rs similarity index 88% rename from frontend/rust-lib/flowy-grid/src/services/block_meta_manager.rs rename to frontend/rust-lib/flowy-grid/src/services/block/block_meta_manager.rs index ec7c47f069..a1d9e27f4a 100644 --- a/frontend/rust-lib/flowy-grid/src/services/block_meta_manager.rs +++ b/frontend/rust-lib/flowy-grid/src/services/block/block_meta_manager.rs @@ -1,13 +1,13 @@ use crate::dart_notification::{send_dart_notification, GridNotification}; use crate::manager::GridUser; -use crate::services::block_meta_editor::GridBlockMetaEditor; +use crate::services::block::GridBlockMetaEditor; use crate::services::persistence::block_index::BlockIndexCache; use crate::services::row::{group_row_orders, GridBlockSnapshot}; use dashmap::DashMap; use flowy_error::FlowyResult; use flowy_grid_data_model::entities::{ - CellChangeset, CellMeta, GridBlockMeta, GridBlockMetaChangeset, GridRowsChangeset, IndexRowOrder, Row, RowMeta, - RowMetaChangeset, RowOrder, UpdatedRowOrder, + CellChangeset, CellMeta, GridBlockInfoChangeset, GridBlockMetaSnapshot, GridRowsChangeset, IndexRowOrder, Row, + RowMeta, RowMetaChangeset, RowOrder, UpdatedRowOrder, }; use flowy_revision::disk::SQLiteGridBlockMetaRevisionPersistence; use flowy_revision::{RevisionManager, RevisionPersistence}; @@ -16,19 +16,19 @@ use std::collections::HashMap; use std::sync::Arc; type BlockId = String; -pub(crate) struct GridBlockManager { +pub(crate) struct GridBlockMetaManager { grid_id: String, user: Arc, - persistence: Arc, + block_index_cache: Arc, block_editor_map: DashMap>, } -impl GridBlockManager { +impl GridBlockMetaManager { pub(crate) async fn new( grid_id: &str, user: &Arc, - blocks: Vec, - persistence: Arc, + blocks: Vec, + block_index_cache: Arc, ) -> FlowyResult { let editor_map = make_block_meta_editor_map(user, blocks).await?; let user = user.clone(); @@ -37,7 +37,7 @@ impl GridBlockManager { grid_id, user, block_editor_map: editor_map, - persistence, + block_index_cache, }; Ok(manager) } @@ -57,7 +57,7 @@ impl GridBlockManager { } async fn get_editor_from_row_id(&self, row_id: &str) -> FlowyResult> { - let block_id = self.persistence.get_block_id(row_id)?; + let block_id = self.block_index_cache.get_block_id(row_id)?; Ok(self.get_editor(&block_id).await?) } @@ -67,7 +67,7 @@ impl GridBlockManager { row_meta: RowMeta, start_row_id: Option, ) -> FlowyResult { - let _ = self.persistence.insert(&row_meta.block_id, &row_meta.id)?; + let _ = self.block_index_cache.insert(&row_meta.block_id, &row_meta.id)?; let editor = self.get_editor(&row_meta.block_id).await?; let mut index_row_order = IndexRowOrder::from(&row_meta); @@ -83,21 +83,21 @@ impl GridBlockManager { pub(crate) async fn insert_row( &self, rows_by_block_id: HashMap>, - ) -> FlowyResult> { + ) -> FlowyResult> { let mut changesets = vec![]; for (block_id, row_metas) in rows_by_block_id { let mut inserted_row_orders = vec![]; let editor = self.get_editor(&block_id).await?; let mut row_count = 0; for row in row_metas { - let _ = self.persistence.insert(&row.block_id, &row.id)?; + let _ = self.block_index_cache.insert(&row.block_id, &row.id)?; let mut row_order = IndexRowOrder::from(&row); let (count, index) = editor.create_row(row, None).await?; row_count = count; row_order.index = index; inserted_row_orders.push(row_order); } - changesets.push(GridBlockMetaChangeset::from_row_count(&block_id, row_count)); + changesets.push(GridBlockInfoChangeset::from_row_count(&block_id, row_count)); let _ = self .notify_did_update_block(GridRowsChangeset::insert(&block_id, inserted_row_orders)) @@ -128,7 +128,7 @@ impl GridBlockManager { pub async fn delete_row(&self, row_id: &str) -> FlowyResult<()> { let row_id = row_id.to_owned(); - let block_id = self.persistence.get_block_id(&row_id)?; + let block_id = self.block_index_cache.get_block_id(&row_id)?; let editor = self.get_editor(&block_id).await?; match editor.get_row_order(&row_id).await? { None => {} @@ -143,7 +143,7 @@ impl GridBlockManager { Ok(()) } - pub(crate) async fn delete_rows(&self, row_orders: Vec) -> FlowyResult> { + pub(crate) async fn delete_rows(&self, row_orders: Vec) -> FlowyResult> { let mut changesets = vec![]; for block_order in group_row_orders(row_orders) { let editor = self.get_editor(&block_order.block_id).await?; @@ -153,7 +153,7 @@ impl GridBlockManager { .map(|row_order| Cow::Owned(row_order.row_id)) .collect::>>(); let row_count = editor.delete_rows(row_ids).await?; - let changeset = GridBlockMetaChangeset::from_row_count(&block_order.block_id, row_count); + let changeset = GridBlockInfoChangeset::from_row_count(&block_order.block_id, row_count); changesets.push(changeset); } @@ -255,7 +255,7 @@ impl GridBlockManager { async fn make_block_meta_editor_map( user: &Arc, - blocks: Vec, + blocks: Vec, ) -> FlowyResult>> { let editor_map = DashMap::new(); for block in blocks { @@ -269,11 +269,16 @@ async fn make_block_meta_editor_map( async fn make_block_meta_editor(user: &Arc, block_id: &str) -> FlowyResult { tracing::trace!("Open block:{} meta editor", block_id); let token = user.token()?; + let user_id = user.user_id()?; + let rev_manager = make_grid_block_meta_rev_manager(user, block_id)?; + GridBlockMetaEditor::new(&user_id, &token, block_id, rev_manager).await +} + +pub fn make_grid_block_meta_rev_manager(user: &Arc, block_id: &str) -> FlowyResult { let user_id = user.user_id()?; let pool = user.db_pool()?; let disk_cache = Arc::new(SQLiteGridBlockMetaRevisionPersistence::new(&user_id, pool)); let rev_persistence = Arc::new(RevisionPersistence::new(&user_id, block_id, disk_cache)); - let rev_manager = RevisionManager::new(&user_id, block_id, rev_persistence); - GridBlockMetaEditor::new(&user_id, &token, block_id, rev_manager).await + Ok(RevisionManager::new(&user_id, block_id, rev_persistence)) } diff --git a/frontend/rust-lib/flowy-grid/src/services/block/mod.rs b/frontend/rust-lib/flowy-grid/src/services/block/mod.rs new file mode 100644 index 0000000000..f4a110cf63 --- /dev/null +++ b/frontend/rust-lib/flowy-grid/src/services/block/mod.rs @@ -0,0 +1,5 @@ +mod block_meta_editor; +mod block_meta_manager; + +pub(crate) use block_meta_editor::*; +pub(crate) use block_meta_manager::*; diff --git a/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs b/frontend/rust-lib/flowy-grid/src/services/grid_meta_editor.rs similarity index 93% rename from frontend/rust-lib/flowy-grid/src/services/grid_editor.rs rename to frontend/rust-lib/flowy-grid/src/services/grid_meta_editor.rs index 82db6aac1b..d584e1dc31 100644 --- a/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs +++ b/frontend/rust-lib/flowy-grid/src/services/grid_meta_editor.rs @@ -1,7 +1,7 @@ use crate::dart_notification::{send_dart_notification, GridNotification}; use crate::entities::CellIdentifier; use crate::manager::GridUser; -use crate::services::block_meta_manager::GridBlockManager; +use crate::services::block::GridBlockMetaManager; use crate::services::field::{default_type_option_builder_from_type, type_option_builder_from_bytes, FieldBuilder}; use crate::services::persistence::block_index::BlockIndexCache; use crate::services::row::*; @@ -24,7 +24,7 @@ pub struct GridMetaEditor { user: Arc, grid_pad: Arc>, rev_manager: Arc, - block_manager: Arc, + block_meta_manager: Arc, } impl Drop for GridMetaEditor { @@ -47,13 +47,13 @@ impl GridMetaEditor { let grid_pad = Arc::new(RwLock::new(grid_pad)); let blocks = grid_pad.read().await.get_block_metas(); - let block_meta_manager = Arc::new(GridBlockManager::new(grid_id, &user, blocks, persistence).await?); + let block_meta_manager = Arc::new(GridBlockMetaManager::new(grid_id, &user, blocks, persistence).await?); Ok(Arc::new(Self { grid_id: grid_id.to_owned(), user, grid_pad, rev_manager, - block_manager: block_meta_manager, + block_meta_manager, })) } @@ -240,12 +240,12 @@ impl GridMetaEditor { Ok(field_metas) } - pub async fn create_block(&self, grid_block: GridBlockMeta) -> FlowyResult<()> { + pub async fn create_block(&self, grid_block: GridBlockMetaSnapshot) -> FlowyResult<()> { let _ = self.modify(|grid| Ok(grid.create_block_meta(grid_block)?)).await?; Ok(()) } - pub async fn update_block(&self, changeset: GridBlockMetaChangeset) -> FlowyResult<()> { + pub async fn update_block(&self, changeset: GridBlockInfoChangeset) -> FlowyResult<()> { let _ = self.modify(|grid| Ok(grid.update_block_meta(changeset)?)).await?; Ok(()) } @@ -260,10 +260,13 @@ impl GridMetaEditor { let row_order = RowOrder::from(&row_meta); // insert the row - let row_count = self.block_manager.create_row(&block_id, row_meta, start_row_id).await?; + let row_count = self + .block_meta_manager + .create_row(&block_id, row_meta, start_row_id) + .await?; // update block row count - let changeset = GridBlockMetaChangeset::from_row_count(&block_id, row_count); + let changeset = GridBlockInfoChangeset::from_row_count(&block_id, row_count); let _ = self.update_block(changeset).await?; Ok(row_order) } @@ -280,7 +283,7 @@ impl GridMetaEditor { .or_insert_with(Vec::new) .push(row_meta); } - let changesets = self.block_manager.insert_row(rows_by_block_id).await?; + let changesets = self.block_meta_manager.insert_row(rows_by_block_id).await?; for changeset in changesets { let _ = self.update_block(changeset).await?; } @@ -289,7 +292,7 @@ impl GridMetaEditor { pub async fn update_row(&self, changeset: RowMetaChangeset) -> FlowyResult<()> { let field_metas = self.get_field_metas::(None).await?; - self.block_manager + self.block_meta_manager .update_row(changeset, |row_meta| make_row_from_row_meta(&field_metas, row_meta)) .await } @@ -312,7 +315,7 @@ impl GridMetaEditor { } pub async fn get_row(&self, row_id: &str) -> FlowyResult> { - match self.block_manager.get_row_meta(row_id).await? { + match self.block_meta_manager.get_row_meta(row_id).await? { None => Ok(None), Some(row_meta) => { let field_metas = self.get_field_metas::(None).await?; @@ -324,7 +327,7 @@ impl GridMetaEditor { } } pub async fn delete_row(&self, row_id: &str) -> FlowyResult<()> { - let _ = self.block_manager.delete_row(row_id).await?; + let _ = self.block_meta_manager.delete_row(row_id).await?; Ok(()) } @@ -334,12 +337,12 @@ impl GridMetaEditor { pub async fn get_cell(&self, params: &CellIdentifier) -> Option { let field_meta = self.get_field_meta(¶ms.field_id).await?; - let row_meta = self.block_manager.get_row_meta(¶ms.row_id).await.ok()??; + let row_meta = self.block_meta_manager.get_row_meta(¶ms.row_id).await.ok()??; make_cell(¶ms.field_id, &field_meta, &row_meta) } pub async fn get_cell_meta(&self, row_id: &str, field_id: &str) -> FlowyResult> { - let row_meta = self.block_manager.get_row_meta(row_id).await?; + let row_meta = self.block_meta_manager.get_row_meta(row_id).await?; match row_meta { None => Ok(None), Some(row_meta) => { @@ -385,7 +388,7 @@ impl GridMetaEditor { cell_content_changeset, }; let _ = self - .block_manager + .block_meta_manager .update_cell(cell_changeset, |row_meta| { make_row_from_row_meta(&field_metas, row_meta) }) @@ -400,13 +403,13 @@ impl GridMetaEditor { make_grid_blocks(block_ids, block_snapshots) } - pub async fn get_block_metas(&self) -> FlowyResult> { + pub async fn get_block_metas(&self) -> FlowyResult> { let grid_blocks = self.grid_pad.read().await.get_block_metas(); Ok(grid_blocks) } pub async fn delete_rows(&self, row_orders: Vec) -> FlowyResult<()> { - let changesets = self.block_manager.delete_rows(row_orders).await?; + let changesets = self.block_meta_manager.delete_rows(row_orders).await?; for changeset in changesets { let _ = self.update_block(changeset).await?; } @@ -418,7 +421,7 @@ impl GridMetaEditor { let field_orders = pad_read_guard.get_field_orders(); let mut block_orders = vec![]; for block_order in pad_read_guard.get_block_metas() { - let row_orders = self.block_manager.get_row_orders(&block_order.block_id).await?; + let row_orders = self.block_meta_manager.get_row_orders(&block_order.block_id).await?; let block_order = GridBlockOrder { block_id: block_order.block_id, row_orders, @@ -445,7 +448,7 @@ impl GridMetaEditor { .collect::>(), Some(block_ids) => block_ids, }; - let snapshots = self.block_manager.make_block_snapshots(block_ids).await?; + let snapshots = self.block_meta_manager.make_block_snapshots(block_ids).await?; Ok(snapshots) } @@ -479,7 +482,10 @@ impl GridMetaEditor { } pub async fn move_row(&self, row_id: &str, from: i32, to: i32) -> FlowyResult<()> { - let _ = self.block_manager.move_row(row_id, from as usize, to as usize).await?; + let _ = self + .block_meta_manager + .move_row(row_id, from as usize, to as usize) + .await?; Ok(()) } @@ -495,13 +501,14 @@ impl GridMetaEditor { let mut blocks_meta_data = vec![]; if original_blocks.len() == duplicated_blocks.len() { for (index, original_block_meta) in original_blocks.iter().enumerate() { - let grid_block_meta_editor = self.block_manager.get_editor(&original_block_meta.block_id).await?; + let grid_block_meta_editor = self + .block_meta_manager + .get_editor(&original_block_meta.block_id) + .await?; let duplicated_block_id = &duplicated_blocks[index].block_id; tracing::trace!("Duplicate block:{} meta data", duplicated_block_id); - let duplicated_block_meta_data = grid_block_meta_editor - .duplicate_block_meta_data(duplicated_block_id) - .await; + let duplicated_block_meta_data = grid_block_meta_editor.duplicate_block_meta(duplicated_block_id).await; blocks_meta_data.push(duplicated_block_meta_data); } } else { diff --git a/frontend/rust-lib/flowy-grid/src/services/mod.rs b/frontend/rust-lib/flowy-grid/src/services/mod.rs index c9a8217bd8..69622827bf 100644 --- a/frontend/rust-lib/flowy-grid/src/services/mod.rs +++ b/frontend/rust-lib/flowy-grid/src/services/mod.rs @@ -1,8 +1,7 @@ mod util; -pub mod block_meta_editor; -mod block_meta_manager; +pub mod block; pub mod field; -pub mod grid_editor; +pub mod grid_meta_editor; pub mod persistence; pub mod row; diff --git a/frontend/rust-lib/flowy-grid/tests/grid/grid_test.rs b/frontend/rust-lib/flowy-grid/tests/grid/grid_test.rs index 7b210d4f95..63bea4cc3d 100644 --- a/frontend/rust-lib/flowy-grid/tests/grid/grid_test.rs +++ b/frontend/rust-lib/flowy-grid/tests/grid/grid_test.rs @@ -7,7 +7,7 @@ use flowy_grid::services::field::{ }; use flowy_grid::services::row::{decode_cell_data_from_type_option_cell_data, CreateRowMetaBuilder}; use flowy_grid_data_model::entities::{ - CellChangeset, FieldChangesetParams, FieldType, GridBlockMeta, GridBlockMetaChangeset, RowMetaChangeset, + CellChangeset, FieldChangesetParams, FieldType, GridBlockInfoChangeset, GridBlockMetaSnapshot, RowMetaChangeset, TypeOptionDataEntry, }; @@ -123,7 +123,7 @@ async fn grid_delete_field() { #[tokio::test] async fn grid_create_block() { - let grid_block = GridBlockMeta::new(); + let grid_block = GridBlockMetaSnapshot::new(); let scripts = vec![ AssertBlockCount(1), CreateBlock { block: grid_block }, @@ -134,9 +134,9 @@ async fn grid_create_block() { #[tokio::test] async fn grid_update_block() { - let grid_block = GridBlockMeta::new(); + let grid_block = GridBlockMetaSnapshot::new(); let mut cloned_grid_block = grid_block.clone(); - let changeset = GridBlockMetaChangeset { + let changeset = GridBlockInfoChangeset { block_id: grid_block.block_id.clone(), start_row_index: Some(2), row_count: Some(10), diff --git a/frontend/rust-lib/flowy-grid/tests/grid/script.rs b/frontend/rust-lib/flowy-grid/tests/grid/script.rs index 37c3b736d4..c1fa47c7a9 100644 --- a/frontend/rust-lib/flowy-grid/tests/grid/script.rs +++ b/frontend/rust-lib/flowy-grid/tests/grid/script.rs @@ -1,10 +1,11 @@ use bytes::Bytes; use flowy_grid::services::field::*; -use flowy_grid::services::grid_editor::{GridMetaEditor, GridPadBuilder}; +use flowy_grid::services::grid_meta_editor::{GridMetaEditor, GridPadBuilder}; use flowy_grid::services::row::CreateRowMetaPayload; use flowy_grid_data_model::entities::{ - BuildGridContext, CellChangeset, Field, FieldChangesetParams, FieldMeta, FieldOrder, FieldType, GridBlockMeta, - GridBlockMetaChangeset, InsertFieldParams, RowMeta, RowMetaChangeset, RowOrder, TypeOptionDataEntry, + BuildGridContext, CellChangeset, Field, FieldChangesetParams, FieldMeta, FieldOrder, FieldType, + GridBlockInfoChangeset, GridBlockMetaSnapshot, InsertFieldParams, RowMeta, RowMetaChangeset, RowOrder, + TypeOptionDataEntry, }; use flowy_revision::REVISION_WRITE_INTERVAL_IN_MILLIS; use flowy_sync::client_grid::GridBuilder; @@ -32,10 +33,10 @@ pub enum EditorScript { field_meta: FieldMeta, }, CreateBlock { - block: GridBlockMeta, + block: GridBlockMetaSnapshot, }, UpdateBlock { - changeset: GridBlockMetaChangeset, + changeset: GridBlockInfoChangeset, }, AssertBlockCount(usize), AssertBlock { @@ -45,7 +46,7 @@ pub enum EditorScript { }, AssertBlockEqual { block_index: usize, - block: GridBlockMeta, + block: GridBlockMetaSnapshot, }, CreateEmptyRow, CreateRow { @@ -74,7 +75,7 @@ pub struct GridEditorTest { pub grid_id: String, pub editor: Arc, pub field_metas: Vec, - pub grid_blocks: Vec, + pub grid_blocks: Vec, pub row_metas: Vec>, pub field_count: usize, diff --git a/shared-lib/flowy-grid-data-model/src/entities/meta.rs b/shared-lib/flowy-grid-data-model/src/entities/meta.rs index d62965245b..c48f9864d5 100644 --- a/shared-lib/flowy-grid-data-model/src/entities/meta.rs +++ b/shared-lib/flowy-grid-data-model/src/entities/meta.rs @@ -28,17 +28,17 @@ pub fn gen_field_id() -> String { pub struct GridMeta { pub grid_id: String, pub fields: Vec, - pub blocks: Vec, + pub blocks: Vec, } #[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] -pub struct GridBlockMeta { +pub struct GridBlockMetaSnapshot { pub block_id: String, pub start_row_index: i32, pub row_count: i32, } -impl GridBlockMeta { +impl GridBlockMetaSnapshot { pub fn len(&self) -> i32 { self.row_count } @@ -48,22 +48,22 @@ impl GridBlockMeta { } } -impl GridBlockMeta { +impl GridBlockMetaSnapshot { pub fn new() -> Self { - GridBlockMeta { + GridBlockMetaSnapshot { block_id: gen_block_id(), ..Default::default() } } } -pub struct GridBlockMetaChangeset { +pub struct GridBlockInfoChangeset { pub block_id: String, pub start_row_index: Option, pub row_count: Option, } -impl GridBlockMetaChangeset { +impl GridBlockInfoChangeset { pub fn from_row_count(block_id: &str, row_count: i32) -> Self { Self { block_id: block_id.to_string(), @@ -74,7 +74,7 @@ impl GridBlockMetaChangeset { } #[derive(Debug, Clone, Default, Serialize, Deserialize)] -pub struct GridBlockMetaData { +pub struct GridBlockMeta { pub block_id: String, pub rows: Vec, } @@ -206,8 +206,8 @@ impl CellMeta { #[derive(Clone, Default, Deserialize, Serialize)] pub struct BuildGridContext { pub field_metas: Vec, - pub blocks: Vec, - pub blocks_meta_data: Vec, + pub blocks: Vec, + pub blocks_meta_data: Vec, } impl BuildGridContext { diff --git a/shared-lib/flowy-sync/src/client_grid/grid_block_meta_pad.rs b/shared-lib/flowy-sync/src/client_grid/grid_block_meta_pad.rs index 175890ad01..25ee1f5471 100644 --- a/shared-lib/flowy-sync/src/client_grid/grid_block_meta_pad.rs +++ b/shared-lib/flowy-sync/src/client_grid/grid_block_meta_pad.rs @@ -1,9 +1,7 @@ use crate::entities::revision::{md5, RepeatedRevision, Revision}; use crate::errors::{CollaborateError, CollaborateResult}; use crate::util::{cal_diff, make_delta_from_revisions}; -use flowy_grid_data_model::entities::{ - gen_block_id, gen_row_id, CellMeta, GridBlockMetaData, RowMeta, RowMetaChangeset, -}; +use flowy_grid_data_model::entities::{gen_block_id, gen_row_id, CellMeta, GridBlockMeta, RowMeta, RowMetaChangeset}; use lib_ot::core::{OperationTransformable, PlainTextAttributes, PlainTextDelta, PlainTextDeltaBuilder}; use serde::{Deserialize, Serialize}; use std::borrow::Cow; @@ -24,7 +22,7 @@ pub struct GridBlockMetaPad { } impl GridBlockMetaPad { - pub async fn duplicate_data(&self, duplicated_block_id: &str) -> GridBlockMetaData { + pub async fn duplicate_data(&self, duplicated_block_id: &str) -> GridBlockMeta { let duplicated_rows = self .rows .iter() @@ -35,7 +33,7 @@ impl GridBlockMetaPad { duplicated_row }) .collect::>(); - GridBlockMetaData { + GridBlockMeta { block_id: duplicated_block_id.to_string(), rows: duplicated_rows, } @@ -43,7 +41,7 @@ impl GridBlockMetaPad { pub fn from_delta(delta: GridBlockMetaDelta) -> CollaborateResult { let s = delta.to_str()?; - let meta_data: GridBlockMetaData = serde_json::from_str(&s).map_err(|e| { + let meta_data: GridBlockMeta = serde_json::from_str(&s).map_err(|e| { let msg = format!("Deserialize delta to block meta failed: {}", e); tracing::error!("{}", s); CollaborateError::internal().context(msg) @@ -241,12 +239,12 @@ pub struct GridBlockMetaChange { pub md5: String, } -pub fn make_block_meta_delta(grid_block_meta_data: &GridBlockMetaData) -> GridBlockMetaDelta { +pub fn make_block_meta_delta(grid_block_meta_data: &GridBlockMeta) -> GridBlockMetaDelta { let json = serde_json::to_string(&grid_block_meta_data).unwrap(); PlainTextDeltaBuilder::new().insert(&json).build() } -pub fn make_block_meta_revisions(user_id: &str, grid_block_meta_data: &GridBlockMetaData) -> RepeatedRevision { +pub fn make_block_meta_revisions(user_id: &str, grid_block_meta_data: &GridBlockMeta) -> RepeatedRevision { let delta = make_block_meta_delta(grid_block_meta_data); let bytes = delta.to_delta_bytes(); let revision = Revision::initial_revision(user_id, &grid_block_meta_data.block_id, bytes); @@ -255,7 +253,7 @@ pub fn make_block_meta_revisions(user_id: &str, grid_block_meta_data: &GridBlock impl std::default::Default for GridBlockMetaPad { fn default() -> Self { - let block_meta_data = GridBlockMetaData { + let block_meta_data = GridBlockMeta { block_id: gen_block_id(), rows: vec![], }; diff --git a/shared-lib/flowy-sync/src/client_grid/grid_builder.rs b/shared-lib/flowy-sync/src/client_grid/grid_builder.rs index 95e707a287..d0da45dc81 100644 --- a/shared-lib/flowy-sync/src/client_grid/grid_builder.rs +++ b/shared-lib/flowy-sync/src/client_grid/grid_builder.rs @@ -1,5 +1,5 @@ use crate::errors::{CollaborateError, CollaborateResult}; -use flowy_grid_data_model::entities::{BuildGridContext, FieldMeta, GridBlockMeta, GridBlockMetaData, RowMeta}; +use flowy_grid_data_model::entities::{BuildGridContext, FieldMeta, GridBlockMeta, GridBlockMetaSnapshot, RowMeta}; pub struct GridBuilder { build_context: BuildGridContext, @@ -9,8 +9,8 @@ impl std::default::Default for GridBuilder { fn default() -> Self { let mut build_context = BuildGridContext::new(); - let block_meta = GridBlockMeta::new(); - let block_meta_data = GridBlockMetaData { + let block_meta = GridBlockMetaSnapshot::new(); + let block_meta_data = GridBlockMeta { block_id: block_meta.block_id.clone(), rows: vec![], }; @@ -59,7 +59,7 @@ fn check_rows(fields: &[FieldMeta], rows: &[RowMeta]) -> CollaborateResult<()> { mod tests { use crate::client_grid::{make_block_meta_delta, make_grid_delta, GridBuilder}; - use flowy_grid_data_model::entities::{FieldMeta, FieldType, GridBlockMetaData, GridMeta}; + use flowy_grid_data_model::entities::{FieldMeta, FieldType, GridBlockMeta, GridMeta}; #[test] fn create_default_grid_test() { @@ -82,6 +82,6 @@ mod tests { let _: GridMeta = serde_json::from_str(&grid_meta_delta.to_str().unwrap()).unwrap(); let grid_block_meta_delta = make_block_meta_delta(build_context.blocks_meta_data.first().unwrap()); - let _: GridBlockMetaData = serde_json::from_str(&grid_block_meta_delta.to_str().unwrap()).unwrap(); + let _: GridBlockMeta = serde_json::from_str(&grid_block_meta_delta.to_str().unwrap()).unwrap(); } } diff --git a/shared-lib/flowy-sync/src/client_grid/grid_meta_pad.rs b/shared-lib/flowy-sync/src/client_grid/grid_meta_pad.rs index 0e255f0f12..1d800961e3 100644 --- a/shared-lib/flowy-sync/src/client_grid/grid_meta_pad.rs +++ b/shared-lib/flowy-sync/src/client_grid/grid_meta_pad.rs @@ -3,8 +3,8 @@ use crate::errors::{internal_error, CollaborateError, CollaborateResult}; use crate::util::{cal_diff, make_delta_from_revisions}; use bytes::Bytes; use flowy_grid_data_model::entities::{ - gen_block_id, gen_grid_id, FieldChangesetParams, FieldMeta, FieldOrder, FieldType, GridBlockMeta, - GridBlockMetaChangeset, GridMeta, + gen_block_id, gen_grid_id, FieldChangesetParams, FieldMeta, FieldOrder, FieldType, GridBlockInfoChangeset, + GridBlockMetaSnapshot, GridMeta, }; use lib_infra::util::move_vec_element; use lib_ot::core::{OperationTransformable, PlainTextAttributes, PlainTextDelta, PlainTextDeltaBuilder}; @@ -24,7 +24,7 @@ pub trait JsonDeserializer { } impl GridMetaPad { - pub async fn duplicate_grid_meta(&self) -> (Vec, Vec) { + pub async fn duplicate_grid_meta(&self) -> (Vec, Vec) { let fields = self .grid_meta .fields @@ -41,7 +41,7 @@ impl GridMetaPad { duplicated_block.block_id = gen_block_id(); duplicated_block }) - .collect::>(); + .collect::>(); (fields, blocks) } @@ -283,7 +283,7 @@ impl GridMetaPad { } } - pub fn create_block_meta(&mut self, block: GridBlockMeta) -> CollaborateResult> { + pub fn create_block_meta(&mut self, block: GridBlockMetaSnapshot) -> CollaborateResult> { self.modify_grid(|grid_meta| { if grid_meta.blocks.iter().any(|b| b.block_id == block.block_id) { tracing::warn!("Duplicate grid block"); @@ -306,11 +306,11 @@ impl GridMetaPad { }) } - pub fn get_block_metas(&self) -> Vec { + pub fn get_block_metas(&self) -> Vec { self.grid_meta.blocks.clone() } - pub fn update_block_meta(&mut self, changeset: GridBlockMetaChangeset) -> CollaborateResult> { + pub fn update_block_meta(&mut self, changeset: GridBlockInfoChangeset) -> CollaborateResult> { let block_id = changeset.block_id.clone(); self.modify_block(&block_id, |block| { let mut is_changed = None; @@ -368,7 +368,7 @@ impl GridMetaPad { pub fn modify_block(&mut self, block_id: &str, f: F) -> CollaborateResult> where - F: FnOnce(&mut GridBlockMeta) -> CollaborateResult>, + F: FnOnce(&mut GridBlockMetaSnapshot) -> CollaborateResult>, { self.modify_grid( |grid_meta| match grid_meta.blocks.iter().position(|block| block.block_id == block_id) { From f6ade11eb2a4fc00b20fbd609fdb963218a7a403 Mon Sep 17 00:00:00 2001 From: appflowy Date: Tue, 7 Jun 2022 16:38:00 +0800 Subject: [PATCH 2/5] chore: config revision history --- .../down.sql | 2 + .../2022-06-07-071750_revision-history/up.sql | 8 ++ .../rust-lib/flowy-database/src/schema.rs | 11 +++ .../flowy-revision/src/history/mod.rs | 4 + .../flowy-revision/src/history/persistence.rs | 22 ++++++ .../flowy-revision/src/history/rev_history.rs | 78 +++++++++++++++++++ frontend/rust-lib/flowy-revision/src/lib.rs | 1 + .../flowy-revision/src/rev_manager.rs | 6 +- .../src/client_grid/grid_block_meta_pad.rs | 27 +++++-- 9 files changed, 150 insertions(+), 9 deletions(-) create mode 100644 frontend/rust-lib/flowy-database/migrations/2022-06-07-071750_revision-history/down.sql create mode 100644 frontend/rust-lib/flowy-database/migrations/2022-06-07-071750_revision-history/up.sql create mode 100644 frontend/rust-lib/flowy-revision/src/history/mod.rs create mode 100644 frontend/rust-lib/flowy-revision/src/history/persistence.rs create mode 100644 frontend/rust-lib/flowy-revision/src/history/rev_history.rs diff --git a/frontend/rust-lib/flowy-database/migrations/2022-06-07-071750_revision-history/down.sql b/frontend/rust-lib/flowy-database/migrations/2022-06-07-071750_revision-history/down.sql new file mode 100644 index 0000000000..8d9f64d2a7 --- /dev/null +++ b/frontend/rust-lib/flowy-database/migrations/2022-06-07-071750_revision-history/down.sql @@ -0,0 +1,2 @@ +-- This file should undo anything in `up.sql` +DROP TABLE rev_history; \ No newline at end of file diff --git a/frontend/rust-lib/flowy-database/migrations/2022-06-07-071750_revision-history/up.sql b/frontend/rust-lib/flowy-database/migrations/2022-06-07-071750_revision-history/up.sql new file mode 100644 index 0000000000..23ada9a2e8 --- /dev/null +++ b/frontend/rust-lib/flowy-database/migrations/2022-06-07-071750_revision-history/up.sql @@ -0,0 +1,8 @@ +-- Your SQL goes here +CREATE TABLE rev_history ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + object_id TEXT NOT NULL DEFAULT '', + start_rev_id BIGINT NOT NULL DEFAULT 0, + end_rev_id BIGINT NOT NULL DEFAULT 0, + data BLOB NOT NULL DEFAULT (x'') +); \ No newline at end of file diff --git a/frontend/rust-lib/flowy-database/src/schema.rs b/frontend/rust-lib/flowy-database/src/schema.rs index 8fddd037e0..b57ff46875 100644 --- a/frontend/rust-lib/flowy-database/src/schema.rs +++ b/frontend/rust-lib/flowy-database/src/schema.rs @@ -57,6 +57,16 @@ table! { } } +table! { + rev_history (id) { + id -> Integer, + object_id -> Text, + start_rev_id -> BigInt, + end_rev_id -> BigInt, + data -> Binary, + } +} + table! { rev_table (id) { id -> Integer, @@ -124,6 +134,7 @@ allow_tables_to_appear_in_same_query!( grid_meta_rev_table, grid_rev_table, kv_table, + rev_history, rev_table, trash_table, user_table, diff --git a/frontend/rust-lib/flowy-revision/src/history/mod.rs b/frontend/rust-lib/flowy-revision/src/history/mod.rs new file mode 100644 index 0000000000..be42f6f9be --- /dev/null +++ b/frontend/rust-lib/flowy-revision/src/history/mod.rs @@ -0,0 +1,4 @@ +mod persistence; +mod rev_history; + +pub use rev_history::*; diff --git a/frontend/rust-lib/flowy-revision/src/history/persistence.rs b/frontend/rust-lib/flowy-revision/src/history/persistence.rs new file mode 100644 index 0000000000..c548e04940 --- /dev/null +++ b/frontend/rust-lib/flowy-revision/src/history/persistence.rs @@ -0,0 +1,22 @@ +use crate::history::RevisionHistoryDiskCache; +use flowy_error::FlowyError; +use flowy_sync::entities::revision::Revision; + +pub struct SQLiteRevisionHistoryPersistence {} + +impl SQLiteRevisionHistoryPersistence { + pub fn new() -> Self { + Self {} + } +} + +impl RevisionHistoryDiskCache for SQLiteRevisionHistoryPersistence { + type Error = FlowyError; + + fn save_revision(&self, revision: Revision) -> Result<(), Self::Error> { + todo!() + } +} + +struct RevisionHistorySql(); +impl RevisionHistorySql {} diff --git a/frontend/rust-lib/flowy-revision/src/history/rev_history.rs b/frontend/rust-lib/flowy-revision/src/history/rev_history.rs new file mode 100644 index 0000000000..8c0b10e071 --- /dev/null +++ b/frontend/rust-lib/flowy-revision/src/history/rev_history.rs @@ -0,0 +1,78 @@ +use crate::history::persistence::SQLiteRevisionHistoryPersistence; +use flowy_error::FlowyError; +use flowy_sync::entities::revision::Revision; +use std::fmt::Debug; +use std::sync::Arc; +use tokio::sync::RwLock; + +pub trait RevisionHistoryDiskCache: Send + Sync { + type Error: Debug; + + fn save_revision(&self, revision: Revision) -> Result<(), Self::Error>; +} + +pub struct RevisionHistory { + config: RevisionHistoryConfig, + checkpoint: Arc>, + disk_cache: Arc>, +} + +impl RevisionHistory { + pub fn new(config: RevisionHistoryConfig) -> Self { + let disk_cache = Arc::new(SQLiteRevisionHistoryPersistence::new()); + let cloned_disk_cache = disk_cache.clone(); + let checkpoint = HistoryCheckpoint::from_config(&config, move |revision| { + let _ = cloned_disk_cache.save_revision(revision); + }); + let checkpoint = Arc::new(RwLock::new(checkpoint)); + + Self { + config, + checkpoint, + disk_cache, + } + } + + pub async fn save_revision(&self, revision: &Revision) { + self.checkpoint.write().await.add_revision(revision); + } +} + +pub struct RevisionHistoryConfig { + check_when_close: bool, + check_interval: i64, +} + +impl std::default::Default for RevisionHistoryConfig { + fn default() -> Self { + Self { + check_when_close: true, + check_interval: 19, + } + } +} + +struct HistoryCheckpoint { + interval: i64, + revisions: Vec, + on_check: Box, +} + +impl HistoryCheckpoint { + fn from_config(config: &RevisionHistoryConfig, on_check: F) -> Self + where + F: Fn(Revision) + Send + Sync + 'static, + { + Self { + interval: config.check_interval, + revisions: vec![], + on_check: Box::new(on_check), + } + } + + fn check(&mut self) -> Revision { + todo!() + } + + fn add_revision(&mut self, revision: &Revision) {} +} diff --git a/frontend/rust-lib/flowy-revision/src/lib.rs b/frontend/rust-lib/flowy-revision/src/lib.rs index 05e60c00e0..6c708afab2 100644 --- a/frontend/rust-lib/flowy-revision/src/lib.rs +++ b/frontend/rust-lib/flowy-revision/src/lib.rs @@ -1,5 +1,6 @@ mod cache; mod conflict_resolve; +mod history; mod rev_manager; mod rev_persistence; mod ws_manager; diff --git a/frontend/rust-lib/flowy-revision/src/rev_manager.rs b/frontend/rust-lib/flowy-revision/src/rev_manager.rs index 68d8db6f27..15f37af456 100644 --- a/frontend/rust-lib/flowy-revision/src/rev_manager.rs +++ b/frontend/rust-lib/flowy-revision/src/rev_manager.rs @@ -1,4 +1,5 @@ use crate::disk::RevisionState; +use crate::history::{RevisionHistory, RevisionHistoryConfig}; use crate::{RevisionPersistence, WSDataProviderDataSource}; use bytes::Bytes; use flowy_error::{FlowyError, FlowyResult}; @@ -45,7 +46,7 @@ pub struct RevisionManager { user_id: String, rev_id_counter: RevIdCounter, rev_persistence: Arc, - + rev_history: Arc, #[cfg(feature = "flowy_unit_test")] rev_ack_notifier: tokio::sync::broadcast::Sender, } @@ -53,6 +54,8 @@ pub struct RevisionManager { impl RevisionManager { pub fn new(user_id: &str, object_id: &str, rev_persistence: Arc) -> Self { let rev_id_counter = RevIdCounter::new(0); + let rev_history_config = RevisionHistoryConfig::default(); + let rev_history = Arc::new(RevisionHistory::new(rev_history_config)); #[cfg(feature = "flowy_unit_test")] let (revision_ack_notifier, _) = tokio::sync::broadcast::channel(1); @@ -61,6 +64,7 @@ impl RevisionManager { user_id: user_id.to_owned(), rev_id_counter, rev_persistence, + rev_history, #[cfg(feature = "flowy_unit_test")] rev_ack_notifier: revision_ack_notifier, diff --git a/shared-lib/flowy-sync/src/client_grid/grid_block_meta_pad.rs b/shared-lib/flowy-sync/src/client_grid/grid_block_meta_pad.rs index 25ee1f5471..68c182bcc0 100644 --- a/shared-lib/flowy-sync/src/client_grid/grid_block_meta_pad.rs +++ b/shared-lib/flowy-sync/src/client_grid/grid_block_meta_pad.rs @@ -61,7 +61,7 @@ impl GridBlockMetaPad { &mut self, row: RowMeta, start_row_id: Option, - ) -> CollaborateResult> { + ) -> CollaborateResult> { self.modify(|rows| { if let Some(start_row_id) = start_row_id { if !start_row_id.is_empty() { @@ -77,7 +77,10 @@ impl GridBlockMetaPad { }) } - pub fn delete_rows(&mut self, row_ids: Vec>) -> CollaborateResult> { + pub fn delete_rows( + &mut self, + row_ids: Vec>, + ) -> CollaborateResult> { self.modify(|rows| { rows.retain(|row| !row_ids.contains(&Cow::Borrowed(&row.id))); Ok(Some(())) @@ -141,7 +144,10 @@ impl GridBlockMetaPad { .map(|index| index as i32) } - pub fn update_row(&mut self, changeset: RowMetaChangeset) -> CollaborateResult> { + pub fn update_row( + &mut self, + changeset: RowMetaChangeset, + ) -> CollaborateResult> { let row_id = changeset.row_id.clone(); self.modify_row(&row_id, |row| { let mut is_changed = None; @@ -166,7 +172,12 @@ impl GridBlockMetaPad { }) } - pub fn move_row(&mut self, row_id: &str, from: usize, to: usize) -> CollaborateResult> { + pub fn move_row( + &mut self, + row_id: &str, + from: usize, + to: usize, + ) -> CollaborateResult> { self.modify(|row_metas| { if let Some(position) = row_metas.iter().position(|row_meta| row_meta.id == row_id) { debug_assert_eq!(from, position); @@ -179,7 +190,7 @@ impl GridBlockMetaPad { }) } - pub fn modify(&mut self, f: F) -> CollaborateResult> + pub fn modify(&mut self, f: F) -> CollaborateResult> where F: for<'a> FnOnce(&'a mut Vec>) -> CollaborateResult>, { @@ -198,14 +209,14 @@ impl GridBlockMetaPad { // self.delta.to_str().unwrap_or_else(|_| "".to_string()) // ); self.delta = self.delta.compose(&delta)?; - Ok(Some(GridBlockMetaChange { delta, md5: self.md5() })) + Ok(Some(GridBlockMetaDeltaChangeset { delta, md5: self.md5() })) } } } } } - fn modify_row(&mut self, row_id: &str, f: F) -> CollaborateResult> + fn modify_row(&mut self, row_id: &str, f: F) -> CollaborateResult> where F: FnOnce(&mut RowMeta) -> CollaborateResult>, { @@ -233,7 +244,7 @@ impl GridBlockMetaPad { } } -pub struct GridBlockMetaChange { +pub struct GridBlockMetaDeltaChangeset { pub delta: GridBlockMetaDelta, /// md5: the md5 of the grid after applying the change. pub md5: String, From aeb69f307c09f8d98f01544270d4b40a785a12f6 Mon Sep 17 00:00:00 2001 From: appflowy Date: Thu, 9 Jun 2022 20:58:56 +0800 Subject: [PATCH 3/5] chore: write checkpoint in fixed duration --- frontend/rust-lib/flowy-folder/src/manager.rs | 17 +- .../src/services/folder_editor.rs | 8 +- .../src/services/persistence/migration.rs | 2 +- frontend/rust-lib/flowy-grid/src/manager.rs | 12 +- .../src/services/block/block_meta_editor.rs | 7 +- .../src/services/block/block_meta_manager.rs | 19 +- .../src/services/grid_meta_editor.rs | 9 +- ...ev_impl.rs => grid_block_meta_rev_impl.rs} | 1 - .../flowy-revision/src/cache/disk/mod.rs | 4 +- .../flowy-revision/src/history/mod.rs | 1 + .../flowy-revision/src/history/persistence.rs | 51 ++++- .../flowy-revision/src/history/rev_history.rs | 187 ++++++++++++++---- frontend/rust-lib/flowy-revision/src/lib.rs | 1 + .../flowy-revision/src/rev_manager.rs | 43 +++- .../flowy-revision/src/rev_persistence.rs | 12 +- .../rust-lib/flowy-revision/src/ws_manager.rs | 1 - .../rust-lib/flowy-text-block/src/manager.rs | 20 +- .../rust-lib/flowy-text-block/src/queue.rs | 5 +- .../src/client_grid/grid_meta_pad.rs | 7 +- 19 files changed, 299 insertions(+), 108 deletions(-) rename frontend/rust-lib/flowy-revision/src/cache/disk/{grid_meta_rev_impl.rs => grid_block_meta_rev_impl.rs} (99%) diff --git a/frontend/rust-lib/flowy-folder/src/manager.rs b/frontend/rust-lib/flowy-folder/src/manager.rs index 3181c14625..035555576a 100644 --- a/frontend/rust-lib/flowy-folder/src/manager.rs +++ b/frontend/rust-lib/flowy-folder/src/manager.rs @@ -11,11 +11,12 @@ use crate::{ use bytes::Bytes; use flowy_sync::client_document::default::{initial_quill_delta_string, initial_read_me}; +use crate::services::folder_editor::FolderRevisionCompactor; use flowy_error::FlowyError; use flowy_folder_data_model::entities::view::ViewDataType; use flowy_folder_data_model::user_default; use flowy_revision::disk::SQLiteTextBlockRevisionPersistence; -use flowy_revision::{RevisionManager, RevisionPersistence, RevisionWebSocket}; +use flowy_revision::{RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionHistoryPersistence}; use flowy_sync::{client_folder::FolderPad, entities::ws_data::ServerRevisionWSData}; use lazy_static::lazy_static; use lib_infra::future::FutureResult; @@ -162,9 +163,17 @@ impl FolderManager { let _ = self.persistence.initialize(user_id, &folder_id).await?; let pool = self.persistence.db_pool()?; - let disk_cache = Arc::new(SQLiteTextBlockRevisionPersistence::new(user_id, pool)); - let rev_persistence = Arc::new(RevisionPersistence::new(user_id, folder_id.as_ref(), disk_cache)); - let rev_manager = RevisionManager::new(user_id, folder_id.as_ref(), rev_persistence); + let disk_cache = SQLiteTextBlockRevisionPersistence::new(user_id, pool.clone()); + let rev_persistence = RevisionPersistence::new(user_id, folder_id.as_ref(), disk_cache); + let rev_compactor = FolderRevisionCompactor(); + let history_persistence = SQLiteRevisionHistoryPersistence::new(pool); + let rev_manager = RevisionManager::new( + user_id, + folder_id.as_ref(), + rev_persistence, + rev_compactor, + history_persistence, + ); let folder_editor = FolderEditor::new(user_id, &folder_id, token, rev_manager, self.web_socket.clone()).await?; *self.folder_editor.write().await = Some(Arc::new(folder_editor)); diff --git a/frontend/rust-lib/flowy-folder/src/services/folder_editor.rs b/frontend/rust-lib/flowy-folder/src/services/folder_editor.rs index ef94c1d316..6661530213 100644 --- a/frontend/rust-lib/flowy-folder/src/services/folder_editor.rs +++ b/frontend/rust-lib/flowy-folder/src/services/folder_editor.rs @@ -91,11 +91,7 @@ impl FolderEditor { &self.user_id, md5, ); - let _ = futures::executor::block_on(async { - self.rev_manager - .add_local_revision(&revision, Box::new(FolderRevisionCompactor())) - .await - })?; + let _ = futures::executor::block_on(async { self.rev_manager.add_local_revision(&revision).await })?; Ok(()) } @@ -135,7 +131,7 @@ impl FolderEditor { } } -struct FolderRevisionCompactor(); +pub struct FolderRevisionCompactor(); impl RevisionCompactor for FolderRevisionCompactor { fn bytes_from_revisions(&self, revisions: Vec) -> FlowyResult { let delta = make_delta_from_revisions::(revisions)?; diff --git a/frontend/rust-lib/flowy-folder/src/services/persistence/migration.rs b/frontend/rust-lib/flowy-folder/src/services/persistence/migration.rs index bc8838059a..ee8731c870 100644 --- a/frontend/rust-lib/flowy-folder/src/services/persistence/migration.rs +++ b/frontend/rust-lib/flowy-folder/src/services/persistence/migration.rs @@ -88,7 +88,7 @@ impl FolderMigration { return Ok(None); } let pool = self.database.db_pool()?; - let disk_cache = Arc::new(SQLiteTextBlockRevisionPersistence::new(user_id, pool)); + let disk_cache = SQLiteTextBlockRevisionPersistence::new(user_id, pool); let rev_persistence = Arc::new(RevisionPersistence::new(user_id, folder_id.as_ref(), disk_cache)); let (revisions, _) = RevisionLoader { object_id: folder_id.as_ref().to_owned(), diff --git a/frontend/rust-lib/flowy-grid/src/manager.rs b/frontend/rust-lib/flowy-grid/src/manager.rs index 08d9559d41..8d8171ee02 100644 --- a/frontend/rust-lib/flowy-grid/src/manager.rs +++ b/frontend/rust-lib/flowy-grid/src/manager.rs @@ -1,5 +1,5 @@ use crate::services::block::make_grid_block_meta_rev_manager; -use crate::services::grid_meta_editor::GridMetaEditor; +use crate::services::grid_meta_editor::{GridMetaEditor, GridMetaRevisionCompactor}; use crate::services::persistence::block_index::BlockIndexCache; use crate::services::persistence::kv::GridKVPersistence; use crate::services::persistence::GridDatabase; @@ -9,7 +9,7 @@ use flowy_database::ConnectionPool; use flowy_error::{FlowyError, FlowyResult}; use flowy_grid_data_model::entities::{BuildGridContext, GridMeta}; use flowy_revision::disk::SQLiteGridRevisionPersistence; -use flowy_revision::{RevisionManager, RevisionPersistence, RevisionWebSocket}; +use flowy_revision::{RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionHistoryPersistence}; use flowy_sync::client_grid::{make_block_meta_delta, make_grid_delta}; use flowy_sync::entities::revision::{RepeatedRevision, Revision}; use std::sync::Arc; @@ -128,9 +128,11 @@ impl GridManager { pub fn make_grid_rev_manager(&self, grid_id: &str, pool: Arc) -> FlowyResult { let user_id = self.grid_user.user_id()?; - let disk_cache = Arc::new(SQLiteGridRevisionPersistence::new(&user_id, pool)); - let rev_persistence = Arc::new(RevisionPersistence::new(&user_id, grid_id, disk_cache)); - let rev_manager = RevisionManager::new(&user_id, grid_id, rev_persistence); + let disk_cache = SQLiteGridRevisionPersistence::new(&user_id, pool.clone()); + let rev_persistence = RevisionPersistence::new(&user_id, grid_id, disk_cache); + let history_persistence = SQLiteRevisionHistoryPersistence::new(pool); + let rev_compactor = GridMetaRevisionCompactor(); + let rev_manager = RevisionManager::new(&user_id, grid_id, rev_persistence, rev_compactor, history_persistence); Ok(rev_manager) } } diff --git a/frontend/rust-lib/flowy-grid/src/services/block/block_meta_editor.rs b/frontend/rust-lib/flowy-grid/src/services/block/block_meta_editor.rs index 81c23f9c69..ee43aebf10 100644 --- a/frontend/rust-lib/flowy-grid/src/services/block/block_meta_editor.rs +++ b/frontend/rust-lib/flowy-grid/src/services/block/block_meta_editor.rs @@ -169,10 +169,7 @@ impl GridBlockMetaEditor { &user_id, md5, ); - let _ = self - .rev_manager - .add_local_revision(&revision, Box::new(GridBlockMetaRevisionCompactor())) - .await?; + let _ = self.rev_manager.add_local_revision(&revision).await?; Ok(()) } } @@ -199,7 +196,7 @@ impl RevisionObjectBuilder for GridBlockMetaPadBuilder { } } -struct GridBlockMetaRevisionCompactor(); +pub struct GridBlockMetaRevisionCompactor(); impl RevisionCompactor for GridBlockMetaRevisionCompactor { fn bytes_from_revisions(&self, revisions: Vec) -> FlowyResult { let delta = make_delta_from_revisions::(revisions)?; diff --git a/frontend/rust-lib/flowy-grid/src/services/block/block_meta_manager.rs b/frontend/rust-lib/flowy-grid/src/services/block/block_meta_manager.rs index a1d9e27f4a..417c158cff 100644 --- a/frontend/rust-lib/flowy-grid/src/services/block/block_meta_manager.rs +++ b/frontend/rust-lib/flowy-grid/src/services/block/block_meta_manager.rs @@ -1,6 +1,6 @@ use crate::dart_notification::{send_dart_notification, GridNotification}; use crate::manager::GridUser; -use crate::services::block::GridBlockMetaEditor; +use crate::services::block::{GridBlockMetaEditor, GridBlockMetaRevisionCompactor}; use crate::services::persistence::block_index::BlockIndexCache; use crate::services::row::{group_row_orders, GridBlockSnapshot}; use dashmap::DashMap; @@ -10,7 +10,7 @@ use flowy_grid_data_model::entities::{ RowMeta, RowMetaChangeset, RowOrder, UpdatedRowOrder, }; use flowy_revision::disk::SQLiteGridBlockMetaRevisionPersistence; -use flowy_revision::{RevisionManager, RevisionPersistence}; +use flowy_revision::{RevisionManager, RevisionPersistence, SQLiteRevisionHistoryPersistence}; use std::borrow::Cow; use std::collections::HashMap; use std::sync::Arc; @@ -278,7 +278,16 @@ pub fn make_grid_block_meta_rev_manager(user: &Arc, block_id: &str let user_id = user.user_id()?; let pool = user.db_pool()?; - let disk_cache = Arc::new(SQLiteGridBlockMetaRevisionPersistence::new(&user_id, pool)); - let rev_persistence = Arc::new(RevisionPersistence::new(&user_id, block_id, disk_cache)); - Ok(RevisionManager::new(&user_id, block_id, rev_persistence)) + let disk_cache = SQLiteGridBlockMetaRevisionPersistence::new(&user_id, pool.clone()); + let rev_persistence = RevisionPersistence::new(&user_id, block_id, disk_cache); + let rev_compactor = GridBlockMetaRevisionCompactor(); + let history_persistence = SQLiteRevisionHistoryPersistence::new(pool); + + Ok(RevisionManager::new( + &user_id, + block_id, + rev_persistence, + rev_compactor, + history_persistence, + )) } diff --git a/frontend/rust-lib/flowy-grid/src/services/grid_meta_editor.rs b/frontend/rust-lib/flowy-grid/src/services/grid_meta_editor.rs index d584e1dc31..4f4b21ad4c 100644 --- a/frontend/rust-lib/flowy-grid/src/services/grid_meta_editor.rs +++ b/frontend/rust-lib/flowy-grid/src/services/grid_meta_editor.rs @@ -547,10 +547,7 @@ impl GridMetaEditor { &user_id, md5, ); - let _ = self - .rev_manager - .add_local_revision(&revision, Box::new(GridRevisionCompactor())) - .await?; + let _ = self.rev_manager.add_local_revision(&revision).await?; Ok(()) } @@ -629,8 +626,8 @@ impl RevisionCloudService for GridRevisionCloudService { } } -struct GridRevisionCompactor(); -impl RevisionCompactor for GridRevisionCompactor { +pub struct GridMetaRevisionCompactor(); +impl RevisionCompactor for GridMetaRevisionCompactor { fn bytes_from_revisions(&self, revisions: Vec) -> FlowyResult { let delta = make_delta_from_revisions::(revisions)?; Ok(delta.to_delta_bytes()) diff --git a/frontend/rust-lib/flowy-revision/src/cache/disk/grid_meta_rev_impl.rs b/frontend/rust-lib/flowy-revision/src/cache/disk/grid_block_meta_rev_impl.rs similarity index 99% rename from frontend/rust-lib/flowy-revision/src/cache/disk/grid_meta_rev_impl.rs rename to frontend/rust-lib/flowy-revision/src/cache/disk/grid_block_meta_rev_impl.rs index e850119c30..93af25e7ca 100644 --- a/frontend/rust-lib/flowy-revision/src/cache/disk/grid_meta_rev_impl.rs +++ b/frontend/rust-lib/flowy-revision/src/cache/disk/grid_block_meta_rev_impl.rs @@ -1,6 +1,5 @@ use crate::cache::disk::RevisionDiskCache; use crate::disk::{RevisionChangeset, RevisionRecord, RevisionState}; - use bytes::Bytes; use diesel::{sql_types::Integer, update, SqliteConnection}; use flowy_database::{ diff --git a/frontend/rust-lib/flowy-revision/src/cache/disk/mod.rs b/frontend/rust-lib/flowy-revision/src/cache/disk/mod.rs index 945c6d707f..814b591f15 100644 --- a/frontend/rust-lib/flowy-revision/src/cache/disk/mod.rs +++ b/frontend/rust-lib/flowy-revision/src/cache/disk/mod.rs @@ -1,10 +1,10 @@ mod folder_rev_impl; -mod grid_meta_rev_impl; +mod grid_block_meta_rev_impl; mod grid_rev_impl; mod text_rev_impl; pub use folder_rev_impl::*; -pub use grid_meta_rev_impl::*; +pub use grid_block_meta_rev_impl::*; pub use grid_rev_impl::*; pub use text_rev_impl::*; diff --git a/frontend/rust-lib/flowy-revision/src/history/mod.rs b/frontend/rust-lib/flowy-revision/src/history/mod.rs index be42f6f9be..9de42831ed 100644 --- a/frontend/rust-lib/flowy-revision/src/history/mod.rs +++ b/frontend/rust-lib/flowy-revision/src/history/mod.rs @@ -1,4 +1,5 @@ mod persistence; mod rev_history; +pub use persistence::*; pub use rev_history::*; diff --git a/frontend/rust-lib/flowy-revision/src/history/persistence.rs b/frontend/rust-lib/flowy-revision/src/history/persistence.rs index c548e04940..01d22fe6a4 100644 --- a/frontend/rust-lib/flowy-revision/src/history/persistence.rs +++ b/frontend/rust-lib/flowy-revision/src/history/persistence.rs @@ -1,22 +1,59 @@ use crate::history::RevisionHistoryDiskCache; -use flowy_error::FlowyError; +use diesel::{sql_types::Integer, update, SqliteConnection}; +use flowy_database::{ + prelude::*, + schema::{rev_history, rev_history::dsl}, + ConnectionPool, +}; +use flowy_error::{FlowyError, FlowyResult}; use flowy_sync::entities::revision::Revision; +use std::sync::Arc; -pub struct SQLiteRevisionHistoryPersistence {} +pub struct SQLiteRevisionHistoryPersistence { + pool: Arc, +} impl SQLiteRevisionHistoryPersistence { - pub fn new() -> Self { - Self {} + pub fn new(pool: Arc) -> Self { + Self { pool } } } impl RevisionHistoryDiskCache for SQLiteRevisionHistoryPersistence { - type Error = FlowyError; + fn save_revision(&self, revision: Revision) -> FlowyResult<()> { + todo!() + } - fn save_revision(&self, revision: Revision) -> Result<(), Self::Error> { + fn read_revision(&self, rev_id: i64) -> FlowyResult { + todo!() + } + + fn clear(&self) -> FlowyResult<()> { todo!() } } struct RevisionHistorySql(); -impl RevisionHistorySql {} +impl RevisionHistorySql { + fn read_revision(object_id: &str, rev_id: i64, conn: &SqliteConnection) -> Result { + let records: Vec = dsl::rev_history + .filter(dsl::start_rev_id.lt(rev_id)) + .filter(dsl::end_rev_id.ge(rev_id)) + .filter(dsl::object_id.eq(object_id)) + .load::(conn)?; + + debug_assert_eq!(records.len(), 1); + + todo!() + } +} + +#[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)] +#[table_name = "rev_history"] +struct RevisionRecord { + id: i32, + object_id: String, + start_rev_id: i64, + end_rev_id: i64, + data: Vec, +} diff --git a/frontend/rust-lib/flowy-revision/src/history/rev_history.rs b/frontend/rust-lib/flowy-revision/src/history/rev_history.rs index 8c0b10e071..faa35a9ede 100644 --- a/frontend/rust-lib/flowy-revision/src/history/rev_history.rs +++ b/frontend/rust-lib/flowy-revision/src/history/rev_history.rs @@ -1,78 +1,195 @@ use crate::history::persistence::SQLiteRevisionHistoryPersistence; -use flowy_error::FlowyError; +use crate::RevisionCompactor; +use async_stream::stream; +use flowy_database::ConnectionPool; +use flowy_error::{FlowyError, FlowyResult}; use flowy_sync::entities::revision::Revision; +use futures_util::future::BoxFuture; +use futures_util::stream::StreamExt; +use futures_util::FutureExt; use std::fmt::Debug; use std::sync::Arc; -use tokio::sync::RwLock; +use std::time::Duration; +use tokio::sync::mpsc::error::SendError; +use tokio::sync::mpsc::Sender; +use tokio::sync::{mpsc, oneshot, RwLock}; +use tokio::time::interval; pub trait RevisionHistoryDiskCache: Send + Sync { - type Error: Debug; + fn save_revision(&self, revision: Revision) -> FlowyResult<()>; - fn save_revision(&self, revision: Revision) -> Result<(), Self::Error>; + fn read_revision(&self, rev_id: i64) -> FlowyResult; + + fn clear(&self) -> FlowyResult<()>; } pub struct RevisionHistory { + stop_timer: mpsc::Sender<()>, config: RevisionHistoryConfig, - checkpoint: Arc>, - disk_cache: Arc>, + revisions: Arc>>, + disk_cache: Arc, } impl RevisionHistory { - pub fn new(config: RevisionHistoryConfig) -> Self { - let disk_cache = Arc::new(SQLiteRevisionHistoryPersistence::new()); + pub fn new( + user_id: &str, + object_id: &str, + config: RevisionHistoryConfig, + disk_cache: Arc, + rev_compactor: Arc, + ) -> Self { + let user_id = user_id.to_string(); + let object_id = object_id.to_string(); let cloned_disk_cache = disk_cache.clone(); - let checkpoint = HistoryCheckpoint::from_config(&config, move |revision| { - let _ = cloned_disk_cache.save_revision(revision); - }); - let checkpoint = Arc::new(RwLock::new(checkpoint)); + let (stop_timer, stop_rx) = mpsc::channel(1); + let (checkpoint_tx, checkpoint_rx) = mpsc::channel(1); + let revisions = Arc::new(RwLock::new(vec![])); + let fix_duration_checkpoint_tx = FixedDurationCheckpointSender { + user_id, + object_id, + checkpoint_tx, + disk_cache: cloned_disk_cache, + revisions: revisions.clone(), + rev_compactor, + duration: config.check_duration, + }; + + tokio::spawn(CheckpointRunner::new(stop_rx, checkpoint_rx).run()); + tokio::spawn(fix_duration_checkpoint_tx.run()); Self { + stop_timer, config, - checkpoint, + revisions, disk_cache, } } - pub async fn save_revision(&self, revision: &Revision) { - self.checkpoint.write().await.add_revision(revision); + pub async fn add_revision(&self, revision: &Revision) { + self.revisions.write().await.push(revision.clone()); + } + + pub async fn reset_history(&self) { + self.revisions.write().await.clear(); + match self.disk_cache.clear() { + Ok(_) => {} + Err(e) => tracing::error!("Clear history failed: {:?}", e), + } } } pub struct RevisionHistoryConfig { - check_when_close: bool, - check_interval: i64, + check_duration: Duration, } impl std::default::Default for RevisionHistoryConfig { fn default() -> Self { Self { - check_when_close: true, - check_interval: 19, + check_duration: Duration::from_secs(5), } } } +struct CheckpointRunner { + stop_rx: Option>, + checkpoint_rx: Option>, +} + +impl CheckpointRunner { + fn new(stop_rx: mpsc::Receiver<()>, checkpoint_rx: mpsc::Receiver) -> Self { + Self { + stop_rx: Some(stop_rx), + checkpoint_rx: Some(checkpoint_rx), + } + } + + async fn run(mut self) { + let mut stop_rx = self.stop_rx.take().expect("It should only run once"); + let mut checkpoint_rx = self.checkpoint_rx.take().expect("It should only run once"); + let stream = stream! { + loop { + tokio::select! { + result = checkpoint_rx.recv() => { + match result { + Some(checkpoint) => yield checkpoint, + None => {}, + } + }, + _ = stop_rx.recv() => { + tracing::trace!("Checkpoint runner exit"); + break + }, + }; + } + }; + + stream + .for_each(|checkpoint| async move { + checkpoint.write().await; + }) + .await; + } +} + struct HistoryCheckpoint { - interval: i64, + user_id: String, + object_id: String, revisions: Vec, - on_check: Box, + disk_cache: Arc, + rev_compactor: Arc, } impl HistoryCheckpoint { - fn from_config(config: &RevisionHistoryConfig, on_check: F) -> Self - where - F: Fn(Revision) + Send + Sync + 'static, - { - Self { - interval: config.check_interval, - revisions: vec![], - on_check: Box::new(on_check), + async fn write(self) { + if self.revisions.is_empty() { + return; + } + + let result = || { + let revision = self + .rev_compactor + .compact(&self.user_id, &self.object_id, self.revisions)?; + let _ = self.disk_cache.save_revision(revision)?; + Ok::<(), FlowyError>(()) + }; + + match result() { + Ok(_) => {} + Err(e) => tracing::error!("Write history checkout failed: {:?}", e), } } - - fn check(&mut self) -> Revision { - todo!() - } - - fn add_revision(&mut self, revision: &Revision) {} +} + +struct FixedDurationCheckpointSender { + user_id: String, + object_id: String, + checkpoint_tx: mpsc::Sender, + disk_cache: Arc, + revisions: Arc>>, + rev_compactor: Arc, + duration: Duration, +} + +impl FixedDurationCheckpointSender { + fn run(self) -> BoxFuture<'static, ()> { + async move { + let mut interval = interval(self.duration); + let checkpoint_revisions: Vec = revisions.write().await.drain(..).collect(); + let checkpoint = HistoryCheckpoint { + user_id: self.user_id.clone(), + object_id: self.object_id.clone(), + revisions: checkpoint_revisions, + disk_cache: self.disk_cache.clone(), + rev_compactor: self.rev_compactor.clone(), + }; + match checkpoint_tx.send(checkpoint).await { + Ok(_) => { + interval.tick().await; + self.run(); + } + Err(_) => {} + } + } + .boxed() + } } diff --git a/frontend/rust-lib/flowy-revision/src/lib.rs b/frontend/rust-lib/flowy-revision/src/lib.rs index 6c708afab2..14f00568f2 100644 --- a/frontend/rust-lib/flowy-revision/src/lib.rs +++ b/frontend/rust-lib/flowy-revision/src/lib.rs @@ -7,6 +7,7 @@ mod ws_manager; pub use cache::*; pub use conflict_resolve::*; +pub use history::*; pub use rev_manager::*; pub use rev_persistence::*; pub use ws_manager::*; diff --git a/frontend/rust-lib/flowy-revision/src/rev_manager.rs b/frontend/rust-lib/flowy-revision/src/rev_manager.rs index 15f37af456..bed942b7b4 100644 --- a/frontend/rust-lib/flowy-revision/src/rev_manager.rs +++ b/frontend/rust-lib/flowy-revision/src/rev_manager.rs @@ -1,5 +1,5 @@ use crate::disk::RevisionState; -use crate::history::{RevisionHistory, RevisionHistoryConfig}; +use crate::history::{RevisionHistory, RevisionHistoryConfig, RevisionHistoryDiskCache}; use crate::{RevisionPersistence, WSDataProviderDataSource}; use bytes::Bytes; use flowy_error::{FlowyError, FlowyResult}; @@ -47,15 +47,36 @@ pub struct RevisionManager { rev_id_counter: RevIdCounter, rev_persistence: Arc, rev_history: Arc, + rev_compactor: Arc, #[cfg(feature = "flowy_unit_test")] rev_ack_notifier: tokio::sync::broadcast::Sender, } impl RevisionManager { - pub fn new(user_id: &str, object_id: &str, rev_persistence: Arc) -> Self { + pub fn new( + user_id: &str, + object_id: &str, + rev_persistence: RevisionPersistence, + rev_compactor: C, + history_persistence: P, + ) -> Self + where + P: 'static + RevisionHistoryDiskCache, + C: 'static + RevisionCompactor, + { let rev_id_counter = RevIdCounter::new(0); + let rev_compactor = Arc::new(rev_compactor); + let history_persistence = Arc::new(history_persistence); let rev_history_config = RevisionHistoryConfig::default(); - let rev_history = Arc::new(RevisionHistory::new(rev_history_config)); + let rev_persistence = Arc::new(rev_persistence); + + let rev_history = Arc::new(RevisionHistory::new( + user_id, + object_id, + rev_history_config, + history_persistence, + rev_compactor.clone(), + )); #[cfg(feature = "flowy_unit_test")] let (revision_ack_notifier, _) = tokio::sync::broadcast::channel(1); @@ -65,7 +86,7 @@ impl RevisionManager { rev_id_counter, rev_persistence, rev_history, - + rev_compactor, #[cfg(feature = "flowy_unit_test")] rev_ack_notifier: revision_ack_notifier, } @@ -93,6 +114,7 @@ impl RevisionManager { pub async fn reset_object(&self, revisions: RepeatedRevision) -> FlowyResult<()> { let rev_id = pair_rev_id_from_revisions(&revisions).1; let _ = self.rev_persistence.reset(revisions.into_inner()).await?; + self.rev_history.reset_history().await; self.rev_id_counter.set(rev_id); Ok(()) } @@ -104,20 +126,21 @@ impl RevisionManager { } let _ = self.rev_persistence.add_ack_revision(revision).await?; + self.rev_history.add_revision(revision).await; self.rev_id_counter.set(revision.rev_id); Ok(()) } #[tracing::instrument(level = "debug", skip_all, err)] - pub async fn add_local_revision<'a>( - &'a self, - revision: &Revision, - compactor: Box, - ) -> Result<(), FlowyError> { + pub async fn add_local_revision(&self, revision: &Revision) -> Result<(), FlowyError> { if revision.delta_data.is_empty() { return Err(FlowyError::internal().context("Delta data should be empty")); } - let rev_id = self.rev_persistence.add_sync_revision(revision, compactor).await?; + let rev_id = self + .rev_persistence + .add_sync_revision(revision, &self.rev_compactor) + .await?; + self.rev_history.add_revision(revision).await; self.rev_id_counter.set(rev_id); Ok(()) } diff --git a/frontend/rust-lib/flowy-revision/src/rev_persistence.rs b/frontend/rust-lib/flowy-revision/src/rev_persistence.rs index 6412b26ec6..98bc89ba24 100644 --- a/frontend/rust-lib/flowy-revision/src/rev_persistence.rs +++ b/frontend/rust-lib/flowy-revision/src/rev_persistence.rs @@ -24,13 +24,13 @@ pub struct RevisionPersistence { } impl RevisionPersistence { - pub fn new( - user_id: &str, - object_id: &str, - disk_cache: Arc>, - ) -> RevisionPersistence { + pub fn new(user_id: &str, object_id: &str, disk_cache: C) -> RevisionPersistence + where + C: 'static + RevisionDiskCache, + { let object_id = object_id.to_owned(); let user_id = user_id.to_owned(); + let disk_cache = Arc::new(disk_cache) as Arc>; let sync_seq = RwLock::new(RevisionSyncSequence::new()); let memory_cache = Arc::new(RevisionMemoryCache::new(&object_id, Arc::new(disk_cache.clone()))); Self { @@ -63,7 +63,7 @@ impl RevisionPersistence { pub(crate) async fn add_sync_revision<'a>( &'a self, revision: &'a Revision, - compactor: Box, + compactor: &Arc, ) -> FlowyResult { let result = self.sync_seq.read().await.compact(); match result { diff --git a/frontend/rust-lib/flowy-revision/src/ws_manager.rs b/frontend/rust-lib/flowy-revision/src/ws_manager.rs index 42ad39c617..eb7539c380 100644 --- a/frontend/rust-lib/flowy-revision/src/ws_manager.rs +++ b/frontend/rust-lib/flowy-revision/src/ws_manager.rs @@ -1,6 +1,5 @@ use crate::ConflictRevisionSink; use async_stream::stream; - use bytes::Bytes; use flowy_error::{FlowyError, FlowyResult}; use flowy_sync::entities::{ diff --git a/frontend/rust-lib/flowy-text-block/src/manager.rs b/frontend/rust-lib/flowy-text-block/src/manager.rs index 9325fe6001..a6cdda1220 100644 --- a/frontend/rust-lib/flowy-text-block/src/manager.rs +++ b/frontend/rust-lib/flowy-text-block/src/manager.rs @@ -1,10 +1,13 @@ +use crate::queue::TextBlockRevisionCompactor; use crate::{editor::TextBlockEditor, errors::FlowyError, BlockCloudService}; use bytes::Bytes; use dashmap::DashMap; use flowy_database::ConnectionPool; use flowy_error::FlowyResult; use flowy_revision::disk::SQLiteTextBlockRevisionPersistence; -use flowy_revision::{RevisionCloudService, RevisionManager, RevisionPersistence, RevisionWebSocket}; +use flowy_revision::{ + RevisionCloudService, RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionHistoryPersistence, +}; use flowy_sync::entities::{ revision::{md5, RepeatedRevision, Revision}, text_block_info::{TextBlockDelta, TextBlockId}, @@ -139,9 +142,18 @@ impl TextBlockManager { fn make_rev_manager(&self, doc_id: &str, pool: Arc) -> Result { let user_id = self.user.user_id()?; - let disk_cache = Arc::new(SQLiteTextBlockRevisionPersistence::new(&user_id, pool)); - let rev_persistence = Arc::new(RevisionPersistence::new(&user_id, doc_id, disk_cache)); - Ok(RevisionManager::new(&user_id, doc_id, rev_persistence)) + let disk_cache = SQLiteTextBlockRevisionPersistence::new(&user_id, pool.clone()); + let rev_persistence = RevisionPersistence::new(&user_id, doc_id, disk_cache); + let history_persistence = SQLiteRevisionHistoryPersistence::new(pool); + let rev_compactor = TextBlockRevisionCompactor(); + + Ok(RevisionManager::new( + &user_id, + doc_id, + rev_persistence, + rev_compactor, + history_persistence, + )) } } diff --git a/frontend/rust-lib/flowy-text-block/src/queue.rs b/frontend/rust-lib/flowy-text-block/src/queue.rs index 7c11afd0ff..343ad11e7b 100644 --- a/frontend/rust-lib/flowy-text-block/src/queue.rs +++ b/frontend/rust-lib/flowy-text-block/src/queue.rs @@ -186,10 +186,7 @@ impl EditBlockQueue { &user_id, md5, ); - let _ = self - .rev_manager - .add_local_revision(&revision, Box::new(TextBlockRevisionCompactor())) - .await?; + let _ = self.rev_manager.add_local_revision(&revision).await?; Ok(rev_id.into()) } } diff --git a/shared-lib/flowy-sync/src/client_grid/grid_meta_pad.rs b/shared-lib/flowy-sync/src/client_grid/grid_meta_pad.rs index 1d800961e3..3ef9251e4a 100644 --- a/shared-lib/flowy-sync/src/client_grid/grid_meta_pad.rs +++ b/shared-lib/flowy-sync/src/client_grid/grid_meta_pad.rs @@ -25,12 +25,7 @@ pub trait JsonDeserializer { impl GridMetaPad { pub async fn duplicate_grid_meta(&self) -> (Vec, Vec) { - let fields = self - .grid_meta - .fields - .iter() - .map(|field| field.clone()) - .collect::>(); + let fields = self.grid_meta.fields.to_vec(); let blocks = self .grid_meta From d5eabe4ea3ac74aa033c2721664ae3ab882a7906 Mon Sep 17 00:00:00 2001 From: appflowy Date: Fri, 10 Jun 2022 22:27:19 +0800 Subject: [PATCH 4/5] chore: add snapshot --- .../down.sql | 2 + .../up.sql | 7 ++ .../rust-lib/flowy-database/src/schema.rs | 10 ++ frontend/rust-lib/flowy-folder/src/manager.rs | 12 ++- frontend/rust-lib/flowy-grid/src/manager.rs | 17 +++- .../src/services/block/block_meta_manager.rs | 9 +- .../src/cache/disk/grid_rev_impl.rs | 1 - .../flowy-revision/src/history/persistence.rs | 69 +++++++++----- .../flowy-revision/src/history/rev_history.rs | 94 ++++++++++--------- frontend/rust-lib/flowy-revision/src/lib.rs | 2 + .../flowy-revision/src/rev_manager.rs | 22 +++-- .../flowy-revision/src/snapshot/mod.rs | 5 + .../src/snapshot/persistence.rs | 28 ++++++ .../src/snapshot/rev_snapshot.rs | 29 ++++++ .../rust-lib/flowy-text-block/src/manager.rs | 5 +- 15 files changed, 225 insertions(+), 87 deletions(-) create mode 100644 frontend/rust-lib/flowy-database/migrations/2022-06-10-140131_revision-snapshot/down.sql create mode 100644 frontend/rust-lib/flowy-database/migrations/2022-06-10-140131_revision-snapshot/up.sql create mode 100644 frontend/rust-lib/flowy-revision/src/snapshot/mod.rs create mode 100644 frontend/rust-lib/flowy-revision/src/snapshot/persistence.rs create mode 100644 frontend/rust-lib/flowy-revision/src/snapshot/rev_snapshot.rs diff --git a/frontend/rust-lib/flowy-database/migrations/2022-06-10-140131_revision-snapshot/down.sql b/frontend/rust-lib/flowy-database/migrations/2022-06-10-140131_revision-snapshot/down.sql new file mode 100644 index 0000000000..01a87a1e99 --- /dev/null +++ b/frontend/rust-lib/flowy-database/migrations/2022-06-10-140131_revision-snapshot/down.sql @@ -0,0 +1,2 @@ +-- This file should undo anything in `up.sql` +DROP TABLE rev_snapshot; \ No newline at end of file diff --git a/frontend/rust-lib/flowy-database/migrations/2022-06-10-140131_revision-snapshot/up.sql b/frontend/rust-lib/flowy-database/migrations/2022-06-10-140131_revision-snapshot/up.sql new file mode 100644 index 0000000000..9656dabc3c --- /dev/null +++ b/frontend/rust-lib/flowy-database/migrations/2022-06-10-140131_revision-snapshot/up.sql @@ -0,0 +1,7 @@ +-- Your SQL goes here +CREATE TABLE rev_snapshot ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + object_id TEXT NOT NULL DEFAULT '', + rev_id BIGINT NOT NULL DEFAULT 0, + data BLOB NOT NULL DEFAULT (x'') +); \ No newline at end of file diff --git a/frontend/rust-lib/flowy-database/src/schema.rs b/frontend/rust-lib/flowy-database/src/schema.rs index b57ff46875..ee81806ccf 100644 --- a/frontend/rust-lib/flowy-database/src/schema.rs +++ b/frontend/rust-lib/flowy-database/src/schema.rs @@ -67,6 +67,15 @@ table! { } } +table! { + rev_snapshot (id) { + id -> Integer, + object_id -> Text, + rev_id -> BigInt, + data -> Binary, + } +} + table! { rev_table (id) { id -> Integer, @@ -135,6 +144,7 @@ allow_tables_to_appear_in_same_query!( grid_rev_table, kv_table, rev_history, + rev_snapshot, rev_table, trash_table, user_table, diff --git a/frontend/rust-lib/flowy-folder/src/manager.rs b/frontend/rust-lib/flowy-folder/src/manager.rs index 035555576a..557f0e7cf6 100644 --- a/frontend/rust-lib/flowy-folder/src/manager.rs +++ b/frontend/rust-lib/flowy-folder/src/manager.rs @@ -16,7 +16,10 @@ use flowy_error::FlowyError; use flowy_folder_data_model::entities::view::ViewDataType; use flowy_folder_data_model::user_default; use flowy_revision::disk::SQLiteTextBlockRevisionPersistence; -use flowy_revision::{RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionHistoryPersistence}; +use flowy_revision::{ + RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionHistoryPersistence, + SQLiteRevisionSnapshotPersistence, +}; use flowy_sync::{client_folder::FolderPad, entities::ws_data::ServerRevisionWSData}; use lazy_static::lazy_static; use lib_infra::future::FutureResult; @@ -163,16 +166,19 @@ impl FolderManager { let _ = self.persistence.initialize(user_id, &folder_id).await?; let pool = self.persistence.db_pool()?; + let object_id = folder_id.as_ref(); let disk_cache = SQLiteTextBlockRevisionPersistence::new(user_id, pool.clone()); - let rev_persistence = RevisionPersistence::new(user_id, folder_id.as_ref(), disk_cache); + let rev_persistence = RevisionPersistence::new(user_id, object_id, disk_cache); let rev_compactor = FolderRevisionCompactor(); - let history_persistence = SQLiteRevisionHistoryPersistence::new(pool); + let history_persistence = SQLiteRevisionHistoryPersistence::new(object_id, pool.clone()); + let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(object_id, pool); let rev_manager = RevisionManager::new( user_id, folder_id.as_ref(), rev_persistence, rev_compactor, history_persistence, + snapshot_persistence, ); let folder_editor = FolderEditor::new(user_id, &folder_id, token, rev_manager, self.web_socket.clone()).await?; diff --git a/frontend/rust-lib/flowy-grid/src/manager.rs b/frontend/rust-lib/flowy-grid/src/manager.rs index 8d8171ee02..1a1d6ff91e 100644 --- a/frontend/rust-lib/flowy-grid/src/manager.rs +++ b/frontend/rust-lib/flowy-grid/src/manager.rs @@ -9,7 +9,10 @@ use flowy_database::ConnectionPool; use flowy_error::{FlowyError, FlowyResult}; use flowy_grid_data_model::entities::{BuildGridContext, GridMeta}; use flowy_revision::disk::SQLiteGridRevisionPersistence; -use flowy_revision::{RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionHistoryPersistence}; +use flowy_revision::{ + RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionHistoryPersistence, + SQLiteRevisionSnapshotPersistence, +}; use flowy_sync::client_grid::{make_block_meta_delta, make_grid_delta}; use flowy_sync::entities::revision::{RepeatedRevision, Revision}; use std::sync::Arc; @@ -130,9 +133,17 @@ impl GridManager { let disk_cache = SQLiteGridRevisionPersistence::new(&user_id, pool.clone()); let rev_persistence = RevisionPersistence::new(&user_id, grid_id, disk_cache); - let history_persistence = SQLiteRevisionHistoryPersistence::new(pool); + let history_persistence = SQLiteRevisionHistoryPersistence::new(grid_id, pool.clone()); + let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(grid_id, pool); let rev_compactor = GridMetaRevisionCompactor(); - let rev_manager = RevisionManager::new(&user_id, grid_id, rev_persistence, rev_compactor, history_persistence); + let rev_manager = RevisionManager::new( + &user_id, + grid_id, + rev_persistence, + rev_compactor, + history_persistence, + snapshot_persistence, + ); Ok(rev_manager) } } diff --git a/frontend/rust-lib/flowy-grid/src/services/block/block_meta_manager.rs b/frontend/rust-lib/flowy-grid/src/services/block/block_meta_manager.rs index 417c158cff..232b342367 100644 --- a/frontend/rust-lib/flowy-grid/src/services/block/block_meta_manager.rs +++ b/frontend/rust-lib/flowy-grid/src/services/block/block_meta_manager.rs @@ -10,7 +10,9 @@ use flowy_grid_data_model::entities::{ RowMeta, RowMetaChangeset, RowOrder, UpdatedRowOrder, }; use flowy_revision::disk::SQLiteGridBlockMetaRevisionPersistence; -use flowy_revision::{RevisionManager, RevisionPersistence, SQLiteRevisionHistoryPersistence}; +use flowy_revision::{ + RevisionManager, RevisionPersistence, SQLiteRevisionHistoryPersistence, SQLiteRevisionSnapshotPersistence, +}; use std::borrow::Cow; use std::collections::HashMap; use std::sync::Arc; @@ -280,8 +282,10 @@ pub fn make_grid_block_meta_rev_manager(user: &Arc, block_id: &str let disk_cache = SQLiteGridBlockMetaRevisionPersistence::new(&user_id, pool.clone()); let rev_persistence = RevisionPersistence::new(&user_id, block_id, disk_cache); + let rev_compactor = GridBlockMetaRevisionCompactor(); - let history_persistence = SQLiteRevisionHistoryPersistence::new(pool); + let history_persistence = SQLiteRevisionHistoryPersistence::new(block_id, pool.clone()); + let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(block_id, pool); Ok(RevisionManager::new( &user_id, @@ -289,5 +293,6 @@ pub fn make_grid_block_meta_rev_manager(user: &Arc, block_id: &str rev_persistence, rev_compactor, history_persistence, + snapshot_persistence, )) } diff --git a/frontend/rust-lib/flowy-revision/src/cache/disk/grid_rev_impl.rs b/frontend/rust-lib/flowy-revision/src/cache/disk/grid_rev_impl.rs index d51ea8e48c..9ddb21bc8c 100644 --- a/frontend/rust-lib/flowy-revision/src/cache/disk/grid_rev_impl.rs +++ b/frontend/rust-lib/flowy-revision/src/cache/disk/grid_rev_impl.rs @@ -95,7 +95,6 @@ struct GridRevisionSql(); impl GridRevisionSql { fn create(revision_records: Vec, conn: &SqliteConnection) -> Result<(), FlowyError> { // Batch insert: https://diesel.rs/guides/all-about-inserts.html - let records = revision_records .into_iter() .map(|record| { diff --git a/frontend/rust-lib/flowy-revision/src/history/persistence.rs b/frontend/rust-lib/flowy-revision/src/history/persistence.rs index 01d22fe6a4..9c1bdacc9e 100644 --- a/frontend/rust-lib/flowy-revision/src/history/persistence.rs +++ b/frontend/rust-lib/flowy-revision/src/history/persistence.rs @@ -1,50 +1,51 @@ use crate::history::RevisionHistoryDiskCache; -use diesel::{sql_types::Integer, update, SqliteConnection}; use flowy_database::{ prelude::*, schema::{rev_history, rev_history::dsl}, ConnectionPool, }; -use flowy_error::{FlowyError, FlowyResult}; +use flowy_error::{internal_error, FlowyResult}; use flowy_sync::entities::revision::Revision; use std::sync::Arc; pub struct SQLiteRevisionHistoryPersistence { + object_id: String, pool: Arc, } impl SQLiteRevisionHistoryPersistence { - pub fn new(pool: Arc) -> Self { - Self { pool } + pub fn new(object_id: &str, pool: Arc) -> Self { + let object_id = object_id.to_owned(); + Self { object_id, pool } } } impl RevisionHistoryDiskCache for SQLiteRevisionHistoryPersistence { - fn save_revision(&self, revision: Revision) -> FlowyResult<()> { - todo!() + fn write_history(&self, revision: Revision) -> FlowyResult<()> { + let record = ( + dsl::object_id.eq(revision.object_id), + dsl::start_rev_id.eq(revision.base_rev_id), + dsl::end_rev_id.eq(revision.rev_id), + dsl::data.eq(revision.delta_data), + ); + let conn = self.pool.get().map_err(internal_error)?; + + let _ = insert_or_ignore_into(dsl::rev_history) + .values(vec![record]) + .execute(&*conn)?; + Ok(()) } - fn read_revision(&self, rev_id: i64) -> FlowyResult { - todo!() - } - - fn clear(&self) -> FlowyResult<()> { - todo!() - } -} - -struct RevisionHistorySql(); -impl RevisionHistorySql { - fn read_revision(object_id: &str, rev_id: i64, conn: &SqliteConnection) -> Result { + fn read_histories(&self) -> FlowyResult> { + let conn = self.pool.get().map_err(internal_error)?; let records: Vec = dsl::rev_history - .filter(dsl::start_rev_id.lt(rev_id)) - .filter(dsl::end_rev_id.ge(rev_id)) - .filter(dsl::object_id.eq(object_id)) - .load::(conn)?; + .filter(dsl::object_id.eq(&self.object_id)) + .load::(&*conn)?; - debug_assert_eq!(records.len(), 1); - - todo!() + Ok(records + .into_iter() + .map(|record| record.into()) + .collect::>()) } } @@ -57,3 +58,21 @@ struct RevisionRecord { end_rev_id: i64, data: Vec, } + +pub struct RevisionHistory { + pub object_id: String, + pub start_rev_id: i64, + pub end_rev_id: i64, + pub data: Vec, +} + +impl std::convert::From for RevisionHistory { + fn from(record: RevisionRecord) -> Self { + RevisionHistory { + object_id: record.object_id, + start_rev_id: record.start_rev_id, + end_rev_id: record.end_rev_id, + data: record.data, + } + } +} diff --git a/frontend/rust-lib/flowy-revision/src/history/rev_history.rs b/frontend/rust-lib/flowy-revision/src/history/rev_history.rs index faa35a9ede..71bdc0c333 100644 --- a/frontend/rust-lib/flowy-revision/src/history/rev_history.rs +++ b/frontend/rust-lib/flowy-revision/src/history/rev_history.rs @@ -1,36 +1,31 @@ -use crate::history::persistence::SQLiteRevisionHistoryPersistence; -use crate::RevisionCompactor; +use crate::{RevisionCompactor, RevisionHistory}; use async_stream::stream; -use flowy_database::ConnectionPool; + use flowy_error::{FlowyError, FlowyResult}; use flowy_sync::entities::revision::Revision; use futures_util::future::BoxFuture; use futures_util::stream::StreamExt; use futures_util::FutureExt; -use std::fmt::Debug; use std::sync::Arc; use std::time::Duration; -use tokio::sync::mpsc::error::SendError; -use tokio::sync::mpsc::Sender; -use tokio::sync::{mpsc, oneshot, RwLock}; +use tokio::sync::{mpsc, RwLock}; use tokio::time::interval; pub trait RevisionHistoryDiskCache: Send + Sync { - fn save_revision(&self, revision: Revision) -> FlowyResult<()>; + fn write_history(&self, revision: Revision) -> FlowyResult<()>; - fn read_revision(&self, rev_id: i64) -> FlowyResult; - - fn clear(&self) -> FlowyResult<()>; + fn read_histories(&self) -> FlowyResult>; } -pub struct RevisionHistory { - stop_timer: mpsc::Sender<()>, +pub struct RevisionHistoryManager { + user_id: String, + stop_tx: mpsc::Sender<()>, config: RevisionHistoryConfig, revisions: Arc>>, disk_cache: Arc, } -impl RevisionHistory { +impl RevisionHistoryManager { pub fn new( user_id: &str, object_id: &str, @@ -38,27 +33,13 @@ impl RevisionHistory { disk_cache: Arc, rev_compactor: Arc, ) -> Self { - let user_id = user_id.to_string(); - let object_id = object_id.to_string(); - let cloned_disk_cache = disk_cache.clone(); - let (stop_timer, stop_rx) = mpsc::channel(1); - let (checkpoint_tx, checkpoint_rx) = mpsc::channel(1); let revisions = Arc::new(RwLock::new(vec![])); - let fix_duration_checkpoint_tx = FixedDurationCheckpointSender { - user_id, - object_id, - checkpoint_tx, - disk_cache: cloned_disk_cache, - revisions: revisions.clone(), - rev_compactor, - duration: config.check_duration, - }; - - tokio::spawn(CheckpointRunner::new(stop_rx, checkpoint_rx).run()); - tokio::spawn(fix_duration_checkpoint_tx.run()); - + let stop_tx = + spawn_history_checkpoint_runner(user_id, object_id, &disk_cache, &revisions, rev_compactor, &config); + let user_id = user_id.to_owned(); Self { - stop_timer, + user_id, + stop_tx, config, revisions, disk_cache, @@ -69,12 +50,8 @@ impl RevisionHistory { self.revisions.write().await.push(revision.clone()); } - pub async fn reset_history(&self) { - self.revisions.write().await.clear(); - match self.disk_cache.clear() { - Ok(_) => {} - Err(e) => tracing::error!("Clear history failed: {:?}", e), - } + pub async fn read_revision_histories(&self) -> FlowyResult> { + self.disk_cache.read_histories() } } @@ -90,12 +67,41 @@ impl std::default::Default for RevisionHistoryConfig { } } -struct CheckpointRunner { +fn spawn_history_checkpoint_runner( + user_id: &str, + object_id: &str, + disk_cache: &Arc, + revisions: &Arc>>, + rev_compactor: Arc, + config: &RevisionHistoryConfig, +) -> mpsc::Sender<()> { + let user_id = user_id.to_string(); + let object_id = object_id.to_string(); + let disk_cache = disk_cache.clone(); + let revisions = revisions.clone(); + + let (checkpoint_tx, checkpoint_rx) = mpsc::channel(1); + let (stop_tx, stop_rx) = mpsc::channel(1); + let checkpoint_sender = FixedDurationCheckpointSender { + user_id, + object_id, + checkpoint_tx, + disk_cache, + revisions, + rev_compactor, + duration: config.check_duration, + }; + tokio::spawn(HistoryCheckpointRunner::new(stop_rx, checkpoint_rx).run()); + tokio::spawn(checkpoint_sender.run()); + stop_tx +} + +struct HistoryCheckpointRunner { stop_rx: Option>, checkpoint_rx: Option>, } -impl CheckpointRunner { +impl HistoryCheckpointRunner { fn new(stop_rx: mpsc::Receiver<()>, checkpoint_rx: mpsc::Receiver) -> Self { Self { stop_rx: Some(stop_rx), @@ -149,7 +155,7 @@ impl HistoryCheckpoint { let revision = self .rev_compactor .compact(&self.user_id, &self.object_id, self.revisions)?; - let _ = self.disk_cache.save_revision(revision)?; + let _ = self.disk_cache.write_history(revision)?; Ok::<(), FlowyError>(()) }; @@ -174,7 +180,7 @@ impl FixedDurationCheckpointSender { fn run(self) -> BoxFuture<'static, ()> { async move { let mut interval = interval(self.duration); - let checkpoint_revisions: Vec = revisions.write().await.drain(..).collect(); + let checkpoint_revisions: Vec = self.revisions.write().await.drain(..).collect(); let checkpoint = HistoryCheckpoint { user_id: self.user_id.clone(), object_id: self.object_id.clone(), @@ -182,7 +188,7 @@ impl FixedDurationCheckpointSender { disk_cache: self.disk_cache.clone(), rev_compactor: self.rev_compactor.clone(), }; - match checkpoint_tx.send(checkpoint).await { + match self.checkpoint_tx.send(checkpoint).await { Ok(_) => { interval.tick().await; self.run(); diff --git a/frontend/rust-lib/flowy-revision/src/lib.rs b/frontend/rust-lib/flowy-revision/src/lib.rs index 14f00568f2..8631ddf14d 100644 --- a/frontend/rust-lib/flowy-revision/src/lib.rs +++ b/frontend/rust-lib/flowy-revision/src/lib.rs @@ -3,6 +3,7 @@ mod conflict_resolve; mod history; mod rev_manager; mod rev_persistence; +mod snapshot; mod ws_manager; pub use cache::*; @@ -10,6 +11,7 @@ pub use conflict_resolve::*; pub use history::*; pub use rev_manager::*; pub use rev_persistence::*; +pub use snapshot::*; pub use ws_manager::*; #[macro_use] diff --git a/frontend/rust-lib/flowy-revision/src/rev_manager.rs b/frontend/rust-lib/flowy-revision/src/rev_manager.rs index bed942b7b4..5d10185dc9 100644 --- a/frontend/rust-lib/flowy-revision/src/rev_manager.rs +++ b/frontend/rust-lib/flowy-revision/src/rev_manager.rs @@ -1,6 +1,6 @@ use crate::disk::RevisionState; -use crate::history::{RevisionHistory, RevisionHistoryConfig, RevisionHistoryDiskCache}; -use crate::{RevisionPersistence, WSDataProviderDataSource}; +use crate::history::{RevisionHistoryConfig, RevisionHistoryDiskCache, RevisionHistoryManager}; +use crate::{RevisionPersistence, RevisionSnapshotDiskCache, RevisionSnapshotManager, WSDataProviderDataSource}; use bytes::Bytes; use flowy_error::{FlowyError, FlowyResult}; use flowy_sync::{ @@ -46,37 +46,43 @@ pub struct RevisionManager { user_id: String, rev_id_counter: RevIdCounter, rev_persistence: Arc, - rev_history: Arc, + rev_history: Arc, + rev_snapshot: Arc, rev_compactor: Arc, #[cfg(feature = "flowy_unit_test")] rev_ack_notifier: tokio::sync::broadcast::Sender, } impl RevisionManager { - pub fn new( + pub fn new( user_id: &str, object_id: &str, rev_persistence: RevisionPersistence, rev_compactor: C, - history_persistence: P, + history_persistence: HP, + snapshot_persistence: SP, ) -> Self where - P: 'static + RevisionHistoryDiskCache, + HP: 'static + RevisionHistoryDiskCache, + SP: 'static + RevisionSnapshotDiskCache, C: 'static + RevisionCompactor, { let rev_id_counter = RevIdCounter::new(0); let rev_compactor = Arc::new(rev_compactor); let history_persistence = Arc::new(history_persistence); + let rev_history_config = RevisionHistoryConfig::default(); let rev_persistence = Arc::new(rev_persistence); - let rev_history = Arc::new(RevisionHistory::new( + let rev_history = Arc::new(RevisionHistoryManager::new( user_id, object_id, rev_history_config, history_persistence, rev_compactor.clone(), )); + + let rev_snapshot = Arc::new(RevisionSnapshotManager::new(user_id, object_id, snapshot_persistence)); #[cfg(feature = "flowy_unit_test")] let (revision_ack_notifier, _) = tokio::sync::broadcast::channel(1); @@ -86,6 +92,7 @@ impl RevisionManager { rev_id_counter, rev_persistence, rev_history, + rev_snapshot, rev_compactor, #[cfg(feature = "flowy_unit_test")] rev_ack_notifier: revision_ack_notifier, @@ -114,7 +121,6 @@ impl RevisionManager { pub async fn reset_object(&self, revisions: RepeatedRevision) -> FlowyResult<()> { let rev_id = pair_rev_id_from_revisions(&revisions).1; let _ = self.rev_persistence.reset(revisions.into_inner()).await?; - self.rev_history.reset_history().await; self.rev_id_counter.set(rev_id); Ok(()) } diff --git a/frontend/rust-lib/flowy-revision/src/snapshot/mod.rs b/frontend/rust-lib/flowy-revision/src/snapshot/mod.rs new file mode 100644 index 0000000000..adc4ee2ccc --- /dev/null +++ b/frontend/rust-lib/flowy-revision/src/snapshot/mod.rs @@ -0,0 +1,5 @@ +mod persistence; +mod rev_snapshot; + +pub use persistence::*; +pub use rev_snapshot::*; diff --git a/frontend/rust-lib/flowy-revision/src/snapshot/persistence.rs b/frontend/rust-lib/flowy-revision/src/snapshot/persistence.rs new file mode 100644 index 0000000000..e07c541b12 --- /dev/null +++ b/frontend/rust-lib/flowy-revision/src/snapshot/persistence.rs @@ -0,0 +1,28 @@ +use crate::{RevisionSnapshotDiskCache, RevisionSnapshotInfo}; +use flowy_database::ConnectionPool; +use flowy_error::FlowyResult; +use std::sync::Arc; + +pub struct SQLiteRevisionSnapshotPersistence { + object_id: String, + pool: Arc, +} + +impl SQLiteRevisionSnapshotPersistence { + pub fn new(object_id: &str, pool: Arc) -> Self { + Self { + object_id: object_id.to_string(), + pool, + } + } +} + +impl RevisionSnapshotDiskCache for SQLiteRevisionSnapshotPersistence { + fn write_snapshot(&self, object_id: &str, rev_id: i64, data: Vec) -> FlowyResult<()> { + todo!() + } + + fn read_snapshot(&self, object_id: &str, rev_id: i64) -> FlowyResult { + todo!() + } +} diff --git a/frontend/rust-lib/flowy-revision/src/snapshot/rev_snapshot.rs b/frontend/rust-lib/flowy-revision/src/snapshot/rev_snapshot.rs new file mode 100644 index 0000000000..71e2e270ec --- /dev/null +++ b/frontend/rust-lib/flowy-revision/src/snapshot/rev_snapshot.rs @@ -0,0 +1,29 @@ +use flowy_error::FlowyResult; +use std::sync::Arc; + +pub trait RevisionSnapshotDiskCache: Send + Sync { + fn write_snapshot(&self, object_id: &str, rev_id: i64, data: Vec) -> FlowyResult<()>; + fn read_snapshot(&self, object_id: &str, rev_id: i64) -> FlowyResult; +} + +pub struct RevisionSnapshotManager { + user_id: String, + object_id: String, + disk_cache: Arc, +} + +impl RevisionSnapshotManager { + pub fn new(user_id: &str, object_id: &str, disk_cache: D) -> Self + where + D: RevisionSnapshotDiskCache + 'static, + { + let disk_cache = Arc::new(disk_cache); + Self { + user_id: user_id.to_string(), + object_id: object_id.to_string(), + disk_cache, + } + } +} + +pub struct RevisionSnapshotInfo {} diff --git a/frontend/rust-lib/flowy-text-block/src/manager.rs b/frontend/rust-lib/flowy-text-block/src/manager.rs index a6cdda1220..9b0dc18a94 100644 --- a/frontend/rust-lib/flowy-text-block/src/manager.rs +++ b/frontend/rust-lib/flowy-text-block/src/manager.rs @@ -7,6 +7,7 @@ use flowy_error::FlowyResult; use flowy_revision::disk::SQLiteTextBlockRevisionPersistence; use flowy_revision::{ RevisionCloudService, RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionHistoryPersistence, + SQLiteRevisionSnapshotPersistence, }; use flowy_sync::entities::{ revision::{md5, RepeatedRevision, Revision}, @@ -144,7 +145,8 @@ impl TextBlockManager { let user_id = self.user.user_id()?; let disk_cache = SQLiteTextBlockRevisionPersistence::new(&user_id, pool.clone()); let rev_persistence = RevisionPersistence::new(&user_id, doc_id, disk_cache); - let history_persistence = SQLiteRevisionHistoryPersistence::new(pool); + let history_persistence = SQLiteRevisionHistoryPersistence::new(doc_id, pool.clone()); + let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(doc_id, pool); let rev_compactor = TextBlockRevisionCompactor(); Ok(RevisionManager::new( @@ -153,6 +155,7 @@ impl TextBlockManager { rev_persistence, rev_compactor, history_persistence, + snapshot_persistence, )) } } From cb1afacbd6f9f85e45ba9f9dcef3c7f3a4eeb1b6 Mon Sep 17 00:00:00 2001 From: appflowy Date: Wed, 20 Jul 2022 15:30:48 +0800 Subject: [PATCH 5/5] chore: rename some structs --- frontend/rust-lib/flowy-database/src/schema.rs | 11 ----------- .../rust-lib/flowy-revision/src/rev_manager.rs | 15 +-------------- .../flowy-revision/src/snapshot/persistence.rs | 3 +++ .../flowy-revision/src/snapshot/rev_snapshot.rs | 3 +++ 4 files changed, 7 insertions(+), 25 deletions(-) diff --git a/frontend/rust-lib/flowy-database/src/schema.rs b/frontend/rust-lib/flowy-database/src/schema.rs index 70d3ae4b20..e41fd6d865 100644 --- a/frontend/rust-lib/flowy-database/src/schema.rs +++ b/frontend/rust-lib/flowy-database/src/schema.rs @@ -49,16 +49,6 @@ table! { } } -table! { - rev_history (id) { - id -> Integer, - object_id -> Text, - start_rev_id -> BigInt, - end_rev_id -> BigInt, - data -> Binary, - } -} - table! { rev_snapshot (id) { id -> Integer, @@ -135,7 +125,6 @@ allow_tables_to_appear_in_same_query!( grid_meta_rev_table, grid_rev_table, kv_table, - rev_history, rev_snapshot, rev_table, trash_table, diff --git a/frontend/rust-lib/flowy-revision/src/rev_manager.rs b/frontend/rust-lib/flowy-revision/src/rev_manager.rs index f133c20782..bd4d01740d 100644 --- a/frontend/rust-lib/flowy-revision/src/rev_manager.rs +++ b/frontend/rust-lib/flowy-revision/src/rev_manager.rs @@ -1,5 +1,4 @@ use crate::disk::RevisionState; -// use crate::history::{RevisionHistoryConfig, RevisionHistoryDiskCache, RevisionHistoryManager}; use crate::{RevisionPersistence, RevisionSnapshotDiskCache, RevisionSnapshotManager, WSDataProviderDataSource}; use bytes::Bytes; use flowy_error::{FlowyError, FlowyResult}; @@ -46,7 +45,7 @@ pub struct RevisionManager { user_id: String, rev_id_counter: RevIdCounter, rev_persistence: Arc, - // rev_history: Arc, + #[allow(dead_code)] rev_snapshot: Arc, rev_compactor: Arc, #[cfg(feature = "flowy_unit_test")] @@ -59,25 +58,14 @@ impl RevisionManager { object_id: &str, rev_persistence: RevisionPersistence, rev_compactor: C, - // history_persistence: HP, snapshot_persistence: SP, ) -> Self where - // HP: 'static + RevisionHistoryDiskCache, SP: 'static + RevisionSnapshotDiskCache, C: 'static + RevisionCompactor, { let rev_id_counter = RevIdCounter::new(0); let rev_compactor = Arc::new(rev_compactor); - // let history_persistence = Arc::new(history_persistence); - // let rev_history_config = RevisionHistoryConfig::default(); - // let rev_history = Arc::new(RevisionHistoryManager::new( - // user_id, - // object_id, - // rev_history_config, - // history_persistence, - // rev_compactor.clone(), - // )); let rev_persistence = Arc::new(rev_persistence); @@ -90,7 +78,6 @@ impl RevisionManager { user_id: user_id.to_owned(), rev_id_counter, rev_persistence, - // rev_history, rev_snapshot, rev_compactor, #[cfg(feature = "flowy_unit_test")] diff --git a/frontend/rust-lib/flowy-revision/src/snapshot/persistence.rs b/frontend/rust-lib/flowy-revision/src/snapshot/persistence.rs index e07c541b12..d8d7bae3a6 100644 --- a/frontend/rust-lib/flowy-revision/src/snapshot/persistence.rs +++ b/frontend/rust-lib/flowy-revision/src/snapshot/persistence.rs @@ -1,3 +1,6 @@ +#![allow(clippy::all)] +#![allow(dead_code)] +#![allow(unused_variables)] use crate::{RevisionSnapshotDiskCache, RevisionSnapshotInfo}; use flowy_database::ConnectionPool; use flowy_error::FlowyResult; diff --git a/frontend/rust-lib/flowy-revision/src/snapshot/rev_snapshot.rs b/frontend/rust-lib/flowy-revision/src/snapshot/rev_snapshot.rs index 71e2e270ec..047d21607e 100644 --- a/frontend/rust-lib/flowy-revision/src/snapshot/rev_snapshot.rs +++ b/frontend/rust-lib/flowy-revision/src/snapshot/rev_snapshot.rs @@ -1,3 +1,6 @@ +#![allow(clippy::all)] +#![allow(dead_code)] +#![allow(unused_variables)] use flowy_error::FlowyResult; use std::sync::Arc;