diff --git a/frontend/rust-lib/flowy-document/src/editor/document.rs b/frontend/rust-lib/flowy-document/src/editor/document.rs index bf3ff469c8..8af53a5727 100644 --- a/frontend/rust-lib/flowy-document/src/editor/document.rs +++ b/frontend/rust-lib/flowy-document/src/editor/document.rs @@ -28,9 +28,9 @@ impl Document { } } - pub fn md5(&self) -> String { - // format!("{:x}", md5::compute(bytes)) - "".to_owned() + pub fn document_md5(&self) -> String { + let bytes = self.tree.to_bytes(); + format!("{:x}", md5::compute(&bytes)) } pub fn get_tree(&self) -> &NodeTree { diff --git a/frontend/rust-lib/flowy-document/src/editor/queue.rs b/frontend/rust-lib/flowy-document/src/editor/queue.rs index ef415cb4f1..24448c9bca 100644 --- a/frontend/rust-lib/flowy-document/src/editor/queue.rs +++ b/frontend/rust-lib/flowy-document/src/editor/queue.rs @@ -63,7 +63,7 @@ impl DocumentQueue { Command::ComposeTransaction { transaction, ret } => { self.document.write().await.apply_transaction(transaction.clone())?; let _ = self - .save_local_operations(transaction, self.document.read().await.md5()) + .save_local_operations(transaction, self.document.read().await.document_md5()) .await?; let _ = ret.send(Ok(())); } diff --git a/frontend/rust-lib/flowy-document/src/manager.rs b/frontend/rust-lib/flowy-document/src/manager.rs index ac031ad35b..658e3e0a99 100644 --- a/frontend/rust-lib/flowy-document/src/manager.rs +++ b/frontend/rust-lib/flowy-document/src/manager.rs @@ -12,11 +12,8 @@ use flowy_revision::{ RevisionCloudService, RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionSnapshotPersistence, }; use flowy_sync::client_document::initial_delta_document_content; -use flowy_sync::entities::{ - document::DocumentIdPB, - revision::{md5, RepeatedRevision, Revision}, - ws_data::ServerRevisionWSData, -}; +use flowy_sync::entities::{document::DocumentIdPB, revision::Revision, ws_data::ServerRevisionWSData}; +use flowy_sync::util::md5; use lib_infra::future::FutureResult; use lib_ws::WSConnectState; use std::any::Any; @@ -139,7 +136,7 @@ impl DocumentManager { Ok(()) } - pub async fn create_document>(&self, doc_id: T, revisions: RepeatedRevision) -> FlowyResult<()> { + pub async fn create_document>(&self, doc_id: T, revisions: Vec) -> FlowyResult<()> { let doc_id = doc_id.as_ref().to_owned(); let db_pool = self.persistence.database.db_pool()?; // Maybe we could save the document to disk without creating the RevisionManager diff --git a/frontend/rust-lib/flowy-document/src/old_editor/queue.rs b/frontend/rust-lib/flowy-document/src/old_editor/queue.rs index bfe7336257..063ae4f254 100644 --- a/frontend/rust-lib/flowy-document/src/old_editor/queue.rs +++ b/frontend/rust-lib/flowy-document/src/old_editor/queue.rs @@ -3,7 +3,7 @@ use crate::DocumentUser; use async_stream::stream; use flowy_database::ConnectionPool; use flowy_error::FlowyError; -use flowy_revision::{OperationsMD5, RevisionManager, TransformOperations}; +use flowy_revision::{RevisionMD5, RevisionManager, TransformOperations}; use flowy_sync::{ client_document::{history::UndoResult, ClientDocument}, entities::revision::{RevId, Revision}, @@ -70,7 +70,7 @@ impl EditDocumentQueue { EditorCommand::ComposeLocalOperations { operations, ret } => { let mut document = self.document.write().await; let _ = document.compose_operations(operations.clone())?; - let md5 = document.md5(); + let md5 = document.document_md5(); drop(document); let _ = self.save_local_operations(operations, md5).await?; let _ = ret.send(Ok(())); @@ -78,16 +78,16 @@ impl EditDocumentQueue { EditorCommand::ComposeRemoteOperation { client_operations, ret } => { let mut document = self.document.write().await; let _ = document.compose_operations(client_operations.clone())?; - let md5 = document.md5(); + let md5 = document.document_md5(); drop(document); - let _ = ret.send(Ok(md5)); + let _ = ret.send(Ok(md5.into())); } EditorCommand::ResetOperations { operations, ret } => { let mut document = self.document.write().await; let _ = document.set_operations(operations); - let md5 = document.md5(); + let md5 = document.document_md5(); drop(document); - let _ = ret.send(Ok(md5)); + let _ = ret.send(Ok(md5.into())); } EditorCommand::TransformOperations { operations, ret } => { let f = || async { @@ -114,14 +114,14 @@ impl EditDocumentQueue { EditorCommand::Insert { index, data, ret } => { let mut write_guard = self.document.write().await; let operations = write_guard.insert(index, data)?; - let md5 = write_guard.md5(); + let md5 = write_guard.document_md5(); let _ = self.save_local_operations(operations, md5).await?; let _ = ret.send(Ok(())); } EditorCommand::Delete { interval, ret } => { let mut write_guard = self.document.write().await; let operations = write_guard.delete(interval)?; - let md5 = write_guard.md5(); + let md5 = write_guard.document_md5(); let _ = self.save_local_operations(operations, md5).await?; let _ = ret.send(Ok(())); } @@ -132,14 +132,14 @@ impl EditDocumentQueue { } => { let mut write_guard = self.document.write().await; let operations = write_guard.format(interval, attribute)?; - let md5 = write_guard.md5(); + let md5 = write_guard.document_md5(); let _ = self.save_local_operations(operations, md5).await?; let _ = ret.send(Ok(())); } EditorCommand::Replace { interval, data, ret } => { let mut write_guard = self.document.write().await; let operations = write_guard.replace(interval, data)?; - let md5 = write_guard.md5(); + let md5 = write_guard.document_md5(); let _ = self.save_local_operations(operations, md5).await?; let _ = ret.send(Ok(())); } @@ -152,14 +152,14 @@ impl EditDocumentQueue { EditorCommand::Undo { ret } => { let mut write_guard = self.document.write().await; let UndoResult { operations } = write_guard.undo()?; - let md5 = write_guard.md5(); + let md5 = write_guard.document_md5(); let _ = self.save_local_operations(operations, md5).await?; let _ = ret.send(Ok(())); } EditorCommand::Redo { ret } => { let mut write_guard = self.document.write().await; let UndoResult { operations } = write_guard.redo()?; - let md5 = write_guard.md5(); + let md5 = write_guard.document_md5(); let _ = self.save_local_operations(operations, md5).await?; let _ = ret.send(Ok(())); } @@ -197,11 +197,11 @@ pub(crate) enum EditorCommand { }, ComposeRemoteOperation { client_operations: DeltaTextOperations, - ret: Ret, + ret: Ret, }, ResetOperations { operations: DeltaTextOperations, - ret: Ret, + ret: Ret, }, TransformOperations { operations: DeltaTextOperations, diff --git a/frontend/rust-lib/flowy-document/src/old_editor/web_socket.rs b/frontend/rust-lib/flowy-document/src/old_editor/web_socket.rs index 9b9a870f1f..bbaa1c511f 100644 --- a/frontend/rust-lib/flowy-document/src/old_editor/web_socket.rs +++ b/frontend/rust-lib/flowy-document/src/old_editor/web_socket.rs @@ -136,7 +136,7 @@ impl ConflictResolver for DocumentConflictResolv fn compose_operations( &self, operations: DeltaDocumentResolveOperations, - ) -> BoxResultFuture { + ) -> BoxResultFuture { let tx = self.edit_cmd_tx.clone(); let operations = operations.into_inner(); Box::pin(async move { @@ -172,10 +172,7 @@ impl ConflictResolver for DocumentConflictResolv }) } - fn reset_operations( - &self, - operations: DeltaDocumentResolveOperations, - ) -> BoxResultFuture { + fn reset_operations(&self, operations: DeltaDocumentResolveOperations) -> BoxResultFuture { let tx = self.edit_cmd_tx.clone(); let operations = operations.into_inner(); Box::pin(async move { diff --git a/frontend/rust-lib/flowy-document/src/services/migration.rs b/frontend/rust-lib/flowy-document/src/services/migration.rs index 6dc8bed07d..d922486e47 100644 --- a/frontend/rust-lib/flowy-document/src/services/migration.rs +++ b/frontend/rust-lib/flowy-document/src/services/migration.rs @@ -4,9 +4,9 @@ use crate::DocumentDatabase; use bytes::Bytes; use flowy_database::kv::KV; use flowy_error::FlowyResult; -use flowy_revision::disk::{RevisionDiskCache, RevisionRecord}; -use flowy_sync::entities::revision::{md5, Revision}; -use flowy_sync::util::make_operations_from_revisions; +use flowy_revision::disk::{RevisionDiskCache, SyncRecord}; +use flowy_sync::entities::revision::Revision; +use flowy_sync::util::{make_operations_from_revisions, md5}; use std::sync::Arc; const V1_MIGRATION: &str = "DOCUMENT_V1_MIGRATION"; @@ -44,7 +44,7 @@ impl DocumentMigration { let bytes = Bytes::from(transaction.to_bytes()?); let md5 = format!("{:x}", md5::compute(&bytes)); let revision = Revision::new(&document_id, 0, 1, bytes, &self.user_id, md5); - let record = RevisionRecord::new(revision); + let record = SyncRecord::new(revision); match disk_cache.create_revision_records(vec![record]) { Ok(_) => {} Err(err) => { diff --git a/frontend/rust-lib/flowy-document/src/services/persistence/rev_sqlite/document_rev_sqlite_v0.rs b/frontend/rust-lib/flowy-document/src/services/persistence/rev_sqlite/document_rev_sqlite_v0.rs index 9aa9e2e77b..7a4485861f 100644 --- a/frontend/rust-lib/flowy-document/src/services/persistence/rev_sqlite/document_rev_sqlite_v0.rs +++ b/frontend/rust-lib/flowy-document/src/services/persistence/rev_sqlite/document_rev_sqlite_v0.rs @@ -7,7 +7,7 @@ use flowy_database::{ ConnectionPool, }; use flowy_error::{internal_error, FlowyError, FlowyResult}; -use flowy_revision::disk::{RevisionChangeset, RevisionDiskCache, RevisionRecord, RevisionState}; +use flowy_revision::disk::{RevisionChangeset, RevisionDiskCache, RevisionState, SyncRecord}; use flowy_sync::{ entities::revision::{RevType, Revision, RevisionRange}, util::md5, @@ -23,7 +23,7 @@ pub struct SQLiteDeltaDocumentRevisionPersistence { impl RevisionDiskCache> for SQLiteDeltaDocumentRevisionPersistence { type Error = FlowyError; - fn create_revision_records(&self, revision_records: Vec) -> Result<(), Self::Error> { + fn create_revision_records(&self, revision_records: Vec) -> Result<(), Self::Error> { let conn = self.pool.get().map_err(internal_error)?; let _ = DeltaRevisionSql::create(revision_records, &*conn)?; Ok(()) @@ -37,7 +37,7 @@ impl RevisionDiskCache> for SQLiteDeltaDocumentRevisionPersi &self, object_id: &str, rev_ids: Option>, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { let conn = self.pool.get().map_err(internal_error)?; let records = DeltaRevisionSql::read(&self.user_id, object_id, rev_ids, &*conn)?; Ok(records) @@ -47,7 +47,7 @@ impl RevisionDiskCache> for SQLiteDeltaDocumentRevisionPersi &self, object_id: &str, range: &RevisionRange, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { let conn = &*self.pool.get().map_err(internal_error)?; let revisions = DeltaRevisionSql::read_with_range(&self.user_id, object_id, range.clone(), conn)?; Ok(revisions) @@ -74,7 +74,7 @@ impl RevisionDiskCache> for SQLiteDeltaDocumentRevisionPersi &self, object_id: &str, deleted_rev_ids: Option>, - inserted_records: Vec, + inserted_records: Vec, ) -> Result<(), Self::Error> { let conn = self.pool.get().map_err(internal_error)?; conn.immediate_transaction::<_, FlowyError, _>(|| { @@ -97,7 +97,7 @@ impl SQLiteDeltaDocumentRevisionPersistence { pub struct DeltaRevisionSql {} impl DeltaRevisionSql { - fn create(revision_records: Vec, conn: &SqliteConnection) -> Result<(), FlowyError> { + fn create(revision_records: Vec, conn: &SqliteConnection) -> Result<(), FlowyError> { // Batch insert: https://diesel.rs/guides/all-about-inserts.html let records = revision_records @@ -143,7 +143,7 @@ impl DeltaRevisionSql { object_id: &str, rev_ids: Option>, conn: &SqliteConnection, - ) -> Result, FlowyError> { + ) -> Result, FlowyError> { let mut sql = dsl::rev_table.filter(dsl::doc_id.eq(object_id)).into_boxed(); if let Some(rev_ids) = rev_ids { sql = sql.filter(dsl::rev_id.eq_any(rev_ids)); @@ -162,7 +162,7 @@ impl DeltaRevisionSql { object_id: &str, range: RevisionRange, conn: &SqliteConnection, - ) -> Result, FlowyError> { + ) -> Result, FlowyError> { let rev_tables = dsl::rev_table .filter(dsl::rev_id.ge(range.start)) .filter(dsl::rev_id.le(range.end)) @@ -244,7 +244,7 @@ impl std::default::Default for TextRevisionState { } } -fn mk_revision_record_from_table(user_id: &str, table: RevisionTable) -> RevisionRecord { +fn mk_revision_record_from_table(user_id: &str, table: RevisionTable) -> SyncRecord { let md5 = md5(&table.data); let revision = Revision::new( &table.doc_id, @@ -254,7 +254,7 @@ fn mk_revision_record_from_table(user_id: &str, table: RevisionTable) -> Revisio user_id, md5, ); - RevisionRecord { + SyncRecord { revision, state: table.state.into(), write_to_disk: false, diff --git a/frontend/rust-lib/flowy-document/src/services/persistence/rev_sqlite/document_rev_sqlite_v1.rs b/frontend/rust-lib/flowy-document/src/services/persistence/rev_sqlite/document_rev_sqlite_v1.rs index da82a4eee9..d02528a79f 100644 --- a/frontend/rust-lib/flowy-document/src/services/persistence/rev_sqlite/document_rev_sqlite_v1.rs +++ b/frontend/rust-lib/flowy-document/src/services/persistence/rev_sqlite/document_rev_sqlite_v1.rs @@ -7,7 +7,7 @@ use flowy_database::{ ConnectionPool, }; use flowy_error::{internal_error, FlowyError, FlowyResult}; -use flowy_revision::disk::{RevisionChangeset, RevisionDiskCache, RevisionRecord, RevisionState}; +use flowy_revision::disk::{RevisionChangeset, RevisionDiskCache, RevisionState, SyncRecord}; use flowy_sync::{ entities::revision::{Revision, RevisionRange}, util::md5, @@ -22,7 +22,7 @@ pub struct SQLiteDocumentRevisionPersistence { impl RevisionDiskCache> for SQLiteDocumentRevisionPersistence { type Error = FlowyError; - fn create_revision_records(&self, revision_records: Vec) -> Result<(), Self::Error> { + fn create_revision_records(&self, revision_records: Vec) -> Result<(), Self::Error> { let conn = self.pool.get().map_err(internal_error)?; let _ = DocumentRevisionSql::create(revision_records, &*conn)?; Ok(()) @@ -36,7 +36,7 @@ impl RevisionDiskCache> for SQLiteDocumentRevisionPersistenc &self, object_id: &str, rev_ids: Option>, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { let conn = self.pool.get().map_err(internal_error)?; let records = DocumentRevisionSql::read(&self.user_id, object_id, rev_ids, &*conn)?; Ok(records) @@ -46,7 +46,7 @@ impl RevisionDiskCache> for SQLiteDocumentRevisionPersistenc &self, object_id: &str, range: &RevisionRange, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { let conn = &*self.pool.get().map_err(internal_error)?; let revisions = DocumentRevisionSql::read_with_range(&self.user_id, object_id, range.clone(), conn)?; Ok(revisions) @@ -73,7 +73,7 @@ impl RevisionDiskCache> for SQLiteDocumentRevisionPersistenc &self, object_id: &str, deleted_rev_ids: Option>, - inserted_records: Vec, + inserted_records: Vec, ) -> Result<(), Self::Error> { let conn = self.pool.get().map_err(internal_error)?; conn.immediate_transaction::<_, FlowyError, _>(|| { @@ -96,7 +96,7 @@ impl SQLiteDocumentRevisionPersistence { struct DocumentRevisionSql {} impl DocumentRevisionSql { - fn create(revision_records: Vec, conn: &SqliteConnection) -> Result<(), FlowyError> { + fn create(revision_records: Vec, conn: &SqliteConnection) -> Result<(), FlowyError> { // Batch insert: https://diesel.rs/guides/all-about-inserts.html let records = revision_records .into_iter() @@ -142,7 +142,7 @@ impl DocumentRevisionSql { object_id: &str, rev_ids: Option>, conn: &SqliteConnection, - ) -> Result, FlowyError> { + ) -> Result, FlowyError> { let mut sql = dsl::document_rev_table .filter(dsl::document_id.eq(object_id)) .into_boxed(); @@ -163,7 +163,7 @@ impl DocumentRevisionSql { object_id: &str, range: RevisionRange, conn: &SqliteConnection, - ) -> Result, FlowyError> { + ) -> Result, FlowyError> { let rev_tables = dsl::document_rev_table .filter(dsl::rev_id.ge(range.start)) .filter(dsl::rev_id.le(range.end)) @@ -220,7 +220,7 @@ impl std::default::Default for DocumentRevisionState { } } -fn mk_revision_record_from_table(user_id: &str, table: DocumentRevisionTable) -> RevisionRecord { +fn mk_revision_record_from_table(user_id: &str, table: DocumentRevisionTable) -> SyncRecord { let md5 = md5(&table.data); let revision = Revision::new( &table.document_id, @@ -230,7 +230,7 @@ fn mk_revision_record_from_table(user_id: &str, table: DocumentRevisionTable) -> user_id, md5, ); - RevisionRecord { + SyncRecord { revision, state: table.state.into(), write_to_disk: false, diff --git a/frontend/rust-lib/flowy-folder/src/services/persistence/migration.rs b/frontend/rust-lib/flowy-folder/src/services/persistence/migration.rs index 7f363980ab..49ff81f128 100644 --- a/frontend/rust-lib/flowy-folder/src/services/persistence/migration.rs +++ b/frontend/rust-lib/flowy-folder/src/services/persistence/migration.rs @@ -9,11 +9,12 @@ use flowy_error::{FlowyError, FlowyResult}; use flowy_folder_data_model::revision::{AppRevision, FolderRevision, ViewRevision, WorkspaceRevision}; use flowy_revision::reset::{RevisionResettable, RevisionStructReset}; use flowy_sync::client_folder::make_folder_rev_json_str; +use flowy_sync::client_folder::FolderPad; use flowy_sync::entities::revision::Revision; use flowy_sync::server_folder::FolderOperationsBuilder; -use flowy_sync::{client_folder::FolderPad, entities::revision::md5}; use crate::services::persistence::rev_sqlite::SQLiteFolderRevisionPersistence; +use flowy_sync::util::md5; use std::sync::Arc; const V1_MIGRATION: &str = "FOLDER_V1_MIGRATION"; diff --git a/frontend/rust-lib/flowy-folder/src/services/persistence/mod.rs b/frontend/rust-lib/flowy-folder/src/services/persistence/mod.rs index a0da34aee2..147effff9b 100644 --- a/frontend/rust-lib/flowy-folder/src/services/persistence/mod.rs +++ b/frontend/rust-lib/flowy-folder/src/services/persistence/mod.rs @@ -11,7 +11,7 @@ use crate::{ use flowy_database::ConnectionPool; use flowy_error::{FlowyError, FlowyResult}; use flowy_folder_data_model::revision::{AppRevision, TrashRevision, ViewRevision, WorkspaceRevision}; -use flowy_revision::disk::{RevisionDiskCache, RevisionRecord, RevisionState}; +use flowy_revision::disk::{RevisionDiskCache, RevisionState, SyncRecord}; use flowy_sync::{client_folder::FolderPad, entities::revision::Revision}; use crate::services::persistence::rev_sqlite::SQLiteFolderRevisionPersistence; @@ -112,7 +112,7 @@ impl FolderPersistence { let json = folder.to_json()?; let delta_data = FolderOperationsBuilder::new().insert(&json).build().json_bytes(); let revision = Revision::initial_revision(user_id, folder_id.as_ref(), delta_data); - let record = RevisionRecord { + let record = SyncRecord { revision, state: RevisionState::Sync, write_to_disk: true, diff --git a/frontend/rust-lib/flowy-folder/src/services/persistence/rev_sqlite/folder_rev_sqlite.rs b/frontend/rust-lib/flowy-folder/src/services/persistence/rev_sqlite/folder_rev_sqlite.rs index a80c95ba15..c094d4be8c 100644 --- a/frontend/rust-lib/flowy-folder/src/services/persistence/rev_sqlite/folder_rev_sqlite.rs +++ b/frontend/rust-lib/flowy-folder/src/services/persistence/rev_sqlite/folder_rev_sqlite.rs @@ -7,7 +7,7 @@ use flowy_database::{ ConnectionPool, }; use flowy_error::{internal_error, FlowyError, FlowyResult}; -use flowy_revision::disk::{RevisionChangeset, RevisionDiskCache, RevisionRecord, RevisionState}; +use flowy_revision::disk::{RevisionChangeset, RevisionDiskCache, RevisionState, SyncRecord}; use flowy_sync::{ entities::revision::{RevType, Revision, RevisionRange}, util::md5, @@ -23,7 +23,7 @@ pub struct SQLiteFolderRevisionPersistence { impl RevisionDiskCache> for SQLiteFolderRevisionPersistence { type Error = FlowyError; - fn create_revision_records(&self, revision_records: Vec) -> Result<(), Self::Error> { + fn create_revision_records(&self, revision_records: Vec) -> Result<(), Self::Error> { let conn = self.pool.get().map_err(internal_error)?; let _ = FolderRevisionSql::create(revision_records, &*conn)?; Ok(()) @@ -37,7 +37,7 @@ impl RevisionDiskCache> for SQLiteFolderRevisionPersistence &self, object_id: &str, rev_ids: Option>, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { let conn = self.pool.get().map_err(internal_error)?; let records = FolderRevisionSql::read(&self.user_id, object_id, rev_ids, &*conn)?; Ok(records) @@ -47,7 +47,7 @@ impl RevisionDiskCache> for SQLiteFolderRevisionPersistence &self, object_id: &str, range: &RevisionRange, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { let conn = &*self.pool.get().map_err(internal_error)?; let revisions = FolderRevisionSql::read_with_range(&self.user_id, object_id, range.clone(), conn)?; Ok(revisions) @@ -74,7 +74,7 @@ impl RevisionDiskCache> for SQLiteFolderRevisionPersistence &self, object_id: &str, deleted_rev_ids: Option>, - inserted_records: Vec, + inserted_records: Vec, ) -> Result<(), Self::Error> { let conn = self.pool.get().map_err(internal_error)?; conn.immediate_transaction::<_, FlowyError, _>(|| { @@ -97,7 +97,7 @@ impl SQLiteFolderRevisionPersistence { struct FolderRevisionSql {} impl FolderRevisionSql { - fn create(revision_records: Vec, conn: &SqliteConnection) -> Result<(), FlowyError> { + fn create(revision_records: Vec, conn: &SqliteConnection) -> Result<(), FlowyError> { // Batch insert: https://diesel.rs/guides/all-about-inserts.html let records = revision_records @@ -143,7 +143,7 @@ impl FolderRevisionSql { object_id: &str, rev_ids: Option>, conn: &SqliteConnection, - ) -> Result, FlowyError> { + ) -> Result, FlowyError> { let mut sql = dsl::rev_table.filter(dsl::doc_id.eq(object_id)).into_boxed(); if let Some(rev_ids) = rev_ids { sql = sql.filter(dsl::rev_id.eq_any(rev_ids)); @@ -162,7 +162,7 @@ impl FolderRevisionSql { object_id: &str, range: RevisionRange, conn: &SqliteConnection, - ) -> Result, FlowyError> { + ) -> Result, FlowyError> { let rev_tables = dsl::rev_table .filter(dsl::rev_id.ge(range.start)) .filter(dsl::rev_id.le(range.end)) @@ -220,7 +220,7 @@ impl std::default::Default for TextRevisionState { } } -fn mk_revision_record_from_table(user_id: &str, table: RevisionTable) -> RevisionRecord { +fn mk_revision_record_from_table(user_id: &str, table: RevisionTable) -> SyncRecord { let md5 = md5(&table.data); let revision = Revision::new( &table.doc_id, @@ -230,7 +230,7 @@ fn mk_revision_record_from_table(user_id: &str, table: RevisionTable) -> Revisio user_id, md5, ); - RevisionRecord { + SyncRecord { revision, state: table.state.into(), write_to_disk: false, diff --git a/frontend/rust-lib/flowy-folder/src/services/web_socket.rs b/frontend/rust-lib/flowy-folder/src/services/web_socket.rs index 204ebc7f04..106e7ccb4b 100644 --- a/frontend/rust-lib/flowy-folder/src/services/web_socket.rs +++ b/frontend/rust-lib/flowy-folder/src/services/web_socket.rs @@ -78,12 +78,12 @@ struct FolderConflictResolver { } impl ConflictResolver for FolderConflictResolver { - fn compose_operations(&self, operations: FolderResolveOperations) -> BoxResultFuture { + fn compose_operations(&self, operations: FolderResolveOperations) -> BoxResultFuture { let operations = operations.into_inner(); let folder_pad = self.folder_pad.clone(); Box::pin(async move { let md5 = folder_pad.write().compose_remote_operations(operations)?; - Ok(md5) + Ok(md5.into()) }) } @@ -113,11 +113,11 @@ impl ConflictResolver for FolderConflictResolver { }) } - fn reset_operations(&self, operations: FolderResolveOperations) -> BoxResultFuture { + fn reset_operations(&self, operations: FolderResolveOperations) -> BoxResultFuture { let folder_pad = self.folder_pad.clone(); Box::pin(async move { let md5 = folder_pad.write().reset_folder(operations.into_inner())?; - Ok(md5) + Ok(md5.into()) }) } } diff --git a/frontend/rust-lib/flowy-grid/src/manager.rs b/frontend/rust-lib/flowy-grid/src/manager.rs index 9f91f64c98..72c6e808dc 100644 --- a/frontend/rust-lib/flowy-grid/src/manager.rs +++ b/frontend/rust-lib/flowy-grid/src/manager.rs @@ -15,7 +15,7 @@ use flowy_error::{FlowyError, FlowyResult}; use flowy_grid_data_model::revision::{BuildGridContext, GridRevision, GridViewRevision}; use flowy_revision::{RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionSnapshotPersistence}; use flowy_sync::client_grid::{make_grid_block_operations, make_grid_operations, make_grid_view_operations}; -use flowy_sync::entities::revision::{RepeatedRevision, Revision}; +use flowy_sync::entities::revision::Revision; use std::sync::Arc; use tokio::sync::RwLock; @@ -67,7 +67,7 @@ impl GridManager { } #[tracing::instrument(level = "debug", skip_all, err)] - pub async fn create_grid>(&self, grid_id: T, revisions: RepeatedRevision) -> FlowyResult<()> { + pub async fn create_grid>(&self, grid_id: T, revisions: Vec) -> FlowyResult<()> { let grid_id = grid_id.as_ref(); let db_pool = self.grid_user.db_pool()?; let rev_manager = self.make_grid_rev_manager(grid_id, db_pool)?; @@ -77,7 +77,7 @@ impl GridManager { } #[tracing::instrument(level = "debug", skip_all, err)] - async fn create_grid_view>(&self, view_id: T, revisions: RepeatedRevision) -> FlowyResult<()> { + async fn create_grid_view>(&self, view_id: T, revisions: Vec) -> FlowyResult<()> { let view_id = view_id.as_ref(); let rev_manager = make_grid_view_rev_manager(&self.grid_user, view_id).await?; let _ = rev_manager.reset_object(revisions).await?; @@ -85,7 +85,7 @@ impl GridManager { } #[tracing::instrument(level = "debug", skip_all, err)] - pub async fn create_grid_block>(&self, block_id: T, revisions: RepeatedRevision) -> FlowyResult<()> { + pub async fn create_grid_block>(&self, block_id: T, revisions: Vec) -> FlowyResult<()> { let block_id = block_id.as_ref(); let db_pool = self.grid_user.db_pool()?; let rev_manager = self.make_grid_block_rev_manager(block_id, db_pool)?; @@ -208,9 +208,8 @@ pub async fn make_grid_view_data( // Create grid's block let grid_block_delta = make_grid_block_operations(block_meta_data); let block_delta_data = grid_block_delta.json_bytes(); - let repeated_revision: RepeatedRevision = - Revision::initial_revision(user_id, block_id, block_delta_data).into(); - let _ = grid_manager.create_grid_block(&block_id, repeated_revision).await?; + let revision = Revision::initial_revision(user_id, block_id, block_delta_data); + let _ = grid_manager.create_grid_block(&block_id, vec![revision]).await?; } // Will replace the grid_id with the value returned by the gen_grid_id() @@ -220,9 +219,8 @@ pub async fn make_grid_view_data( // Create grid let grid_rev_delta = make_grid_operations(&grid_rev); let grid_rev_delta_bytes = grid_rev_delta.json_bytes(); - let repeated_revision: RepeatedRevision = - Revision::initial_revision(user_id, &grid_id, grid_rev_delta_bytes.clone()).into(); - let _ = grid_manager.create_grid(&grid_id, repeated_revision).await?; + let revision = Revision::initial_revision(user_id, &grid_id, grid_rev_delta_bytes.clone()); + let _ = grid_manager.create_grid(&grid_id, vec![revision]).await?; // Create grid view let grid_view = if grid_view_revision_data.is_empty() { @@ -232,9 +230,8 @@ pub async fn make_grid_view_data( }; let grid_view_delta = make_grid_view_operations(&grid_view); let grid_view_delta_bytes = grid_view_delta.json_bytes(); - let repeated_revision: RepeatedRevision = - Revision::initial_revision(user_id, view_id, grid_view_delta_bytes).into(); - let _ = grid_manager.create_grid_view(view_id, repeated_revision).await?; + let revision = Revision::initial_revision(user_id, view_id, grid_view_delta_bytes); + let _ = grid_manager.create_grid_view(view_id, vec![revision]).await?; Ok(grid_rev_delta_bytes) } diff --git a/frontend/rust-lib/flowy-grid/src/services/persistence/rev_sqlite/grid_block_impl.rs b/frontend/rust-lib/flowy-grid/src/services/persistence/rev_sqlite/grid_block_impl.rs index 27b4e7790b..a4895ae5e8 100644 --- a/frontend/rust-lib/flowy-grid/src/services/persistence/rev_sqlite/grid_block_impl.rs +++ b/frontend/rust-lib/flowy-grid/src/services/persistence/rev_sqlite/grid_block_impl.rs @@ -7,7 +7,7 @@ use flowy_database::{ ConnectionPool, }; use flowy_error::{internal_error, FlowyError, FlowyResult}; -use flowy_revision::disk::{RevisionChangeset, RevisionDiskCache, RevisionRecord, RevisionState}; +use flowy_revision::disk::{RevisionChangeset, RevisionDiskCache, RevisionState, SyncRecord}; use flowy_sync::{ entities::revision::{Revision, RevisionRange}, util::md5, @@ -22,7 +22,7 @@ pub struct SQLiteGridBlockRevisionPersistence { impl RevisionDiskCache> for SQLiteGridBlockRevisionPersistence { type Error = FlowyError; - fn create_revision_records(&self, revision_records: Vec) -> Result<(), Self::Error> { + fn create_revision_records(&self, revision_records: Vec) -> Result<(), Self::Error> { let conn = self.pool.get().map_err(internal_error)?; let _ = GridMetaRevisionSql::create(revision_records, &*conn)?; Ok(()) @@ -36,7 +36,7 @@ impl RevisionDiskCache> for SQLiteGridBlockRevisionPersisten &self, object_id: &str, rev_ids: Option>, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { let conn = self.pool.get().map_err(internal_error)?; let records = GridMetaRevisionSql::read(&self.user_id, object_id, rev_ids, &*conn)?; Ok(records) @@ -46,7 +46,7 @@ impl RevisionDiskCache> for SQLiteGridBlockRevisionPersisten &self, object_id: &str, range: &RevisionRange, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { let conn = &*self.pool.get().map_err(internal_error)?; let revisions = GridMetaRevisionSql::read_with_range(&self.user_id, object_id, range.clone(), conn)?; Ok(revisions) @@ -73,7 +73,7 @@ impl RevisionDiskCache> for SQLiteGridBlockRevisionPersisten &self, object_id: &str, deleted_rev_ids: Option>, - inserted_records: Vec, + inserted_records: Vec, ) -> Result<(), Self::Error> { let conn = self.pool.get().map_err(internal_error)?; conn.immediate_transaction::<_, FlowyError, _>(|| { @@ -95,7 +95,7 @@ impl SQLiteGridBlockRevisionPersistence { struct GridMetaRevisionSql(); impl GridMetaRevisionSql { - fn create(revision_records: Vec, conn: &SqliteConnection) -> Result<(), FlowyError> { + fn create(revision_records: Vec, conn: &SqliteConnection) -> Result<(), FlowyError> { // Batch insert: https://diesel.rs/guides/all-about-inserts.html let records = revision_records @@ -142,7 +142,7 @@ impl GridMetaRevisionSql { object_id: &str, rev_ids: Option>, conn: &SqliteConnection, - ) -> Result, FlowyError> { + ) -> Result, FlowyError> { let mut sql = dsl::grid_meta_rev_table .filter(dsl::object_id.eq(object_id)) .into_boxed(); @@ -163,7 +163,7 @@ impl GridMetaRevisionSql { object_id: &str, range: RevisionRange, conn: &SqliteConnection, - ) -> Result, FlowyError> { + ) -> Result, FlowyError> { let rev_tables = dsl::grid_meta_rev_table .filter(dsl::rev_id.ge(range.start)) .filter(dsl::rev_id.le(range.end)) @@ -219,7 +219,7 @@ impl std::default::Default for GridBlockRevisionState { } } -fn mk_revision_record_from_table(user_id: &str, table: GridBlockRevisionTable) -> RevisionRecord { +fn mk_revision_record_from_table(user_id: &str, table: GridBlockRevisionTable) -> SyncRecord { let md5 = md5(&table.data); let revision = Revision::new( &table.object_id, @@ -229,7 +229,7 @@ fn mk_revision_record_from_table(user_id: &str, table: GridBlockRevisionTable) - user_id, md5, ); - RevisionRecord { + SyncRecord { revision, state: table.state.into(), write_to_disk: false, diff --git a/frontend/rust-lib/flowy-grid/src/services/persistence/rev_sqlite/grid_impl.rs b/frontend/rust-lib/flowy-grid/src/services/persistence/rev_sqlite/grid_impl.rs index 5e86286f22..3fd121c5e3 100644 --- a/frontend/rust-lib/flowy-grid/src/services/persistence/rev_sqlite/grid_impl.rs +++ b/frontend/rust-lib/flowy-grid/src/services/persistence/rev_sqlite/grid_impl.rs @@ -7,7 +7,7 @@ use flowy_database::{ ConnectionPool, }; use flowy_error::{internal_error, FlowyError, FlowyResult}; -use flowy_revision::disk::{RevisionChangeset, RevisionDiskCache, RevisionRecord, RevisionState}; +use flowy_revision::disk::{RevisionChangeset, RevisionDiskCache, RevisionState, SyncRecord}; use flowy_sync::{ entities::revision::{Revision, RevisionRange}, util::md5, @@ -22,7 +22,7 @@ pub struct SQLiteGridRevisionPersistence { impl RevisionDiskCache> for SQLiteGridRevisionPersistence { type Error = FlowyError; - fn create_revision_records(&self, revision_records: Vec) -> Result<(), Self::Error> { + fn create_revision_records(&self, revision_records: Vec) -> Result<(), Self::Error> { let conn = self.pool.get().map_err(internal_error)?; let _ = GridRevisionSql::create(revision_records, &*conn)?; Ok(()) @@ -36,7 +36,7 @@ impl RevisionDiskCache> for SQLiteGridRevisionPersistence { &self, object_id: &str, rev_ids: Option>, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { let conn = self.pool.get().map_err(internal_error)?; let records = GridRevisionSql::read(&self.user_id, object_id, rev_ids, &*conn)?; Ok(records) @@ -46,7 +46,7 @@ impl RevisionDiskCache> for SQLiteGridRevisionPersistence { &self, object_id: &str, range: &RevisionRange, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { let conn = &*self.pool.get().map_err(internal_error)?; let revisions = GridRevisionSql::read_with_range(&self.user_id, object_id, range.clone(), conn)?; Ok(revisions) @@ -73,7 +73,7 @@ impl RevisionDiskCache> for SQLiteGridRevisionPersistence { &self, object_id: &str, deleted_rev_ids: Option>, - inserted_records: Vec, + inserted_records: Vec, ) -> Result<(), Self::Error> { let conn = self.pool.get().map_err(internal_error)?; conn.immediate_transaction::<_, FlowyError, _>(|| { @@ -95,7 +95,7 @@ impl SQLiteGridRevisionPersistence { struct GridRevisionSql(); impl GridRevisionSql { - fn create(revision_records: Vec, conn: &SqliteConnection) -> Result<(), FlowyError> { + fn create(revision_records: Vec, conn: &SqliteConnection) -> Result<(), FlowyError> { // Batch insert: https://diesel.rs/guides/all-about-inserts.html let records = revision_records .into_iter() @@ -141,7 +141,7 @@ impl GridRevisionSql { object_id: &str, rev_ids: Option>, conn: &SqliteConnection, - ) -> Result, FlowyError> { + ) -> Result, FlowyError> { let mut sql = dsl::grid_rev_table.filter(dsl::object_id.eq(object_id)).into_boxed(); if let Some(rev_ids) = rev_ids { sql = sql.filter(dsl::rev_id.eq_any(rev_ids)); @@ -160,7 +160,7 @@ impl GridRevisionSql { object_id: &str, range: RevisionRange, conn: &SqliteConnection, - ) -> Result, FlowyError> { + ) -> Result, FlowyError> { let rev_tables = dsl::grid_rev_table .filter(dsl::rev_id.ge(range.start)) .filter(dsl::rev_id.le(range.end)) @@ -217,7 +217,7 @@ impl std::default::Default for GridRevisionState { } } -fn mk_revision_record_from_table(user_id: &str, table: GridRevisionTable) -> RevisionRecord { +fn mk_revision_record_from_table(user_id: &str, table: GridRevisionTable) -> SyncRecord { let md5 = md5(&table.data); let revision = Revision::new( &table.object_id, @@ -227,7 +227,7 @@ fn mk_revision_record_from_table(user_id: &str, table: GridRevisionTable) -> Rev user_id, md5, ); - RevisionRecord { + SyncRecord { revision, state: table.state.into(), write_to_disk: false, diff --git a/frontend/rust-lib/flowy-grid/src/services/persistence/rev_sqlite/grid_view_impl.rs b/frontend/rust-lib/flowy-grid/src/services/persistence/rev_sqlite/grid_view_impl.rs index 737e7eaece..fe9490d0de 100644 --- a/frontend/rust-lib/flowy-grid/src/services/persistence/rev_sqlite/grid_view_impl.rs +++ b/frontend/rust-lib/flowy-grid/src/services/persistence/rev_sqlite/grid_view_impl.rs @@ -7,7 +7,7 @@ use flowy_database::{ ConnectionPool, }; use flowy_error::{internal_error, FlowyError, FlowyResult}; -use flowy_revision::disk::{RevisionChangeset, RevisionDiskCache, RevisionRecord, RevisionState}; +use flowy_revision::disk::{RevisionChangeset, RevisionDiskCache, RevisionState, SyncRecord}; use flowy_sync::{ entities::revision::{Revision, RevisionRange}, util::md5, @@ -31,7 +31,7 @@ impl SQLiteGridViewRevisionPersistence { impl RevisionDiskCache> for SQLiteGridViewRevisionPersistence { type Error = FlowyError; - fn create_revision_records(&self, revision_records: Vec) -> Result<(), Self::Error> { + fn create_revision_records(&self, revision_records: Vec) -> Result<(), Self::Error> { let conn = self.pool.get().map_err(internal_error)?; let _ = GridViewRevisionSql::create(revision_records, &*conn)?; Ok(()) @@ -45,7 +45,7 @@ impl RevisionDiskCache> for SQLiteGridViewRevisionPersistenc &self, object_id: &str, rev_ids: Option>, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { let conn = self.pool.get().map_err(internal_error)?; let records = GridViewRevisionSql::read(&self.user_id, object_id, rev_ids, &*conn)?; Ok(records) @@ -55,7 +55,7 @@ impl RevisionDiskCache> for SQLiteGridViewRevisionPersistenc &self, object_id: &str, range: &RevisionRange, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { let conn = &*self.pool.get().map_err(internal_error)?; let revisions = GridViewRevisionSql::read_with_range(&self.user_id, object_id, range.clone(), conn)?; Ok(revisions) @@ -82,7 +82,7 @@ impl RevisionDiskCache> for SQLiteGridViewRevisionPersistenc &self, object_id: &str, deleted_rev_ids: Option>, - inserted_records: Vec, + inserted_records: Vec, ) -> Result<(), Self::Error> { let conn = self.pool.get().map_err(internal_error)?; conn.immediate_transaction::<_, FlowyError, _>(|| { @@ -95,7 +95,7 @@ impl RevisionDiskCache> for SQLiteGridViewRevisionPersistenc struct GridViewRevisionSql(); impl GridViewRevisionSql { - fn create(revision_records: Vec, conn: &SqliteConnection) -> Result<(), FlowyError> { + fn create(revision_records: Vec, conn: &SqliteConnection) -> Result<(), FlowyError> { // Batch insert: https://diesel.rs/guides/all-about-inserts.html let records = revision_records .into_iter() @@ -141,7 +141,7 @@ impl GridViewRevisionSql { object_id: &str, rev_ids: Option>, conn: &SqliteConnection, - ) -> Result, FlowyError> { + ) -> Result, FlowyError> { let mut sql = dsl::grid_view_rev_table .filter(dsl::object_id.eq(object_id)) .into_boxed(); @@ -162,7 +162,7 @@ impl GridViewRevisionSql { object_id: &str, range: RevisionRange, conn: &SqliteConnection, - ) -> Result, FlowyError> { + ) -> Result, FlowyError> { let rev_tables = dsl::grid_view_rev_table .filter(dsl::rev_id.ge(range.start)) .filter(dsl::rev_id.le(range.end)) @@ -219,7 +219,7 @@ impl std::default::Default for GridViewRevisionState { } } -fn mk_revision_record_from_table(user_id: &str, table: GridViewRevisionTable) -> RevisionRecord { +fn mk_revision_record_from_table(user_id: &str, table: GridViewRevisionTable) -> SyncRecord { let md5 = md5(&table.data); let revision = Revision::new( &table.object_id, @@ -229,7 +229,7 @@ fn mk_revision_record_from_table(user_id: &str, table: GridViewRevisionTable) -> user_id, md5, ); - RevisionRecord { + SyncRecord { revision, state: table.state.into(), write_to_disk: false, diff --git a/frontend/rust-lib/flowy-revision/src/cache/disk.rs b/frontend/rust-lib/flowy-revision/src/cache/disk.rs index 06e85f9d5e..f89f36367b 100644 --- a/frontend/rust-lib/flowy-revision/src/cache/disk.rs +++ b/frontend/rust-lib/flowy-revision/src/cache/disk.rs @@ -5,23 +5,20 @@ use std::sync::Arc; pub trait RevisionDiskCache: Sync + Send { type Error: Debug; - fn create_revision_records(&self, revision_records: Vec) -> Result<(), Self::Error>; + fn create_revision_records(&self, revision_records: Vec) -> Result<(), Self::Error>; fn get_connection(&self) -> Result; // Read all the records if the rev_ids is None - fn read_revision_records( - &self, - object_id: &str, - rev_ids: Option>, - ) -> Result, Self::Error>; + fn read_revision_records(&self, object_id: &str, rev_ids: Option>) + -> Result, Self::Error>; // Read the revision which rev_id >= range.start && rev_id <= range.end fn read_revision_records_with_range( &self, object_id: &str, range: &RevisionRange, - ) -> Result, Self::Error>; + ) -> Result, Self::Error>; fn update_revision_record(&self, changesets: Vec) -> FlowyResult<()>; @@ -34,7 +31,7 @@ pub trait RevisionDiskCache: Sync + Send { &self, object_id: &str, deleted_rev_ids: Option>, - inserted_records: Vec, + inserted_records: Vec, ) -> Result<(), Self::Error>; } @@ -44,7 +41,7 @@ where { type Error = FlowyError; - fn create_revision_records(&self, revision_records: Vec) -> Result<(), Self::Error> { + fn create_revision_records(&self, revision_records: Vec) -> Result<(), Self::Error> { (**self).create_revision_records(revision_records) } @@ -56,7 +53,7 @@ where &self, object_id: &str, rev_ids: Option>, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { (**self).read_revision_records(object_id, rev_ids) } @@ -64,7 +61,7 @@ where &self, object_id: &str, range: &RevisionRange, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { (**self).read_revision_records_with_range(object_id, range) } @@ -80,20 +77,20 @@ where &self, object_id: &str, deleted_rev_ids: Option>, - inserted_records: Vec, + inserted_records: Vec, ) -> Result<(), Self::Error> { (**self).delete_and_insert_records(object_id, deleted_rev_ids, inserted_records) } } #[derive(Clone, Debug)] -pub struct RevisionRecord { +pub struct SyncRecord { pub revision: Revision, pub state: RevisionState, pub write_to_disk: bool, } -impl RevisionRecord { +impl SyncRecord { pub fn new(revision: Revision) -> Self { Self { revision, diff --git a/frontend/rust-lib/flowy-revision/src/cache/memory.rs b/frontend/rust-lib/flowy-revision/src/cache/memory.rs index 6120c3f224..ad6795437c 100644 --- a/frontend/rust-lib/flowy-revision/src/cache/memory.rs +++ b/frontend/rust-lib/flowy-revision/src/cache/memory.rs @@ -1,4 +1,4 @@ -use crate::disk::RevisionRecord; +use crate::disk::SyncRecord; use crate::REVISION_WRITE_INTERVAL_IN_MILLIS; use dashmap::DashMap; use flowy_error::{FlowyError, FlowyResult}; @@ -7,15 +7,15 @@ use std::{borrow::Cow, sync::Arc, time::Duration}; use tokio::{sync::RwLock, task::JoinHandle}; pub(crate) trait RevisionMemoryCacheDelegate: Send + Sync { - fn checkpoint_tick(&self, records: Vec) -> FlowyResult<()>; + fn send_sync(&self, records: Vec) -> FlowyResult<()>; fn receive_ack(&self, object_id: &str, rev_id: i64); } pub(crate) struct RevisionMemoryCache { object_id: String, - revs_map: Arc>, + revs_map: Arc>, delegate: Arc, - pending_write_revs: Arc>>, + defer_write_revs: Arc>>, defer_save: RwLock>>, } @@ -25,7 +25,7 @@ impl RevisionMemoryCache { object_id: object_id.to_owned(), revs_map: Arc::new(DashMap::new()), delegate, - pending_write_revs: Arc::new(RwLock::new(vec![])), + defer_write_revs: Arc::new(RwLock::new(vec![])), defer_save: RwLock::new(None), } } @@ -34,7 +34,7 @@ impl RevisionMemoryCache { self.revs_map.contains_key(rev_id) } - pub(crate) async fn add<'a>(&'a self, record: Cow<'a, RevisionRecord>) { + pub(crate) async fn add<'a>(&'a self, record: Cow<'a, SyncRecord>) { let record = match record { Cow::Borrowed(record) => record.clone(), Cow::Owned(record) => record, @@ -43,11 +43,11 @@ impl RevisionMemoryCache { let rev_id = record.revision.rev_id; self.revs_map.insert(rev_id, record); - let mut write_guard = self.pending_write_revs.write().await; + let mut write_guard = self.defer_write_revs.write().await; if !write_guard.contains(&rev_id) { write_guard.push(rev_id); drop(write_guard); - self.make_checkpoint().await; + self.tick_checkpoint().await; } } @@ -57,8 +57,8 @@ impl RevisionMemoryCache { Some(mut record) => record.ack(), } - if self.pending_write_revs.read().await.contains(rev_id) { - self.make_checkpoint().await; + if self.defer_write_revs.read().await.contains(rev_id) { + self.tick_checkpoint().await; } else { // The revision must be saved on disk if the pending_write_revs // doesn't contains the rev_id. @@ -66,7 +66,7 @@ impl RevisionMemoryCache { } } - pub(crate) async fn get(&self, rev_id: &i64) -> Option { + pub(crate) async fn get(&self, rev_id: &i64) -> Option { self.revs_map.get(rev_id).map(|r| r.value().clone()) } @@ -80,21 +80,21 @@ impl RevisionMemoryCache { } } - pub(crate) async fn get_with_range(&self, range: &RevisionRange) -> Result, FlowyError> { + pub(crate) async fn get_with_range(&self, range: &RevisionRange) -> Result, FlowyError> { let revs = range .iter() .flat_map(|rev_id| self.revs_map.get(&rev_id).map(|record| record.clone())) - .collect::>(); + .collect::>(); Ok(revs) } - pub(crate) async fn reset_with_revisions(&self, revision_records: Vec) { + pub(crate) async fn reset_with_revisions(&self, revision_records: Vec) { self.revs_map.clear(); if let Some(handler) = self.defer_save.write().await.take() { handler.abort(); } - let mut write_guard = self.pending_write_revs.write().await; + let mut write_guard = self.defer_write_revs.write().await; write_guard.clear(); for record in revision_records { write_guard.push(record.revision.rev_id); @@ -102,21 +102,21 @@ impl RevisionMemoryCache { } drop(write_guard); - self.make_checkpoint().await; + self.tick_checkpoint().await; } - async fn make_checkpoint(&self) { + async fn tick_checkpoint(&self) { // https://github.com/async-graphql/async-graphql/blob/ed8449beec3d9c54b94da39bab33cec809903953/src/dataloader/mod.rs#L362 if let Some(handler) = self.defer_save.write().await.take() { handler.abort(); } - if self.pending_write_revs.read().await.is_empty() { + if self.defer_write_revs.read().await.is_empty() { return; } let rev_map = self.revs_map.clone(); - let pending_write_revs = self.pending_write_revs.clone(); + let pending_write_revs = self.defer_write_revs.clone(); let delegate = self.delegate.clone(); *self.defer_save.write().await = Some(tokio::spawn(async move { @@ -128,7 +128,7 @@ impl RevisionMemoryCache { // // Use saturating_sub and split_off ? // https://stackoverflow.com/questions/28952411/what-is-the-idiomatic-way-to-pop-the-last-n-elements-in-a-mutable-vec - let mut save_records: Vec = vec![]; + let mut save_records: Vec = vec![]; revs_write_guard.iter().for_each(|rev_id| match rev_map.get(rev_id) { None => {} Some(value) => { @@ -136,7 +136,7 @@ impl RevisionMemoryCache { } }); - if delegate.checkpoint_tick(save_records).is_ok() { + if delegate.send_sync(save_records).is_ok() { revs_write_guard.clear(); drop(revs_write_guard); } diff --git a/frontend/rust-lib/flowy-revision/src/cache/reset.rs b/frontend/rust-lib/flowy-revision/src/cache/reset.rs index bd128a9d9e..a1cb58a6bd 100644 --- a/frontend/rust-lib/flowy-revision/src/cache/reset.rs +++ b/frontend/rust-lib/flowy-revision/src/cache/reset.rs @@ -1,4 +1,4 @@ -use crate::disk::{RevisionDiskCache, RevisionRecord}; +use crate::disk::{RevisionDiskCache, SyncRecord}; use crate::{RevisionLoader, RevisionPersistence}; use bytes::Bytes; use flowy_error::{FlowyError, FlowyResult}; @@ -76,7 +76,7 @@ where let bytes = self.target.reset_data(revisions)?; let revision = Revision::initial_revision(&self.user_id, self.target.target_id(), bytes); - let record = RevisionRecord::new(revision); + let record = SyncRecord::new(revision); tracing::trace!("Reset {} revision record object", self.target.target_id()); let _ = self diff --git a/frontend/rust-lib/flowy-revision/src/conflict_resolve.rs b/frontend/rust-lib/flowy-revision/src/conflict_resolve.rs index e48cb59407..4fb8260311 100644 --- a/frontend/rust-lib/flowy-revision/src/conflict_resolve.rs +++ b/frontend/rust-lib/flowy-revision/src/conflict_resolve.rs @@ -1,4 +1,4 @@ -use crate::RevisionManager; +use crate::{RevisionMD5, RevisionManager}; use bytes::Bytes; use flowy_error::{FlowyError, FlowyResult}; use flowy_sync::entities::{ @@ -8,8 +8,6 @@ use flowy_sync::entities::{ use lib_infra::future::BoxResultFuture; use std::{convert::TryFrom, sync::Arc}; -pub type OperationsMD5 = String; - pub struct TransformOperations { pub client_operations: Operations, pub server_operations: Option, @@ -28,12 +26,12 @@ pub trait ConflictResolver where Operations: Send + Sync, { - fn compose_operations(&self, operations: Operations) -> BoxResultFuture; + fn compose_operations(&self, operations: Operations) -> BoxResultFuture; fn transform_operations( &self, operations: Operations, ) -> BoxResultFuture, FlowyError>; - fn reset_operations(&self, operations: Operations) -> BoxResultFuture; + fn reset_operations(&self, operations: Operations) -> BoxResultFuture; } pub trait ConflictRevisionSink: Send + Sync + 'static { @@ -129,9 +127,8 @@ where // The server_prime is None means the client local revisions conflict with the // // server, and it needs to override the client delta. let md5 = self.resolver.reset_operations(client_operations).await?; - let repeated_revision = RepeatedRevision::new(revisions); - assert_eq!(repeated_revision.last().unwrap().md5, md5); - let _ = self.rev_manager.reset_object(repeated_revision).await?; + debug_assert!(md5.is_equal(&revisions.last().unwrap().md5)); + let _ = self.rev_manager.reset_object(revisions).await?; Ok(None) } Some(server_operations) => { @@ -158,7 +155,7 @@ fn make_client_and_server_revision( rev_manager: &Arc>, client_operations: Operations, server_operations: Option, - md5: String, + md5: RevisionMD5, ) -> (Revision, Option) where Operations: OperationsSerializer, diff --git a/frontend/rust-lib/flowy-revision/src/rev_manager.rs b/frontend/rust-lib/flowy-revision/src/rev_manager.rs index 0b89de828f..49348ccc98 100644 --- a/frontend/rust-lib/flowy-revision/src/rev_manager.rs +++ b/frontend/rust-lib/flowy-revision/src/rev_manager.rs @@ -3,8 +3,8 @@ use crate::{RevisionPersistence, RevisionSnapshotDiskCache, RevisionSnapshotMana use bytes::Bytes; use flowy_error::{FlowyError, FlowyResult}; use flowy_sync::{ - entities::revision::{RepeatedRevision, Revision, RevisionRange}, - util::{pair_rev_id_from_revisions, RevIdCounter}, + entities::revision::{Revision, RevisionRange}, + util::{md5, pair_rev_id_from_revisions, RevIdCounter}, }; use lib_infra::future::FutureResult; use std::sync::Arc; @@ -143,9 +143,9 @@ impl RevisionManager { } #[tracing::instrument(level = "debug", skip(self, revisions), err)] - pub async fn reset_object(&self, revisions: RepeatedRevision) -> FlowyResult<()> { + pub async fn reset_object(&self, revisions: Vec) -> FlowyResult<()> { let rev_id = pair_rev_id_from_revisions(&revisions).1; - let _ = self.rev_persistence.reset(revisions.into_inner()).await?; + let _ = self.rev_persistence.reset(revisions).await?; self.rev_id_counter.set(rev_id); Ok(()) } @@ -191,7 +191,7 @@ impl RevisionManager { pub fn next_rev_id_pair(&self) -> (i64, i64) { let cur = self.rev_id_counter.value(); - let next = self.rev_id_counter.next(); + let next = self.rev_id_counter.next_id(); (cur, next) } @@ -283,3 +283,56 @@ impl RevisionLoader { Ok(revisions) } } + +/// Represents as the md5 of the revision object after applying the +/// revision. For example, RevisionMD5 will be the md5 of the document +/// content. +#[derive(Debug, Clone)] +pub struct RevisionMD5(String); + +impl RevisionMD5 { + pub fn from_bytes>(bytes: T) -> Result { + Ok(RevisionMD5(md5(bytes))) + } + + pub fn into_inner(self) -> String { + self.0 + } + + pub fn is_equal(&self, s: &str) -> bool { + self.0 == s + } +} + +impl std::convert::From for String { + fn from(md5: RevisionMD5) -> Self { + md5.0 + } +} + +impl std::convert::From<&str> for RevisionMD5 { + fn from(s: &str) -> Self { + Self(s.to_owned()) + } +} +impl std::convert::From for RevisionMD5 { + fn from(s: String) -> Self { + Self(s) + } +} + +impl std::ops::Deref for RevisionMD5 { + type Target = String; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl PartialEq for RevisionMD5 { + fn eq(&self, other: &Self) -> bool { + self.0 == other.0 + } +} + +impl std::cmp::Eq for RevisionMD5 {} diff --git a/frontend/rust-lib/flowy-revision/src/rev_persistence.rs b/frontend/rust-lib/flowy-revision/src/rev_persistence.rs index 7a58cee1d8..e0c9529c1f 100644 --- a/frontend/rust-lib/flowy-revision/src/rev_persistence.rs +++ b/frontend/rust-lib/flowy-revision/src/rev_persistence.rs @@ -2,10 +2,9 @@ use crate::cache::{ disk::{RevisionChangeset, RevisionDiskCache}, memory::RevisionMemoryCacheDelegate, }; -use crate::disk::{RevisionRecord, RevisionState}; +use crate::disk::{RevisionState, SyncRecord}; use crate::memory::RevisionMemoryCache; use crate::RevisionCompress; - use flowy_error::{internal_error, FlowyError, FlowyResult}; use flowy_sync::entities::revision::{Revision, RevisionRange}; use std::collections::VecDeque; @@ -20,10 +19,13 @@ pub struct RevisionPersistence { object_id: String, disk_cache: Arc>, memory_cache: Arc, - sync_seq: RwLock, + sync_seq: RwLock, } -impl RevisionPersistence { +impl RevisionPersistence +where + Connection: 'static, +{ pub fn new(user_id: &str, object_id: &str, disk_cache: C) -> RevisionPersistence where C: 'static + RevisionDiskCache, @@ -39,7 +41,7 @@ impl RevisionPersistence { ) -> RevisionPersistence { let object_id = object_id.to_owned(); let user_id = user_id.to_owned(); - let sync_seq = RwLock::new(RevisionSyncSequence::new()); + let sync_seq = RwLock::new(DeferSyncSequence::new()); let memory_cache = Arc::new(RevisionMemoryCache::new(&object_id, Arc::new(disk_cache.clone()))); Self { user_id, @@ -131,7 +133,7 @@ impl RevisionPersistence { pub(crate) async fn reset(&self, revisions: Vec) -> FlowyResult<()> { let records = revisions .into_iter() - .map(|revision| RevisionRecord { + .map(|revision| SyncRecord { revision, state: RevisionState::Sync, write_to_disk: false, @@ -151,7 +153,7 @@ impl RevisionPersistence { tracing::warn!("Duplicate revision: {}:{}-{:?}", self.object_id, revision.rev_id, state); return Ok(()); } - let record = RevisionRecord { + let record = SyncRecord { revision, state, write_to_disk, @@ -172,7 +174,7 @@ impl RevisionPersistence { Ok(()) } - pub async fn get(&self, rev_id: i64) -> Option { + pub async fn get(&self, rev_id: i64) -> Option { match self.memory_cache.get(&rev_id).await { None => match self .disk_cache @@ -192,7 +194,7 @@ impl RevisionPersistence { } } - pub fn batch_get(&self, doc_id: &str) -> FlowyResult> { + pub fn batch_get(&self, doc_id: &str) -> FlowyResult> { self.disk_cache.read_revision_records(doc_id, None) } @@ -225,7 +227,7 @@ impl RevisionPersistence { } impl RevisionMemoryCacheDelegate for Arc> { - fn checkpoint_tick(&self, mut records: Vec) -> FlowyResult<()> { + fn send_sync(&self, mut records: Vec) -> FlowyResult<()> { records.retain(|record| record.write_to_disk); if !records.is_empty() { tracing::Span::current().record( @@ -251,10 +253,10 @@ impl RevisionMemoryCacheDelegate for Arc); -impl RevisionSyncSequence { +struct DeferSyncSequence(VecDeque); +impl DeferSyncSequence { fn new() -> Self { - RevisionSyncSequence::default() + DeferSyncSequence::default() } fn add(&mut self, new_rev_id: i64) -> FlowyResult<()> { diff --git a/frontend/rust-lib/flowy-revision/src/ws_manager.rs b/frontend/rust-lib/flowy-revision/src/ws_manager.rs index eb7539c380..7413cf37d0 100644 --- a/frontend/rust-lib/flowy-revision/src/ws_manager.rs +++ b/frontend/rust-lib/flowy-revision/src/ws_manager.rs @@ -28,7 +28,7 @@ pub trait RevisionWSDataStream: Send + Sync { } // The sink provides the data that will be sent through the web socket to the -// backend. +// server. pub trait RevisionWebSocketSink: Send + Sync { fn next(&self) -> FutureResult, FlowyError>; } diff --git a/frontend/rust-lib/flowy-revision/tests/main.rs b/frontend/rust-lib/flowy-revision/tests/main.rs new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/frontend/rust-lib/flowy-revision/tests/main.rs @@ -0,0 +1 @@ + diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/folder_deps.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/folder_deps.rs index 4625bfa6bf..0320d078e9 100644 --- a/frontend/rust-lib/flowy-sdk/src/deps_resolve/folder_deps.rs +++ b/frontend/rust-lib/flowy-sdk/src/deps_resolve/folder_deps.rs @@ -18,7 +18,7 @@ use flowy_net::{ http_server::folder::FolderHttpCloudService, local_server::LocalServer, ws::connection::FlowyWebSocketConnect, }; use flowy_revision::{RevisionWebSocket, WSStateReceiver}; -use flowy_sync::entities::revision::{RepeatedRevision, Revision}; +use flowy_sync::entities::revision::Revision; use flowy_sync::entities::ws_data::ClientRevisionWSData; use flowy_user::services::UserSession; use futures_core::future::BoxFuture; @@ -151,12 +151,12 @@ impl ViewDataProcessor for DocumentViewDataProcessor { ) -> FutureResult<(), FlowyError> { // Only accept Document type debug_assert_eq!(layout, ViewLayoutTypePB::Document); - let repeated_revision: RepeatedRevision = Revision::initial_revision(user_id, view_id, view_data).into(); + let revision = Revision::initial_revision(user_id, view_id, view_data); let view_id = view_id.to_string(); let manager = self.0.clone(); FutureResult::new(async move { - let _ = manager.create_document(view_id, repeated_revision).await?; + let _ = manager.create_document(view_id, vec![revision]).await?; Ok(()) }) } @@ -194,9 +194,8 @@ impl ViewDataProcessor for DocumentViewDataProcessor { let document_content = self.0.initial_document_content(); FutureResult::new(async move { let delta_data = Bytes::from(document_content); - let repeated_revision: RepeatedRevision = - Revision::initial_revision(&user_id, &view_id, delta_data.clone()).into(); - let _ = manager.create_document(view_id, repeated_revision).await?; + let revision = Revision::initial_revision(&user_id, &view_id, delta_data.clone()); + let _ = manager.create_document(view_id, vec![revision]).await?; Ok(delta_data) }) } @@ -226,11 +225,11 @@ impl ViewDataProcessor for GridViewDataProcessor { _layout: ViewLayoutTypePB, delta_data: Bytes, ) -> FutureResult<(), FlowyError> { - let repeated_revision: RepeatedRevision = Revision::initial_revision(user_id, view_id, delta_data).into(); + let revision = Revision::initial_revision(user_id, view_id, delta_data); let view_id = view_id.to_string(); let grid_manager = self.0.clone(); FutureResult::new(async move { - let _ = grid_manager.create_grid(view_id, repeated_revision).await?; + let _ = grid_manager.create_grid(view_id, vec![revision]).await?; Ok(()) }) } diff --git a/shared-lib/flowy-sync/src/client_document/document_pad.rs b/shared-lib/flowy-sync/src/client_document/document_pad.rs index be438dca25..ab8863c69b 100644 --- a/shared-lib/flowy-sync/src/client_document/document_pad.rs +++ b/shared-lib/flowy-sync/src/client_document/document_pad.rs @@ -1,3 +1,4 @@ +use crate::util::md5; use crate::{ client_document::{ history::{History, UndoResult}, @@ -77,9 +78,9 @@ impl ClientDocument { &self.operations } - pub fn md5(&self) -> String { + pub fn document_md5(&self) -> String { let bytes = self.to_bytes(); - format!("{:x}", md5::compute(bytes)) + md5(&bytes) } pub fn set_notify(&mut self, notify: mpsc::UnboundedSender<()>) { diff --git a/shared-lib/flowy-sync/src/client_folder/folder_pad.rs b/shared-lib/flowy-sync/src/client_folder/folder_pad.rs index e11d73963d..bb1dfb6902 100644 --- a/shared-lib/flowy-sync/src/client_folder/folder_pad.rs +++ b/shared-lib/flowy-sync/src/client_folder/folder_pad.rs @@ -1,9 +1,9 @@ use crate::errors::internal_error; use crate::server_folder::{FolderOperations, FolderOperationsBuilder}; -use crate::util::cal_diff; +use crate::util::{cal_diff, md5}; use crate::{ client_folder::builder::FolderPadBuilder, - entities::revision::{md5, Revision}, + entities::revision::Revision, errors::{CollaborateError, CollaborateResult}, }; use flowy_folder_data_model::revision::{AppRevision, FolderRevision, TrashRevision, ViewRevision, WorkspaceRevision}; @@ -61,7 +61,7 @@ impl FolderPad { self.folder_rev = folder.folder_rev; self.operations = folder.operations; - Ok(self.md5()) + Ok(self.folder_md5()) } pub fn compose_remote_operations(&mut self, operations: FolderOperations) -> CollaborateResult { @@ -313,7 +313,7 @@ impl FolderPad { } } - pub fn md5(&self) -> String { + pub fn folder_md5(&self) -> String { md5(&self.operations.json_bytes()) } @@ -345,7 +345,7 @@ impl FolderPad { self.operations = self.operations.compose(&operations)?; Ok(Some(FolderChangeset { operations, - md5: self.md5(), + md5: self.folder_md5(), })) } } @@ -383,7 +383,7 @@ impl FolderPad { self.operations = self.operations.compose(&operations)?; Ok(Some(FolderChangeset { operations, - md5: self.md5(), + md5: self.folder_md5(), })) } } diff --git a/shared-lib/flowy-sync/src/client_grid/block_revision_pad.rs b/shared-lib/flowy-sync/src/client_grid/block_revision_pad.rs index 36f65837c0..c5a504c358 100644 --- a/shared-lib/flowy-sync/src/client_grid/block_revision_pad.rs +++ b/shared-lib/flowy-sync/src/client_grid/block_revision_pad.rs @@ -1,6 +1,6 @@ -use crate::entities::revision::{md5, RepeatedRevision, Revision}; +use crate::entities::revision::{RepeatedRevision, Revision}; use crate::errors::{CollaborateError, CollaborateResult}; -use crate::util::{cal_diff, make_operations_from_revisions}; +use crate::util::{cal_diff, make_operations_from_revisions, md5}; use flowy_grid_data_model::revision::{ gen_block_id, gen_row_id, CellRevision, GridBlockRevision, RowChangeset, RowRevision, }; diff --git a/shared-lib/flowy-sync/src/client_grid/grid_revision_pad.rs b/shared-lib/flowy-sync/src/client_grid/grid_revision_pad.rs index f27c46d250..6f99a23e53 100644 --- a/shared-lib/flowy-sync/src/client_grid/grid_revision_pad.rs +++ b/shared-lib/flowy-sync/src/client_grid/grid_revision_pad.rs @@ -1,6 +1,6 @@ -use crate::entities::revision::{md5, RepeatedRevision, Revision}; +use crate::entities::revision::{RepeatedRevision, Revision}; use crate::errors::{internal_error, CollaborateError, CollaborateResult}; -use crate::util::{cal_diff, make_operations_from_revisions}; +use crate::util::{cal_diff, make_operations_from_revisions, md5}; use flowy_grid_data_model::revision::{ gen_block_id, gen_grid_id, FieldRevision, FieldTypeRevision, GridBlockMetaRevision, GridBlockMetaRevisionChangeset, @@ -315,7 +315,7 @@ impl GridRevisionPad { }) } - pub fn md5(&self) -> String { + pub fn grid_md5(&self) -> String { md5(&self.operations.json_bytes()) } @@ -343,7 +343,7 @@ impl GridRevisionPad { self.operations = self.operations.compose(&operations)?; Ok(Some(GridRevisionChangeset { operations, - md5: self.md5(), + md5: self.grid_md5(), })) } } diff --git a/shared-lib/flowy-sync/src/client_grid/view_revision_pad.rs b/shared-lib/flowy-sync/src/client_grid/view_revision_pad.rs index 46e91734b4..04dd82d8ea 100644 --- a/shared-lib/flowy-sync/src/client_grid/view_revision_pad.rs +++ b/shared-lib/flowy-sync/src/client_grid/view_revision_pad.rs @@ -1,6 +1,6 @@ -use crate::entities::revision::{md5, Revision}; +use crate::entities::revision::Revision; use crate::errors::{internal_error, CollaborateError, CollaborateResult}; -use crate::util::{cal_diff, make_operations_from_revisions}; +use crate::util::{cal_diff, make_operations_from_revisions, md5}; use flowy_grid_data_model::revision::{ FieldRevision, FieldTypeRevision, FilterConfigurationRevision, FilterConfigurationsByFieldId, GridViewRevision, GroupConfigurationRevision, GroupConfigurationsByFieldId, LayoutRevision, diff --git a/shared-lib/flowy-sync/src/entities/revision.rs b/shared-lib/flowy-sync/src/entities/revision.rs index 39fb71a2e6..950e8956db 100644 --- a/shared-lib/flowy-sync/src/entities/revision.rs +++ b/shared-lib/flowy-sync/src/entities/revision.rs @@ -1,3 +1,4 @@ +use crate::util::md5; use bytes::Bytes; use flowy_derive::{ProtoBuf, ProtoBuf_Enum}; use std::{convert::TryFrom, fmt::Formatter, ops::RangeInclusive}; @@ -36,6 +37,34 @@ impl std::convert::From> for Revision { } impl Revision { + pub fn new>( + object_id: &str, + base_rev_id: i64, + rev_id: i64, + bytes: Bytes, + user_id: &str, + md5: T, + ) -> Revision { + let user_id = user_id.to_owned(); + let object_id = object_id.to_owned(); + let bytes = bytes.to_vec(); + let base_rev_id = base_rev_id; + let rev_id = rev_id; + + if base_rev_id != 0 { + debug_assert!(base_rev_id != rev_id); + } + + Self { + base_rev_id, + rev_id, + bytes, + md5: md5.into(), + object_id, + ty: RevType::DeprecatedLocal, + user_id, + } + } pub fn is_empty(&self) -> bool { self.base_rev_id == self.rev_id } @@ -52,28 +81,6 @@ impl Revision { let md5 = md5(&bytes); Self::new(object_id, 0, 0, bytes, user_id, md5) } - - pub fn new(object_id: &str, base_rev_id: i64, rev_id: i64, bytes: Bytes, user_id: &str, md5: String) -> Revision { - let user_id = user_id.to_owned(); - let object_id = object_id.to_owned(); - let bytes = bytes.to_vec(); - let base_rev_id = base_rev_id; - let rev_id = rev_id; - - if base_rev_id != 0 { - debug_assert!(base_rev_id != rev_id); - } - - Self { - base_rev_id, - rev_id, - bytes, - md5, - object_id, - ty: RevType::DeprecatedLocal, - user_id, - } - } } impl std::fmt::Debug for Revision { @@ -209,12 +216,6 @@ impl RevisionRange { } } -#[inline] -pub fn md5>(data: T) -> String { - let md5 = format!("{:x}", md5::compute(data)); - md5 -} - #[derive(Debug, ProtoBuf_Enum, Clone, Eq, PartialEq)] pub enum RevType { DeprecatedLocal = 0, diff --git a/shared-lib/flowy-sync/src/util.rs b/shared-lib/flowy-sync/src/util.rs index 10e977919f..31c4147e60 100644 --- a/shared-lib/flowy-sync/src/util.rs +++ b/shared-lib/flowy-sync/src/util.rs @@ -49,7 +49,8 @@ impl RevIdCounter { pub fn new(n: i64) -> Self { Self(AtomicI64::new(n)) } - pub fn next(&self) -> i64 { + + pub fn next_id(&self) -> i64 { let _ = self.0.fetch_add(1, SeqCst); self.value() } diff --git a/shared-lib/lib-ot/src/core/node_tree/tree.rs b/shared-lib/lib-ot/src/core/node_tree/tree.rs index dfd0a5f77f..16399211ea 100644 --- a/shared-lib/lib-ot/src/core/node_tree/tree.rs +++ b/shared-lib/lib-ot/src/core/node_tree/tree.rs @@ -35,9 +35,19 @@ impl NodeTree { Ok(tree) } - pub fn from_bytes(bytes: Vec, context: NodeTreeContext) -> Result { - let operations = NodeOperations::from_bytes(bytes)?; - Self::from_operations(operations, context) + pub fn from_bytes(bytes: &[u8]) -> Result { + let tree: NodeTree = serde_json::from_slice(bytes).map_err(|e| OTError::serde().context(e))?; + Ok(tree) + } + + pub fn to_bytes(&self) -> Vec { + match serde_json::to_vec(self) { + Ok(bytes) => bytes, + Err(e) => { + tracing::error!("{}", e); + vec![] + } + } } pub fn from_operations>(operations: T, context: NodeTreeContext) -> Result { diff --git a/shared-lib/lib-ot/tests/node/serde_test.rs b/shared-lib/lib-ot/tests/node/serde_test.rs index 25e086c160..6851cd4164 100644 --- a/shared-lib/lib-ot/tests/node/serde_test.rs +++ b/shared-lib/lib-ot/tests/node/serde_test.rs @@ -1,4 +1,6 @@ -use lib_ot::core::{AttributeBuilder, Changeset, NodeData, NodeDataBuilder, NodeOperation, NodeTree, Path}; +use lib_ot::core::{ + AttributeBuilder, Changeset, NodeData, NodeDataBuilder, NodeOperation, NodeTree, NodeTreeContext, Path, +}; use lib_ot::text_delta::DeltaTextOperationBuilder; #[test] @@ -26,6 +28,7 @@ fn operation_insert_node_with_children_serde_test() { r#"{"op":"insert","path":[0,1],"nodes":[{"type":"text","children":[{"type":"sub_text"}]}]}"# ); } + #[test] fn operation_update_node_attributes_serde_test() { let operation = NodeOperation::Update { @@ -102,6 +105,14 @@ fn node_tree_serialize_test() { assert_eq!(json, TREE_JSON); } +#[test] +fn node_tree_serde_test() { + let tree: NodeTree = serde_json::from_str(TREE_JSON).unwrap(); + let bytes = tree.to_bytes(); + let tree = NodeTree::from_bytes(&bytes).unwrap(); + assert_eq!(bytes, tree.to_bytes()); +} + #[allow(dead_code)] const TREE_JSON: &str = r#"{ "type": "editor",