diff --git a/frontend/app_flowy/lib/plugins/doc/application/doc_service.dart b/frontend/app_flowy/lib/plugins/doc/application/doc_service.dart index 067d51a510..3a6797837f 100644 --- a/frontend/app_flowy/lib/plugins/doc/application/doc_service.dart +++ b/frontend/app_flowy/lib/plugins/doc/application/doc_service.dart @@ -3,17 +3,17 @@ import 'package:flowy_sdk/dispatch/dispatch.dart'; import 'package:flowy_sdk/protobuf/flowy-folder/view.pb.dart'; import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart'; -import 'package:flowy_sdk/protobuf/flowy-sync/text_block.pb.dart'; -import 'package:flowy_sdk/protobuf/flowy-text-block/entities.pb.dart'; +import 'package:flowy_sdk/protobuf/flowy-sync/document.pb.dart'; +import 'package:flowy_sdk/protobuf/flowy-document/entities.pb.dart'; class DocumentService { - Future> openDocument({ + Future> openDocument({ required String docId, }) async { await FolderEventSetLatestView(ViewIdPB(value: docId)).send(); - final payload = TextBlockIdPB(value: docId); - return TextBlockEventGetTextBlock(payload).send(); + final payload = DocumentIdPB(value: docId); + return DocumentEventGetDocument(payload).send(); } Future> applyEdit({ @@ -22,10 +22,10 @@ class DocumentService { String operations = "", }) { final payload = EditPayloadPB.create() - ..textBlockId = docId + ..docId = docId ..operations = operations - ..delta = data; - return TextBlockEventApplyEdit(payload).send(); + ..operationsStr = data; + return DocumentEventApplyEdit(payload).send(); } Future> closeDocument({required String docId}) { diff --git a/frontend/app_flowy/lib/plugins/doc/application/share_bloc.dart b/frontend/app_flowy/lib/plugins/doc/application/share_bloc.dart index 98904c035a..fa7dbb761a 100644 --- a/frontend/app_flowy/lib/plugins/doc/application/share_bloc.dart +++ b/frontend/app_flowy/lib/plugins/doc/application/share_bloc.dart @@ -3,7 +3,7 @@ import 'dart:io'; import 'package:app_flowy/startup/tasks/rust_sdk.dart'; import 'package:app_flowy/workspace/application/markdown/delta_markdown.dart'; import 'package:app_flowy/plugins/doc/application/share_service.dart'; -import 'package:flowy_sdk/protobuf/flowy-text-block/entities.pb.dart'; +import 'package:flowy_sdk/protobuf/flowy-document/entities.pb.dart'; import 'package:flowy_sdk/protobuf/flowy-folder/view.pb.dart'; import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart'; import 'package:freezed_annotation/freezed_annotation.dart'; diff --git a/frontend/app_flowy/lib/plugins/doc/application/share_service.dart b/frontend/app_flowy/lib/plugins/doc/application/share_service.dart index db6ad406b7..75d045199b 100644 --- a/frontend/app_flowy/lib/plugins/doc/application/share_service.dart +++ b/frontend/app_flowy/lib/plugins/doc/application/share_service.dart @@ -2,15 +2,16 @@ import 'dart:async'; import 'package:dartz/dartz.dart'; import 'package:flowy_sdk/dispatch/dispatch.dart'; import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart'; -import 'package:flowy_sdk/protobuf/flowy-text-block/protobuf.dart'; +import 'package:flowy_sdk/protobuf/flowy-document/protobuf.dart'; class ShareService { - Future> export(String docId, ExportType type) { + Future> export( + String docId, ExportType type) { final request = ExportPayloadPB.create() ..viewId = docId ..exportType = type; - return TextBlockEventExportDocument(request).send(); + return DocumentEventExportDocument(request).send(); } Future> exportText(String docId) { diff --git a/frontend/app_flowy/lib/plugins/doc/document.dart b/frontend/app_flowy/lib/plugins/doc/document.dart index 5bb1377d90..55296f7831 100644 --- a/frontend/app_flowy/lib/plugins/doc/document.dart +++ b/frontend/app_flowy/lib/plugins/doc/document.dart @@ -20,7 +20,7 @@ import 'package:flowy_infra_ui/widget/rounded_button.dart'; import 'package:flowy_sdk/log.dart'; import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart'; import 'package:flowy_sdk/protobuf/flowy-folder/view.pb.dart'; -import 'package:flowy_sdk/protobuf/flowy-text-block/entities.pb.dart'; +import 'package:flowy_sdk/protobuf/flowy-document/entities.pb.dart'; import 'package:flutter/material.dart'; import 'package:flutter_bloc/flutter_bloc.dart'; import 'package:provider/provider.dart'; diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/dispatch/dispatch.dart b/frontend/app_flowy/packages/flowy_sdk/lib/dispatch/dispatch.dart index 3dae52080e..0716a2f4c6 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/dispatch/dispatch.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/dispatch/dispatch.dart @@ -16,7 +16,7 @@ import 'package:flowy_sdk/ffi.dart' as ffi; import 'package:flowy_sdk/protobuf/flowy-user/protobuf.dart'; import 'package:flowy_sdk/protobuf/dart-ffi/protobuf.dart'; import 'package:flowy_sdk/protobuf/flowy-folder/protobuf.dart'; -import 'package:flowy_sdk/protobuf/flowy-text-block/protobuf.dart'; +import 'package:flowy_sdk/protobuf/flowy-document/protobuf.dart'; import 'package:flowy_sdk/protobuf/flowy-grid/protobuf.dart'; import 'package:flowy_sdk/protobuf/flowy-sync/protobuf.dart'; @@ -30,7 +30,7 @@ part 'dart_event/flowy-folder/dart_event.dart'; part 'dart_event/flowy-net/dart_event.dart'; part 'dart_event/flowy-user/dart_event.dart'; part 'dart_event/flowy-grid/dart_event.dart'; -part 'dart_event/flowy-text-block/dart_event.dart'; +part 'dart_event/flowy-document/dart_event.dart'; enum FFIException { RequestIsEmpty, @@ -56,7 +56,8 @@ class Dispatch { } } -Future> _extractPayload(Future> responseFuture) { +Future> _extractPayload( + Future> responseFuture) { return responseFuture.then((result) { return result.fold( (response) { @@ -82,7 +83,8 @@ Future> _extractPayload(Future> _extractResponse(Completer bytesFuture) { +Future> _extractResponse( + Completer bytesFuture) { return bytesFuture.future.then((bytes) { try { final response = FFIResponse.fromBuffer(bytes); diff --git a/frontend/rust-lib/Cargo.lock b/frontend/rust-lib/Cargo.lock index 4abc6f80ef..e9f528015a 100644 --- a/frontend/rust-lib/Cargo.lock +++ b/frontend/rust-lib/Cargo.lock @@ -843,6 +843,47 @@ dependencies = [ "walkdir", ] +[[package]] +name = "flowy-document" +version = "0.1.0" +dependencies = [ + "async-stream", + "bytes", + "chrono", + "color-eyre", + "criterion", + "dart-notify", + "dashmap", + "derive_more", + "diesel", + "diesel_derives", + "flowy-database", + "flowy-derive", + "flowy-document", + "flowy-error", + "flowy-revision", + "flowy-sync", + "flowy-test", + "futures", + "futures-util", + "lib-dispatch", + "lib-infra", + "lib-ot", + "lib-ws", + "log", + "protobuf", + "rand 0.8.5", + "serde", + "serde_json", + "strum", + "strum_macros", + "tokio", + "tracing", + "tracing-subscriber", + "unicode-segmentation", + "url", +] + [[package]] name = "flowy-error" version = "0.1.0" @@ -882,13 +923,13 @@ dependencies = [ "diesel_derives", "flowy-database", "flowy-derive", + "flowy-document", "flowy-error", "flowy-folder", "flowy-folder-data-model", "flowy-revision", "flowy-sync", "flowy-test", - "flowy-text-block", "futures", "lazy_static", "lib-dispatch", @@ -995,11 +1036,11 @@ dependencies = [ "config", "dashmap", "flowy-derive", + "flowy-document", "flowy-error", "flowy-folder", "flowy-folder-data-model", "flowy-sync", - "flowy-text-block", "flowy-user", "futures-util", "http-flowy", @@ -1036,7 +1077,6 @@ dependencies = [ "flowy-sync", "futures-util", "lib-infra", - "lib-ot", "lib-ws", "serde", "serde_json", @@ -1055,13 +1095,13 @@ dependencies = [ "claim 0.5.0", "color-eyre", "flowy-database", + "flowy-document", "flowy-folder", "flowy-grid", "flowy-grid-data-model", "flowy-net", "flowy-revision", "flowy-sync", - "flowy-text-block", "flowy-user", "futures-core", "futures-util", @@ -1136,47 +1176,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "flowy-text-block" -version = "0.1.0" -dependencies = [ - "async-stream", - "bytes", - "chrono", - "color-eyre", - "criterion", - "dart-notify", - "dashmap", - "derive_more", - "diesel", - "diesel_derives", - "flowy-database", - "flowy-derive", - "flowy-error", - "flowy-revision", - "flowy-sync", - "flowy-test", - "flowy-text-block", - "futures", - "futures-util", - "lib-dispatch", - "lib-infra", - "lib-ot", - "lib-ws", - "log", - "protobuf", - "rand 0.8.5", - "serde", - "serde_json", - "strum", - "strum_macros", - "tokio", - "tracing", - "tracing-subscriber", - "unicode-segmentation", - "url", -] - [[package]] name = "flowy-user" version = "0.1.0" diff --git a/frontend/rust-lib/Cargo.toml b/frontend/rust-lib/Cargo.toml index d4cc70df5f..075258b97b 100644 --- a/frontend/rust-lib/Cargo.toml +++ b/frontend/rust-lib/Cargo.toml @@ -11,7 +11,7 @@ members = [ "flowy-database", "flowy-folder", "dart-notify", - "flowy-text-block", + "flowy-document", "flowy-error", "flowy-revision", "flowy-grid", diff --git a/frontend/rust-lib/flowy-text-block/Cargo.toml b/frontend/rust-lib/flowy-document/Cargo.toml similarity index 92% rename from frontend/rust-lib/flowy-text-block/Cargo.toml rename to frontend/rust-lib/flowy-document/Cargo.toml index 74c7f1de2a..6e3ab066a3 100644 --- a/frontend/rust-lib/flowy-text-block/Cargo.toml +++ b/frontend/rust-lib/flowy-document/Cargo.toml @@ -1,6 +1,6 @@ [package] -name = "flowy-text-block" +name = "flowy-document" version = "0.1.0" edition = "2018" @@ -41,7 +41,7 @@ futures = "0.3.15" [dev-dependencies] flowy-test = { path = "../flowy-test" } -flowy-text-block = { path = "../flowy-text-block", features = ["flowy_unit_test"]} +flowy-document = { path = "../flowy-document", features = ["flowy_unit_test"]} derive_more = {version = "0.99", features = ["display"]} tracing-subscriber = "0.2.0" diff --git a/frontend/rust-lib/flowy-text-block/Flowy.toml b/frontend/rust-lib/flowy-document/Flowy.toml similarity index 100% rename from frontend/rust-lib/flowy-text-block/Flowy.toml rename to frontend/rust-lib/flowy-document/Flowy.toml diff --git a/frontend/rust-lib/flowy-text-block/build.rs b/frontend/rust-lib/flowy-document/build.rs similarity index 100% rename from frontend/rust-lib/flowy-text-block/build.rs rename to frontend/rust-lib/flowy-document/build.rs diff --git a/frontend/rust-lib/flowy-text-block/src/editor.rs b/frontend/rust-lib/flowy-document/src/editor.rs similarity index 75% rename from frontend/rust-lib/flowy-text-block/src/editor.rs rename to frontend/rust-lib/flowy-document/src/editor.rs index 6e57b281f5..7712269bfe 100644 --- a/frontend/rust-lib/flowy-text-block/src/editor.rs +++ b/frontend/rust-lib/flowy-document/src/editor.rs @@ -1,19 +1,22 @@ use crate::web_socket::EditorCommandSender; use crate::{ errors::FlowyError, - queue::{EditBlockQueue, EditorCommand}, - TextEditorUser, + queue::{EditDocumentQueue, EditorCommand}, + DocumentUser, }; use bytes::Bytes; use flowy_error::{internal_error, FlowyResult}; -use flowy_revision::{RevisionCloudService, RevisionManager, RevisionObjectBuilder, RevisionWebSocket}; +use flowy_revision::{ + RevisionCloudService, RevisionCompress, RevisionManager, RevisionObjectDeserializer, RevisionObjectSerializer, + RevisionWebSocket, +}; use flowy_sync::entities::ws_data::ServerRevisionWSData; use flowy_sync::{ - entities::{revision::Revision, text_block::DocumentPB}, + entities::{document::DocumentPayloadPB, revision::Revision}, errors::CollaborateResult, util::make_operations_from_revisions, }; -use lib_ot::core::AttributeEntry; +use lib_ot::core::{AttributeEntry, AttributeHashMap}; use lib_ot::{ core::{DeltaOperation, Interval}, text_delta::TextOperations, @@ -22,7 +25,7 @@ use lib_ws::WSConnectState; use std::sync::Arc; use tokio::sync::{mpsc, oneshot}; -pub struct TextBlockEditor { +pub struct DocumentEditor { pub doc_id: String, #[allow(dead_code)] rev_manager: Arc, @@ -31,24 +34,24 @@ pub struct TextBlockEditor { edit_cmd_tx: EditorCommandSender, } -impl TextBlockEditor { +impl DocumentEditor { #[allow(unused_variables)] pub(crate) async fn new( doc_id: &str, - user: Arc, + user: Arc, mut rev_manager: RevisionManager, rev_web_socket: Arc, cloud_service: Arc, ) -> FlowyResult> { - let document_info = rev_manager.load::(Some(cloud_service)).await?; - let delta = document_info.delta()?; + let document_info = rev_manager.load::(Some(cloud_service)).await?; + let operations = TextOperations::from_bytes(&document_info.content)?; let rev_manager = Arc::new(rev_manager); let doc_id = doc_id.to_string(); let user_id = user.user_id()?; - let edit_cmd_tx = spawn_edit_queue(user, rev_manager.clone(), delta); + let edit_cmd_tx = spawn_edit_queue(user, rev_manager.clone(), operations); #[cfg(feature = "sync")] - let ws_manager = crate::web_socket::make_block_ws_manager( + let ws_manager = crate::web_socket::make_document_ws_manager( doc_id.clone(), user_id.clone(), edit_cmd_tx.clone(), @@ -140,22 +143,19 @@ impl TextBlockEditor { Ok(()) } - pub async fn delta_str(&self) -> FlowyResult { + pub async fn get_operation_str(&self) -> FlowyResult { let (ret, rx) = oneshot::channel::>(); - let msg = EditorCommand::ReadDeltaStr { ret }; + let msg = EditorCommand::StringifyOperations { ret }; let _ = self.edit_cmd_tx.send(msg).await; let json = rx.await.map_err(internal_error)??; Ok(json) } #[tracing::instrument(level = "trace", skip(self, data), err)] - pub(crate) async fn compose_local_delta(&self, data: Bytes) -> Result<(), FlowyError> { - let delta = TextOperations::from_bytes(&data)?; + pub(crate) async fn compose_local_operations(&self, data: Bytes) -> Result<(), FlowyError> { + let operations = TextOperations::from_bytes(&data)?; let (ret, rx) = oneshot::channel::>(); - let msg = EditorCommand::ComposeLocalDelta { - delta: delta.clone(), - ret, - }; + let msg = EditorCommand::ComposeLocalOperations { operations, ret }; let _ = self.edit_cmd_tx.send(msg).await; let _ = rx.await.map_err(internal_error)??; Ok(()) @@ -186,20 +186,20 @@ impl TextBlockEditor { pub(crate) fn receive_ws_state(&self, _state: &WSConnectState) {} } -impl std::ops::Drop for TextBlockEditor { +impl std::ops::Drop for DocumentEditor { fn drop(&mut self) { - tracing::trace!("{} ClientBlockEditor was dropped", self.doc_id) + tracing::trace!("{} DocumentEditor was dropped", self.doc_id) } } // The edit queue will exit after the EditorCommandSender was dropped. fn spawn_edit_queue( - user: Arc, + user: Arc, rev_manager: Arc, delta: TextOperations, ) -> EditorCommandSender { let (sender, receiver) = mpsc::channel(1000); - let edit_queue = EditBlockQueue::new(user, rev_manager, delta, receiver); + let edit_queue = EditDocumentQueue::new(user, rev_manager, delta, receiver); // We can use tokio::task::spawn_local here by using tokio::spawn_blocking. // https://github.com/tokio-rs/tokio/issues/2095 // tokio::task::spawn_blocking(move || { @@ -214,10 +214,10 @@ fn spawn_edit_queue( } #[cfg(feature = "flowy_unit_test")] -impl TextBlockEditor { - pub async fn text_block_delta(&self) -> FlowyResult { +impl DocumentEditor { + pub async fn document_operations(&self) -> FlowyResult { let (ret, rx) = oneshot::channel::>(); - let msg = EditorCommand::ReadDelta { ret }; + let msg = EditorCommand::ReadOperations { ret }; let _ = self.edit_cmd_tx.send(msg).await; let delta = rx.await.map_err(internal_error)??; Ok(delta) @@ -228,24 +228,38 @@ impl TextBlockEditor { } } -struct TextBlockInfoBuilder(); -impl RevisionObjectBuilder for TextBlockInfoBuilder { - type Output = DocumentPB; +pub struct DocumentRevisionSerde(); +impl RevisionObjectDeserializer for DocumentRevisionSerde { + type Output = DocumentPayloadPB; - fn build_object(object_id: &str, revisions: Vec) -> FlowyResult { + fn deserialize_revisions(object_id: &str, revisions: Vec) -> FlowyResult { let (base_rev_id, rev_id) = revisions.last().unwrap().pair_rev_id(); let mut delta = make_operations_from_revisions(revisions)?; correct_delta(&mut delta); - Result::::Ok(DocumentPB { - block_id: object_id.to_owned(), - text: delta.json_str(), + Result::::Ok(DocumentPayloadPB { + doc_id: object_id.to_owned(), + content: delta.json_str(), rev_id, base_rev_id, }) } } +impl RevisionObjectSerializer for DocumentRevisionSerde { + fn serialize_revisions(revisions: Vec) -> FlowyResult { + let operations = make_operations_from_revisions::(revisions)?; + Ok(operations.json_bytes()) + } +} + +pub(crate) struct DocumentRevisionCompactor(); +impl RevisionCompress for DocumentRevisionCompactor { + fn serialize_revisions(&self, revisions: Vec) -> FlowyResult { + DocumentRevisionSerde::serialize_revisions(revisions) + } +} + // quill-editor requires the delta should end with '\n' and only contains the // insert operation. The function, correct_delta maybe be removed in the future. fn correct_delta(delta: &mut TextOperations) { diff --git a/frontend/rust-lib/flowy-text-block/src/entities.rs b/frontend/rust-lib/flowy-document/src/entities.rs similarity index 89% rename from frontend/rust-lib/flowy-text-block/src/entities.rs rename to frontend/rust-lib/flowy-document/src/entities.rs index d7cc1d8665..b84b4f8201 100644 --- a/frontend/rust-lib/flowy-text-block/src/entities.rs +++ b/frontend/rust-lib/flowy-document/src/entities.rs @@ -32,7 +32,7 @@ impl std::convert::From for ExportType { #[derive(Default, ProtoBuf)] pub struct EditPayloadPB { #[pb(index = 1)] - pub text_block_id: String, + pub doc_id: String, // Encode in JSON format #[pb(index = 2)] @@ -40,35 +40,35 @@ pub struct EditPayloadPB { // Encode in JSON format #[pb(index = 3)] - pub delta: String, + pub operations_str: String, } #[derive(Default)] pub struct EditParams { - pub text_block_id: String, + pub doc_id: String, // Encode in JSON format pub operations: String, // Encode in JSON format - pub delta: String, + pub operations_str: String, } impl TryInto for EditPayloadPB { type Error = ErrorCode; fn try_into(self) -> Result { Ok(EditParams { - text_block_id: self.text_block_id, + doc_id: self.doc_id, operations: self.operations, - delta: self.delta, + operations_str: self.operations_str, }) } } #[derive(Default, ProtoBuf)] -pub struct TextBlockPB { +pub struct DocumentSnapshotPB { #[pb(index = 1)] - pub text_block_id: String, + pub doc_id: String, /// Encode in JSON format #[pb(index = 2)] diff --git a/frontend/rust-lib/flowy-document/src/event_handler.rs b/frontend/rust-lib/flowy-document/src/event_handler.rs new file mode 100644 index 0000000000..398f3c3747 --- /dev/null +++ b/frontend/rust-lib/flowy-document/src/event_handler.rs @@ -0,0 +1,43 @@ +use crate::entities::{DocumentSnapshotPB, EditParams, EditPayloadPB, ExportDataPB, ExportParams, ExportPayloadPB}; +use crate::DocumentManager; +use flowy_error::FlowyError; +use flowy_sync::entities::document::DocumentIdPB; +use lib_dispatch::prelude::{data_result, AppData, Data, DataResult}; +use std::convert::TryInto; +use std::sync::Arc; + +pub(crate) async fn get_document_handler( + data: Data, + manager: AppData>, +) -> DataResult { + let document_id: DocumentIdPB = data.into_inner(); + let editor = manager.open_document_editor(&document_id).await?; + let operations_str = editor.get_operation_str().await?; + data_result(DocumentSnapshotPB { + doc_id: document_id.into(), + snapshot: operations_str, + }) +} + +pub(crate) async fn apply_edit_handler( + data: Data, + manager: AppData>, +) -> Result<(), FlowyError> { + let params: EditParams = data.into_inner().try_into()?; + let _ = manager.apply_edit(params).await?; + Ok(()) +} + +#[tracing::instrument(level = "debug", skip(data, manager), err)] +pub(crate) async fn export_handler( + data: Data, + manager: AppData>, +) -> DataResult { + let params: ExportParams = data.into_inner().try_into()?; + let editor = manager.open_document_editor(¶ms.view_id).await?; + let operations_str = editor.get_operation_str().await?; + data_result(ExportDataPB { + data: operations_str, + export_type: params.export_type, + }) +} diff --git a/frontend/rust-lib/flowy-text-block/src/event_map.rs b/frontend/rust-lib/flowy-document/src/event_map.rs similarity index 55% rename from frontend/rust-lib/flowy-text-block/src/event_map.rs rename to frontend/rust-lib/flowy-document/src/event_map.rs index d66ee490ee..71042cad03 100644 --- a/frontend/rust-lib/flowy-text-block/src/event_map.rs +++ b/frontend/rust-lib/flowy-document/src/event_map.rs @@ -1,26 +1,26 @@ use crate::event_handler::*; -use crate::TextEditorManager; +use crate::DocumentManager; use flowy_derive::{Flowy_Event, ProtoBuf_Enum}; use lib_dispatch::prelude::Module; use std::sync::Arc; use strum_macros::Display; -pub fn create(block_manager: Arc) -> Module { - let mut module = Module::new().name(env!("CARGO_PKG_NAME")).data(block_manager); +pub fn create(document_manager: Arc) -> Module { + let mut module = Module::new().name(env!("CARGO_PKG_NAME")).data(document_manager); module = module - .event(TextBlockEvent::GetTextBlock, get_text_block_handler) - .event(TextBlockEvent::ApplyEdit, apply_edit_handler) - .event(TextBlockEvent::ExportDocument, export_handler); + .event(DocumentEvent::GetDocument, get_document_handler) + .event(DocumentEvent::ApplyEdit, apply_edit_handler) + .event(DocumentEvent::ExportDocument, export_handler); module } #[derive(Clone, Copy, PartialEq, Eq, Debug, Display, Hash, ProtoBuf_Enum, Flowy_Event)] #[event_err = "FlowyError"] -pub enum TextBlockEvent { - #[event(input = "TextBlockIdPB", output = "TextBlockPB")] - GetTextBlock = 0, +pub enum DocumentEvent { + #[event(input = "DocumentIdPB", output = "DocumentSnapshotPB")] + GetDocument = 0, #[event(input = "EditPayloadPB")] ApplyEdit = 1, diff --git a/frontend/rust-lib/flowy-document/src/lib.rs b/frontend/rust-lib/flowy-document/src/lib.rs new file mode 100644 index 0000000000..399ac9a342 --- /dev/null +++ b/frontend/rust-lib/flowy-document/src/lib.rs @@ -0,0 +1,27 @@ +pub mod editor; +mod entities; +mod event_handler; +pub mod event_map; +pub mod manager; +mod queue; +mod web_socket; + +pub mod protobuf; +pub use manager::*; +pub mod errors { + pub use flowy_error::{internal_error, ErrorCode, FlowyError}; +} + +pub const TEXT_BLOCK_SYNC_INTERVAL_IN_MILLIS: u64 = 1000; + +use crate::errors::FlowyError; +use flowy_sync::entities::document::{CreateDocumentParams, DocumentIdPB, DocumentPayloadPB, ResetDocumentParams}; +use lib_infra::future::FutureResult; + +pub trait DocumentCloudService: Send + Sync { + fn create_document(&self, token: &str, params: CreateDocumentParams) -> FutureResult<(), FlowyError>; + + fn fetch_document(&self, token: &str, params: DocumentIdPB) -> FutureResult, FlowyError>; + + fn update_document_content(&self, token: &str, params: ResetDocumentParams) -> FutureResult<(), FlowyError>; +} diff --git a/frontend/rust-lib/flowy-text-block/src/manager.rs b/frontend/rust-lib/flowy-document/src/manager.rs similarity index 54% rename from frontend/rust-lib/flowy-text-block/src/manager.rs rename to frontend/rust-lib/flowy-document/src/manager.rs index 2e10873523..a5597305f8 100644 --- a/frontend/rust-lib/flowy-text-block/src/manager.rs +++ b/frontend/rust-lib/flowy-document/src/manager.rs @@ -1,47 +1,47 @@ +use crate::editor::DocumentRevisionCompactor; use crate::entities::EditParams; -use crate::queue::TextBlockRevisionCompactor; -use crate::{editor::TextBlockEditor, errors::FlowyError, TextEditorCloudService}; +use crate::{editor::DocumentEditor, errors::FlowyError, DocumentCloudService}; use bytes::Bytes; use dashmap::DashMap; use flowy_database::ConnectionPool; use flowy_error::FlowyResult; -use flowy_revision::disk::SQLiteTextBlockRevisionPersistence; +use flowy_revision::disk::SQLiteDocumentRevisionPersistence; use flowy_revision::{ RevisionCloudService, RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionSnapshotPersistence, }; use flowy_sync::entities::{ + document::{DocumentIdPB, DocumentOperationsPB}, revision::{md5, RepeatedRevision, Revision}, - text_block::{TextBlockDeltaPB, TextBlockIdPB}, ws_data::ServerRevisionWSData, }; use lib_infra::future::FutureResult; use std::{convert::TryInto, sync::Arc}; -pub trait TextEditorUser: Send + Sync { +pub trait DocumentUser: Send + Sync { fn user_dir(&self) -> Result; fn user_id(&self) -> Result; fn token(&self) -> Result; fn db_pool(&self) -> Result, FlowyError>; } -pub struct TextEditorManager { - cloud_service: Arc, +pub struct DocumentManager { + cloud_service: Arc, rev_web_socket: Arc, - editor_map: Arc, - user: Arc, + editor_map: Arc, + user: Arc, } -impl TextEditorManager { +impl DocumentManager { pub fn new( - cloud_service: Arc, - text_block_user: Arc, + cloud_service: Arc, + document_user: Arc, rev_web_socket: Arc, ) -> Self { Self { cloud_service, rev_web_socket, - editor_map: Arc::new(TextEditorMap::new()), - user: text_block_user, + editor_map: Arc::new(DocumentEditorMap::new()), + user: document_user, } } @@ -52,46 +52,49 @@ impl TextEditorManager { } #[tracing::instrument(level = "trace", skip(self, editor_id), fields(editor_id), err)] - pub async fn open_text_editor>(&self, editor_id: T) -> Result, FlowyError> { + pub async fn open_document_editor>(&self, editor_id: T) -> Result, FlowyError> { let editor_id = editor_id.as_ref(); tracing::Span::current().record("editor_id", &editor_id); - self.get_text_editor(editor_id).await + self.get_document_editor(editor_id).await } #[tracing::instrument(level = "trace", skip(self, editor_id), fields(editor_id), err)] - pub fn close_text_editor>(&self, editor_id: T) -> Result<(), FlowyError> { + pub fn close_document_editor>(&self, editor_id: T) -> Result<(), FlowyError> { let editor_id = editor_id.as_ref(); tracing::Span::current().record("editor_id", &editor_id); self.editor_map.remove(editor_id); Ok(()) } - #[tracing::instrument(level = "debug", skip(self, delta), err)] - pub async fn receive_local_delta(&self, delta: TextBlockDeltaPB) -> Result { - let editor = self.get_text_editor(&delta.text_block_id).await?; - let _ = editor.compose_local_delta(Bytes::from(delta.delta_str)).await?; - let delta_str = editor.delta_str().await?; - Ok(TextBlockDeltaPB { - text_block_id: delta.text_block_id.clone(), - delta_str, + #[tracing::instrument(level = "debug", skip(self, payload), err)] + pub async fn receive_local_operations( + &self, + payload: DocumentOperationsPB, + ) -> Result { + let editor = self.get_document_editor(&payload.doc_id).await?; + let _ = editor + .compose_local_operations(Bytes::from(payload.operations_str)) + .await?; + let operations_str = editor.get_operation_str().await?; + Ok(DocumentOperationsPB { + doc_id: payload.doc_id.clone(), + operations_str, }) } pub async fn apply_edit(&self, params: EditParams) -> FlowyResult<()> { - let editor = self.get_text_editor(¶ms.text_block_id).await?; - let _ = editor.compose_local_delta(Bytes::from(params.delta)).await?; + let editor = self.get_document_editor(¶ms.doc_id).await?; + let _ = editor + .compose_local_operations(Bytes::from(params.operations_str)) + .await?; Ok(()) } - pub async fn create_text_block>( - &self, - text_block_id: T, - revisions: RepeatedRevision, - ) -> FlowyResult<()> { - let doc_id = text_block_id.as_ref().to_owned(); + pub async fn create_document>(&self, doc_id: T, revisions: RepeatedRevision) -> FlowyResult<()> { + let doc_id = doc_id.as_ref().to_owned(); let db_pool = self.user.db_pool()?; - // Maybe we could save the block to disk without creating the RevisionManager - let rev_manager = self.make_text_block_rev_manager(&doc_id, db_pool)?; + // Maybe we could save the document to disk without creating the RevisionManager + let rev_manager = self.make_document_rev_manager(&doc_id, db_pool)?; let _ = rev_manager.reset_object(revisions).await?; Ok(()) } @@ -113,47 +116,65 @@ impl TextEditorManager { } } -impl TextEditorManager { - async fn get_text_editor(&self, block_id: &str) -> FlowyResult> { - match self.editor_map.get(block_id) { +impl DocumentManager { + /// Returns the `DocumentEditor` + /// Initializes the document editor if it's not initialized yet. Otherwise, returns the opened + /// editor. + /// + /// # Arguments + /// + /// * `doc_id`: the id of the document + /// + /// returns: Result, FlowyError> + /// + async fn get_document_editor(&self, doc_id: &str) -> FlowyResult> { + match self.editor_map.get(doc_id) { None => { let db_pool = self.user.db_pool()?; - self.make_text_editor(block_id, db_pool).await + self.init_document_editor(doc_id, db_pool).await } Some(editor) => Ok(editor), } } + /// Initializes a document editor with the doc_id + /// + /// # Arguments + /// + /// * `doc_id`: the id of the document + /// * `pool`: sqlite connection pool + /// + /// returns: Result, FlowyError> + /// #[tracing::instrument(level = "trace", skip(self, pool), err)] - async fn make_text_editor( + async fn init_document_editor( &self, - block_id: &str, + doc_id: &str, pool: Arc, - ) -> Result, FlowyError> { + ) -> Result, FlowyError> { let user = self.user.clone(); let token = self.user.token()?; - let rev_manager = self.make_text_block_rev_manager(block_id, pool.clone())?; - let cloud_service = Arc::new(TextBlockRevisionCloudService { + let rev_manager = self.make_document_rev_manager(doc_id, pool.clone())?; + let cloud_service = Arc::new(DocumentRevisionCloudService { token, server: self.cloud_service.clone(), }); - let doc_editor = - TextBlockEditor::new(block_id, user, rev_manager, self.rev_web_socket.clone(), cloud_service).await?; - self.editor_map.insert(block_id, &doc_editor); - Ok(doc_editor) + let editor = DocumentEditor::new(doc_id, user, rev_manager, self.rev_web_socket.clone(), cloud_service).await?; + self.editor_map.insert(doc_id, &editor); + Ok(editor) } - fn make_text_block_rev_manager( + fn make_document_rev_manager( &self, doc_id: &str, pool: Arc, ) -> Result { let user_id = self.user.user_id()?; - let disk_cache = SQLiteTextBlockRevisionPersistence::new(&user_id, pool.clone()); + let disk_cache = SQLiteDocumentRevisionPersistence::new(&user_id, pool.clone()); let rev_persistence = RevisionPersistence::new(&user_id, doc_id, disk_cache); // let history_persistence = SQLiteRevisionHistoryPersistence::new(doc_id, pool.clone()); let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(doc_id, pool); - let rev_compactor = TextBlockRevisionCompactor(); + let rev_compactor = DocumentRevisionCompactor(); Ok(RevisionManager::new( &user_id, @@ -166,30 +187,30 @@ impl TextEditorManager { } } -struct TextBlockRevisionCloudService { +struct DocumentRevisionCloudService { token: String, - server: Arc, + server: Arc, } -impl RevisionCloudService for TextBlockRevisionCloudService { +impl RevisionCloudService for DocumentRevisionCloudService { #[tracing::instrument(level = "trace", skip(self))] fn fetch_object(&self, user_id: &str, object_id: &str) -> FutureResult, FlowyError> { - let params: TextBlockIdPB = object_id.to_string().into(); + let params: DocumentIdPB = object_id.to_string().into(); let server = self.server.clone(); let token = self.token.clone(); let user_id = user_id.to_string(); FutureResult::new(async move { - match server.read_text_block(&token, params).await? { + match server.fetch_document(&token, params).await? { None => Err(FlowyError::record_not_found().context("Remote doesn't have this document")), - Some(doc) => { - let delta_data = Bytes::from(doc.text.clone()); - let doc_md5 = md5(&delta_data); + Some(payload) => { + let bytes = Bytes::from(payload.content.clone()); + let doc_md5 = md5(&bytes); let revision = Revision::new( - &doc.block_id, - doc.base_rev_id, - doc.rev_id, - delta_data, + &payload.doc_id, + payload.base_rev_id, + payload.rev_id, + bytes, &user_id, doc_md5, ); @@ -200,23 +221,23 @@ impl RevisionCloudService for TextBlockRevisionCloudService { } } -pub struct TextEditorMap { - inner: DashMap>, +pub struct DocumentEditorMap { + inner: DashMap>, } -impl TextEditorMap { +impl DocumentEditorMap { fn new() -> Self { Self { inner: DashMap::new() } } - pub(crate) fn insert(&self, editor_id: &str, doc: &Arc) { + pub(crate) fn insert(&self, editor_id: &str, doc: &Arc) { if self.inner.contains_key(editor_id) { log::warn!("Doc:{} already exists in cache", editor_id); } self.inner.insert(editor_id.to_string(), doc.clone()); } - pub(crate) fn get(&self, editor_id: &str) -> Option> { + pub(crate) fn get(&self, editor_id: &str) -> Option> { Some(self.inner.get(editor_id)?.clone()) } @@ -229,7 +250,7 @@ impl TextEditorMap { } #[tracing::instrument(level = "trace", skip(web_socket, handlers))] -fn listen_ws_state_changed(web_socket: Arc, handlers: Arc) { +fn listen_ws_state_changed(web_socket: Arc, handlers: Arc) { tokio::spawn(async move { let mut notify = web_socket.subscribe_state_changed().await; while let Ok(state) = notify.recv().await { diff --git a/frontend/rust-lib/flowy-text-block/src/queue.rs b/frontend/rust-lib/flowy-document/src/queue.rs similarity index 61% rename from frontend/rust-lib/flowy-text-block/src/queue.rs rename to frontend/rust-lib/flowy-document/src/queue.rs index f6dd5f56b1..d74970e9e2 100644 --- a/frontend/rust-lib/flowy-text-block/src/queue.rs +++ b/frontend/rust-lib/flowy-document/src/queue.rs @@ -1,17 +1,15 @@ -use crate::web_socket::EditorCommandReceiver; -use crate::TextEditorUser; +use crate::web_socket::{DocumentResolveOperations, EditorCommandReceiver}; +use crate::DocumentUser; use async_stream::stream; -use bytes::Bytes; -use flowy_error::{FlowyError, FlowyResult}; -use flowy_revision::{OperationsMD5, RevisionCompactor, RevisionManager, RichTextTransformDeltas, TransformDeltas}; -use flowy_sync::util::make_operations_from_revisions; +use flowy_error::FlowyError; +use flowy_revision::{OperationsMD5, RevisionManager, TransformOperations}; use flowy_sync::{ client_document::{history::UndoResult, ClientDocument}, entities::revision::{RevId, Revision}, errors::CollaborateError, }; use futures::stream::StreamExt; -use lib_ot::core::{AttributeEntry, AttributeHashMap}; +use lib_ot::core::AttributeEntry; use lib_ot::{ core::{Interval, OperationTransform}, text_delta::TextOperations, @@ -21,21 +19,21 @@ use tokio::sync::{oneshot, RwLock}; // The EditorCommandQueue executes each command that will alter the document in // serial. -pub(crate) struct EditBlockQueue { +pub(crate) struct EditDocumentQueue { document: Arc>, - user: Arc, + user: Arc, rev_manager: Arc, receiver: Option, } -impl EditBlockQueue { +impl EditDocumentQueue { pub(crate) fn new( - user: Arc, + user: Arc, rev_manager: Arc, - delta: TextOperations, + operations: TextOperations, receiver: EditorCommandReceiver, ) -> Self { - let document = Arc::new(RwLock::new(ClientDocument::from_operations(delta))); + let document = Arc::new(RwLock::new(ClientDocument::from_operations(operations))); Self { document, user, @@ -67,62 +65,62 @@ impl EditBlockQueue { #[tracing::instrument(level = "trace", skip(self), err)] async fn handle_command(&self, command: EditorCommand) -> Result<(), FlowyError> { match command { - EditorCommand::ComposeLocalDelta { delta, ret } => { + EditorCommand::ComposeLocalOperations { operations, ret } => { let mut document = self.document.write().await; - let _ = document.compose_operations(delta.clone())?; + let _ = document.compose_operations(operations.clone())?; let md5 = document.md5(); drop(document); - let _ = self.save_local_delta(delta, md5).await?; + let _ = self.save_local_operations(operations, md5).await?; let _ = ret.send(Ok(())); } - EditorCommand::ComposeRemoteDelta { client_delta, ret } => { + EditorCommand::ComposeRemoteOperation { client_operations, ret } => { let mut document = self.document.write().await; - let _ = document.compose_operations(client_delta.clone())?; + let _ = document.compose_operations(client_operations.clone())?; let md5 = document.md5(); drop(document); let _ = ret.send(Ok(md5)); } - EditorCommand::ResetDelta { delta, ret } => { + EditorCommand::ResetOperations { operations, ret } => { let mut document = self.document.write().await; - let _ = document.set_operations(delta); + let _ = document.set_operations(operations); let md5 = document.md5(); drop(document); let _ = ret.send(Ok(md5)); } - EditorCommand::TransformDelta { delta, ret } => { + EditorCommand::TransformOperations { operations, ret } => { let f = || async { let read_guard = self.document.read().await; - let mut server_prime: Option = None; - let client_prime: TextOperations; + let mut server_operations: Option = None; + let client_operations: TextOperations; if read_guard.is_empty() { // Do nothing - client_prime = delta; + client_operations = operations; } else { - let (s_prime, c_prime) = read_guard.get_operations().transform(&delta)?; - client_prime = c_prime; - server_prime = Some(s_prime); + let (s_prime, c_prime) = read_guard.get_operations().transform(&operations)?; + client_operations = c_prime; + server_operations = Some(DocumentResolveOperations(s_prime)); } drop(read_guard); - Ok::(TransformDeltas { - client_prime, - server_prime, + Ok::(TransformOperations { + client_operations: DocumentResolveOperations(client_operations), + server_operations, }) }; let _ = ret.send(f().await); } EditorCommand::Insert { index, data, ret } => { let mut write_guard = self.document.write().await; - let delta = write_guard.insert(index, data)?; + let operations = write_guard.insert(index, data)?; let md5 = write_guard.md5(); - let _ = self.save_local_delta(delta, md5).await?; + let _ = self.save_local_operations(operations, md5).await?; let _ = ret.send(Ok(())); } EditorCommand::Delete { interval, ret } => { let mut write_guard = self.document.write().await; - let delta = write_guard.delete(interval)?; + let operations = write_guard.delete(interval)?; let md5 = write_guard.md5(); - let _ = self.save_local_delta(delta, md5).await?; + let _ = self.save_local_operations(operations, md5).await?; let _ = ret.send(Ok(())); } EditorCommand::Format { @@ -131,16 +129,16 @@ impl EditBlockQueue { ret, } => { let mut write_guard = self.document.write().await; - let delta = write_guard.format(interval, attribute)?; + let operations = write_guard.format(interval, attribute)?; let md5 = write_guard.md5(); - let _ = self.save_local_delta(delta, md5).await?; + let _ = self.save_local_operations(operations, md5).await?; let _ = ret.send(Ok(())); } EditorCommand::Replace { interval, data, ret } => { let mut write_guard = self.document.write().await; - let delta = write_guard.replace(interval, data)?; + let operations = write_guard.replace(interval, data)?; let md5 = write_guard.md5(); - let _ = self.save_local_delta(delta, md5).await?; + let _ = self.save_local_operations(operations, md5).await?; let _ = ret.send(Ok(())); } EditorCommand::CanUndo { ret } => { @@ -151,73 +149,60 @@ impl EditBlockQueue { } EditorCommand::Undo { ret } => { let mut write_guard = self.document.write().await; - let UndoResult { operations: delta } = write_guard.undo()?; + let UndoResult { operations } = write_guard.undo()?; let md5 = write_guard.md5(); - let _ = self.save_local_delta(delta, md5).await?; + let _ = self.save_local_operations(operations, md5).await?; let _ = ret.send(Ok(())); } EditorCommand::Redo { ret } => { let mut write_guard = self.document.write().await; - let UndoResult { operations: delta } = write_guard.redo()?; + let UndoResult { operations } = write_guard.redo()?; let md5 = write_guard.md5(); - let _ = self.save_local_delta(delta, md5).await?; + let _ = self.save_local_operations(operations, md5).await?; let _ = ret.send(Ok(())); } - EditorCommand::ReadDeltaStr { ret } => { + EditorCommand::StringifyOperations { ret } => { let data = self.document.read().await.get_operations_json(); let _ = ret.send(Ok(data)); } - EditorCommand::ReadDelta { ret } => { - let delta = self.document.read().await.get_operations().clone(); - let _ = ret.send(Ok(delta)); + EditorCommand::ReadOperations { ret } => { + let operations = self.document.read().await.get_operations().clone(); + let _ = ret.send(Ok(operations)); } } Ok(()) } - async fn save_local_delta(&self, delta: TextOperations, md5: String) -> Result { - let delta_data = delta.json_bytes(); + async fn save_local_operations(&self, operations: TextOperations, md5: String) -> Result { + let bytes = operations.json_bytes(); let (base_rev_id, rev_id) = self.rev_manager.next_rev_id_pair(); let user_id = self.user.user_id()?; - let revision = Revision::new( - &self.rev_manager.object_id, - base_rev_id, - rev_id, - delta_data, - &user_id, - md5, - ); + let revision = Revision::new(&self.rev_manager.object_id, base_rev_id, rev_id, bytes, &user_id, md5); let _ = self.rev_manager.add_local_revision(&revision).await?; Ok(rev_id.into()) } } -pub(crate) struct TextBlockRevisionCompactor(); -impl RevisionCompactor for TextBlockRevisionCompactor { - fn bytes_from_revisions(&self, revisions: Vec) -> FlowyResult { - let delta = make_operations_from_revisions::(revisions)?; - Ok(delta.json_bytes()) - } -} +pub type TextTransformOperations = TransformOperations; pub(crate) type Ret = oneshot::Sender>; pub(crate) enum EditorCommand { - ComposeLocalDelta { - delta: TextOperations, + ComposeLocalOperations { + operations: TextOperations, ret: Ret<()>, }, - ComposeRemoteDelta { - client_delta: TextOperations, + ComposeRemoteOperation { + client_operations: TextOperations, ret: Ret, }, - ResetDelta { - delta: TextOperations, + ResetOperations { + operations: TextOperations, ret: Ret, }, - TransformDelta { - delta: TextOperations, - ret: Ret, + TransformOperations { + operations: TextOperations, + ret: Ret, }, Insert { index: usize, @@ -250,11 +235,11 @@ pub(crate) enum EditorCommand { Redo { ret: Ret<()>, }, - ReadDeltaStr { + StringifyOperations { ret: Ret, }, #[allow(dead_code)] - ReadDelta { + ReadOperations { ret: Ret, }, } @@ -262,10 +247,10 @@ pub(crate) enum EditorCommand { impl std::fmt::Debug for EditorCommand { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { let s = match self { - EditorCommand::ComposeLocalDelta { .. } => "ComposeLocalDelta", - EditorCommand::ComposeRemoteDelta { .. } => "ComposeRemoteDelta", - EditorCommand::ResetDelta { .. } => "ResetDelta", - EditorCommand::TransformDelta { .. } => "TransformDelta", + EditorCommand::ComposeLocalOperations { .. } => "ComposeLocalOperations", + EditorCommand::ComposeRemoteOperation { .. } => "ComposeRemoteOperation", + EditorCommand::ResetOperations { .. } => "ResetOperations", + EditorCommand::TransformOperations { .. } => "TransformOperations", EditorCommand::Insert { .. } => "Insert", EditorCommand::Delete { .. } => "Delete", EditorCommand::Format { .. } => "Format", @@ -274,8 +259,8 @@ impl std::fmt::Debug for EditorCommand { EditorCommand::CanRedo { .. } => "CanRedo", EditorCommand::Undo { .. } => "Undo", EditorCommand::Redo { .. } => "Redo", - EditorCommand::ReadDeltaStr { .. } => "ReadDeltaStr", - EditorCommand::ReadDelta { .. } => "ReadDocumentAsDelta", + EditorCommand::StringifyOperations { .. } => "StringifyOperations", + EditorCommand::ReadOperations { .. } => "ReadOperations", }; f.write_str(s) } diff --git a/frontend/rust-lib/flowy-text-block/src/web_socket.rs b/frontend/rust-lib/flowy-document/src/web_socket.rs similarity index 57% rename from frontend/rust-lib/flowy-text-block/src/web_socket.rs rename to frontend/rust-lib/flowy-document/src/web_socket.rs index 742962dd4d..8c4e1170af 100644 --- a/frontend/rust-lib/flowy-text-block/src/web_socket.rs +++ b/frontend/rust-lib/flowy-document/src/web_socket.rs @@ -1,7 +1,9 @@ +use crate::queue::TextTransformOperations; use crate::{queue::EditorCommand, TEXT_BLOCK_SYNC_INTERVAL_IN_MILLIS}; use bytes::Bytes; -use flowy_error::{internal_error, FlowyError}; +use flowy_error::{internal_error, FlowyError, FlowyResult}; use flowy_revision::*; +use flowy_sync::entities::revision::Revision; use flowy_sync::{ entities::{ revision::RevisionRange, @@ -10,8 +12,8 @@ use flowy_sync::{ errors::CollaborateResult, }; use lib_infra::future::{BoxResultFuture, FutureResult}; -use lib_ot::core::AttributeHashMap; +use flowy_sync::util::make_operations_from_revisions; use lib_ot::text_delta::TextOperations; use lib_ws::WSConnectState; use std::{sync::Arc, time::Duration}; @@ -24,8 +26,31 @@ use tokio::sync::{ pub(crate) type EditorCommandSender = Sender; pub(crate) type EditorCommandReceiver = Receiver; +#[derive(Clone)] +pub struct DocumentResolveOperations(pub TextOperations); + +impl OperationsDeserializer for DocumentResolveOperations { + fn deserialize_revisions(revisions: Vec) -> FlowyResult { + Ok(DocumentResolveOperations(make_operations_from_revisions(revisions)?)) + } +} + +impl OperationsSerializer for DocumentResolveOperations { + fn serialize_operations(&self) -> Bytes { + self.0.json_bytes() + } +} + +impl DocumentResolveOperations { + pub fn into_inner(self) -> TextOperations { + self.0 + } +} + +pub type DocumentConflictController = ConflictController; + #[allow(dead_code)] -pub(crate) async fn make_block_ws_manager( +pub(crate) async fn make_document_ws_manager( doc_id: String, user_id: String, edit_cmd_tx: EditorCommandSender, @@ -33,11 +58,11 @@ pub(crate) async fn make_block_ws_manager( rev_web_socket: Arc, ) -> Arc { let ws_data_provider = Arc::new(WSDataProvider::new(&doc_id, Arc::new(rev_manager.clone()))); - let resolver = Arc::new(TextBlockConflictResolver { edit_cmd_tx }); + let resolver = Arc::new(DocumentConflictResolver { edit_cmd_tx }); let conflict_controller = - RichTextConflictController::new(&user_id, resolver, Arc::new(ws_data_provider.clone()), rev_manager); - let ws_data_stream = Arc::new(TextBlockRevisionWSDataStream::new(conflict_controller)); - let ws_data_sink = Arc::new(TextBlockWSDataSink(ws_data_provider)); + DocumentConflictController::new(&user_id, resolver, Arc::new(ws_data_provider.clone()), rev_manager); + let ws_data_stream = Arc::new(DocumentRevisionWSDataStream::new(conflict_controller)); + let ws_data_sink = Arc::new(DocumentWSDataSink(ws_data_provider)); let ping_duration = Duration::from_millis(TEXT_BLOCK_SYNC_INTERVAL_IN_MILLIS); let ws_manager = Arc::new(RevisionWebSocketManager::new( "Block", @@ -65,20 +90,20 @@ fn listen_document_ws_state(_user_id: &str, _doc_id: &str, mut subscriber: broad }); } -pub(crate) struct TextBlockRevisionWSDataStream { - conflict_controller: Arc, +pub(crate) struct DocumentRevisionWSDataStream { + conflict_controller: Arc, } -impl TextBlockRevisionWSDataStream { +impl DocumentRevisionWSDataStream { #[allow(dead_code)] - pub fn new(conflict_controller: RichTextConflictController) -> Self { + pub fn new(conflict_controller: DocumentConflictController) -> Self { Self { conflict_controller: Arc::new(conflict_controller), } } } -impl RevisionWSDataStream for TextBlockRevisionWSDataStream { +impl RevisionWSDataStream for DocumentRevisionWSDataStream { fn receive_push_revision(&self, bytes: Bytes) -> BoxResultFuture<(), FlowyError> { let resolver = self.conflict_controller.clone(); Box::pin(async move { resolver.receive_bytes(bytes).await }) @@ -100,64 +125,67 @@ impl RevisionWSDataStream for TextBlockRevisionWSDataStream { } } -pub(crate) struct TextBlockWSDataSink(pub(crate) Arc); -impl RevisionWebSocketSink for TextBlockWSDataSink { +pub(crate) struct DocumentWSDataSink(pub(crate) Arc); +impl RevisionWebSocketSink for DocumentWSDataSink { fn next(&self) -> FutureResult, FlowyError> { let sink_provider = self.0.clone(); FutureResult::new(async move { sink_provider.next().await }) } } -struct TextBlockConflictResolver { +struct DocumentConflictResolver { edit_cmd_tx: EditorCommandSender, } -impl ConflictResolver for TextBlockConflictResolver { - fn compose_delta(&self, delta: TextOperations) -> BoxResultFuture { +impl ConflictResolver for DocumentConflictResolver { + fn compose_operations(&self, operations: DocumentResolveOperations) -> BoxResultFuture { let tx = self.edit_cmd_tx.clone(); + let operations = operations.into_inner(); Box::pin(async move { let (ret, rx) = oneshot::channel(); - tx.send(EditorCommand::ComposeRemoteDelta { - client_delta: delta, + tx.send(EditorCommand::ComposeRemoteOperation { + client_operations: operations, ret, }) .await .map_err(internal_error)?; - let md5 = rx.await.map_err(|e| { - FlowyError::internal().context(format!("handle EditorCommand::ComposeRemoteDelta failed: {}", e)) - })??; + let md5 = rx + .await + .map_err(|e| FlowyError::internal().context(format!("Compose operations failed: {}", e)))??; Ok(md5) }) } - fn transform_delta( + fn transform_operations( &self, - delta: TextOperations, - ) -> BoxResultFuture { + operations: DocumentResolveOperations, + ) -> BoxResultFuture, FlowyError> { let tx = self.edit_cmd_tx.clone(); + let operations = operations.into_inner(); Box::pin(async move { - let (ret, rx) = oneshot::channel::>(); - tx.send(EditorCommand::TransformDelta { delta, ret }) + let (ret, rx) = oneshot::channel::>(); + tx.send(EditorCommand::TransformOperations { operations, ret }) .await .map_err(internal_error)?; - let transform_delta = rx + let transformed_operations = rx .await - .map_err(|e| FlowyError::internal().context(format!("TransformDelta failed: {}", e)))??; - Ok(transform_delta) + .map_err(|e| FlowyError::internal().context(format!("Transform operations failed: {}", e)))??; + Ok(transformed_operations) }) } - fn reset_delta(&self, delta: TextOperations) -> BoxResultFuture { + fn reset_operations(&self, operations: DocumentResolveOperations) -> BoxResultFuture { let tx = self.edit_cmd_tx.clone(); + let operations = operations.into_inner(); Box::pin(async move { let (ret, rx) = oneshot::channel(); let _ = tx - .send(EditorCommand::ResetDelta { delta, ret }) + .send(EditorCommand::ResetOperations { operations, ret }) .await .map_err(internal_error)?; - let md5 = rx.await.map_err(|e| { - FlowyError::internal().context(format!("handle EditorCommand::OverrideDelta failed: {}", e)) - })??; + let md5 = rx + .await + .map_err(|e| FlowyError::internal().context(format!("Reset operations failed: {}", e)))??; Ok(md5) }) } diff --git a/frontend/rust-lib/flowy-text-block/tests/document/mod.rs b/frontend/rust-lib/flowy-document/tests/document/mod.rs similarity index 100% rename from frontend/rust-lib/flowy-text-block/tests/document/mod.rs rename to frontend/rust-lib/flowy-document/tests/document/mod.rs diff --git a/frontend/rust-lib/flowy-text-block/tests/document/script.rs b/frontend/rust-lib/flowy-document/tests/document/script.rs similarity index 87% rename from frontend/rust-lib/flowy-text-block/tests/document/script.rs rename to frontend/rust-lib/flowy-document/tests/document/script.rs index 8157debd16..3055e7224d 100644 --- a/frontend/rust-lib/flowy-text-block/tests/document/script.rs +++ b/frontend/rust-lib/flowy-document/tests/document/script.rs @@ -1,7 +1,7 @@ +use flowy_document::editor::DocumentEditor; +use flowy_document::TEXT_BLOCK_SYNC_INTERVAL_IN_MILLIS; use flowy_revision::disk::RevisionState; use flowy_test::{helper::ViewTest, FlowySDKTest}; -use flowy_text_block::editor::TextBlockEditor; -use flowy_text_block::TEXT_BLOCK_SYNC_INTERVAL_IN_MILLIS; use lib_ot::{core::Interval, text_delta::TextOperations}; use std::sync::Arc; use tokio::time::{sleep, Duration}; @@ -17,17 +17,21 @@ pub enum EditorScript { AssertJson(&'static str), } -pub struct TextBlockEditorTest { +pub struct DocumentEditorTest { pub sdk: FlowySDKTest, - pub editor: Arc, + pub editor: Arc, } -impl TextBlockEditorTest { +impl DocumentEditorTest { pub async fn new() -> Self { let sdk = FlowySDKTest::default(); let _ = sdk.init_user().await; let test = ViewTest::new_text_block_view(&sdk).await; - let editor = sdk.text_block_manager.open_text_editor(&test.view.id).await.unwrap(); + let editor = sdk + .text_block_manager + .open_document_editor(&test.view.id) + .await + .unwrap(); Self { sdk, editor } } @@ -72,7 +76,7 @@ impl TextBlockEditorTest { } EditorScript::AssertJson(expected) => { let expected_delta: TextOperations = serde_json::from_str(expected).unwrap(); - let delta = self.editor.text_block_delta().await.unwrap(); + let delta = self.editor.document_operations().await.unwrap(); if expected_delta != delta { eprintln!("✅ expect: {}", expected,); eprintln!("❌ receive: {}", delta.json_str()); diff --git a/frontend/rust-lib/flowy-text-block/tests/document/text_block_test.rs b/frontend/rust-lib/flowy-document/tests/document/text_block_test.rs similarity index 83% rename from frontend/rust-lib/flowy-text-block/tests/document/text_block_test.rs rename to frontend/rust-lib/flowy-document/tests/document/text_block_test.rs index 30a926c3f9..436411f80b 100644 --- a/frontend/rust-lib/flowy-text-block/tests/document/text_block_test.rs +++ b/frontend/rust-lib/flowy-document/tests/document/text_block_test.rs @@ -14,7 +14,7 @@ async fn text_block_sync_current_rev_id_check() { AssertNextSyncRevId(None), AssertJson(r#"[{"insert":"123\n"}]"#), ]; - TextBlockEditorTest::new().await.run_scripts(scripts).await; + DocumentEditorTest::new().await.run_scripts(scripts).await; } #[tokio::test] @@ -28,7 +28,7 @@ async fn text_block_sync_state_check() { AssertRevisionState(3, RevisionState::Ack), AssertJson(r#"[{"insert":"123\n"}]"#), ]; - TextBlockEditorTest::new().await.run_scripts(scripts).await; + DocumentEditorTest::new().await.run_scripts(scripts).await; } #[tokio::test] @@ -40,7 +40,7 @@ async fn text_block_sync_insert_test() { AssertJson(r#"[{"insert":"123\n"}]"#), AssertNextSyncRevId(None), ]; - TextBlockEditorTest::new().await.run_scripts(scripts).await; + DocumentEditorTest::new().await.run_scripts(scripts).await; } #[tokio::test] @@ -52,7 +52,7 @@ async fn text_block_sync_insert_in_chinese() { InsertText("好", offset), AssertJson(r#"[{"insert":"你好\n"}]"#), ]; - TextBlockEditorTest::new().await.run_scripts(scripts).await; + DocumentEditorTest::new().await.run_scripts(scripts).await; } #[tokio::test] @@ -64,7 +64,7 @@ async fn text_block_sync_insert_with_emoji() { InsertText("☺️", offset), AssertJson(r#"[{"insert":"😁☺️\n"}]"#), ]; - TextBlockEditorTest::new().await.run_scripts(scripts).await; + DocumentEditorTest::new().await.run_scripts(scripts).await; } #[tokio::test] @@ -76,7 +76,7 @@ async fn text_block_sync_delete_in_english() { Delete(Interval::new(0, 2)), AssertJson(r#"[{"insert":"3\n"}]"#), ]; - TextBlockEditorTest::new().await.run_scripts(scripts).await; + DocumentEditorTest::new().await.run_scripts(scripts).await; } #[tokio::test] @@ -89,7 +89,7 @@ async fn text_block_sync_delete_in_chinese() { Delete(Interval::new(0, offset)), AssertJson(r#"[{"insert":"好\n"}]"#), ]; - TextBlockEditorTest::new().await.run_scripts(scripts).await; + DocumentEditorTest::new().await.run_scripts(scripts).await; } #[tokio::test] @@ -101,5 +101,5 @@ async fn text_block_sync_replace_test() { Replace(Interval::new(0, 3), "abc"), AssertJson(r#"[{"insert":"abc\n"}]"#), ]; - TextBlockEditorTest::new().await.run_scripts(scripts).await; + DocumentEditorTest::new().await.run_scripts(scripts).await; } diff --git a/frontend/rust-lib/flowy-text-block/tests/editor/attribute_test.rs b/frontend/rust-lib/flowy-document/tests/editor/attribute_test.rs similarity index 100% rename from frontend/rust-lib/flowy-text-block/tests/editor/attribute_test.rs rename to frontend/rust-lib/flowy-document/tests/editor/attribute_test.rs diff --git a/frontend/rust-lib/flowy-text-block/tests/editor/mod.rs b/frontend/rust-lib/flowy-document/tests/editor/mod.rs similarity index 99% rename from frontend/rust-lib/flowy-text-block/tests/editor/mod.rs rename to frontend/rust-lib/flowy-document/tests/editor/mod.rs index 1cce5aaeca..d750b459ca 100644 --- a/frontend/rust-lib/flowy-text-block/tests/editor/mod.rs +++ b/frontend/rust-lib/flowy-document/tests/editor/mod.rs @@ -5,7 +5,7 @@ mod serde_test; mod undo_redo_test; use derive_more::Display; -use flowy_sync::client_document::{ClientDocument, InitialDocumentContent}; +use flowy_sync::client_document::{ClientDocument, InitialDocument}; use lib_ot::{ core::*, text_delta::{BuildInTextAttribute, TextOperations}, @@ -264,7 +264,7 @@ impl TestBuilder { } } - pub fn run_scripts(mut self, scripts: Vec) { + pub fn run_scripts(mut self, scripts: Vec) { self.documents = vec![ClientDocument::new::(), ClientDocument::new::()]; self.primes = vec![None, None]; self.deltas = vec![None, None]; diff --git a/frontend/rust-lib/flowy-text-block/tests/editor/op_test.rs b/frontend/rust-lib/flowy-document/tests/editor/op_test.rs similarity index 100% rename from frontend/rust-lib/flowy-text-block/tests/editor/op_test.rs rename to frontend/rust-lib/flowy-document/tests/editor/op_test.rs diff --git a/frontend/rust-lib/flowy-text-block/tests/editor/serde_test.rs b/frontend/rust-lib/flowy-document/tests/editor/serde_test.rs similarity index 100% rename from frontend/rust-lib/flowy-text-block/tests/editor/serde_test.rs rename to frontend/rust-lib/flowy-document/tests/editor/serde_test.rs diff --git a/frontend/rust-lib/flowy-text-block/tests/editor/undo_redo_test.rs b/frontend/rust-lib/flowy-document/tests/editor/undo_redo_test.rs similarity index 100% rename from frontend/rust-lib/flowy-text-block/tests/editor/undo_redo_test.rs rename to frontend/rust-lib/flowy-document/tests/editor/undo_redo_test.rs diff --git a/frontend/rust-lib/flowy-text-block/tests/main.rs b/frontend/rust-lib/flowy-document/tests/main.rs similarity index 100% rename from frontend/rust-lib/flowy-text-block/tests/main.rs rename to frontend/rust-lib/flowy-document/tests/main.rs diff --git a/frontend/rust-lib/flowy-folder/Cargo.toml b/frontend/rust-lib/flowy-folder/Cargo.toml index e099df9629..70e37a3459 100644 --- a/frontend/rust-lib/flowy-folder/Cargo.toml +++ b/frontend/rust-lib/flowy-folder/Cargo.toml @@ -12,7 +12,7 @@ flowy-derive = { path = "../../../shared-lib/flowy-derive" } lib-ot = { path = "../../../shared-lib/lib-ot" } lib-infra = { path = "../../../shared-lib/lib-infra" } -flowy-text-block = { path = "../flowy-text-block" } +flowy-document = { path = "../flowy-document" } flowy-database = { path = "../flowy-database" } flowy-error = { path = "../flowy-error", features = ["db", "http_server"]} dart-notify = { path = "../dart-notify" } diff --git a/frontend/rust-lib/flowy-folder/src/manager.rs b/frontend/rust-lib/flowy-folder/src/manager.rs index 3df48e428e..61bd3c77b6 100644 --- a/frontend/rust-lib/flowy-folder/src/manager.rs +++ b/frontend/rust-lib/flowy-folder/src/manager.rs @@ -14,7 +14,7 @@ use crate::{ use bytes::Bytes; use flowy_error::FlowyError; use flowy_folder_data_model::user_default; -use flowy_revision::disk::SQLiteTextBlockRevisionPersistence; +use flowy_revision::disk::SQLiteDocumentRevisionPersistence; use flowy_revision::{RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionSnapshotPersistence}; use flowy_sync::client_document::default::{initial_document_str, initial_read_me}; use flowy_sync::{client_folder::FolderPad, entities::ws_data::ServerRevisionWSData}; @@ -164,7 +164,7 @@ impl FolderManager { let pool = self.persistence.db_pool()?; let object_id = folder_id.as_ref(); - let disk_cache = SQLiteTextBlockRevisionPersistence::new(user_id, pool.clone()); + let disk_cache = SQLiteDocumentRevisionPersistence::new(user_id, pool.clone()); let rev_persistence = RevisionPersistence::new(user_id, object_id, disk_cache); let rev_compactor = FolderRevisionCompactor(); // let history_persistence = SQLiteRevisionHistoryPersistence::new(object_id, pool.clone()); diff --git a/frontend/rust-lib/flowy-folder/src/services/folder_editor.rs b/frontend/rust-lib/flowy-folder/src/services/folder_editor.rs index a8c54fd0ad..b66b649cc3 100644 --- a/frontend/rust-lib/flowy-folder/src/services/folder_editor.rs +++ b/frontend/rust-lib/flowy-folder/src/services/folder_editor.rs @@ -2,7 +2,8 @@ use crate::manager::FolderId; use bytes::Bytes; use flowy_error::{FlowyError, FlowyResult}; use flowy_revision::{ - RevisionCloudService, RevisionCompactor, RevisionManager, RevisionObjectBuilder, RevisionWebSocket, + RevisionCloudService, RevisionCompress, RevisionManager, RevisionObjectDeserializer, RevisionObjectSerializer, + RevisionWebSocket, }; use flowy_sync::util::make_operations_from_revisions; use flowy_sync::{ @@ -37,7 +38,7 @@ impl FolderEditor { let cloud = Arc::new(FolderRevisionCloudService { token: token.to_string(), }); - let folder = Arc::new(RwLock::new(rev_manager.load::(Some(cloud)).await?)); + let folder = Arc::new(RwLock::new(rev_manager.load::(Some(cloud)).await?)); let rev_manager = Arc::new(rev_manager); #[cfg(feature = "sync")] @@ -100,16 +101,30 @@ impl FolderEditor { } } -struct FolderPadBuilder(); -impl RevisionObjectBuilder for FolderPadBuilder { +struct FolderRevisionSerde(); +impl RevisionObjectDeserializer for FolderRevisionSerde { type Output = FolderPad; - fn build_object(_object_id: &str, revisions: Vec) -> FlowyResult { + fn deserialize_revisions(_object_id: &str, revisions: Vec) -> FlowyResult { let pad = FolderPad::from_revisions(revisions)?; Ok(pad) } } +impl RevisionObjectSerializer for FolderRevisionSerde { + fn serialize_revisions(revisions: Vec) -> FlowyResult { + let operations = make_operations_from_revisions::(revisions)?; + Ok(operations.json_bytes()) + } +} + +pub struct FolderRevisionCompactor(); +impl RevisionCompress for FolderRevisionCompactor { + fn serialize_revisions(&self, revisions: Vec) -> FlowyResult { + FolderRevisionSerde::serialize_revisions(revisions) + } +} + struct FolderRevisionCloudService { #[allow(dead_code)] token: String, @@ -128,11 +143,3 @@ impl FolderEditor { self.rev_manager.clone() } } - -pub struct FolderRevisionCompactor(); -impl RevisionCompactor for FolderRevisionCompactor { - fn bytes_from_revisions(&self, revisions: Vec) -> FlowyResult { - let operations = make_operations_from_revisions::(revisions)?; - Ok(operations.json_bytes()) - } -} diff --git a/frontend/rust-lib/flowy-folder/src/services/persistence/migration.rs b/frontend/rust-lib/flowy-folder/src/services/persistence/migration.rs index 82ebcd9b65..cfa034240e 100644 --- a/frontend/rust-lib/flowy-folder/src/services/persistence/migration.rs +++ b/frontend/rust-lib/flowy-folder/src/services/persistence/migration.rs @@ -3,14 +3,16 @@ use crate::{ event_map::WorkspaceDatabase, services::persistence::{AppTableSql, TrashTableSql, ViewTableSql, WorkspaceTableSql}, }; +use bytes::Bytes; use flowy_database::kv::KV; use flowy_error::{FlowyError, FlowyResult}; use flowy_folder_data_model::revision::{AppRevision, FolderRevision, ViewRevision, WorkspaceRevision}; -use flowy_revision::disk::SQLiteTextBlockRevisionPersistence; +use flowy_revision::disk::SQLiteDocumentRevisionPersistence; use flowy_revision::reset::{RevisionResettable, RevisionStructReset}; use flowy_sync::client_folder::make_folder_rev_json_str; use flowy_sync::entities::revision::Revision; use flowy_sync::{client_folder::FolderPad, entities::revision::md5}; +use lib_ot::core::DeltaBuilder; use std::sync::Arc; const V1_MIGRATION: &str = "FOLDER_V1_MIGRATION"; @@ -110,7 +112,7 @@ impl FolderMigration { }; let pool = self.database.db_pool()?; - let disk_cache = SQLiteTextBlockRevisionPersistence::new(&self.user_id, pool); + let disk_cache = SQLiteDocumentRevisionPersistence::new(&self.user_id, pool); let reset = RevisionStructReset::new(&self.user_id, object, Arc::new(disk_cache)); reset.run().await } @@ -129,10 +131,11 @@ impl RevisionResettable for FolderRevisionResettable { &self.folder_id } - fn target_reset_rev_str(&self, revisions: Vec) -> FlowyResult { + fn reset_data(&self, revisions: Vec) -> FlowyResult { let pad = FolderPad::from_revisions(revisions)?; let json = pad.to_json()?; - Ok(json) + let bytes = DeltaBuilder::new().insert(&json).build().json_bytes(); + Ok(bytes) } fn default_target_rev_str(&self) -> FlowyResult { diff --git a/frontend/rust-lib/flowy-folder/src/services/view/controller.rs b/frontend/rust-lib/flowy-folder/src/services/view/controller.rs index 3f2bf9aee0..b77a44623d 100644 --- a/frontend/rust-lib/flowy-folder/src/services/view/controller.rs +++ b/frontend/rust-lib/flowy-folder/src/services/view/controller.rs @@ -17,7 +17,7 @@ use crate::{ use bytes::Bytes; use flowy_database::kv::KV; use flowy_folder_data_model::revision::{gen_view_id, ViewRevision}; -use flowy_sync::entities::text_block::TextBlockIdPB; +use flowy_sync::entities::document::DocumentIdPB; use futures::{FutureExt, StreamExt}; use std::{collections::HashSet, sync::Arc}; @@ -193,7 +193,7 @@ impl ViewController { } #[tracing::instrument(level = "debug", skip(self,params), fields(doc_id = %params.value), err)] - pub(crate) async fn move_view_to_trash(&self, params: TextBlockIdPB) -> Result<(), FlowyError> { + pub(crate) async fn move_view_to_trash(&self, params: DocumentIdPB) -> Result<(), FlowyError> { let view_id = params.value; if let Some(latest_view_id) = KV::get_str(LATEST_VIEW_ID) { if latest_view_id == view_id { diff --git a/frontend/rust-lib/flowy-folder/src/services/web_socket.rs b/frontend/rust-lib/flowy-folder/src/services/web_socket.rs index ce122073eb..e77c82bdba 100644 --- a/frontend/rust-lib/flowy-folder/src/services/web_socket.rs +++ b/frontend/rust-lib/flowy-folder/src/services/web_socket.rs @@ -1,7 +1,10 @@ use crate::services::FOLDER_SYNC_INTERVAL_IN_MILLIS; use bytes::Bytes; -use flowy_error::FlowyError; +use flowy_error::{FlowyError, FlowyResult}; use flowy_revision::*; +use flowy_sync::entities::revision::Revision; +use flowy_sync::server_folder::FolderOperations; +use flowy_sync::util::make_operations_from_revisions; use flowy_sync::{ client_folder::FolderPad, entities::{ @@ -10,10 +13,32 @@ use flowy_sync::{ }, }; use lib_infra::future::{BoxResultFuture, FutureResult}; -use lib_ot::core::{Delta, EmptyAttributes, OperationTransform}; +use lib_ot::core::OperationTransform; use parking_lot::RwLock; use std::{sync::Arc, time::Duration}; +#[derive(Clone)] +pub struct FolderResolveOperations(pub FolderOperations); +impl OperationsDeserializer for FolderResolveOperations { + fn deserialize_revisions(revisions: Vec) -> FlowyResult { + Ok(FolderResolveOperations(make_operations_from_revisions(revisions)?)) + } +} + +impl OperationsSerializer for FolderResolveOperations { + fn serialize_operations(&self) -> Bytes { + self.0.json_bytes() + } +} + +impl FolderResolveOperations { + pub fn into_inner(self) -> FolderOperations { + self.0 + } +} + +pub type FolderConflictController = ConflictController; + #[allow(dead_code)] pub(crate) async fn make_folder_ws_manager( user_id: &str, @@ -25,7 +50,7 @@ pub(crate) async fn make_folder_ws_manager( let ws_data_provider = Arc::new(WSDataProvider::new(folder_id, Arc::new(rev_manager.clone()))); let resolver = Arc::new(FolderConflictResolver { folder_pad }); let conflict_controller = - ConflictController::::new(user_id, resolver, Arc::new(ws_data_provider.clone()), rev_manager); + FolderConflictController::new(user_id, resolver, Arc::new(ws_data_provider.clone()), rev_manager); let ws_data_stream = Arc::new(FolderRevisionWSDataStream::new(conflict_controller)); let ws_data_sink = Arc::new(FolderWSDataSink(ws_data_provider)); let ping_duration = Duration::from_millis(FOLDER_SYNC_INTERVAL_IN_MILLIS); @@ -51,52 +76,57 @@ struct FolderConflictResolver { folder_pad: Arc>, } -impl ConflictResolver for FolderConflictResolver { - fn compose_delta(&self, delta: Delta) -> BoxResultFuture { +impl ConflictResolver for FolderConflictResolver { + fn compose_operations(&self, operations: FolderResolveOperations) -> BoxResultFuture { + let operations = operations.into_inner(); let folder_pad = self.folder_pad.clone(); Box::pin(async move { - let md5 = folder_pad.write().compose_remote_operations(delta)?; + let md5 = folder_pad.write().compose_remote_operations(operations)?; Ok(md5) }) } - fn transform_delta(&self, delta: Delta) -> BoxResultFuture, FlowyError> { + fn transform_operations( + &self, + operations: FolderResolveOperations, + ) -> BoxResultFuture, FlowyError> { let folder_pad = self.folder_pad.clone(); + let operations = operations.into_inner(); Box::pin(async move { let read_guard = folder_pad.read(); - let mut server_prime: Option = None; - let client_prime: Delta; + let mut server_operations: Option = None; + let client_operations: FolderResolveOperations; if read_guard.is_empty() { // Do nothing - client_prime = delta; + client_operations = FolderResolveOperations(operations); } else { - let (s_prime, c_prime) = read_guard.get_operations().transform(&delta)?; - client_prime = c_prime; - server_prime = Some(s_prime); + let (s_prime, c_prime) = read_guard.get_operations().transform(&operations)?; + client_operations = FolderResolveOperations(c_prime); + server_operations = Some(FolderResolveOperations(s_prime)); } drop(read_guard); - Ok(TransformDeltas { - client_prime, - server_prime, + Ok(TransformOperations { + client_operations, + server_operations, }) }) } - fn reset_delta(&self, delta: Delta) -> BoxResultFuture { + fn reset_operations(&self, operations: FolderResolveOperations) -> BoxResultFuture { let folder_pad = self.folder_pad.clone(); Box::pin(async move { - let md5 = folder_pad.write().reset_folder(delta)?; + let md5 = folder_pad.write().reset_folder(operations.into_inner())?; Ok(md5) }) } } struct FolderRevisionWSDataStream { - conflict_controller: Arc, + conflict_controller: Arc, } impl FolderRevisionWSDataStream { - pub fn new(conflict_controller: PlainTextConflictController) -> Self { + pub fn new(conflict_controller: FolderConflictController) -> Self { Self { conflict_controller: Arc::new(conflict_controller), } diff --git a/frontend/rust-lib/flowy-folder/tests/workspace/script.rs b/frontend/rust-lib/flowy-folder/tests/workspace/script.rs index 74b90ce654..005b685a94 100644 --- a/frontend/rust-lib/flowy-folder/tests/workspace/script.rs +++ b/frontend/rust-lib/flowy-folder/tests/workspace/script.rs @@ -18,7 +18,7 @@ use flowy_folder::{errors::ErrorCode, services::folder_editor::FolderEditor}; use flowy_revision::disk::RevisionState; use flowy_revision::REVISION_WRITE_INTERVAL_IN_MILLIS; -use flowy_sync::entities::text_block::DocumentPB; +use flowy_sync::entities::document::DocumentPayloadPB; use flowy_test::{event_builder::*, FlowySDKTest}; use std::{sync::Arc, time::Duration}; use tokio::time::sleep; @@ -412,14 +412,14 @@ pub async fn delete_view(sdk: &FlowySDKTest, view_ids: Vec) { } #[allow(dead_code)] -pub async fn set_latest_view(sdk: &FlowySDKTest, view_id: &str) -> DocumentPB { +pub async fn set_latest_view(sdk: &FlowySDKTest, view_id: &str) -> DocumentPayloadPB { let view_id: ViewIdPB = view_id.into(); FolderEventBuilder::new(sdk.clone()) .event(SetLatestView) .payload(view_id) .async_send() .await - .parse::() + .parse::() } pub async fn read_trash(sdk: &FlowySDKTest) -> RepeatedTrashPB { diff --git a/frontend/rust-lib/flowy-grid/src/services/block_editor.rs b/frontend/rust-lib/flowy-grid/src/services/block_editor.rs index 0c313a869f..1797a08150 100644 --- a/frontend/rust-lib/flowy-grid/src/services/block_editor.rs +++ b/frontend/rust-lib/flowy-grid/src/services/block_editor.rs @@ -2,7 +2,9 @@ use crate::entities::RowPB; use bytes::Bytes; use flowy_error::{FlowyError, FlowyResult}; use flowy_grid_data_model::revision::{CellRevision, GridBlockRevision, RowChangeset, RowRevision}; -use flowy_revision::{RevisionCloudService, RevisionCompactor, RevisionManager, RevisionObjectBuilder}; +use flowy_revision::{ + RevisionCloudService, RevisionCompress, RevisionManager, RevisionObjectDeserializer, RevisionObjectSerializer, +}; use flowy_sync::client_grid::{GridBlockRevisionChangeset, GridBlockRevisionPad}; use flowy_sync::entities::revision::Revision; use flowy_sync::util::make_operations_from_revisions; @@ -30,7 +32,7 @@ impl GridBlockRevisionEditor { let cloud = Arc::new(GridBlockRevisionCloudService { token: token.to_owned(), }); - let block_revision_pad = rev_manager.load::(Some(cloud)).await?; + let block_revision_pad = rev_manager.load::(Some(cloud)).await?; let pad = Arc::new(RwLock::new(block_revision_pad)); let rev_manager = Arc::new(rev_manager); let user_id = user_id.to_owned(); @@ -192,20 +194,25 @@ impl RevisionCloudService for GridBlockRevisionCloudService { } } -struct GridBlockRevisionPadBuilder(); -impl RevisionObjectBuilder for GridBlockRevisionPadBuilder { +struct GridBlockRevisionSerde(); +impl RevisionObjectDeserializer for GridBlockRevisionSerde { type Output = GridBlockRevisionPad; - - fn build_object(object_id: &str, revisions: Vec) -> FlowyResult { + fn deserialize_revisions(object_id: &str, revisions: Vec) -> FlowyResult { let pad = GridBlockRevisionPad::from_revisions(object_id, revisions)?; Ok(pad) } } -pub struct GridBlockRevisionCompactor(); -impl RevisionCompactor for GridBlockRevisionCompactor { - fn bytes_from_revisions(&self, revisions: Vec) -> FlowyResult { +impl RevisionObjectSerializer for GridBlockRevisionSerde { + fn serialize_revisions(revisions: Vec) -> FlowyResult { let operations = make_operations_from_revisions::(revisions)?; Ok(operations.json_bytes()) } } + +pub struct GridBlockRevisionCompactor(); +impl RevisionCompress for GridBlockRevisionCompactor { + fn serialize_revisions(&self, revisions: Vec) -> FlowyResult { + GridBlockRevisionSerde::serialize_revisions(revisions) + } +} diff --git a/frontend/rust-lib/flowy-grid/src/services/field/type_options/selection_type_option/select_type_option.rs b/frontend/rust-lib/flowy-grid/src/services/field/type_options/selection_type_option/select_type_option.rs index b52f60fd2d..a99b1e48b0 100644 --- a/frontend/rust-lib/flowy-grid/src/services/field/type_options/selection_type_option/select_type_option.rs +++ b/frontend/rust-lib/flowy-grid/src/services/field/type_options/selection_type_option/select_type_option.rs @@ -122,7 +122,7 @@ pub trait SelectTypeOptionSharedAction: TypeOptionDataSerializer + Send + Sync { fn transform_type_option(&mut self, field_type: &FieldType, _type_option_data: String) { match field_type { FieldType::Checkbox => { - //add Yes and No options if it's not exist. + //add Yes and No options if it does not exist. if !self.options().iter().any(|option| option.name == CHECK) { let check_option = SelectOptionPB::with_color(CHECK, SelectOptionColorPB::Green); self.mut_options().push(check_option); diff --git a/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs b/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs index 1cd31bb697..a931dddf66 100644 --- a/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs +++ b/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs @@ -16,7 +16,9 @@ use crate::services::row::{make_grid_blocks, make_rows_from_row_revs, GridBlockS use bytes::Bytes; use flowy_error::{ErrorCode, FlowyError, FlowyResult}; use flowy_grid_data_model::revision::*; -use flowy_revision::{RevisionCloudService, RevisionCompactor, RevisionManager, RevisionObjectBuilder}; +use flowy_revision::{ + RevisionCloudService, RevisionCompress, RevisionManager, RevisionObjectDeserializer, RevisionObjectSerializer, +}; use flowy_sync::client_grid::{GridRevisionChangeset, GridRevisionPad, JsonDeserializer}; use flowy_sync::entities::revision::Revision; use flowy_sync::errors::{CollaborateError, CollaborateResult}; @@ -56,7 +58,7 @@ impl GridRevisionEditor { ) -> FlowyResult> { let token = user.token()?; let cloud = Arc::new(GridRevisionCloudService { token }); - let grid_pad = rev_manager.load::(Some(cloud)).await?; + let grid_pad = rev_manager.load::(Some(cloud)).await?; let rev_manager = Arc::new(rev_manager); let grid_pad = Arc::new(RwLock::new(grid_pad)); @@ -830,16 +832,21 @@ impl GridRevisionEditor { } } -pub struct GridPadBuilder(); -impl RevisionObjectBuilder for GridPadBuilder { +pub struct GridRevisionSerde(); +impl RevisionObjectDeserializer for GridRevisionSerde { type Output = GridRevisionPad; - fn build_object(_object_id: &str, revisions: Vec) -> FlowyResult { + fn deserialize_revisions(_object_id: &str, revisions: Vec) -> FlowyResult { let pad = GridRevisionPad::from_revisions(revisions)?; Ok(pad) } } - +impl RevisionObjectSerializer for GridRevisionSerde { + fn serialize_revisions(revisions: Vec) -> FlowyResult { + let operations = make_operations_from_revisions::(revisions)?; + Ok(operations.json_bytes()) + } +} struct GridRevisionCloudService { #[allow(dead_code)] token: String, @@ -853,10 +860,10 @@ impl RevisionCloudService for GridRevisionCloudService { } pub struct GridRevisionCompactor(); -impl RevisionCompactor for GridRevisionCompactor { - fn bytes_from_revisions(&self, revisions: Vec) -> FlowyResult { - let operations = make_operations_from_revisions::(revisions)?; - Ok(operations.json_bytes()) + +impl RevisionCompress for GridRevisionCompactor { + fn serialize_revisions(&self, revisions: Vec) -> FlowyResult { + GridRevisionSerde::serialize_revisions(revisions) } } diff --git a/frontend/rust-lib/flowy-grid/src/services/grid_view_editor.rs b/frontend/rust-lib/flowy-grid/src/services/grid_view_editor.rs index 6aa96a4e81..718c9ee871 100644 --- a/frontend/rust-lib/flowy-grid/src/services/grid_view_editor.rs +++ b/frontend/rust-lib/flowy-grid/src/services/grid_view_editor.rs @@ -11,15 +11,20 @@ use crate::services::group::{ default_group_configuration, find_group_field, make_group_controller, GroupConfigurationReader, GroupConfigurationWriter, GroupController, MoveGroupRowContext, }; +use bytes::Bytes; use flowy_error::{FlowyError, FlowyResult}; use flowy_grid_data_model::revision::{ gen_grid_filter_id, FieldRevision, FieldTypeRevision, FilterConfigurationRevision, GroupConfigurationRevision, RowChangeset, RowRevision, }; -use flowy_revision::{RevisionCloudService, RevisionManager, RevisionObjectBuilder}; +use flowy_revision::{ + RevisionCloudService, RevisionCompress, RevisionManager, RevisionObjectDeserializer, RevisionObjectSerializer, +}; use flowy_sync::client_grid::{GridViewRevisionChangeset, GridViewRevisionPad}; use flowy_sync::entities::revision::Revision; +use flowy_sync::util::make_operations_from_revisions; use lib_infra::future::{wrap_future, AFFuture, FutureResult}; +use lib_ot::core::EmptyAttributes; use std::future::Future; use std::sync::Arc; use tokio::sync::RwLock; @@ -49,7 +54,7 @@ impl GridViewRevisionEditor { let cloud = Arc::new(GridViewRevisionCloudService { token: token.to_owned(), }); - let view_revision_pad = rev_manager.load::(Some(cloud)).await?; + let view_revision_pad = rev_manager.load::(Some(cloud)).await?; let pad = Arc::new(RwLock::new(view_revision_pad)); let rev_manager = Arc::new(rev_manager); let group_controller = new_group_controller( @@ -472,16 +477,30 @@ impl RevisionCloudService for GridViewRevisionCloudService { } } -struct GridViewRevisionPadBuilder(); -impl RevisionObjectBuilder for GridViewRevisionPadBuilder { +pub struct GridViewRevisionSerde(); +impl RevisionObjectDeserializer for GridViewRevisionSerde { type Output = GridViewRevisionPad; - fn build_object(object_id: &str, revisions: Vec) -> FlowyResult { + fn deserialize_revisions(object_id: &str, revisions: Vec) -> FlowyResult { let pad = GridViewRevisionPad::from_revisions(object_id, revisions)?; Ok(pad) } } +impl RevisionObjectSerializer for GridViewRevisionSerde { + fn serialize_revisions(revisions: Vec) -> FlowyResult { + let operations = make_operations_from_revisions::(revisions)?; + Ok(operations.json_bytes()) + } +} + +pub struct GridViewRevisionCompactor(); +impl RevisionCompress for GridViewRevisionCompactor { + fn serialize_revisions(&self, revisions: Vec) -> FlowyResult { + GridViewRevisionSerde::serialize_revisions(revisions) + } +} + struct GroupConfigurationReaderImpl(Arc>); impl GroupConfigurationReader for GroupConfigurationReaderImpl { diff --git a/frontend/rust-lib/flowy-grid/src/services/grid_view_manager.rs b/frontend/rust-lib/flowy-grid/src/services/grid_view_manager.rs index b74419dab0..027f7cfc80 100644 --- a/frontend/rust-lib/flowy-grid/src/services/grid_view_manager.rs +++ b/frontend/rust-lib/flowy-grid/src/services/grid_view_manager.rs @@ -4,17 +4,14 @@ use crate::entities::{ }; use crate::manager::GridUser; use crate::services::grid_editor_task::GridServiceTaskScheduler; -use crate::services::grid_view_editor::GridViewRevisionEditor; -use bytes::Bytes; +use crate::services::grid_view_editor::{GridViewRevisionCompactor, GridViewRevisionEditor}; + use dashmap::DashMap; use flowy_error::FlowyResult; use flowy_grid_data_model::revision::{FieldRevision, RowChangeset, RowRevision}; use flowy_revision::disk::SQLiteGridViewRevisionPersistence; -use flowy_revision::{RevisionCompactor, RevisionManager, RevisionPersistence, SQLiteRevisionSnapshotPersistence}; -use flowy_sync::entities::revision::Revision; -use flowy_sync::util::make_operations_from_revisions; +use flowy_revision::{RevisionManager, RevisionPersistence, SQLiteRevisionSnapshotPersistence}; use lib_infra::future::AFFuture; -use lib_ot::core::EmptyAttributes; use std::sync::Arc; type ViewId = String; @@ -264,11 +261,3 @@ pub async fn make_grid_view_rev_manager(user: &Arc, view_id: &str) snapshot_persistence, )) } - -pub struct GridViewRevisionCompactor(); -impl RevisionCompactor for GridViewRevisionCompactor { - fn bytes_from_revisions(&self, revisions: Vec) -> FlowyResult { - let operations = make_operations_from_revisions::(revisions)?; - Ok(operations.json_bytes()) - } -} diff --git a/frontend/rust-lib/flowy-grid/src/services/persistence/migration.rs b/frontend/rust-lib/flowy-grid/src/services/persistence/migration.rs index cb99d8eb6d..3084432b95 100644 --- a/frontend/rust-lib/flowy-grid/src/services/persistence/migration.rs +++ b/frontend/rust-lib/flowy-grid/src/services/persistence/migration.rs @@ -1,5 +1,6 @@ use crate::manager::GridUser; use crate::services::persistence::GridDatabase; +use bytes::Bytes; use flowy_database::kv::KV; use flowy_error::FlowyResult; use flowy_grid_data_model::revision::GridRevision; @@ -8,6 +9,7 @@ use flowy_revision::reset::{RevisionResettable, RevisionStructReset}; use flowy_sync::client_grid::{make_grid_rev_json_str, GridRevisionPad}; use flowy_sync::entities::revision::Revision; use flowy_sync::util::md5; +use lib_ot::core::DeltaBuilder; use std::sync::Arc; const V1_MIGRATION: &str = "GRID_V1_MIGRATION"; @@ -59,10 +61,11 @@ impl RevisionResettable for GridRevisionResettable { &self.grid_id } - fn target_reset_rev_str(&self, revisions: Vec) -> FlowyResult { + fn reset_data(&self, revisions: Vec) -> FlowyResult { let pad = GridRevisionPad::from_revisions(revisions)?; let json = pad.json_str()?; - Ok(json) + let bytes = DeltaBuilder::new().insert(&json).build().json_bytes(); + Ok(bytes) } fn default_target_rev_str(&self) -> FlowyResult { diff --git a/frontend/rust-lib/flowy-grid/tests/grid/grid_editor.rs b/frontend/rust-lib/flowy-grid/tests/grid/grid_editor.rs index eca0a17e16..ccccea454d 100644 --- a/frontend/rust-lib/flowy-grid/tests/grid/grid_editor.rs +++ b/frontend/rust-lib/flowy-grid/tests/grid/grid_editor.rs @@ -6,7 +6,7 @@ use bytes::Bytes; use flowy_grid::entities::*; use flowy_grid::services::field::SelectOptionPB; use flowy_grid::services::field::*; -use flowy_grid::services::grid_editor::{GridPadBuilder, GridRevisionEditor}; +use flowy_grid::services::grid_editor::{GridRevisionEditor, GridRevisionSerde}; use flowy_grid::services::row::{CreateRowRevisionPayload, RowRevisionBuilder}; use flowy_grid::services::setting::GridSettingChangesetBuilder; use flowy_grid_data_model::revision::*; diff --git a/frontend/rust-lib/flowy-net/Cargo.toml b/frontend/rust-lib/flowy-net/Cargo.toml index 6096f18ea3..fee2b433b6 100644 --- a/frontend/rust-lib/flowy-net/Cargo.toml +++ b/frontend/rust-lib/flowy-net/Cargo.toml @@ -13,7 +13,7 @@ flowy-sync = { path = "../../../shared-lib/flowy-sync"} flowy-folder-data-model = { path = "../../../shared-lib/flowy-folder-data-model"} flowy-folder = { path = "../flowy-folder" } flowy-user = { path = "../flowy-user" } -flowy-text-block = { path = "../flowy-text-block" } +flowy-document = { path = "../flowy-document" } lazy_static = "1.4.0" lib-infra = { path = "../../../shared-lib/lib-infra" } protobuf = {version = "2.18.0"} diff --git a/frontend/rust-lib/flowy-net/src/http_server/document.rs b/frontend/rust-lib/flowy-net/src/http_server/document.rs index 1a6b66ca59..5139d6e146 100644 --- a/frontend/rust-lib/flowy-net/src/http_server/document.rs +++ b/frontend/rust-lib/flowy-net/src/http_server/document.rs @@ -2,45 +2,45 @@ use crate::{ configuration::*, request::{HttpRequestBuilder, ResponseMiddleware}, }; +use flowy_document::DocumentCloudService; use flowy_error::FlowyError; -use flowy_sync::entities::text_block::{CreateTextBlockParams, DocumentPB, ResetTextBlockParams, TextBlockIdPB}; -use flowy_text_block::TextEditorCloudService; +use flowy_sync::entities::document::{CreateDocumentParams, DocumentIdPB, DocumentPayloadPB, ResetDocumentParams}; use http_flowy::response::FlowyResponse; use lazy_static::lazy_static; use lib_infra::future::FutureResult; use std::sync::Arc; -pub struct BlockHttpCloudService { +pub struct DocumentCloudServiceImpl { config: ClientServerConfiguration, } -impl BlockHttpCloudService { +impl DocumentCloudServiceImpl { pub fn new(config: ClientServerConfiguration) -> Self { Self { config } } } -impl TextEditorCloudService for BlockHttpCloudService { - fn create_text_block(&self, token: &str, params: CreateTextBlockParams) -> FutureResult<(), FlowyError> { +impl DocumentCloudService for DocumentCloudServiceImpl { + fn create_document(&self, token: &str, params: CreateDocumentParams) -> FutureResult<(), FlowyError> { let token = token.to_owned(); let url = self.config.doc_url(); FutureResult::new(async move { create_document_request(&token, params, &url).await }) } - fn read_text_block(&self, token: &str, params: TextBlockIdPB) -> FutureResult, FlowyError> { + fn fetch_document(&self, token: &str, params: DocumentIdPB) -> FutureResult, FlowyError> { let token = token.to_owned(); let url = self.config.doc_url(); FutureResult::new(async move { read_document_request(&token, params, &url).await }) } - fn update_text_block(&self, token: &str, params: ResetTextBlockParams) -> FutureResult<(), FlowyError> { + fn update_document_content(&self, token: &str, params: ResetDocumentParams) -> FutureResult<(), FlowyError> { let token = token.to_owned(); let url = self.config.doc_url(); FutureResult::new(async move { reset_doc_request(&token, params, &url).await }) } } -pub async fn create_document_request(token: &str, params: CreateTextBlockParams, url: &str) -> Result<(), FlowyError> { +pub async fn create_document_request(token: &str, params: CreateDocumentParams, url: &str) -> Result<(), FlowyError> { let _ = request_builder() .post(url) .header(HEADER_TOKEN, token) @@ -52,9 +52,9 @@ pub async fn create_document_request(token: &str, params: CreateTextBlockParams, pub async fn read_document_request( token: &str, - params: TextBlockIdPB, + params: DocumentIdPB, url: &str, -) -> Result, FlowyError> { +) -> Result, FlowyError> { let doc = request_builder() .get(url) .header(HEADER_TOKEN, token) @@ -65,7 +65,7 @@ pub async fn read_document_request( Ok(doc) } -pub async fn reset_doc_request(token: &str, params: ResetTextBlockParams, url: &str) -> Result<(), FlowyError> { +pub async fn reset_doc_request(token: &str, params: ResetDocumentParams, url: &str) -> Result<(), FlowyError> { let _ = request_builder() .patch(url) .header(HEADER_TOKEN, token) diff --git a/frontend/rust-lib/flowy-net/src/local_server/persistence.rs b/frontend/rust-lib/flowy-net/src/local_server/persistence.rs index 17008481b5..9e15c0d246 100644 --- a/frontend/rust-lib/flowy-net/src/local_server/persistence.rs +++ b/frontend/rust-lib/flowy-net/src/local_server/persistence.rs @@ -1,6 +1,6 @@ use flowy_sync::entities::revision::{RepeatedRevision, Revision}; use flowy_sync::{ - entities::{folder::FolderInfo, text_block::DocumentPB}, + entities::{document::DocumentPayloadPB, folder::FolderInfo}, errors::CollaborateError, server_document::*, server_folder::FolderCloudPersistence, @@ -29,25 +29,25 @@ pub trait RevisionCloudStorage: Send + Sync { ) -> BoxResultFuture<(), CollaborateError>; } -pub(crate) struct LocalTextBlockCloudPersistence { +pub(crate) struct LocalDocumentCloudPersistence { storage: Arc, } -impl Debug for LocalTextBlockCloudPersistence { +impl Debug for LocalDocumentCloudPersistence { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("LocalRevisionCloudPersistence") } } -impl std::default::Default for LocalTextBlockCloudPersistence { +impl std::default::Default for LocalDocumentCloudPersistence { fn default() -> Self { - LocalTextBlockCloudPersistence { + LocalDocumentCloudPersistence { storage: Arc::new(MemoryDocumentCloudStorage::default()), } } } -impl FolderCloudPersistence for LocalTextBlockCloudPersistence { +impl FolderCloudPersistence for LocalDocumentCloudPersistence { fn read_folder(&self, _user_id: &str, folder_id: &str) -> BoxResultFuture { let storage = self.storage.clone(); let folder_id = folder_id.to_owned(); @@ -109,8 +109,8 @@ impl FolderCloudPersistence for LocalTextBlockCloudPersistence { } } -impl TextBlockCloudPersistence for LocalTextBlockCloudPersistence { - fn read_text_block(&self, doc_id: &str) -> BoxResultFuture { +impl DocumentCloudPersistence for LocalDocumentCloudPersistence { + fn read_document(&self, doc_id: &str) -> BoxResultFuture { let storage = self.storage.clone(); let doc_id = doc_id.to_owned(); Box::pin(async move { @@ -122,11 +122,11 @@ impl TextBlockCloudPersistence for LocalTextBlockCloudPersistence { }) } - fn create_text_block( + fn create_document( &self, doc_id: &str, repeated_revision: RepeatedRevision, - ) -> BoxResultFuture, CollaborateError> { + ) -> BoxResultFuture, CollaborateError> { let doc_id = doc_id.to_owned(); let storage = self.storage.clone(); Box::pin(async move { @@ -135,7 +135,7 @@ impl TextBlockCloudPersistence for LocalTextBlockCloudPersistence { }) } - fn read_text_block_revisions( + fn read_document_revisions( &self, doc_id: &str, rev_ids: Option>, @@ -148,7 +148,7 @@ impl TextBlockCloudPersistence for LocalTextBlockCloudPersistence { }) } - fn save_text_block_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError> { + fn save_document_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError> { let storage = self.storage.clone(); Box::pin(async move { let _ = storage.set_revisions(repeated_revision).await?; @@ -156,7 +156,7 @@ impl TextBlockCloudPersistence for LocalTextBlockCloudPersistence { }) } - fn reset_text_block(&self, doc_id: &str, revisions: RepeatedRevision) -> BoxResultFuture<(), CollaborateError> { + fn reset_document(&self, doc_id: &str, revisions: RepeatedRevision) -> BoxResultFuture<(), CollaborateError> { let storage = self.storage.clone(); let doc_id = doc_id.to_owned(); Box::pin(async move { diff --git a/frontend/rust-lib/flowy-net/src/local_server/server.rs b/frontend/rust-lib/flowy-net/src/local_server/server.rs index fbdba41098..b0acecb839 100644 --- a/frontend/rust-lib/flowy-net/src/local_server/server.rs +++ b/frontend/rust-lib/flowy-net/src/local_server/server.rs @@ -1,4 +1,4 @@ -use crate::local_server::persistence::LocalTextBlockCloudPersistence; +use crate::local_server::persistence::LocalDocumentCloudPersistence; use async_stream::stream; use bytes::Bytes; use flowy_error::{internal_error, FlowyError}; @@ -6,7 +6,7 @@ use flowy_folder::event_map::FolderCouldServiceV1; use flowy_sync::{ client_document::default::initial_document_str, entities::{ - text_block::{CreateTextBlockParams, DocumentPB, ResetTextBlockParams, TextBlockIdPB}, + document::{CreateDocumentParams, DocumentIdPB, DocumentPayloadPB, ResetDocumentParams}, ws_data::{ClientRevisionWSData, ClientRevisionWSDataType}, }, errors::CollaborateError, @@ -39,7 +39,7 @@ impl LocalServer { client_ws_sender: mpsc::UnboundedSender, client_ws_receiver: broadcast::Sender, ) -> Self { - let persistence = Arc::new(LocalTextBlockCloudPersistence::default()); + let persistence = Arc::new(LocalDocumentCloudPersistence::default()); let doc_manager = Arc::new(ServerDocumentManager::new(persistence.clone())); let folder_manager = Arc::new(ServerFolderManager::new(persistence)); let stop_tx = RwLock::new(None); @@ -252,6 +252,7 @@ impl RevisionUser for LocalRevisionUser { } } +use flowy_document::DocumentCloudService; use flowy_folder::entities::{ app::{AppIdPB, CreateAppParams, UpdateAppParams}, trash::RepeatedTrashIdPB, @@ -261,7 +262,6 @@ use flowy_folder::entities::{ use flowy_folder_data_model::revision::{ gen_app_id, gen_workspace_id, AppRevision, TrashRevision, ViewRevision, WorkspaceRevision, }; -use flowy_text_block::TextEditorCloudService; use flowy_user::entities::{ SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserProfileParams, UserProfilePB, }; @@ -414,22 +414,26 @@ impl UserCloudService for LocalServer { } } -impl TextEditorCloudService for LocalServer { - fn create_text_block(&self, _token: &str, _params: CreateTextBlockParams) -> FutureResult<(), FlowyError> { +impl DocumentCloudService for LocalServer { + fn create_document(&self, _token: &str, _params: CreateDocumentParams) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) } - fn read_text_block(&self, _token: &str, params: TextBlockIdPB) -> FutureResult, FlowyError> { - let doc = DocumentPB { - block_id: params.value, - text: initial_document_str(), + fn fetch_document( + &self, + _token: &str, + params: DocumentIdPB, + ) -> FutureResult, FlowyError> { + let doc = DocumentPayloadPB { + doc_id: params.value, + content: initial_document_str(), rev_id: 0, base_rev_id: 0, }; FutureResult::new(async { Ok(Some(doc)) }) } - fn update_text_block(&self, _token: &str, _params: ResetTextBlockParams) -> FutureResult<(), FlowyError> { + fn update_document_content(&self, _token: &str, _params: ResetDocumentParams) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) } } diff --git a/frontend/rust-lib/flowy-revision/Cargo.toml b/frontend/rust-lib/flowy-revision/Cargo.toml index e0fdde3271..110c606879 100644 --- a/frontend/rust-lib/flowy-revision/Cargo.toml +++ b/frontend/rust-lib/flowy-revision/Cargo.toml @@ -7,7 +7,6 @@ edition = "2018" [dependencies] flowy-sync = { path = "../../../shared-lib/flowy-sync" } -lib-ot = { path = "../../../shared-lib/lib-ot" } lib-ws = { path = "../../../shared-lib/lib-ws" } lib-infra = { path = "../../../shared-lib/lib-infra" } flowy-database = { path = "../flowy-database" } @@ -26,4 +25,4 @@ async-stream = "0.3.2" serde_json = {version = "1.0"} [features] -flowy_unit_test = ["lib-ot/flowy_unit_test"] \ No newline at end of file +flowy_unit_test = [] \ No newline at end of file diff --git a/frontend/rust-lib/flowy-revision/src/cache/disk/document_impl.rs b/frontend/rust-lib/flowy-revision/src/cache/disk/document_impl.rs index a220f0e625..5866df3523 100644 --- a/frontend/rust-lib/flowy-revision/src/cache/disk/document_impl.rs +++ b/frontend/rust-lib/flowy-revision/src/cache/disk/document_impl.rs @@ -15,12 +15,12 @@ use flowy_sync::{ }; use std::sync::Arc; -pub struct SQLiteTextBlockRevisionPersistence { +pub struct SQLiteDocumentRevisionPersistence { user_id: String, pub(crate) pool: Arc, } -impl RevisionDiskCache for SQLiteTextBlockRevisionPersistence { +impl RevisionDiskCache for SQLiteDocumentRevisionPersistence { type Error = FlowyError; fn create_revision_records(&self, revision_records: Vec) -> Result<(), Self::Error> { @@ -81,7 +81,7 @@ impl RevisionDiskCache for SQLiteTextBlockRevisionPersistence { } } -impl SQLiteTextBlockRevisionPersistence { +impl SQLiteDocumentRevisionPersistence { pub fn new(user_id: &str, pool: Arc) -> Self { Self { user_id: user_id.to_owned(), diff --git a/frontend/rust-lib/flowy-revision/src/cache/reset.rs b/frontend/rust-lib/flowy-revision/src/cache/reset.rs index 9e1e728f7d..6f5a760a95 100644 --- a/frontend/rust-lib/flowy-revision/src/cache/reset.rs +++ b/frontend/rust-lib/flowy-revision/src/cache/reset.rs @@ -1,17 +1,18 @@ use crate::disk::{RevisionDiskCache, RevisionRecord}; use crate::{RevisionLoader, RevisionPersistence}; +use bytes::Bytes; use flowy_database::kv::KV; use flowy_error::{FlowyError, FlowyResult}; use flowy_sync::entities::revision::Revision; -use lib_ot::core::DeltaBuilder; use serde::{Deserialize, Serialize}; use std::str::FromStr; use std::sync::Arc; pub trait RevisionResettable { fn target_id(&self) -> &str; + // String in json format - fn target_reset_rev_str(&self, revisions: Vec) -> FlowyResult; + fn reset_data(&self, revisions: Vec) -> FlowyResult; // String in json format fn default_target_rev_str(&self) -> FlowyResult; @@ -69,9 +70,8 @@ where .load() .await?; - let s = self.target.target_reset_rev_str(revisions)?; - let delta_data = DeltaBuilder::new().insert(&s).build().json_bytes(); - let revision = Revision::initial_revision(&self.user_id, self.target.target_id(), delta_data); + let bytes = self.target.reset_data(revisions)?; + let revision = Revision::initial_revision(&self.user_id, self.target.target_id(), bytes); let record = RevisionRecord::new(revision); tracing::trace!("Reset {} revision record object", self.target.target_id()); diff --git a/frontend/rust-lib/flowy-revision/src/conflict_resolve.rs b/frontend/rust-lib/flowy-revision/src/conflict_resolve.rs index d51e503a59..fa5b79e3a1 100644 --- a/frontend/rust-lib/flowy-revision/src/conflict_resolve.rs +++ b/frontend/rust-lib/flowy-revision/src/conflict_resolve.rs @@ -1,28 +1,39 @@ use crate::RevisionManager; use bytes::Bytes; use flowy_error::{FlowyError, FlowyResult}; -use flowy_sync::{ - entities::{ - revision::{RepeatedRevision, Revision, RevisionRange}, - ws_data::ServerRevisionWSDataType, - }, - util::make_operations_from_revisions, +use flowy_sync::entities::{ + revision::{RepeatedRevision, Revision, RevisionRange}, + ws_data::ServerRevisionWSDataType, }; use lib_infra::future::BoxResultFuture; -use lib_ot::core::{AttributeHashMap, DeltaOperations, EmptyAttributes, OperationAttributes}; -use serde::de::DeserializeOwned; use std::{convert::TryFrom, sync::Arc}; - pub type OperationsMD5 = String; -pub trait ConflictResolver +pub struct TransformOperations { + pub client_operations: Operations, + pub server_operations: Option, +} + +pub trait OperationsDeserializer: Send + Sync { + fn deserialize_revisions(revisions: Vec) -> FlowyResult; +} + +pub trait OperationsSerializer: Send + Sync { + fn serialize_operations(&self) -> Bytes; +} + +pub struct ConflictOperations(T); +pub trait ConflictResolver where - T: OperationAttributes + Send + Sync, + Operations: Send + Sync, { - fn compose_delta(&self, delta: DeltaOperations) -> BoxResultFuture; - fn transform_delta(&self, delta: DeltaOperations) -> BoxResultFuture, FlowyError>; - fn reset_delta(&self, delta: DeltaOperations) -> BoxResultFuture; + fn compose_operations(&self, operations: Operations) -> BoxResultFuture; + fn transform_operations( + &self, + operations: Operations, + ) -> BoxResultFuture, FlowyError>; + fn reset_operations(&self, operations: Operations) -> BoxResultFuture; } pub trait ConflictRevisionSink: Send + Sync + 'static { @@ -30,26 +41,23 @@ pub trait ConflictRevisionSink: Send + Sync + 'static { fn ack(&self, rev_id: String, ty: ServerRevisionWSDataType) -> BoxResultFuture<(), FlowyError>; } -pub type RichTextConflictController = ConflictController; -pub type PlainTextConflictController = ConflictController; - -pub struct ConflictController +pub struct ConflictController where - T: OperationAttributes + Send + Sync, + Operations: Send + Sync, { user_id: String, - resolver: Arc + Send + Sync>, + resolver: Arc + Send + Sync>, rev_sink: Arc, rev_manager: Arc, } -impl ConflictController +impl ConflictController where - T: OperationAttributes + Send + Sync + DeserializeOwned + serde::Serialize, + Operations: Clone + Send + Sync, { pub fn new( user_id: &str, - resolver: Arc + Send + Sync>, + resolver: Arc + Send + Sync>, rev_sink: Arc, rev_manager: Arc, ) -> Self { @@ -61,7 +69,12 @@ where rev_manager, } } +} +impl ConflictController +where + Operations: OperationsSerializer + OperationsDeserializer + Clone + Send + Sync, +{ pub async fn receive_bytes(&self, bytes: Bytes) -> FlowyResult<()> { let repeated_revision = RepeatedRevision::try_from(bytes)?; if repeated_revision.is_empty() { @@ -103,33 +116,32 @@ where } } - let new_delta = make_operations_from_revisions(revisions.clone())?; + let new_operations = Operations::deserialize_revisions(revisions.clone())?; + let TransformOperations { + client_operations, + server_operations, + } = self.resolver.transform_operations(new_operations).await?; - let TransformDeltas { - client_prime, - server_prime, - } = self.resolver.transform_delta(new_delta).await?; - - match server_prime { + match server_operations { None => { // The server_prime is None means the client local revisions conflict with the // // server, and it needs to override the client delta. - let md5 = self.resolver.reset_delta(client_prime).await?; + let md5 = self.resolver.reset_operations(client_operations).await?; let repeated_revision = RepeatedRevision::new(revisions); assert_eq!(repeated_revision.last().unwrap().md5, md5); let _ = self.rev_manager.reset_object(repeated_revision).await?; Ok(None) } - Some(server_prime) => { - let md5 = self.resolver.compose_delta(client_prime.clone()).await?; + Some(server_operations) => { + let md5 = self.resolver.compose_operations(client_operations.clone()).await?; for revision in &revisions { let _ = self.rev_manager.add_remote_revision(revision).await?; } let (client_revision, server_revision) = make_client_and_server_revision( &self.user_id, &self.rev_manager, - client_prime, - Some(server_prime), + client_operations, + Some(server_operations), md5, ); let _ = self.rev_manager.add_remote_revision(&client_revision).await?; @@ -139,48 +151,26 @@ where } } -fn make_client_and_server_revision( +fn make_client_and_server_revision( user_id: &str, rev_manager: &Arc, - client_delta: DeltaOperations, - server_delta: Option>, + client_operations: Operations, + server_operations: Option, md5: String, ) -> (Revision, Option) where - T: OperationAttributes + serde::Serialize, + Operations: OperationsSerializer, { let (base_rev_id, rev_id) = rev_manager.next_rev_id_pair(); - let client_revision = Revision::new( - &rev_manager.object_id, - base_rev_id, - rev_id, - client_delta.json_bytes(), - user_id, - md5.clone(), - ); + let bytes = client_operations.serialize_operations(); + let client_revision = Revision::new(&rev_manager.object_id, base_rev_id, rev_id, bytes, user_id, md5.clone()); - match server_delta { + match server_operations { None => (client_revision, None), - Some(server_delta) => { - let server_revision = Revision::new( - &rev_manager.object_id, - base_rev_id, - rev_id, - server_delta.json_bytes(), - user_id, - md5, - ); + Some(operations) => { + let bytes = operations.serialize_operations(); + let server_revision = Revision::new(&rev_manager.object_id, base_rev_id, rev_id, bytes, user_id, md5); (client_revision, Some(server_revision)) } } } - -pub type RichTextTransformDeltas = TransformDeltas; - -pub struct TransformDeltas -where - T: OperationAttributes, -{ - pub client_prime: DeltaOperations, - pub server_prime: Option>, -} diff --git a/frontend/rust-lib/flowy-revision/src/rev_manager.rs b/frontend/rust-lib/flowy-revision/src/rev_manager.rs index 1776956d1c..e7384b1190 100644 --- a/frontend/rust-lib/flowy-revision/src/rev_manager.rs +++ b/frontend/rust-lib/flowy-revision/src/rev_manager.rs @@ -9,21 +9,48 @@ use flowy_sync::{ use lib_infra::future::FutureResult; use std::sync::Arc; -pub type SyncObject = lib_ot::text_delta::TextOperations; - pub trait RevisionCloudService: Send + Sync { + /// Read the object's revision from remote + /// Returns a list of revisions that used to build the object + /// # Arguments + /// + /// * `user_id`: the id of the user + /// * `object_id`: the id of the object + /// fn fetch_object(&self, user_id: &str, object_id: &str) -> FutureResult, FlowyError>; } -pub trait RevisionObjectBuilder: Send + Sync { +pub trait RevisionObjectDeserializer: Send + Sync { type Output; - fn build_object(object_id: &str, revisions: Vec) -> FlowyResult; + /// Deserialize the list of revisions into an concrete object type. + /// + /// # Arguments + /// + /// * `object_id`: the id of the object + /// * `revisions`: a list of revisions that represent the object + /// + fn deserialize_revisions(object_id: &str, revisions: Vec) -> FlowyResult; } -pub trait RevisionCompactor: Send + Sync { - fn compact(&self, user_id: &str, object_id: &str, mut revisions: Vec) -> FlowyResult { +pub trait RevisionObjectSerializer: Send + Sync { + /// Serialize the list of revisions to `Bytes` + /// + /// * `revisions`: a list of revisions will be serialized to `Bytes` + /// + fn serialize_revisions(revisions: Vec) -> FlowyResult; +} + +/// `RevisionCompress` is used to compress multiple revisions into one revision +/// +pub trait RevisionCompress: Send + Sync { + fn compress_revisions( + &self, + user_id: &str, + object_id: &str, + mut revisions: Vec, + ) -> FlowyResult { if revisions.is_empty() { - return Err(FlowyError::internal().context("Can't compact the empty folder's revisions")); + return Err(FlowyError::internal().context("Can't compact the empty revisions")); } if revisions.len() == 1 { @@ -35,11 +62,11 @@ pub trait RevisionCompactor: Send + Sync { let (base_rev_id, rev_id) = first_revision.pair_rev_id(); let md5 = last_revision.md5.clone(); - let bytes = self.bytes_from_revisions(revisions)?; + let bytes = self.serialize_revisions(revisions)?; Ok(Revision::new(object_id, base_rev_id, rev_id, bytes, user_id, md5)) } - fn bytes_from_revisions(&self, revisions: Vec) -> FlowyResult; + fn serialize_revisions(&self, revisions: Vec) -> FlowyResult; } pub struct RevisionManager { @@ -49,7 +76,7 @@ pub struct RevisionManager { rev_persistence: Arc, #[allow(dead_code)] rev_snapshot: Arc, - rev_compactor: Arc, + rev_compress: Arc, #[cfg(feature = "flowy_unit_test")] rev_ack_notifier: tokio::sync::broadcast::Sender, } @@ -64,13 +91,11 @@ impl RevisionManager { ) -> Self where SP: 'static + RevisionSnapshotDiskCache, - C: 'static + RevisionCompactor, + C: 'static + RevisionCompress, { let rev_id_counter = RevIdCounter::new(0); let rev_compactor = Arc::new(rev_compactor); - let rev_persistence = Arc::new(rev_persistence); - let rev_snapshot = Arc::new(RevisionSnapshotManager::new(user_id, object_id, snapshot_persistence)); #[cfg(feature = "flowy_unit_test")] let (revision_ack_notifier, _) = tokio::sync::broadcast::channel(1); @@ -81,7 +106,7 @@ impl RevisionManager { rev_id_counter, rev_persistence, rev_snapshot, - rev_compactor, + rev_compress: rev_compactor, #[cfg(feature = "flowy_unit_test")] rev_ack_notifier: revision_ack_notifier, } @@ -90,7 +115,7 @@ impl RevisionManager { #[tracing::instrument(level = "debug", skip_all, fields(object_id) err)] pub async fn load(&mut self, cloud: Option>) -> FlowyResult where - B: RevisionObjectBuilder, + B: RevisionObjectDeserializer, { let (revisions, rev_id) = RevisionLoader { object_id: self.object_id.clone(), @@ -102,7 +127,7 @@ impl RevisionManager { .await?; self.rev_id_counter.set(rev_id); tracing::Span::current().record("object_id", &self.object_id.as_str()); - B::build_object(&self.object_id, revisions) + B::deserialize_revisions(&self.object_id, revisions) } #[tracing::instrument(level = "debug", skip(self, revisions), err)] @@ -116,7 +141,7 @@ impl RevisionManager { #[tracing::instrument(level = "debug", skip(self, revision), err)] pub async fn add_remote_revision(&self, revision: &Revision) -> Result<(), FlowyError> { if revision.bytes.is_empty() { - return Err(FlowyError::internal().context("Delta data should be empty")); + return Err(FlowyError::internal().context("Remote revisions is empty")); } let _ = self.rev_persistence.add_ack_revision(revision).await?; @@ -128,11 +153,11 @@ impl RevisionManager { #[tracing::instrument(level = "debug", skip_all, err)] pub async fn add_local_revision(&self, revision: &Revision) -> Result<(), FlowyError> { if revision.bytes.is_empty() { - return Err(FlowyError::internal().context("Delta data should be empty")); + return Err(FlowyError::internal().context("Local revisions is empty")); } let rev_id = self .rev_persistence - .add_sync_revision(revision, &self.rev_compactor) + .add_sync_revision(revision, &self.rev_compress) .await?; // self.rev_history.add_revision(revision).await; self.rev_id_counter.set(rev_id); diff --git a/frontend/rust-lib/flowy-revision/src/rev_persistence.rs b/frontend/rust-lib/flowy-revision/src/rev_persistence.rs index 9360182eba..0c0875d6a5 100644 --- a/frontend/rust-lib/flowy-revision/src/rev_persistence.rs +++ b/frontend/rust-lib/flowy-revision/src/rev_persistence.rs @@ -1,10 +1,10 @@ use crate::cache::{ - disk::{RevisionChangeset, RevisionDiskCache, SQLiteTextBlockRevisionPersistence}, + disk::{RevisionChangeset, RevisionDiskCache, SQLiteDocumentRevisionPersistence}, memory::RevisionMemoryCacheDelegate, }; use crate::disk::{RevisionRecord, RevisionState, SQLiteGridBlockRevisionPersistence}; use crate::memory::RevisionMemoryCache; -use crate::RevisionCompactor; +use crate::RevisionCompress; use flowy_database::ConnectionPool; use flowy_error::{internal_error, FlowyError, FlowyResult}; use flowy_sync::entities::revision::{Revision, RevisionRange}; @@ -71,7 +71,7 @@ impl RevisionPersistence { pub(crate) async fn add_sync_revision<'a>( &'a self, revision: &'a Revision, - compactor: &Arc, + rev_compress: &Arc, ) -> FlowyResult { let mut sync_seq_write_guard = self.sync_seq.write().await; let result = sync_seq_write_guard.compact(); @@ -93,7 +93,7 @@ impl RevisionPersistence { revisions.push(revision.clone()); // compact multiple revisions into one - let compact_revision = compactor.compact(&self.user_id, &self.object_id, revisions)?; + let compact_revision = rev_compress.compress_revisions(&self.user_id, &self.object_id, revisions)?; let rev_id = compact_revision.rev_id; tracing::Span::current().record("rev_id", &rev_id); @@ -228,7 +228,7 @@ pub fn mk_text_block_revision_disk_cache( user_id: &str, pool: Arc, ) -> Arc> { - Arc::new(SQLiteTextBlockRevisionPersistence::new(user_id, pool)) + Arc::new(SQLiteDocumentRevisionPersistence::new(user_id, pool)) } pub fn mk_grid_block_revision_disk_cache( diff --git a/frontend/rust-lib/flowy-sdk/Cargo.toml b/frontend/rust-lib/flowy-sdk/Cargo.toml index 6c24c388ad..7a1be98efa 100644 --- a/frontend/rust-lib/flowy-sdk/Cargo.toml +++ b/frontend/rust-lib/flowy-sdk/Cargo.toml @@ -14,7 +14,7 @@ flowy-folder = { path = "../flowy-folder", default-features = false } flowy-grid = { path = "../flowy-grid", default-features = false } flowy-grid-data-model = { path = "../../../shared-lib/flowy-grid-data-model" } flowy-database = { path = "../flowy-database" } -flowy-text-block = { path = "../flowy-text-block", default-features = false } +flowy-document = { path = "../flowy-document", default-features = false } flowy-revision = { path = "../flowy-revision" } tracing = { version = "0.1" } @@ -38,8 +38,8 @@ tokio = { version = "1", features = ["full"] } futures-util = "0.3.15" [features] -http_sync = ["flowy-folder/cloud_sync", "flowy-text-block/cloud_sync"] -native_sync = ["flowy-folder/cloud_sync", "flowy-text-block/cloud_sync"] +http_sync = ["flowy-folder/cloud_sync", "flowy-document/cloud_sync"] +native_sync = ["flowy-folder/cloud_sync", "flowy-document/cloud_sync"] use_bunyan = ["lib-log/use_bunyan"] dart = [ "flowy-user/dart", @@ -47,6 +47,6 @@ dart = [ "flowy-folder/dart", "flowy-sync/dart", "flowy-grid/dart", - "flowy-text-block/dart", + "flowy-document/dart", ] openssl_vendored = ["flowy-database/openssl_vendored"] diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/text_block_deps.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs similarity index 76% rename from frontend/rust-lib/flowy-sdk/src/deps_resolve/text_block_deps.rs rename to frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs index 9fa5667bb1..41a25c0c1f 100644 --- a/frontend/rust-lib/flowy-sdk/src/deps_resolve/text_block_deps.rs +++ b/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs @@ -1,37 +1,37 @@ use bytes::Bytes; use flowy_database::ConnectionPool; +use flowy_document::{ + errors::{internal_error, FlowyError}, + DocumentCloudService, DocumentManager, DocumentUser, +}; use flowy_net::ClientServerConfiguration; use flowy_net::{ - http_server::document::BlockHttpCloudService, local_server::LocalServer, ws::connection::FlowyWebSocketConnect, + http_server::document::DocumentCloudServiceImpl, local_server::LocalServer, ws::connection::FlowyWebSocketConnect, }; use flowy_revision::{RevisionWebSocket, WSStateReceiver}; use flowy_sync::entities::ws_data::ClientRevisionWSData; -use flowy_text_block::{ - errors::{internal_error, FlowyError}, - TextEditorCloudService, TextEditorManager, TextEditorUser, -}; use flowy_user::services::UserSession; use futures_core::future::BoxFuture; use lib_infra::future::BoxResultFuture; use lib_ws::{WSChannel, WSMessageReceiver, WebSocketRawMessage}; use std::{convert::TryInto, path::Path, sync::Arc}; -pub struct TextBlockDepsResolver(); -impl TextBlockDepsResolver { +pub struct DocumentDepsResolver(); +impl DocumentDepsResolver { pub fn resolve( local_server: Option>, ws_conn: Arc, user_session: Arc, server_config: &ClientServerConfiguration, - ) -> Arc { + ) -> Arc { let user = Arc::new(BlockUserImpl(user_session)); - let rev_web_socket = Arc::new(TextBlockWebSocket(ws_conn.clone())); - let cloud_service: Arc = match local_server { - None => Arc::new(BlockHttpCloudService::new(server_config.clone())), + let rev_web_socket = Arc::new(DocumentRevisionWebSocket(ws_conn.clone())); + let cloud_service: Arc = match local_server { + None => Arc::new(DocumentCloudServiceImpl::new(server_config.clone())), Some(local_server) => local_server, }; - let manager = Arc::new(TextEditorManager::new(cloud_service, user, rev_web_socket)); + let manager = Arc::new(DocumentManager::new(cloud_service, user, rev_web_socket)); let receiver = Arc::new(DocumentWSMessageReceiverImpl(manager.clone())); ws_conn.add_ws_message_receiver(receiver).unwrap(); @@ -40,7 +40,7 @@ impl TextBlockDepsResolver { } struct BlockUserImpl(Arc); -impl TextEditorUser for BlockUserImpl { +impl DocumentUser for BlockUserImpl { fn user_dir(&self) -> Result { let dir = self.0.user_dir().map_err(|e| FlowyError::unauthorized().context(e))?; @@ -64,8 +64,8 @@ impl TextEditorUser for BlockUserImpl { } } -struct TextBlockWebSocket(Arc); -impl RevisionWebSocket for TextBlockWebSocket { +struct DocumentRevisionWebSocket(Arc); +impl RevisionWebSocket for DocumentRevisionWebSocket { fn send(&self, data: ClientRevisionWSData) -> BoxResultFuture<(), FlowyError> { let bytes: Bytes = data.try_into().unwrap(); let msg = WebSocketRawMessage { @@ -90,7 +90,7 @@ impl RevisionWebSocket for TextBlockWebSocket { } } -struct DocumentWSMessageReceiverImpl(Arc); +struct DocumentWSMessageReceiverImpl(Arc); impl WSMessageReceiver for DocumentWSMessageReceiverImpl { fn source(&self) -> WSChannel { WSChannel::Document diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/folder_deps.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/folder_deps.rs index c2730afe4e..a3dbdb57ac 100644 --- a/frontend/rust-lib/flowy-sdk/src/deps_resolve/folder_deps.rs +++ b/frontend/rust-lib/flowy-sdk/src/deps_resolve/folder_deps.rs @@ -1,5 +1,6 @@ use bytes::Bytes; use flowy_database::ConnectionPool; +use flowy_document::DocumentManager; use flowy_folder::entities::{ViewDataTypePB, ViewLayoutTypePB}; use flowy_folder::manager::{ViewDataProcessor, ViewDataProcessorMap}; use flowy_folder::{ @@ -19,7 +20,6 @@ use flowy_revision::{RevisionWebSocket, WSStateReceiver}; use flowy_sync::client_document::default::initial_document_str; use flowy_sync::entities::revision::{RepeatedRevision, Revision}; use flowy_sync::entities::ws_data::ClientRevisionWSData; -use flowy_text_block::TextEditorManager; use flowy_user::services::UserSession; use futures_core::future::BoxFuture; use lib_infra::future::{BoxResultFuture, FutureResult}; @@ -35,12 +35,12 @@ impl FolderDepsResolver { user_session: Arc, server_config: &ClientServerConfiguration, ws_conn: &Arc, - text_block_manager: &Arc, + text_block_manager: &Arc, grid_manager: &Arc, ) -> Arc { let user: Arc = Arc::new(WorkspaceUserImpl(user_session.clone())); let database: Arc = Arc::new(WorkspaceDatabaseImpl(user_session)); - let web_socket = Arc::new(FolderWebSocket(ws_conn.clone())); + let web_socket = Arc::new(FolderRevisionWebSocket(ws_conn.clone())); let cloud_service: Arc = match local_server { None => Arc::new(FolderHttpCloudService::new(server_config.clone())), Some(local_server) => local_server, @@ -64,12 +64,12 @@ impl FolderDepsResolver { } fn make_view_data_processor( - text_block_manager: Arc, + text_block_manager: Arc, grid_manager: Arc, ) -> ViewDataProcessorMap { let mut map: HashMap> = HashMap::new(); - let block_data_impl = TextBlockViewDataProcessor(text_block_manager); + let block_data_impl = DocumentViewDataProcessor(text_block_manager); map.insert(block_data_impl.data_type(), Arc::new(block_data_impl)); let grid_data_impl = GridViewDataProcessor(grid_manager); @@ -96,8 +96,8 @@ impl WorkspaceUser for WorkspaceUserImpl { } } -struct FolderWebSocket(Arc); -impl RevisionWebSocket for FolderWebSocket { +struct FolderRevisionWebSocket(Arc); +impl RevisionWebSocket for FolderRevisionWebSocket { fn send(&self, data: ClientRevisionWSData) -> BoxResultFuture<(), FlowyError> { let bytes: Bytes = data.try_into().unwrap(); let msg = WebSocketRawMessage { @@ -136,8 +136,8 @@ impl WSMessageReceiver for FolderWSMessageReceiverImpl { } } -struct TextBlockViewDataProcessor(Arc); -impl ViewDataProcessor for TextBlockViewDataProcessor { +struct DocumentViewDataProcessor(Arc); +impl ViewDataProcessor for DocumentViewDataProcessor { fn initialize(&self) -> FutureResult<(), FlowyError> { let manager = self.0.clone(); FutureResult::new(async move { manager.init() }) @@ -156,7 +156,7 @@ impl ViewDataProcessor for TextBlockViewDataProcessor { let view_id = view_id.to_string(); let manager = self.0.clone(); FutureResult::new(async move { - let _ = manager.create_text_block(view_id, repeated_revision).await?; + let _ = manager.create_document(view_id, repeated_revision).await?; Ok(()) }) } @@ -165,7 +165,7 @@ impl ViewDataProcessor for TextBlockViewDataProcessor { let manager = self.0.clone(); let view_id = view_id.to_string(); FutureResult::new(async move { - let _ = manager.close_text_editor(view_id)?; + let _ = manager.close_document_editor(view_id)?; Ok(()) }) } @@ -174,8 +174,8 @@ impl ViewDataProcessor for TextBlockViewDataProcessor { let view_id = view_id.to_string(); let manager = self.0.clone(); FutureResult::new(async move { - let editor = manager.open_text_editor(view_id).await?; - let delta_bytes = Bytes::from(editor.delta_str().await?); + let editor = manager.open_document_editor(view_id).await?; + let delta_bytes = Bytes::from(editor.get_operation_str().await?); Ok(delta_bytes) }) } @@ -195,7 +195,7 @@ impl ViewDataProcessor for TextBlockViewDataProcessor { let delta_data = Bytes::from(view_data); let repeated_revision: RepeatedRevision = Revision::initial_revision(&user_id, &view_id, delta_data.clone()).into(); - let _ = manager.create_text_block(view_id, repeated_revision).await?; + let _ = manager.create_document(view_id, repeated_revision).await?; Ok(delta_data) }) } diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/grid_deps.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/grid_deps.rs index f2b862c53a..880bc7031b 100644 --- a/frontend/rust-lib/flowy-sdk/src/deps_resolve/grid_deps.rs +++ b/frontend/rust-lib/flowy-sdk/src/deps_resolve/grid_deps.rs @@ -18,7 +18,7 @@ pub struct GridDepsResolver(); impl GridDepsResolver { pub async fn resolve(ws_conn: Arc, user_session: Arc) -> Arc { let user = Arc::new(GridUserImpl(user_session.clone())); - let rev_web_socket = Arc::new(GridWebSocket(ws_conn)); + let rev_web_socket = Arc::new(GridRevisionWebSocket(ws_conn)); let grid_manager = Arc::new(GridManager::new( user.clone(), rev_web_socket, @@ -58,8 +58,8 @@ impl GridUser for GridUserImpl { } } -struct GridWebSocket(Arc); -impl RevisionWebSocket for GridWebSocket { +struct GridRevisionWebSocket(Arc); +impl RevisionWebSocket for GridRevisionWebSocket { fn send(&self, data: ClientRevisionWSData) -> BoxResultFuture<(), FlowyError> { let bytes: Bytes = data.try_into().unwrap(); let msg = WebSocketRawMessage { diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/mod.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/mod.rs index cac82f9c56..6a270e4376 100644 --- a/frontend/rust-lib/flowy-sdk/src/deps_resolve/mod.rs +++ b/frontend/rust-lib/flowy-sdk/src/deps_resolve/mod.rs @@ -1,10 +1,10 @@ +mod document_deps; mod folder_deps; mod grid_deps; -mod text_block_deps; mod user_deps; mod util; +pub use document_deps::*; pub use folder_deps::*; pub use grid_deps::*; -pub use text_block_deps::*; pub use user_deps::*; diff --git a/frontend/rust-lib/flowy-sdk/src/lib.rs b/frontend/rust-lib/flowy-sdk/src/lib.rs index e780f9f937..24a9e4f56c 100644 --- a/frontend/rust-lib/flowy-sdk/src/lib.rs +++ b/frontend/rust-lib/flowy-sdk/src/lib.rs @@ -3,6 +3,7 @@ pub mod module; pub use flowy_net::get_client_server_configuration; use crate::deps_resolve::*; +use flowy_document::DocumentManager; use flowy_folder::{errors::FlowyError, manager::FolderManager}; use flowy_grid::manager::GridManager; use flowy_net::ClientServerConfiguration; @@ -11,7 +12,6 @@ use flowy_net::{ local_server::LocalServer, ws::connection::{listen_on_websocket, FlowyWebSocketConnect}, }; -use flowy_text_block::TextEditorManager; use flowy_user::services::{notifier::UserStatus, UserSession, UserSessionConfig}; use lib_dispatch::prelude::*; use lib_dispatch::runtime::tokio_default_runtime; @@ -67,7 +67,7 @@ fn crate_log_filter(level: String) -> String { filters.push(format!("flowy_sdk={}", level)); filters.push(format!("flowy_folder={}", level)); filters.push(format!("flowy_user={}", level)); - filters.push(format!("flowy_text_block={}", level)); + filters.push(format!("flowy_document={}", level)); filters.push(format!("flowy_grid={}", level)); filters.push(format!("flowy_collaboration={}", "info")); filters.push(format!("dart_notify={}", level)); @@ -89,7 +89,7 @@ pub struct FlowySDK { #[allow(dead_code)] config: FlowySDKConfig, pub user_session: Arc, - pub text_block_manager: Arc, + pub text_block_manager: Arc, pub folder_manager: Arc, pub grid_manager: Arc, pub dispatcher: Arc, @@ -106,7 +106,7 @@ impl FlowySDK { let (local_server, ws_conn) = mk_local_server(&config.server_config); let (user_session, text_block_manager, folder_manager, local_server, grid_manager) = runtime.block_on(async { let user_session = mk_user_session(&config, &local_server, &config.server_config); - let text_block_manager = TextBlockDepsResolver::resolve( + let text_block_manager = DocumentDepsResolver::resolve( local_server.clone(), ws_conn.clone(), user_session.clone(), diff --git a/frontend/rust-lib/flowy-sdk/src/module.rs b/frontend/rust-lib/flowy-sdk/src/module.rs index 2f6a20d6c6..58fe1f4f54 100644 --- a/frontend/rust-lib/flowy-sdk/src/module.rs +++ b/frontend/rust-lib/flowy-sdk/src/module.rs @@ -1,7 +1,7 @@ +use flowy_document::DocumentManager; use flowy_folder::manager::FolderManager; use flowy_grid::manager::GridManager; use flowy_net::ws::connection::FlowyWebSocketConnect; -use flowy_text_block::TextEditorManager; use flowy_user::services::UserSession; use lib_dispatch::prelude::Module; use std::sync::Arc; @@ -11,7 +11,7 @@ pub fn mk_modules( folder_manager: &Arc, grid_manager: &Arc, user_session: &Arc, - text_block_manager: &Arc, + text_block_manager: &Arc, ) -> Vec { let user_module = mk_user_module(user_session.clone()); let folder_module = mk_folder_module(folder_manager.clone()); @@ -43,6 +43,6 @@ fn mk_grid_module(grid_manager: Arc) -> Module { flowy_grid::event_map::create(grid_manager) } -fn mk_text_block_module(text_block_manager: Arc) -> Module { - flowy_text_block::event_map::create(text_block_manager) +fn mk_text_block_module(text_block_manager: Arc) -> Module { + flowy_document::event_map::create(text_block_manager) } diff --git a/frontend/rust-lib/flowy-text-block/src/event_handler.rs b/frontend/rust-lib/flowy-text-block/src/event_handler.rs deleted file mode 100644 index 98ca7e19fe..0000000000 --- a/frontend/rust-lib/flowy-text-block/src/event_handler.rs +++ /dev/null @@ -1,43 +0,0 @@ -use crate::entities::{EditParams, EditPayloadPB, ExportDataPB, ExportParams, ExportPayloadPB, TextBlockPB}; -use crate::TextEditorManager; -use flowy_error::FlowyError; -use flowy_sync::entities::text_block::TextBlockIdPB; -use lib_dispatch::prelude::{data_result, AppData, Data, DataResult}; -use std::convert::TryInto; -use std::sync::Arc; - -pub(crate) async fn get_text_block_handler( - data: Data, - manager: AppData>, -) -> DataResult { - let text_block_id: TextBlockIdPB = data.into_inner(); - let editor = manager.open_text_editor(&text_block_id).await?; - let delta_str = editor.delta_str().await?; - data_result(TextBlockPB { - text_block_id: text_block_id.into(), - snapshot: delta_str, - }) -} - -pub(crate) async fn apply_edit_handler( - data: Data, - manager: AppData>, -) -> Result<(), FlowyError> { - let params: EditParams = data.into_inner().try_into()?; - let _ = manager.apply_edit(params).await?; - Ok(()) -} - -#[tracing::instrument(level = "debug", skip(data, manager), err)] -pub(crate) async fn export_handler( - data: Data, - manager: AppData>, -) -> DataResult { - let params: ExportParams = data.into_inner().try_into()?; - let editor = manager.open_text_editor(¶ms.view_id).await?; - let delta_json = editor.delta_str().await?; - data_result(ExportDataPB { - data: delta_json, - export_type: params.export_type, - }) -} diff --git a/frontend/rust-lib/flowy-text-block/src/lib.rs b/frontend/rust-lib/flowy-text-block/src/lib.rs deleted file mode 100644 index b3f9839fde..0000000000 --- a/frontend/rust-lib/flowy-text-block/src/lib.rs +++ /dev/null @@ -1,27 +0,0 @@ -pub mod editor; -mod entities; -mod event_handler; -pub mod event_map; -pub mod manager; -mod queue; -mod web_socket; - -pub mod protobuf; -pub use manager::*; -pub mod errors { - pub use flowy_error::{internal_error, ErrorCode, FlowyError}; -} - -pub const TEXT_BLOCK_SYNC_INTERVAL_IN_MILLIS: u64 = 1000; - -use crate::errors::FlowyError; -use flowy_sync::entities::text_block::{CreateTextBlockParams, DocumentPB, ResetTextBlockParams, TextBlockIdPB}; -use lib_infra::future::FutureResult; - -pub trait TextEditorCloudService: Send + Sync { - fn create_text_block(&self, token: &str, params: CreateTextBlockParams) -> FutureResult<(), FlowyError>; - - fn read_text_block(&self, token: &str, params: TextBlockIdPB) -> FutureResult, FlowyError>; - - fn update_text_block(&self, token: &str, params: ResetTextBlockParams) -> FutureResult<(), FlowyError>; -} diff --git a/shared-lib/flowy-sync/src/client_document/document_pad.rs b/shared-lib/flowy-sync/src/client_document/document_pad.rs index cdf5fdfad9..176cf0d02a 100644 --- a/shared-lib/flowy-sync/src/client_document/document_pad.rs +++ b/shared-lib/flowy-sync/src/client_document/document_pad.rs @@ -10,19 +10,19 @@ use bytes::Bytes; use lib_ot::{core::*, text_delta::TextOperations}; use tokio::sync::mpsc; -pub trait InitialDocumentContent { +pub trait InitialDocument { fn json_str() -> String; } pub struct EmptyDoc(); -impl InitialDocumentContent for EmptyDoc { +impl InitialDocument for EmptyDoc { fn json_str() -> String { TextOperations::default().json_str() } } pub struct NewlineDoc(); -impl InitialDocumentContent for NewlineDoc { +impl InitialDocument for NewlineDoc { fn json_str() -> String { initial_document_str() } @@ -37,7 +37,7 @@ pub struct ClientDocument { } impl ClientDocument { - pub fn new() -> Self { + pub fn new() -> Self { let content = C::json_str(); Self::from_json(&content).unwrap() } diff --git a/shared-lib/flowy-sync/src/entities/text_block.rs b/shared-lib/flowy-sync/src/entities/document.rs similarity index 62% rename from shared-lib/flowy-sync/src/entities/text_block.rs rename to shared-lib/flowy-sync/src/entities/document.rs index edea62a0e0..f351d95677 100644 --- a/shared-lib/flowy-sync/src/entities/text_block.rs +++ b/shared-lib/flowy-sync/src/entities/document.rs @@ -3,24 +3,24 @@ use crate::{ errors::CollaborateError, }; use flowy_derive::ProtoBuf; -use lib_ot::{errors::OTError, text_delta::TextOperations}; +use lib_ot::text_delta::TextOperations; #[derive(ProtoBuf, Default, Debug, Clone)] -pub struct CreateTextBlockParams { +pub struct CreateDocumentParams { #[pb(index = 1)] - pub id: String, + pub doc_id: String, #[pb(index = 2)] pub revisions: RepeatedRevision, } #[derive(ProtoBuf, Default, Debug, Clone, Eq, PartialEq)] -pub struct DocumentPB { +pub struct DocumentPayloadPB { #[pb(index = 1)] - pub block_id: String, + pub doc_id: String, #[pb(index = 2)] - pub text: String, + pub content: String, #[pb(index = 3)] pub rev_id: i64, @@ -29,14 +29,7 @@ pub struct DocumentPB { pub base_rev_id: i64, } -impl DocumentPB { - pub fn delta(&self) -> Result { - let delta = TextOperations::from_bytes(&self.text)?; - Ok(delta) - } -} - -impl std::convert::TryFrom for DocumentPB { +impl std::convert::TryFrom for DocumentPayloadPB { type Error = CollaborateError; fn try_from(revision: Revision) -> Result { @@ -48,9 +41,9 @@ impl std::convert::TryFrom for DocumentPB { let delta = TextOperations::from_bytes(&revision.bytes)?; let doc_json = delta.json_str(); - Ok(DocumentPB { - block_id: revision.object_id, - text: doc_json, + Ok(DocumentPayloadPB { + doc_id: revision.object_id, + content: doc_json, rev_id: revision.rev_id, base_rev_id: revision.base_rev_id, }) @@ -58,21 +51,21 @@ impl std::convert::TryFrom for DocumentPB { } #[derive(ProtoBuf, Default, Debug, Clone)] -pub struct ResetTextBlockParams { +pub struct ResetDocumentParams { #[pb(index = 1)] - pub block_id: String, + pub doc_id: String, #[pb(index = 2)] pub revisions: RepeatedRevision, } #[derive(ProtoBuf, Default, Debug, Clone)] -pub struct TextBlockDeltaPB { +pub struct DocumentOperationsPB { #[pb(index = 1)] - pub text_block_id: String, + pub doc_id: String, #[pb(index = 2)] - pub delta_str: String, + pub operations_str: String, } #[derive(ProtoBuf, Default, Debug, Clone)] @@ -88,30 +81,30 @@ pub struct NewDocUserPB { } #[derive(ProtoBuf, Default, Debug, Clone)] -pub struct TextBlockIdPB { +pub struct DocumentIdPB { #[pb(index = 1)] pub value: String, } -impl AsRef for TextBlockIdPB { +impl AsRef for DocumentIdPB { fn as_ref(&self) -> &str { &self.value } } -impl std::convert::From for TextBlockIdPB { +impl std::convert::From for DocumentIdPB { fn from(value: String) -> Self { - TextBlockIdPB { value } + DocumentIdPB { value } } } -impl std::convert::From for String { - fn from(block_id: TextBlockIdPB) -> Self { +impl std::convert::From for String { + fn from(block_id: DocumentIdPB) -> Self { block_id.value } } -impl std::convert::From<&String> for TextBlockIdPB { +impl std::convert::From<&String> for DocumentIdPB { fn from(s: &String) -> Self { - TextBlockIdPB { value: s.to_owned() } + DocumentIdPB { value: s.to_owned() } } } diff --git a/shared-lib/flowy-sync/src/entities/mod.rs b/shared-lib/flowy-sync/src/entities/mod.rs index 1c357df94c..178c8a0f35 100644 --- a/shared-lib/flowy-sync/src/entities/mod.rs +++ b/shared-lib/flowy-sync/src/entities/mod.rs @@ -1,5 +1,5 @@ +pub mod document; pub mod folder; pub mod parser; pub mod revision; -pub mod text_block; pub mod ws_data; diff --git a/shared-lib/flowy-sync/src/server_document/document_manager.rs b/shared-lib/flowy-sync/src/server_document/document_manager.rs index 786021b0b4..82dd4cb2e7 100644 --- a/shared-lib/flowy-sync/src/server_document/document_manager.rs +++ b/shared-lib/flowy-sync/src/server_document/document_manager.rs @@ -1,6 +1,6 @@ use crate::entities::revision::{RepeatedRevision, Revision}; use crate::{ - entities::{text_block::DocumentPB, ws_data::ServerRevisionWSDataBuilder}, + entities::{document::DocumentPayloadPB, ws_data::ServerRevisionWSDataBuilder}, errors::{internal_error, CollaborateError, CollaborateResult}, protobuf::ClientRevisionWSData, server_document::document_pad::ServerDocument, @@ -19,41 +19,41 @@ use tokio::{ task::spawn_blocking, }; -pub trait TextBlockCloudPersistence: Send + Sync + Debug { - fn read_text_block(&self, doc_id: &str) -> BoxResultFuture; +pub trait DocumentCloudPersistence: Send + Sync + Debug { + fn read_document(&self, doc_id: &str) -> BoxResultFuture; - fn create_text_block( + fn create_document( &self, doc_id: &str, repeated_revision: RepeatedRevision, - ) -> BoxResultFuture, CollaborateError>; + ) -> BoxResultFuture, CollaborateError>; - fn read_text_block_revisions( + fn read_document_revisions( &self, doc_id: &str, rev_ids: Option>, ) -> BoxResultFuture, CollaborateError>; - fn save_text_block_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError>; + fn save_document_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError>; - fn reset_text_block( + fn reset_document( &self, doc_id: &str, repeated_revision: RepeatedRevision, ) -> BoxResultFuture<(), CollaborateError>; } -impl RevisionSyncPersistence for Arc { +impl RevisionSyncPersistence for Arc { fn read_revisions( &self, object_id: &str, rev_ids: Option>, ) -> BoxResultFuture, CollaborateError> { - (**self).read_text_block_revisions(object_id, rev_ids) + (**self).read_document_revisions(object_id, rev_ids) } fn save_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError> { - (**self).save_text_block_revisions(repeated_revision) + (**self).save_document_revisions(repeated_revision) } fn reset_object( @@ -61,17 +61,17 @@ impl RevisionSyncPersistence for Arc { object_id: &str, repeated_revision: RepeatedRevision, ) -> BoxResultFuture<(), CollaborateError> { - (**self).reset_text_block(object_id, repeated_revision) + (**self).reset_document(object_id, repeated_revision) } } pub struct ServerDocumentManager { document_handlers: Arc>>>, - persistence: Arc, + persistence: Arc, } impl ServerDocumentManager { - pub fn new(persistence: Arc) -> Self { + pub fn new(persistence: Arc) -> Self { Self { document_handlers: Arc::new(RwLock::new(HashMap::new())), persistence, @@ -154,7 +154,7 @@ impl ServerDocumentManager { } let mut write_guard = self.document_handlers.write().await; - match self.persistence.read_text_block(doc_id).await { + match self.persistence.read_document(doc_id).await { Ok(doc) => { let handler = self.create_document_handler(doc).await.map_err(internal_error).unwrap(); write_guard.insert(doc_id.to_owned(), handler.clone()); @@ -170,7 +170,7 @@ impl ServerDocumentManager { doc_id: &str, repeated_revision: RepeatedRevision, ) -> Result, CollaborateError> { - match self.persistence.create_text_block(doc_id, repeated_revision).await? { + match self.persistence.create_document(doc_id, repeated_revision).await? { None => Err(CollaborateError::internal().context("Create document info from revisions failed")), Some(doc) => { let handler = self.create_document_handler(doc).await?; @@ -184,7 +184,10 @@ impl ServerDocumentManager { } #[tracing::instrument(level = "debug", skip(self, doc), err)] - async fn create_document_handler(&self, doc: DocumentPB) -> Result, CollaborateError> { + async fn create_document_handler( + &self, + doc: DocumentPayloadPB, + ) -> Result, CollaborateError> { let persistence = self.persistence.clone(); let handle = spawn_blocking(|| OpenDocumentHandler::new(doc, persistence)) .await @@ -208,16 +211,16 @@ struct OpenDocumentHandler { } impl OpenDocumentHandler { - fn new(doc: DocumentPB, persistence: Arc) -> Result { - let doc_id = doc.block_id.clone(); + fn new(doc: DocumentPayloadPB, persistence: Arc) -> Result { + let doc_id = doc.doc_id.clone(); let (sender, receiver) = mpsc::channel(1000); let users = DashMap::new(); - let operations = TextOperations::from_bytes(&doc.text)?; + let operations = TextOperations::from_bytes(&doc.content)?; let sync_object = ServerDocument::from_operations(&doc_id, operations); let synchronizer = Arc::new(DocumentRevisionSynchronizer::new(doc.rev_id, sync_object, persistence)); - let queue = DocumentCommandRunner::new(&doc.block_id, receiver, synchronizer); + let queue = DocumentCommandRunner::new(&doc.doc_id, receiver, synchronizer); tokio::task::spawn(queue.run()); Ok(Self { doc_id, sender, users }) } diff --git a/shared-lib/flowy-sync/src/server_document/document_pad.rs b/shared-lib/flowy-sync/src/server_document/document_pad.rs index 3a6c54628f..ff2b567955 100644 --- a/shared-lib/flowy-sync/src/server_document/document_pad.rs +++ b/shared-lib/flowy-sync/src/server_document/document_pad.rs @@ -1,5 +1,5 @@ use crate::synchronizer::RevisionOperations; -use crate::{client_document::InitialDocumentContent, errors::CollaborateError, synchronizer::RevisionSyncObject}; +use crate::{client_document::InitialDocument, errors::CollaborateError, synchronizer::RevisionSyncObject}; use lib_ot::{core::*, text_delta::TextOperations}; pub struct ServerDocument { @@ -9,7 +9,7 @@ pub struct ServerDocument { impl ServerDocument { #[allow(dead_code)] - pub fn new(doc_id: &str) -> Self { + pub fn new(doc_id: &str) -> Self { let operations = TextOperations::from_json(&C::json_str()).unwrap(); Self::from_operations(doc_id, operations) } diff --git a/shared-lib/flowy-sync/src/util.rs b/shared-lib/flowy-sync/src/util.rs index 43c06823fa..225598c752 100644 --- a/shared-lib/flowy-sync/src/util.rs +++ b/shared-lib/flowy-sync/src/util.rs @@ -1,9 +1,9 @@ use crate::server_folder::FolderOperations; use crate::{ entities::{ + document::DocumentPayloadPB, folder::FolderInfo, revision::{RepeatedRevision, Revision}, - text_block::DocumentPB, }, errors::{CollaborateError, CollaborateResult}, }; @@ -149,7 +149,7 @@ pub fn make_folder_from_revisions_pb( pub fn make_document_from_revision_pbs( doc_id: &str, revisions: RepeatedRevision, -) -> Result, CollaborateError> { +) -> Result, CollaborateError> { let revisions = revisions.into_inner(); if revisions.is_empty() { return Ok(None); @@ -172,9 +172,9 @@ pub fn make_document_from_revision_pbs( let text = delta.json_str(); - Ok(Some(DocumentPB { - block_id: doc_id.to_owned(), - text, + Ok(Some(DocumentPayloadPB { + doc_id: doc_id.to_owned(), + content: text, rev_id, base_rev_id, })) diff --git a/shared-lib/lib-ot/Cargo.toml b/shared-lib/lib-ot/Cargo.toml index 81b75507a0..64cb062ff1 100644 --- a/shared-lib/lib-ot/Cargo.toml +++ b/shared-lib/lib-ot/Cargo.toml @@ -8,8 +8,8 @@ edition = "2018" [dependencies] bytecount = "0.6.0" serde = { version = "1.0", features = ["derive", "rc"] } +#flowy-revision = { path = "../../frontend/rust-lib/flowy-revision" } #protobuf = {version = "2.18.0"} -#flowy-derive = { path = "../flowy-derive" } tokio = { version = "1", features = ["sync"] } dashmap = "5" md5 = "0.7.0" diff --git a/shared-lib/lib-ot/src/core/delta/ops_serde.rs b/shared-lib/lib-ot/src/core/delta/ops_serde.rs index a29b32926f..0a3ebc7230 100644 --- a/shared-lib/lib-ot/src/core/delta/ops_serde.rs +++ b/shared-lib/lib-ot/src/core/delta/ops_serde.rs @@ -1,5 +1,6 @@ use crate::core::delta::operation::OperationAttributes; use crate::core::delta::DeltaOperations; +use serde::de::DeserializeOwned; use serde::{ de::{SeqAccess, Visitor}, ser::SerializeSeq,