Feat/restore from snapshot (#1699)

* feat: snapshot for folder

* feat: snapshot for document

Co-authored-by: nathan <nathan@appflowy.io>
This commit is contained in:
Nathan.fooo 2023-01-12 22:31:39 +08:00 committed by GitHub
parent 6a36bcd31d
commit 9215f5188c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 287 additions and 111 deletions

View File

@ -5,6 +5,7 @@ import 'package:app_flowy/user/presentation/sign_up_screen.dart';
import 'package:app_flowy/user/presentation/skip_log_in_screen.dart'; import 'package:app_flowy/user/presentation/skip_log_in_screen.dart';
import 'package:app_flowy/user/presentation/welcome_screen.dart'; import 'package:app_flowy/user/presentation/welcome_screen.dart';
import 'package:app_flowy/workspace/presentation/home/home_screen.dart'; import 'package:app_flowy/workspace/presentation/home/home_screen.dart';
import 'package:appflowy_backend/dispatch/dispatch.dart';
import 'package:flowy_infra/time/duration.dart'; import 'package:flowy_infra/time/duration.dart';
import 'package:flowy_infra_ui/widget/route/animation.dart'; import 'package:flowy_infra_ui/widget/route/animation.dart';
import 'package:appflowy_backend/protobuf/flowy-user/protobuf.dart' import 'package:appflowy_backend/protobuf/flowy-user/protobuf.dart'
@ -44,21 +45,31 @@ class AuthRouter {
class SplashRoute { class SplashRoute {
Future<void> pushWelcomeScreen( Future<void> pushWelcomeScreen(
BuildContext context, UserProfilePB userProfile) async { BuildContext context,
UserProfilePB userProfile,
) async {
final screen = WelcomeScreen(userProfile: userProfile); final screen = WelcomeScreen(userProfile: userProfile);
final workspaceId = await Navigator.of(context).push( await Navigator.of(context).push(
PageRoutes.fade( PageRoutes.fade(
() => screen, () => screen,
RouteDurations.slow.inMilliseconds * .001, RouteDurations.slow.inMilliseconds * .001,
), ),
); );
// ignore: use_build_context_synchronously FolderEventReadCurrentWorkspace().send().then((result) {
pushHomeScreen(context, userProfile, workspaceId); result.fold(
(workspaceSettingPB) =>
pushHomeScreen(context, userProfile, workspaceSettingPB),
(r) => null,
);
});
} }
void pushHomeScreen(BuildContext context, UserProfilePB userProfile, void pushHomeScreen(
WorkspaceSettingPB workspaceSetting) { BuildContext context,
UserProfilePB userProfile,
WorkspaceSettingPB workspaceSetting,
) {
Navigator.push( Navigator.push(
context, context,
PageRoutes.fade( PageRoutes.fade(

View File

@ -91,6 +91,7 @@ fn crate_log_filter(level: String) -> String {
filters.push(format!("lib_infra={}", level)); filters.push(format!("lib_infra={}", level));
filters.push(format!("flowy_sync={}", level)); filters.push(format!("flowy_sync={}", level));
filters.push(format!("flowy_revision={}", level)); filters.push(format!("flowy_revision={}", level));
filters.push(format!("flowy_revision_persistence={}", level));
filters.push(format!("flowy_task={}", level)); filters.push(format!("flowy_task={}", level));
// filters.push(format!("lib_dispatch={}", level)); // filters.push(format!("lib_dispatch={}", level));

View File

@ -0,0 +1,2 @@
-- This file should undo anything in `up.sql`
DROP TABLE document_rev_snapshot;

View File

@ -0,0 +1,9 @@
-- Your SQL goes here
CREATE TABLE document_rev_snapshot (
snapshot_id TEXT NOT NULL PRIMARY KEY DEFAULT '',
object_id TEXT NOT NULL DEFAULT '',
rev_id BIGINT NOT NULL DEFAULT 0,
base_rev_id BIGINT NOT NULL DEFAULT 0,
timestamp BIGINT NOT NULL DEFAULT 0,
data BLOB NOT NULL DEFAULT (x'')
);

View File

@ -1,4 +1,6 @@
table! { // @generated automatically by Diesel CLI.
diesel::table! {
app_table (id) { app_table (id) {
id -> Text, id -> Text,
workspace_id -> Text, workspace_id -> Text,
@ -13,7 +15,18 @@ table! {
} }
} }
table! { diesel::table! {
document_rev_snapshot (snapshot_id) {
snapshot_id -> Text,
object_id -> Text,
rev_id -> BigInt,
base_rev_id -> BigInt,
timestamp -> BigInt,
data -> Binary,
}
}
diesel::table! {
document_rev_table (id) { document_rev_table (id) {
id -> Integer, id -> Integer,
document_id -> Text, document_id -> Text,
@ -24,7 +37,7 @@ table! {
} }
} }
table! { diesel::table! {
folder_rev_snapshot (snapshot_id) { folder_rev_snapshot (snapshot_id) {
snapshot_id -> Text, snapshot_id -> Text,
object_id -> Text, object_id -> Text,
@ -35,14 +48,14 @@ table! {
} }
} }
table! { diesel::table! {
grid_block_index_table (row_id) { grid_block_index_table (row_id) {
row_id -> Text, row_id -> Text,
block_id -> Text, block_id -> Text,
} }
} }
table! { diesel::table! {
grid_meta_rev_table (id) { grid_meta_rev_table (id) {
id -> Integer, id -> Integer,
object_id -> Text, object_id -> Text,
@ -53,7 +66,7 @@ table! {
} }
} }
table! { diesel::table! {
grid_rev_snapshot (snapshot_id) { grid_rev_snapshot (snapshot_id) {
snapshot_id -> Text, snapshot_id -> Text,
object_id -> Text, object_id -> Text,
@ -64,7 +77,7 @@ table! {
} }
} }
table! { diesel::table! {
grid_rev_table (id) { grid_rev_table (id) {
id -> Integer, id -> Integer,
object_id -> Text, object_id -> Text,
@ -75,7 +88,7 @@ table! {
} }
} }
table! { diesel::table! {
grid_view_rev_table (id) { grid_view_rev_table (id) {
id -> Integer, id -> Integer,
object_id -> Text, object_id -> Text,
@ -86,14 +99,14 @@ table! {
} }
} }
table! { diesel::table! {
kv_table (key) { kv_table (key) {
key -> Text, key -> Text,
value -> Binary, value -> Binary,
} }
} }
table! { diesel::table! {
rev_snapshot (id) { rev_snapshot (id) {
id -> Integer, id -> Integer,
object_id -> Text, object_id -> Text,
@ -102,7 +115,7 @@ table! {
} }
} }
table! { diesel::table! {
rev_table (id) { rev_table (id) {
id -> Integer, id -> Integer,
doc_id -> Text, doc_id -> Text,
@ -114,7 +127,7 @@ table! {
} }
} }
table! { diesel::table! {
trash_table (id) { trash_table (id) {
id -> Text, id -> Text,
name -> Text, name -> Text,
@ -125,7 +138,7 @@ table! {
} }
} }
table! { diesel::table! {
user_table (id) { user_table (id) {
id -> Text, id -> Text,
name -> Text, name -> Text,
@ -136,7 +149,7 @@ table! {
} }
} }
table! { diesel::table! {
view_table (id) { view_table (id) {
id -> Text, id -> Text,
belong_to_id -> Text, belong_to_id -> Text,
@ -152,7 +165,7 @@ table! {
} }
} }
table! { diesel::table! {
workspace_table (id) { workspace_table (id) {
id -> Text, id -> Text,
name -> Text, name -> Text,
@ -164,8 +177,9 @@ table! {
} }
} }
allow_tables_to_appear_in_same_query!( diesel::allow_tables_to_appear_in_same_query!(
app_table, app_table,
document_rev_snapshot,
document_rev_table, document_rev_table,
folder_rev_snapshot, folder_rev_snapshot,
grid_block_index_table, grid_block_index_table,

View File

@ -86,6 +86,10 @@ impl RevisionObjectDeserializer for DocumentRevisionSerde {
let document = Document::new(tree); let document = Document::new(tree);
Result::<Document, FlowyError>::Ok(document) Result::<Document, FlowyError>::Ok(document)
} }
fn recover_operations_from_revisions(_revisions: Vec<Revision>) -> Option<Self::Output> {
None
}
} }
impl RevisionObjectSerializer for DocumentRevisionSerde { impl RevisionObjectSerializer for DocumentRevisionSerde {

View File

@ -1,7 +1,10 @@
use crate::editor::{initial_document_content, AppFlowyDocumentEditor, DocumentRevisionMergeable}; use crate::editor::{initial_document_content, AppFlowyDocumentEditor, DocumentRevisionMergeable};
use crate::entities::{DocumentVersionPB, EditParams}; use crate::entities::{DocumentVersionPB, EditParams};
use crate::old_editor::editor::{DeltaDocumentEditor, DeltaDocumentRevisionMergeable}; use crate::old_editor::editor::{DeltaDocumentEditor, DeltaDocumentRevisionMergeable};
use crate::services::rev_sqlite::{SQLiteDeltaDocumentRevisionPersistence, SQLiteDocumentRevisionPersistence}; use crate::services::rev_sqlite::{
SQLiteDeltaDocumentRevisionPersistence, SQLiteDocumentRevisionPersistence,
SQLiteDocumentRevisionSnapshotPersistence,
};
use crate::services::DocumentPersistence; use crate::services::DocumentPersistence;
use crate::{errors::FlowyError, DocumentCloudService}; use crate::{errors::FlowyError, DocumentCloudService};
use bytes::Bytes; use bytes::Bytes;
@ -261,15 +264,16 @@ impl DocumentManager {
pool: Arc<ConnectionPool>, pool: Arc<ConnectionPool>,
) -> Result<RevisionManager<Arc<ConnectionPool>>, FlowyError> { ) -> Result<RevisionManager<Arc<ConnectionPool>>, FlowyError> {
let user_id = self.user.user_id()?; let user_id = self.user.user_id()?;
let disk_cache = SQLiteDocumentRevisionPersistence::new(&user_id, pool); let disk_cache = SQLiteDocumentRevisionPersistence::new(&user_id, pool.clone());
let configuration = RevisionPersistenceConfiguration::new(100, true); let configuration = RevisionPersistenceConfiguration::new(200, true);
let rev_persistence = RevisionPersistence::new(&user_id, doc_id, disk_cache, configuration); let rev_persistence = RevisionPersistence::new(&user_id, doc_id, disk_cache, configuration);
let snapshot_persistence = SQLiteDocumentRevisionSnapshotPersistence::new(doc_id, pool);
Ok(RevisionManager::new( Ok(RevisionManager::new(
&user_id, &user_id,
doc_id, doc_id,
rev_persistence, rev_persistence,
DocumentRevisionMergeable(), DocumentRevisionMergeable(),
PhantomSnapshotPersistence(), snapshot_persistence,
)) ))
} }

View File

@ -1,5 +1,6 @@
#![allow(unused_attributes)] #![allow(unused_attributes)]
#![allow(unused_attributes)] #![allow(unused_attributes)]
use crate::old_editor::queue::{EditDocumentQueue, EditorCommand, EditorCommandSender}; use crate::old_editor::queue::{EditDocumentQueue, EditorCommand, EditorCommandSender};
use crate::{errors::FlowyError, DocumentEditor, DocumentUser}; use crate::{errors::FlowyError, DocumentEditor, DocumentUser};
use bytes::Bytes; use bytes::Bytes;
@ -260,6 +261,10 @@ impl RevisionObjectDeserializer for DeltaDocumentRevisionSerde {
base_rev_id, base_rev_id,
}) })
} }
fn recover_operations_from_revisions(_revisions: Vec<Revision>) -> Option<Self::Output> {
None
}
} }
impl RevisionObjectSerializer for DeltaDocumentRevisionSerde { impl RevisionObjectSerializer for DeltaDocumentRevisionSerde {

View File

@ -0,0 +1,96 @@
use bytes::Bytes;
use flowy_database::{
prelude::*,
schema::{document_rev_snapshot, document_rev_snapshot::dsl},
ConnectionPool,
};
use flowy_error::{internal_error, FlowyResult};
use flowy_revision::{RevisionSnapshot, RevisionSnapshotDiskCache};
use lib_infra::util::timestamp;
use std::sync::Arc;
pub struct SQLiteDocumentRevisionSnapshotPersistence {
object_id: String,
pool: Arc<ConnectionPool>,
}
impl SQLiteDocumentRevisionSnapshotPersistence {
pub fn new(object_id: &str, pool: Arc<ConnectionPool>) -> Self {
Self {
object_id: object_id.to_string(),
pool,
}
}
fn gen_snapshot_id(&self, rev_id: i64) -> String {
format!("{}:{}", self.object_id, rev_id)
}
}
impl RevisionSnapshotDiskCache for SQLiteDocumentRevisionSnapshotPersistence {
fn should_generate_snapshot_from_range(&self, start_rev_id: i64, current_rev_id: i64) -> bool {
(current_rev_id - start_rev_id) >= 150
}
fn write_snapshot(&self, rev_id: i64, data: Vec<u8>) -> FlowyResult<()> {
let conn = self.pool.get().map_err(internal_error)?;
let snapshot_id = self.gen_snapshot_id(rev_id);
let timestamp = timestamp();
let record = (
dsl::snapshot_id.eq(&snapshot_id),
dsl::object_id.eq(&self.object_id),
dsl::rev_id.eq(rev_id),
dsl::base_rev_id.eq(rev_id),
dsl::timestamp.eq(timestamp),
dsl::data.eq(data),
);
let _ = insert_or_ignore_into(dsl::document_rev_snapshot)
.values(record)
.execute(&*conn)?;
Ok(())
}
fn read_snapshot(&self, rev_id: i64) -> FlowyResult<Option<RevisionSnapshot>> {
let conn = self.pool.get().map_err(internal_error)?;
let snapshot_id = self.gen_snapshot_id(rev_id);
let record = dsl::document_rev_snapshot
.filter(dsl::snapshot_id.eq(&snapshot_id))
.first::<DocumentSnapshotRecord>(&*conn)?;
Ok(Some(record.into()))
}
fn read_last_snapshot(&self) -> FlowyResult<Option<RevisionSnapshot>> {
let conn = self.pool.get().map_err(internal_error)?;
let latest_record = dsl::document_rev_snapshot
.filter(dsl::object_id.eq(&self.object_id))
.order(dsl::timestamp.desc())
// .select(max(dsl::rev_id))
// .select((dsl::id, dsl::object_id, dsl::rev_id, dsl::data))
.first::<DocumentSnapshotRecord>(&*conn)?;
Ok(Some(latest_record.into()))
}
}
#[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)]
#[table_name = "document_rev_snapshot"]
#[primary_key("snapshot_id")]
struct DocumentSnapshotRecord {
snapshot_id: String,
object_id: String,
rev_id: i64,
base_rev_id: i64,
timestamp: i64,
data: Vec<u8>,
}
impl std::convert::From<DocumentSnapshotRecord> for RevisionSnapshot {
fn from(record: DocumentSnapshotRecord) -> Self {
RevisionSnapshot {
rev_id: record.rev_id,
base_rev_id: record.base_rev_id,
timestamp: record.timestamp,
data: Bytes::from(record.data),
}
}
}

View File

@ -1,5 +1,7 @@
mod document_rev_sqlite_v0; mod document_rev_sqlite_v0;
mod document_rev_sqlite_v1; mod document_rev_sqlite_v1;
mod document_snapshot;
pub use document_rev_sqlite_v0::*; pub use document_rev_sqlite_v0::*;
pub use document_rev_sqlite_v1::*; pub use document_rev_sqlite_v1::*;
pub use document_snapshot::*;

View File

@ -170,7 +170,7 @@ impl FolderManager {
let pool = self.persistence.db_pool()?; let pool = self.persistence.db_pool()?;
let object_id = folder_id.as_ref(); let object_id = folder_id.as_ref();
let disk_cache = SQLiteFolderRevisionPersistence::new(user_id, pool.clone()); let disk_cache = SQLiteFolderRevisionPersistence::new(user_id, pool.clone());
let configuration = RevisionPersistenceConfiguration::new(100, false); let configuration = RevisionPersistenceConfiguration::new(200, false);
let rev_persistence = RevisionPersistence::new(user_id, object_id, disk_cache, configuration); let rev_persistence = RevisionPersistence::new(user_id, object_id, disk_cache, configuration);
let rev_compactor = FolderRevisionMergeable(); let rev_compactor = FolderRevisionMergeable();

View File

@ -9,7 +9,8 @@ use flowy_revision::{
RevisionWebSocket, RevisionWebSocket,
}; };
use flowy_sync::client_folder::{FolderChangeset, FolderPad}; use flowy_sync::client_folder::{FolderChangeset, FolderPad};
use flowy_sync::util::make_operations_from_revisions; use flowy_sync::server_folder::FolderOperations;
use flowy_sync::util::{make_operations_from_revisions, recover_operation_from_revisions};
use lib_infra::future::FutureResult; use lib_infra::future::FutureResult;
use lib_ot::core::EmptyAttributes; use lib_ot::core::EmptyAttributes;
use parking_lot::RwLock; use parking_lot::RwLock;
@ -102,8 +103,19 @@ impl RevisionObjectDeserializer for FolderRevisionSerde {
type Output = FolderPad; type Output = FolderPad;
fn deserialize_revisions(_object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> { fn deserialize_revisions(_object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
let pad = FolderPad::from_revisions(revisions)?; let operations: FolderOperations = make_operations_from_revisions(revisions)?;
Ok(pad) Ok(FolderPad::from_operations(operations)?)
}
fn recover_operations_from_revisions(revisions: Vec<Revision>) -> Option<Self::Output> {
if let Some(operations) = recover_operation_from_revisions(revisions, |operations| {
FolderPad::from_operations(operations.clone()).is_ok()
}) {
if let Ok(pad) = FolderPad::from_operations(operations) {
return Some(pad);
}
}
None
} }
} }

View File

@ -123,7 +123,7 @@ fn migration_flag_key(user_id: &str, version: &str) -> String {
md5(format!("{}{}", user_id, version,)) md5(format!("{}{}", user_id, version,))
} }
pub struct FolderRevisionResettable { struct FolderRevisionResettable {
folder_id: String, folder_id: String,
} }

View File

@ -118,12 +118,12 @@ impl FolderPersistence {
write_to_disk: true, write_to_disk: true,
}; };
let disk_cache = mk_text_block_revision_disk_cache(user_id, pool); let disk_cache = make_folder_revision_disk_cache(user_id, pool);
disk_cache.delete_and_insert_records(folder_id.as_ref(), None, vec![record]) disk_cache.delete_and_insert_records(folder_id.as_ref(), None, vec![record])
} }
} }
pub fn mk_text_block_revision_disk_cache( pub fn make_folder_revision_disk_cache(
user_id: &str, user_id: &str,
pool: Arc<ConnectionPool>, pool: Arc<ConnectionPool>,
) -> Arc<dyn RevisionDiskCache<Arc<ConnectionPool>, Error = FlowyError>> { ) -> Arc<dyn RevisionDiskCache<Arc<ConnectionPool>, Error = FlowyError>> {

View File

@ -65,7 +65,7 @@ impl RevisionSnapshotDiskCache for SQLiteFolderRevisionSnapshotPersistence {
let conn = self.pool.get().map_err(internal_error)?; let conn = self.pool.get().map_err(internal_error)?;
let latest_record = dsl::folder_rev_snapshot let latest_record = dsl::folder_rev_snapshot
.filter(dsl::object_id.eq(&self.object_id)) .filter(dsl::object_id.eq(&self.object_id))
.order(dsl::rev_id.desc()) .order(dsl::timestamp.desc())
// .select(max(dsl::rev_id)) // .select(max(dsl::rev_id))
// .select((dsl::id, dsl::object_id, dsl::rev_id, dsl::data)) // .select((dsl::id, dsl::object_id, dsl::rev_id, dsl::data))
.first::<FolderSnapshotRecord>(&*conn)?; .first::<FolderSnapshotRecord>(&*conn)?;

View File

@ -1,5 +1,5 @@
mod folder_rev_sqlite; mod folder_rev_sqlite;
mod folder_snapshot_sqlite_impl; mod folder_snapshot;
pub use folder_rev_sqlite::*; pub use folder_rev_sqlite::*;
pub use folder_snapshot_sqlite_impl::*; pub use folder_snapshot::*;

View File

@ -162,7 +162,7 @@ impl GridManager {
// Create revision persistence // Create revision persistence
let disk_cache = SQLiteGridRevisionPersistence::new(&user_id, pool.clone()); let disk_cache = SQLiteGridRevisionPersistence::new(&user_id, pool.clone());
let configuration = RevisionPersistenceConfiguration::new(4, false); let configuration = RevisionPersistenceConfiguration::new(6, false);
let rev_persistence = RevisionPersistence::new(&user_id, grid_id, disk_cache, configuration); let rev_persistence = RevisionPersistence::new(&user_id, grid_id, disk_cache, configuration);
// Create snapshot persistence // Create snapshot persistence

View File

@ -178,10 +178,15 @@ impl RevisionCloudService for GridBlockRevisionCloudService {
struct GridBlockRevisionSerde(); struct GridBlockRevisionSerde();
impl RevisionObjectDeserializer for GridBlockRevisionSerde { impl RevisionObjectDeserializer for GridBlockRevisionSerde {
type Output = GridBlockRevisionPad; type Output = GridBlockRevisionPad;
fn deserialize_revisions(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> { fn deserialize_revisions(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
let pad = GridBlockRevisionPad::from_revisions(object_id, revisions)?; let pad = GridBlockRevisionPad::from_revisions(object_id, revisions)?;
Ok(pad) Ok(pad)
} }
fn recover_operations_from_revisions(_revisions: Vec<Revision>) -> Option<Self::Output> {
None
}
} }
impl RevisionObjectSerializer for GridBlockRevisionSerde { impl RevisionObjectSerializer for GridBlockRevisionSerde {

View File

@ -887,6 +887,10 @@ impl RevisionObjectDeserializer for GridRevisionSerde {
let pad = GridRevisionPad::from_revisions(revisions)?; let pad = GridRevisionPad::from_revisions(revisions)?;
Ok(pad) Ok(pad)
} }
fn recover_operations_from_revisions(_revisions: Vec<Revision>) -> Option<Self::Output> {
None
}
} }
impl RevisionObjectSerializer for GridRevisionSerde { impl RevisionObjectSerializer for GridRevisionSerde {
fn combine_revisions(revisions: Vec<Revision>) -> FlowyResult<Bytes> { fn combine_revisions(revisions: Vec<Revision>) -> FlowyResult<Bytes> {

View File

@ -86,7 +86,7 @@ impl RevisionSnapshotDiskCache for SQLiteGridRevisionSnapshotPersistence {
let conn = self.pool.get().map_err(internal_error)?; let conn = self.pool.get().map_err(internal_error)?;
let latest_record = dsl::grid_rev_snapshot let latest_record = dsl::grid_rev_snapshot
.filter(dsl::object_id.eq(&self.object_id)) .filter(dsl::object_id.eq(&self.object_id))
.order(dsl::rev_id.desc()) .order(dsl::timestamp.desc())
// .select(max(dsl::rev_id)) // .select(max(dsl::rev_id))
// .select((dsl::id, dsl::object_id, dsl::rev_id, dsl::data)) // .select((dsl::id, dsl::object_id, dsl::rev_id, dsl::data))
.first::<GridSnapshotRecord>(&*conn)?; .first::<GridSnapshotRecord>(&*conn)?;

View File

@ -1,9 +1,9 @@
mod grid_block_sqlite_impl; mod grid_block_sqlite_impl;
mod grid_snapshot_sqlite_impl; mod grid_snapshot;
mod grid_sqlite_impl; mod grid_sqlite_impl;
mod grid_view_sqlite_impl; mod grid_view_sqlite_impl;
pub use grid_block_sqlite_impl::*; pub use grid_block_sqlite_impl::*;
pub use grid_snapshot_sqlite_impl::*; pub use grid_snapshot::*;
pub use grid_sqlite_impl::*; pub use grid_sqlite_impl::*;
pub use grid_view_sqlite_impl::*; pub use grid_view_sqlite_impl::*;

View File

@ -41,6 +41,10 @@ impl RevisionObjectDeserializer for GridViewRevisionSerde {
let pad = GridViewRevisionPad::from_revisions(object_id, revisions)?; let pad = GridViewRevisionPad::from_revisions(object_id, revisions)?;
Ok(pad) Ok(pad)
} }
fn recover_operations_from_revisions(_revisions: Vec<Revision>) -> Option<Self::Output> {
None
}
} }
impl RevisionObjectSerializer for GridViewRevisionSerde { impl RevisionObjectSerializer for GridViewRevisionSerde {

View File

@ -34,6 +34,8 @@ pub trait RevisionObjectDeserializer: Send + Sync {
/// * `revisions`: a list of revisions that represent the object /// * `revisions`: a list of revisions that represent the object
/// ///
fn deserialize_revisions(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output>; fn deserialize_revisions(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output>;
fn recover_operations_from_revisions(revisions: Vec<Revision>) -> Option<Self::Output>;
} }
pub trait RevisionObjectSerializer: Send + Sync { pub trait RevisionObjectSerializer: Send + Sync {
@ -125,24 +127,28 @@ impl<Connection: 'static> RevisionManager<Connection> {
} }
} }
#[tracing::instrument(level = "trace", skip_all, fields(deserializer, object) err)] #[tracing::instrument(name = "revision_manager_initialize", level = "info", skip_all, fields(deserializer, object_id, deserialize_revisions) err)]
pub async fn initialize<B>(&mut self, _cloud: Option<Arc<dyn RevisionCloudService>>) -> FlowyResult<B::Output> pub async fn initialize<B>(&mut self, _cloud: Option<Arc<dyn RevisionCloudService>>) -> FlowyResult<B::Output>
where where
B: RevisionObjectDeserializer, B: RevisionObjectDeserializer,
{ {
let revision_records = self.rev_persistence.load_all_records(&self.object_id)?; let revision_records = self.rev_persistence.load_all_records(&self.object_id)?;
tracing::Span::current().record("object", &self.object_id.as_str()); tracing::Span::current().record("object_id", &self.object_id.as_str());
tracing::Span::current().record("deserializer", &std::any::type_name::<B>()); tracing::Span::current().record("deserializer", &std::any::type_name::<B>());
let revisions: Vec<Revision> = revision_records.iter().map(|record| record.revision.clone()).collect(); let revisions: Vec<Revision> = revision_records.iter().map(|record| record.revision.clone()).collect();
tracing::Span::current().record("deserialize_revisions", &revisions.len());
let current_rev_id = revisions.last().as_ref().map(|revision| revision.rev_id).unwrap_or(0); let current_rev_id = revisions.last().as_ref().map(|revision| revision.rev_id).unwrap_or(0);
match B::deserialize_revisions(&self.object_id, revisions) { match B::deserialize_revisions(&self.object_id, revisions.clone()) {
Ok(object) => { Ok(object) => {
self.rev_persistence.sync_revision_records(&revision_records).await?; self.rev_persistence.sync_revision_records(&revision_records).await?;
self.rev_id_counter.set(current_rev_id); self.rev_id_counter.set(current_rev_id);
Ok(object) Ok(object)
} }
Err(err) => match self.rev_snapshot.restore_from_snapshot::<B>(current_rev_id) { Err(e) => match self.rev_snapshot.restore_from_snapshot::<B>(current_rev_id) {
None => Err(err), None => {
tracing::info!("Restore object from validation revisions");
B::recover_operations_from_revisions(revisions).ok_or(e)
}
Some((object, snapshot_rev)) => { Some((object, snapshot_rev)) => {
let snapshot_rev_id = snapshot_rev.rev_id; let snapshot_rev_id = snapshot_rev.rev_id;
let _ = self.rev_persistence.reset(vec![snapshot_rev]).await; let _ = self.rev_persistence.reset(vec![snapshot_rev]).await;

View File

@ -87,11 +87,12 @@ where
} }
/// Find the nearest revision base on the passed-in rev_id /// Find the nearest revision base on the passed-in rev_id
#[tracing::instrument(level = "trace", skip_all)]
pub fn restore_from_snapshot<B>(&self, rev_id: i64) -> Option<(B::Output, Revision)> pub fn restore_from_snapshot<B>(&self, rev_id: i64) -> Option<(B::Output, Revision)>
where where
B: RevisionObjectDeserializer, B: RevisionObjectDeserializer,
{ {
tracing::trace!("Try to find if {} has snapshot", self.object_id); tracing::info!("Try to find if {} has snapshot", self.object_id);
let snapshot = self.rev_snapshot_persistence.read_last_snapshot().ok()??; let snapshot = self.rev_snapshot_persistence.read_last_snapshot().ok()??;
let snapshot_rev_id = snapshot.rev_id; let snapshot_rev_id = snapshot.rev_id;
let revision = Revision::new( let revision = Revision::new(
@ -101,13 +102,13 @@ where
snapshot.data, snapshot.data,
"".to_owned(), "".to_owned(),
); );
tracing::trace!( tracing::info!(
"Try to restore from snapshot: {}, {}", "Try to restore from snapshot: {}, {}",
snapshot.base_rev_id, snapshot.base_rev_id,
snapshot.rev_id snapshot.rev_id
); );
let object = B::deserialize_revisions(&self.object_id, vec![revision.clone()]).ok()?; let object = B::deserialize_revisions(&self.object_id, vec![revision.clone()]).ok()?;
tracing::trace!( tracing::info!(
"Restore {} from snapshot with rev_id: {}", "Restore {} from snapshot with rev_id: {}",
self.object_id, self.object_id,
snapshot_rev_id snapshot_rev_id

View File

@ -332,4 +332,8 @@ impl RevisionObjectDeserializer for RevisionObjectMockSerde {
Ok(object) Ok(object)
} }
fn recover_operations_from_revisions(_revisions: Vec<Revision>) -> Option<Self::Output> {
None
}
} }

View File

@ -41,7 +41,6 @@ impl FolderPad {
} }
pub fn from_operations(operations: FolderOperations) -> CollaborateResult<Self> { pub fn from_operations(operations: FolderOperations) -> CollaborateResult<Self> {
// TODO: Reconvert from history if delta.to_str() failed.
let content = operations.content()?; let content = operations.content()?;
let mut deserializer = serde_json::Deserializer::from_reader(content.as_bytes()); let mut deserializer = serde_json::Deserializer::from_reader(content.as_bytes());
@ -443,15 +442,6 @@ pub fn initial_folder_operations(folder_pad: &FolderPad) -> CollaborateResult<Fo
Ok(operations) Ok(operations)
} }
impl std::default::Default for FolderPad {
fn default() -> Self {
FolderPad {
folder_rev: FolderRevision::default(),
operations: default_folder_operations(),
}
}
}
pub struct FolderChangeset { pub struct FolderChangeset {
pub operations: FolderOperations, pub operations: FolderOperations,
/// md5: the md5 of the FolderPad's operations after applying the change. /// md5: the md5 of the FolderPad's operations after applying the change.

View File

@ -42,6 +42,7 @@ impl CollaborateError {
static_error!(record_not_found, ErrorCode::RecordNotFound); static_error!(record_not_found, ErrorCode::RecordNotFound);
static_error!(revision_conflict, ErrorCode::RevisionConflict); static_error!(revision_conflict, ErrorCode::RevisionConflict);
static_error!(can_not_delete_primary_field, ErrorCode::CannotDeleteThePrimaryField); static_error!(can_not_delete_primary_field, ErrorCode::CannotDeleteThePrimaryField);
static_error!(unexpected_empty_revision, ErrorCode::UnexpectedEmptyRevision);
} }
impl fmt::Display for CollaborateError { impl fmt::Display for CollaborateError {
@ -60,6 +61,7 @@ pub enum ErrorCode {
RevisionConflict = 203, RevisionConflict = 203,
RecordNotFound = 300, RecordNotFound = 300,
CannotDeleteThePrimaryField = 301, CannotDeleteThePrimaryField = 301,
UnexpectedEmptyRevision = 302,
SerdeError = 999, SerdeError = 999,
InternalError = 1000, InternalError = 1000,
} }

View File

@ -39,39 +39,49 @@ where
let mut new_operations = DeltaOperations::<T>::new(); let mut new_operations = DeltaOperations::<T>::new();
for revision in revisions { for revision in revisions {
if revision.bytes.is_empty() { if revision.bytes.is_empty() {
tracing::warn!("revision delta_data is empty"); return Err(CollaborateError::unexpected_empty_revision().context("Unexpected Empty revision"));
continue;
} }
let operations = DeltaOperations::<T>::from_bytes(revision.bytes).map_err(|e| { let operations = DeltaOperations::<T>::from_bytes(revision.bytes).map_err(|e| {
let err_msg = format!("Deserialize revision failed: {:?}", e); let err_msg = format!("Deserialize revision failed: {:?}", e);
CollaborateError::internal().context(err_msg) CollaborateError::internal().context(err_msg)
})?; })?;
match new_operations.compose(&operations) { new_operations = new_operations.compose(&operations)?;
Ok(composed_operations) => {
new_operations = composed_operations;
// if composed_operations.content().is_ok() {
// new_operations = composed_operations;
// } else {
// tracing::error!(
// "Compose operation failed: rev_id: {}, object_id: {} {:?}",
// revision.rev_id,
// revision.object_id,
// operations
// );
// return Ok(new_operations);
// }
}
Err(e) => {
tracing::error!("Compose operation failed: {}, {:?}", e, operations);
return Ok(new_operations);
}
}
} }
Ok(new_operations) Ok(new_operations)
} }
pub fn recover_operation_from_revisions<T>(
revisions: Vec<Revision>,
validator: impl Fn(&DeltaOperations<T>) -> bool,
) -> Option<DeltaOperations<T>>
where
T: OperationAttributes + DeserializeOwned + OperationAttributes,
{
let mut new_operations = DeltaOperations::<T>::new();
for revision in revisions {
if let Ok(operations) = DeltaOperations::<T>::from_bytes(revision.bytes) {
match new_operations.compose(&operations) {
Ok(composed_operations) => {
if validator(&composed_operations) {
new_operations = composed_operations;
} else {
break;
}
}
Err(_) => break,
}
} else {
break;
}
}
return if new_operations.is_empty() {
None
} else {
Some(new_operations)
};
}
pub fn pair_rev_id_from_revision_pbs(revisions: &[Revision]) -> (i64, i64) { pub fn pair_rev_id_from_revision_pbs(revisions: &[Revision]) -> (i64, i64) {
let mut rev_id = 0; let mut rev_id = 0;
revisions.iter().for_each(|revision| { revisions.iter().for_each(|revision| {

View File

@ -26,36 +26,26 @@ impl<'de> Deserialize<'de> for FolderRevision {
where where
A: MapAccess<'de>, A: MapAccess<'de>,
{ {
let f = |map: &mut A,
workspaces: &mut Option<Vec<WorkspaceRevision>>,
trash: &mut Option<Vec<TrashRevision>>| match map.next_key::<String>()
{
Ok(Some(key)) => {
if key == "workspaces" && workspaces.is_none() {
*workspaces = Some(map.next_value::<Vec<WorkspaceRevision>>().ok()?);
}
if key == "trash" && trash.is_none() {
*trash = Some(map.next_value::<Vec<TrashRevision>>().ok()?);
}
Some(())
}
Ok(None) => None,
Err(_e) => None,
};
let mut workspaces: Option<Vec<WorkspaceRevision>> = None; let mut workspaces: Option<Vec<WorkspaceRevision>> = None;
let mut trash: Option<Vec<TrashRevision>> = None; let mut trash: Option<Vec<TrashRevision>> = None;
while f(&mut map, &mut workspaces, &mut trash).is_some() { while let Some(key) = map.next_key::<String>()? {
if workspaces.is_some() && trash.is_some() { if key == "workspaces" && workspaces.is_none() {
break; workspaces = Some(map.next_value::<Vec<WorkspaceRevision>>()?);
}
if key == "trash" && trash.is_none() {
trash = Some(map.next_value::<Vec<TrashRevision>>()?);
} }
} }
*self.0 = Some(FolderRevision { if let Some(workspaces) = workspaces {
workspaces: workspaces.unwrap_or_default().into_iter().map(Arc::new).collect(), *self.0 = Some(FolderRevision {
trash: trash.unwrap_or_default().into_iter().map(Arc::new).collect(), workspaces: workspaces.into_iter().map(Arc::new).collect(),
}); trash: trash.unwrap_or_default().into_iter().map(Arc::new).collect(),
Ok(()) });
Ok(())
} else {
Err(de::Error::missing_field("workspaces"))
}
} }
} }

View File

@ -293,6 +293,11 @@ where
pub fn extend(&mut self, other: Self) { pub fn extend(&mut self, other: Self) {
other.ops.into_iter().for_each(|op| self.add(op)); other.ops.into_iter().for_each(|op| self.add(op));
} }
/// Get the content that the [Delta] represents.
pub fn content(&self) -> Result<String, OTError> {
self.apply("")
}
} }
impl<T> OperationTransform for DeltaOperations<T> impl<T> OperationTransform for DeltaOperations<T>
@ -607,11 +612,6 @@ where
serde_json::to_string(self).unwrap_or_else(|_| "".to_owned()) serde_json::to_string(self).unwrap_or_else(|_| "".to_owned())
} }
/// Get the content that the [Delta] represents.
pub fn content(&self) -> Result<String, OTError> {
self.apply("")
}
/// Serial the [Delta] into a String in Bytes format /// Serial the [Delta] into a String in Bytes format
pub fn json_bytes(&self) -> Bytes { pub fn json_bytes(&self) -> Bytes {
let json = self.json_str(); let json = self.json_str();