Merge pull request #858 from AppFlowy-IO/feat/migrate_revision_helper

Feat/migrate revision helper
This commit is contained in:
Nathan.fooo 2022-08-16 15:48:48 +08:00 committed by GitHub
commit 0b26ad0541
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 302 additions and 136 deletions

View File

@ -1039,6 +1039,7 @@ dependencies = [
"lib-ot",
"lib-ws",
"serde",
"serde_json",
"strum",
"strum_macros",
"tokio",

View File

@ -5,15 +5,18 @@ use crate::{
};
use flowy_database::kv::KV;
use flowy_error::{FlowyError, FlowyResult};
use flowy_folder_data_model::revision::{AppRevision, ViewRevision, WorkspaceRevision};
use flowy_folder_data_model::revision::{AppRevision, FolderRevision, ViewRevision, WorkspaceRevision};
use flowy_revision::disk::SQLiteTextBlockRevisionPersistence;
use flowy_revision::{RevisionLoader, RevisionPersistence};
use flowy_revision::reset::{RevisionResettable, RevisionStructReset};
use flowy_sync::client_folder::make_folder_rev_json_str;
use flowy_sync::entities::revision::Revision;
use flowy_sync::{client_folder::FolderPad, entities::revision::md5};
use std::sync::Arc;
const V1_MIGRATION: &str = "FOLDER_V1_MIGRATION";
const V2_MIGRATION: &str = "FOLDER_V2_MIGRATION";
#[allow(dead_code)]
const V3_MIGRATION: &str = "FOLDER_V3_MIGRATION";
pub(crate) struct FolderMigration {
user_id: String,
@ -29,7 +32,7 @@ impl FolderMigration {
}
pub fn run_v1_migration(&self) -> FlowyResult<Option<FolderPad>> {
let key = md5(format!("{}{}", self.user_id, V1_MIGRATION));
let key = migration_flag_key(&self.user_id, V1_MIGRATION);
if KV::get_bool(&key) {
return Ok(None);
}
@ -79,32 +82,63 @@ impl FolderMigration {
Ok(Some(folder))
}
pub async fn run_v2_migration(&self, user_id: &str, folder_id: &FolderId) -> FlowyResult<Option<FolderPad>> {
let key = md5(format!("{}{}", self.user_id, V2_MIGRATION));
pub async fn run_v2_migration(&self, folder_id: &FolderId) -> FlowyResult<()> {
let key = migration_flag_key(&self.user_id, V2_MIGRATION);
if KV::get_bool(&key) {
return Ok(None);
return Ok(());
}
let pool = self.database.db_pool()?;
let disk_cache = SQLiteTextBlockRevisionPersistence::new(user_id, pool);
let rev_persistence = Arc::new(RevisionPersistence::new(user_id, folder_id.as_ref(), disk_cache));
let (revisions, _) = RevisionLoader {
object_id: folder_id.as_ref().to_owned(),
user_id: self.user_id.clone(),
cloud: None,
rev_persistence,
}
.load()
.await?;
if revisions.is_empty() {
tracing::trace!("Run folder v2 migration, but revision is empty");
KV::set_bool(&key, true);
return Ok(None);
}
let pad = FolderPad::from_revisions(revisions)?;
let _ = self.migration_folder_rev_struct(folder_id).await?;
KV::set_bool(&key, true);
tracing::trace!("Run folder v2 migration");
Ok(Some(pad))
Ok(())
}
#[allow(dead_code)]
pub async fn run_v3_migration(&self, folder_id: &FolderId) -> FlowyResult<()> {
let key = migration_flag_key(&self.user_id, V3_MIGRATION);
if KV::get_bool(&key) {
return Ok(());
}
let _ = self.migration_folder_rev_struct(folder_id).await?;
KV::set_bool(&key, true);
tracing::trace!("Run folder v3 migration");
Ok(())
}
pub async fn migration_folder_rev_struct(&self, folder_id: &FolderId) -> FlowyResult<()> {
let object = FolderRevisionResettable {
folder_id: folder_id.as_ref().to_owned(),
};
let pool = self.database.db_pool()?;
let disk_cache = SQLiteTextBlockRevisionPersistence::new(&self.user_id, pool);
let reset = RevisionStructReset::new(&self.user_id, object, Arc::new(disk_cache));
reset.run().await
}
}
fn migration_flag_key(user_id: &str, version: &str) -> String {
md5(format!("{}{}", user_id, version,))
}
pub struct FolderRevisionResettable {
folder_id: String,
}
impl RevisionResettable for FolderRevisionResettable {
fn target_id(&self) -> &str {
&self.folder_id
}
fn target_reset_rev_str(&self, revisions: Vec<Revision>) -> FlowyResult<String> {
let pad = FolderPad::from_revisions(revisions)?;
let json = pad.to_json()?;
Ok(json)
}
fn default_target_rev_str(&self) -> FlowyResult<String> {
let folder = FolderRevision::default();
let json = make_folder_rev_json_str(&folder)?;
Ok(json)
}
}

View File

@ -100,10 +100,9 @@ impl FolderPersistence {
self.save_folder(user_id, folder_id, migrated_folder).await?;
}
if let Some(migrated_folder) = migrations.run_v2_migration(user_id, folder_id).await? {
self.save_folder(user_id, folder_id, migrated_folder).await?;
}
let _ = migrations.run_v2_migration(folder_id).await?;
// let _ = migrations.run_v3_migration(folder_id).await?;
Ok(())
}

View File

@ -96,7 +96,7 @@ impl GridManager {
pub async fn open_grid<T: AsRef<str>>(&self, grid_id: T) -> FlowyResult<Arc<GridRevisionEditor>> {
let grid_id = grid_id.as_ref();
tracing::Span::current().record("grid_id", &grid_id);
let _ = self.migration.migration_grid_if_need(grid_id).await;
let _ = self.migration.run_v1_migration(grid_id).await;
self.get_or_create_grid_editor(grid_id).await
}
@ -193,7 +193,7 @@ pub async fn make_grid_view_data(
grid_manager: Arc<GridManager>,
build_context: BuildGridContext,
) -> FlowyResult<Bytes> {
for block_meta_data in &build_context.blocks_meta_data {
for block_meta_data in &build_context.blocks {
let block_id = &block_meta_data.block_id;
// Indexing the block's rows
block_meta_data.rows.iter().for_each(|row| {
@ -208,6 +208,7 @@ pub async fn make_grid_view_data(
let _ = grid_manager.create_grid_block(&block_id, repeated_revision).await?;
}
// Will replace the grid_id with the value returned by the gen_grid_id()
let grid_id = view_id.to_owned();
let grid_rev = GridRevision::from_build_context(&grid_id, build_context);
@ -219,7 +220,7 @@ pub async fn make_grid_view_data(
let _ = grid_manager.create_grid(&grid_id, repeated_revision).await?;
// Create grid view
let grid_view = GridViewRevision::new(view_id.to_owned(), view_id.to_owned());
let grid_view = GridViewRevision::new(grid_id, view_id.to_owned());
let grid_view_delta = make_grid_view_delta(&grid_view);
let grid_view_delta_bytes = grid_view_delta.json_bytes();
let repeated_revision: RepeatedRevision =

View File

@ -546,8 +546,8 @@ impl GridRevisionEditor {
Ok(BuildGridContext {
field_revs: duplicated_fields.into_iter().map(Arc::new).collect(),
blocks: duplicated_blocks,
blocks_meta_data,
block_metas: duplicated_blocks,
blocks: blocks_meta_data,
})
}

View File

@ -1,23 +1,18 @@
use crate::entities::{CreateRowParams, GridFilterConfiguration, GridSettingPB, RepeatedGridGroupPB, RowPB};
use crate::manager::GridUser;
use crate::services::block_manager::GridBlockManager;
use crate::services::grid_editor_task::GridServiceTaskScheduler;
use crate::services::grid_view_editor::{GridViewRevisionDataSource, GridViewRevisionDelegate, GridViewRevisionEditor};
use bytes::Bytes;
use crate::entities::{CreateRowParams, GridFilterConfiguration, GridSettingPB, RepeatedGridGroupPB, RowPB};
use crate::services::grid_editor_task::GridServiceTaskScheduler;
use crate::services::block_manager::GridBlockManager;
use dashmap::DashMap;
use flowy_error::FlowyResult;
use flowy_grid_data_model::revision::{FieldRevision, RowRevision};
use flowy_revision::disk::SQLiteGridViewRevisionPersistence;
use flowy_revision::{RevisionCompactor, RevisionManager, RevisionPersistence, SQLiteRevisionSnapshotPersistence};
use flowy_sync::client_grid::GridRevisionPad;
use flowy_sync::entities::revision::Revision;
use flowy_sync::util::make_text_delta_from_revisions;
use flowy_sync::entities::grid::GridSettingChangesetParams;
use flowy_sync::entities::revision::Revision;
use flowy_sync::util::make_text_delta_from_revisions;
use lib_infra::future::{wrap_future, AFFuture};
use std::sync::Arc;
use tokio::sync::RwLock;

View File

@ -1,19 +1,17 @@
use crate::manager::GridUser;
use crate::services::persistence::GridDatabase;
use flowy_database::kv::KV;
use flowy_error::FlowyResult;
use flowy_grid_data_model::revision::GridRevision;
use flowy_revision::disk::{RevisionRecord, SQLiteGridRevisionPersistence};
use flowy_revision::{mk_grid_block_revision_disk_cache, RevisionLoader, RevisionPersistence};
use flowy_revision::disk::SQLiteGridRevisionPersistence;
use flowy_revision::reset::{RevisionResettable, RevisionStructReset};
use flowy_sync::client_grid::{make_grid_rev_json_str, GridRevisionPad};
use flowy_sync::entities::revision::Revision;
use lib_ot::core::TextDeltaBuilder;
use serde::{Deserialize, Serialize};
use std::str::FromStr;
use flowy_sync::util::md5;
use std::sync::Arc;
const V1_MIGRATION: &str = "GRID_V1_MIGRATION";
pub(crate) struct GridMigration {
user: Arc<dyn GridUser>,
database: Arc<dyn GridDatabase>,
@ -24,90 +22,52 @@ impl GridMigration {
Self { user, database }
}
pub async fn migration_grid_if_need(&self, grid_id: &str) -> FlowyResult<()> {
match KV::get_str(grid_id) {
None => {
let _ = self.reset_grid_rev(grid_id).await?;
let _ = self.save_migrate_record(grid_id)?;
}
Some(s) => {
let mut record = MigrationGridRecord::from_str(&s)?;
let empty_json = self.empty_grid_rev_json()?;
if record.len < empty_json.len() {
let _ = self.reset_grid_rev(grid_id).await?;
record.len = empty_json.len();
KV::set_str(grid_id, record.to_string());
}
}
}
Ok(())
}
async fn reset_grid_rev(&self, grid_id: &str) -> FlowyResult<()> {
pub async fn run_v1_migration(&self, grid_id: &str) -> FlowyResult<()> {
let user_id = self.user.user_id()?;
let pool = self.database.db_pool()?;
let grid_rev_pad = self.get_grid_revision_pad(grid_id).await?;
let json = grid_rev_pad.json_str()?;
let delta_data = TextDeltaBuilder::new().insert(&json).build().json_bytes();
let revision = Revision::initial_revision(&user_id, grid_id, delta_data);
let record = RevisionRecord::new(revision);
//
let disk_cache = mk_grid_block_revision_disk_cache(&user_id, pool);
let _ = disk_cache.delete_and_insert_records(grid_id, None, vec![record]);
let key = migration_flag_key(&user_id, V1_MIGRATION, grid_id);
if KV::get_bool(&key) {
return Ok(());
}
let _ = self.migration_grid_rev_struct(grid_id).await?;
tracing::trace!("Run grid:{} v1 migration", grid_id);
KV::set_bool(&key, true);
Ok(())
}
fn save_migrate_record(&self, grid_id: &str) -> FlowyResult<()> {
let empty_json_str = self.empty_grid_rev_json()?;
let record = MigrationGridRecord {
pub async fn migration_grid_rev_struct(&self, grid_id: &str) -> FlowyResult<()> {
let object = GridRevisionResettable {
grid_id: grid_id.to_owned(),
len: empty_json_str.len(),
};
KV::set_str(grid_id, record.to_string());
Ok(())
}
fn empty_grid_rev_json(&self) -> FlowyResult<String> {
let empty_grid_rev = GridRevision::default();
let empty_json = make_grid_rev_json_str(&empty_grid_rev)?;
Ok(empty_json)
}
async fn get_grid_revision_pad(&self, grid_id: &str) -> FlowyResult<GridRevisionPad> {
let pool = self.database.db_pool()?;
let user_id = self.user.user_id()?;
let pool = self.database.db_pool()?;
let disk_cache = SQLiteGridRevisionPersistence::new(&user_id, pool);
let rev_persistence = Arc::new(RevisionPersistence::new(&user_id, grid_id, disk_cache));
let (revisions, _) = RevisionLoader {
object_id: grid_id.to_owned(),
user_id,
cloud: None,
rev_persistence,
}
.load()
.await?;
let pad = GridRevisionPad::from_revisions(revisions)?;
Ok(pad)
let reset = RevisionStructReset::new(&user_id, object, Arc::new(disk_cache));
reset.run().await
}
}
#[derive(Serialize, Deserialize)]
struct MigrationGridRecord {
fn migration_flag_key(user_id: &str, version: &str, grid_id: &str) -> String {
md5(format!("{}{}{}", user_id, version, grid_id,))
}
pub struct GridRevisionResettable {
grid_id: String,
len: usize,
}
impl FromStr for MigrationGridRecord {
type Err = serde_json::Error;
impl RevisionResettable for GridRevisionResettable {
fn target_id(&self) -> &str {
&self.grid_id
}
fn from_str(s: &str) -> Result<Self, Self::Err> {
serde_json::from_str::<MigrationGridRecord>(s)
}
}
impl ToString for MigrationGridRecord {
fn to_string(&self) -> String {
serde_json::to_string(self).unwrap_or_else(|_| "".to_string())
fn target_reset_rev_str(&self, revisions: Vec<Revision>) -> FlowyResult<String> {
let pad = GridRevisionPad::from_revisions(revisions)?;
let json = pad.json_str()?;
Ok(json)
}
fn default_target_rev_str(&self) -> FlowyResult<String> {
let grid_rev = GridRevision::default();
let json = make_grid_rev_json_str(&grid_rev)?;
Ok(json)
}
}

View File

@ -23,6 +23,7 @@ dashmap = "5"
serde = { version = "1.0", features = ["derive"] }
futures-util = "0.3.15"
async-stream = "0.3.2"
serde_json = {version = "1.0"}
[features]
flowy_unit_test = ["lib-ot/flowy_unit_test"]

View File

@ -8,9 +8,10 @@ pub use grid_block_impl::*;
pub use grid_impl::*;
pub use grid_view_impl::*;
use flowy_error::FlowyResult;
use flowy_error::{FlowyError, FlowyResult};
use flowy_sync::entities::revision::{RevId, Revision, RevisionRange};
use std::fmt::Debug;
use std::sync::Arc;
pub trait RevisionDiskCache: Sync + Send {
type Error: Debug;
@ -45,6 +46,50 @@ pub trait RevisionDiskCache: Sync + Send {
) -> Result<(), Self::Error>;
}
impl<T> RevisionDiskCache for Arc<T>
where
T: RevisionDiskCache<Error = FlowyError>,
{
type Error = FlowyError;
fn create_revision_records(&self, revision_records: Vec<RevisionRecord>) -> Result<(), Self::Error> {
(**self).create_revision_records(revision_records)
}
fn read_revision_records(
&self,
object_id: &str,
rev_ids: Option<Vec<i64>>,
) -> Result<Vec<RevisionRecord>, Self::Error> {
(**self).read_revision_records(object_id, rev_ids)
}
fn read_revision_records_with_range(
&self,
object_id: &str,
range: &RevisionRange,
) -> Result<Vec<RevisionRecord>, Self::Error> {
(**self).read_revision_records_with_range(object_id, range)
}
fn update_revision_record(&self, changesets: Vec<RevisionChangeset>) -> FlowyResult<()> {
(**self).update_revision_record(changesets)
}
fn delete_revision_records(&self, object_id: &str, rev_ids: Option<Vec<i64>>) -> Result<(), Self::Error> {
(**self).delete_revision_records(object_id, rev_ids)
}
fn delete_and_insert_records(
&self,
object_id: &str,
deleted_rev_ids: Option<Vec<i64>>,
inserted_records: Vec<RevisionRecord>,
) -> Result<(), Self::Error> {
(**self).delete_and_insert_records(object_id, deleted_rev_ids, inserted_records)
}
}
#[derive(Clone, Debug)]
pub struct RevisionRecord {
pub revision: Revision,

View File

@ -1,2 +1,3 @@
pub mod disk;
pub(crate) mod memory;
pub mod reset;

View File

@ -0,0 +1,115 @@
use crate::disk::{RevisionDiskCache, RevisionRecord};
use crate::{RevisionLoader, RevisionPersistence};
use flowy_database::kv::KV;
use flowy_error::{FlowyError, FlowyResult};
use flowy_sync::entities::revision::Revision;
use lib_ot::core::TextDeltaBuilder;
use serde::{Deserialize, Serialize};
use std::str::FromStr;
use std::sync::Arc;
pub trait RevisionResettable {
fn target_id(&self) -> &str;
// String in json format
fn target_reset_rev_str(&self, revisions: Vec<Revision>) -> FlowyResult<String>;
// String in json format
fn default_target_rev_str(&self) -> FlowyResult<String>;
}
pub struct RevisionStructReset<T> {
user_id: String,
target: T,
disk_cache: Arc<dyn RevisionDiskCache<Error = FlowyError>>,
}
impl<T> RevisionStructReset<T>
where
T: RevisionResettable,
{
pub fn new(user_id: &str, object: T, disk_cache: Arc<dyn RevisionDiskCache<Error = FlowyError>>) -> Self {
Self {
user_id: user_id.to_owned(),
target: object,
disk_cache,
}
}
pub async fn run(&self) -> FlowyResult<()> {
match KV::get_str(self.target.target_id()) {
None => {
tracing::trace!("😁 reset object");
let _ = self.reset_object().await?;
let _ = self.save_migrate_record()?;
}
Some(s) => {
let mut record = MigrationGridRecord::from_str(&s)?;
let rev_str = self.target.default_target_rev_str()?;
if record.len < rev_str.len() {
let _ = self.reset_object().await?;
record.len = rev_str.len();
KV::set_str(self.target.target_id(), record.to_string());
}
}
}
Ok(())
}
async fn reset_object(&self) -> FlowyResult<()> {
let rev_persistence = Arc::new(RevisionPersistence::from_disk_cache(
&self.user_id,
self.target.target_id(),
self.disk_cache.clone(),
));
let (revisions, _) = RevisionLoader {
object_id: self.target.target_id().to_owned(),
user_id: self.user_id.clone(),
cloud: None,
rev_persistence,
}
.load()
.await?;
let s = self.target.target_reset_rev_str(revisions)?;
let delta_data = TextDeltaBuilder::new().insert(&s).build().json_bytes();
let revision = Revision::initial_revision(&self.user_id, self.target.target_id(), delta_data);
let record = RevisionRecord::new(revision);
tracing::trace!("Reset {} revision record object", self.target.target_id());
let _ = self
.disk_cache
.delete_and_insert_records(self.target.target_id(), None, vec![record]);
Ok(())
}
fn save_migrate_record(&self) -> FlowyResult<()> {
let rev_str = self.target.default_target_rev_str()?;
let record = MigrationGridRecord {
object_id: self.target.target_id().to_owned(),
len: rev_str.len(),
};
KV::set_str(self.target.target_id(), record.to_string());
Ok(())
}
}
#[derive(Serialize, Deserialize)]
struct MigrationGridRecord {
object_id: String,
len: usize,
}
impl FromStr for MigrationGridRecord {
type Err = serde_json::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
serde_json::from_str::<MigrationGridRecord>(s)
}
}
impl ToString for MigrationGridRecord {
fn to_string(&self) -> String {
serde_json::to_string(self).unwrap_or_else(|_| "".to_string())
}
}

View File

@ -28,9 +28,17 @@ impl RevisionPersistence {
where
C: 'static + RevisionDiskCache<Error = FlowyError>,
{
let disk_cache = Arc::new(disk_cache) as Arc<dyn RevisionDiskCache<Error = FlowyError>>;
Self::from_disk_cache(user_id, object_id, disk_cache)
}
pub fn from_disk_cache(
user_id: &str,
object_id: &str,
disk_cache: Arc<dyn RevisionDiskCache<Error = FlowyError>>,
) -> RevisionPersistence {
let object_id = object_id.to_owned();
let user_id = user_id.to_owned();
let disk_cache = Arc::new(disk_cache) as Arc<dyn RevisionDiskCache<Error = FlowyError>>;
let sync_seq = RwLock::new(RevisionSyncSequence::new());
let memory_cache = Arc::new(RevisionMemoryCache::new(&object_id, Arc::new(disk_cache.clone())));
Self {

View File

@ -75,6 +75,7 @@ fn crate_log_filter(level: String) -> String {
filters.push(format!("lib_ws={}", level));
filters.push(format!("lib_infra={}", level));
filters.push(format!("flowy_sync={}", level));
filters.push(format!("flowy_revision={}", level));
// filters.push(format!("lib_dispatch={}", level));
filters.push(format!("dart_ffi={}", "info"));

View File

@ -57,7 +57,7 @@ where
match self.into_inner().into_bytes() {
Ok(bytes) => {
log::trace!("Serialize Data: {:?} to event response", std::any::type_name::<T>());
return ResponseBuilder::Ok().data(bytes).build();
ResponseBuilder::Ok().data(bytes).build()
}
Err(e) => e.into(),
}

View File

@ -18,7 +18,7 @@ pub struct ViewRevision {
#[serde(default)]
pub data_type: ViewDataTypeRevision,
pub version: i64,
pub version: i64, // Deprecated
pub belongings: Vec<ViewRevision>,

View File

@ -38,7 +38,7 @@ impl GridRevision {
Self {
grid_id: grid_id.to_owned(),
fields: context.field_revs,
blocks: context.blocks.into_iter().map(Arc::new).collect(),
blocks: context.block_metas.into_iter().map(Arc::new).collect(),
}
}
}
@ -186,8 +186,8 @@ pub trait TypeOptionDataDeserializer {
#[derive(Clone, Default, Deserialize, Serialize)]
pub struct BuildGridContext {
pub field_revs: Vec<Arc<FieldRevision>>,
pub blocks: Vec<GridBlockMetaRevision>,
pub blocks_meta_data: Vec<GridBlockRevision>,
pub block_metas: Vec<GridBlockMetaRevision>,
pub blocks: Vec<GridBlockRevision>,
}
impl BuildGridContext {

View File

@ -319,11 +319,16 @@ impl FolderPad {
}
pub fn to_json(&self) -> CollaborateResult<String> {
serde_json::to_string(&self.folder_rev)
.map_err(|e| CollaborateError::internal().context(format!("serial trash to json failed: {}", e)))
make_folder_rev_json_str(&self.folder_rev)
}
}
pub fn make_folder_rev_json_str(folder_rev: &FolderRevision) -> CollaborateResult<String> {
let json = serde_json::to_string(folder_rev)
.map_err(|err| internal_error(format!("Serialize folder to json str failed. {:?}", err)))?;
Ok(json)
}
impl FolderPad {
fn modify_workspaces<F>(&mut self, f: F) -> CollaborateResult<Option<FolderChangeset>>
where

View File

@ -18,8 +18,8 @@ impl std::default::Default for GridBuilder {
rows: vec![],
};
build_context.blocks.push(block_meta);
build_context.blocks_meta_data.push(block_meta_data);
build_context.block_metas.push(block_meta);
build_context.blocks.push(block_meta_data);
GridBuilder { build_context }
}
@ -34,8 +34,8 @@ impl GridBuilder {
}
pub fn add_row(&mut self, row_rev: RowRevision) {
let block_meta_rev = self.build_context.blocks.first_mut().unwrap();
let block_rev = self.build_context.blocks_meta_data.first_mut().unwrap();
let block_meta_rev = self.build_context.block_metas.first_mut().unwrap();
let block_rev = self.build_context.blocks.first_mut().unwrap();
block_rev.rows.push(Arc::new(row_rev));
block_meta_rev.row_count += 1;
}
@ -50,7 +50,7 @@ impl GridBuilder {
}
pub fn block_id(&self) -> &str {
&self.build_context.blocks.first().unwrap().block_id
&self.build_context.block_metas.first().unwrap().block_id
}
pub fn build(self) -> BuildGridContext {