diff --git a/frontend/rust-lib/flowy-database/migrations/2022-06-10-140131_revision-snapshot/down.sql b/frontend/rust-lib/flowy-database/migrations/2022-06-10-140131_revision-snapshot/down.sql new file mode 100644 index 0000000000..01a87a1e99 --- /dev/null +++ b/frontend/rust-lib/flowy-database/migrations/2022-06-10-140131_revision-snapshot/down.sql @@ -0,0 +1,2 @@ +-- This file should undo anything in `up.sql` +DROP TABLE rev_snapshot; \ No newline at end of file diff --git a/frontend/rust-lib/flowy-database/migrations/2022-06-10-140131_revision-snapshot/up.sql b/frontend/rust-lib/flowy-database/migrations/2022-06-10-140131_revision-snapshot/up.sql new file mode 100644 index 0000000000..9656dabc3c --- /dev/null +++ b/frontend/rust-lib/flowy-database/migrations/2022-06-10-140131_revision-snapshot/up.sql @@ -0,0 +1,7 @@ +-- Your SQL goes here +CREATE TABLE rev_snapshot ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + object_id TEXT NOT NULL DEFAULT '', + rev_id BIGINT NOT NULL DEFAULT 0, + data BLOB NOT NULL DEFAULT (x'') +); \ No newline at end of file diff --git a/frontend/rust-lib/flowy-database/src/schema.rs b/frontend/rust-lib/flowy-database/src/schema.rs index b131adf05f..e41fd6d865 100644 --- a/frontend/rust-lib/flowy-database/src/schema.rs +++ b/frontend/rust-lib/flowy-database/src/schema.rs @@ -49,6 +49,15 @@ table! { } } +table! { + rev_snapshot (id) { + id -> Integer, + object_id -> Text, + rev_id -> BigInt, + data -> Binary, + } +} + table! { rev_table (id) { id -> Integer, @@ -116,6 +125,7 @@ allow_tables_to_appear_in_same_query!( grid_meta_rev_table, grid_rev_table, kv_table, + rev_snapshot, rev_table, trash_table, user_table, diff --git a/frontend/rust-lib/flowy-folder/src/manager.rs b/frontend/rust-lib/flowy-folder/src/manager.rs index 826006e98c..356ff15897 100644 --- a/frontend/rust-lib/flowy-folder/src/manager.rs +++ b/frontend/rust-lib/flowy-folder/src/manager.rs @@ -1,4 +1,5 @@ use crate::entities::view::ViewDataType; +use crate::services::folder_editor::FolderRevisionCompactor; use crate::{ dart_notification::{send_dart_notification, FolderNotification}, entities::workspace::RepeatedWorkspacePB, @@ -13,7 +14,7 @@ use bytes::Bytes; use flowy_error::FlowyError; use flowy_folder_data_model::user_default; use flowy_revision::disk::SQLiteTextBlockRevisionPersistence; -use flowy_revision::{RevisionManager, RevisionPersistence, RevisionWebSocket}; +use flowy_revision::{RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionSnapshotPersistence}; use flowy_sync::client_document::default::{initial_quill_delta_string, initial_read_me}; use flowy_sync::{client_folder::FolderPad, entities::ws_data::ServerRevisionWSData}; use lazy_static::lazy_static; @@ -161,9 +162,20 @@ impl FolderManager { let _ = self.persistence.initialize(user_id, &folder_id).await?; let pool = self.persistence.db_pool()?; - let disk_cache = Arc::new(SQLiteTextBlockRevisionPersistence::new(user_id, pool)); - let rev_persistence = Arc::new(RevisionPersistence::new(user_id, folder_id.as_ref(), disk_cache)); - let rev_manager = RevisionManager::new(user_id, folder_id.as_ref(), rev_persistence); + let object_id = folder_id.as_ref(); + let disk_cache = SQLiteTextBlockRevisionPersistence::new(user_id, pool.clone()); + let rev_persistence = RevisionPersistence::new(user_id, object_id, disk_cache); + let rev_compactor = FolderRevisionCompactor(); + // let history_persistence = SQLiteRevisionHistoryPersistence::new(object_id, pool.clone()); + let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(object_id, pool); + let rev_manager = RevisionManager::new( + user_id, + folder_id.as_ref(), + rev_persistence, + rev_compactor, + // history_persistence, + snapshot_persistence, + ); let folder_editor = FolderEditor::new(user_id, &folder_id, token, rev_manager, self.web_socket.clone()).await?; *self.folder_editor.write().await = Some(Arc::new(folder_editor)); diff --git a/frontend/rust-lib/flowy-folder/src/services/folder_editor.rs b/frontend/rust-lib/flowy-folder/src/services/folder_editor.rs index 4fa40013d0..b69b303fa1 100644 --- a/frontend/rust-lib/flowy-folder/src/services/folder_editor.rs +++ b/frontend/rust-lib/flowy-folder/src/services/folder_editor.rs @@ -89,11 +89,7 @@ impl FolderEditor { &self.user_id, md5, ); - let _ = futures::executor::block_on(async { - self.rev_manager - .add_local_revision(&revision, Box::new(FolderRevisionCompactor())) - .await - })?; + let _ = futures::executor::block_on(async { self.rev_manager.add_local_revision(&revision).await })?; Ok(()) } @@ -133,7 +129,7 @@ impl FolderEditor { } } -struct FolderRevisionCompactor(); +pub struct FolderRevisionCompactor(); impl RevisionCompactor for FolderRevisionCompactor { fn bytes_from_revisions(&self, revisions: Vec) -> FlowyResult { let delta = make_delta_from_revisions::(revisions)?; diff --git a/frontend/rust-lib/flowy-folder/src/services/persistence/migration.rs b/frontend/rust-lib/flowy-folder/src/services/persistence/migration.rs index 4c147ad0d4..d67d062d25 100644 --- a/frontend/rust-lib/flowy-folder/src/services/persistence/migration.rs +++ b/frontend/rust-lib/flowy-folder/src/services/persistence/migration.rs @@ -85,7 +85,7 @@ impl FolderMigration { return Ok(None); } let pool = self.database.db_pool()?; - let disk_cache = Arc::new(SQLiteTextBlockRevisionPersistence::new(user_id, pool)); + let disk_cache = SQLiteTextBlockRevisionPersistence::new(user_id, pool); let rev_persistence = Arc::new(RevisionPersistence::new(user_id, folder_id.as_ref(), disk_cache)); let (revisions, _) = RevisionLoader { object_id: folder_id.as_ref().to_owned(), diff --git a/frontend/rust-lib/flowy-grid/src/manager.rs b/frontend/rust-lib/flowy-grid/src/manager.rs index 28fa7b0c9e..77f019293f 100644 --- a/frontend/rust-lib/flowy-grid/src/manager.rs +++ b/frontend/rust-lib/flowy-grid/src/manager.rs @@ -1,4 +1,5 @@ -use crate::services::grid_editor::GridRevisionEditor; +use crate::services::block_revision_editor::GridBlockRevisionCompactor; +use crate::services::grid_editor::{GridRevisionCompactor, GridRevisionEditor}; use crate::services::persistence::block_index::BlockIndexCache; use crate::services::persistence::kv::GridKVPersistence; use crate::services::persistence::GridDatabase; @@ -8,9 +9,9 @@ use dashmap::DashMap; use flowy_database::ConnectionPool; use flowy_error::{FlowyError, FlowyResult}; use flowy_grid_data_model::revision::{BuildGridContext, GridRevision}; -use flowy_revision::disk::{SQLiteGridBlockMetaRevisionPersistence, SQLiteGridRevisionPersistence}; -use flowy_revision::{RevisionManager, RevisionPersistence, RevisionWebSocket}; -use flowy_sync::client_grid::{make_block_meta_delta, make_grid_delta}; +use flowy_revision::disk::{SQLiteGridBlockRevisionPersistence, SQLiteGridRevisionPersistence}; +use flowy_revision::{RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionSnapshotPersistence}; +use flowy_sync::client_grid::{make_grid_block_delta, make_grid_delta}; use flowy_sync::entities::revision::{RepeatedRevision, Revision}; use std::sync::Arc; use tokio::sync::RwLock; @@ -61,14 +62,10 @@ impl GridManager { } #[tracing::instrument(level = "debug", skip_all, err)] - pub async fn create_grid_block_meta>( - &self, - block_id: T, - revisions: RepeatedRevision, - ) -> FlowyResult<()> { + pub async fn create_grid_block>(&self, block_id: T, revisions: RepeatedRevision) -> FlowyResult<()> { let block_id = block_id.as_ref(); let db_pool = self.grid_user.db_pool()?; - let rev_manager = self.make_grid_block_meta_rev_manager(block_id, db_pool)?; + let rev_manager = self.make_grid_block_rev_manager(block_id, db_pool)?; let _ = rev_manager.reset_object(revisions).await?; Ok(()) } @@ -113,7 +110,7 @@ impl GridManager { None => { tracing::trace!("Create grid editor with id: {}", grid_id); let db_pool = self.grid_user.db_pool()?; - let editor = self.make_grid_editor(grid_id, db_pool).await?; + let editor = self.make_grid_rev_editor(grid_id, db_pool).await?; if self.grid_editors.contains_key(grid_id) { tracing::warn!("Grid:{} already exists in cache", grid_id); @@ -127,7 +124,7 @@ impl GridManager { } #[tracing::instrument(level = "trace", skip(self, pool), err)] - async fn make_grid_editor( + async fn make_grid_rev_editor( &self, grid_id: &str, pool: Arc, @@ -147,22 +144,22 @@ impl GridManager { pub fn make_grid_rev_manager(&self, grid_id: &str, pool: Arc) -> FlowyResult { let user_id = self.grid_user.user_id()?; - - let disk_cache = Arc::new(SQLiteGridRevisionPersistence::new(&user_id, pool)); - let rev_persistence = Arc::new(RevisionPersistence::new(&user_id, grid_id, disk_cache)); - let rev_manager = RevisionManager::new(&user_id, grid_id, rev_persistence); + let disk_cache = SQLiteGridRevisionPersistence::new(&user_id, pool.clone()); + let rev_persistence = RevisionPersistence::new(&user_id, grid_id, disk_cache); + let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(grid_id, pool); + let rev_compactor = GridRevisionCompactor(); + let rev_manager = RevisionManager::new(&user_id, grid_id, rev_persistence, rev_compactor, snapshot_persistence); Ok(rev_manager) } - fn make_grid_block_meta_rev_manager( - &self, - block_d: &str, - pool: Arc, - ) -> FlowyResult { + fn make_grid_block_rev_manager(&self, block_id: &str, pool: Arc) -> FlowyResult { let user_id = self.grid_user.user_id()?; - let disk_cache = Arc::new(SQLiteGridBlockMetaRevisionPersistence::new(&user_id, pool)); - let rev_persistence = Arc::new(RevisionPersistence::new(&user_id, block_d, disk_cache)); - let rev_manager = RevisionManager::new(&user_id, block_d, rev_persistence); + let disk_cache = SQLiteGridBlockRevisionPersistence::new(&user_id, pool.clone()); + let rev_persistence = RevisionPersistence::new(&user_id, block_id, disk_cache); + let rev_compactor = GridBlockRevisionCompactor(); + let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(block_id, pool); + let rev_manager = + RevisionManager::new(&user_id, block_id, rev_persistence, rev_compactor, snapshot_persistence); Ok(rev_manager) } } @@ -181,13 +178,11 @@ pub async fn make_grid_view_data( }); // Create grid's block - let grid_block_meta_delta = make_block_meta_delta(block_meta_data); - let block_meta_delta_data = grid_block_meta_delta.to_delta_bytes(); + let grid_block_delta = make_grid_block_delta(block_meta_data); + let block_delta_data = grid_block_delta.to_delta_bytes(); let repeated_revision: RepeatedRevision = - Revision::initial_revision(user_id, block_id, block_meta_delta_data).into(); - let _ = grid_manager - .create_grid_block_meta(&block_id, repeated_revision) - .await?; + Revision::initial_revision(user_id, block_id, block_delta_data).into(); + let _ = grid_manager.create_grid_block(&block_id, repeated_revision).await?; } let grid_rev = GridRevision::from_build_context(view_id, build_context); diff --git a/frontend/rust-lib/flowy-grid/src/services/block_manager.rs b/frontend/rust-lib/flowy-grid/src/services/block_manager.rs index 39c1a442b5..c83bfcc2cd 100644 --- a/frontend/rust-lib/flowy-grid/src/services/block_manager.rs +++ b/frontend/rust-lib/flowy-grid/src/services/block_manager.rs @@ -1,7 +1,7 @@ use crate::dart_notification::{send_dart_notification, GridNotification}; use crate::entities::{CellChangesetPB, GridBlockChangesetPB, GridRowPB, InsertedRowPB, UpdatedRowPB}; use crate::manager::GridUser; -use crate::services::block_revision_editor::GridBlockRevisionEditor; +use crate::services::block_revision_editor::{GridBlockRevisionCompactor, GridBlockRevisionEditor}; use crate::services::persistence::block_index::BlockIndexCache; use crate::services::row::{block_from_row_orders, GridBlockSnapshot}; use dashmap::DashMap; @@ -9,8 +9,8 @@ use flowy_error::FlowyResult; use flowy_grid_data_model::revision::{ GridBlockMetaRevision, GridBlockMetaRevisionChangeset, RowMetaChangeset, RowRevision, }; -use flowy_revision::disk::SQLiteGridBlockMetaRevisionPersistence; -use flowy_revision::{RevisionManager, RevisionPersistence}; +use flowy_revision::disk::SQLiteGridBlockRevisionPersistence; +use flowy_revision::{RevisionManager, RevisionPersistence, SQLiteRevisionSnapshotPersistence}; use std::borrow::Cow; use std::collections::HashMap; use std::sync::Arc; @@ -280,8 +280,10 @@ async fn make_block_editor(user: &Arc, block_id: &str) -> FlowyRes let user_id = user.user_id()?; let pool = user.db_pool()?; - let disk_cache = Arc::new(SQLiteGridBlockMetaRevisionPersistence::new(&user_id, pool)); - let rev_persistence = Arc::new(RevisionPersistence::new(&user_id, block_id, disk_cache)); - let rev_manager = RevisionManager::new(&user_id, block_id, rev_persistence); + let disk_cache = SQLiteGridBlockRevisionPersistence::new(&user_id, pool.clone()); + let rev_persistence = RevisionPersistence::new(&user_id, block_id, disk_cache); + let rev_compactor = GridBlockRevisionCompactor(); + let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(block_id, pool); + let rev_manager = RevisionManager::new(&user_id, block_id, rev_persistence, rev_compactor, snapshot_persistence); GridBlockRevisionEditor::new(&user_id, &token, block_id, rev_manager).await } diff --git a/frontend/rust-lib/flowy-grid/src/services/block_revision_editor.rs b/frontend/rust-lib/flowy-grid/src/services/block_revision_editor.rs index 3b94665283..f74075ec6e 100644 --- a/frontend/rust-lib/flowy-grid/src/services/block_revision_editor.rs +++ b/frontend/rust-lib/flowy-grid/src/services/block_revision_editor.rs @@ -26,7 +26,7 @@ impl GridBlockRevisionEditor { block_id: &str, mut rev_manager: RevisionManager, ) -> FlowyResult { - let cloud = Arc::new(GridBlockMetaRevisionCloudService { + let cloud = Arc::new(GridBlockRevisionCloudService { token: token.to_owned(), }); let block_meta_pad = rev_manager.load::(Some(cloud)).await?; @@ -170,20 +170,17 @@ impl GridBlockRevisionEditor { &user_id, md5, ); - let _ = self - .rev_manager - .add_local_revision(&revision, Box::new(GridBlockMetaRevisionCompactor())) - .await?; + let _ = self.rev_manager.add_local_revision(&revision).await?; Ok(()) } } -struct GridBlockMetaRevisionCloudService { +struct GridBlockRevisionCloudService { #[allow(dead_code)] token: String, } -impl RevisionCloudService for GridBlockMetaRevisionCloudService { +impl RevisionCloudService for GridBlockRevisionCloudService { #[tracing::instrument(level = "trace", skip(self))] fn fetch_object(&self, _user_id: &str, _object_id: &str) -> FutureResult, FlowyError> { FutureResult::new(async move { Ok(vec![]) }) @@ -200,8 +197,8 @@ impl RevisionObjectBuilder for GridBlockMetaPadBuilder { } } -struct GridBlockMetaRevisionCompactor(); -impl RevisionCompactor for GridBlockMetaRevisionCompactor { +pub struct GridBlockRevisionCompactor(); +impl RevisionCompactor for GridBlockRevisionCompactor { fn bytes_from_revisions(&self, revisions: Vec) -> FlowyResult { let delta = make_delta_from_revisions::(revisions)?; Ok(delta.to_delta_bytes()) diff --git a/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs b/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs index 0fef3230de..bcb4a7d288 100644 --- a/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs +++ b/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs @@ -582,10 +582,7 @@ impl GridRevisionEditor { &user_id, md5, ); - let _ = self - .rev_manager - .add_local_revision(&revision, Box::new(GridRevisionCompactor())) - .await?; + let _ = self.rev_manager.add_local_revision(&revision).await?; Ok(()) } @@ -664,7 +661,7 @@ impl RevisionCloudService for GridRevisionCloudService { } } -struct GridRevisionCompactor(); +pub struct GridRevisionCompactor(); impl RevisionCompactor for GridRevisionCompactor { fn bytes_from_revisions(&self, revisions: Vec) -> FlowyResult { let delta = make_delta_from_revisions::(revisions)?; diff --git a/frontend/rust-lib/flowy-grid/tests/grid/grid_test.rs b/frontend/rust-lib/flowy-grid/tests/grid/grid_test.rs new file mode 100644 index 0000000000..63bea4cc3d --- /dev/null +++ b/frontend/rust-lib/flowy-grid/tests/grid/grid_test.rs @@ -0,0 +1,381 @@ +use crate::grid::script::EditorScript::*; +use crate::grid::script::*; +use chrono::NaiveDateTime; +use flowy_grid::services::field::{ + DateCellContentChangeset, DateCellData, MultiSelectTypeOption, SelectOption, SelectOptionCellContentChangeset, + SingleSelectTypeOption, SELECTION_IDS_SEPARATOR, +}; +use flowy_grid::services::row::{decode_cell_data_from_type_option_cell_data, CreateRowMetaBuilder}; +use flowy_grid_data_model::entities::{ + CellChangeset, FieldChangesetParams, FieldType, GridBlockInfoChangeset, GridBlockMetaSnapshot, RowMetaChangeset, + TypeOptionDataEntry, +}; + +#[tokio::test] +async fn grid_create_field() { + let mut test = GridEditorTest::new().await; + let (text_field_params, text_field_meta) = create_text_field(&test.grid_id); + let (single_select_params, single_select_field) = create_single_select_field(&test.grid_id); + let scripts = vec![ + CreateField { + params: text_field_params, + }, + AssertFieldEqual { + field_index: test.field_count, + field_meta: text_field_meta, + }, + ]; + test.run_scripts(scripts).await; + + let scripts = vec![ + CreateField { + params: single_select_params, + }, + AssertFieldEqual { + field_index: test.field_count, + field_meta: single_select_field, + }, + ]; + test.run_scripts(scripts).await; +} + +#[tokio::test] +async fn grid_create_duplicate_field() { + let mut test = GridEditorTest::new().await; + let (params, _) = create_text_field(&test.grid_id); + let field_count = test.field_count; + let expected_field_count = field_count + 1; + let scripts = vec![ + CreateField { params: params.clone() }, + CreateField { params }, + AssertFieldCount(expected_field_count), + ]; + test.run_scripts(scripts).await; +} + +#[tokio::test] +async fn grid_update_field_with_empty_change() { + let mut test = GridEditorTest::new().await; + let (params, field_meta) = create_single_select_field(&test.grid_id); + let changeset = FieldChangesetParams { + field_id: field_meta.id.clone(), + grid_id: test.grid_id.clone(), + ..Default::default() + }; + + let scripts = vec![ + CreateField { params }, + UpdateField { changeset }, + AssertFieldEqual { + field_index: test.field_count, + field_meta, + }, + ]; + test.run_scripts(scripts).await; +} + +#[tokio::test] +async fn grid_update_field() { + let mut test = GridEditorTest::new().await; + let (single_select_params, single_select_field) = create_single_select_field(&test.grid_id); + let mut cloned_field = single_select_field.clone(); + + let mut single_select_type_option = SingleSelectTypeOption::from(&single_select_field); + single_select_type_option.options.push(SelectOption::new("Unknown")); + let changeset = FieldChangesetParams { + field_id: single_select_field.id.clone(), + grid_id: test.grid_id.clone(), + frozen: Some(true), + width: Some(1000), + type_option_data: Some(single_select_type_option.protobuf_bytes().to_vec()), + ..Default::default() + }; + + cloned_field.frozen = true; + cloned_field.width = 1000; + cloned_field.insert_type_option_entry(&single_select_type_option); + + let scripts = vec![ + CreateField { + params: single_select_params, + }, + UpdateField { changeset }, + AssertFieldEqual { + field_index: test.field_count, + field_meta: cloned_field, + }, + ]; + test.run_scripts(scripts).await; +} + +#[tokio::test] +async fn grid_delete_field() { + let mut test = GridEditorTest::new().await; + let expected_field_count = test.field_count; + let (text_params, text_field) = create_text_field(&test.grid_id); + let scripts = vec![ + CreateField { params: text_params }, + DeleteField { field_meta: text_field }, + AssertFieldCount(expected_field_count), + ]; + test.run_scripts(scripts).await; +} + +#[tokio::test] +async fn grid_create_block() { + let grid_block = GridBlockMetaSnapshot::new(); + let scripts = vec![ + AssertBlockCount(1), + CreateBlock { block: grid_block }, + AssertBlockCount(2), + ]; + GridEditorTest::new().await.run_scripts(scripts).await; +} + +#[tokio::test] +async fn grid_update_block() { + let grid_block = GridBlockMetaSnapshot::new(); + let mut cloned_grid_block = grid_block.clone(); + let changeset = GridBlockInfoChangeset { + block_id: grid_block.block_id.clone(), + start_row_index: Some(2), + row_count: Some(10), + }; + + cloned_grid_block.start_row_index = 2; + cloned_grid_block.row_count = 10; + + let scripts = vec![ + AssertBlockCount(1), + CreateBlock { block: grid_block }, + UpdateBlock { changeset }, + AssertBlockCount(2), + AssertBlockEqual { + block_index: 1, + block: cloned_grid_block, + }, + ]; + GridEditorTest::new().await.run_scripts(scripts).await; +} + +#[tokio::test] +async fn grid_create_row() { + let scripts = vec![AssertRowCount(3), CreateEmptyRow, CreateEmptyRow, AssertRowCount(5)]; + GridEditorTest::new().await.run_scripts(scripts).await; +} + +#[tokio::test] +async fn grid_create_row2() { + let mut test = GridEditorTest::new().await; + let create_row_context = CreateRowMetaBuilder::new(&test.field_metas).build(); + let scripts = vec![ + AssertRowCount(3), + CreateRow { + context: create_row_context, + }, + AssertRowCount(4), + ]; + test.run_scripts(scripts).await; +} + +#[tokio::test] +async fn grid_update_row() { + let mut test = GridEditorTest::new().await; + let context = CreateRowMetaBuilder::new(&test.field_metas).build(); + let changeset = RowMetaChangeset { + row_id: context.row_id.clone(), + height: None, + visibility: None, + cell_by_field_id: Default::default(), + }; + + let scripts = vec![ + AssertRowCount(3), + CreateRow { context }, + UpdateRow { + changeset: changeset.clone(), + }, + AssertRow { changeset }, + AssertRowCount(4), + ]; + test.run_scripts(scripts).await; +} + +#[tokio::test] +async fn grid_delete_row() { + let mut test = GridEditorTest::new().await; + let context_1 = CreateRowMetaBuilder::new(&test.field_metas).build(); + let context_2 = CreateRowMetaBuilder::new(&test.field_metas).build(); + let row_ids = vec![context_1.row_id.clone(), context_2.row_id.clone()]; + let scripts = vec![ + AssertRowCount(3), + CreateRow { context: context_1 }, + CreateRow { context: context_2 }, + AssertBlockCount(1), + AssertBlock { + block_index: 0, + row_count: 5, + start_row_index: 0, + }, + DeleteRow { row_ids }, + AssertBlock { + block_index: 0, + row_count: 3, + start_row_index: 0, + }, + ]; + test.run_scripts(scripts).await; +} + +#[tokio::test] +async fn grid_row_add_cells_test() { + let mut test = GridEditorTest::new().await; + let mut builder = CreateRowMetaBuilder::new(&test.field_metas); + for field in &test.field_metas { + match field.field_type { + FieldType::RichText => { + builder.add_cell(&field.id, "hello world".to_owned()).unwrap(); + } + FieldType::Number => { + builder.add_cell(&field.id, "18,443".to_owned()).unwrap(); + } + FieldType::DateTime => { + builder + .add_cell(&field.id, make_date_cell_string("1647251762")) + .unwrap(); + } + FieldType::SingleSelect => { + let type_option = SingleSelectTypeOption::from(field); + let option = type_option.options.first().unwrap(); + builder.add_select_option_cell(&field.id, option.id.clone()).unwrap(); + } + FieldType::MultiSelect => { + let type_option = MultiSelectTypeOption::from(field); + let ops_ids = type_option + .options + .iter() + .map(|option| option.id.clone()) + .collect::>() + .join(SELECTION_IDS_SEPARATOR); + builder.add_select_option_cell(&field.id, ops_ids).unwrap(); + } + FieldType::Checkbox => { + builder.add_cell(&field.id, "false".to_string()).unwrap(); + } + FieldType::URL => { + builder.add_cell(&field.id, "1".to_string()).unwrap(); + } + } + } + let context = builder.build(); + let scripts = vec![CreateRow { context }, AssertGridMetaPad]; + test.run_scripts(scripts).await; +} + +#[tokio::test] +async fn grid_row_add_date_cell_test() { + let mut test = GridEditorTest::new().await; + let mut builder = CreateRowMetaBuilder::new(&test.field_metas); + let mut date_field = None; + let timestamp = 1647390674; + for field in &test.field_metas { + if field.field_type == FieldType::DateTime { + date_field = Some(field.clone()); + NaiveDateTime::from_timestamp(123, 0); + // The data should not be empty + assert!(builder.add_cell(&field.id, "".to_string()).is_err()); + assert!(builder.add_cell(&field.id, make_date_cell_string("123")).is_ok()); + assert!(builder + .add_cell(&field.id, make_date_cell_string(×tamp.to_string())) + .is_ok()); + } + } + let context = builder.build(); + let date_field = date_field.unwrap(); + let cell_data = context.cell_by_field_id.get(&date_field.id).unwrap().clone(); + assert_eq!( + decode_cell_data_from_type_option_cell_data(cell_data.data.clone(), &date_field, &date_field.field_type) + .parse::() + .unwrap() + .date, + "2022/03/16", + ); + let scripts = vec![CreateRow { context }]; + test.run_scripts(scripts).await; +} + +#[tokio::test] +async fn grid_cell_update() { + let mut test = GridEditorTest::new().await; + let field_metas = &test.field_metas; + let row_metas = &test.row_metas; + let grid_blocks = &test.grid_blocks; + assert_eq!(row_metas.len(), 3); + assert_eq!(grid_blocks.len(), 1); + + let block_id = &grid_blocks.first().unwrap().block_id; + let mut scripts = vec![]; + for (index, row_meta) in row_metas.iter().enumerate() { + for field_meta in field_metas { + if index == 0 { + let data = match field_meta.field_type { + FieldType::RichText => "".to_string(), + FieldType::Number => "123".to_string(), + FieldType::DateTime => make_date_cell_string("123"), + FieldType::SingleSelect => { + let type_option = SingleSelectTypeOption::from(field_meta); + SelectOptionCellContentChangeset::from_insert(&type_option.options.first().unwrap().id).to_str() + } + FieldType::MultiSelect => { + let type_option = MultiSelectTypeOption::from(field_meta); + SelectOptionCellContentChangeset::from_insert(&type_option.options.first().unwrap().id).to_str() + } + FieldType::Checkbox => "1".to_string(), + FieldType::URL => "1".to_string(), + }; + + scripts.push(UpdateCell { + changeset: CellChangeset { + grid_id: block_id.to_string(), + row_id: row_meta.id.clone(), + field_id: field_meta.id.clone(), + cell_content_changeset: Some(data), + }, + is_err: false, + }); + } + + if index == 1 { + let (data, is_err) = match field_meta.field_type { + FieldType::RichText => ("1".to_string().repeat(10001), true), + FieldType::Number => ("abc".to_string(), true), + FieldType::DateTime => ("abc".to_string(), true), + FieldType::SingleSelect => (SelectOptionCellContentChangeset::from_insert("abc").to_str(), false), + FieldType::MultiSelect => (SelectOptionCellContentChangeset::from_insert("abc").to_str(), false), + FieldType::Checkbox => ("2".to_string(), false), + FieldType::URL => ("2".to_string(), false), + }; + + scripts.push(UpdateCell { + changeset: CellChangeset { + grid_id: block_id.to_string(), + row_id: row_meta.id.clone(), + field_id: field_meta.id.clone(), + cell_content_changeset: Some(data), + }, + is_err, + }); + } + } + } + + test.run_scripts(scripts).await; +} + +fn make_date_cell_string(s: &str) -> String { + serde_json::to_string(&DateCellContentChangeset { + date: Some(s.to_string()), + time: None, + }) + .unwrap() +} diff --git a/frontend/rust-lib/flowy-grid/tests/grid/script.rs b/frontend/rust-lib/flowy-grid/tests/grid/script.rs new file mode 100644 index 0000000000..c1fa47c7a9 --- /dev/null +++ b/frontend/rust-lib/flowy-grid/tests/grid/script.rs @@ -0,0 +1,374 @@ +use bytes::Bytes; +use flowy_grid::services::field::*; +use flowy_grid::services::grid_meta_editor::{GridMetaEditor, GridPadBuilder}; +use flowy_grid::services::row::CreateRowMetaPayload; +use flowy_grid_data_model::entities::{ + BuildGridContext, CellChangeset, Field, FieldChangesetParams, FieldMeta, FieldOrder, FieldType, + GridBlockInfoChangeset, GridBlockMetaSnapshot, InsertFieldParams, RowMeta, RowMetaChangeset, RowOrder, + TypeOptionDataEntry, +}; +use flowy_revision::REVISION_WRITE_INTERVAL_IN_MILLIS; +use flowy_sync::client_grid::GridBuilder; +use flowy_test::helper::ViewTest; +use flowy_test::FlowySDKTest; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; +use strum::EnumCount; +use tokio::time::sleep; + +pub enum EditorScript { + CreateField { + params: InsertFieldParams, + }, + UpdateField { + changeset: FieldChangesetParams, + }, + DeleteField { + field_meta: FieldMeta, + }, + AssertFieldCount(usize), + AssertFieldEqual { + field_index: usize, + field_meta: FieldMeta, + }, + CreateBlock { + block: GridBlockMetaSnapshot, + }, + UpdateBlock { + changeset: GridBlockInfoChangeset, + }, + AssertBlockCount(usize), + AssertBlock { + block_index: usize, + row_count: i32, + start_row_index: i32, + }, + AssertBlockEqual { + block_index: usize, + block: GridBlockMetaSnapshot, + }, + CreateEmptyRow, + CreateRow { + context: CreateRowMetaPayload, + }, + UpdateRow { + changeset: RowMetaChangeset, + }, + AssertRow { + changeset: RowMetaChangeset, + }, + DeleteRow { + row_ids: Vec, + }, + UpdateCell { + changeset: CellChangeset, + is_err: bool, + }, + AssertRowCount(usize), + // AssertRowEqual{ row_index: usize, row: RowMeta}, + AssertGridMetaPad, +} + +pub struct GridEditorTest { + pub sdk: FlowySDKTest, + pub grid_id: String, + pub editor: Arc, + pub field_metas: Vec, + pub grid_blocks: Vec, + pub row_metas: Vec>, + pub field_count: usize, + + pub row_order_by_row_id: HashMap, +} + +impl GridEditorTest { + pub async fn new() -> Self { + let sdk = FlowySDKTest::default(); + let _ = sdk.init_user().await; + let build_context = make_template_1_grid(); + let view_data: Bytes = build_context.into(); + let test = ViewTest::new_grid_view(&sdk, view_data.to_vec()).await; + let editor = sdk.grid_manager.open_grid(&test.view.id).await.unwrap(); + let field_metas = editor.get_field_metas::(None).await.unwrap(); + let grid_blocks = editor.get_block_metas().await.unwrap(); + let row_metas = get_row_metas(&editor).await; + + let grid_id = test.view.id; + Self { + sdk, + grid_id, + editor, + field_metas, + grid_blocks, + row_metas, + field_count: FieldType::COUNT, + row_order_by_row_id: HashMap::default(), + } + } + + pub async fn run_scripts(&mut self, scripts: Vec) { + for script in scripts { + self.run_script(script).await; + } + } + + pub async fn run_script(&mut self, script: EditorScript) { + let grid_manager = self.sdk.grid_manager.clone(); + let pool = self.sdk.user_session.db_pool().unwrap(); + let rev_manager = self.editor.rev_manager(); + let _cache = rev_manager.revision_cache().await; + + match script { + EditorScript::CreateField { params } => { + if !self.editor.contain_field(¶ms.field.id).await { + self.field_count += 1; + } + + self.editor.insert_field(params).await.unwrap(); + self.field_metas = self.editor.get_field_metas::(None).await.unwrap(); + assert_eq!(self.field_count, self.field_metas.len()); + } + EditorScript::UpdateField { changeset: change } => { + self.editor.update_field(change).await.unwrap(); + self.field_metas = self.editor.get_field_metas::(None).await.unwrap(); + } + EditorScript::DeleteField { field_meta } => { + if self.editor.contain_field(&field_meta.id).await { + self.field_count -= 1; + } + + self.editor.delete_field(&field_meta.id).await.unwrap(); + self.field_metas = self.editor.get_field_metas::(None).await.unwrap(); + assert_eq!(self.field_count, self.field_metas.len()); + } + EditorScript::AssertFieldCount(count) => { + assert_eq!( + self.editor.get_field_metas::(None).await.unwrap().len(), + count + ); + } + EditorScript::AssertFieldEqual { + field_index, + field_meta, + } => { + let field_metas = self.editor.get_field_metas::(None).await.unwrap(); + assert_eq!(field_metas[field_index].clone(), field_meta); + } + EditorScript::CreateBlock { block } => { + self.editor.create_block(block).await.unwrap(); + self.grid_blocks = self.editor.get_block_metas().await.unwrap(); + } + EditorScript::UpdateBlock { changeset: change } => { + self.editor.update_block(change).await.unwrap(); + } + EditorScript::AssertBlockCount(count) => { + assert_eq!(self.editor.get_block_metas().await.unwrap().len(), count); + } + EditorScript::AssertBlock { + block_index, + row_count, + start_row_index, + } => { + assert_eq!(self.grid_blocks[block_index].row_count, row_count); + assert_eq!(self.grid_blocks[block_index].start_row_index, start_row_index); + } + EditorScript::AssertBlockEqual { block_index, block } => { + let blocks = self.editor.get_block_metas().await.unwrap(); + let compared_block = blocks[block_index].clone(); + assert_eq!(compared_block, block); + } + EditorScript::CreateEmptyRow => { + let row_order = self.editor.create_row(None).await.unwrap(); + self.row_order_by_row_id.insert(row_order.row_id.clone(), row_order); + self.row_metas = self.get_row_metas().await; + self.grid_blocks = self.editor.get_block_metas().await.unwrap(); + } + EditorScript::CreateRow { context } => { + let row_orders = self.editor.insert_rows(vec![context]).await.unwrap(); + for row_order in row_orders { + self.row_order_by_row_id.insert(row_order.row_id.clone(), row_order); + } + self.row_metas = self.get_row_metas().await; + self.grid_blocks = self.editor.get_block_metas().await.unwrap(); + } + EditorScript::UpdateRow { changeset: change } => self.editor.update_row(change).await.unwrap(), + EditorScript::DeleteRow { row_ids } => { + let row_orders = row_ids + .into_iter() + .map(|row_id| self.row_order_by_row_id.get(&row_id).unwrap().clone()) + .collect::>(); + + self.editor.delete_rows(row_orders).await.unwrap(); + self.row_metas = self.get_row_metas().await; + self.grid_blocks = self.editor.get_block_metas().await.unwrap(); + } + EditorScript::AssertRow { changeset } => { + let row = self.row_metas.iter().find(|row| row.id == changeset.row_id).unwrap(); + + if let Some(visibility) = changeset.visibility { + assert_eq!(row.visibility, visibility); + } + + if let Some(height) = changeset.height { + assert_eq!(row.height, height); + } + } + EditorScript::UpdateCell { changeset, is_err } => { + let result = self.editor.update_cell(changeset).await; + if is_err { + assert!(result.is_err()) + } else { + let _ = result.unwrap(); + self.row_metas = self.get_row_metas().await; + } + } + EditorScript::AssertRowCount(count) => { + assert_eq!(self.row_metas.len(), count); + } + EditorScript::AssertGridMetaPad => { + sleep(Duration::from_millis(2 * REVISION_WRITE_INTERVAL_IN_MILLIS)).await; + let mut grid_rev_manager = grid_manager.make_grid_rev_manager(&self.grid_id, pool.clone()).unwrap(); + let grid_pad = grid_rev_manager.load::(None).await.unwrap(); + println!("{}", grid_pad.delta_str()); + } + } + } + + async fn get_row_metas(&self) -> Vec> { + get_row_metas(&self.editor).await + } +} + +async fn get_row_metas(editor: &Arc) -> Vec> { + editor + .grid_block_snapshots(None) + .await + .unwrap() + .pop() + .unwrap() + .row_metas +} + +pub fn create_text_field(grid_id: &str) -> (InsertFieldParams, FieldMeta) { + let field_meta = FieldBuilder::new(RichTextTypeOptionBuilder::default()) + .name("Name") + .visibility(true) + .build(); + + let cloned_field_meta = field_meta.clone(); + + let type_option_data = field_meta + .get_type_option_entry::(&field_meta.field_type) + .unwrap() + .protobuf_bytes() + .to_vec(); + + let field = Field { + id: field_meta.id, + name: field_meta.name, + desc: field_meta.desc, + field_type: field_meta.field_type, + frozen: field_meta.frozen, + visibility: field_meta.visibility, + width: field_meta.width, + is_primary: false, + }; + + let params = InsertFieldParams { + grid_id: grid_id.to_owned(), + field, + type_option_data, + start_field_id: None, + }; + (params, cloned_field_meta) +} + +pub fn create_single_select_field(grid_id: &str) -> (InsertFieldParams, FieldMeta) { + let single_select = SingleSelectTypeOptionBuilder::default() + .option(SelectOption::new("Done")) + .option(SelectOption::new("Progress")); + + let field_meta = FieldBuilder::new(single_select).name("Name").visibility(true).build(); + let cloned_field_meta = field_meta.clone(); + let type_option_data = field_meta + .get_type_option_entry::(&field_meta.field_type) + .unwrap() + .protobuf_bytes() + .to_vec(); + + let field = Field { + id: field_meta.id, + name: field_meta.name, + desc: field_meta.desc, + field_type: field_meta.field_type, + frozen: field_meta.frozen, + visibility: field_meta.visibility, + width: field_meta.width, + is_primary: false, + }; + + let params = InsertFieldParams { + grid_id: grid_id.to_owned(), + field, + type_option_data, + start_field_id: None, + }; + (params, cloned_field_meta) +} + +fn make_template_1_grid() -> BuildGridContext { + let text_field = FieldBuilder::new(RichTextTypeOptionBuilder::default()) + .name("Name") + .visibility(true) + .build(); + + // Single Select + let single_select = SingleSelectTypeOptionBuilder::default() + .option(SelectOption::new("Live")) + .option(SelectOption::new("Completed")) + .option(SelectOption::new("Planned")) + .option(SelectOption::new("Paused")); + let single_select_field = FieldBuilder::new(single_select).name("Status").visibility(true).build(); + + // MultiSelect + let multi_select = MultiSelectTypeOptionBuilder::default() + .option(SelectOption::new("Google")) + .option(SelectOption::new("Facebook")) + .option(SelectOption::new("Twitter")); + let multi_select_field = FieldBuilder::new(multi_select) + .name("Platform") + .visibility(true) + .build(); + + // Number + let number = NumberTypeOptionBuilder::default().set_format(NumberFormat::USD); + let number_field = FieldBuilder::new(number).name("Price").visibility(true).build(); + + // Date + let date = DateTypeOptionBuilder::default() + .date_format(DateFormat::US) + .time_format(TimeFormat::TwentyFourHour); + let date_field = FieldBuilder::new(date).name("Time").visibility(true).build(); + + // Checkbox + let checkbox = CheckboxTypeOptionBuilder::default(); + let checkbox_field = FieldBuilder::new(checkbox).name("is done").visibility(true).build(); + + // URL + let url = URLTypeOptionBuilder::default(); + let url_field = FieldBuilder::new(url).name("link").visibility(true).build(); + + GridBuilder::default() + .add_field(text_field) + .add_field(single_select_field) + .add_field(multi_select_field) + .add_field(number_field) + .add_field(date_field) + .add_field(checkbox_field) + .add_field(url_field) + .add_empty_row() + .add_empty_row() + .add_empty_row() + .build() +} diff --git a/frontend/rust-lib/flowy-revision/src/cache/disk/grid_meta_rev_impl.rs b/frontend/rust-lib/flowy-revision/src/cache/disk/grid_block_meta_rev_impl.rs similarity index 97% rename from frontend/rust-lib/flowy-revision/src/cache/disk/grid_meta_rev_impl.rs rename to frontend/rust-lib/flowy-revision/src/cache/disk/grid_block_meta_rev_impl.rs index e850119c30..52b01440e6 100644 --- a/frontend/rust-lib/flowy-revision/src/cache/disk/grid_meta_rev_impl.rs +++ b/frontend/rust-lib/flowy-revision/src/cache/disk/grid_block_meta_rev_impl.rs @@ -1,6 +1,5 @@ use crate::cache::disk::RevisionDiskCache; use crate::disk::{RevisionChangeset, RevisionRecord, RevisionState}; - use bytes::Bytes; use diesel::{sql_types::Integer, update, SqliteConnection}; use flowy_database::{ @@ -16,12 +15,12 @@ use flowy_sync::{ }; use std::sync::Arc; -pub struct SQLiteGridBlockMetaRevisionPersistence { +pub struct SQLiteGridBlockRevisionPersistence { user_id: String, pub(crate) pool: Arc, } -impl RevisionDiskCache for SQLiteGridBlockMetaRevisionPersistence { +impl RevisionDiskCache for SQLiteGridBlockRevisionPersistence { type Error = FlowyError; fn create_revision_records(&self, revision_records: Vec) -> Result<(), Self::Error> { @@ -82,7 +81,7 @@ impl RevisionDiskCache for SQLiteGridBlockMetaRevisionPersistence { } } -impl SQLiteGridBlockMetaRevisionPersistence { +impl SQLiteGridBlockRevisionPersistence { pub fn new(user_id: &str, pool: Arc) -> Self { Self { user_id: user_id.to_owned(), diff --git a/frontend/rust-lib/flowy-revision/src/cache/disk/grid_rev_impl.rs b/frontend/rust-lib/flowy-revision/src/cache/disk/grid_rev_impl.rs index d51ea8e48c..9ddb21bc8c 100644 --- a/frontend/rust-lib/flowy-revision/src/cache/disk/grid_rev_impl.rs +++ b/frontend/rust-lib/flowy-revision/src/cache/disk/grid_rev_impl.rs @@ -95,7 +95,6 @@ struct GridRevisionSql(); impl GridRevisionSql { fn create(revision_records: Vec, conn: &SqliteConnection) -> Result<(), FlowyError> { // Batch insert: https://diesel.rs/guides/all-about-inserts.html - let records = revision_records .into_iter() .map(|record| { diff --git a/frontend/rust-lib/flowy-revision/src/cache/disk/mod.rs b/frontend/rust-lib/flowy-revision/src/cache/disk/mod.rs index 945c6d707f..814b591f15 100644 --- a/frontend/rust-lib/flowy-revision/src/cache/disk/mod.rs +++ b/frontend/rust-lib/flowy-revision/src/cache/disk/mod.rs @@ -1,10 +1,10 @@ mod folder_rev_impl; -mod grid_meta_rev_impl; +mod grid_block_meta_rev_impl; mod grid_rev_impl; mod text_rev_impl; pub use folder_rev_impl::*; -pub use grid_meta_rev_impl::*; +pub use grid_block_meta_rev_impl::*; pub use grid_rev_impl::*; pub use text_rev_impl::*; diff --git a/frontend/rust-lib/flowy-revision/src/history/mod.rs b/frontend/rust-lib/flowy-revision/src/history/mod.rs new file mode 100644 index 0000000000..9de42831ed --- /dev/null +++ b/frontend/rust-lib/flowy-revision/src/history/mod.rs @@ -0,0 +1,5 @@ +mod persistence; +mod rev_history; + +pub use persistence::*; +pub use rev_history::*; diff --git a/frontend/rust-lib/flowy-revision/src/history/persistence.rs b/frontend/rust-lib/flowy-revision/src/history/persistence.rs new file mode 100644 index 0000000000..9c1bdacc9e --- /dev/null +++ b/frontend/rust-lib/flowy-revision/src/history/persistence.rs @@ -0,0 +1,78 @@ +use crate::history::RevisionHistoryDiskCache; +use flowy_database::{ + prelude::*, + schema::{rev_history, rev_history::dsl}, + ConnectionPool, +}; +use flowy_error::{internal_error, FlowyResult}; +use flowy_sync::entities::revision::Revision; +use std::sync::Arc; + +pub struct SQLiteRevisionHistoryPersistence { + object_id: String, + pool: Arc, +} + +impl SQLiteRevisionHistoryPersistence { + pub fn new(object_id: &str, pool: Arc) -> Self { + let object_id = object_id.to_owned(); + Self { object_id, pool } + } +} + +impl RevisionHistoryDiskCache for SQLiteRevisionHistoryPersistence { + fn write_history(&self, revision: Revision) -> FlowyResult<()> { + let record = ( + dsl::object_id.eq(revision.object_id), + dsl::start_rev_id.eq(revision.base_rev_id), + dsl::end_rev_id.eq(revision.rev_id), + dsl::data.eq(revision.delta_data), + ); + let conn = self.pool.get().map_err(internal_error)?; + + let _ = insert_or_ignore_into(dsl::rev_history) + .values(vec![record]) + .execute(&*conn)?; + Ok(()) + } + + fn read_histories(&self) -> FlowyResult> { + let conn = self.pool.get().map_err(internal_error)?; + let records: Vec = dsl::rev_history + .filter(dsl::object_id.eq(&self.object_id)) + .load::(&*conn)?; + + Ok(records + .into_iter() + .map(|record| record.into()) + .collect::>()) + } +} + +#[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)] +#[table_name = "rev_history"] +struct RevisionRecord { + id: i32, + object_id: String, + start_rev_id: i64, + end_rev_id: i64, + data: Vec, +} + +pub struct RevisionHistory { + pub object_id: String, + pub start_rev_id: i64, + pub end_rev_id: i64, + pub data: Vec, +} + +impl std::convert::From for RevisionHistory { + fn from(record: RevisionRecord) -> Self { + RevisionHistory { + object_id: record.object_id, + start_rev_id: record.start_rev_id, + end_rev_id: record.end_rev_id, + data: record.data, + } + } +} diff --git a/frontend/rust-lib/flowy-revision/src/history/rev_history.rs b/frontend/rust-lib/flowy-revision/src/history/rev_history.rs new file mode 100644 index 0000000000..71bdc0c333 --- /dev/null +++ b/frontend/rust-lib/flowy-revision/src/history/rev_history.rs @@ -0,0 +1,201 @@ +use crate::{RevisionCompactor, RevisionHistory}; +use async_stream::stream; + +use flowy_error::{FlowyError, FlowyResult}; +use flowy_sync::entities::revision::Revision; +use futures_util::future::BoxFuture; +use futures_util::stream::StreamExt; +use futures_util::FutureExt; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::{mpsc, RwLock}; +use tokio::time::interval; + +pub trait RevisionHistoryDiskCache: Send + Sync { + fn write_history(&self, revision: Revision) -> FlowyResult<()>; + + fn read_histories(&self) -> FlowyResult>; +} + +pub struct RevisionHistoryManager { + user_id: String, + stop_tx: mpsc::Sender<()>, + config: RevisionHistoryConfig, + revisions: Arc>>, + disk_cache: Arc, +} + +impl RevisionHistoryManager { + pub fn new( + user_id: &str, + object_id: &str, + config: RevisionHistoryConfig, + disk_cache: Arc, + rev_compactor: Arc, + ) -> Self { + let revisions = Arc::new(RwLock::new(vec![])); + let stop_tx = + spawn_history_checkpoint_runner(user_id, object_id, &disk_cache, &revisions, rev_compactor, &config); + let user_id = user_id.to_owned(); + Self { + user_id, + stop_tx, + config, + revisions, + disk_cache, + } + } + + pub async fn add_revision(&self, revision: &Revision) { + self.revisions.write().await.push(revision.clone()); + } + + pub async fn read_revision_histories(&self) -> FlowyResult> { + self.disk_cache.read_histories() + } +} + +pub struct RevisionHistoryConfig { + check_duration: Duration, +} + +impl std::default::Default for RevisionHistoryConfig { + fn default() -> Self { + Self { + check_duration: Duration::from_secs(5), + } + } +} + +fn spawn_history_checkpoint_runner( + user_id: &str, + object_id: &str, + disk_cache: &Arc, + revisions: &Arc>>, + rev_compactor: Arc, + config: &RevisionHistoryConfig, +) -> mpsc::Sender<()> { + let user_id = user_id.to_string(); + let object_id = object_id.to_string(); + let disk_cache = disk_cache.clone(); + let revisions = revisions.clone(); + + let (checkpoint_tx, checkpoint_rx) = mpsc::channel(1); + let (stop_tx, stop_rx) = mpsc::channel(1); + let checkpoint_sender = FixedDurationCheckpointSender { + user_id, + object_id, + checkpoint_tx, + disk_cache, + revisions, + rev_compactor, + duration: config.check_duration, + }; + tokio::spawn(HistoryCheckpointRunner::new(stop_rx, checkpoint_rx).run()); + tokio::spawn(checkpoint_sender.run()); + stop_tx +} + +struct HistoryCheckpointRunner { + stop_rx: Option>, + checkpoint_rx: Option>, +} + +impl HistoryCheckpointRunner { + fn new(stop_rx: mpsc::Receiver<()>, checkpoint_rx: mpsc::Receiver) -> Self { + Self { + stop_rx: Some(stop_rx), + checkpoint_rx: Some(checkpoint_rx), + } + } + + async fn run(mut self) { + let mut stop_rx = self.stop_rx.take().expect("It should only run once"); + let mut checkpoint_rx = self.checkpoint_rx.take().expect("It should only run once"); + let stream = stream! { + loop { + tokio::select! { + result = checkpoint_rx.recv() => { + match result { + Some(checkpoint) => yield checkpoint, + None => {}, + } + }, + _ = stop_rx.recv() => { + tracing::trace!("Checkpoint runner exit"); + break + }, + }; + } + }; + + stream + .for_each(|checkpoint| async move { + checkpoint.write().await; + }) + .await; + } +} + +struct HistoryCheckpoint { + user_id: String, + object_id: String, + revisions: Vec, + disk_cache: Arc, + rev_compactor: Arc, +} + +impl HistoryCheckpoint { + async fn write(self) { + if self.revisions.is_empty() { + return; + } + + let result = || { + let revision = self + .rev_compactor + .compact(&self.user_id, &self.object_id, self.revisions)?; + let _ = self.disk_cache.write_history(revision)?; + Ok::<(), FlowyError>(()) + }; + + match result() { + Ok(_) => {} + Err(e) => tracing::error!("Write history checkout failed: {:?}", e), + } + } +} + +struct FixedDurationCheckpointSender { + user_id: String, + object_id: String, + checkpoint_tx: mpsc::Sender, + disk_cache: Arc, + revisions: Arc>>, + rev_compactor: Arc, + duration: Duration, +} + +impl FixedDurationCheckpointSender { + fn run(self) -> BoxFuture<'static, ()> { + async move { + let mut interval = interval(self.duration); + let checkpoint_revisions: Vec = self.revisions.write().await.drain(..).collect(); + let checkpoint = HistoryCheckpoint { + user_id: self.user_id.clone(), + object_id: self.object_id.clone(), + revisions: checkpoint_revisions, + disk_cache: self.disk_cache.clone(), + rev_compactor: self.rev_compactor.clone(), + }; + match self.checkpoint_tx.send(checkpoint).await { + Ok(_) => { + interval.tick().await; + self.run(); + } + Err(_) => {} + } + } + .boxed() + } +} diff --git a/frontend/rust-lib/flowy-revision/src/lib.rs b/frontend/rust-lib/flowy-revision/src/lib.rs index 05e60c00e0..b7fd8a12e6 100644 --- a/frontend/rust-lib/flowy-revision/src/lib.rs +++ b/frontend/rust-lib/flowy-revision/src/lib.rs @@ -1,13 +1,17 @@ mod cache; mod conflict_resolve; +// mod history; mod rev_manager; mod rev_persistence; +mod snapshot; mod ws_manager; pub use cache::*; pub use conflict_resolve::*; +// pub use history::*; pub use rev_manager::*; pub use rev_persistence::*; +pub use snapshot::*; pub use ws_manager::*; #[macro_use] diff --git a/frontend/rust-lib/flowy-revision/src/rev_manager.rs b/frontend/rust-lib/flowy-revision/src/rev_manager.rs index 68d8db6f27..bd4d01740d 100644 --- a/frontend/rust-lib/flowy-revision/src/rev_manager.rs +++ b/frontend/rust-lib/flowy-revision/src/rev_manager.rs @@ -1,5 +1,5 @@ use crate::disk::RevisionState; -use crate::{RevisionPersistence, WSDataProviderDataSource}; +use crate::{RevisionPersistence, RevisionSnapshotDiskCache, RevisionSnapshotManager, WSDataProviderDataSource}; use bytes::Bytes; use flowy_error::{FlowyError, FlowyResult}; use flowy_sync::{ @@ -45,14 +45,31 @@ pub struct RevisionManager { user_id: String, rev_id_counter: RevIdCounter, rev_persistence: Arc, - + #[allow(dead_code)] + rev_snapshot: Arc, + rev_compactor: Arc, #[cfg(feature = "flowy_unit_test")] rev_ack_notifier: tokio::sync::broadcast::Sender, } impl RevisionManager { - pub fn new(user_id: &str, object_id: &str, rev_persistence: Arc) -> Self { + pub fn new( + user_id: &str, + object_id: &str, + rev_persistence: RevisionPersistence, + rev_compactor: C, + snapshot_persistence: SP, + ) -> Self + where + SP: 'static + RevisionSnapshotDiskCache, + C: 'static + RevisionCompactor, + { let rev_id_counter = RevIdCounter::new(0); + let rev_compactor = Arc::new(rev_compactor); + + let rev_persistence = Arc::new(rev_persistence); + + let rev_snapshot = Arc::new(RevisionSnapshotManager::new(user_id, object_id, snapshot_persistence)); #[cfg(feature = "flowy_unit_test")] let (revision_ack_notifier, _) = tokio::sync::broadcast::channel(1); @@ -61,7 +78,8 @@ impl RevisionManager { user_id: user_id.to_owned(), rev_id_counter, rev_persistence, - + rev_snapshot, + rev_compactor, #[cfg(feature = "flowy_unit_test")] rev_ack_notifier: revision_ack_notifier, } @@ -100,20 +118,21 @@ impl RevisionManager { } let _ = self.rev_persistence.add_ack_revision(revision).await?; + // self.rev_history.add_revision(revision).await; self.rev_id_counter.set(revision.rev_id); Ok(()) } #[tracing::instrument(level = "debug", skip_all, err)] - pub async fn add_local_revision<'a>( - &'a self, - revision: &Revision, - compactor: Box, - ) -> Result<(), FlowyError> { + pub async fn add_local_revision(&self, revision: &Revision) -> Result<(), FlowyError> { if revision.delta_data.is_empty() { return Err(FlowyError::internal().context("Delta data should be empty")); } - let rev_id = self.rev_persistence.add_sync_revision(revision, compactor).await?; + let rev_id = self + .rev_persistence + .add_sync_revision(revision, &self.rev_compactor) + .await?; + // self.rev_history.add_revision(revision).await; self.rev_id_counter.set(rev_id); Ok(()) } diff --git a/frontend/rust-lib/flowy-revision/src/rev_persistence.rs b/frontend/rust-lib/flowy-revision/src/rev_persistence.rs index 6412b26ec6..98bc89ba24 100644 --- a/frontend/rust-lib/flowy-revision/src/rev_persistence.rs +++ b/frontend/rust-lib/flowy-revision/src/rev_persistence.rs @@ -24,13 +24,13 @@ pub struct RevisionPersistence { } impl RevisionPersistence { - pub fn new( - user_id: &str, - object_id: &str, - disk_cache: Arc>, - ) -> RevisionPersistence { + pub fn new(user_id: &str, object_id: &str, disk_cache: C) -> RevisionPersistence + where + C: 'static + RevisionDiskCache, + { let object_id = object_id.to_owned(); let user_id = user_id.to_owned(); + let disk_cache = Arc::new(disk_cache) as Arc>; let sync_seq = RwLock::new(RevisionSyncSequence::new()); let memory_cache = Arc::new(RevisionMemoryCache::new(&object_id, Arc::new(disk_cache.clone()))); Self { @@ -63,7 +63,7 @@ impl RevisionPersistence { pub(crate) async fn add_sync_revision<'a>( &'a self, revision: &'a Revision, - compactor: Box, + compactor: &Arc, ) -> FlowyResult { let result = self.sync_seq.read().await.compact(); match result { diff --git a/frontend/rust-lib/flowy-revision/src/snapshot/mod.rs b/frontend/rust-lib/flowy-revision/src/snapshot/mod.rs new file mode 100644 index 0000000000..adc4ee2ccc --- /dev/null +++ b/frontend/rust-lib/flowy-revision/src/snapshot/mod.rs @@ -0,0 +1,5 @@ +mod persistence; +mod rev_snapshot; + +pub use persistence::*; +pub use rev_snapshot::*; diff --git a/frontend/rust-lib/flowy-revision/src/snapshot/persistence.rs b/frontend/rust-lib/flowy-revision/src/snapshot/persistence.rs new file mode 100644 index 0000000000..d8d7bae3a6 --- /dev/null +++ b/frontend/rust-lib/flowy-revision/src/snapshot/persistence.rs @@ -0,0 +1,31 @@ +#![allow(clippy::all)] +#![allow(dead_code)] +#![allow(unused_variables)] +use crate::{RevisionSnapshotDiskCache, RevisionSnapshotInfo}; +use flowy_database::ConnectionPool; +use flowy_error::FlowyResult; +use std::sync::Arc; + +pub struct SQLiteRevisionSnapshotPersistence { + object_id: String, + pool: Arc, +} + +impl SQLiteRevisionSnapshotPersistence { + pub fn new(object_id: &str, pool: Arc) -> Self { + Self { + object_id: object_id.to_string(), + pool, + } + } +} + +impl RevisionSnapshotDiskCache for SQLiteRevisionSnapshotPersistence { + fn write_snapshot(&self, object_id: &str, rev_id: i64, data: Vec) -> FlowyResult<()> { + todo!() + } + + fn read_snapshot(&self, object_id: &str, rev_id: i64) -> FlowyResult { + todo!() + } +} diff --git a/frontend/rust-lib/flowy-revision/src/snapshot/rev_snapshot.rs b/frontend/rust-lib/flowy-revision/src/snapshot/rev_snapshot.rs new file mode 100644 index 0000000000..047d21607e --- /dev/null +++ b/frontend/rust-lib/flowy-revision/src/snapshot/rev_snapshot.rs @@ -0,0 +1,32 @@ +#![allow(clippy::all)] +#![allow(dead_code)] +#![allow(unused_variables)] +use flowy_error::FlowyResult; +use std::sync::Arc; + +pub trait RevisionSnapshotDiskCache: Send + Sync { + fn write_snapshot(&self, object_id: &str, rev_id: i64, data: Vec) -> FlowyResult<()>; + fn read_snapshot(&self, object_id: &str, rev_id: i64) -> FlowyResult; +} + +pub struct RevisionSnapshotManager { + user_id: String, + object_id: String, + disk_cache: Arc, +} + +impl RevisionSnapshotManager { + pub fn new(user_id: &str, object_id: &str, disk_cache: D) -> Self + where + D: RevisionSnapshotDiskCache + 'static, + { + let disk_cache = Arc::new(disk_cache); + Self { + user_id: user_id.to_string(), + object_id: object_id.to_string(), + disk_cache, + } + } +} + +pub struct RevisionSnapshotInfo {} diff --git a/frontend/rust-lib/flowy-revision/src/ws_manager.rs b/frontend/rust-lib/flowy-revision/src/ws_manager.rs index 42ad39c617..eb7539c380 100644 --- a/frontend/rust-lib/flowy-revision/src/ws_manager.rs +++ b/frontend/rust-lib/flowy-revision/src/ws_manager.rs @@ -1,6 +1,5 @@ use crate::ConflictRevisionSink; use async_stream::stream; - use bytes::Bytes; use flowy_error::{FlowyError, FlowyResult}; use flowy_sync::entities::{ diff --git a/frontend/rust-lib/flowy-text-block/src/manager.rs b/frontend/rust-lib/flowy-text-block/src/manager.rs index 4ccf015a7f..57e875668e 100644 --- a/frontend/rust-lib/flowy-text-block/src/manager.rs +++ b/frontend/rust-lib/flowy-text-block/src/manager.rs @@ -1,10 +1,13 @@ +use crate::queue::TextBlockRevisionCompactor; use crate::{editor::TextBlockEditor, errors::FlowyError, BlockCloudService}; use bytes::Bytes; use dashmap::DashMap; use flowy_database::ConnectionPool; use flowy_error::FlowyResult; use flowy_revision::disk::SQLiteTextBlockRevisionPersistence; -use flowy_revision::{RevisionCloudService, RevisionManager, RevisionPersistence, RevisionWebSocket}; +use flowy_revision::{ + RevisionCloudService, RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionSnapshotPersistence, +}; use flowy_sync::entities::{ revision::{md5, RepeatedRevision, Revision}, text_block::{TextBlockDeltaPB, TextBlockIdPB}, @@ -139,9 +142,20 @@ impl TextBlockManager { fn make_rev_manager(&self, doc_id: &str, pool: Arc) -> Result { let user_id = self.user.user_id()?; - let disk_cache = Arc::new(SQLiteTextBlockRevisionPersistence::new(&user_id, pool)); - let rev_persistence = Arc::new(RevisionPersistence::new(&user_id, doc_id, disk_cache)); - Ok(RevisionManager::new(&user_id, doc_id, rev_persistence)) + let disk_cache = SQLiteTextBlockRevisionPersistence::new(&user_id, pool.clone()); + let rev_persistence = RevisionPersistence::new(&user_id, doc_id, disk_cache); + // let history_persistence = SQLiteRevisionHistoryPersistence::new(doc_id, pool.clone()); + let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(doc_id, pool); + let rev_compactor = TextBlockRevisionCompactor(); + + Ok(RevisionManager::new( + &user_id, + doc_id, + rev_persistence, + rev_compactor, + // history_persistence, + snapshot_persistence, + )) } } diff --git a/frontend/rust-lib/flowy-text-block/src/queue.rs b/frontend/rust-lib/flowy-text-block/src/queue.rs index 7c11afd0ff..343ad11e7b 100644 --- a/frontend/rust-lib/flowy-text-block/src/queue.rs +++ b/frontend/rust-lib/flowy-text-block/src/queue.rs @@ -186,10 +186,7 @@ impl EditBlockQueue { &user_id, md5, ); - let _ = self - .rev_manager - .add_local_revision(&revision, Box::new(TextBlockRevisionCompactor())) - .await?; + let _ = self.rev_manager.add_local_revision(&revision).await?; Ok(rev_id.into()) } } diff --git a/shared-lib/flowy-sync/src/client_grid/grid_block_revsion_pad.rs b/shared-lib/flowy-sync/src/client_grid/grid_block_revsion_pad.rs index 8ee18309ad..16b98dc7fc 100644 --- a/shared-lib/flowy-sync/src/client_grid/grid_block_revsion_pad.rs +++ b/shared-lib/flowy-sync/src/client_grid/grid_block_revsion_pad.rs @@ -245,13 +245,13 @@ pub struct GridBlockMetaChange { pub md5: String, } -pub fn make_block_meta_delta(block_rev: &GridBlockRevision) -> GridBlockRevisionDelta { +pub fn make_grid_block_delta(block_rev: &GridBlockRevision) -> GridBlockRevisionDelta { let json = serde_json::to_string(&block_rev).unwrap(); PlainTextDeltaBuilder::new().insert(&json).build() } -pub fn make_block_meta_revisions(user_id: &str, grid_block_meta_data: &GridBlockRevision) -> RepeatedRevision { - let delta = make_block_meta_delta(grid_block_meta_data); +pub fn make_grid_block_revisions(user_id: &str, grid_block_meta_data: &GridBlockRevision) -> RepeatedRevision { + let delta = make_grid_block_delta(grid_block_meta_data); let bytes = delta.to_delta_bytes(); let revision = Revision::initial_revision(user_id, &grid_block_meta_data.block_id, bytes); revision.into() @@ -264,7 +264,7 @@ impl std::default::Default for GridBlockRevisionPad { rows: vec![], }; - let delta = make_block_meta_delta(&block_revision); + let delta = make_grid_block_delta(&block_revision); GridBlockRevisionPad { block_revision, delta } } } diff --git a/shared-lib/flowy-sync/src/client_grid/grid_meta_pad.rs b/shared-lib/flowy-sync/src/client_grid/grid_meta_pad.rs new file mode 100644 index 0000000000..3ef9251e4a --- /dev/null +++ b/shared-lib/flowy-sync/src/client_grid/grid_meta_pad.rs @@ -0,0 +1,432 @@ +use crate::entities::revision::{md5, RepeatedRevision, Revision}; +use crate::errors::{internal_error, CollaborateError, CollaborateResult}; +use crate::util::{cal_diff, make_delta_from_revisions}; +use bytes::Bytes; +use flowy_grid_data_model::entities::{ + gen_block_id, gen_grid_id, FieldChangesetParams, FieldMeta, FieldOrder, FieldType, GridBlockInfoChangeset, + GridBlockMetaSnapshot, GridMeta, +}; +use lib_infra::util::move_vec_element; +use lib_ot::core::{OperationTransformable, PlainTextAttributes, PlainTextDelta, PlainTextDeltaBuilder}; +use std::collections::HashMap; +use std::sync::Arc; + +pub type GridMetaDelta = PlainTextDelta; +pub type GridDeltaBuilder = PlainTextDeltaBuilder; + +pub struct GridMetaPad { + pub(crate) grid_meta: Arc, + pub(crate) delta: GridMetaDelta, +} + +pub trait JsonDeserializer { + fn deserialize(&self, type_option_data: Vec) -> CollaborateResult; +} + +impl GridMetaPad { + pub async fn duplicate_grid_meta(&self) -> (Vec, Vec) { + let fields = self.grid_meta.fields.to_vec(); + + let blocks = self + .grid_meta + .blocks + .iter() + .map(|block| { + let mut duplicated_block = block.clone(); + duplicated_block.block_id = gen_block_id(); + duplicated_block + }) + .collect::>(); + + (fields, blocks) + } + + pub fn from_delta(delta: GridMetaDelta) -> CollaborateResult { + let s = delta.to_str()?; + let grid: GridMeta = serde_json::from_str(&s) + .map_err(|e| CollaborateError::internal().context(format!("Deserialize delta to grid failed: {}", e)))?; + + Ok(Self { + grid_meta: Arc::new(grid), + delta, + }) + } + + pub fn from_revisions(_grid_id: &str, revisions: Vec) -> CollaborateResult { + let grid_delta: GridMetaDelta = make_delta_from_revisions::(revisions)?; + Self::from_delta(grid_delta) + } + + #[tracing::instrument(level = "debug", skip_all, err)] + pub fn create_field_meta( + &mut self, + new_field_meta: FieldMeta, + start_field_id: Option, + ) -> CollaborateResult> { + self.modify_grid(|grid_meta| { + // Check if the field exists or not + if grid_meta + .fields + .iter() + .any(|field_meta| field_meta.id == new_field_meta.id) + { + tracing::error!("Duplicate grid field"); + return Ok(None); + } + + let insert_index = match start_field_id { + None => None, + Some(start_field_id) => grid_meta.fields.iter().position(|field| field.id == start_field_id), + }; + + match insert_index { + None => grid_meta.fields.push(new_field_meta), + Some(index) => grid_meta.fields.insert(index, new_field_meta), + } + Ok(Some(())) + }) + } + + pub fn delete_field_meta(&mut self, field_id: &str) -> CollaborateResult> { + self.modify_grid( + |grid_meta| match grid_meta.fields.iter().position(|field| field.id == field_id) { + None => Ok(None), + Some(index) => { + grid_meta.fields.remove(index); + Ok(Some(())) + } + }, + ) + } + + pub fn duplicate_field_meta( + &mut self, + field_id: &str, + duplicated_field_id: &str, + ) -> CollaborateResult> { + self.modify_grid( + |grid_meta| match grid_meta.fields.iter().position(|field| field.id == field_id) { + None => Ok(None), + Some(index) => { + let mut duplicate_field_meta = grid_meta.fields[index].clone(); + duplicate_field_meta.id = duplicated_field_id.to_string(); + duplicate_field_meta.name = format!("{} (copy)", duplicate_field_meta.name); + grid_meta.fields.insert(index + 1, duplicate_field_meta); + Ok(Some(())) + } + }, + ) + } + + pub fn switch_to_field( + &mut self, + field_id: &str, + field_type: FieldType, + type_option_json_builder: B, + ) -> CollaborateResult> + where + B: FnOnce(&FieldType) -> String, + { + self.modify_grid(|grid_meta| { + // + match grid_meta.fields.iter_mut().find(|field_meta| field_meta.id == field_id) { + None => { + tracing::warn!("Can not find the field with id: {}", field_id); + Ok(None) + } + Some(field_meta) => { + if field_meta.get_type_option_str(&field_type).is_none() { + let type_option_json = type_option_json_builder(&field_type); + field_meta.insert_type_option_str(&field_type, type_option_json); + } + + field_meta.field_type = field_type; + Ok(Some(())) + } + } + }) + } + + pub fn update_field_meta( + &mut self, + changeset: FieldChangesetParams, + deserializer: T, + ) -> CollaborateResult> { + let field_id = changeset.field_id.clone(); + self.modify_field(&field_id, |field| { + let mut is_changed = None; + if let Some(name) = changeset.name { + field.name = name; + is_changed = Some(()) + } + + if let Some(desc) = changeset.desc { + field.desc = desc; + is_changed = Some(()) + } + + if let Some(field_type) = changeset.field_type { + field.field_type = field_type; + is_changed = Some(()) + } + + if let Some(frozen) = changeset.frozen { + field.frozen = frozen; + is_changed = Some(()) + } + + if let Some(visibility) = changeset.visibility { + field.visibility = visibility; + is_changed = Some(()) + } + + if let Some(width) = changeset.width { + field.width = width; + is_changed = Some(()) + } + + if let Some(type_option_data) = changeset.type_option_data { + match deserializer.deserialize(type_option_data) { + Ok(json_str) => { + let field_type = field.field_type.clone(); + field.insert_type_option_str(&field_type, json_str); + is_changed = Some(()) + } + Err(err) => { + tracing::error!("Deserialize data to type option json failed: {}", err); + } + } + } + + Ok(is_changed) + }) + } + + pub fn get_field_meta(&self, field_id: &str) -> Option<(usize, &FieldMeta)> { + self.grid_meta + .fields + .iter() + .enumerate() + .find(|(_, field)| field.id == field_id) + } + + pub fn replace_field_meta(&mut self, field_meta: FieldMeta) -> CollaborateResult> { + self.modify_grid( + |grid_meta| match grid_meta.fields.iter().position(|field| field.id == field_meta.id) { + None => Ok(None), + Some(index) => { + grid_meta.fields.remove(index); + grid_meta.fields.insert(index, field_meta); + Ok(Some(())) + } + }, + ) + } + + pub fn move_field( + &mut self, + field_id: &str, + from_index: usize, + to_index: usize, + ) -> CollaborateResult> { + self.modify_grid(|grid_meta| { + match move_vec_element( + &mut grid_meta.fields, + |field| field.id == field_id, + from_index, + to_index, + ) + .map_err(internal_error)? + { + true => Ok(Some(())), + false => Ok(None), + } + }) + } + + pub fn contain_field(&self, field_id: &str) -> bool { + self.grid_meta.fields.iter().any(|field| field.id == field_id) + } + + pub fn get_field_orders(&self) -> Vec { + self.grid_meta.fields.iter().map(FieldOrder::from).collect() + } + + pub fn get_field_metas(&self, field_orders: Option>) -> CollaborateResult> { + match field_orders { + None => Ok(self.grid_meta.fields.clone()), + Some(field_orders) => { + let field_by_field_id = self + .grid_meta + .fields + .iter() + .map(|field| (&field.id, field)) + .collect::>(); + + let fields = field_orders + .iter() + .flat_map(|field_order| match field_by_field_id.get(&field_order.field_id) { + None => { + tracing::error!("Can't find the field with id: {}", field_order.field_id); + None + } + Some(field) => Some((*field).clone()), + }) + .collect::>(); + Ok(fields) + } + } + } + + pub fn create_block_meta(&mut self, block: GridBlockMetaSnapshot) -> CollaborateResult> { + self.modify_grid(|grid_meta| { + if grid_meta.blocks.iter().any(|b| b.block_id == block.block_id) { + tracing::warn!("Duplicate grid block"); + Ok(None) + } else { + match grid_meta.blocks.last() { + None => grid_meta.blocks.push(block), + Some(last_block) => { + if last_block.start_row_index > block.start_row_index + && last_block.len() > block.start_row_index + { + let msg = "GridBlock's start_row_index should be greater than the last_block's start_row_index and its len".to_string(); + return Err(CollaborateError::internal().context(msg)) + } + grid_meta.blocks.push(block); + } + } + Ok(Some(())) + } + }) + } + + pub fn get_block_metas(&self) -> Vec { + self.grid_meta.blocks.clone() + } + + pub fn update_block_meta(&mut self, changeset: GridBlockInfoChangeset) -> CollaborateResult> { + let block_id = changeset.block_id.clone(); + self.modify_block(&block_id, |block| { + let mut is_changed = None; + + if let Some(row_count) = changeset.row_count { + block.row_count = row_count; + is_changed = Some(()); + } + + if let Some(start_row_index) = changeset.start_row_index { + block.start_row_index = start_row_index; + is_changed = Some(()); + } + + Ok(is_changed) + }) + } + + pub fn md5(&self) -> String { + md5(&self.delta.to_delta_bytes()) + } + + pub fn delta_str(&self) -> String { + self.delta.to_delta_str() + } + + pub fn delta_bytes(&self) -> Bytes { + self.delta.to_delta_bytes() + } + + pub fn fields(&self) -> &[FieldMeta] { + &self.grid_meta.fields + } + + fn modify_grid(&mut self, f: F) -> CollaborateResult> + where + F: FnOnce(&mut GridMeta) -> CollaborateResult>, + { + let cloned_grid = self.grid_meta.clone(); + match f(Arc::make_mut(&mut self.grid_meta))? { + None => Ok(None), + Some(_) => { + let old = json_from_grid(&cloned_grid)?; + let new = json_from_grid(&self.grid_meta)?; + match cal_diff::(old, new) { + None => Ok(None), + Some(delta) => { + self.delta = self.delta.compose(&delta)?; + Ok(Some(GridChangeset { delta, md5: self.md5() })) + } + } + } + } + } + + pub fn modify_block(&mut self, block_id: &str, f: F) -> CollaborateResult> + where + F: FnOnce(&mut GridBlockMetaSnapshot) -> CollaborateResult>, + { + self.modify_grid( + |grid_meta| match grid_meta.blocks.iter().position(|block| block.block_id == block_id) { + None => { + tracing::warn!("[GridMetaPad]: Can't find any block with id: {}", block_id); + Ok(None) + } + Some(index) => f(&mut grid_meta.blocks[index]), + }, + ) + } + + pub fn modify_field(&mut self, field_id: &str, f: F) -> CollaborateResult> + where + F: FnOnce(&mut FieldMeta) -> CollaborateResult>, + { + self.modify_grid( + |grid_meta| match grid_meta.fields.iter().position(|field| field.id == field_id) { + None => { + tracing::warn!("[GridMetaPad]: Can't find any field with id: {}", field_id); + Ok(None) + } + Some(index) => f(&mut grid_meta.fields[index]), + }, + ) + } +} + +fn json_from_grid(grid: &Arc) -> CollaborateResult { + let json = serde_json::to_string(grid) + .map_err(|err| internal_error(format!("Serialize grid to json str failed. {:?}", err)))?; + Ok(json) +} + +pub struct GridChangeset { + pub delta: GridMetaDelta, + /// md5: the md5 of the grid after applying the change. + pub md5: String, +} + +pub fn make_grid_delta(grid_meta: &GridMeta) -> GridMetaDelta { + let json = serde_json::to_string(&grid_meta).unwrap(); + PlainTextDeltaBuilder::new().insert(&json).build() +} + +pub fn make_grid_revisions(user_id: &str, grid_meta: &GridMeta) -> RepeatedRevision { + let delta = make_grid_delta(grid_meta); + let bytes = delta.to_delta_bytes(); + let revision = Revision::initial_revision(user_id, &grid_meta.grid_id, bytes); + revision.into() +} + +impl std::default::Default for GridMetaPad { + fn default() -> Self { + let grid = GridMeta { + grid_id: gen_grid_id(), + fields: vec![], + blocks: vec![], + }; + let delta = make_grid_delta(&grid); + GridMetaPad { + grid_meta: Arc::new(grid), + delta, + } + } +}