Merge pull request #654 from AppFlowy-IO/feat/snapshot_db_schema

Add snapshot db schema (WIP)
This commit is contained in:
Nathan.fooo 2022-07-20 16:58:07 +08:00 committed by GitHub
commit 0f4f51cc0c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 1685 additions and 97 deletions

View File

@ -0,0 +1,2 @@
-- This file should undo anything in `up.sql`
-- IF EXISTS keeps the rollback idempotent: re-running the down migration (or
-- rolling back a partially-applied migration) must not fail if the table is
-- already gone.
DROP TABLE IF EXISTS rev_snapshot;

View File

@ -0,0 +1,7 @@
-- Your SQL goes here
-- Stores point-in-time snapshots of an object's revision stream. Callers in
-- this commit pass a folder id, grid id, or block id as the object id.
CREATE TABLE rev_snapshot (
  id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
  -- Identifier of the object the snapshot belongs to.
  object_id TEXT NOT NULL DEFAULT '',
  -- Revision id at which the snapshot was taken.
  rev_id BIGINT NOT NULL DEFAULT 0,
  -- Serialized snapshot payload; defaults to an empty blob.
  data BLOB NOT NULL DEFAULT (x'')
);

View File

@ -49,6 +49,15 @@ table! {
}
}
// Diesel schema for the `rev_snapshot` table; column types mirror the
// `CREATE TABLE rev_snapshot` migration (INTEGER/TEXT/BIGINT/BLOB).
table! {
    rev_snapshot (id) {
        id -> Integer,
        object_id -> Text,
        rev_id -> BigInt,
        data -> Binary,
    }
}
table! {
rev_table (id) {
id -> Integer,
@ -116,6 +125,7 @@ allow_tables_to_appear_in_same_query!(
grid_meta_rev_table,
grid_rev_table,
kv_table,
rev_snapshot,
rev_table,
trash_table,
user_table,

View File

@ -1,4 +1,5 @@
use crate::entities::view::ViewDataType;
use crate::services::folder_editor::FolderRevisionCompactor;
use crate::{
dart_notification::{send_dart_notification, FolderNotification},
entities::workspace::RepeatedWorkspacePB,
@ -13,7 +14,7 @@ use bytes::Bytes;
use flowy_error::FlowyError;
use flowy_folder_data_model::user_default;
use flowy_revision::disk::SQLiteTextBlockRevisionPersistence;
use flowy_revision::{RevisionManager, RevisionPersistence, RevisionWebSocket};
use flowy_revision::{RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionSnapshotPersistence};
use flowy_sync::client_document::default::{initial_quill_delta_string, initial_read_me};
use flowy_sync::{client_folder::FolderPad, entities::ws_data::ServerRevisionWSData};
use lazy_static::lazy_static;
@ -161,9 +162,20 @@ impl FolderManager {
let _ = self.persistence.initialize(user_id, &folder_id).await?;
let pool = self.persistence.db_pool()?;
let disk_cache = Arc::new(SQLiteTextBlockRevisionPersistence::new(user_id, pool));
let rev_persistence = Arc::new(RevisionPersistence::new(user_id, folder_id.as_ref(), disk_cache));
let rev_manager = RevisionManager::new(user_id, folder_id.as_ref(), rev_persistence);
let object_id = folder_id.as_ref();
let disk_cache = SQLiteTextBlockRevisionPersistence::new(user_id, pool.clone());
let rev_persistence = RevisionPersistence::new(user_id, object_id, disk_cache);
let rev_compactor = FolderRevisionCompactor();
// let history_persistence = SQLiteRevisionHistoryPersistence::new(object_id, pool.clone());
let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(object_id, pool);
let rev_manager = RevisionManager::new(
user_id,
folder_id.as_ref(),
rev_persistence,
rev_compactor,
// history_persistence,
snapshot_persistence,
);
let folder_editor = FolderEditor::new(user_id, &folder_id, token, rev_manager, self.web_socket.clone()).await?;
*self.folder_editor.write().await = Some(Arc::new(folder_editor));

View File

@ -89,11 +89,7 @@ impl FolderEditor {
&self.user_id,
md5,
);
let _ = futures::executor::block_on(async {
self.rev_manager
.add_local_revision(&revision, Box::new(FolderRevisionCompactor()))
.await
})?;
let _ = futures::executor::block_on(async { self.rev_manager.add_local_revision(&revision).await })?;
Ok(())
}
@ -133,7 +129,7 @@ impl FolderEditor {
}
}
struct FolderRevisionCompactor();
pub struct FolderRevisionCompactor();
impl RevisionCompactor for FolderRevisionCompactor {
fn bytes_from_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
let delta = make_delta_from_revisions::<PlainTextAttributes>(revisions)?;

View File

@ -85,7 +85,7 @@ impl FolderMigration {
return Ok(None);
}
let pool = self.database.db_pool()?;
let disk_cache = Arc::new(SQLiteTextBlockRevisionPersistence::new(user_id, pool));
let disk_cache = SQLiteTextBlockRevisionPersistence::new(user_id, pool);
let rev_persistence = Arc::new(RevisionPersistence::new(user_id, folder_id.as_ref(), disk_cache));
let (revisions, _) = RevisionLoader {
object_id: folder_id.as_ref().to_owned(),

View File

@ -1,4 +1,5 @@
use crate::services::grid_editor::GridRevisionEditor;
use crate::services::block_revision_editor::GridBlockRevisionCompactor;
use crate::services::grid_editor::{GridRevisionCompactor, GridRevisionEditor};
use crate::services::persistence::block_index::BlockIndexCache;
use crate::services::persistence::kv::GridKVPersistence;
use crate::services::persistence::GridDatabase;
@ -8,9 +9,9 @@ use dashmap::DashMap;
use flowy_database::ConnectionPool;
use flowy_error::{FlowyError, FlowyResult};
use flowy_grid_data_model::revision::{BuildGridContext, GridRevision};
use flowy_revision::disk::{SQLiteGridBlockMetaRevisionPersistence, SQLiteGridRevisionPersistence};
use flowy_revision::{RevisionManager, RevisionPersistence, RevisionWebSocket};
use flowy_sync::client_grid::{make_block_meta_delta, make_grid_delta};
use flowy_revision::disk::{SQLiteGridBlockRevisionPersistence, SQLiteGridRevisionPersistence};
use flowy_revision::{RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionSnapshotPersistence};
use flowy_sync::client_grid::{make_grid_block_delta, make_grid_delta};
use flowy_sync::entities::revision::{RepeatedRevision, Revision};
use std::sync::Arc;
use tokio::sync::RwLock;
@ -61,14 +62,10 @@ impl GridManager {
}
#[tracing::instrument(level = "debug", skip_all, err)]
pub async fn create_grid_block_meta<T: AsRef<str>>(
&self,
block_id: T,
revisions: RepeatedRevision,
) -> FlowyResult<()> {
pub async fn create_grid_block<T: AsRef<str>>(&self, block_id: T, revisions: RepeatedRevision) -> FlowyResult<()> {
let block_id = block_id.as_ref();
let db_pool = self.grid_user.db_pool()?;
let rev_manager = self.make_grid_block_meta_rev_manager(block_id, db_pool)?;
let rev_manager = self.make_grid_block_rev_manager(block_id, db_pool)?;
let _ = rev_manager.reset_object(revisions).await?;
Ok(())
}
@ -113,7 +110,7 @@ impl GridManager {
None => {
tracing::trace!("Create grid editor with id: {}", grid_id);
let db_pool = self.grid_user.db_pool()?;
let editor = self.make_grid_editor(grid_id, db_pool).await?;
let editor = self.make_grid_rev_editor(grid_id, db_pool).await?;
if self.grid_editors.contains_key(grid_id) {
tracing::warn!("Grid:{} already exists in cache", grid_id);
@ -127,7 +124,7 @@ impl GridManager {
}
#[tracing::instrument(level = "trace", skip(self, pool), err)]
async fn make_grid_editor(
async fn make_grid_rev_editor(
&self,
grid_id: &str,
pool: Arc<ConnectionPool>,
@ -147,22 +144,22 @@ impl GridManager {
pub fn make_grid_rev_manager(&self, grid_id: &str, pool: Arc<ConnectionPool>) -> FlowyResult<RevisionManager> {
let user_id = self.grid_user.user_id()?;
let disk_cache = Arc::new(SQLiteGridRevisionPersistence::new(&user_id, pool));
let rev_persistence = Arc::new(RevisionPersistence::new(&user_id, grid_id, disk_cache));
let rev_manager = RevisionManager::new(&user_id, grid_id, rev_persistence);
let disk_cache = SQLiteGridRevisionPersistence::new(&user_id, pool.clone());
let rev_persistence = RevisionPersistence::new(&user_id, grid_id, disk_cache);
let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(grid_id, pool);
let rev_compactor = GridRevisionCompactor();
let rev_manager = RevisionManager::new(&user_id, grid_id, rev_persistence, rev_compactor, snapshot_persistence);
Ok(rev_manager)
}
fn make_grid_block_meta_rev_manager(
&self,
block_d: &str,
pool: Arc<ConnectionPool>,
) -> FlowyResult<RevisionManager> {
fn make_grid_block_rev_manager(&self, block_id: &str, pool: Arc<ConnectionPool>) -> FlowyResult<RevisionManager> {
let user_id = self.grid_user.user_id()?;
let disk_cache = Arc::new(SQLiteGridBlockMetaRevisionPersistence::new(&user_id, pool));
let rev_persistence = Arc::new(RevisionPersistence::new(&user_id, block_d, disk_cache));
let rev_manager = RevisionManager::new(&user_id, block_d, rev_persistence);
let disk_cache = SQLiteGridBlockRevisionPersistence::new(&user_id, pool.clone());
let rev_persistence = RevisionPersistence::new(&user_id, block_id, disk_cache);
let rev_compactor = GridBlockRevisionCompactor();
let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(block_id, pool);
let rev_manager =
RevisionManager::new(&user_id, block_id, rev_persistence, rev_compactor, snapshot_persistence);
Ok(rev_manager)
}
}
@ -181,13 +178,11 @@ pub async fn make_grid_view_data(
});
// Create grid's block
let grid_block_meta_delta = make_block_meta_delta(block_meta_data);
let block_meta_delta_data = grid_block_meta_delta.to_delta_bytes();
let grid_block_delta = make_grid_block_delta(block_meta_data);
let block_delta_data = grid_block_delta.to_delta_bytes();
let repeated_revision: RepeatedRevision =
Revision::initial_revision(user_id, block_id, block_meta_delta_data).into();
let _ = grid_manager
.create_grid_block_meta(&block_id, repeated_revision)
.await?;
Revision::initial_revision(user_id, block_id, block_delta_data).into();
let _ = grid_manager.create_grid_block(&block_id, repeated_revision).await?;
}
let grid_rev = GridRevision::from_build_context(view_id, build_context);

View File

@ -1,7 +1,7 @@
use crate::dart_notification::{send_dart_notification, GridNotification};
use crate::entities::{CellChangesetPB, GridBlockChangesetPB, GridRowPB, InsertedRowPB, UpdatedRowPB};
use crate::manager::GridUser;
use crate::services::block_revision_editor::GridBlockRevisionEditor;
use crate::services::block_revision_editor::{GridBlockRevisionCompactor, GridBlockRevisionEditor};
use crate::services::persistence::block_index::BlockIndexCache;
use crate::services::row::{block_from_row_orders, GridBlockSnapshot};
use dashmap::DashMap;
@ -9,8 +9,8 @@ use flowy_error::FlowyResult;
use flowy_grid_data_model::revision::{
GridBlockMetaRevision, GridBlockMetaRevisionChangeset, RowMetaChangeset, RowRevision,
};
use flowy_revision::disk::SQLiteGridBlockMetaRevisionPersistence;
use flowy_revision::{RevisionManager, RevisionPersistence};
use flowy_revision::disk::SQLiteGridBlockRevisionPersistence;
use flowy_revision::{RevisionManager, RevisionPersistence, SQLiteRevisionSnapshotPersistence};
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;
@ -280,8 +280,10 @@ async fn make_block_editor(user: &Arc<dyn GridUser>, block_id: &str) -> FlowyRes
let user_id = user.user_id()?;
let pool = user.db_pool()?;
let disk_cache = Arc::new(SQLiteGridBlockMetaRevisionPersistence::new(&user_id, pool));
let rev_persistence = Arc::new(RevisionPersistence::new(&user_id, block_id, disk_cache));
let rev_manager = RevisionManager::new(&user_id, block_id, rev_persistence);
let disk_cache = SQLiteGridBlockRevisionPersistence::new(&user_id, pool.clone());
let rev_persistence = RevisionPersistence::new(&user_id, block_id, disk_cache);
let rev_compactor = GridBlockRevisionCompactor();
let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(block_id, pool);
let rev_manager = RevisionManager::new(&user_id, block_id, rev_persistence, rev_compactor, snapshot_persistence);
GridBlockRevisionEditor::new(&user_id, &token, block_id, rev_manager).await
}

View File

@ -26,7 +26,7 @@ impl GridBlockRevisionEditor {
block_id: &str,
mut rev_manager: RevisionManager,
) -> FlowyResult<Self> {
let cloud = Arc::new(GridBlockMetaRevisionCloudService {
let cloud = Arc::new(GridBlockRevisionCloudService {
token: token.to_owned(),
});
let block_meta_pad = rev_manager.load::<GridBlockMetaPadBuilder>(Some(cloud)).await?;
@ -170,20 +170,17 @@ impl GridBlockRevisionEditor {
&user_id,
md5,
);
let _ = self
.rev_manager
.add_local_revision(&revision, Box::new(GridBlockMetaRevisionCompactor()))
.await?;
let _ = self.rev_manager.add_local_revision(&revision).await?;
Ok(())
}
}
struct GridBlockMetaRevisionCloudService {
struct GridBlockRevisionCloudService {
#[allow(dead_code)]
token: String,
}
impl RevisionCloudService for GridBlockMetaRevisionCloudService {
impl RevisionCloudService for GridBlockRevisionCloudService {
#[tracing::instrument(level = "trace", skip(self))]
fn fetch_object(&self, _user_id: &str, _object_id: &str) -> FutureResult<Vec<Revision>, FlowyError> {
FutureResult::new(async move { Ok(vec![]) })
@ -200,8 +197,8 @@ impl RevisionObjectBuilder for GridBlockMetaPadBuilder {
}
}
struct GridBlockMetaRevisionCompactor();
impl RevisionCompactor for GridBlockMetaRevisionCompactor {
pub struct GridBlockRevisionCompactor();
impl RevisionCompactor for GridBlockRevisionCompactor {
fn bytes_from_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
let delta = make_delta_from_revisions::<PlainTextAttributes>(revisions)?;
Ok(delta.to_delta_bytes())

View File

@ -582,10 +582,7 @@ impl GridRevisionEditor {
&user_id,
md5,
);
let _ = self
.rev_manager
.add_local_revision(&revision, Box::new(GridRevisionCompactor()))
.await?;
let _ = self.rev_manager.add_local_revision(&revision).await?;
Ok(())
}
@ -664,7 +661,7 @@ impl RevisionCloudService for GridRevisionCloudService {
}
}
struct GridRevisionCompactor();
pub struct GridRevisionCompactor();
impl RevisionCompactor for GridRevisionCompactor {
fn bytes_from_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
let delta = make_delta_from_revisions::<PlainTextAttributes>(revisions)?;

View File

@ -0,0 +1,381 @@
use crate::grid::script::EditorScript::*;
use crate::grid::script::*;
use chrono::NaiveDateTime;
use flowy_grid::services::field::{
DateCellContentChangeset, DateCellData, MultiSelectTypeOption, SelectOption, SelectOptionCellContentChangeset,
SingleSelectTypeOption, SELECTION_IDS_SEPARATOR,
};
use flowy_grid::services::row::{decode_cell_data_from_type_option_cell_data, CreateRowMetaBuilder};
use flowy_grid_data_model::entities::{
CellChangeset, FieldChangesetParams, FieldType, GridBlockInfoChangeset, GridBlockMetaSnapshot, RowMetaChangeset,
TypeOptionDataEntry,
};
#[tokio::test]
async fn grid_create_field() {
    // A newly created field must appear at the tail of the field list.
    let mut test = GridEditorTest::new().await;
    let (text_params, text_meta) = create_text_field(&test.grid_id);
    let (select_params, select_meta) = create_single_select_field(&test.grid_id);

    let create_text_scripts = vec![
        CreateField { params: text_params },
        AssertFieldEqual {
            field_index: test.field_count,
            field_meta: text_meta,
        },
    ];
    test.run_scripts(create_text_scripts).await;

    // field_count was bumped by the first CreateField, so the second field is
    // expected at the new tail index.
    let create_select_scripts = vec![
        CreateField { params: select_params },
        AssertFieldEqual {
            field_index: test.field_count,
            field_meta: select_meta,
        },
    ];
    test.run_scripts(create_select_scripts).await;
}
#[tokio::test]
async fn grid_create_duplicate_field() {
    // Inserting the same field twice must only grow the field list once.
    let mut test = GridEditorTest::new().await;
    let (field_params, _) = create_text_field(&test.grid_id);
    let expected = test.field_count + 1;
    let scripts = vec![
        CreateField {
            params: field_params.clone(),
        },
        CreateField { params: field_params },
        AssertFieldCount(expected),
    ];
    test.run_scripts(scripts).await;
}
#[tokio::test]
async fn grid_update_field_with_empty_change() {
    // An all-default changeset must leave the stored field untouched.
    let mut test = GridEditorTest::new().await;
    let (params, field_meta) = create_single_select_field(&test.grid_id);
    let empty_changeset = FieldChangesetParams {
        field_id: field_meta.id.clone(),
        grid_id: test.grid_id.clone(),
        ..Default::default()
    };
    let scripts = vec![
        CreateField { params },
        UpdateField {
            changeset: empty_changeset,
        },
        AssertFieldEqual {
            field_index: test.field_count,
            field_meta,
        },
    ];
    test.run_scripts(scripts).await;
}
#[tokio::test]
async fn grid_update_field() {
    // Updating frozen/width/type-option data must be reflected by the editor.
    let mut test = GridEditorTest::new().await;
    let (select_params, select_field) = create_single_select_field(&test.grid_id);

    // Mirror the changeset onto a clone to build the expected field meta.
    let mut expected_field = select_field.clone();

    let mut type_option = SingleSelectTypeOption::from(&select_field);
    type_option.options.push(SelectOption::new("Unknown"));

    let changeset = FieldChangesetParams {
        field_id: select_field.id.clone(),
        grid_id: test.grid_id.clone(),
        frozen: Some(true),
        width: Some(1000),
        type_option_data: Some(type_option.protobuf_bytes().to_vec()),
        ..Default::default()
    };

    expected_field.frozen = true;
    expected_field.width = 1000;
    expected_field.insert_type_option_entry(&type_option);

    let scripts = vec![
        CreateField { params: select_params },
        UpdateField { changeset },
        AssertFieldEqual {
            field_index: test.field_count,
            field_meta: expected_field,
        },
    ];
    test.run_scripts(scripts).await;
}
#[tokio::test]
async fn grid_delete_field() {
    // Creating then deleting a field leaves the field count unchanged.
    let mut test = GridEditorTest::new().await;
    let original_count = test.field_count;
    let (params, field_meta) = create_text_field(&test.grid_id);
    let scripts = vec![
        CreateField { params },
        DeleteField { field_meta },
        AssertFieldCount(original_count),
    ];
    test.run_scripts(scripts).await;
}
#[tokio::test]
async fn grid_create_block() {
    // The template grid starts with exactly one block; adding one makes two.
    let new_block = GridBlockMetaSnapshot::new();
    GridEditorTest::new()
        .await
        .run_scripts(vec![
            AssertBlockCount(1),
            CreateBlock { block: new_block },
            AssertBlockCount(2),
        ])
        .await;
}
#[tokio::test]
async fn grid_update_block() {
    // Create a second block, update its info, and verify the stored block
    // matches the expected (mutated) copy.
    let new_block = GridBlockMetaSnapshot::new();
    let mut expected_block = new_block.clone();
    let changeset = GridBlockInfoChangeset {
        block_id: new_block.block_id.clone(),
        start_row_index: Some(2),
        row_count: Some(10),
    };
    expected_block.start_row_index = 2;
    expected_block.row_count = 10;

    GridEditorTest::new()
        .await
        .run_scripts(vec![
            AssertBlockCount(1),
            CreateBlock { block: new_block },
            UpdateBlock { changeset },
            AssertBlockCount(2),
            AssertBlockEqual {
                block_index: 1,
                block: expected_block,
            },
        ])
        .await;
}
#[tokio::test]
async fn grid_create_row() {
    // The template grid starts with 3 rows; two empty inserts yield 5.
    GridEditorTest::new()
        .await
        .run_scripts(vec![AssertRowCount(3), CreateEmptyRow, CreateEmptyRow, AssertRowCount(5)])
        .await;
}
#[tokio::test]
async fn grid_create_row2() {
    // Inserting a row built from the current field metas bumps the row count.
    let mut test = GridEditorTest::new().await;
    let row_context = CreateRowMetaBuilder::new(&test.field_metas).build();
    let scripts = vec![
        AssertRowCount(3),
        CreateRow { context: row_context },
        AssertRowCount(4),
    ];
    test.run_scripts(scripts).await;
}
#[tokio::test]
async fn grid_update_row() {
    // A changeset carrying no height/visibility/cell edits must leave the row
    // in a state that still satisfies AssertRow.
    let mut test = GridEditorTest::new().await;
    let context = CreateRowMetaBuilder::new(&test.field_metas).build();
    let noop_changeset = RowMetaChangeset {
        row_id: context.row_id.clone(),
        height: None,
        visibility: None,
        cell_by_field_id: Default::default(),
    };
    let scripts = vec![
        AssertRowCount(3),
        CreateRow { context },
        UpdateRow {
            changeset: noop_changeset.clone(),
        },
        AssertRow {
            changeset: noop_changeset,
        },
        AssertRowCount(4),
    ];
    test.run_scripts(scripts).await;
}
#[tokio::test]
async fn grid_delete_row() {
    // Add two rows (3 -> 5), delete them both, and verify the single block's
    // row count drops back to 3.
    let mut test = GridEditorTest::new().await;
    let first_row = CreateRowMetaBuilder::new(&test.field_metas).build();
    let second_row = CreateRowMetaBuilder::new(&test.field_metas).build();
    let row_ids = vec![first_row.row_id.clone(), second_row.row_id.clone()];
    let scripts = vec![
        AssertRowCount(3),
        CreateRow { context: first_row },
        CreateRow { context: second_row },
        AssertBlockCount(1),
        AssertBlock {
            block_index: 0,
            row_count: 5,
            start_row_index: 0,
        },
        DeleteRow { row_ids },
        AssertBlock {
            block_index: 0,
            row_count: 3,
            start_row_index: 0,
        },
    ];
    test.run_scripts(scripts).await;
}
#[tokio::test]
async fn grid_row_add_cells_test() {
    // Fill one cell per field type, insert the row, then dump the grid pad to
    // confirm the revisions were persisted.
    let mut test = GridEditorTest::new().await;
    let mut row_builder = CreateRowMetaBuilder::new(&test.field_metas);
    for field_meta in &test.field_metas {
        match field_meta.field_type {
            FieldType::RichText => {
                row_builder.add_cell(&field_meta.id, "hello world".to_owned()).unwrap();
            }
            FieldType::Number => {
                row_builder.add_cell(&field_meta.id, "18,443".to_owned()).unwrap();
            }
            FieldType::DateTime => {
                row_builder
                    .add_cell(&field_meta.id, make_date_cell_string("1647251762"))
                    .unwrap();
            }
            FieldType::SingleSelect => {
                // Single select stores a single option id.
                let type_option = SingleSelectTypeOption::from(field_meta);
                let option_id = type_option.options.first().unwrap().id.clone();
                row_builder.add_select_option_cell(&field_meta.id, option_id).unwrap();
            }
            FieldType::MultiSelect => {
                // Multi select stores every option id joined by the separator.
                let type_option = MultiSelectTypeOption::from(field_meta);
                let option_ids = type_option
                    .options
                    .iter()
                    .map(|option| option.id.clone())
                    .collect::<Vec<_>>()
                    .join(SELECTION_IDS_SEPARATOR);
                row_builder.add_select_option_cell(&field_meta.id, option_ids).unwrap();
            }
            FieldType::Checkbox => {
                row_builder.add_cell(&field_meta.id, "false".to_string()).unwrap();
            }
            FieldType::URL => {
                row_builder.add_cell(&field_meta.id, "1".to_string()).unwrap();
            }
        }
    }
    let context = row_builder.build();
    let scripts = vec![CreateRow { context }, AssertGridMetaPad];
    test.run_scripts(scripts).await;
}
#[tokio::test]
async fn grid_row_add_date_cell_test() {
    // Date cells must reject empty content and accept JSON changesets carrying
    // a unix-timestamp string; the decoded cell must render the expected date.
    let mut test = GridEditorTest::new().await;
    let mut builder = CreateRowMetaBuilder::new(&test.field_metas);
    let mut date_field = None;
    let timestamp = 1647390674;
    for field in &test.field_metas {
        if field.field_type == FieldType::DateTime {
            date_field = Some(field.clone());
            // NOTE(review): this call discards its result — looks like leftover
            // debugging; removing it would also free the `chrono` import.
            NaiveDateTime::from_timestamp(123, 0);
            // The data should not be empty
            assert!(builder.add_cell(&field.id, "".to_string()).is_err());
            assert!(builder.add_cell(&field.id, make_date_cell_string("123")).is_ok());
            assert!(builder
                .add_cell(&field.id, make_date_cell_string(&timestamp.to_string()))
                .is_ok());
        }
    }
    let context = builder.build();
    let date_field = date_field.unwrap();
    // Presumably the last successful add wins, so the stored cell holds
    // `timestamp`, which the template's US date format renders as 2022/03/16.
    let cell_data = context.cell_by_field_id.get(&date_field.id).unwrap().clone();
    assert_eq!(
        decode_cell_data_from_type_option_cell_data(cell_data.data.clone(), &date_field, &date_field.field_type)
            .parse::<DateCellData>()
            .unwrap()
            .date,
        "2022/03/16",
    );
    let scripts = vec![CreateRow { context }];
    test.run_scripts(scripts).await;
}
#[tokio::test]
async fn grid_cell_update() {
    // For every field of the first row, write a valid cell value; for every
    // field of the second row, write a value whose validity depends on the
    // field type and assert the expected success/failure.
    let mut test = GridEditorTest::new().await;
    let field_metas = &test.field_metas;
    let row_metas = &test.row_metas;
    let grid_blocks = &test.grid_blocks;
    assert_eq!(row_metas.len(), 3);
    assert_eq!(grid_blocks.len(), 1);

    // NOTE(review): `grid_id` is populated from the block id here — confirm
    // whether update_cell really expects the block id in that slot.
    let block_id = &grid_blocks.first().unwrap().block_id;
    let mut scripts = vec![];
    for (index, row_meta) in row_metas.iter().enumerate() {
        for field_meta in field_metas {
            if index == 0 {
                // Row 0: every write uses a value valid for its field type.
                let data = match field_meta.field_type {
                    FieldType::RichText => "".to_string(),
                    FieldType::Number => "123".to_string(),
                    FieldType::DateTime => make_date_cell_string("123"),
                    FieldType::SingleSelect => {
                        let type_option = SingleSelectTypeOption::from(field_meta);
                        SelectOptionCellContentChangeset::from_insert(&type_option.options.first().unwrap().id).to_str()
                    }
                    FieldType::MultiSelect => {
                        let type_option = MultiSelectTypeOption::from(field_meta);
                        SelectOptionCellContentChangeset::from_insert(&type_option.options.first().unwrap().id).to_str()
                    }
                    FieldType::Checkbox => "1".to_string(),
                    FieldType::URL => "1".to_string(),
                };
                scripts.push(UpdateCell {
                    changeset: CellChangeset {
                        grid_id: block_id.to_string(),
                        row_id: row_meta.id.clone(),
                        field_id: field_meta.id.clone(),
                        cell_content_changeset: Some(data),
                    },
                    is_err: false,
                });
            }

            if index == 1 {
                // Row 1: oversized text, non-numeric number, and malformed date
                // must fail; select/checkbox/url inputs are tolerated.
                let (data, is_err) = match field_meta.field_type {
                    FieldType::RichText => ("1".to_string().repeat(10001), true),
                    FieldType::Number => ("abc".to_string(), true),
                    FieldType::DateTime => ("abc".to_string(), true),
                    FieldType::SingleSelect => (SelectOptionCellContentChangeset::from_insert("abc").to_str(), false),
                    FieldType::MultiSelect => (SelectOptionCellContentChangeset::from_insert("abc").to_str(), false),
                    FieldType::Checkbox => ("2".to_string(), false),
                    FieldType::URL => ("2".to_string(), false),
                };
                scripts.push(UpdateCell {
                    changeset: CellChangeset {
                        grid_id: block_id.to_string(),
                        row_id: row_meta.id.clone(),
                        field_id: field_meta.id.clone(),
                        cell_content_changeset: Some(data),
                    },
                    is_err,
                });
            }
        }
    }
    test.run_scripts(scripts).await;
}
/// Encodes a date string into the JSON changeset payload date cells expect.
fn make_date_cell_string(s: &str) -> String {
    let changeset = DateCellContentChangeset {
        date: Some(s.to_string()),
        time: None,
    };
    serde_json::to_string(&changeset).unwrap()
}

View File

@ -0,0 +1,374 @@
use bytes::Bytes;
use flowy_grid::services::field::*;
use flowy_grid::services::grid_meta_editor::{GridMetaEditor, GridPadBuilder};
use flowy_grid::services::row::CreateRowMetaPayload;
use flowy_grid_data_model::entities::{
BuildGridContext, CellChangeset, Field, FieldChangesetParams, FieldMeta, FieldOrder, FieldType,
GridBlockInfoChangeset, GridBlockMetaSnapshot, InsertFieldParams, RowMeta, RowMetaChangeset, RowOrder,
TypeOptionDataEntry,
};
use flowy_revision::REVISION_WRITE_INTERVAL_IN_MILLIS;
use flowy_sync::client_grid::GridBuilder;
use flowy_test::helper::ViewTest;
use flowy_test::FlowySDKTest;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use strum::EnumCount;
use tokio::time::sleep;
/// One step of a grid-editor test: either a mutation applied through the
/// editor or an assertion against the harness's cached state.
pub enum EditorScript {
    CreateField {
        params: InsertFieldParams,
    },
    UpdateField {
        changeset: FieldChangesetParams,
    },
    DeleteField {
        field_meta: FieldMeta,
    },
    AssertFieldCount(usize),
    /// Asserts the field at `field_index` equals `field_meta`.
    AssertFieldEqual {
        field_index: usize,
        field_meta: FieldMeta,
    },
    CreateBlock {
        block: GridBlockMetaSnapshot,
    },
    UpdateBlock {
        changeset: GridBlockInfoChangeset,
    },
    AssertBlockCount(usize),
    /// Asserts row_count/start_row_index of the cached block at `block_index`.
    AssertBlock {
        block_index: usize,
        row_count: i32,
        start_row_index: i32,
    },
    /// Asserts the freshly-fetched block at `block_index` equals `block`.
    AssertBlockEqual {
        block_index: usize,
        block: GridBlockMetaSnapshot,
    },
    CreateEmptyRow,
    CreateRow {
        context: CreateRowMetaPayload,
    },
    UpdateRow {
        changeset: RowMetaChangeset,
    },
    /// Asserts the row identified by the changeset matches its height/visibility.
    AssertRow {
        changeset: RowMetaChangeset,
    },
    DeleteRow {
        row_ids: Vec<String>,
    },
    /// Updates a cell; `is_err` states whether the write is expected to fail.
    UpdateCell {
        changeset: CellChangeset,
        is_err: bool,
    },
    AssertRowCount(usize),
    // AssertRowEqual{ row_index: usize, row: RowMeta},
    /// Waits out the revision write interval, then reloads and prints the pad.
    AssertGridMetaPad,
}
/// Test harness holding the SDK, the opened grid editor, and cached copies of
/// the grid's fields/blocks/rows that script assertions compare against.
pub struct GridEditorTest {
    pub sdk: FlowySDKTest,
    pub grid_id: String,
    pub editor: Arc<GridMetaEditor>,
    // Cached field metas, refreshed after field mutations.
    pub field_metas: Vec<FieldMeta>,
    // Cached block metas, refreshed after block/row mutations.
    pub grid_blocks: Vec<GridBlockMetaSnapshot>,
    // Cached row metas, refreshed after row mutations.
    pub row_metas: Vec<Arc<RowMeta>>,
    // Expected field count; starts at FieldType::COUNT (template has one
    // field per type).
    pub field_count: usize,
    // RowOrders captured at creation, keyed by row id; used by DeleteRow.
    pub row_order_by_row_id: HashMap<String, RowOrder>,
}
impl GridEditorTest {
    /// Boots a test SDK, builds a grid view from the template grid, opens its
    /// editor, and caches the initial fields/blocks/rows for later assertions.
    pub async fn new() -> Self {
        let sdk = FlowySDKTest::default();
        let _ = sdk.init_user().await;
        let build_context = make_template_1_grid();
        let view_data: Bytes = build_context.into();
        let test = ViewTest::new_grid_view(&sdk, view_data.to_vec()).await;
        let editor = sdk.grid_manager.open_grid(&test.view.id).await.unwrap();
        let field_metas = editor.get_field_metas::<FieldOrder>(None).await.unwrap();
        let grid_blocks = editor.get_block_metas().await.unwrap();
        let row_metas = get_row_metas(&editor).await;
        let grid_id = test.view.id;
        Self {
            sdk,
            grid_id,
            editor,
            field_metas,
            grid_blocks,
            row_metas,
            // The template creates one field of every FieldType.
            field_count: FieldType::COUNT,
            row_order_by_row_id: HashMap::default(),
        }
    }

    /// Runs each script in order; later scripts see state mutated by earlier ones.
    pub async fn run_scripts(&mut self, scripts: Vec<EditorScript>) {
        for script in scripts {
            self.run_script(script).await;
        }
    }

    /// Executes a single script against the editor, refreshing whichever cached
    /// state the matching assertion variants read.
    pub async fn run_script(&mut self, script: EditorScript) {
        // NOTE(review): these four values are fetched for every script but only
        // AssertGridMetaPad uses grid_manager/pool; `_cache` appears unused.
        let grid_manager = self.sdk.grid_manager.clone();
        let pool = self.sdk.user_session.db_pool().unwrap();
        let rev_manager = self.editor.rev_manager();
        let _cache = rev_manager.revision_cache().await;
        match script {
            EditorScript::CreateField { params } => {
                // Only bump the expected count for genuinely new field ids.
                if !self.editor.contain_field(&params.field.id).await {
                    self.field_count += 1;
                }
                self.editor.insert_field(params).await.unwrap();
                self.field_metas = self.editor.get_field_metas::<FieldOrder>(None).await.unwrap();
                assert_eq!(self.field_count, self.field_metas.len());
            }
            EditorScript::UpdateField { changeset: change } => {
                self.editor.update_field(change).await.unwrap();
                self.field_metas = self.editor.get_field_metas::<FieldOrder>(None).await.unwrap();
            }
            EditorScript::DeleteField { field_meta } => {
                // Deleting a missing field is tolerated; only decrement when it exists.
                if self.editor.contain_field(&field_meta.id).await {
                    self.field_count -= 1;
                }
                self.editor.delete_field(&field_meta.id).await.unwrap();
                self.field_metas = self.editor.get_field_metas::<FieldOrder>(None).await.unwrap();
                assert_eq!(self.field_count, self.field_metas.len());
            }
            EditorScript::AssertFieldCount(count) => {
                assert_eq!(
                    self.editor.get_field_metas::<FieldOrder>(None).await.unwrap().len(),
                    count
                );
            }
            EditorScript::AssertFieldEqual {
                field_index,
                field_meta,
            } => {
                let field_metas = self.editor.get_field_metas::<FieldOrder>(None).await.unwrap();
                assert_eq!(field_metas[field_index].clone(), field_meta);
            }
            EditorScript::CreateBlock { block } => {
                self.editor.create_block(block).await.unwrap();
                self.grid_blocks = self.editor.get_block_metas().await.unwrap();
            }
            EditorScript::UpdateBlock { changeset: change } => {
                // NOTE(review): grid_blocks is not refreshed here; AssertBlock
                // after an UpdateBlock reads stale cached data.
                self.editor.update_block(change).await.unwrap();
            }
            EditorScript::AssertBlockCount(count) => {
                assert_eq!(self.editor.get_block_metas().await.unwrap().len(), count);
            }
            EditorScript::AssertBlock {
                block_index,
                row_count,
                start_row_index,
            } => {
                // Asserts against the cached blocks, not a fresh fetch.
                assert_eq!(self.grid_blocks[block_index].row_count, row_count);
                assert_eq!(self.grid_blocks[block_index].start_row_index, start_row_index);
            }
            EditorScript::AssertBlockEqual { block_index, block } => {
                let blocks = self.editor.get_block_metas().await.unwrap();
                let compared_block = blocks[block_index].clone();
                assert_eq!(compared_block, block);
            }
            EditorScript::CreateEmptyRow => {
                let row_order = self.editor.create_row(None).await.unwrap();
                self.row_order_by_row_id.insert(row_order.row_id.clone(), row_order);
                self.row_metas = self.get_row_metas().await;
                self.grid_blocks = self.editor.get_block_metas().await.unwrap();
            }
            EditorScript::CreateRow { context } => {
                let row_orders = self.editor.insert_rows(vec![context]).await.unwrap();
                for row_order in row_orders {
                    self.row_order_by_row_id.insert(row_order.row_id.clone(), row_order);
                }
                self.row_metas = self.get_row_metas().await;
                self.grid_blocks = self.editor.get_block_metas().await.unwrap();
            }
            EditorScript::UpdateRow { changeset: change } => self.editor.update_row(change).await.unwrap(),
            EditorScript::DeleteRow { row_ids } => {
                // Deletion needs the RowOrders captured when the rows were created.
                let row_orders = row_ids
                    .into_iter()
                    .map(|row_id| self.row_order_by_row_id.get(&row_id).unwrap().clone())
                    .collect::<Vec<RowOrder>>();

                self.editor.delete_rows(row_orders).await.unwrap();
                self.row_metas = self.get_row_metas().await;
                self.grid_blocks = self.editor.get_block_metas().await.unwrap();
            }
            EditorScript::AssertRow { changeset } => {
                // Only the fields present in the changeset are asserted.
                let row = self.row_metas.iter().find(|row| row.id == changeset.row_id).unwrap();

                if let Some(visibility) = changeset.visibility {
                    assert_eq!(row.visibility, visibility);
                }

                if let Some(height) = changeset.height {
                    assert_eq!(row.height, height);
                }
            }
            EditorScript::UpdateCell { changeset, is_err } => {
                let result = self.editor.update_cell(changeset).await;
                if is_err {
                    assert!(result.is_err())
                } else {
                    let _ = result.unwrap();
                    self.row_metas = self.get_row_metas().await;
                }
            }
            EditorScript::AssertRowCount(count) => {
                assert_eq!(self.row_metas.len(), count);
            }
            EditorScript::AssertGridMetaPad => {
                // Wait for the debounced revision writer to flush, then reload
                // the pad from disk and dump its delta for inspection.
                sleep(Duration::from_millis(2 * REVISION_WRITE_INTERVAL_IN_MILLIS)).await;
                let mut grid_rev_manager = grid_manager.make_grid_rev_manager(&self.grid_id, pool.clone()).unwrap();
                let grid_pad = grid_rev_manager.load::<GridPadBuilder>(None).await.unwrap();
                println!("{}", grid_pad.delta_str());
            }
        }
    }

    // Convenience wrapper over the free function of the same name.
    async fn get_row_metas(&self) -> Vec<Arc<RowMeta>> {
        get_row_metas(&self.editor).await
    }
}
/// Fetches the row metas of the last block snapshot; these tests use a
/// single-block grid, so that snapshot holds every row.
async fn get_row_metas(editor: &Arc<GridMetaEditor>) -> Vec<Arc<RowMeta>> {
    let mut snapshots = editor.grid_block_snapshots(None).await.unwrap();
    snapshots.pop().unwrap().row_metas
}
/// Builds the InsertFieldParams for a visible rich-text field named "Name",
/// returning the params together with the FieldMeta callers assert against.
pub fn create_text_field(grid_id: &str) -> (InsertFieldParams, FieldMeta) {
    let field_meta = FieldBuilder::new(RichTextTypeOptionBuilder::default())
        .name("Name")
        .visibility(true)
        .build();
    let expected_meta = field_meta.clone();

    // Serialize the type option so it travels with the insert request.
    let type_option_data = field_meta
        .get_type_option_entry::<RichTextTypeOption>(&field_meta.field_type)
        .unwrap()
        .protobuf_bytes()
        .to_vec();

    let field = Field {
        id: field_meta.id,
        name: field_meta.name,
        desc: field_meta.desc,
        field_type: field_meta.field_type,
        frozen: field_meta.frozen,
        visibility: field_meta.visibility,
        width: field_meta.width,
        is_primary: false,
    };

    (
        InsertFieldParams {
            grid_id: grid_id.to_owned(),
            field,
            type_option_data,
            start_field_id: None,
        },
        expected_meta,
    )
}
/// Builds the InsertFieldParams for a visible single-select field named "Name"
/// with the options "Done" and "Progress", returning the params together with
/// the FieldMeta callers assert against.
pub fn create_single_select_field(grid_id: &str) -> (InsertFieldParams, FieldMeta) {
    let select_options = SingleSelectTypeOptionBuilder::default()
        .option(SelectOption::new("Done"))
        .option(SelectOption::new("Progress"));
    let field_meta = FieldBuilder::new(select_options).name("Name").visibility(true).build();
    let expected_meta = field_meta.clone();

    // Serialize the type option so it travels with the insert request.
    let type_option_data = field_meta
        .get_type_option_entry::<SingleSelectTypeOption>(&field_meta.field_type)
        .unwrap()
        .protobuf_bytes()
        .to_vec();

    let field = Field {
        id: field_meta.id,
        name: field_meta.name,
        desc: field_meta.desc,
        field_type: field_meta.field_type,
        frozen: field_meta.frozen,
        visibility: field_meta.visibility,
        width: field_meta.width,
        is_primary: false,
    };

    (
        InsertFieldParams {
            grid_id: grid_id.to_owned(),
            field,
            type_option_data,
            start_field_id: None,
        },
        expected_meta,
    )
}
/// Builds a fixture grid with one field of every supported type (text,
/// single-select, multi-select, number, date, checkbox, URL) and three empty
/// rows. Field order matters to the tests and is preserved.
fn make_template_1_grid() -> BuildGridContext {
    // Text
    let name_field = FieldBuilder::new(RichTextTypeOptionBuilder::default())
        .name("Name")
        .visibility(true)
        .build();

    // Single select
    let status_options = SingleSelectTypeOptionBuilder::default()
        .option(SelectOption::new("Live"))
        .option(SelectOption::new("Completed"))
        .option(SelectOption::new("Planned"))
        .option(SelectOption::new("Paused"));
    let status_field = FieldBuilder::new(status_options).name("Status").visibility(true).build();

    // Multi select
    let platform_options = MultiSelectTypeOptionBuilder::default()
        .option(SelectOption::new("Google"))
        .option(SelectOption::new("Facebook"))
        .option(SelectOption::new("Twitter"));
    let platform_field = FieldBuilder::new(platform_options)
        .name("Platform")
        .visibility(true)
        .build();

    // Number (USD format)
    let price_field = FieldBuilder::new(NumberTypeOptionBuilder::default().set_format(NumberFormat::USD))
        .name("Price")
        .visibility(true)
        .build();

    // Date (US date format, 24h time)
    let time_options = DateTypeOptionBuilder::default()
        .date_format(DateFormat::US)
        .time_format(TimeFormat::TwentyFourHour);
    let time_field = FieldBuilder::new(time_options).name("Time").visibility(true).build();

    // Checkbox
    let done_field = FieldBuilder::new(CheckboxTypeOptionBuilder::default())
        .name("is done")
        .visibility(true)
        .build();

    // URL
    let link_field = FieldBuilder::new(URLTypeOptionBuilder::default())
        .name("link")
        .visibility(true)
        .build();

    GridBuilder::default()
        .add_field(name_field)
        .add_field(status_field)
        .add_field(platform_field)
        .add_field(price_field)
        .add_field(time_field)
        .add_field(done_field)
        .add_field(link_field)
        .add_empty_row()
        .add_empty_row()
        .add_empty_row()
        .build()
}

View File

@ -1,6 +1,5 @@
use crate::cache::disk::RevisionDiskCache;
use crate::disk::{RevisionChangeset, RevisionRecord, RevisionState};
use bytes::Bytes;
use diesel::{sql_types::Integer, update, SqliteConnection};
use flowy_database::{
@ -16,12 +15,12 @@ use flowy_sync::{
};
use std::sync::Arc;
pub struct SQLiteGridBlockMetaRevisionPersistence {
pub struct SQLiteGridBlockRevisionPersistence {
user_id: String,
pub(crate) pool: Arc<ConnectionPool>,
}
impl RevisionDiskCache for SQLiteGridBlockMetaRevisionPersistence {
impl RevisionDiskCache for SQLiteGridBlockRevisionPersistence {
type Error = FlowyError;
fn create_revision_records(&self, revision_records: Vec<RevisionRecord>) -> Result<(), Self::Error> {
@ -82,7 +81,7 @@ impl RevisionDiskCache for SQLiteGridBlockMetaRevisionPersistence {
}
}
impl SQLiteGridBlockMetaRevisionPersistence {
impl SQLiteGridBlockRevisionPersistence {
pub fn new(user_id: &str, pool: Arc<ConnectionPool>) -> Self {
Self {
user_id: user_id.to_owned(),

View File

@ -95,7 +95,6 @@ struct GridRevisionSql();
impl GridRevisionSql {
fn create(revision_records: Vec<RevisionRecord>, conn: &SqliteConnection) -> Result<(), FlowyError> {
// Batch insert: https://diesel.rs/guides/all-about-inserts.html
let records = revision_records
.into_iter()
.map(|record| {

View File

@ -1,10 +1,10 @@
mod folder_rev_impl;
mod grid_meta_rev_impl;
mod grid_block_meta_rev_impl;
mod grid_rev_impl;
mod text_rev_impl;
pub use folder_rev_impl::*;
pub use grid_meta_rev_impl::*;
pub use grid_block_meta_rev_impl::*;
pub use grid_rev_impl::*;
pub use text_rev_impl::*;

View File

@ -0,0 +1,5 @@
mod persistence;
mod rev_history;
pub use persistence::*;
pub use rev_history::*;

View File

@ -0,0 +1,78 @@
use crate::history::RevisionHistoryDiskCache;
use flowy_database::{
prelude::*,
schema::{rev_history, rev_history::dsl},
ConnectionPool,
};
use flowy_error::{internal_error, FlowyResult};
use flowy_sync::entities::revision::Revision;
use std::sync::Arc;
/// SQLite-backed store for revision history entries, scoped to a single
/// object id and sharing the app-wide connection pool.
pub struct SQLiteRevisionHistoryPersistence {
    object_id: String,
    pool: Arc<ConnectionPool>,
}

impl SQLiteRevisionHistoryPersistence {
    /// Creates a persistence handle for `object_id` backed by `pool`.
    pub fn new(object_id: &str, pool: Arc<ConnectionPool>) -> Self {
        Self {
            object_id: object_id.to_owned(),
            pool,
        }
    }
}
impl RevisionHistoryDiskCache for SQLiteRevisionHistoryPersistence {
fn write_history(&self, revision: Revision) -> FlowyResult<()> {
let record = (
dsl::object_id.eq(revision.object_id),
dsl::start_rev_id.eq(revision.base_rev_id),
dsl::end_rev_id.eq(revision.rev_id),
dsl::data.eq(revision.delta_data),
);
let conn = self.pool.get().map_err(internal_error)?;
let _ = insert_or_ignore_into(dsl::rev_history)
.values(vec![record])
.execute(&*conn)?;
Ok(())
}
fn read_histories(&self) -> FlowyResult<Vec<RevisionHistory>> {
let conn = self.pool.get().map_err(internal_error)?;
let records: Vec<RevisionRecord> = dsl::rev_history
.filter(dsl::object_id.eq(&self.object_id))
.load::<RevisionRecord>(&*conn)?;
Ok(records
.into_iter()
.map(|record| record.into())
.collect::<Vec<RevisionHistory>>())
}
}
/// Diesel row mapping for the `rev_history` table.
#[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)]
#[table_name = "rev_history"]
struct RevisionRecord {
    id: i32,              // auto-increment primary key
    object_id: String,    // id of the object this history entry belongs to
    start_rev_id: i64,    // first revision id folded into this entry (base_rev_id)
    end_rev_id: i64,      // last revision id folded into this entry (rev_id)
    data: Vec<u8>,        // delta bytes of the stored revision
}
/// Public view of one history entry, decoupled from the diesel row type.
pub struct RevisionHistory {
    pub object_id: String,
    pub start_rev_id: i64,
    pub end_rev_id: i64,
    pub data: Vec<u8>,
}

/// Field-for-field conversion from the database row to the public type.
impl std::convert::From<RevisionRecord> for RevisionHistory {
    fn from(record: RevisionRecord) -> Self {
        RevisionHistory {
            object_id: record.object_id,
            start_rev_id: record.start_rev_id,
            end_rev_id: record.end_rev_id,
            data: record.data,
        }
    }
}

View File

@ -0,0 +1,201 @@
use crate::{RevisionCompactor, RevisionHistory};
use async_stream::stream;
use flowy_error::{FlowyError, FlowyResult};
use flowy_sync::entities::revision::Revision;
use futures_util::future::BoxFuture;
use futures_util::stream::StreamExt;
use futures_util::FutureExt;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, RwLock};
use tokio::time::interval;
/// Storage abstraction for revision history entries.
pub trait RevisionHistoryDiskCache: Send + Sync {
    /// Persists one revision as a history entry.
    fn write_history(&self, revision: Revision) -> FlowyResult<()>;

    /// Loads every stored history entry for the current object.
    fn read_histories(&self) -> FlowyResult<Vec<RevisionHistory>>;
}
/// Buffers revisions in memory and owns the background checkpoint tasks that
/// periodically compact and persist them.
pub struct RevisionHistoryManager {
    user_id: String,
    stop_tx: mpsc::Sender<()>,
    config: RevisionHistoryConfig,
    revisions: Arc<RwLock<Vec<Revision>>>,
    disk_cache: Arc<dyn RevisionHistoryDiskCache>,
}

impl RevisionHistoryManager {
    /// Spawns the checkpoint runner/sender pair and returns the manager that
    /// feeds them. `stop_tx` can be used to terminate the runner.
    pub fn new(
        user_id: &str,
        object_id: &str,
        config: RevisionHistoryConfig,
        disk_cache: Arc<dyn RevisionHistoryDiskCache>,
        rev_compactor: Arc<dyn RevisionCompactor>,
    ) -> Self {
        let revisions = Arc::new(RwLock::new(Vec::new()));
        let stop_tx =
            spawn_history_checkpoint_runner(user_id, object_id, &disk_cache, &revisions, rev_compactor, &config);
        Self {
            user_id: user_id.to_owned(),
            stop_tx,
            config,
            revisions,
            disk_cache,
        }
    }

    /// Appends a revision to the in-memory buffer; the background sender will
    /// pick it up on its next checkpoint.
    pub async fn add_revision(&self, revision: &Revision) {
        let mut guard = self.revisions.write().await;
        guard.push(revision.clone());
    }

    /// Reads all persisted history entries from the disk cache.
    pub async fn read_revision_histories(&self) -> FlowyResult<Vec<RevisionHistory>> {
        self.disk_cache.read_histories()
    }
}
/// Tuning knobs for the history checkpoint loop.
pub struct RevisionHistoryConfig {
    // How often the sender drains buffered revisions into a checkpoint.
    check_duration: Duration,
}

impl std::default::Default for RevisionHistoryConfig {
    /// Default: checkpoint every 5 seconds.
    fn default() -> Self {
        RevisionHistoryConfig {
            check_duration: Duration::from_secs(5),
        }
    }
}
/// Spawns the checkpoint runner plus the fixed-interval sender that feeds it.
/// Returns the stop-channel sender; signalling it terminates the runner.
fn spawn_history_checkpoint_runner(
    user_id: &str,
    object_id: &str,
    disk_cache: &Arc<dyn RevisionHistoryDiskCache>,
    revisions: &Arc<RwLock<Vec<Revision>>>,
    rev_compactor: Arc<dyn RevisionCompactor>,
    config: &RevisionHistoryConfig,
) -> mpsc::Sender<()> {
    let (checkpoint_tx, checkpoint_rx) = mpsc::channel(1);
    let (stop_tx, stop_rx) = mpsc::channel(1);

    let sender = FixedDurationCheckpointSender {
        user_id: user_id.to_string(),
        object_id: object_id.to_string(),
        checkpoint_tx,
        disk_cache: disk_cache.clone(),
        revisions: revisions.clone(),
        rev_compactor,
        duration: config.check_duration,
    };

    tokio::spawn(HistoryCheckpointRunner::new(stop_rx, checkpoint_rx).run());
    tokio::spawn(sender.run());
    stop_tx
}
/// Receives `HistoryCheckpoint`s from the sender task and writes them,
/// until a stop signal arrives.
struct HistoryCheckpointRunner {
    // Both receivers are `Option` so `run(mut self)` can take ownership of
    // them exactly once; `run` panics if invoked twice.
    stop_rx: Option<mpsc::Receiver<()>>,
    checkpoint_rx: Option<mpsc::Receiver<HistoryCheckpoint>>,
}

impl HistoryCheckpointRunner {
    fn new(stop_rx: mpsc::Receiver<()>, checkpoint_rx: mpsc::Receiver<HistoryCheckpoint>) -> Self {
        Self {
            stop_rx: Some(stop_rx),
            checkpoint_rx: Some(checkpoint_rx),
        }
    }

    /// Adapts the checkpoint channel into a stream (so stop handling lives in
    /// one `select!`), then writes each checkpoint in arrival order.
    async fn run(mut self) {
        let mut stop_rx = self.stop_rx.take().expect("It should only run once");
        let mut checkpoint_rx = self.checkpoint_rx.take().expect("It should only run once");
        let stream = stream! {
            loop {
                tokio::select! {
                    result = checkpoint_rx.recv() => {
                        match result {
                            Some(checkpoint) => yield checkpoint,
                            // NOTE(review): a closed checkpoint channel does not
                            // break the loop here; only the stop signal exits.
                            None => {},
                        }
                    },
                    _ = stop_rx.recv() => {
                        tracing::trace!("Checkpoint runner exit");
                        break
                    },
                };
            }
        };
        stream
            .for_each(|checkpoint| async move {
                checkpoint.write().await;
            })
            .await;
    }
}
struct HistoryCheckpoint {
user_id: String,
object_id: String,
revisions: Vec<Revision>,
disk_cache: Arc<dyn RevisionHistoryDiskCache>,
rev_compactor: Arc<dyn RevisionCompactor>,
}
impl HistoryCheckpoint {
async fn write(self) {
if self.revisions.is_empty() {
return;
}
let result = || {
let revision = self
.rev_compactor
.compact(&self.user_id, &self.object_id, self.revisions)?;
let _ = self.disk_cache.write_history(revision)?;
Ok::<(), FlowyError>(())
};
match result() {
Ok(_) => {}
Err(e) => tracing::error!("Write history checkout failed: {:?}", e),
}
}
}
/// Periodically drains the shared in-memory revision buffer and forwards the
/// drained revisions to the checkpoint runner as a `HistoryCheckpoint`.
struct FixedDurationCheckpointSender {
    user_id: String,
    object_id: String,
    checkpoint_tx: mpsc::Sender<HistoryCheckpoint>,
    disk_cache: Arc<dyn RevisionHistoryDiskCache>,
    revisions: Arc<RwLock<Vec<Revision>>>,
    rev_compactor: Arc<dyn RevisionCompactor>,
    duration: Duration,
}

impl FixedDurationCheckpointSender {
    /// Runs forever, sending one checkpoint per tick of `duration`, until the
    /// runner side of the channel is dropped.
    ///
    /// BUG FIX: the original recursed with `self.run();` but dropped the
    /// returned `BoxFuture` without awaiting it, so at most one checkpoint was
    /// ever sent; it also re-created the interval (whose first tick completes
    /// immediately) on every call. A loop over a single interval fixes both.
    fn run(self) -> BoxFuture<'static, ()> {
        async move {
            let mut interval = interval(self.duration);
            loop {
                // First tick resolves immediately; later ticks are spaced by
                // `duration` — matching the original send-then-wait intent.
                interval.tick().await;
                let checkpoint_revisions: Vec<Revision> = self.revisions.write().await.drain(..).collect();
                let checkpoint = HistoryCheckpoint {
                    user_id: self.user_id.clone(),
                    object_id: self.object_id.clone(),
                    revisions: checkpoint_revisions,
                    disk_cache: self.disk_cache.clone(),
                    rev_compactor: self.rev_compactor.clone(),
                };
                // Receiver dropped => runner stopped => nothing left to do.
                if self.checkpoint_tx.send(checkpoint).await.is_err() {
                    break;
                }
            }
        }
        .boxed()
    }
}

View File

@ -1,13 +1,17 @@
mod cache;
mod conflict_resolve;
// mod history;
mod rev_manager;
mod rev_persistence;
mod snapshot;
mod ws_manager;
pub use cache::*;
pub use conflict_resolve::*;
// pub use history::*;
pub use rev_manager::*;
pub use rev_persistence::*;
pub use snapshot::*;
pub use ws_manager::*;
#[macro_use]

View File

@ -1,5 +1,5 @@
use crate::disk::RevisionState;
use crate::{RevisionPersistence, WSDataProviderDataSource};
use crate::{RevisionPersistence, RevisionSnapshotDiskCache, RevisionSnapshotManager, WSDataProviderDataSource};
use bytes::Bytes;
use flowy_error::{FlowyError, FlowyResult};
use flowy_sync::{
@ -45,14 +45,31 @@ pub struct RevisionManager {
user_id: String,
rev_id_counter: RevIdCounter,
rev_persistence: Arc<RevisionPersistence>,
#[allow(dead_code)]
rev_snapshot: Arc<RevisionSnapshotManager>,
rev_compactor: Arc<dyn RevisionCompactor>,
#[cfg(feature = "flowy_unit_test")]
rev_ack_notifier: tokio::sync::broadcast::Sender<i64>,
}
impl RevisionManager {
pub fn new(user_id: &str, object_id: &str, rev_persistence: Arc<RevisionPersistence>) -> Self {
pub fn new<SP, C>(
user_id: &str,
object_id: &str,
rev_persistence: RevisionPersistence,
rev_compactor: C,
snapshot_persistence: SP,
) -> Self
where
SP: 'static + RevisionSnapshotDiskCache,
C: 'static + RevisionCompactor,
{
let rev_id_counter = RevIdCounter::new(0);
let rev_compactor = Arc::new(rev_compactor);
let rev_persistence = Arc::new(rev_persistence);
let rev_snapshot = Arc::new(RevisionSnapshotManager::new(user_id, object_id, snapshot_persistence));
#[cfg(feature = "flowy_unit_test")]
let (revision_ack_notifier, _) = tokio::sync::broadcast::channel(1);
@ -61,7 +78,8 @@ impl RevisionManager {
user_id: user_id.to_owned(),
rev_id_counter,
rev_persistence,
rev_snapshot,
rev_compactor,
#[cfg(feature = "flowy_unit_test")]
rev_ack_notifier: revision_ack_notifier,
}
@ -100,20 +118,21 @@ impl RevisionManager {
}
let _ = self.rev_persistence.add_ack_revision(revision).await?;
// self.rev_history.add_revision(revision).await;
self.rev_id_counter.set(revision.rev_id);
Ok(())
}
#[tracing::instrument(level = "debug", skip_all, err)]
pub async fn add_local_revision<'a>(
&'a self,
revision: &Revision,
compactor: Box<dyn RevisionCompactor + 'a>,
) -> Result<(), FlowyError> {
pub async fn add_local_revision(&self, revision: &Revision) -> Result<(), FlowyError> {
if revision.delta_data.is_empty() {
return Err(FlowyError::internal().context("Delta data should be empty"));
}
let rev_id = self.rev_persistence.add_sync_revision(revision, compactor).await?;
let rev_id = self
.rev_persistence
.add_sync_revision(revision, &self.rev_compactor)
.await?;
// self.rev_history.add_revision(revision).await;
self.rev_id_counter.set(rev_id);
Ok(())
}

View File

@ -24,13 +24,13 @@ pub struct RevisionPersistence {
}
impl RevisionPersistence {
pub fn new(
user_id: &str,
object_id: &str,
disk_cache: Arc<dyn RevisionDiskCache<Error = FlowyError>>,
) -> RevisionPersistence {
pub fn new<C>(user_id: &str, object_id: &str, disk_cache: C) -> RevisionPersistence
where
C: 'static + RevisionDiskCache<Error = FlowyError>,
{
let object_id = object_id.to_owned();
let user_id = user_id.to_owned();
let disk_cache = Arc::new(disk_cache) as Arc<dyn RevisionDiskCache<Error = FlowyError>>;
let sync_seq = RwLock::new(RevisionSyncSequence::new());
let memory_cache = Arc::new(RevisionMemoryCache::new(&object_id, Arc::new(disk_cache.clone())));
Self {
@ -63,7 +63,7 @@ impl RevisionPersistence {
pub(crate) async fn add_sync_revision<'a>(
&'a self,
revision: &'a Revision,
compactor: Box<dyn RevisionCompactor + 'a>,
compactor: &Arc<dyn RevisionCompactor + 'a>,
) -> FlowyResult<i64> {
let result = self.sync_seq.read().await.compact();
match result {

View File

@ -0,0 +1,5 @@
mod persistence;
mod rev_snapshot;
pub use persistence::*;
pub use rev_snapshot::*;

View File

@ -0,0 +1,31 @@
#![allow(clippy::all)]
#![allow(dead_code)]
#![allow(unused_variables)]
use crate::{RevisionSnapshotDiskCache, RevisionSnapshotInfo};
use flowy_database::ConnectionPool;
use flowy_error::FlowyResult;
use std::sync::Arc;

/// SQLite-backed snapshot store (WIP). Intended to map onto the new
/// `rev_snapshot` table added by this change; reads/writes are not wired up yet.
pub struct SQLiteRevisionSnapshotPersistence {
    object_id: String,
    pool: Arc<ConnectionPool>,
}

impl SQLiteRevisionSnapshotPersistence {
    /// Creates a snapshot persistence handle for `object_id` backed by `pool`.
    pub fn new(object_id: &str, pool: Arc<ConnectionPool>) -> Self {
        Self {
            object_id: object_id.to_string(),
            pool,
        }
    }
}

impl RevisionSnapshotDiskCache for SQLiteRevisionSnapshotPersistence {
    /// WIP: not implemented — calling this panics via `todo!()`.
    fn write_snapshot(&self, object_id: &str, rev_id: i64, data: Vec<u8>) -> FlowyResult<()> {
        todo!()
    }

    /// WIP: not implemented — calling this panics via `todo!()`.
    fn read_snapshot(&self, object_id: &str, rev_id: i64) -> FlowyResult<RevisionSnapshotInfo> {
        todo!()
    }
}

View File

@ -0,0 +1,32 @@
#![allow(clippy::all)]
#![allow(dead_code)]
#![allow(unused_variables)]
use flowy_error::FlowyResult;
use std::sync::Arc;

/// Storage abstraction for revision snapshots (WIP).
pub trait RevisionSnapshotDiskCache: Send + Sync {
    /// Persists the snapshot `data` for `object_id` at `rev_id`.
    fn write_snapshot(&self, object_id: &str, rev_id: i64, data: Vec<u8>) -> FlowyResult<()>;

    /// Reads the snapshot of `object_id` at `rev_id`.
    fn read_snapshot(&self, object_id: &str, rev_id: i64) -> FlowyResult<RevisionSnapshotInfo>;
}

/// Owns the snapshot disk cache for one object. Currently a thin holder;
/// snapshot scheduling/usage is not implemented yet (WIP).
pub struct RevisionSnapshotManager {
    user_id: String,
    object_id: String,
    disk_cache: Arc<dyn RevisionSnapshotDiskCache>,
}

impl RevisionSnapshotManager {
    /// Wraps the concrete disk cache behind a trait object.
    pub fn new<D>(user_id: &str, object_id: &str, disk_cache: D) -> Self
    where
        D: RevisionSnapshotDiskCache + 'static,
    {
        let disk_cache = Arc::new(disk_cache);
        Self {
            user_id: user_id.to_string(),
            object_id: object_id.to_string(),
            disk_cache,
        }
    }
}

/// Placeholder for the data returned by `read_snapshot` (WIP — no fields yet).
pub struct RevisionSnapshotInfo {}

View File

@ -1,6 +1,5 @@
use crate::ConflictRevisionSink;
use async_stream::stream;
use bytes::Bytes;
use flowy_error::{FlowyError, FlowyResult};
use flowy_sync::entities::{

View File

@ -1,10 +1,13 @@
use crate::queue::TextBlockRevisionCompactor;
use crate::{editor::TextBlockEditor, errors::FlowyError, BlockCloudService};
use bytes::Bytes;
use dashmap::DashMap;
use flowy_database::ConnectionPool;
use flowy_error::FlowyResult;
use flowy_revision::disk::SQLiteTextBlockRevisionPersistence;
use flowy_revision::{RevisionCloudService, RevisionManager, RevisionPersistence, RevisionWebSocket};
use flowy_revision::{
RevisionCloudService, RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionSnapshotPersistence,
};
use flowy_sync::entities::{
revision::{md5, RepeatedRevision, Revision},
text_block::{TextBlockDeltaPB, TextBlockIdPB},
@ -139,9 +142,20 @@ impl TextBlockManager {
fn make_rev_manager(&self, doc_id: &str, pool: Arc<ConnectionPool>) -> Result<RevisionManager, FlowyError> {
let user_id = self.user.user_id()?;
let disk_cache = Arc::new(SQLiteTextBlockRevisionPersistence::new(&user_id, pool));
let rev_persistence = Arc::new(RevisionPersistence::new(&user_id, doc_id, disk_cache));
Ok(RevisionManager::new(&user_id, doc_id, rev_persistence))
let disk_cache = SQLiteTextBlockRevisionPersistence::new(&user_id, pool.clone());
let rev_persistence = RevisionPersistence::new(&user_id, doc_id, disk_cache);
// let history_persistence = SQLiteRevisionHistoryPersistence::new(doc_id, pool.clone());
let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(doc_id, pool);
let rev_compactor = TextBlockRevisionCompactor();
Ok(RevisionManager::new(
&user_id,
doc_id,
rev_persistence,
rev_compactor,
// history_persistence,
snapshot_persistence,
))
}
}

View File

@ -186,10 +186,7 @@ impl EditBlockQueue {
&user_id,
md5,
);
let _ = self
.rev_manager
.add_local_revision(&revision, Box::new(TextBlockRevisionCompactor()))
.await?;
let _ = self.rev_manager.add_local_revision(&revision).await?;
Ok(rev_id.into())
}
}

View File

@ -245,13 +245,13 @@ pub struct GridBlockMetaChange {
pub md5: String,
}
pub fn make_block_meta_delta(block_rev: &GridBlockRevision) -> GridBlockRevisionDelta {
pub fn make_grid_block_delta(block_rev: &GridBlockRevision) -> GridBlockRevisionDelta {
let json = serde_json::to_string(&block_rev).unwrap();
PlainTextDeltaBuilder::new().insert(&json).build()
}
pub fn make_block_meta_revisions(user_id: &str, grid_block_meta_data: &GridBlockRevision) -> RepeatedRevision {
let delta = make_block_meta_delta(grid_block_meta_data);
pub fn make_grid_block_revisions(user_id: &str, grid_block_meta_data: &GridBlockRevision) -> RepeatedRevision {
let delta = make_grid_block_delta(grid_block_meta_data);
let bytes = delta.to_delta_bytes();
let revision = Revision::initial_revision(user_id, &grid_block_meta_data.block_id, bytes);
revision.into()
@ -264,7 +264,7 @@ impl std::default::Default for GridBlockRevisionPad {
rows: vec![],
};
let delta = make_block_meta_delta(&block_revision);
let delta = make_grid_block_delta(&block_revision);
GridBlockRevisionPad { block_revision, delta }
}
}

View File

@ -0,0 +1,432 @@
use crate::entities::revision::{md5, RepeatedRevision, Revision};
use crate::errors::{internal_error, CollaborateError, CollaborateResult};
use crate::util::{cal_diff, make_delta_from_revisions};
use bytes::Bytes;
use flowy_grid_data_model::entities::{
gen_block_id, gen_grid_id, FieldChangesetParams, FieldMeta, FieldOrder, FieldType, GridBlockInfoChangeset,
GridBlockMetaSnapshot, GridMeta,
};
use lib_infra::util::move_vec_element;
use lib_ot::core::{OperationTransformable, PlainTextAttributes, PlainTextDelta, PlainTextDeltaBuilder};
use std::collections::HashMap;
use std::sync::Arc;
pub type GridMetaDelta = PlainTextDelta;
pub type GridDeltaBuilder = PlainTextDeltaBuilder;
/// In-memory representation of a grid's metadata (`GridMeta`) paired with the
/// plain-text delta it was built from. All mutations flow through
/// `modify_grid`, which keeps `grid_meta` and `delta` consistent.
pub struct GridMetaPad {
    pub(crate) grid_meta: Arc<GridMeta>,
    pub(crate) delta: GridMetaDelta,
}

/// Converts raw type-option bytes into their JSON string form.
pub trait JsonDeserializer {
    fn deserialize(&self, type_option_data: Vec<u8>) -> CollaborateResult<String>;
}
impl GridMetaPad {
    /// Returns copies of the grid's fields and blocks for duplication; each
    /// duplicated block gets a freshly generated block id.
    pub async fn duplicate_grid_meta(&self) -> (Vec<FieldMeta>, Vec<GridBlockMetaSnapshot>) {
        let fields = self.grid_meta.fields.to_vec();

        let blocks = self
            .grid_meta
            .blocks
            .iter()
            .map(|block| {
                let mut duplicated_block = block.clone();
                duplicated_block.block_id = gen_block_id();
                duplicated_block
            })
            .collect::<Vec<GridBlockMetaSnapshot>>();

        (fields, blocks)
    }

    /// Rebuilds the pad from a delta whose plain-text content is the JSON
    /// serialization of a `GridMeta`.
    pub fn from_delta(delta: GridMetaDelta) -> CollaborateResult<Self> {
        let s = delta.to_str()?;
        let grid: GridMeta = serde_json::from_str(&s)
            .map_err(|e| CollaborateError::internal().context(format!("Deserialize delta to grid failed: {}", e)))?;

        Ok(Self {
            grid_meta: Arc::new(grid),
            delta,
        })
    }

    /// Composes the revisions into one delta, then builds the pad from it.
    pub fn from_revisions(_grid_id: &str, revisions: Vec<Revision>) -> CollaborateResult<Self> {
        let grid_delta: GridMetaDelta = make_delta_from_revisions::<PlainTextAttributes>(revisions)?;
        Self::from_delta(grid_delta)
    }

    /// Inserts a new field, optionally before the field with `start_field_id`
    /// (appends when `start_field_id` is `None` or not found).
    /// Returns `Ok(None)` if a field with the same id already exists.
    #[tracing::instrument(level = "debug", skip_all, err)]
    pub fn create_field_meta(
        &mut self,
        new_field_meta: FieldMeta,
        start_field_id: Option<String>,
    ) -> CollaborateResult<Option<GridChangeset>> {
        self.modify_grid(|grid_meta| {
            // Check if the field exists or not
            if grid_meta
                .fields
                .iter()
                .any(|field_meta| field_meta.id == new_field_meta.id)
            {
                tracing::error!("Duplicate grid field");
                return Ok(None);
            }

            let insert_index = match start_field_id {
                None => None,
                Some(start_field_id) => grid_meta.fields.iter().position(|field| field.id == start_field_id),
            };

            match insert_index {
                None => grid_meta.fields.push(new_field_meta),
                Some(index) => grid_meta.fields.insert(index, new_field_meta),
            }
            Ok(Some(()))
        })
    }

    /// Removes the field with `field_id`; `Ok(None)` when it does not exist.
    pub fn delete_field_meta(&mut self, field_id: &str) -> CollaborateResult<Option<GridChangeset>> {
        self.modify_grid(
            |grid_meta| match grid_meta.fields.iter().position(|field| field.id == field_id) {
                None => Ok(None),
                Some(index) => {
                    grid_meta.fields.remove(index);
                    Ok(Some(()))
                }
            },
        )
    }

    /// Clones the field with `field_id`, giving the copy `duplicated_field_id`
    /// and a " (copy)" name suffix, inserted right after the original.
    pub fn duplicate_field_meta(
        &mut self,
        field_id: &str,
        duplicated_field_id: &str,
    ) -> CollaborateResult<Option<GridChangeset>> {
        self.modify_grid(
            |grid_meta| match grid_meta.fields.iter().position(|field| field.id == field_id) {
                None => Ok(None),
                Some(index) => {
                    let mut duplicate_field_meta = grid_meta.fields[index].clone();
                    duplicate_field_meta.id = duplicated_field_id.to_string();
                    duplicate_field_meta.name = format!("{} (copy)", duplicate_field_meta.name);
                    grid_meta.fields.insert(index + 1, duplicate_field_meta);
                    Ok(Some(()))
                }
            },
        )
    }

    /// Switches a field to `field_type`, lazily creating the type-option data
    /// via `type_option_json_builder` the first time that type is used.
    pub fn switch_to_field<B>(
        &mut self,
        field_id: &str,
        field_type: FieldType,
        type_option_json_builder: B,
    ) -> CollaborateResult<Option<GridChangeset>>
    where
        B: FnOnce(&FieldType) -> String,
    {
        self.modify_grid(|grid_meta| {
            //
            match grid_meta.fields.iter_mut().find(|field_meta| field_meta.id == field_id) {
                None => {
                    tracing::warn!("Can not find the field with id: {}", field_id);
                    Ok(None)
                }
                Some(field_meta) => {
                    // Only build type-option JSON when this type has none yet.
                    if field_meta.get_type_option_str(&field_type).is_none() {
                        let type_option_json = type_option_json_builder(&field_type);
                        field_meta.insert_type_option_str(&field_type, type_option_json);
                    }

                    field_meta.field_type = field_type;
                    Ok(Some(()))
                }
            }
        })
    }

    /// Applies a partial update (`changeset`) to a field. Only the fields set
    /// in the changeset are touched; `Ok(None)` when nothing changed.
    pub fn update_field_meta<T: JsonDeserializer>(
        &mut self,
        changeset: FieldChangesetParams,
        deserializer: T,
    ) -> CollaborateResult<Option<GridChangeset>> {
        let field_id = changeset.field_id.clone();
        self.modify_field(&field_id, |field| {
            let mut is_changed = None;
            if let Some(name) = changeset.name {
                field.name = name;
                is_changed = Some(())
            }

            if let Some(desc) = changeset.desc {
                field.desc = desc;
                is_changed = Some(())
            }

            if let Some(field_type) = changeset.field_type {
                field.field_type = field_type;
                is_changed = Some(())
            }

            if let Some(frozen) = changeset.frozen {
                field.frozen = frozen;
                is_changed = Some(())
            }

            if let Some(visibility) = changeset.visibility {
                field.visibility = visibility;
                is_changed = Some(())
            }

            if let Some(width) = changeset.width {
                field.width = width;
                is_changed = Some(())
            }

            if let Some(type_option_data) = changeset.type_option_data {
                match deserializer.deserialize(type_option_data) {
                    Ok(json_str) => {
                        let field_type = field.field_type.clone();
                        field.insert_type_option_str(&field_type, json_str);
                        is_changed = Some(())
                    }
                    Err(err) => {
                        // Deliberately non-fatal: a bad type-option payload is
                        // logged and skipped, other changes still apply.
                        tracing::error!("Deserialize data to type option json failed: {}", err);
                    }
                }
            }

            Ok(is_changed)
        })
    }

    /// Finds a field by id, returning its index together with a reference.
    pub fn get_field_meta(&self, field_id: &str) -> Option<(usize, &FieldMeta)> {
        self.grid_meta
            .fields
            .iter()
            .enumerate()
            .find(|(_, field)| field.id == field_id)
    }

    /// Replaces an existing field (matched by id) with `field_meta` in place.
    pub fn replace_field_meta(&mut self, field_meta: FieldMeta) -> CollaborateResult<Option<GridChangeset>> {
        self.modify_grid(
            |grid_meta| match grid_meta.fields.iter().position(|field| field.id == field_meta.id) {
                None => Ok(None),
                Some(index) => {
                    grid_meta.fields.remove(index);
                    grid_meta.fields.insert(index, field_meta);
                    Ok(Some(()))
                }
            },
        )
    }

    /// Moves the field with `field_id` from `from_index` to `to_index`;
    /// `Ok(None)` when the move was a no-op.
    pub fn move_field(
        &mut self,
        field_id: &str,
        from_index: usize,
        to_index: usize,
    ) -> CollaborateResult<Option<GridChangeset>> {
        self.modify_grid(|grid_meta| {
            match move_vec_element(
                &mut grid_meta.fields,
                |field| field.id == field_id,
                from_index,
                to_index,
            )
            .map_err(internal_error)?
            {
                true => Ok(Some(())),
                false => Ok(None),
            }
        })
    }

    /// True when a field with `field_id` exists.
    pub fn contain_field(&self, field_id: &str) -> bool {
        self.grid_meta.fields.iter().any(|field| field.id == field_id)
    }

    /// Returns the ordered list of field orders derived from the fields.
    pub fn get_field_orders(&self) -> Vec<FieldOrder> {
        self.grid_meta.fields.iter().map(FieldOrder::from).collect()
    }

    /// Returns field metas, either all of them (`None`) or the subset/order
    /// described by `field_orders`; unknown ids are logged and skipped.
    pub fn get_field_metas(&self, field_orders: Option<Vec<FieldOrder>>) -> CollaborateResult<Vec<FieldMeta>> {
        match field_orders {
            None => Ok(self.grid_meta.fields.clone()),
            Some(field_orders) => {
                // Index fields by id so ordering is an O(1) lookup per entry.
                let field_by_field_id = self
                    .grid_meta
                    .fields
                    .iter()
                    .map(|field| (&field.id, field))
                    .collect::<HashMap<&String, &FieldMeta>>();

                let fields = field_orders
                    .iter()
                    .flat_map(|field_order| match field_by_field_id.get(&field_order.field_id) {
                        None => {
                            tracing::error!("Can't find the field with id: {}", field_order.field_id);
                            None
                        }
                        Some(field) => Some((*field).clone()),
                    })
                    .collect::<Vec<FieldMeta>>();
                Ok(fields)
            }
        }
    }

    /// Appends a block, validating its `start_row_index` against the current
    /// last block; `Ok(None)` when a block with the same id already exists.
    pub fn create_block_meta(&mut self, block: GridBlockMetaSnapshot) -> CollaborateResult<Option<GridChangeset>> {
        self.modify_grid(|grid_meta| {
            if grid_meta.blocks.iter().any(|b| b.block_id == block.block_id) {
                tracing::warn!("Duplicate grid block");
                Ok(None)
            } else {
                match grid_meta.blocks.last() {
                    None => grid_meta.blocks.push(block),
                    Some(last_block) => {
                        if last_block.start_row_index > block.start_row_index
                            && last_block.len() > block.start_row_index
                        {
                            let msg = "GridBlock's start_row_index should be greater than the last_block's start_row_index and its len".to_string();
                            return Err(CollaborateError::internal().context(msg))
                        }
                        grid_meta.blocks.push(block);
                    }
                }
                Ok(Some(()))
            }
        })
    }

    /// Returns a copy of all block snapshots.
    pub fn get_block_metas(&self) -> Vec<GridBlockMetaSnapshot> {
        self.grid_meta.blocks.clone()
    }

    /// Applies a partial update to a block (row count and/or start index).
    pub fn update_block_meta(&mut self, changeset: GridBlockInfoChangeset) -> CollaborateResult<Option<GridChangeset>> {
        let block_id = changeset.block_id.clone();
        self.modify_block(&block_id, |block| {
            let mut is_changed = None;

            if let Some(row_count) = changeset.row_count {
                block.row_count = row_count;
                is_changed = Some(());
            }

            if let Some(start_row_index) = changeset.start_row_index {
                block.start_row_index = start_row_index;
                is_changed = Some(());
            }

            Ok(is_changed)
        })
    }

    /// md5 digest of the current delta bytes (used as a change fingerprint).
    pub fn md5(&self) -> String {
        md5(&self.delta.to_delta_bytes())
    }

    /// Current delta serialized as a string.
    pub fn delta_str(&self) -> String {
        self.delta.to_delta_str()
    }

    /// Current delta serialized as bytes.
    pub fn delta_bytes(&self) -> Bytes {
        self.delta.to_delta_bytes()
    }

    /// Borrow of the grid's fields.
    pub fn fields(&self) -> &[FieldMeta] {
        &self.grid_meta.fields
    }

    /// Core mutation path: runs `f` against a mutable `GridMeta`, diffs the
    /// JSON before/after, composes the resulting delta into `self.delta`, and
    /// returns the changeset. `Ok(None)` means `f` made no effective change.
    fn modify_grid<F>(&mut self, f: F) -> CollaborateResult<Option<GridChangeset>>
    where
        F: FnOnce(&mut GridMeta) -> CollaborateResult<Option<()>>,
    {
        let cloned_grid = self.grid_meta.clone();
        match f(Arc::make_mut(&mut self.grid_meta))? {
            None => Ok(None),
            Some(_) => {
                let old = json_from_grid(&cloned_grid)?;
                let new = json_from_grid(&self.grid_meta)?;
                match cal_diff::<PlainTextAttributes>(old, new) {
                    None => Ok(None),
                    Some(delta) => {
                        self.delta = self.delta.compose(&delta)?;
                        Ok(Some(GridChangeset { delta, md5: self.md5() }))
                    }
                }
            }
        }
    }

    /// Runs `f` against the block with `block_id` via `modify_grid`;
    /// `Ok(None)` when the block cannot be found.
    pub fn modify_block<F>(&mut self, block_id: &str, f: F) -> CollaborateResult<Option<GridChangeset>>
    where
        F: FnOnce(&mut GridBlockMetaSnapshot) -> CollaborateResult<Option<()>>,
    {
        self.modify_grid(
            |grid_meta| match grid_meta.blocks.iter().position(|block| block.block_id == block_id) {
                None => {
                    tracing::warn!("[GridMetaPad]: Can't find any block with id: {}", block_id);
                    Ok(None)
                }
                Some(index) => f(&mut grid_meta.blocks[index]),
            },
        )
    }

    /// Runs `f` against the field with `field_id` via `modify_grid`;
    /// `Ok(None)` when the field cannot be found.
    pub fn modify_field<F>(&mut self, field_id: &str, f: F) -> CollaborateResult<Option<GridChangeset>>
    where
        F: FnOnce(&mut FieldMeta) -> CollaborateResult<Option<()>>,
    {
        self.modify_grid(
            |grid_meta| match grid_meta.fields.iter().position(|field| field.id == field_id) {
                None => {
                    tracing::warn!("[GridMetaPad]: Can't find any field with id: {}", field_id);
                    Ok(None)
                }
                Some(index) => f(&mut grid_meta.fields[index]),
            },
        )
    }
}
/// Serializes the grid meta to JSON, mapping serde failures into an internal
/// `CollaborateError`.
fn json_from_grid(grid: &Arc<GridMeta>) -> CollaborateResult<String> {
    serde_json::to_string(grid)
        .map_err(|err| internal_error(format!("Serialize grid to json str failed. {:?}", err)))
}
/// The delta produced by one grid mutation, plus an integrity fingerprint.
pub struct GridChangeset {
    pub delta: GridMetaDelta,
    /// md5: the md5 of the grid after applying the change.
    pub md5: String,
}
/// Builds the initial delta for `grid_meta`: a single insert of its JSON form.
pub fn make_grid_delta(grid_meta: &GridMeta) -> GridMetaDelta {
    let grid_json = serde_json::to_string(&grid_meta).unwrap();
    PlainTextDeltaBuilder::new().insert(&grid_json).build()
}

/// Wraps the grid's initial delta into the very first revision for the grid.
pub fn make_grid_revisions(user_id: &str, grid_meta: &GridMeta) -> RepeatedRevision {
    let bytes = make_grid_delta(grid_meta).to_delta_bytes();
    Revision::initial_revision(user_id, &grid_meta.grid_id, bytes).into()
}
impl std::default::Default for GridMetaPad {
    /// An empty grid (fresh id, no fields, no blocks) with its initial delta.
    fn default() -> Self {
        let grid_meta = GridMeta {
            grid_id: gen_grid_id(),
            fields: vec![],
            blocks: vec![],
        };
        let delta = make_grid_delta(&grid_meta);
        Self {
            grid_meta: Arc::new(grid_meta),
            delta,
        }
    }
}