diff --git a/backend/src/services/document/persistence.rs b/backend/src/services/document/persistence.rs index 972ae4a26b..2fffcad697 100644 --- a/backend/src/services/document/persistence.rs +++ b/backend/src/services/document/persistence.rs @@ -35,7 +35,7 @@ pub async fn read_document( } } -#[tracing::instrument(level = "debug", skip(document_manager, params), fields(delta), err)] +#[tracing::instrument(level = "debug", skip(document_manager, params), err)] pub async fn reset_document( document_manager: &Arc, mut params: ResetDocumentParams, diff --git a/backend/tests/document_test/edit_test.rs b/backend/tests/document_test/edit_test.rs index d976f10532..0aff223881 100644 --- a/backend/tests/document_test/edit_test.rs +++ b/backend/tests/document_test/edit_test.rs @@ -24,7 +24,7 @@ async fn delta_sync_while_editing() { DocScript::ClientInsertText(0, "abc"), DocScript::ClientInsertText(3, "123"), DocScript::AssertClient(r#"[{"insert":"abc123\n"}]"#), - DocScript::AssertServer(r#"[{"insert":"abc123\n"}]"#, 2), + DocScript::AssertServer(r#"[{"insert":"abc123\n"}]"#, 1), ]) .await; } @@ -50,11 +50,11 @@ async fn delta_sync_while_editing_with_attribute() { DocScript::ClientInsertText(0, "abc"), DocScript::ClientFormatText(Interval::new(0, 3), RichTextAttribute::Bold(true)), DocScript::AssertClient(r#"[{"insert":"abc","attributes":{"bold":true}},{"insert":"\n"}]"#), - DocScript::AssertServer(r#"[{"insert":"abc","attributes":{"bold":true}},{"insert":"\n"}]"#, 2), + DocScript::AssertServer(r#"[{"insert":"abc","attributes":{"bold":true}},{"insert":"\n"}]"#, 1), DocScript::ClientInsertText(3, "efg"), DocScript::ClientFormatText(Interval::new(3, 5), RichTextAttribute::Italic(true)), DocScript::AssertClient(r#"[{"insert":"abc","attributes":{"bold":true}},{"insert":"ef","attributes":{"bold":true,"italic":true}},{"insert":"g","attributes":{"bold":true}},{"insert":"\n"}]"#), - DocScript::AssertServer(r#"[{"insert":"abc","attributes":{"bold":true}},{"insert":"ef","attributes":{"bold":true,"italic":true}},{"insert":"g","attributes":{"bold":true}},{"insert":"\n"}]"#, 4), + DocScript::AssertServer(r#"[{"insert":"abc","attributes":{"bold":true}},{"insert":"ef","attributes":{"bold":true,"italic":true}},{"insert":"g","attributes":{"bold":true}},{"insert":"\n"}]"#, 3), ]) .await; } diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/dispatch/dispatch.dart b/frontend/app_flowy/packages/flowy_sdk/lib/dispatch/dispatch.dart index a10ba4fafb..97fa0861f1 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/dispatch/dispatch.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/dispatch/dispatch.dart @@ -3,7 +3,6 @@ import 'package:dartz/dartz.dart'; import 'package:flowy_log/flowy_log.dart'; // ignore: unnecessary_import import 'package:flowy_sdk/protobuf/dart-ffi/ffi_response.pb.dart'; -import 'package:flowy_sdk/protobuf/dart-ffi/ffi_request.pb.dart'; import 'package:flowy_sdk/protobuf/flowy-collaboration/document_info.pb.dart'; import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart'; import 'package:flowy_sdk/protobuf/flowy-net/event.pb.dart'; diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/revision.pb.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/revision.pb.dart index 15d4558b36..b0fc9820d0 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/revision.pb.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/revision.pb.dart @@ -235,22 +235,17 @@ class RevId extends $pb.GeneratedMessage { class RevisionRange extends $pb.GeneratedMessage { static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'RevisionRange', createEmptyInstance: create) - ..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'objectId') - ..aInt64(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'start') - ..aInt64(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'end') + ..aInt64(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'start') + ..aInt64(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'end') ..hasRequiredFields = false ; RevisionRange._() : super(); factory RevisionRange({ - $core.String? objectId, $fixnum.Int64? start, $fixnum.Int64? end, }) { final _result = create(); - if (objectId != null) { - _result.objectId = objectId; - } if (start != null) { _result.start = start; } @@ -281,30 +276,21 @@ class RevisionRange extends $pb.GeneratedMessage { static RevisionRange? _defaultInstance; @$pb.TagNumber(1) - $core.String get objectId => $_getSZ(0); + $fixnum.Int64 get start => $_getI64(0); @$pb.TagNumber(1) - set objectId($core.String v) { $_setString(0, v); } + set start($fixnum.Int64 v) { $_setInt64(0, v); } @$pb.TagNumber(1) - $core.bool hasObjectId() => $_has(0); + $core.bool hasStart() => $_has(0); @$pb.TagNumber(1) - void clearObjectId() => clearField(1); + void clearStart() => clearField(1); @$pb.TagNumber(2) - $fixnum.Int64 get start => $_getI64(1); + $fixnum.Int64 get end => $_getI64(1); @$pb.TagNumber(2) - set start($fixnum.Int64 v) { $_setInt64(1, v); } + set end($fixnum.Int64 v) { $_setInt64(1, v); } @$pb.TagNumber(2) - $core.bool hasStart() => $_has(1); + $core.bool hasEnd() => $_has(1); @$pb.TagNumber(2) - void clearStart() => clearField(2); - - @$pb.TagNumber(3) - $fixnum.Int64 get end => $_getI64(2); - @$pb.TagNumber(3) - set end($fixnum.Int64 v) { $_setInt64(2, v); } - @$pb.TagNumber(3) - $core.bool hasEnd() => $_has(2); - @$pb.TagNumber(3) - void clearEnd() => clearField(3); + void clearEnd() => clearField(2); } diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/revision.pbjson.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/revision.pbjson.dart index 676b62cd24..64a4feed7e 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/revision.pbjson.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/revision.pbjson.dart @@ -70,11 +70,10 @@ final $typed_data.Uint8List revIdDescriptor = $convert.base64Decode('CgVSZXZJZBI const RevisionRange$json = const { '1': 'RevisionRange', '2': const [ - const {'1': 'object_id', '3': 1, '4': 1, '5': 9, '10': 'objectId'}, - const {'1': 'start', '3': 2, '4': 1, '5': 3, '10': 'start'}, - const {'1': 'end', '3': 3, '4': 1, '5': 3, '10': 'end'}, + const {'1': 'start', '3': 1, '4': 1, '5': 3, '10': 'start'}, + const {'1': 'end', '3': 2, '4': 1, '5': 3, '10': 'end'}, ], }; /// Descriptor for `RevisionRange`. Decode as a `google.protobuf.DescriptorProto`. -final $typed_data.Uint8List revisionRangeDescriptor = $convert.base64Decode('Cg1SZXZpc2lvblJhbmdlEhsKCW9iamVjdF9pZBgBIAEoCVIIb2JqZWN0SWQSFAoFc3RhcnQYAiABKANSBXN0YXJ0EhAKA2VuZBgDIAEoA1IDZW5k'); +final $typed_data.Uint8List revisionRangeDescriptor = $convert.base64Decode('Cg1SZXZpc2lvblJhbmdlEhQKBXN0YXJ0GAEgASgDUgVzdGFydBIQCgNlbmQYAiABKANSA2VuZA=='); diff --git a/frontend/rust-lib/flowy-core/src/services/folder_editor.rs b/frontend/rust-lib/flowy-core/src/services/folder_editor.rs index 4b4d981507..c4c7d2d9b3 100755 --- a/frontend/rust-lib/flowy-core/src/services/folder_editor.rs +++ b/frontend/rust-lib/flowy-core/src/services/folder_editor.rs @@ -5,12 +5,14 @@ use flowy_collaboration::{ }; use crate::controller::FolderId; +use flowy_collaboration::util::make_delta_from_revisions; use flowy_error::{FlowyError, FlowyResult}; use flowy_sync::{ - RevisionCache, RevisionCloudService, RevisionManager, RevisionObjectBuilder, RevisionWebSocket, + RevisionCache, RevisionCloudService, RevisionCompact, RevisionManager, RevisionObjectBuilder, RevisionWebSocket, RevisionWebSocketManager, }; use lib_infra::future::FutureResult; +use lib_ot::core::PlainAttributes; use lib_sqlite::ConnectionPool; use parking_lot::RwLock; use std::sync::Arc; @@ -36,7 +38,11 @@ impl FolderEditor { let cloud = Arc::new(FolderRevisionCloudServiceImpl { token: token.to_string(), }); - let folder = Arc::new(RwLock::new(rev_manager.load::(cloud).await?)); + let folder = Arc::new(RwLock::new( + rev_manager + .load::(cloud) + .await?, + )); let rev_manager = Arc::new(rev_manager); let ws_manager = make_folder_ws_manager( user_id, @@ -78,9 +84,19 @@ impl FolderEditor { &self.user_id, md5, ); - let _ = futures::executor::block_on(async { self.rev_manager.add_local_revision(&revision).await })?; + let _ = futures::executor::block_on(async { + self.rev_manager + .add_local_revision::(&revision) + .await + })?; Ok(()) } + + #[allow(dead_code)] + pub fn folder_json(&self) -> FlowyResult { + let json = self.folder.read().to_json()?; + Ok(json) + } } struct FolderPadBuilder(); @@ -112,3 +128,25 @@ impl FolderEditor { self.rev_manager.clone() } } + +struct FolderRevisionCompact(); +impl RevisionCompact for FolderRevisionCompact { + fn compact_revisions(user_id: &str, object_id: &str, mut revisions: Vec) -> FlowyResult { + if revisions.is_empty() { + return Err(FlowyError::internal().context("Can't compact the empty folder's revisions")); + } + + if revisions.len() == 1 { + return Ok(revisions.pop().unwrap()); + } + + let first_revision = revisions.first().unwrap(); + let last_revision = revisions.last().unwrap(); + + let (base_rev_id, rev_id) = first_revision.pair_rev_id(); + let md5 = last_revision.md5.clone(); + let delta = make_delta_from_revisions::(revisions)?; + let delta_data = delta.to_bytes(); + Ok(Revision::new(object_id, base_rev_id, rev_id, delta_data, user_id, md5)) + } +} diff --git a/frontend/rust-lib/flowy-core/src/services/persistence/mod.rs b/frontend/rust-lib/flowy-core/src/services/persistence/mod.rs index 448a9ef02a..8dbf050433 100755 --- a/frontend/rust-lib/flowy-core/src/services/persistence/mod.rs +++ b/frontend/rust-lib/flowy-core/src/services/persistence/mod.rs @@ -122,6 +122,6 @@ impl FolderPersistence { let conn = pool.get()?; let disk_cache = mk_revision_disk_cache(user_id, pool); - disk_cache.write_revision_records(vec![record], &conn) + disk_cache.create_revision_records(vec![record], &conn) } } diff --git a/frontend/rust-lib/flowy-core/src/services/web_socket.rs b/frontend/rust-lib/flowy-core/src/services/web_socket.rs index c6e752769c..95c8fb2ead 100755 --- a/frontend/rust-lib/flowy-core/src/services/web_socket.rs +++ b/frontend/rust-lib/flowy-core/src/services/web_socket.rs @@ -10,7 +10,7 @@ use flowy_collaboration::{ use flowy_error::FlowyError; use flowy_sync::*; use lib_infra::future::{BoxResultFuture, FutureResult}; -use lib_ot::core::{Delta, OperationTransformable, PlainDelta, PlainTextAttributes}; +use lib_ot::core::{Delta, OperationTransformable, PlainAttributes, PlainDelta}; use parking_lot::RwLock; use std::{sync::Arc, time::Duration}; @@ -23,7 +23,7 @@ pub(crate) async fn make_folder_ws_manager( ) -> Arc { let composite_sink_provider = Arc::new(CompositeWSSinkDataProvider::new(folder_id, rev_manager.clone())); let resolve_target = Arc::new(FolderRevisionResolveTarget { folder_pad }); - let resolver = RevisionConflictResolver::::new( + let resolver = RevisionConflictResolver::::new( user_id, resolve_target, Arc::new(composite_sink_provider.clone()), @@ -58,8 +58,8 @@ struct FolderRevisionResolveTarget { folder_pad: Arc>, } -impl ResolverTarget for FolderRevisionResolveTarget { - fn compose_delta(&self, delta: Delta) -> BoxResultFuture { +impl ResolverTarget for FolderRevisionResolveTarget { + fn compose_delta(&self, delta: Delta) -> BoxResultFuture { let folder_pad = self.folder_pad.clone(); Box::pin(async move { let md5 = folder_pad.write().compose_remote_delta(delta)?; @@ -69,8 +69,8 @@ impl ResolverTarget for FolderRevisionResolveTarget { fn transform_delta( &self, - delta: Delta, - ) -> BoxResultFuture, FlowyError> { + delta: Delta, + ) -> BoxResultFuture, FlowyError> { let folder_pad = self.folder_pad.clone(); Box::pin(async move { let read_guard = folder_pad.read(); @@ -92,7 +92,7 @@ impl ResolverTarget for FolderRevisionResolveTarget { }) } - fn reset_delta(&self, delta: Delta) -> BoxResultFuture { + fn reset_delta(&self, delta: Delta) -> BoxResultFuture { let folder_pad = self.folder_pad.clone(); Box::pin(async move { let md5 = folder_pad.write().reset_folder(delta)?; @@ -102,7 +102,7 @@ impl ResolverTarget for FolderRevisionResolveTarget { } struct FolderWSStreamConsumerAdapter { - resolver: Arc>, + resolver: Arc>, } impl RevisionWSSteamConsumer for FolderWSStreamConsumerAdapter { diff --git a/frontend/rust-lib/flowy-core/tests/workspace/folder_test.rs b/frontend/rust-lib/flowy-core/tests/workspace/folder_test.rs index 63257f4efb..9092c10aab 100755 --- a/frontend/rust-lib/flowy-core/tests/workspace/folder_test.rs +++ b/frontend/rust-lib/flowy-core/tests/workspace/folder_test.rs @@ -64,8 +64,8 @@ async fn workspace_read() { async fn workspace_create_with_apps() { let mut test = FolderTest::new().await; test.run_scripts(vec![CreateApp { - name: "App", - desc: "App description", + name: "App".to_string(), + desc: "App description".to_string(), }]) .await; @@ -147,12 +147,12 @@ async fn app_create_with_view() { let mut app = test.app.clone(); test.run_scripts(vec![ CreateView { - name: "View A", - desc: "View A description", + name: "View A".to_owned(), + desc: "View A description".to_owned(), }, CreateView { - name: "View B", - desc: "View B description", + name: "View B".to_owned(), + desc: "View B description".to_owned(), }, ReadApp(app.id), ]) @@ -219,12 +219,12 @@ async fn view_delete_all() { let app = test.app.clone(); test.run_scripts(vec![ CreateView { - name: "View A", - desc: "View A description", + name: "View A".to_owned(), + desc: "View A description".to_owned(), }, CreateView { - name: "View B", - desc: "View B description", + name: "View B".to_owned(), + desc: "View B description".to_owned(), }, ReadApp(app.id.clone()), ]) @@ -250,8 +250,8 @@ async fn view_delete_all_permanent() { let app = test.app.clone(); test.run_scripts(vec![ CreateView { - name: "View A", - desc: "View A description", + name: "View A".to_owned(), + desc: "View A description".to_owned(), }, ReadApp(app.id.clone()), ]) @@ -299,13 +299,8 @@ async fn folder_sync_revision_seq() { rev_id: 2, state: RevisionState::Sync, }, - AssertRevisionState { - rev_id: 3, - state: RevisionState::Sync, - }, AssertNextSyncRevId(Some(1)), AssertNextSyncRevId(Some(2)), - AssertNextSyncRevId(Some(3)), AssertRevisionState { rev_id: 1, state: RevisionState::Ack, @@ -314,10 +309,6 @@ async fn folder_sync_revision_seq() { rev_id: 2, state: RevisionState::Ack, }, - AssertRevisionState { - rev_id: 3, - state: RevisionState::Ack, - }, ]) .await; } @@ -325,35 +316,50 @@ async fn folder_sync_revision_seq() { #[tokio::test] async fn folder_sync_revision_with_new_app() { let mut test = FolderTest::new().await; + let app_name = "AppFlowy contributors".to_owned(); + let app_desc = "Welcome to be a AppFlowy contributor".to_owned(); + test.run_scripts(vec![ AssertNextSyncRevId(Some(1)), AssertNextSyncRevId(Some(2)), - AssertNextSyncRevId(Some(3)), CreateApp { - name: "New App", - desc: "", + name: app_name.clone(), + desc: app_desc.clone(), }, - AssertCurrentRevId(4), - AssertNextSyncRevId(Some(4)), + AssertCurrentRevId(3), + AssertNextSyncRevId(Some(3)), AssertNextSyncRevId(None), ]) .await; + + let app = test.app.clone(); + assert_eq!(app.name, app_name); + assert_eq!(app.desc, app_desc); + test.run_scripts(vec![ReadApp(app.id.clone()), AssertApp(app)]).await; } #[tokio::test] async fn folder_sync_revision_with_new_view() { let mut test = FolderTest::new().await; + let view_name = "AppFlowy features".to_owned(); + let view_desc = "😁".to_owned(); + test.run_scripts(vec![ AssertNextSyncRevId(Some(1)), AssertNextSyncRevId(Some(2)), - AssertNextSyncRevId(Some(3)), CreateView { - name: "New App", - desc: "", + name: view_name.clone(), + desc: view_desc.clone(), }, - AssertCurrentRevId(4), - AssertNextSyncRevId(Some(4)), + AssertCurrentRevId(3), + AssertNextSyncRevId(Some(3)), AssertNextSyncRevId(None), ]) .await; + + let view = test.view.clone(); + assert_eq!(view.name, view_name); + assert_eq!(view.desc, view_desc); + test.run_scripts(vec![ReadView(view.id.clone()), AssertView(view)]) + .await; } diff --git a/frontend/rust-lib/flowy-core/tests/workspace/script.rs b/frontend/rust-lib/flowy-core/tests/workspace/script.rs index 1f8fde85fa..d3674f0a75 100755 --- a/frontend/rust-lib/flowy-core/tests/workspace/script.rs +++ b/frontend/rust-lib/flowy-core/tests/workspace/script.rs @@ -21,7 +21,7 @@ pub enum FolderScript { ReadWorkspace(Option), // App - CreateApp { name: &'static str, desc: &'static str }, + CreateApp { name: String, desc: String }, AssertAppJson(String), AssertApp(App), ReadApp(String), @@ -29,7 +29,7 @@ pub enum FolderScript { DeleteApp, // View - CreateView { name: &'static str, desc: &'static str }, + CreateView { name: String, desc: String }, AssertView(View), ReadView(String), UpdateView { name: Option, desc: Option }, @@ -97,7 +97,7 @@ impl FolderTest { let sdk = &self.sdk; let folder_editor: Arc = sdk.folder_manager.folder_editor().await; let rev_manager = folder_editor.rev_manager(); - let cache = rev_manager.revision_cache(); + let cache = rev_manager.revision_cache().await; match script { FolderScript::ReadAllWorkspaces => { @@ -124,7 +124,7 @@ impl FolderTest { self.workspace = workspace; } FolderScript::CreateApp { name, desc } => { - let app = create_app(sdk, &self.workspace.id, name, desc).await; + let app = create_app(sdk, &self.workspace.id, &name, &desc).await; self.app = app; } FolderScript::AssertAppJson(expected_json) => { @@ -146,7 +146,7 @@ impl FolderTest { } FolderScript::CreateView { name, desc } => { - let view = create_view(sdk, &self.app.id, name, desc, ViewType::Doc).await; + let view = create_view(sdk, &self.app.id, &name, &desc, ViewType::Doc).await; self.view = view; } FolderScript::AssertView(view) => { @@ -193,7 +193,7 @@ impl FolderTest { } } FolderScript::AssertCurrentRevId(rev_id) => { - assert_eq!(rev_manager.rev_id(), rev_id); + assert_eq!(rev_manager.rev_id(), rev_id, "Current rev_id is not match"); } FolderScript::AssertNextSyncRevId(rev_id) => { let next_revision = rev_manager.next_sync_revision().await.unwrap(); @@ -201,7 +201,8 @@ impl FolderTest { assert!(next_revision.is_none(), "Next revision should be None"); return; } - let next_revision = next_revision.unwrap(); + let next_revision = next_revision + .unwrap_or_else(|| panic!("Expected Next revision is {}, but receive None", rev_id.unwrap())); let mut receiver = rev_manager.revision_ack_receiver(); let _ = receiver.recv().await; assert_eq!(next_revision.rev_id, rev_id.unwrap()); diff --git a/frontend/rust-lib/flowy-document/src/core/editor.rs b/frontend/rust-lib/flowy-document/src/core/editor.rs index 63dceb6e8d..f1c92cc7c7 100755 --- a/frontend/rust-lib/flowy-document/src/core/editor.rs +++ b/frontend/rust-lib/flowy-document/src/core/editor.rs @@ -1,3 +1,4 @@ +use crate::core::DocumentRevisionCompact; use crate::{ core::{make_document_ws_manager, EditorCommand, EditorCommandQueue, EditorCommandSender}, errors::FlowyError, @@ -36,7 +37,9 @@ impl ClientDocumentEditor { web_socket: Arc, server: Arc, ) -> FlowyResult> { - let document_info = rev_manager.load::(server).await?; + let document_info = rev_manager + .load::(server) + .await?; let delta = document_info.delta()?; let rev_manager = Arc::new(rev_manager); let doc_id = doc_id.to_string(); diff --git a/frontend/rust-lib/flowy-document/src/core/queue.rs b/frontend/rust-lib/flowy-document/src/core/queue.rs index 87265b81d1..2550aeab64 100755 --- a/frontend/rust-lib/flowy-document/src/core/queue.rs +++ b/frontend/rust-lib/flowy-document/src/core/queue.rs @@ -1,12 +1,13 @@ use crate::{core::web_socket::EditorCommandReceiver, DocumentUser}; use async_stream::stream; +use flowy_collaboration::util::make_delta_from_revisions; use flowy_collaboration::{ client_document::{history::UndoResult, ClientDocument}, entities::revision::{RevId, Revision}, errors::CollaborateError, }; -use flowy_error::FlowyError; -use flowy_sync::{DeltaMD5, RevisionManager, TransformDeltas}; +use flowy_error::{FlowyError, FlowyResult}; +use flowy_sync::{DeltaMD5, RevisionCompact, RevisionManager, TransformDeltas}; use futures::stream::StreamExt; use lib_ot::{ core::{Interval, OperationTransformable}, @@ -183,11 +184,36 @@ impl EditorCommandQueue { &user_id, md5, ); - let _ = self.rev_manager.add_local_revision(&revision).await?; + let _ = self + .rev_manager + .add_local_revision::(&revision) + .await?; Ok(rev_id.into()) } } +pub(crate) struct DocumentRevisionCompact(); +impl RevisionCompact for DocumentRevisionCompact { + fn compact_revisions(user_id: &str, object_id: &str, mut revisions: Vec) -> FlowyResult { + if revisions.is_empty() { + return Err(FlowyError::internal().context("Can't compact the empty document's revisions")); + } + + if revisions.len() == 1 { + return Ok(revisions.pop().unwrap()); + } + + let first_revision = revisions.first().unwrap(); + let last_revision = revisions.last().unwrap(); + + let (base_rev_id, rev_id) = first_revision.pair_rev_id(); + let md5 = last_revision.md5.clone(); + let delta = make_delta_from_revisions::(revisions)?; + let delta_data = delta.to_bytes(); + Ok(Revision::new(object_id, base_rev_id, rev_id, delta_data, user_id, md5)) + } +} + pub(crate) type Ret = oneshot::Sender>; pub(crate) enum EditorCommand { diff --git a/frontend/rust-lib/flowy-document/tests/document/document_test.rs b/frontend/rust-lib/flowy-document/tests/document/document_test.rs index 8c8e17aff1..86fd1d4a2e 100755 --- a/frontend/rust-lib/flowy-document/tests/document/document_test.rs +++ b/frontend/rust-lib/flowy-document/tests/document/document_test.rs @@ -11,7 +11,7 @@ async fn document_sync_current_rev_id_check() { AssertCurrentRevId(2), InsertText("3", 2), AssertCurrentRevId(3), - AssertNextRevId(None), + AssertNextSyncRevId(None), AssertJson(r#"[{"insert":"123\n"}]"#), ]; EditorTest::new().await.run_scripts(scripts).await; @@ -38,7 +38,7 @@ async fn document_sync_insert_test() { InsertText("2", 1), InsertText("3", 2), AssertJson(r#"[{"insert":"123\n"}]"#), - AssertNextRevId(None), + AssertNextSyncRevId(None), ]; EditorTest::new().await.run_scripts(scripts).await; } diff --git a/frontend/rust-lib/flowy-document/tests/document/edit_script.rs b/frontend/rust-lib/flowy-document/tests/document/edit_script.rs index d9ad0999ee..23abfc1da8 100755 --- a/frontend/rust-lib/flowy-document/tests/document/edit_script.rs +++ b/frontend/rust-lib/flowy-document/tests/document/edit_script.rs @@ -11,7 +11,7 @@ pub enum EditorScript { Replace(Interval, &'static str), AssertRevisionState(i64, RevisionState), - AssertNextRevId(Option), + AssertNextSyncRevId(Option), AssertCurrentRevId(i64), AssertJson(&'static str), } @@ -38,7 +38,7 @@ impl EditorTest { async fn run_script(&mut self, script: EditorScript) { let rev_manager = self.editor.rev_manager(); - let cache = rev_manager.revision_cache(); + let cache = rev_manager.revision_cache().await; let _user_id = self.sdk.user_session.user_id().unwrap(); // let ws_manager = self.sdk.ws_conn.clone(); // let token = self.sdk.user_session.token().unwrap(); @@ -60,13 +60,15 @@ impl EditorTest { EditorScript::AssertCurrentRevId(rev_id) => { assert_eq!(self.editor.rev_manager().rev_id(), rev_id); } - EditorScript::AssertNextRevId(rev_id) => { + EditorScript::AssertNextSyncRevId(rev_id) => { let next_revision = rev_manager.next_sync_revision().await.unwrap(); if rev_id.is_none() { assert!(next_revision.is_none(), "Next revision should be None"); return; } let next_revision = next_revision.unwrap(); + let mut receiver = rev_manager.revision_ack_receiver(); + let _ = receiver.recv().await; assert_eq!(next_revision.rev_id, rev_id.unwrap()); } EditorScript::AssertJson(expected) => { diff --git a/frontend/rust-lib/flowy-sync/src/cache/disk/mod.rs b/frontend/rust-lib/flowy-sync/src/cache/disk/mod.rs index 229d484ed2..97a8704f17 100755 --- a/frontend/rust-lib/flowy-sync/src/cache/disk/mod.rs +++ b/frontend/rust-lib/flowy-sync/src/cache/disk/mod.rs @@ -9,7 +9,7 @@ use std::fmt::Debug; pub trait RevisionDiskCache: Sync + Send { type Error: Debug; - fn write_revision_records( + fn create_revision_records( &self, revision_records: Vec, conn: &SqliteConnection, @@ -22,6 +22,7 @@ pub trait RevisionDiskCache: Sync + Send { rev_ids: Option>, ) -> Result, Self::Error>; + // Read the revision which rev_id >= range.start && rev_id <= range.end fn read_revision_records_with_range( &self, object_id: &str, @@ -38,5 +39,12 @@ pub trait RevisionDiskCache: Sync + Send { conn: &SqliteConnection, ) -> Result<(), Self::Error>; - fn reset_object(&self, object_id: &str, revision_records: Vec) -> Result<(), Self::Error>; + // Delete and insert will be executed in the same transaction. + // It deletes all the records if the deleted_rev_ids is None and then insert the new records + fn delete_and_insert_records( + &self, + object_id: &str, + deleted_rev_ids: Option>, + inserted_records: Vec, + ) -> Result<(), Self::Error>; } diff --git a/frontend/rust-lib/flowy-sync/src/cache/disk/sql_impl.rs b/frontend/rust-lib/flowy-sync/src/cache/disk/sql_impl.rs index 305b3bf568..0aba4cc6ad 100755 --- a/frontend/rust-lib/flowy-sync/src/cache/disk/sql_impl.rs +++ b/frontend/rust-lib/flowy-sync/src/cache/disk/sql_impl.rs @@ -22,7 +22,7 @@ pub struct SQLitePersistence { impl RevisionDiskCache for SQLitePersistence { type Error = FlowyError; - fn write_revision_records( + fn create_revision_records( &self, revision_records: Vec, conn: &SqliteConnection, @@ -72,11 +72,16 @@ impl RevisionDiskCache for SQLitePersistence { Ok(()) } - fn reset_object(&self, object_id: &str, revision_records: Vec) -> Result<(), Self::Error> { + fn delete_and_insert_records( + &self, + object_id: &str, + deleted_rev_ids: Option>, + inserted_records: Vec, + ) -> Result<(), Self::Error> { let conn = self.pool.get().map_err(internal_error)?; conn.immediate_transaction::<_, FlowyError, _>(|| { - let _ = self.delete_revision_records(object_id, None, &*conn)?; - let _ = self.write_revision_records(revision_records, &*conn)?; + let _ = self.delete_revision_records(object_id, deleted_rev_ids, &*conn)?; + let _ = self.create_revision_records(inserted_records, &*conn)?; Ok(()) }) } @@ -96,6 +101,7 @@ pub struct RevisionTableSql {} impl RevisionTableSql { pub(crate) fn create(revision_records: Vec, conn: &SqliteConnection) -> Result<(), FlowyError> { // Batch insert: https://diesel.rs/guides/all-about-inserts.html + let records = revision_records .into_iter() .map(|record| { @@ -172,7 +178,8 @@ impl RevisionTableSql { rev_ids: Option>, conn: &SqliteConnection, ) -> Result<(), FlowyError> { - let mut sql = dsl::rev_table.filter(dsl::doc_id.eq(object_id)).into_boxed(); + let filter = dsl::rev_table.filter(dsl::doc_id.eq(object_id)); + let mut sql = diesel::delete(filter).into_boxed(); if let Some(rev_ids) = rev_ids { sql = sql.filter(dsl::rev_id.eq_any(rev_ids)); } diff --git a/frontend/rust-lib/flowy-sync/src/cache/memory.rs b/frontend/rust-lib/flowy-sync/src/cache/memory.rs index d0742a69b5..8d7c3eb6a2 100755 --- a/frontend/rust-lib/flowy-sync/src/cache/memory.rs +++ b/frontend/rust-lib/flowy-sync/src/cache/memory.rs @@ -40,19 +40,12 @@ impl RevisionMemoryCache { }; let rev_id = record.revision.rev_id; - if self.revs_map.contains_key(&rev_id) { - return; - } - - if let Some(rev_id) = self.pending_write_revs.read().await.last() { - if *rev_id >= record.revision.rev_id { - tracing::error!("Duplicated revision added to memory_cache"); - return; - } - } self.revs_map.insert(rev_id, record); - self.pending_write_revs.write().await.push(rev_id); - self.make_checkpoint().await; + + if !self.pending_write_revs.read().await.contains(&rev_id) { + self.pending_write_revs.write().await.push(rev_id); + self.make_checkpoint().await; + } } pub(crate) async fn ack(&self, rev_id: &i64) { @@ -61,12 +54,12 @@ impl RevisionMemoryCache { Some(mut record) => record.ack(), } - if !self.pending_write_revs.read().await.contains(rev_id) { + if self.pending_write_revs.read().await.contains(rev_id) { + self.make_checkpoint().await; + } else { // The revision must be saved on disk if the pending_write_revs // doesn't contains the rev_id. self.delegate.receive_ack(&self.object_id, *rev_id); - } else { - self.make_checkpoint().await; } } @@ -74,6 +67,16 @@ impl RevisionMemoryCache { self.revs_map.get(rev_id).map(|r| r.value().clone()) } + pub(crate) fn remove(&self, rev_id: &i64) { + let _ = self.revs_map.remove(rev_id); + } + + pub(crate) fn remove_with_range(&self, range: &RevisionRange) { + for rev_id in range.iter() { + self.remove(&rev_id); + } + } + pub(crate) async fn get_with_range(&self, range: &RevisionRange) -> Result, FlowyError> { let revs = range .iter() @@ -82,7 +85,7 @@ impl RevisionMemoryCache { Ok(revs) } - pub(crate) async fn reset_with_revisions(&self, revision_records: &[RevisionRecord]) -> FlowyResult<()> { + pub(crate) async fn reset_with_revisions(&self, revision_records: Vec) { self.revs_map.clear(); if let Some(handler) = self.defer_save.write().await.take() { handler.abort(); @@ -91,13 +94,12 @@ impl RevisionMemoryCache { let mut write_guard = self.pending_write_revs.write().await; write_guard.clear(); for record in revision_records { - self.revs_map.insert(record.revision.rev_id, record.clone()); write_guard.push(record.revision.rev_id); + self.revs_map.insert(record.revision.rev_id, record); } drop(write_guard); self.make_checkpoint().await; - Ok(()) } async fn make_checkpoint(&self) { diff --git a/frontend/rust-lib/flowy-sync/src/cache/mod.rs b/frontend/rust-lib/flowy-sync/src/cache/mod.rs index d1fffb9664..e7ee2f7388 100755 --- a/frontend/rust-lib/flowy-sync/src/cache/mod.rs +++ b/frontend/rust-lib/flowy-sync/src/cache/mod.rs @@ -5,9 +5,11 @@ use crate::cache::{ disk::{RevisionChangeset, RevisionDiskCache, RevisionTableState, SQLitePersistence}, memory::{RevisionMemoryCache, RevisionMemoryCacheDelegate}, }; + use flowy_collaboration::entities::revision::{Revision, RevisionRange, RevisionState}; use flowy_database::ConnectionPool; use flowy_error::{internal_error, FlowyError, FlowyResult}; + use std::{ borrow::Cow, sync::{ @@ -16,6 +18,7 @@ use std::{ }, }; use tokio::task::spawn_blocking; + pub const REVISION_WRITE_INTERVAL_IN_MILLIS: u64 = 600; pub struct RevisionCache { @@ -24,14 +27,6 @@ pub struct RevisionCache { memory_cache: Arc, latest_rev_id: AtomicI64, } - -pub fn mk_revision_disk_cache( - user_id: &str, - pool: Arc, -) -> Arc> { - Arc::new(SQLitePersistence::new(user_id, pool)) -} - impl RevisionCache { pub fn new(user_id: &str, object_id: &str, pool: Arc) -> RevisionCache { let disk_cache = Arc::new(SQLitePersistence::new(user_id, pool)); @@ -45,16 +40,10 @@ impl RevisionCache { } } - pub async fn add( - &self, - revision: Revision, - state: RevisionState, - write_to_disk: bool, - ) -> FlowyResult { + pub async fn add(&self, revision: Revision, state: RevisionState, write_to_disk: bool) -> FlowyResult<()> { if self.memory_cache.contains(&revision.rev_id) { return Err(FlowyError::internal().context(format!("Duplicate revision: {} {:?}", revision.rev_id, state))); } - let state = state.as_ref().clone(); let rev_id = revision.rev_id; let record = RevisionRecord { revision, @@ -62,12 +51,29 @@ impl RevisionCache { write_to_disk, }; - self.memory_cache.add(Cow::Borrowed(&record)).await; + self.memory_cache.add(Cow::Owned(record)).await; self.set_latest_rev_id(rev_id); - Ok(record) + Ok(()) + } + + pub async fn compact(&self, range: &RevisionRange, new_revision: Revision) -> FlowyResult<()> { + self.memory_cache.remove_with_range(range); + let rev_id = new_revision.rev_id; + let record = RevisionRecord { + revision: new_revision, + state: RevisionState::Sync, + write_to_disk: true, + }; + + let rev_ids = range.to_rev_ids(); + let _ = self + .disk_cache + .delete_and_insert_records(&self.object_id, Some(rev_ids), vec![record.clone()])?; + self.memory_cache.add(Cow::Owned(record)).await; + self.set_latest_rev_id(rev_id); + Ok(()) } - #[allow(dead_code)] pub async fn ack(&self, rev_id: i64) { self.memory_cache.ack(&rev_id).await; } @@ -79,10 +85,9 @@ impl RevisionCache { .read_revision_records(&self.object_id, Some(vec![rev_id])) { Ok(mut records) => { - if !records.is_empty() { - assert_eq!(records.len(), 1); - } - records.pop() + let record = records.pop()?; + assert!(records.is_empty()); + Some(record) } Err(e) => { tracing::error!("{}", e); @@ -97,22 +102,24 @@ impl RevisionCache { self.disk_cache.read_revision_records(doc_id, None) } - pub async fn latest_revision(&self) -> Revision { - let rev_id = self.latest_rev_id.load(SeqCst); - self.get(rev_id).await.unwrap().revision - } - - pub async fn revisions_in_range(&self, range: RevisionRange) -> FlowyResult> { + // Read the revision which rev_id >= range.start && rev_id <= range.end + pub async fn revisions_in_range(&self, range: &RevisionRange) -> FlowyResult> { + let range = range.clone(); let mut records = self.memory_cache.get_with_range(&range).await?; let range_len = range.len() as usize; if records.len() != range_len { let disk_cache = self.disk_cache.clone(); - let doc_id = self.object_id.clone(); - records = spawn_blocking(move || disk_cache.read_revision_records_with_range(&doc_id, &range)) + let object_id = self.object_id.clone(); + records = spawn_blocking(move || disk_cache.read_revision_records_with_range(&object_id, &range)) .await .map_err(internal_error)??; if records.len() != range_len { + // #[cfg(debug_assertions)] + // records.iter().for_each(|record| { + // let delta = PlainDelta::from_bytes(&record.revision.delta_data).unwrap(); + // tracing::trace!("{}", delta.to_string()); + // }); tracing::error!("Revisions len is not equal to range required"); } } @@ -122,9 +129,9 @@ impl RevisionCache { .collect::>()) } - #[tracing::instrument(level = "debug", skip(self, doc_id, revisions))] - pub async fn reset_with_revisions(&self, doc_id: &str, revisions: Vec) -> FlowyResult<()> { - let revision_records = revisions + #[tracing::instrument(level = "debug", skip(self, revisions), err)] + pub async fn reset_with_revisions(&self, object_id: &str, revisions: Vec) -> FlowyResult<()> { + let records = revisions .to_vec() .into_iter() .map(|revision| RevisionRecord { @@ -134,8 +141,11 @@ impl RevisionCache { }) .collect::>(); - let _ = self.memory_cache.reset_with_revisions(&revision_records).await?; - let _ = self.disk_cache.reset_object(doc_id, revision_records)?; + let _ = self + .disk_cache + .delete_and_insert_records(object_id, None, records.clone())?; + let _ = self.memory_cache.reset_with_revisions(records).await; + Ok(()) } @@ -145,6 +155,13 @@ impl RevisionCache { } } +pub fn mk_revision_disk_cache( + user_id: &str, + pool: Arc, +) -> Arc> { + Arc::new(SQLitePersistence::new(user_id, pool)) +} + impl RevisionMemoryCacheDelegate for Arc { #[tracing::instrument(level = "trace", skip(self, records), fields(checkpoint_result), err)] fn checkpoint_tick(&self, mut records: Vec) -> FlowyResult<()> { @@ -155,7 +172,7 @@ impl RevisionMemoryCacheDelegate for Arc { "checkpoint_result", &format!("{} records were saved", records.len()).as_str(), ); - let _ = self.write_revision_records(records, conn)?; + let _ = self.create_revision_records(records, conn)?; } Ok(()) } @@ -173,7 +190,7 @@ impl RevisionMemoryCacheDelegate for Arc { } } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct RevisionRecord { pub revision: Revision, pub state: RevisionState, diff --git a/frontend/rust-lib/flowy-sync/src/rev_manager.rs b/frontend/rust-lib/flowy-sync/src/rev_manager.rs index 03b088578f..b3c46c0b21 100755 --- a/frontend/rust-lib/flowy-sync/src/rev_manager.rs +++ b/frontend/rust-lib/flowy-sync/src/rev_manager.rs @@ -1,11 +1,11 @@ -use crate::{RevisionCache, RevisionRecord}; -use dashmap::DashMap; +use crate::RevisionCache; use flowy_collaboration::{ entities::revision::{RepeatedRevision, Revision, RevisionRange, RevisionState}, util::{pair_rev_id_from_revisions, RevIdCounter}, }; use flowy_error::{FlowyError, FlowyResult}; use lib_infra::future::FutureResult; + use std::{collections::VecDeque, sync::Arc}; use tokio::sync::RwLock; @@ -18,12 +18,15 @@ pub trait RevisionObjectBuilder: Send + Sync { fn build_with_revisions(object_id: &str, revisions: Vec) -> FlowyResult; } +pub trait RevisionCompact: Send + Sync { + fn compact_revisions(user_id: &str, object_id: &str, revisions: Vec) -> FlowyResult; +} + pub struct RevisionManager { pub object_id: String, user_id: String, rev_id_counter: RevIdCounter, - revision_cache: Arc, - revision_sync_seq: Arc, + cache: Arc>, #[cfg(feature = "flowy_unit_test")] revision_ack_notifier: tokio::sync::broadcast::Sender, @@ -32,7 +35,11 @@ pub struct RevisionManager { impl RevisionManager { pub fn new(user_id: &str, object_id: &str, revision_cache: Arc) -> Self { let rev_id_counter = RevIdCounter::new(0); - let revision_sync_seq = Arc::new(RevisionSyncSequence::new()); + let cache = Arc::new(RwLock::new(RevisionCacheCompact::new( + object_id, + user_id, + revision_cache, + ))); #[cfg(feature = "flowy_unit_test")] let (revision_ack_notifier, _) = tokio::sync::broadcast::channel(1); @@ -40,38 +47,36 @@ impl RevisionManager { object_id: object_id.to_string(), user_id: user_id.to_owned(), rev_id_counter, - revision_cache, - revision_sync_seq, + cache, #[cfg(feature = "flowy_unit_test")] revision_ack_notifier, } } - pub async fn load(&mut self, cloud: Arc) -> FlowyResult + pub async fn load(&mut self, cloud: Arc) -> FlowyResult where - Builder: RevisionObjectBuilder, + B: RevisionObjectBuilder, + C: RevisionCompact, { let (revisions, rev_id) = RevisionLoader { object_id: self.object_id.clone(), user_id: self.user_id.clone(), cloud, - revision_cache: self.revision_cache.clone(), - revision_sync_seq: self.revision_sync_seq.clone(), + cache: self.cache.clone(), } - .load() + .load::() .await?; self.rev_id_counter.set(rev_id); - Builder::build_with_revisions(&self.object_id, revisions) + B::build_with_revisions(&self.object_id, revisions) } #[tracing::instrument(level = "debug", skip(self, revisions), err)] pub async fn reset_object(&self, revisions: RepeatedRevision) -> FlowyResult<()> { let rev_id = pair_rev_id_from_revisions(&revisions).1; - let _ = self - .revision_cache - .reset_with_revisions(&self.object_id, revisions.into_inner()) - .await?; + + let write_guard = self.cache.write().await; + let _ = write_guard.reset(revisions.into_inner()).await?; self.rev_id_counter.set(rev_id); Ok(()) } @@ -81,34 +86,31 @@ impl RevisionManager { if revision.delta_data.is_empty() { return Err(FlowyError::internal().context("Delta data should be empty")); } - let _ = self - .revision_cache - .add(revision.clone(), RevisionState::Ack, true) - .await?; + + let write_guard = self.cache.write().await; + let _ = write_guard.add_ack_revision(revision).await?; self.rev_id_counter.set(revision.rev_id); Ok(()) } #[tracing::instrument(level = "debug", skip(self, revision))] - pub async fn add_local_revision(&self, revision: &Revision) -> Result<(), FlowyError> { + pub async fn add_local_revision(&self, revision: &Revision) -> Result<(), FlowyError> + where + C: RevisionCompact, + { if revision.delta_data.is_empty() { return Err(FlowyError::internal().context("Delta data should be empty")); } + let mut write_guard = self.cache.write().await; + let rev_id = write_guard.write_sync_revision::(revision).await?; - let record = self - .revision_cache - .add(revision.clone(), RevisionState::Sync, true) - .await?; - self.revision_sync_seq.add_revision_record(record).await?; + self.rev_id_counter.set(rev_id); Ok(()) } #[tracing::instrument(level = "debug", skip(self), err)] pub async fn ack_revision(&self, rev_id: i64) -> Result<(), FlowyError> { - #[cfg(feature = "flowy_unit_test")] - if self.revision_sync_seq.ack(&rev_id).await.is_ok() { - self.revision_cache.ack(rev_id).await; - + if self.cache.write().await.ack_revision(rev_id).await.is_ok() { #[cfg(feature = "flowy_unit_test")] let _ = self.revision_ack_notifier.send(rev_id); } @@ -126,75 +128,153 @@ impl RevisionManager { } pub async fn get_revisions_in_range(&self, range: RevisionRange) -> Result, FlowyError> { - debug_assert!(range.object_id == self.object_id); - let revisions = self.revision_cache.revisions_in_range(range.clone()).await?; + let revisions = self.cache.read().await.revisions_in_range(&range).await?; Ok(revisions) } - pub fn next_sync_revision(&self) -> FutureResult, FlowyError> { - let revision_sync_seq = self.revision_sync_seq.clone(); - let revision_cache = self.revision_cache.clone(); - FutureResult::new(async move { - match revision_sync_seq.next_sync_revision_record().await { - None => match revision_sync_seq.next_sync_rev_id().await { - None => Ok(None), - Some(rev_id) => Ok(revision_cache.get(rev_id).await.map(|record| record.revision)), - }, - Some((_, record)) => Ok(Some(record.revision)), - } - }) - } - - pub async fn latest_revision(&self) -> Revision { - self.revision_cache.latest_revision().await + pub async fn next_sync_revision(&self) -> FlowyResult> { + Ok(self.cache.read().await.next_sync_revision().await?) } pub async fn get_revision(&self, rev_id: i64) -> Option { - self.revision_cache.get(rev_id).await.map(|record| record.revision) + self.cache.read().await.get(rev_id).await.map(|record| record.revision) } } -struct RevisionSyncSequence { - revs_map: Arc>, - local_revs: Arc>>, +#[cfg(feature = "flowy_unit_test")] +impl RevisionManager { + pub async fn revision_cache(&self) -> Arc { + self.cache.read().await.inner.clone() + } + pub fn revision_ack_receiver(&self) -> tokio::sync::broadcast::Receiver { + self.revision_ack_notifier.subscribe() + } } -impl std::default::Default for RevisionSyncSequence { - fn default() -> Self { - let local_revs = Arc::new(RwLock::new(VecDeque::new())); - RevisionSyncSequence { - revs_map: Arc::new(DashMap::new()), - local_revs, +struct RevisionCacheCompact { + object_id: String, + user_id: String, + inner: Arc, + sync_seq: RevisionSyncSequence, +} + +impl RevisionCacheCompact { + fn new(object_id: &str, user_id: &str, inner: Arc) -> Self { + let sync_seq = RevisionSyncSequence::new(); + let object_id = object_id.to_owned(); + let user_id = user_id.to_owned(); + Self { + object_id, + user_id, + inner, + sync_seq, } } + + async fn add_ack_revision(&self, revision: &Revision) -> FlowyResult<()> { + self.inner.add(revision.clone(), RevisionState::Ack, true).await + } + + async fn add_sync_revision(&mut self, revision: &Revision) -> FlowyResult<()> { + self.inner.add(revision.clone(), RevisionState::Sync, false).await?; + self.sync_seq.add(revision.rev_id)?; + Ok(()) + } + + #[tracing::instrument(level = "trace", skip(self, revision), fields(rev_id, compact_range), err)] + async fn write_sync_revision(&mut self, revision: &Revision) -> FlowyResult + where + C: RevisionCompact, + { + match self.sync_seq.compact() { + None => { + tracing::Span::current().record("rev_id", &revision.rev_id); + self.inner.add(revision.clone(), RevisionState::Sync, true).await?; + self.sync_seq.add(revision.rev_id)?; + Ok(revision.rev_id) + } + Some((range, mut compact_seq)) => { + tracing::Span::current().record("compact_range", &format!("{}", range).as_str()); + let mut revisions = self.inner.revisions_in_range(&range).await?; + if range.to_rev_ids().len() != revisions.len() { + debug_assert_eq!(range.to_rev_ids().len(), revisions.len()); + } + + // append the new revision + revisions.push(revision.clone()); + + // compact multiple revisions into one + let compact_revision = C::compact_revisions(&self.user_id, &self.object_id, revisions)?; + let rev_id = compact_revision.rev_id; + tracing::Span::current().record("rev_id", &rev_id); + + // insert new revision + compact_seq.push_back(rev_id); + + // replace the revisions in range with compact revision + self.inner.compact(&range, compact_revision).await?; + debug_assert_eq!(self.sync_seq.len(), compact_seq.len()); + self.sync_seq.reset(compact_seq); + Ok(rev_id) + } + } + } + + async fn ack_revision(&mut self, rev_id: i64) -> FlowyResult<()> { + if self.sync_seq.ack(&rev_id).is_ok() { + self.inner.ack(rev_id).await; + } + Ok(()) + } + + async fn next_sync_revision(&self) -> FlowyResult> { + if cfg!(feature = "flowy_unit_test") { + match self.sync_seq.next_rev_id() { + None => Ok(None), + Some(rev_id) => Ok(self.inner.get(rev_id).await.map(|record| record.revision)), + } + } else { + Ok(None) + } + } + + async fn reset(&self, revisions: Vec) -> FlowyResult<()> { + self.inner.reset_with_revisions(&self.object_id, revisions).await?; + Ok(()) + } } +impl std::ops::Deref for RevisionCacheCompact { + type Target = Arc; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +#[derive(Default)] +struct RevisionSyncSequence(VecDeque); impl RevisionSyncSequence { fn new() -> Self { RevisionSyncSequence::default() } - async fn add_revision_record(&self, record: RevisionRecord) -> FlowyResult<()> { - if !record.state.is_need_sync() { - return Ok(()); - } - + fn add(&mut self, new_rev_id: i64) -> FlowyResult<()> { // The last revision's rev_id must be greater than the new one. - if let Some(rev_id) = self.local_revs.read().await.back() { - if *rev_id >= record.revision.rev_id { + if let Some(rev_id) = self.0.back() { + if *rev_id >= new_rev_id { return Err( FlowyError::internal().context(format!("The new revision's id must be greater than {}", rev_id)) ); } } - self.local_revs.write().await.push_back(record.revision.rev_id); - self.revs_map.insert(record.revision.rev_id, record); + self.0.push_back(new_rev_id); Ok(()) } - #[allow(dead_code)] - async fn ack(&self, rev_id: &i64) -> FlowyResult<()> { - if let Some(pop_rev_id) = self.next_sync_rev_id().await { + fn ack(&mut self, rev_id: &i64) -> FlowyResult<()> { + let cur_rev_id = self.0.front().cloned(); + if let Some(pop_rev_id) = cur_rev_id { if &pop_rev_id != rev_id { let desc = format!( "The ack rev_id:{} is not equal to the current rev_id:{}", @@ -202,22 +282,33 @@ impl RevisionSyncSequence { ); return Err(FlowyError::internal().context(desc)); } - - self.revs_map.remove(&pop_rev_id); - let _ = self.local_revs.write().await.pop_front(); + let _ = self.0.pop_front(); } Ok(()) } - async fn next_sync_revision_record(&self) -> Option<(i64, RevisionRecord)> { - match self.local_revs.read().await.front() { - None => None, - Some(rev_id) => self.revs_map.get(rev_id).map(|r| (*r.key(), r.value().clone())), - } + fn next_rev_id(&self) -> Option { + self.0.front().cloned() } - async fn next_sync_rev_id(&self) -> Option { - self.local_revs.read().await.front().copied() + fn reset(&mut self, new_seq: VecDeque) { + self.0 = new_seq; + } + + fn len(&self) -> usize { + self.0.len() + } + + // Compact the rev_ids into one except the current synchronizing rev_id. + fn compact(&self) -> Option<(RevisionRange, VecDeque)> { + self.next_rev_id()?; + + let mut new_seq = self.0.clone(); + let mut drained = new_seq.drain(1..).collect::>(); + + let start = drained.pop_front()?; + let end = drained.pop_back().unwrap_or(start); + Some((RevisionRange { start, end }, new_seq)) } } @@ -225,42 +316,32 @@ struct RevisionLoader { object_id: String, user_id: String, cloud: Arc, - revision_cache: Arc, - revision_sync_seq: Arc, + cache: Arc>, } impl RevisionLoader { - async fn load(&self) -> Result<(Vec, i64), FlowyError> { - let records = self.revision_cache.batch_get(&self.object_id)?; + async fn load(&self) -> Result<(Vec, i64), FlowyError> + where + C: RevisionCompact, + { + let records = self.cache.read().await.batch_get(&self.object_id)?; let revisions: Vec; let mut rev_id = 0; if records.is_empty() { let remote_revisions = self.cloud.fetch_object(&self.user_id, &self.object_id).await?; for revision in &remote_revisions { rev_id = revision.rev_id; - let _ = self - .revision_cache - .add(revision.clone(), RevisionState::Ack, true) - .await?; + let _ = self.cache.read().await.add_ack_revision(revision).await?; } revisions = remote_revisions; } else { - for record in records.clone() { - let f = || async { - rev_id = record.revision.rev_id; - if record.state == RevisionState::Sync { - // Sync the records if their state is RevisionState::Sync. - let _ = self.revision_sync_seq.add_revision_record(record.clone()).await?; - let _ = self.revision_cache.add(record.revision, record.state, false).await?; - } - Ok::<(), FlowyError>(()) - }; - match f().await { - Ok(_) => {} - Err(e) => tracing::error!("[RevisionLoader]: {}", e), + for record in &records { + rev_id = record.revision.rev_id; + if record.state == RevisionState::Sync { + // Sync the records if their state is RevisionState::Sync. + let _ = self.cache.write().await.add_sync_revision(&record.revision).await?; } } - revisions = records.into_iter().map(|record| record.revision).collect::<_>(); } @@ -271,25 +352,3 @@ impl RevisionLoader { Ok((revisions, rev_id)) } } - -#[cfg(feature = "flowy_unit_test")] -impl RevisionSyncSequence { - #[allow(dead_code)] - pub fn revs_map(&self) -> Arc> { - self.revs_map.clone() - } - #[allow(dead_code)] - pub fn pending_revs(&self) -> Arc>> { - self.local_revs.clone() - } -} - -#[cfg(feature = "flowy_unit_test")] -impl RevisionManager { - pub fn revision_cache(&self) -> Arc { - self.revision_cache.clone() - } - pub fn revision_ack_receiver(&self) -> tokio::sync::broadcast::Receiver { - self.revision_ack_notifier.subscribe() - } -} diff --git a/frontend/rust-lib/flowy-sync/src/ws_manager.rs b/frontend/rust-lib/flowy-sync/src/ws_manager.rs index 850bd2fb33..36a78f06c7 100755 --- a/frontend/rust-lib/flowy-sync/src/ws_manager.rs +++ b/frontend/rust-lib/flowy-sync/src/ws_manager.rs @@ -300,6 +300,13 @@ impl RevisionWSSink { } } +async fn tick(sender: mpsc::Sender<()>, duration: Duration) { + let mut interval = interval(duration); + while sender.send(()).await.is_ok() { + interval.tick().await; + } +} + impl std::fmt::Display for RevisionWSSink { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_fmt(format_args!("{}RevisionWSSink", self.object_name)) @@ -312,13 +319,6 @@ impl std::ops::Drop for RevisionWSSink { } } -async fn tick(sender: mpsc::Sender<()>, duration: Duration) { - let mut interval = interval(duration); - while sender.send(()).await.is_ok() { - interval.tick().await; - } -} - #[derive(Clone)] enum Source { Custom, diff --git a/shared-lib/flowy-collaboration/src/client_folder/builder.rs b/shared-lib/flowy-collaboration/src/client_folder/builder.rs index 3e33189b8f..3f70290536 100644 --- a/shared-lib/flowy-collaboration/src/client_folder/builder.rs +++ b/shared-lib/flowy-collaboration/src/client_folder/builder.rs @@ -1,10 +1,12 @@ +use crate::entities::folder_info::FolderDelta; +use crate::util::make_delta_from_revisions; use crate::{ client_folder::{default_folder_delta, FolderPad}, entities::revision::Revision, errors::{CollaborateError, CollaborateResult}, }; use flowy_core_data_model::entities::{trash::Trash, workspace::Workspace}; -use lib_ot::core::{OperationTransformable, PlainDelta, PlainDeltaBuilder}; +use lib_ot::core::{PlainAttributes, PlainDelta, PlainDeltaBuilder}; use serde::{Deserialize, Serialize}; use std::sync::Arc; @@ -45,15 +47,7 @@ impl FolderPadBuilder { } pub(crate) fn build_with_revisions(self, revisions: Vec) -> CollaborateResult { - let mut folder_delta = PlainDelta::new(); - for revision in revisions { - if revision.delta_data.is_empty() { - tracing::warn!("revision delta_data is empty"); - } - - let delta = PlainDelta::from_bytes(revision.delta_data)?; - folder_delta = folder_delta.compose(&delta)?; - } + let folder_delta: FolderDelta = make_delta_from_revisions::(revisions)?; self.build_with_delta(folder_delta) } diff --git a/shared-lib/flowy-collaboration/src/client_folder/folder_pad.rs b/shared-lib/flowy-collaboration/src/client_folder/folder_pad.rs index f1d914bc85..b751656caa 100644 --- a/shared-lib/flowy-collaboration/src/client_folder/folder_pad.rs +++ b/shared-lib/flowy-collaboration/src/client_folder/folder_pad.rs @@ -8,7 +8,7 @@ use crate::{ }; use dissimilar::*; use flowy_core_data_model::entities::{app::App, trash::Trash, view::View, workspace::Workspace}; -use lib_ot::core::{Delta, FlowyStr, OperationTransformable, PlainDeltaBuilder, PlainTextAttributes}; +use lib_ot::core::{Delta, FlowyStr, OperationTransformable, PlainAttributes, PlainDeltaBuilder}; use serde::{Deserialize, Serialize}; use std::sync::Arc; @@ -285,6 +285,11 @@ impl FolderPad { pub fn md5(&self) -> String { md5(&self.root.to_bytes()) } + + pub fn to_json(&self) -> CollaborateResult { + serde_json::to_string(self) + .map_err(|e| CollaborateError::internal().context(format!("serial trash to json failed: {}", e))) + } } impl FolderPad { @@ -372,14 +377,9 @@ impl FolderPad { } }) } - - fn to_json(&self) -> CollaborateResult { - serde_json::to_string(self) - .map_err(|e| CollaborateError::internal().context(format!("serial trash to json failed: {}", e))) - } } -fn cal_diff(old: String, new: String) -> Delta { +fn cal_diff(old: String, new: String) -> Delta { let chunks = dissimilar::diff(&old, &new); let mut delta_builder = PlainDeltaBuilder::new(); for chunk in &chunks { diff --git a/shared-lib/flowy-collaboration/src/entities/revision.rs b/shared-lib/flowy-collaboration/src/entities/revision.rs index 21ec8630f8..ec1ce54ebf 100644 --- a/shared-lib/flowy-collaboration/src/entities/revision.rs +++ b/shared-lib/flowy-collaboration/src/entities/revision.rs @@ -173,12 +173,9 @@ impl std::fmt::Display for RevId { #[derive(Debug, Clone, Default, ProtoBuf)] pub struct RevisionRange { #[pb(index = 1)] - pub object_id: String, - - #[pb(index = 2)] pub start: i64, - #[pb(index = 3)] + #[pb(index = 2)] pub end: i64, } @@ -203,9 +200,13 @@ impl RevisionRange { } pub fn iter(&self) -> RangeInclusive { - debug_assert!(self.start != self.end); + // debug_assert!(self.start != self.end); RangeInclusive::new(self.start, self.end) } + + pub fn to_rev_ids(&self) -> Vec { + self.iter().collect::>() + } } #[inline] diff --git a/shared-lib/flowy-collaboration/src/protobuf/model/revision.rs b/shared-lib/flowy-collaboration/src/protobuf/model/revision.rs index 312ea9279e..3283a4a6be 100644 --- a/shared-lib/flowy-collaboration/src/protobuf/model/revision.rs +++ b/shared-lib/flowy-collaboration/src/protobuf/model/revision.rs @@ -730,7 +730,6 @@ impl ::protobuf::reflect::ProtobufValue for RevId { #[derive(PartialEq,Clone,Default)] pub struct RevisionRange { // message fields - pub object_id: ::std::string::String, pub start: i64, pub end: i64, // special fields @@ -749,33 +748,7 @@ impl RevisionRange { ::std::default::Default::default() } - // string object_id = 1; - - - pub fn get_object_id(&self) -> &str { - &self.object_id - } - pub fn clear_object_id(&mut self) { - self.object_id.clear(); - } - - // Param is passed by value, moved - pub fn set_object_id(&mut self, v: ::std::string::String) { - self.object_id = v; - } - - // Mutable pointer to the field. - // If field is not initialized, it is initialized with default value first. - pub fn mut_object_id(&mut self) -> &mut ::std::string::String { - &mut self.object_id - } - - // Take field - pub fn take_object_id(&mut self) -> ::std::string::String { - ::std::mem::replace(&mut self.object_id, ::std::string::String::new()) - } - - // int64 start = 2; + // int64 start = 1; pub fn get_start(&self) -> i64 { @@ -790,7 +763,7 @@ impl RevisionRange { self.start = v; } - // int64 end = 3; + // int64 end = 2; pub fn get_end(&self) -> i64 { @@ -816,16 +789,13 @@ impl ::protobuf::Message for RevisionRange { let (field_number, wire_type) = is.read_tag_unpack()?; match field_number { 1 => { - ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.object_id)?; - }, - 2 => { if wire_type != ::protobuf::wire_format::WireTypeVarint { return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type)); } let tmp = is.read_int64()?; self.start = tmp; }, - 3 => { + 2 => { if wire_type != ::protobuf::wire_format::WireTypeVarint { return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type)); } @@ -844,14 +814,11 @@ impl ::protobuf::Message for RevisionRange { #[allow(unused_variables)] fn compute_size(&self) -> u32 { let mut my_size = 0; - if !self.object_id.is_empty() { - my_size += ::protobuf::rt::string_size(1, &self.object_id); - } if self.start != 0 { - my_size += ::protobuf::rt::value_size(2, self.start, ::protobuf::wire_format::WireTypeVarint); + my_size += ::protobuf::rt::value_size(1, self.start, ::protobuf::wire_format::WireTypeVarint); } if self.end != 0 { - my_size += ::protobuf::rt::value_size(3, self.end, ::protobuf::wire_format::WireTypeVarint); + my_size += ::protobuf::rt::value_size(2, self.end, ::protobuf::wire_format::WireTypeVarint); } my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); self.cached_size.set(my_size); @@ -859,14 +826,11 @@ impl ::protobuf::Message for RevisionRange { } fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> { - if !self.object_id.is_empty() { - os.write_string(1, &self.object_id)?; - } if self.start != 0 { - os.write_int64(2, self.start)?; + os.write_int64(1, self.start)?; } if self.end != 0 { - os.write_int64(3, self.end)?; + os.write_int64(2, self.end)?; } os.write_unknown_fields(self.get_unknown_fields())?; ::std::result::Result::Ok(()) @@ -906,11 +870,6 @@ impl ::protobuf::Message for RevisionRange { static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT; descriptor.get(|| { let mut fields = ::std::vec::Vec::new(); - fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( - "object_id", - |m: &RevisionRange| { &m.object_id }, - |m: &mut RevisionRange| { &mut m.object_id }, - )); fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeInt64>( "start", |m: &RevisionRange| { &m.start }, @@ -937,7 +896,6 @@ impl ::protobuf::Message for RevisionRange { impl ::protobuf::Clear for RevisionRange { fn clear(&mut self) { - self.object_id.clear(); self.start = 0; self.end = 0; self.unknown_fields.clear(); @@ -1065,60 +1023,57 @@ static file_descriptor_proto_data: &'static [u8] = b"\ evTypeR\x02ty\x12\x17\n\x07user_id\x18\x07\x20\x01(\tR\x06userId\"3\n\ \x10RepeatedRevision\x12\x1f\n\x05items\x18\x01\x20\x03(\x0b2\t.Revision\ R\x05items\"\x1d\n\x05RevId\x12\x14\n\x05value\x18\x01\x20\x01(\x03R\x05\ - value\"T\n\rRevisionRange\x12\x1b\n\tobject_id\x18\x01\x20\x01(\tR\x08ob\ - jectId\x12\x14\n\x05start\x18\x02\x20\x01(\x03R\x05start\x12\x10\n\x03en\ - d\x18\x03\x20\x01(\x03R\x03end*\"\n\rRevisionState\x12\x08\n\x04Sync\x10\ - \0\x12\x07\n\x03Ack\x10\x01*4\n\x07RevType\x12\x13\n\x0fDeprecatedLocal\ - \x10\0\x12\x14\n\x10DeprecatedRemote\x10\x01J\xe8\x07\n\x06\x12\x04\0\0\ - \x1d\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\n\ - \x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x10\n\x0b\n\x04\x04\0\x02\0\x12\ - \x03\x03\x04\x1a\n\x0c\n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\t\n\x0c\n\ - \x05\x04\0\x02\0\x01\x12\x03\x03\n\x15\n\x0c\n\x05\x04\0\x02\0\x03\x12\ - \x03\x03\x18\x19\n\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04\x15\n\x0c\n\ - \x05\x04\0\x02\x01\x05\x12\x03\x04\x04\t\n\x0c\n\x05\x04\0\x02\x01\x01\ - \x12\x03\x04\n\x10\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04\x13\x14\n\ - \x0b\n\x04\x04\0\x02\x02\x12\x03\x05\x04\x19\n\x0c\n\x05\x04\0\x02\x02\ - \x05\x12\x03\x05\x04\t\n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03\x05\n\x14\n\ - \x0c\n\x05\x04\0\x02\x02\x03\x12\x03\x05\x17\x18\n\x0b\n\x04\x04\0\x02\ - \x03\x12\x03\x06\x04\x13\n\x0c\n\x05\x04\0\x02\x03\x05\x12\x03\x06\x04\n\ - \n\x0c\n\x05\x04\0\x02\x03\x01\x12\x03\x06\x0b\x0e\n\x0c\n\x05\x04\0\x02\ - \x03\x03\x12\x03\x06\x11\x12\n\x0b\n\x04\x04\0\x02\x04\x12\x03\x07\x04\ - \x19\n\x0c\n\x05\x04\0\x02\x04\x05\x12\x03\x07\x04\n\n\x0c\n\x05\x04\0\ - \x02\x04\x01\x12\x03\x07\x0b\x14\n\x0c\n\x05\x04\0\x02\x04\x03\x12\x03\ - \x07\x17\x18\n\x0b\n\x04\x04\0\x02\x05\x12\x03\x08\x04\x13\n\x0c\n\x05\ - \x04\0\x02\x05\x06\x12\x03\x08\x04\x0b\n\x0c\n\x05\x04\0\x02\x05\x01\x12\ - \x03\x08\x0c\x0e\n\x0c\n\x05\x04\0\x02\x05\x03\x12\x03\x08\x11\x12\n\x0b\ - \n\x04\x04\0\x02\x06\x12\x03\t\x04\x17\n\x0c\n\x05\x04\0\x02\x06\x05\x12\ - \x03\t\x04\n\n\x0c\n\x05\x04\0\x02\x06\x01\x12\x03\t\x0b\x12\n\x0c\n\x05\ - \x04\0\x02\x06\x03\x12\x03\t\x15\x16\n\n\n\x02\x04\x01\x12\x04\x0b\0\r\ - \x01\n\n\n\x03\x04\x01\x01\x12\x03\x0b\x08\x18\n\x0b\n\x04\x04\x01\x02\0\ - \x12\x03\x0c\x04\x20\n\x0c\n\x05\x04\x01\x02\0\x04\x12\x03\x0c\x04\x0c\n\ - \x0c\n\x05\x04\x01\x02\0\x06\x12\x03\x0c\r\x15\n\x0c\n\x05\x04\x01\x02\0\ - \x01\x12\x03\x0c\x16\x1b\n\x0c\n\x05\x04\x01\x02\0\x03\x12\x03\x0c\x1e\ - \x1f\n\n\n\x02\x04\x02\x12\x04\x0e\0\x10\x01\n\n\n\x03\x04\x02\x01\x12\ - \x03\x0e\x08\r\n\x0b\n\x04\x04\x02\x02\0\x12\x03\x0f\x04\x14\n\x0c\n\x05\ - \x04\x02\x02\0\x05\x12\x03\x0f\x04\t\n\x0c\n\x05\x04\x02\x02\0\x01\x12\ - \x03\x0f\n\x0f\n\x0c\n\x05\x04\x02\x02\0\x03\x12\x03\x0f\x12\x13\n\n\n\ - \x02\x04\x03\x12\x04\x11\0\x15\x01\n\n\n\x03\x04\x03\x01\x12\x03\x11\x08\ - \x15\n\x0b\n\x04\x04\x03\x02\0\x12\x03\x12\x04\x19\n\x0c\n\x05\x04\x03\ - \x02\0\x05\x12\x03\x12\x04\n\n\x0c\n\x05\x04\x03\x02\0\x01\x12\x03\x12\ - \x0b\x14\n\x0c\n\x05\x04\x03\x02\0\x03\x12\x03\x12\x17\x18\n\x0b\n\x04\ - \x04\x03\x02\x01\x12\x03\x13\x04\x14\n\x0c\n\x05\x04\x03\x02\x01\x05\x12\ - \x03\x13\x04\t\n\x0c\n\x05\x04\x03\x02\x01\x01\x12\x03\x13\n\x0f\n\x0c\n\ - \x05\x04\x03\x02\x01\x03\x12\x03\x13\x12\x13\n\x0b\n\x04\x04\x03\x02\x02\ - \x12\x03\x14\x04\x12\n\x0c\n\x05\x04\x03\x02\x02\x05\x12\x03\x14\x04\t\n\ - \x0c\n\x05\x04\x03\x02\x02\x01\x12\x03\x14\n\r\n\x0c\n\x05\x04\x03\x02\ - \x02\x03\x12\x03\x14\x10\x11\n\n\n\x02\x05\0\x12\x04\x16\0\x19\x01\n\n\n\ - \x03\x05\0\x01\x12\x03\x16\x05\x12\n\x0b\n\x04\x05\0\x02\0\x12\x03\x17\ - \x04\r\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x17\x04\x08\n\x0c\n\x05\x05\0\ - \x02\0\x02\x12\x03\x17\x0b\x0c\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x18\x04\ - \x0c\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\x18\x04\x07\n\x0c\n\x05\x05\0\ - \x02\x01\x02\x12\x03\x18\n\x0b\n\n\n\x02\x05\x01\x12\x04\x1a\0\x1d\x01\n\ - \n\n\x03\x05\x01\x01\x12\x03\x1a\x05\x0c\n\x0b\n\x04\x05\x01\x02\0\x12\ - \x03\x1b\x04\x18\n\x0c\n\x05\x05\x01\x02\0\x01\x12\x03\x1b\x04\x13\n\x0c\ - \n\x05\x05\x01\x02\0\x02\x12\x03\x1b\x16\x17\n\x0b\n\x04\x05\x01\x02\x01\ - \x12\x03\x1c\x04\x19\n\x0c\n\x05\x05\x01\x02\x01\x01\x12\x03\x1c\x04\x14\ - \n\x0c\n\x05\x05\x01\x02\x01\x02\x12\x03\x1c\x17\x18b\x06proto3\ + value\"7\n\rRevisionRange\x12\x14\n\x05start\x18\x01\x20\x01(\x03R\x05st\ + art\x12\x10\n\x03end\x18\x02\x20\x01(\x03R\x03end*\"\n\rRevisionState\ + \x12\x08\n\x04Sync\x10\0\x12\x07\n\x03Ack\x10\x01*4\n\x07RevType\x12\x13\ + \n\x0fDeprecatedLocal\x10\0\x12\x14\n\x10DeprecatedRemote\x10\x01J\xb1\ + \x07\n\x06\x12\x04\0\0\x1c\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\ + \x04\0\x12\x04\x02\0\n\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x10\n\x0b\ + \n\x04\x04\0\x02\0\x12\x03\x03\x04\x1a\n\x0c\n\x05\x04\0\x02\0\x05\x12\ + \x03\x03\x04\t\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\n\x15\n\x0c\n\x05\ + \x04\0\x02\0\x03\x12\x03\x03\x18\x19\n\x0b\n\x04\x04\0\x02\x01\x12\x03\ + \x04\x04\x15\n\x0c\n\x05\x04\0\x02\x01\x05\x12\x03\x04\x04\t\n\x0c\n\x05\ + \x04\0\x02\x01\x01\x12\x03\x04\n\x10\n\x0c\n\x05\x04\0\x02\x01\x03\x12\ + \x03\x04\x13\x14\n\x0b\n\x04\x04\0\x02\x02\x12\x03\x05\x04\x19\n\x0c\n\ + \x05\x04\0\x02\x02\x05\x12\x03\x05\x04\t\n\x0c\n\x05\x04\0\x02\x02\x01\ + \x12\x03\x05\n\x14\n\x0c\n\x05\x04\0\x02\x02\x03\x12\x03\x05\x17\x18\n\ + \x0b\n\x04\x04\0\x02\x03\x12\x03\x06\x04\x13\n\x0c\n\x05\x04\0\x02\x03\ + \x05\x12\x03\x06\x04\n\n\x0c\n\x05\x04\0\x02\x03\x01\x12\x03\x06\x0b\x0e\ + \n\x0c\n\x05\x04\0\x02\x03\x03\x12\x03\x06\x11\x12\n\x0b\n\x04\x04\0\x02\ + \x04\x12\x03\x07\x04\x19\n\x0c\n\x05\x04\0\x02\x04\x05\x12\x03\x07\x04\n\ + \n\x0c\n\x05\x04\0\x02\x04\x01\x12\x03\x07\x0b\x14\n\x0c\n\x05\x04\0\x02\ + \x04\x03\x12\x03\x07\x17\x18\n\x0b\n\x04\x04\0\x02\x05\x12\x03\x08\x04\ + \x13\n\x0c\n\x05\x04\0\x02\x05\x06\x12\x03\x08\x04\x0b\n\x0c\n\x05\x04\0\ + \x02\x05\x01\x12\x03\x08\x0c\x0e\n\x0c\n\x05\x04\0\x02\x05\x03\x12\x03\ + \x08\x11\x12\n\x0b\n\x04\x04\0\x02\x06\x12\x03\t\x04\x17\n\x0c\n\x05\x04\ + \0\x02\x06\x05\x12\x03\t\x04\n\n\x0c\n\x05\x04\0\x02\x06\x01\x12\x03\t\ + \x0b\x12\n\x0c\n\x05\x04\0\x02\x06\x03\x12\x03\t\x15\x16\n\n\n\x02\x04\ + \x01\x12\x04\x0b\0\r\x01\n\n\n\x03\x04\x01\x01\x12\x03\x0b\x08\x18\n\x0b\ + \n\x04\x04\x01\x02\0\x12\x03\x0c\x04\x20\n\x0c\n\x05\x04\x01\x02\0\x04\ + \x12\x03\x0c\x04\x0c\n\x0c\n\x05\x04\x01\x02\0\x06\x12\x03\x0c\r\x15\n\ + \x0c\n\x05\x04\x01\x02\0\x01\x12\x03\x0c\x16\x1b\n\x0c\n\x05\x04\x01\x02\ + \0\x03\x12\x03\x0c\x1e\x1f\n\n\n\x02\x04\x02\x12\x04\x0e\0\x10\x01\n\n\n\ + \x03\x04\x02\x01\x12\x03\x0e\x08\r\n\x0b\n\x04\x04\x02\x02\0\x12\x03\x0f\ + \x04\x14\n\x0c\n\x05\x04\x02\x02\0\x05\x12\x03\x0f\x04\t\n\x0c\n\x05\x04\ + \x02\x02\0\x01\x12\x03\x0f\n\x0f\n\x0c\n\x05\x04\x02\x02\0\x03\x12\x03\ + \x0f\x12\x13\n\n\n\x02\x04\x03\x12\x04\x11\0\x14\x01\n\n\n\x03\x04\x03\ + \x01\x12\x03\x11\x08\x15\n\x0b\n\x04\x04\x03\x02\0\x12\x03\x12\x04\x14\n\ + \x0c\n\x05\x04\x03\x02\0\x05\x12\x03\x12\x04\t\n\x0c\n\x05\x04\x03\x02\0\ + \x01\x12\x03\x12\n\x0f\n\x0c\n\x05\x04\x03\x02\0\x03\x12\x03\x12\x12\x13\ + \n\x0b\n\x04\x04\x03\x02\x01\x12\x03\x13\x04\x12\n\x0c\n\x05\x04\x03\x02\ + \x01\x05\x12\x03\x13\x04\t\n\x0c\n\x05\x04\x03\x02\x01\x01\x12\x03\x13\n\ + \r\n\x0c\n\x05\x04\x03\x02\x01\x03\x12\x03\x13\x10\x11\n\n\n\x02\x05\0\ + \x12\x04\x15\0\x18\x01\n\n\n\x03\x05\0\x01\x12\x03\x15\x05\x12\n\x0b\n\ + \x04\x05\0\x02\0\x12\x03\x16\x04\r\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\ + \x16\x04\x08\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x16\x0b\x0c\n\x0b\n\x04\ + \x05\0\x02\x01\x12\x03\x17\x04\x0c\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\ + \x17\x04\x07\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\x17\n\x0b\n\n\n\x02\ + \x05\x01\x12\x04\x19\0\x1c\x01\n\n\n\x03\x05\x01\x01\x12\x03\x19\x05\x0c\ + \n\x0b\n\x04\x05\x01\x02\0\x12\x03\x1a\x04\x18\n\x0c\n\x05\x05\x01\x02\0\ + \x01\x12\x03\x1a\x04\x13\n\x0c\n\x05\x05\x01\x02\0\x02\x12\x03\x1a\x16\ + \x17\n\x0b\n\x04\x05\x01\x02\x01\x12\x03\x1b\x04\x19\n\x0c\n\x05\x05\x01\ + \x02\x01\x01\x12\x03\x1b\x04\x14\n\x0c\n\x05\x05\x01\x02\x01\x02\x12\x03\ + \x1b\x17\x18b\x06proto3\ "; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; diff --git a/shared-lib/flowy-collaboration/src/protobuf/proto/revision.proto b/shared-lib/flowy-collaboration/src/protobuf/proto/revision.proto index 1de427323b..34c37409ab 100644 --- a/shared-lib/flowy-collaboration/src/protobuf/proto/revision.proto +++ b/shared-lib/flowy-collaboration/src/protobuf/proto/revision.proto @@ -16,9 +16,8 @@ message RevId { int64 value = 1; } message RevisionRange { - string object_id = 1; - int64 start = 2; - int64 end = 3; + int64 start = 1; + int64 end = 2; } enum RevisionState { Sync = 0; diff --git a/shared-lib/flowy-collaboration/src/server_folder/folder_manager.rs b/shared-lib/flowy-collaboration/src/server_folder/folder_manager.rs index 96b7cb71cb..b81cb7fb91 100644 --- a/shared-lib/flowy-collaboration/src/server_folder/folder_manager.rs +++ b/shared-lib/flowy-collaboration/src/server_folder/folder_manager.rs @@ -12,7 +12,7 @@ use crate::{ use async_stream::stream; use futures::stream::StreamExt; use lib_infra::future::BoxResultFuture; -use lib_ot::core::PlainTextAttributes; +use lib_ot::core::PlainAttributes; use std::{collections::HashMap, fmt::Debug, sync::Arc}; use tokio::{ sync::{mpsc, oneshot, RwLock}, @@ -187,7 +187,7 @@ impl ServerFolderManager { } } -type FolderRevisionSynchronizer = RevisionSynchronizer; +type FolderRevisionSynchronizer = RevisionSynchronizer; struct OpenFolderHandler { folder_id: String, diff --git a/shared-lib/flowy-collaboration/src/server_folder/folder_pad.rs b/shared-lib/flowy-collaboration/src/server_folder/folder_pad.rs index 3c7c002ef6..811974ae44 100644 --- a/shared-lib/flowy-collaboration/src/server_folder/folder_pad.rs +++ b/shared-lib/flowy-collaboration/src/server_folder/folder_pad.rs @@ -1,5 +1,5 @@ use crate::{entities::folder_info::FolderDelta, errors::CollaborateError, synchronizer::RevisionSyncObject}; -use lib_ot::core::{Delta, OperationTransformable, PlainTextAttributes}; +use lib_ot::core::{Delta, OperationTransformable, PlainAttributes}; pub struct ServerFolder { folder_id: String, @@ -15,12 +15,12 @@ impl ServerFolder { } } -impl RevisionSyncObject for ServerFolder { +impl RevisionSyncObject for ServerFolder { fn id(&self) -> &str { &self.folder_id } - fn compose(&mut self, other: &Delta) -> Result<(), CollaborateError> { + fn compose(&mut self, other: &Delta) -> Result<(), CollaborateError> { let new_delta = self.delta.compose(other)?; self.delta = new_delta; Ok(()) @@ -28,8 +28,8 @@ impl RevisionSyncObject for ServerFolder { fn transform( &self, - other: &Delta, - ) -> Result<(Delta, Delta), CollaborateError> { + other: &Delta, + ) -> Result<(Delta, Delta), CollaborateError> { let value = self.delta.transform(other)?; Ok(value) } @@ -38,7 +38,7 @@ impl RevisionSyncObject for ServerFolder { self.delta.to_json() } - fn set_delta(&mut self, new_delta: Delta) { + fn set_delta(&mut self, new_delta: Delta) { self.delta = new_delta; } } diff --git a/shared-lib/flowy-collaboration/src/synchronizer.rs b/shared-lib/flowy-collaboration/src/synchronizer.rs index 75eee6b603..78e11ed565 100644 --- a/shared-lib/flowy-collaboration/src/synchronizer.rs +++ b/shared-lib/flowy-collaboration/src/synchronizer.rs @@ -118,7 +118,6 @@ where } else { // The server delta is outdated, pull the missing revision from the client. let range = RevisionRange { - object_id: self.object_id.clone(), start: server_rev_id, end: first_revision.rev_id, }; diff --git a/shared-lib/flowy-collaboration/src/util.rs b/shared-lib/flowy-collaboration/src/util.rs index 0893073b4d..5b8107a1c9 100644 --- a/shared-lib/flowy-collaboration/src/util.rs +++ b/shared-lib/flowy-collaboration/src/util.rs @@ -72,6 +72,10 @@ where { let mut delta = Delta::::new(); for revision in revisions { + if revision.delta_data.is_empty() { + tracing::warn!("revision delta_data is empty"); + } + let revision_delta = Delta::::from_bytes(revision.delta_data).map_err(|e| { let err_msg = format!("Deserialize remote revision failed: {:?}", e); CollaborateError::internal().context(err_msg) diff --git a/shared-lib/flowy-core-data-model/src/entities/app/app_create.rs b/shared-lib/flowy-core-data-model/src/entities/app/app_create.rs deleted file mode 100644 index 87c58090aa..0000000000 --- a/shared-lib/flowy-core-data-model/src/entities/app/app_create.rs +++ /dev/null @@ -1,113 +0,0 @@ -use crate::{ - entities::view::RepeatedView, - errors::*, - impl_def_and_def_mut, - parser::{ - app::{AppColorStyle, AppName}, - workspace::WorkspaceIdentify, - }, -}; -use flowy_derive::ProtoBuf; -use std::convert::TryInto; - -#[derive(ProtoBuf, Default)] -pub struct CreateAppRequest { - #[pb(index = 1)] - pub workspace_id: String, - - #[pb(index = 2)] - pub name: String, - - #[pb(index = 3)] - pub desc: String, - - #[pb(index = 4)] - pub color_style: ColorStyle, -} - -#[derive(ProtoBuf, Default, Debug, Clone)] -pub struct ColorStyle { - #[pb(index = 1)] - pub theme_color: String, -} - -#[derive(ProtoBuf, Default, Debug)] -pub struct CreateAppParams { - #[pb(index = 1)] - pub workspace_id: String, - - #[pb(index = 2)] - pub name: String, - - #[pb(index = 3)] - pub desc: String, - - #[pb(index = 4)] - pub color_style: ColorStyle, -} - -impl TryInto for CreateAppRequest { - type Error = ErrorCode; - - fn try_into(self) -> Result { - let name = AppName::parse(self.name)?; - let id = WorkspaceIdentify::parse(self.workspace_id)?; - let color_style = AppColorStyle::parse(self.color_style.theme_color.clone())?; - - Ok(CreateAppParams { - workspace_id: id.0, - name: name.0, - desc: self.desc, - color_style: color_style.into(), - }) - } -} - -impl std::convert::From for ColorStyle { - fn from(data: AppColorStyle) -> Self { - ColorStyle { - theme_color: data.theme_color, - } - } -} - -#[derive(PartialEq, ProtoBuf, Default, Debug, Clone)] -pub struct App { - #[pb(index = 1)] - pub id: String, - - #[pb(index = 2)] - pub workspace_id: String, - - #[pb(index = 3)] - pub name: String, - - #[pb(index = 4)] - pub desc: String, - - #[pb(index = 5)] - pub belongings: RepeatedView, - - #[pb(index = 6)] - pub version: i64, - - #[pb(index = 7)] - pub modified_time: i64, - - #[pb(index = 8)] - pub create_time: i64, -} - -impl App { - pub fn take_belongings(&mut self) -> RepeatedView { - std::mem::take(&mut self.belongings) - } -} - -#[derive(PartialEq, Debug, Default, ProtoBuf, Clone)] -pub struct RepeatedApp { - #[pb(index = 1)] - pub items: Vec, -} - -impl_def_and_def_mut!(RepeatedApp, App); diff --git a/shared-lib/flowy-core-data-model/src/entities/view/view_create.rs b/shared-lib/flowy-core-data-model/src/entities/view/view_create.rs deleted file mode 100644 index 84fcdfb1fb..0000000000 --- a/shared-lib/flowy-core-data-model/src/entities/view/view_create.rs +++ /dev/null @@ -1,177 +0,0 @@ -use crate::{ - entities::trash::{Trash, TrashType}, - errors::ErrorCode, - impl_def_and_def_mut, - parser::{ - app::AppIdentify, - view::{ViewName, ViewThumbnail}, - }, -}; -use flowy_collaboration::document::default::initial_delta_string; -use flowy_derive::{ProtoBuf, ProtoBuf_Enum}; -use std::convert::TryInto; - -#[derive(PartialEq, Debug, ProtoBuf_Enum, Clone)] -pub enum ViewType { - Blank = 0, - Doc = 1, -} - -impl std::default::Default for ViewType { - fn default() -> Self { - ViewType::Blank - } -} - -impl std::convert::From for ViewType { - fn from(val: i32) -> Self { - match val { - 1 => ViewType::Doc, - 0 => ViewType::Blank, - _ => { - log::error!("Invalid view type: {}", val); - ViewType::Blank - } - } - } -} - -#[derive(Default, ProtoBuf)] -pub struct CreateViewRequest { - #[pb(index = 1)] - pub belong_to_id: String, - - #[pb(index = 2)] - pub name: String, - - #[pb(index = 3)] - pub desc: String, - - #[pb(index = 4, one_of)] - pub thumbnail: Option, - - #[pb(index = 5)] - pub view_type: ViewType, -} - -#[derive(Default, ProtoBuf, Debug, Clone)] -pub struct CreateViewParams { - #[pb(index = 1)] - pub belong_to_id: String, - - #[pb(index = 2)] - pub name: String, - - #[pb(index = 3)] - pub desc: String, - - #[pb(index = 4)] - pub thumbnail: String, - - #[pb(index = 5)] - pub view_type: ViewType, - - // ViewType::Doc -> Delta string - #[pb(index = 6)] - pub view_data: String, - - #[pb(index = 7)] - pub view_id: String, -} - -impl CreateViewParams { - pub fn new( - belong_to_id: String, - name: String, - desc: String, - view_type: ViewType, - thumbnail: String, - view_data: String, - view_id: String, - ) -> Self { - Self { - belong_to_id, - name, - desc, - thumbnail, - view_type, - view_data, - view_id, - } - } -} - -impl TryInto for CreateViewRequest { - type Error = ErrorCode; - - fn try_into(self) -> Result { - let name = ViewName::parse(self.name)?.0; - let belong_to_id = AppIdentify::parse(self.belong_to_id)?.0; - let view_data = initial_delta_string(); - let view_id = uuid::Uuid::new_v4().to_string(); - let thumbnail = match self.thumbnail { - None => "".to_string(), - Some(thumbnail) => ViewThumbnail::parse(thumbnail)?.0, - }; - - Ok(CreateViewParams::new( - belong_to_id, - name, - self.desc, - self.view_type, - thumbnail, - view_data, - view_id, - )) - } -} - -#[derive(PartialEq, ProtoBuf, Default, Debug, Clone)] -pub struct View { - #[pb(index = 1)] - pub id: String, - - #[pb(index = 2)] - pub belong_to_id: String, - - #[pb(index = 3)] - pub name: String, - - #[pb(index = 4)] - pub desc: String, - - #[pb(index = 5)] - pub view_type: ViewType, - - #[pb(index = 6)] - pub version: i64, - - #[pb(index = 7)] - pub belongings: RepeatedView, - - #[pb(index = 8)] - pub modified_time: i64, - - #[pb(index = 9)] - pub create_time: i64, -} - -#[derive(PartialEq, Debug, Default, ProtoBuf, Clone)] -pub struct RepeatedView { - #[pb(index = 1)] - pub items: Vec, -} - -impl_def_and_def_mut!(RepeatedView, View); - -impl std::convert::From for Trash { - fn from(view: View) -> Self { - Trash { - id: view.id, - name: view.name, - modified_time: view.modified_time, - create_time: view.create_time, - ty: TrashType::View, - } - } -} diff --git a/shared-lib/flowy-core-data-model/src/entities/view/view_query.rs b/shared-lib/flowy-core-data-model/src/entities/view/view_query.rs deleted file mode 100644 index 797f3ed95c..0000000000 --- a/shared-lib/flowy-core-data-model/src/entities/view/view_query.rs +++ /dev/null @@ -1,67 +0,0 @@ -use crate::{errors::ErrorCode, parser::view::ViewIdentify}; -use flowy_collaboration::entities::doc::DocumentId; -use flowy_derive::ProtoBuf; -use std::convert::TryInto; - -#[derive(Default, ProtoBuf)] -pub struct QueryViewRequest { - #[pb(index = 1)] - pub view_ids: Vec, -} - -#[derive(Default, ProtoBuf, Clone, Debug)] -pub struct ViewId { - #[pb(index = 1)] - pub view_id: String, -} - -impl std::convert::From for ViewId { - fn from(view_id: String) -> Self { - ViewId { view_id } - } -} - -impl std::convert::From for DocumentId { - fn from(identifier: ViewId) -> Self { - DocumentId { - doc_id: identifier.view_id, - } - } -} - -impl TryInto for QueryViewRequest { - type Error = ErrorCode; - fn try_into(self) -> Result { - debug_assert!(self.view_ids.len() == 1); - if self.view_ids.len() != 1 { - log::error!("The len of view_ids should be equal to 1"); - return Err(ErrorCode::ViewIdInvalid); - } - - let view_id = self.view_ids.first().unwrap().clone(); - let view_id = ViewIdentify::parse(view_id)?.0; - - Ok(ViewId { view_id }) - } -} - -#[derive(Default, ProtoBuf)] -pub struct RepeatedViewId { - #[pb(index = 1)] - pub items: Vec, -} - -impl TryInto for QueryViewRequest { - type Error = ErrorCode; - - fn try_into(self) -> Result { - let mut view_ids = vec![]; - for view_id in self.view_ids { - let view_id = ViewIdentify::parse(view_id)?.0; - - view_ids.push(view_id); - } - - Ok(RepeatedViewId { items: view_ids }) - } -} diff --git a/shared-lib/flowy-core-data-model/src/entities/workspace/workspace_create.rs b/shared-lib/flowy-core-data-model/src/entities/workspace/workspace_create.rs deleted file mode 100644 index 7f051ea948..0000000000 --- a/shared-lib/flowy-core-data-model/src/entities/workspace/workspace_create.rs +++ /dev/null @@ -1,74 +0,0 @@ -use crate::{ - entities::app::RepeatedApp, - errors::*, - impl_def_and_def_mut, - parser::workspace::{WorkspaceDesc, WorkspaceName}, -}; -use flowy_derive::ProtoBuf; -use std::convert::TryInto; - -#[derive(ProtoBuf, Default)] -pub struct CreateWorkspaceRequest { - #[pb(index = 1)] - pub name: String, - - #[pb(index = 2)] - pub desc: String, -} - -#[derive(Clone, ProtoBuf, Default, Debug)] -pub struct CreateWorkspaceParams { - #[pb(index = 1)] - pub name: String, - - #[pb(index = 2)] - pub desc: String, -} - -impl TryInto for CreateWorkspaceRequest { - type Error = ErrorCode; - - fn try_into(self) -> Result { - let name = WorkspaceName::parse(self.name)?; - let desc = WorkspaceDesc::parse(self.desc)?; - - Ok(CreateWorkspaceParams { - name: name.0, - desc: desc.0, - }) - } -} - -#[derive(PartialEq, ProtoBuf, Default, Debug, Clone)] -pub struct Workspace { - #[pb(index = 1)] - pub id: String, - - #[pb(index = 2)] - pub name: String, - - #[pb(index = 3)] - pub desc: String, - - #[pb(index = 4)] - pub apps: RepeatedApp, - - #[pb(index = 5)] - pub modified_time: i64, - - #[pb(index = 6)] - pub create_time: i64, -} - -impl Workspace { - pub fn take_apps(&mut self) -> RepeatedApp { - std::mem::take(&mut self.apps) - } -} -#[derive(PartialEq, Debug, Default, ProtoBuf)] -pub struct RepeatedWorkspace { - #[pb(index = 1)] - pub items: Vec, -} - -impl_def_and_def_mut!(RepeatedWorkspace, Workspace); diff --git a/shared-lib/flowy-core-data-model/src/entities/workspace/workspace_query.rs b/shared-lib/flowy-core-data-model/src/entities/workspace/workspace_query.rs deleted file mode 100644 index f4f33c166b..0000000000 --- a/shared-lib/flowy-core-data-model/src/entities/workspace/workspace_query.rs +++ /dev/null @@ -1,42 +0,0 @@ -use crate::{errors::*, parser::workspace::WorkspaceIdentify}; -use flowy_derive::ProtoBuf; -use std::convert::TryInto; - -#[derive(Default, ProtoBuf, Clone)] -pub struct QueryWorkspaceRequest { - // return all workspace if workspace_id is None - #[pb(index = 1, one_of)] - pub workspace_id: Option, -} - -impl QueryWorkspaceRequest { - pub fn new(workspace_id: Option) -> Self { - Self { workspace_id } - } -} - -// Read all workspaces if the workspace_id is None -#[derive(Clone, ProtoBuf, Default, Debug)] -pub struct WorkspaceId { - #[pb(index = 1, one_of)] - pub workspace_id: Option, -} - -impl WorkspaceId { - pub fn new(workspace_id: Option) -> Self { - Self { workspace_id } - } -} - -impl TryInto for QueryWorkspaceRequest { - type Error = ErrorCode; - - fn try_into(self) -> Result { - let workspace_id = match self.workspace_id { - None => None, - Some(workspace_id) => Some(WorkspaceIdentify::parse(workspace_id)?.0), - }; - - Ok(WorkspaceId { workspace_id }) - } -} diff --git a/shared-lib/lib-ot/src/core/delta/builder.rs b/shared-lib/lib-ot/src/core/delta/builder.rs index 231409c5d5..2289e3d02a 100644 --- a/shared-lib/lib-ot/src/core/delta/builder.rs +++ b/shared-lib/lib-ot/src/core/delta/builder.rs @@ -1,6 +1,6 @@ -use crate::core::{trim, Attributes, Delta, PlainTextAttributes}; +use crate::core::{trim, Attributes, Delta, PlainAttributes}; -pub type PlainDeltaBuilder = DeltaBuilder; +pub type PlainDeltaBuilder = DeltaBuilder; pub struct DeltaBuilder { delta: Delta, diff --git a/shared-lib/lib-ot/src/core/delta/delta.rs b/shared-lib/lib-ot/src/core/delta/delta.rs index 609320996a..78f067b800 100644 --- a/shared-lib/lib-ot/src/core/delta/delta.rs +++ b/shared-lib/lib-ot/src/core/delta/delta.rs @@ -13,7 +13,7 @@ use std::{ str::FromStr, }; -pub type PlainDelta = Delta; +pub type PlainDelta = Delta; // TODO: optimize the memory usage with Arc::make_mut or Cow #[derive(Clone, Debug, PartialEq, Eq)] diff --git a/shared-lib/lib-ot/src/core/operation/builder.rs b/shared-lib/lib-ot/src/core/operation/builder.rs index dc7b6dd7d9..bcada8ad4e 100644 --- a/shared-lib/lib-ot/src/core/operation/builder.rs +++ b/shared-lib/lib-ot/src/core/operation/builder.rs @@ -1,10 +1,10 @@ use crate::{ - core::{Attributes, Operation, PlainTextAttributes}, + core::{Attributes, Operation, PlainAttributes}, rich_text::RichTextAttributes, }; pub type RichTextOpBuilder = OpBuilder; -pub type PlainTextOpBuilder = OpBuilder; +pub type PlainTextOpBuilder = OpBuilder; pub struct OpBuilder { ty: Operation, diff --git a/shared-lib/lib-ot/src/core/operation/operation.rs b/shared-lib/lib-ot/src/core/operation/operation.rs index a30f884ae6..9dfdea395e 100644 --- a/shared-lib/lib-ot/src/core/operation/operation.rs +++ b/shared-lib/lib-ot/src/core/operation/operation.rs @@ -339,14 +339,14 @@ where } #[derive(Debug, Clone, Eq, PartialEq, Default, Serialize, Deserialize)] -pub struct PlainTextAttributes(); -impl fmt::Display for PlainTextAttributes { +pub struct PlainAttributes(); +impl fmt::Display for PlainAttributes { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("PlainTextAttributes") + f.write_str("PlainAttributes") } } -impl Attributes for PlainTextAttributes { +impl Attributes for PlainAttributes { fn is_empty(&self) -> bool { true } @@ -356,7 +356,7 @@ impl Attributes for PlainTextAttributes { fn extend_other(&mut self, _other: Self) {} } -impl OperationTransformable for PlainTextAttributes { +impl OperationTransformable for PlainAttributes { fn compose(&self, _other: &Self) -> Result { Ok(self.clone()) }