From 5b7e6690f8f7116a21f5c4194616cb5ea97c8e13 Mon Sep 17 00:00:00 2001 From: appflowy Date: Mon, 13 Dec 2021 22:46:35 +0800 Subject: [PATCH] test sync --- backend/src/services/doc/crud.rs | 18 ++- backend/src/services/doc/manager.rs | 39 ++++- backend/src/services/doc/router.rs | 16 +- backend/src/services/doc/ws_actor.rs | 56 ++----- backend/src/services/view/view.rs | 4 +- .../flowy-collaboration/ws.pbenum.dart | 2 - .../flowy-collaboration/ws.pbjson.dart | 3 +- .../src/services/doc/controller.rs | 2 +- .../src/services/doc/edit/editor.rs | 14 +- .../src/services/doc/edit/model.rs | 26 ++-- .../doc/revision/{ => cache}/cache.rs | 95 ++++++++---- .../src/services/doc/revision/cache/disk.rs | 12 ++ .../src/services/doc/revision/cache/memory.rs | 126 +++++++++++++++ .../src/services/doc/revision/cache/mod.rs | 8 + .../src/services/doc/revision/cache/model.rs | 15 ++ .../src/services/doc/revision/manager.rs | 9 +- .../src/services/doc/revision/sync.rs | 14 +- .../src/sql_tables/doc/rev_sql.rs | 21 +-- .../src/sql_tables/doc/rev_table.rs | 12 +- .../tests/editor/revision_test.rs | 5 +- .../src/protobuf/model/network_state.rs | 30 ++-- .../src/protobuf/proto/network_state.proto | 1 + frontend/rust-lib/flowy-test/src/editor.rs | 2 +- .../flowy-user/src/services/server/ws_mock.rs | 144 +++++++++++++++--- .../src/core/sync/server_editor.rs | 62 ++++---- .../src/core/sync/synchronizer.rs | 28 +--- .../src/entities/doc/doc.rs | 25 ++- .../flowy-collaboration/src/entities/ws/ws.rs | 23 +-- shared-lib/flowy-collaboration/src/errors.rs | 16 +- .../src/protobuf/model/ws.rs | 48 +++--- .../src/protobuf/proto/ws.proto | 1 - shared-lib/lib-ot/src/revision/cache.rs | 140 ----------------- shared-lib/lib-ot/src/revision/mod.rs | 2 - shared-lib/lib-ot/src/revision/model.rs | 8 + 34 files changed, 595 insertions(+), 432 deletions(-) rename frontend/rust-lib/flowy-document/src/services/doc/revision/{ => cache}/cache.rs (72%) create mode 100644 frontend/rust-lib/flowy-document/src/services/doc/revision/cache/disk.rs create mode 100644 frontend/rust-lib/flowy-document/src/services/doc/revision/cache/memory.rs create mode 100644 frontend/rust-lib/flowy-document/src/services/doc/revision/cache/mod.rs create mode 100644 frontend/rust-lib/flowy-document/src/services/doc/revision/cache/model.rs delete mode 100644 shared-lib/lib-ot/src/revision/cache.rs diff --git a/backend/src/services/doc/crud.rs b/backend/src/services/doc/crud.rs index 0d83d5922a..264cf6103a 100644 --- a/backend/src/services/doc/crud.rs +++ b/backend/src/services/doc/crud.rs @@ -9,7 +9,7 @@ use sqlx::{postgres::PgArguments, PgPool, Postgres}; use uuid::Uuid; #[tracing::instrument(level = "debug", skip(transaction), err)] -pub(crate) async fn create_doc( +pub(crate) async fn create_doc_with_transaction( transaction: &mut DBTransaction<'_>, params: CreateDocParams, ) -> Result<(), ServerError> { @@ -23,6 +23,22 @@ pub(crate) async fn create_doc( Ok(()) } +pub(crate) async fn create_doc(pool: &PgPool, params: CreateDocParams) -> Result<(), ServerError> { + let mut transaction = pool + .begin() + .await + .context("Failed to acquire a Postgres connection to create doc")?; + + let _ = create_doc_with_transaction(&mut transaction, params).await?; + + transaction + .commit() + .await + .context("Failed to commit SQL transaction to create doc.")?; + + Ok(()) +} + #[tracing::instrument(level = "debug", skip(pool), err)] pub(crate) async fn read_doc(pool: &PgPool, params: DocIdentifier) -> Result { let doc_id = Uuid::parse_str(¶ms.doc_id)?; diff --git a/backend/src/services/doc/manager.rs b/backend/src/services/doc/manager.rs index 06c0c59a34..6ac68a5e0f 100644 --- a/backend/src/services/doc/manager.rs +++ b/backend/src/services/doc/manager.rs @@ -8,14 +8,16 @@ use crate::{ }; use actix_web::web::Data; +use crate::services::doc::create_doc; +use backend_service::errors::ServerError; use flowy_collaboration::{ core::sync::{ServerDocManager, ServerDocPersistence}, entities::doc::Doc, errors::CollaborateError, - protobuf::{DocIdentifier, UpdateDocParams}, + protobuf::{CreateDocParams, DocIdentifier, UpdateDocParams}, }; use lib_infra::future::FutureResultSend; -use lib_ot::rich_text::RichTextDelta; +use lib_ot::{revision::Revision, rich_text::RichTextDelta}; use sqlx::PgPool; use std::{convert::TryInto, sync::Arc}; use tokio::sync::{mpsc, oneshot}; @@ -77,7 +79,7 @@ impl ServerDocPersistence for DocPersistenceImpl { FutureResultSend::new(async move { let _ = update_doc(pg_pool.get_ref(), params) .await - .map_err(|e| CollaborateError::internal().context(e))?; + .map_err(server_error_to_collaborate_error)?; Ok(()) }) } @@ -91,11 +93,40 @@ impl ServerDocPersistence for DocPersistenceImpl { FutureResultSend::new(async move { let mut pb_doc = read_doc(pg_pool.get_ref(), params) .await - .map_err(|e| CollaborateError::internal().context(e))?; + .map_err(server_error_to_collaborate_error)?; let doc = (&mut pb_doc) .try_into() .map_err(|e| CollaborateError::internal().context(e))?; Ok(doc) }) } + + fn create_doc(&self, revision: Revision) -> FutureResultSend { + let pg_pool = self.0.clone(); + FutureResultSend::new(async move { + let delta = RichTextDelta::from_bytes(&revision.delta_data)?; + let doc_json = delta.to_json(); + + let params = CreateDocParams { + id: revision.doc_id.clone(), + data: doc_json.clone(), + unknown_fields: Default::default(), + cached_size: Default::default(), + }; + + let _ = create_doc(pg_pool.get_ref(), params) + .await + .map_err(server_error_to_collaborate_error)?; + let doc: Doc = revision.try_into()?; + Ok(doc) + }) + } +} + +fn server_error_to_collaborate_error(error: ServerError) -> CollaborateError { + if error.is_record_not_found() { + CollaborateError::record_not_found() + } else { + CollaborateError::internal().context(error) + } } diff --git a/backend/src/services/doc/router.rs b/backend/src/services/doc/router.rs index 74883ae4f6..c541db6f69 100644 --- a/backend/src/services/doc/router.rs +++ b/backend/src/services/doc/router.rs @@ -6,26 +6,14 @@ use actix_web::{ web::{Data, Payload}, HttpResponse, }; -use anyhow::Context; + use backend_service::{errors::ServerError, response::FlowyResponse}; use flowy_collaboration::protobuf::{CreateDocParams, DocIdentifier, UpdateDocParams}; use sqlx::PgPool; pub async fn create_handler(payload: Payload, pool: Data) -> Result { let params: CreateDocParams = parse_from_payload(payload).await?; - - let mut transaction = pool - .begin() - .await - .context("Failed to acquire a Postgres connection to create doc")?; - - let _ = create_doc(&mut transaction, params).await?; - - transaction - .commit() - .await - .context("Failed to commit SQL transaction to create doc.")?; - + let _ = create_doc(&pool, params).await?; Ok(FlowyResponse::success().into()) } diff --git a/backend/src/services/doc/ws_actor.rs b/backend/src/services/doc/ws_actor.rs index e93d59a1fc..85b196dfb7 100644 --- a/backend/src/services/doc/ws_actor.rs +++ b/backend/src/services/doc/ws_actor.rs @@ -10,8 +10,8 @@ use actix_web::web::Data; use async_stream::stream; use backend_service::errors::{internal_error, Result as DocResult, ServerError}; use flowy_collaboration::{ - core::sync::{OpenDocHandle, ServerDocManager}, - protobuf::{NewDocUser, WsDataType, WsDocumentData}, + core::sync::ServerDocManager, + protobuf::{WsDataType, WsDocumentData}, }; use futures::stream::StreamExt; use lib_ot::protobuf::Revision; @@ -80,32 +80,11 @@ impl DocWsActor { match document_data.ty { WsDataType::Acked => Ok(()), WsDataType::PushRev => self.apply_pushed_rev(user, socket, data, pool).await, - WsDataType::NewDocUser => self.add_doc_user(user, socket, data, pool).await, WsDataType::PullRev => Ok(()), WsDataType::Conflict => Ok(()), } } - async fn add_doc_user( - &self, - user: Arc, - socket: Socket, - data: Vec, - pg_pool: Data, - ) -> DocResult<()> { - let doc_user = spawn_blocking(move || { - let user: NewDocUser = parse_from_bytes(&data)?; - DocResult::Ok(user) - }) - .await - .map_err(internal_error)??; - if let Some(handle) = self.get_doc_handle(&doc_user.doc_id, pg_pool.clone()).await { - let user = Arc::new(ServerDocUser { user, socket, pg_pool }); - handle.add_user(user, doc_user.rev_id).await.map_err(internal_error)?; - } - Ok(()) - } - async fn apply_pushed_rev( &self, user: Arc, @@ -113,30 +92,27 @@ impl DocWsActor { data: Vec, pg_pool: Data, ) -> DocResult<()> { - let mut revision = spawn_blocking(move || { + let mut revision_pb = spawn_blocking(move || { let revision: Revision = parse_from_bytes(&data)?; let _ = verify_md5(&revision)?; DocResult::Ok(revision) }) .await .map_err(internal_error)??; - if let Some(handle) = self.get_doc_handle(&revision.doc_id, pg_pool.clone()).await { - let user = Arc::new(ServerDocUser { user, socket, pg_pool }); - let revision = (&mut revision).try_into().map_err(internal_error).unwrap(); - handle.apply_revision(user, revision).await.map_err(internal_error)?; - } - Ok(()) - } + let revision: lib_ot::revision::Revision = (&mut revision_pb).try_into().map_err(internal_error)?; + // Create the doc if it doesn't exist + let handler = match self.doc_manager.get(&revision.doc_id).await { + None => self + .doc_manager + .create_doc(revision.clone()) + .await + .map_err(internal_error)?, + Some(handler) => handler, + }; - async fn get_doc_handle(&self, doc_id: &str, _pg_pool: Data) -> Option> { - match self.doc_manager.get(doc_id).await { - Ok(Some(edit_doc)) => Some(edit_doc), - Ok(None) => None, - Err(e) => { - log::error!("{}", e); - None - }, - } + let user = Arc::new(ServerDocUser { user, socket, pg_pool }); + handler.apply_revision(user, revision).await.map_err(internal_error)?; + Ok(()) } } diff --git a/backend/src/services/view/view.rs b/backend/src/services/view/view.rs index 05715303d5..5dd1055d0e 100644 --- a/backend/src/services/view/view.rs +++ b/backend/src/services/view/view.rs @@ -1,7 +1,7 @@ use crate::{ entities::workspace::{ViewTable, VIEW_TABLE}, services::{ - doc::{create_doc, delete_doc}, + doc::{create_doc_with_transaction, delete_doc}, trash::read_trash_ids, user::LoggedUser, view::sql_builder::*, @@ -94,7 +94,7 @@ pub(crate) async fn create_view_with_args( let mut create_doc_params = CreateDocParams::new(); create_doc_params.set_data(view_data); create_doc_params.set_id(view.id.clone()); - let _ = create_doc(transaction, create_doc_params).await?; + let _ = create_doc_with_transaction(transaction, create_doc_params).await?; Ok(view) } diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbenum.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbenum.dart index 8cedbb2f2e..31f54d22ea 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbenum.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbenum.dart @@ -14,14 +14,12 @@ class WsDataType extends $pb.ProtobufEnum { static const WsDataType PushRev = WsDataType._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'PushRev'); static const WsDataType PullRev = WsDataType._(2, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'PullRev'); static const WsDataType Conflict = WsDataType._(3, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Conflict'); - static const WsDataType NewDocUser = WsDataType._(4, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'NewDocUser'); static const $core.List values = [ Acked, PushRev, PullRev, Conflict, - NewDocUser, ]; static final $core.Map<$core.int, WsDataType> _byValue = $pb.ProtobufEnum.initByValue(values); diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbjson.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbjson.dart index 8f57171e33..50bc4f4d5b 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbjson.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbjson.dart @@ -16,12 +16,11 @@ const WsDataType$json = const { const {'1': 'PushRev', '2': 1}, const {'1': 'PullRev', '2': 2}, const {'1': 'Conflict', '2': 3}, - const {'1': 'NewDocUser', '2': 4}, ], }; /// Descriptor for `WsDataType`. Decode as a `google.protobuf.EnumDescriptorProto`. -final $typed_data.Uint8List wsDataTypeDescriptor = $convert.base64Decode('CgpXc0RhdGFUeXBlEgkKBUFja2VkEAASCwoHUHVzaFJldhABEgsKB1B1bGxSZXYQAhIMCghDb25mbGljdBADEg4KCk5ld0RvY1VzZXIQBA=='); +final $typed_data.Uint8List wsDataTypeDescriptor = $convert.base64Decode('CgpXc0RhdGFUeXBlEgkKBUFja2VkEAASCwoHUHVzaFJldhABEgsKB1B1bGxSZXYQAhIMCghDb25mbGljdBAD'); @$core.Deprecated('Use wsDocumentDataDescriptor instead') const WsDocumentData$json = const { '1': 'WsDocumentData', diff --git a/frontend/rust-lib/flowy-document/src/services/doc/controller.rs b/frontend/rust-lib/flowy-document/src/services/doc/controller.rs index 82b999a4ff..cab76400b4 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/controller.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/controller.rs @@ -55,7 +55,7 @@ impl DocController { } pub(crate) fn close(&self, doc_id: &str) -> Result<(), DocError> { - log::debug!("Close doc {}", doc_id); + tracing::debug!("Close doc {}", doc_id); self.open_cache.remove(doc_id); self.ws_manager.remove_handler(doc_id); Ok(()) diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs b/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs index 3246d8fcc9..f60a7d5e86 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs @@ -63,7 +63,7 @@ impl ClientDocEditor { stop_sync_tx, }); - edit_doc.notify_open_doc(); + // edit_doc.notify_open_doc(); start_sync(edit_doc.clone(), ws_msg_rx, cloned_stop_sync_tx); Ok(edit_doc) @@ -165,7 +165,7 @@ impl ClientDocEditor { let delta_data = delta_data.to_vec(); let user_id = self.user.user_id()?; let revision = Revision::new(base_rev_id, rev_id, delta_data, &self.doc_id, RevType::Local, user_id); - let _ = self.rev_manager.add_revision(&revision).await?; + let _ = self.rev_manager.add_local_revision(&revision).await?; Ok(rev_id.into()) } @@ -246,7 +246,7 @@ impl ClientDocEditor { RevType::Remote, user_id, ); - let _ = self.rev_manager.add_revision(&revision).await?; + let _ = self.rev_manager.add_remote_revision(&revision).await?; // send the server_prime delta let user_id = self.user.user_id()?; @@ -264,10 +264,8 @@ impl ClientDocEditor { pub async fn handle_ws_message(&self, doc_data: WsDocumentData) -> DocResult<()> { match self.ws_msg_tx.send(doc_data) { - Ok(_) => { - tracing::debug!("Propagate ws message data success") - }, - Err(e) => tracing::error!("Propagate ws message data failed. {}", e), + Ok(_) => {}, + Err(e) => tracing::error!("❌Propagate ws message failed. {}", e), } Ok(()) } @@ -286,7 +284,7 @@ impl WsDocumentHandler for EditDocWsHandler { let edit_doc = self.0.clone(); tokio::spawn(async move { if let Err(e) = edit_doc.handle_ws_message(doc_data).await { - log::error!("{:?}", e); + tracing::error!("❌{:?}", e); } }); } diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/model.rs b/frontend/rust-lib/flowy-document/src/services/doc/edit/model.rs index f75cc672e2..c2454c267e 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/edit/model.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/edit/model.rs @@ -1,10 +1,13 @@ +#![allow(clippy::all)] +#![cfg_attr(rustfmt, rustfmt::skip)] use crate::{errors::DocError, services::ws::DocumentWebSocket}; -use flowy_collaboration::entities::doc::NewDocUser; + use futures::future::BoxFuture; use lib_infra::retry::Action; use lib_ot::revision::RevId; use std::{future, sync::Arc}; +#[allow(dead_code)] pub(crate) struct OpenDocAction { user_id: String, rev_id: RevId, @@ -29,15 +32,16 @@ impl Action for OpenDocAction { type Error = DocError; fn run(&mut self) -> Self::Future { - let new_doc_user = NewDocUser { - user_id: self.user_id.clone(), - rev_id: self.rev_id.clone().into(), - doc_id: self.doc_id.clone(), - }; - - match self.ws.send(new_doc_user.into()) { - Ok(_) => Box::pin(future::ready(Ok::<(), DocError>(()))), - Err(e) => Box::pin(future::ready(Err::<(), DocError>(e))), - } + // let new_doc_user = NewDocUser { + // user_id: self.user_id.clone(), + // rev_id: self.rev_id.clone().into(), + // doc_id: self.doc_id.clone(), + // }; + // + // match self.ws.send(new_doc_user.into()) { + // Ok(_) => Box::pin(future::ready(Ok::<(), DocError>(()))), + // Err(e) => Box::pin(future::ready(Err::<(), DocError>(e))), + // } + Box::pin(future::ready(Ok::<(), DocError>(()))) } } diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/cache.rs similarity index 72% rename from frontend/rust-lib/flowy-document/src/services/doc/revision/cache.rs rename to frontend/rust-lib/flowy-document/src/services/doc/revision/cache/cache.rs index 8ded5792c9..8d85696203 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/cache.rs @@ -1,6 +1,10 @@ use crate::{ errors::{internal_error, DocError, DocResult}, - services::doc::revision::RevisionServer, + services::doc::revision::{ + cache::{disk::RevisionDiskCache, memory::RevisionMemoryCache}, + RevisionRecord, + RevisionServer, + }, sql_tables::RevTableSql, }; use flowy_collaboration::entities::doc::Doc; @@ -8,7 +12,7 @@ use flowy_database::ConnectionPool; use lib_infra::future::FutureResult; use lib_ot::{ core::{Operation, OperationTransformable}, - revision::{RevState, RevType, Revision, RevisionDiskCache, RevisionMemoryCache, RevisionRange, RevisionRecord}, + revision::{RevState, RevType, Revision, RevisionRange}, rich_text::RichTextDelta, }; use std::{sync::Arc, time::Duration}; @@ -53,11 +57,29 @@ impl RevisionCache { } #[tracing::instrument(level = "debug", skip(self, revision))] - pub async fn add_revision(&self, revision: Revision) -> DocResult<()> { + pub async fn add_local_revision(&self, revision: Revision) -> DocResult<()> { if self.memory_cache.contains(&revision.rev_id) { return Err(DocError::duplicate_rev().context(format!("Duplicate revision id: {}", revision.rev_id))); } - self.memory_cache.add_revision(revision.clone()).await?; + let record = RevisionRecord { + revision, + state: RevState::Local, + }; + self.memory_cache.add_revision(record).await?; + self.save_revisions().await; + Ok(()) + } + + #[tracing::instrument(level = "debug", skip(self, revision))] + pub async fn add_remote_revision(&self, revision: Revision) -> DocResult<()> { + if self.memory_cache.contains(&revision.rev_id) { + return Err(DocError::duplicate_rev().context(format!("Duplicate revision id: {}", revision.rev_id))); + } + let record = RevisionRecord { + revision, + state: RevState::Local, + }; + self.memory_cache.add_revision(record).await?; self.save_revisions().await; Ok(()) } @@ -68,8 +90,17 @@ impl RevisionCache { self.save_revisions().await; } - pub async fn query_revision(&self, rev_id: i64) -> Option { - self.memory_cache.query_revision(&rev_id).await + pub async fn query_revision(&self, doc_id: &str, rev_id: i64) -> Option { + match self.memory_cache.query_revision(&rev_id).await { + None => match self.dish_cache.read_revision(doc_id, rev_id) { + Ok(revision) => revision, + Err(e) => { + log::error!("query_revision error: {:?}", e); + None + }, + }, + Some(record) => Some(record), + } } async fn save_revisions(&self) { @@ -102,9 +133,15 @@ impl RevisionCache { } else { let doc_id = self.doc_id.clone(); let disk_cache = self.dish_cache.clone(); - spawn_blocking(move || disk_cache.revisions_in_range(&doc_id, &range)) + let records = spawn_blocking(move || disk_cache.revisions_in_range(&doc_id, &range)) .await - .map_err(internal_error)? + .map_err(internal_error)??; + + let revisions = records + .into_iter() + .map(|record| record.revision) + .collect::>(); + Ok(revisions) } } @@ -126,11 +163,8 @@ impl RevisionCache { RevType::Remote, self.user_id.clone(), ); - let record = RevisionRecord { - revision, - state: RevState::Acked, - }; - let _ = self.dish_cache.create_revisions(vec![record])?; + + self.add_remote_revision(revision).await?; Ok(doc) } } @@ -141,14 +175,14 @@ impl RevisionIterator for RevisionCache { let disk_cache = self.dish_cache.clone(); let doc_id = self.doc_id.clone(); FutureResult::new(async move { - match memory_cache.front_revision().await { + match memory_cache.front_local_revision().await { None => { // - match memory_cache.front_rev_id().await { + match memory_cache.front_local_rev_id().await { None => Ok(None), Some(rev_id) => match disk_cache.read_revision(&doc_id, rev_id)? { None => Ok(None), - Some(revision) => Ok(Some(RevisionRecord::new(revision))), + Some(record) => Ok(Some(record)), }, } }, @@ -166,25 +200,25 @@ async fn load_from_disk( let doc_id = doc_id.to_owned(); let (tx, mut rx) = mpsc::channel(2); let doc = spawn_blocking(move || { - let revisions = disk_cache.read_revisions(&doc_id)?; - if revisions.is_empty() { + let records = disk_cache.read_revisions(&doc_id)?; + if records.is_empty() { return Err(DocError::doc_not_found().context("Local doesn't have this document")); } - let (base_rev_id, rev_id) = revisions.last().unwrap().pair_rev_id(); + let (base_rev_id, rev_id) = records.last().unwrap().revision.pair_rev_id(); let mut delta = RichTextDelta::new(); - for (_, revision) in revisions.into_iter().enumerate() { + for (_, record) in records.into_iter().enumerate() { // Opti: revision's clone may cause memory issues - match RichTextDelta::from_bytes(revision.clone().delta_data) { + match RichTextDelta::from_bytes(record.revision.clone().delta_data) { Ok(local_delta) => { delta = delta.compose(&local_delta)?; - match tx.blocking_send(revision) { + match tx.blocking_send(record) { Ok(_) => {}, - Err(e) => log::error!("Load document from disk error: {}", e), + Err(e) => tracing::error!("❌Load document from disk error: {}", e), } }, Err(e) => { - log::error!("Deserialize delta from revision failed: {}", e); + tracing::error!("Deserialize delta from revision failed: {}", e); }, } } @@ -200,13 +234,12 @@ async fn load_from_disk( .await .map_err(internal_error)?; - while let Some(revision) = rx.recv().await { - match memory_cache.add_revision(revision).await { + while let Some(record) = rx.recv().await { + match memory_cache.add_revision(record).await { Ok(_) => {}, Err(e) => log::error!("{:?}", e), } } - doc } @@ -217,7 +250,7 @@ fn correct_delta_if_need(delta: &mut RichTextDelta) { let data = delta.ops.last().as_ref().unwrap().get_data(); if !data.ends_with('\n') { - log::error!("The op must end with newline. Correcting it by inserting newline op"); + log::error!("❌The op must end with newline. Correcting it by inserting newline op"); delta.ops.push(Operation::Insert("\n".into())); } } @@ -238,19 +271,19 @@ impl RevisionDiskCache for Persistence { }) } - fn revisions_in_range(&self, doc_id: &str, range: &RevisionRange) -> Result, Self::Error> { + fn revisions_in_range(&self, doc_id: &str, range: &RevisionRange) -> Result, Self::Error> { let conn = &*self.pool.get().map_err(internal_error).unwrap(); let revisions = RevTableSql::read_rev_tables_with_range(&self.user_id, doc_id, range.clone(), conn)?; Ok(revisions) } - fn read_revision(&self, doc_id: &str, rev_id: i64) -> Result, Self::Error> { + fn read_revision(&self, doc_id: &str, rev_id: i64) -> Result, Self::Error> { let conn = self.pool.get().map_err(internal_error)?; let some = RevTableSql::read_rev_table(&self.user_id, doc_id, &rev_id, &*conn)?; Ok(some) } - fn read_revisions(&self, doc_id: &str) -> Result, Self::Error> { + fn read_revisions(&self, doc_id: &str) -> Result, Self::Error> { let conn = self.pool.get().map_err(internal_error)?; let some = RevTableSql::read_rev_tables(&self.user_id, doc_id, &*conn)?; Ok(some) diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/disk.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/disk.rs new file mode 100644 index 0000000000..a72a6ede8b --- /dev/null +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/disk.rs @@ -0,0 +1,12 @@ +use crate::services::doc::revision::RevisionRecord; + +use lib_ot::revision::RevisionRange; +use std::fmt::Debug; + +pub trait RevisionDiskCache: Sync + Send { + type Error: Debug; + fn create_revisions(&self, revisions: Vec) -> Result<(), Self::Error>; + fn revisions_in_range(&self, doc_id: &str, range: &RevisionRange) -> Result, Self::Error>; + fn read_revision(&self, doc_id: &str, rev_id: i64) -> Result, Self::Error>; + fn read_revisions(&self, doc_id: &str) -> Result, Self::Error>; +} diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/memory.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/memory.rs new file mode 100644 index 0000000000..ca590e57cd --- /dev/null +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/memory.rs @@ -0,0 +1,126 @@ +use crate::services::doc::revision::RevisionRecord; +use dashmap::DashMap; +use lib_ot::{ + errors::OTError, + revision::{RevState, Revision, RevisionRange}, +}; +use std::{collections::VecDeque, sync::Arc}; +use tokio::sync::RwLock; + +pub struct RevisionMemoryCache { + revs_map: Arc>, + local_revs: Arc>>, +} + +impl std::default::Default for RevisionMemoryCache { + fn default() -> Self { + let local_revs = Arc::new(RwLock::new(VecDeque::new())); + RevisionMemoryCache { + revs_map: Arc::new(DashMap::new()), + local_revs, + } + } +} + +impl RevisionMemoryCache { + pub fn new() -> Self { RevisionMemoryCache::default() } + + pub async fn add_revision(&self, record: RevisionRecord) -> Result<(), OTError> { + // The last revision's rev_id must be greater than the new one. + if let Some(rev_id) = self.local_revs.read().await.back() { + if *rev_id >= record.revision.rev_id { + return Err(OTError::revision_id_conflict() + .context(format!("The new revision's id must be greater than {}", rev_id))); + } + } + + match record.state { + RevState::Local => { + tracing::debug!("{}:add revision {}", record.revision.doc_id, record.revision.rev_id); + self.local_revs.write().await.push_back(record.revision.rev_id); + }, + RevState::Acked => {}, + } + + self.revs_map.insert(record.revision.rev_id, record); + Ok(()) + } + + pub fn remove_revisions(&self, ids: Vec) { self.revs_map.retain(|k, _| !ids.contains(k)); } + + pub async fn ack_revision(&self, rev_id: &i64) { + if let Some(pop_rev_id) = self.front_local_rev_id().await { + if &pop_rev_id != rev_id { + return; + } + } + + match self.local_revs.write().await.pop_front() { + None => tracing::error!("❌The local_revs should not be empty"), + Some(pop_rev_id) => { + if &pop_rev_id != rev_id { + tracing::error!("The front rev_id:{} not equal to ack rev_id: {}", pop_rev_id, rev_id); + assert_eq!(&pop_rev_id, rev_id); + } else { + tracing::debug!("pop revision {}", pop_rev_id); + } + }, + } + } + + pub async fn revisions_in_range(&self, range: &RevisionRange) -> Result, OTError> { + let revs = range + .iter() + .flat_map(|rev_id| match self.revs_map.get(&rev_id) { + None => None, + Some(record) => Some(record.revision.clone()), + }) + .collect::>(); + + if revs.len() == range.len() as usize { + Ok(revs) + } else { + Ok(vec![]) + } + } + + pub fn contains(&self, rev_id: &i64) -> bool { self.revs_map.contains_key(rev_id) } + + pub fn is_empty(&self) -> bool { self.revs_map.is_empty() } + + pub fn revisions(&self) -> (Vec, Vec) { + let mut records: Vec = vec![]; + let mut ids: Vec = vec![]; + + self.revs_map.iter().for_each(|kv| { + records.push(kv.value().clone()); + ids.push(*kv.key()); + }); + (ids, records) + } + + pub async fn query_revision(&self, rev_id: &i64) -> Option { + self.revs_map.get(&rev_id).map(|r| r.value().clone()) + } + + pub async fn front_local_revision(&self) -> Option<(i64, RevisionRecord)> { + match self.local_revs.read().await.front() { + None => None, + Some(rev_id) => match self.revs_map.get(rev_id).map(|r| (*r.key(), r.value().clone())) { + None => None, + Some(val) => { + tracing::debug!("{}:try send revision {}", val.1.revision.doc_id, val.1.revision.rev_id); + Some(val) + }, + }, + } + } + + pub async fn front_local_rev_id(&self) -> Option { self.local_revs.read().await.front().copied() } +} + +#[cfg(feature = "flowy_unit_test")] +impl RevisionMemoryCache { + pub fn revs_map(&self) -> Arc> { self.revs_map.clone() } + pub fn pending_revs(&self) -> Arc>> { self.local_revs.clone() } +} diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/mod.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/mod.rs new file mode 100644 index 0000000000..2886d90fe0 --- /dev/null +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/mod.rs @@ -0,0 +1,8 @@ +#![allow(clippy::module_inception)] +mod cache; +mod disk; +mod memory; +mod model; + +pub use cache::*; +pub use model::*; diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/model.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/model.rs new file mode 100644 index 0000000000..f19b351fd9 --- /dev/null +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/model.rs @@ -0,0 +1,15 @@ +use lib_ot::revision::{RevState, Revision}; +use tokio::sync::broadcast; + +pub type RevIdReceiver = broadcast::Receiver; +pub type RevIdSender = broadcast::Sender; + +#[derive(Clone)] +pub struct RevisionRecord { + pub revision: Revision, + pub state: RevState, +} + +impl RevisionRecord { + pub fn ack(&mut self) { self.state = RevState::Acked; } +} diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs index af9a2c599c..434badcfd8 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs @@ -44,8 +44,13 @@ impl RevisionManager { Ok(doc.delta()?) } - pub async fn add_revision(&self, revision: &Revision) -> Result<(), DocError> { - let _ = self.cache.add_revision(revision.clone()).await?; + pub async fn add_remote_revision(&self, revision: &Revision) -> Result<(), DocError> { + let _ = self.cache.add_remote_revision(revision.clone()).await?; + Ok(()) + } + + pub async fn add_local_revision(&self, revision: &Revision) -> Result<(), DocError> { + let _ = self.cache.add_local_revision(revision.clone()).await?; Ok(()) } diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/sync.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/sync.rs index 98b2ccace5..9fccdd2512 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/sync.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/sync.rs @@ -54,8 +54,13 @@ impl RevisionDownStream { tokio::select! { result = receiver.recv() => { match result { - Some(msg) => yield msg, - None => {}, + Some(msg) => { + yield msg + }, + None => { + tracing::debug!("[RevisionDownStream:{}] loop exit", doc_id); + break; + }, } }, _ = stop_rx.recv() => { @@ -82,7 +87,7 @@ impl RevisionDownStream { .await .map_err(internal_error)?; - log::debug!("[RevisionDownStream]: receives new message: {:?}", ty); + tracing::debug!("[RevisionDownStream]: receives new message: {:?}", ty); match ty { WsDataType::PushRev => { let _ = self.editor.handle_push_rev(bytes).await?; @@ -97,7 +102,6 @@ impl RevisionDownStream { let _ = self.rev_manager.ack_revision(rev_id).await?; }, WsDataType::Conflict => {}, - WsDataType::NewDocUser => {}, } Ok(()) @@ -145,7 +149,7 @@ impl RevisionUpStream { result = rx.recv() => { match result { Some(msg) => yield msg, - None => {}, + None => break, } }, _ = stop_rx.recv() => { diff --git a/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs b/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs index 28cd06895c..61077c26fd 100644 --- a/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs +++ b/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs @@ -1,10 +1,11 @@ use crate::{ errors::DocError, - sql_tables::{doc::RevTable, mk_revision_from_table, RevChangeset, RevTableState, RevTableType}, + services::doc::revision::RevisionRecord, + sql_tables::{doc::RevTable, mk_revision_record_from_table, RevChangeset, RevTableState, RevTableType}, }; use diesel::update; use flowy_database::{insert_or_ignore_into, prelude::*, schema::rev_table::dsl, SqliteConnection}; -use lib_ot::revision::{Revision, RevisionRange, RevisionRecord}; +use lib_ot::revision::RevisionRange; pub struct RevTableSql {} @@ -45,7 +46,7 @@ impl RevTableSql { user_id: &str, doc_id: &str, conn: &SqliteConnection, - ) -> Result, DocError> { + ) -> Result, DocError> { let filter = dsl::rev_table .filter(dsl::doc_id.eq(doc_id)) .order(dsl::rev_id.asc()) @@ -53,8 +54,8 @@ impl RevTableSql { let rev_tables = filter.load::(conn)?; let revisions = rev_tables .into_iter() - .map(|table| mk_revision_from_table(user_id, table)) - .collect::>(); + .map(|table| mk_revision_record_from_table(user_id, table)) + .collect::>(); Ok(revisions) } @@ -63,7 +64,7 @@ impl RevTableSql { doc_id: &str, revision_id: &i64, conn: &SqliteConnection, - ) -> Result, DocError> { + ) -> Result, DocError> { let filter = dsl::rev_table .filter(dsl::doc_id.eq(doc_id)) .filter(dsl::rev_id.eq(revision_id)); @@ -72,7 +73,7 @@ impl RevTableSql { if Err(diesel::NotFound) == result { Ok(None) } else { - Ok(Some(mk_revision_from_table(user_id, result?))) + Ok(Some(mk_revision_record_from_table(user_id, result?))) } } @@ -81,7 +82,7 @@ impl RevTableSql { doc_id: &str, range: RevisionRange, conn: &SqliteConnection, - ) -> Result, DocError> { + ) -> Result, DocError> { let rev_tables = dsl::rev_table .filter(dsl::rev_id.ge(range.start)) .filter(dsl::rev_id.le(range.end)) @@ -91,8 +92,8 @@ impl RevTableSql { let revisions = rev_tables .into_iter() - .map(|table| mk_revision_from_table(user_id, table)) - .collect::>(); + .map(|table| mk_revision_record_from_table(user_id, table)) + .collect::>(); Ok(revisions) } diff --git a/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_table.rs b/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_table.rs index 85a592dc79..2db34d293a 100644 --- a/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_table.rs +++ b/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_table.rs @@ -1,7 +1,7 @@ +use crate::services::doc::revision::RevisionRecord; use diesel::sql_types::Integer; -use flowy_database::schema::rev_table; - use flowy_collaboration::util::md5; +use flowy_database::schema::rev_table; use lib_ot::revision::{RevId, RevState, RevType, Revision}; #[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)] @@ -64,9 +64,9 @@ impl std::convert::From for RevTableState { } } -pub(crate) fn mk_revision_from_table(user_id: &str, table: RevTable) -> Revision { +pub(crate) fn mk_revision_record_from_table(user_id: &str, table: RevTable) -> RevisionRecord { let md5 = md5(&table.data); - Revision { + let revision = Revision { base_rev_id: table.base_rev_id, rev_id: table.rev_id, delta_data: table.data, @@ -74,6 +74,10 @@ pub(crate) fn mk_revision_from_table(user_id: &str, table: RevTable) -> Revision doc_id: table.doc_id, ty: table.ty.into(), user_id: user_id.to_owned(), + }; + RevisionRecord { + revision, + state: table.state.into(), } } diff --git a/frontend/rust-lib/flowy-document/tests/editor/revision_test.rs b/frontend/rust-lib/flowy-document/tests/editor/revision_test.rs index aa30549459..aa2f425b4b 100644 --- a/frontend/rust-lib/flowy-document/tests/editor/revision_test.rs +++ b/frontend/rust-lib/flowy-document/tests/editor/revision_test.rs @@ -1,5 +1,5 @@ use flowy_test::editor::{EditorScript::*, *}; -use lib_ot::{revision::RevState, rich_text::RichTextDeltaBuilder}; +use lib_ot::revision::RevState; #[tokio::test] async fn doc_rev_state_test1() { @@ -40,12 +40,11 @@ async fn doc_rev_state_test2() { #[tokio::test] async fn doc_push_test() { - let delta = RichTextDeltaBuilder::new().insert("abc\n").build(); + // let delta = RichTextDeltaBuilder::new().insert("abc\n").build(); let scripts = vec![ InsertText("1", 0), InsertText("2", 1), InsertText("3", 2), - SimulatePushRevisionMessageWithDelta(delta), AssertJson(r#"[{"insert":"123\nabc\n"}]"#), ]; EditorTest::new().await.run_scripts(scripts).await; diff --git a/frontend/rust-lib/flowy-net/src/protobuf/model/network_state.rs b/frontend/rust-lib/flowy-net/src/protobuf/model/network_state.rs index 82286beead..b74f295a86 100644 --- a/frontend/rust-lib/flowy-net/src/protobuf/model/network_state.rs +++ b/frontend/rust-lib/flowy-net/src/protobuf/model/network_state.rs @@ -231,21 +231,21 @@ static file_descriptor_proto_data: &'static [u8] = b"\ \n\x13network_state.proto\",\n\x0cNetworkState\x12\x1c\n\x02ty\x18\x01\ \x20\x01(\x0e2\x0c.NetworkTypeR\x02ty*G\n\x0bNetworkType\x12\x16\n\x12Un\ knownNetworkType\x10\0\x12\x08\n\x04Wifi\x10\x01\x12\x08\n\x04Cell\x10\ - \x02\x12\x0c\n\x08Ethernet\x10\x03J\x9d\x02\n\x06\x12\x04\0\0\t\x01\n\ - \x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x01\0\x03\x01\n\n\ - \n\x03\x04\0\x01\x12\x03\x01\x08\x14\n\x0b\n\x04\x04\0\x02\0\x12\x03\x02\ - \x04\x17\n\x0c\n\x05\x04\0\x02\0\x06\x12\x03\x02\x04\x0f\n\x0c\n\x05\x04\ - \0\x02\0\x01\x12\x03\x02\x10\x12\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x02\ - \x15\x16\n\n\n\x02\x05\0\x12\x04\x04\0\t\x01\n\n\n\x03\x05\0\x01\x12\x03\ - \x04\x05\x10\n\x0b\n\x04\x05\0\x02\0\x12\x03\x05\x04\x1b\n\x0c\n\x05\x05\ - \0\x02\0\x01\x12\x03\x05\x04\x16\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x05\ - \x19\x1a\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x06\x04\r\n\x0c\n\x05\x05\0\ - \x02\x01\x01\x12\x03\x06\x04\x08\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\ - \x06\x0b\x0c\n\x0b\n\x04\x05\0\x02\x02\x12\x03\x07\x04\r\n\x0c\n\x05\x05\ - \0\x02\x02\x01\x12\x03\x07\x04\x08\n\x0c\n\x05\x05\0\x02\x02\x02\x12\x03\ - \x07\x0b\x0c\n\x0b\n\x04\x05\0\x02\x03\x12\x03\x08\x04\x11\n\x0c\n\x05\ - \x05\0\x02\x03\x01\x12\x03\x08\x04\x0c\n\x0c\n\x05\x05\0\x02\x03\x02\x12\ - \x03\x08\x0f\x10b\x06proto3\ + \x02\x12\x0c\n\x08Ethernet\x10\x03J\x9d\x02\n\x06\x12\x04\0\0\n\x01\n\ + \x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x04\x01\n\n\ + \n\x03\x04\0\x01\x12\x03\x02\x08\x14\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\ + \x04\x17\n\x0c\n\x05\x04\0\x02\0\x06\x12\x03\x03\x04\x0f\n\x0c\n\x05\x04\ + \0\x02\0\x01\x12\x03\x03\x10\x12\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\ + \x15\x16\n\n\n\x02\x05\0\x12\x04\x05\0\n\x01\n\n\n\x03\x05\0\x01\x12\x03\ + \x05\x05\x10\n\x0b\n\x04\x05\0\x02\0\x12\x03\x06\x04\x1b\n\x0c\n\x05\x05\ + \0\x02\0\x01\x12\x03\x06\x04\x16\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x06\ + \x19\x1a\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x07\x04\r\n\x0c\n\x05\x05\0\ + \x02\x01\x01\x12\x03\x07\x04\x08\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\ + \x07\x0b\x0c\n\x0b\n\x04\x05\0\x02\x02\x12\x03\x08\x04\r\n\x0c\n\x05\x05\ + \0\x02\x02\x01\x12\x03\x08\x04\x08\n\x0c\n\x05\x05\0\x02\x02\x02\x12\x03\ + \x08\x0b\x0c\n\x0b\n\x04\x05\0\x02\x03\x12\x03\t\x04\x11\n\x0c\n\x05\x05\ + \0\x02\x03\x01\x12\x03\t\x04\x0c\n\x0c\n\x05\x05\0\x02\x03\x02\x12\x03\t\ + \x0f\x10b\x06proto3\ "; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; diff --git a/frontend/rust-lib/flowy-net/src/protobuf/proto/network_state.proto b/frontend/rust-lib/flowy-net/src/protobuf/proto/network_state.proto index 390def7011..06a4aa5f3c 100644 --- a/frontend/rust-lib/flowy-net/src/protobuf/proto/network_state.proto +++ b/frontend/rust-lib/flowy-net/src/protobuf/proto/network_state.proto @@ -1,4 +1,5 @@ syntax = "proto3"; + message NetworkState { NetworkType ty = 1; } diff --git a/frontend/rust-lib/flowy-test/src/editor.rs b/frontend/rust-lib/flowy-test/src/editor.rs index 615272d178..c326cb5ce9 100644 --- a/frontend/rust-lib/flowy-test/src/editor.rs +++ b/frontend/rust-lib/flowy-test/src/editor.rs @@ -76,7 +76,7 @@ impl EditorTest { self.editor.redo().await.unwrap(); }, EditorScript::AssertRevisionState(rev_id, state) => { - let record = cache.query_revision(rev_id).await.unwrap(); + let record = cache.query_revision(&doc_id, rev_id).await.unwrap(); assert_eq!(record.state, state); }, EditorScript::AssertCurrentRevId(rev_id) => { diff --git a/frontend/rust-lib/flowy-user/src/services/server/ws_mock.rs b/frontend/rust-lib/flowy-user/src/services/server/ws_mock.rs index 4828781d29..dffb8bb710 100644 --- a/frontend/rust-lib/flowy-user/src/services/server/ws_mock.rs +++ b/frontend/rust-lib/flowy-user/src/services/server/ws_mock.rs @@ -7,7 +7,7 @@ use dashmap::DashMap; use flowy_collaboration::{ core::sync::{ServerDocManager, ServerDocPersistence}, entities::{ - doc::{Doc, NewDocUser}, + doc::Doc, ws::{WsDataType, WsDocumentData}, }, errors::CollaborateError, @@ -16,9 +16,13 @@ use lazy_static::lazy_static; use lib_infra::future::{FutureResult, FutureResultSend}; use lib_ot::{revision::Revision, rich_text::RichTextDelta}; use lib_ws::{WsConnectState, WsMessage, WsMessageHandler, WsModule}; -use parking_lot::RwLock; -use std::{convert::TryFrom, sync::Arc}; -use tokio::sync::{broadcast, broadcast::Receiver}; + +use flowy_collaboration::core::sync::{RevisionUser, SyncResponse}; +use std::{ + convert::{TryFrom, TryInto}, + sync::Arc, +}; +use tokio::sync::{broadcast, broadcast::Receiver, mpsc}; pub struct MockWebSocket { handlers: DashMap>, @@ -49,12 +53,11 @@ impl FlowyWebSocket for Arc { tokio::spawn(async move { while let Ok(message) = ws_receiver.recv().await { let ws_data = WsDocumentData::try_from(Bytes::from(message.data.clone())).unwrap(); - match DOC_SERVER.handle_ws_data(ws_data).await { - None => {}, - Some(new_ws_message) => match cloned_ws.handlers.get(&new_ws_message.module) { - None => log::error!("Can't find any handler for message: {:?}", new_ws_message), - Some(handler) => handler.receive_message(new_ws_message.clone()), - }, + let mut rx = DOC_SERVER.handle_ws_data(ws_data).await; + let new_ws_message = rx.recv().await.unwrap(); + match cloned_ws.handlers.get(&new_ws_message.module) { + None => log::error!("Can't find any handler for message: {:?}", new_ws_message), + Some(handler) => handler.receive_message(new_ws_message.clone()), } } }); @@ -88,38 +91,131 @@ struct MockDocServer { impl std::default::Default for MockDocServer { fn default() -> Self { - let manager = Arc::new(ServerDocManager::new(Arc::new(MockDocServerPersistence {}))); + let persistence = Arc::new(MockDocServerPersistence::default()); + let manager = Arc::new(ServerDocManager::new(persistence)); MockDocServer { manager } } } impl MockDocServer { - async fn handle_ws_data(&self, ws_data: WsDocumentData) -> Option { + async fn handle_ws_data(&self, ws_data: WsDocumentData) -> mpsc::Receiver { let bytes = Bytes::from(ws_data.data); match ws_data.ty { - WsDataType::Acked => {}, + WsDataType::Acked => { + unimplemented!() + }, WsDataType::PushRev => { let revision = Revision::try_from(bytes).unwrap(); - log::info!("{:?}", revision); + let handler = match self.manager.get(&revision.doc_id).await { + None => self.manager.create_doc(revision.clone()).await.unwrap(), + Some(handler) => handler, + }; + + let (tx, rx) = mpsc::channel(1); + let user = MockDocUser { + user_id: revision.user_id.clone(), + tx, + }; + handler.apply_revision(Arc::new(user), revision).await.unwrap(); + rx }, - WsDataType::PullRev => {}, - WsDataType::Conflict => {}, - WsDataType::NewDocUser => { - let new_doc_user = NewDocUser::try_from(bytes).unwrap(); - log::info!("{:?}", new_doc_user); - // NewDocUser + WsDataType::PullRev => { + unimplemented!() + }, + WsDataType::Conflict => { + unimplemented!() }, } - None } } -struct MockDocServerPersistence {} +struct MockDocServerPersistence { + inner: Arc>, +} + +impl std::default::Default for MockDocServerPersistence { + fn default() -> Self { + MockDocServerPersistence { + inner: Arc::new(DashMap::new()), + } + } +} impl ServerDocPersistence for MockDocServerPersistence { - fn update_doc(&self, doc_id: &str, rev_id: i64, delta: RichTextDelta) -> FutureResultSend<(), CollaborateError> { + fn update_doc(&self, _doc_id: &str, _rev_id: i64, _delta: RichTextDelta) -> FutureResultSend<(), CollaborateError> { unimplemented!() } - fn read_doc(&self, doc_id: &str) -> FutureResultSend { unimplemented!() } + fn read_doc(&self, doc_id: &str) -> FutureResultSend { + let inner = self.inner.clone(); + let doc_id = doc_id.to_owned(); + FutureResultSend::new(async move { + match inner.get(&doc_id) { + None => { + // + Err(CollaborateError::record_not_found()) + }, + Some(val) => { + // + Ok(val.value().clone()) + }, + } + }) + } + + fn create_doc(&self, revision: Revision) -> FutureResultSend { + FutureResultSend::new(async move { + let doc: Doc = revision.try_into().unwrap(); + Ok(doc) + }) + } +} + +#[derive(Debug)] +struct MockDocUser { + user_id: String, + tx: mpsc::Sender, +} + +impl RevisionUser for MockDocUser { + fn user_id(&self) -> String { self.user_id.clone() } + + fn recv(&self, resp: SyncResponse) { + let sender = self.tx.clone(); + tokio::spawn(async move { + match resp { + SyncResponse::Pull(data) => { + let bytes: Bytes = data.try_into().unwrap(); + let msg = WsMessage { + module: WsModule::Doc, + data: bytes.to_vec(), + }; + sender.send(msg).await.unwrap(); + }, + SyncResponse::Push(data) => { + let bytes: Bytes = data.try_into().unwrap(); + let msg = WsMessage { + module: WsModule::Doc, + data: bytes.to_vec(), + }; + sender.send(msg).await.unwrap(); + }, + SyncResponse::Ack(data) => { + let bytes: Bytes = data.try_into().unwrap(); + let msg = WsMessage { + module: WsModule::Doc, + data: bytes.to_vec(), + }; + sender.send(msg).await.unwrap(); + }, + SyncResponse::NewRevision { + rev_id: _, + doc_id: _, + doc_json: _, + } => { + // unimplemented!() + }, + } + }); + } } diff --git a/shared-lib/flowy-collaboration/src/core/sync/server_editor.rs b/shared-lib/flowy-collaboration/src/core/sync/server_editor.rs index cd346913c1..a15f3201cf 100644 --- a/shared-lib/flowy-collaboration/src/core/sync/server_editor.rs +++ b/shared-lib/flowy-collaboration/src/core/sync/server_editor.rs @@ -23,6 +23,7 @@ use tokio::{ pub trait ServerDocPersistence: Send + Sync { fn update_doc(&self, doc_id: &str, rev_id: i64, delta: RichTextDelta) -> FutureResultSend<(), CollaborateError>; fn read_doc(&self, doc_id: &str) -> FutureResultSend; + fn create_doc(&self, revision: Revision) -> FutureResultSend; } #[rustfmt::skip] @@ -59,17 +60,38 @@ impl ServerDocManager { } } - pub async fn get(&self, doc_id: &str) -> Result>, CollaborateError> { + pub async fn get(&self, doc_id: &str) -> Option> { match self.open_doc_map.get(doc_id).map(|ctx| ctx.clone()) { - Some(edit_doc) => Ok(Some(edit_doc)), + Some(edit_doc) => Some(edit_doc), None => { - let doc = self.persistence.read_doc(doc_id).await?; - let handler = self.cache(doc).await.map_err(internal_error)?; - Ok(Some(handler)) + let f = || async { + let doc = self.persistence.read_doc(doc_id).await?; + let handler = self.cache(doc).await.map_err(internal_error)?; + Ok::, CollaborateError>(handler) + }; + match f().await { + Ok(handler) => Some(handler), + Err(e) => { + log::error!("{}", e); + None + }, + } }, } } + pub async fn create_doc(&self, revision: Revision) -> Result, CollaborateError> { + if !revision.is_initial() { + return Err( + CollaborateError::revision_conflict().context("Revision's rev_id should be 0 when creating the doc") + ); + } + + let doc = self.persistence.create_doc(revision).await?; + let handler = self.cache(doc).await?; + Ok(handler) + } + async fn cache(&self, doc: Doc) -> Result, CollaborateError> { let doc_id = doc.id.clone(); let handle = spawn_blocking(|| OpenDocHandle::new(doc)) @@ -93,13 +115,6 @@ impl OpenDocHandle { Ok(Self { sender }) } - pub async fn add_user(&self, user: Arc, rev_id: i64) -> Result<(), CollaborateError> { - let (ret, rx) = oneshot::channel(); - let msg = DocCommand::NewConnectedUser { user, rev_id, ret }; - let _ = self.send(msg, rx).await?; - Ok(()) - } - pub async fn apply_revision( &self, user: Arc, @@ -132,11 +147,6 @@ impl OpenDocHandle { #[derive(Debug)] enum DocCommand { - NewConnectedUser { - user: Arc, - rev_id: i64, - ret: oneshot::Sender>, - }, ReceiveRevision { user: Arc, revision: Revision, @@ -183,10 +193,6 @@ impl DocCommandQueue { async fn handle_message(&self, msg: DocCommand) { match msg { - DocCommand::NewConnectedUser { user, rev_id, ret } => { - log::debug!("Receive new doc user: {:?}, rev_id: {}", user, rev_id); - let _ = ret.send(self.edit_doc.new_doc_user(user, rev_id).await.map_err(internal_error)); - }, DocCommand::ReceiveRevision { user, revision, ret } => { // let revision = (&mut revision).try_into().map_err(internal_error).unwrap(); let _ = ret.send( @@ -247,20 +253,6 @@ impl ServerDocEditor { }) } - #[tracing::instrument( - level = "debug", - skip(self, user), - fields( - user_id = %user.user_id(), - rev_id = %rev_id, - ) - )] - pub async fn new_doc_user(&self, user: Arc, rev_id: i64) -> Result<(), OTError> { - self.users.insert(user.user_id(), user.clone()); - self.synchronizer.new_conn(user, rev_id); - Ok(()) - } - #[tracing::instrument( level = "debug", skip(self, user, revision), diff --git a/shared-lib/flowy-collaboration/src/core/sync/synchronizer.rs b/shared-lib/flowy-collaboration/src/core/sync/synchronizer.rs index e193364250..2d62071407 100644 --- a/shared-lib/flowy-collaboration/src/core/sync/synchronizer.rs +++ b/shared-lib/flowy-collaboration/src/core/sync/synchronizer.rs @@ -55,29 +55,12 @@ impl RevisionSynchronizer { } } - pub fn new_conn(&self, user: Arc, rev_id: i64) { - let cur_rev_id = self.rev_id.load(SeqCst); - match cur_rev_id.cmp(&rev_id) { - Ordering::Less => { - let msg = mk_pull_message(&self.doc_id, next(cur_rev_id), rev_id); - user.recv(SyncResponse::Pull(msg)); - }, - Ordering::Equal => {}, - Ordering::Greater => { - let doc_delta = self.document.read().delta().clone(); - let revision = self.mk_revision(rev_id, doc_delta); - let data = mk_push_message(&self.doc_id, revision); - user.recv(SyncResponse::Push(data)); - }, - } - } - pub fn apply_revision(&self, user: Arc, revision: Revision) -> Result<(), OTError> { - let cur_rev_id = self.rev_id.load(SeqCst); - match cur_rev_id.cmp(&revision.rev_id) { + let server_base_rev_id = self.rev_id.load(SeqCst); + match server_base_rev_id.cmp(&revision.rev_id) { Ordering::Less => { - let next_rev_id = next(cur_rev_id); - if cur_rev_id == revision.base_rev_id || next_rev_id == revision.base_rev_id { + let server_rev_id = next(server_base_rev_id); + if server_base_rev_id == revision.base_rev_id || server_rev_id == revision.rev_id { // The rev is in the right order, just compose it. let _ = self.compose_revision(&revision)?; user.recv(SyncResponse::Ack(mk_acked_message(&revision))); @@ -91,13 +74,14 @@ impl RevisionSynchronizer { }); } else { // The server document is outdated, pull the missing revision from the client. - let msg = mk_pull_message(&self.doc_id, next_rev_id, revision.rev_id); + let msg = mk_pull_message(&self.doc_id, server_rev_id, revision.rev_id); user.recv(SyncResponse::Pull(msg)); } }, Ordering::Equal => { // Do nothing log::warn!("Applied revision rev_id is the same as cur_rev_id"); + user.recv(SyncResponse::Ack(mk_acked_message(&revision))); }, Ordering::Greater => { // The client document is outdated. Transform the client revision delta and then diff --git a/shared-lib/flowy-collaboration/src/entities/doc/doc.rs b/shared-lib/flowy-collaboration/src/entities/doc/doc.rs index 151bef5b57..7e740f101b 100644 --- a/shared-lib/flowy-collaboration/src/entities/doc/doc.rs +++ b/shared-lib/flowy-collaboration/src/entities/doc/doc.rs @@ -1,5 +1,6 @@ +use crate::errors::CollaborateError; use flowy_derive::ProtoBuf; -use lib_ot::{errors::OTError, rich_text::RichTextDelta}; +use lib_ot::{errors::OTError, revision::Revision, rich_text::RichTextDelta}; #[derive(ProtoBuf, Default, Debug, Clone)] pub struct CreateDocParams { @@ -41,6 +42,28 @@ impl Doc { } } +impl std::convert::TryFrom for Doc { + type Error = CollaborateError; + + fn try_from(revision: Revision) -> Result { + if !revision.is_initial() { + return Err( + CollaborateError::revision_conflict().context("Revision's rev_id should be 0 when creating the doc") + ); + } + + let delta = RichTextDelta::from_bytes(&revision.delta_data)?; + let doc_json = delta.to_json(); + + Ok(Doc { + id: revision.doc_id, + data: doc_json, + rev_id: revision.rev_id, + base_rev_id: revision.base_rev_id, + }) + } +} + #[derive(ProtoBuf, Default, Debug, Clone)] pub struct UpdateDocParams { #[pb(index = 1)] diff --git a/shared-lib/flowy-collaboration/src/entities/ws/ws.rs b/shared-lib/flowy-collaboration/src/entities/ws/ws.rs index f31ffd2bc2..3e41268aef 100644 --- a/shared-lib/flowy-collaboration/src/entities/ws/ws.rs +++ b/shared-lib/flowy-collaboration/src/entities/ws/ws.rs @@ -1,4 +1,4 @@ -use crate::{entities::doc::NewDocUser, errors::CollaborateError}; +use crate::errors::CollaborateError; use bytes::Bytes; use flowy_derive::{ProtoBuf, ProtoBuf_Enum}; use lib_ot::revision::{RevId, Revision, RevisionRange}; @@ -7,13 +7,12 @@ use std::convert::{TryFrom, TryInto}; #[derive(Debug, Clone, ProtoBuf_Enum, Eq, PartialEq, Hash)] pub enum WsDataType { // The frontend receives the Acked means the backend has accepted the revision - Acked = 0, + Acked = 0, // The frontend receives the PushRev event means the backend is pushing the new revision to frontend - PushRev = 1, + PushRev = 1, // The fronted receives the PullRev event means the backend try to pull the revision from frontend - PullRev = 2, - Conflict = 3, - NewDocUser = 4, + PullRev = 2, + Conflict = 3, } impl WsDataType { @@ -53,18 +52,6 @@ impl std::convert::From for WsDocumentData { } } -impl std::convert::From for WsDocumentData { - fn from(user: NewDocUser) -> Self { - let doc_id = user.doc_id.clone(); - let bytes: Bytes = user.try_into().unwrap(); - Self { - doc_id, - ty: WsDataType::NewDocUser, - data: bytes.to_vec(), - } - } -} - pub struct WsDocumentDataBuilder(); impl WsDocumentDataBuilder { // WsDataType::PushRev -> Revision diff --git a/shared-lib/flowy-collaboration/src/errors.rs b/shared-lib/flowy-collaboration/src/errors.rs index d438444db4..767367ae23 100644 --- a/shared-lib/flowy-collaboration/src/errors.rs +++ b/shared-lib/flowy-collaboration/src/errors.rs @@ -38,6 +38,8 @@ impl CollaborateError { static_doc_error!(undo, ErrorCode::UndoFail); static_doc_error!(redo, ErrorCode::RedoFail); static_doc_error!(out_of_bound, ErrorCode::OutOfBound); + static_doc_error!(record_not_found, ErrorCode::RecordNotFound); + static_doc_error!(revision_conflict, ErrorCode::RevisionConflict); } impl fmt::Display for CollaborateError { @@ -46,12 +48,14 @@ impl fmt::Display for CollaborateError { #[derive(Debug, Clone, Display, PartialEq, Eq)] pub enum ErrorCode { - DocIdInvalid = 0, - DocNotfound = 1, - UndoFail = 200, - RedoFail = 201, - OutOfBound = 202, - InternalError = 1000, + DocIdInvalid = 0, + DocNotfound = 1, + UndoFail = 200, + RedoFail = 201, + OutOfBound = 202, + RevisionConflict = 203, + RecordNotFound = 300, + InternalError = 1000, } impl std::convert::From for CollaborateError { diff --git a/shared-lib/flowy-collaboration/src/protobuf/model/ws.rs b/shared-lib/flowy-collaboration/src/protobuf/model/ws.rs index 8e9387f84e..dbdcbe6569 100644 --- a/shared-lib/flowy-collaboration/src/protobuf/model/ws.rs +++ b/shared-lib/flowy-collaboration/src/protobuf/model/ws.rs @@ -261,7 +261,6 @@ pub enum WsDataType { PushRev = 1, PullRev = 2, Conflict = 3, - NewDocUser = 4, } impl ::protobuf::ProtobufEnum for WsDataType { @@ -275,7 +274,6 @@ impl ::protobuf::ProtobufEnum for WsDataType { 1 => ::std::option::Option::Some(WsDataType::PushRev), 2 => ::std::option::Option::Some(WsDataType::PullRev), 3 => ::std::option::Option::Some(WsDataType::Conflict), - 4 => ::std::option::Option::Some(WsDataType::NewDocUser), _ => ::std::option::Option::None } } @@ -286,7 +284,6 @@ impl ::protobuf::ProtobufEnum for WsDataType { WsDataType::PushRev, WsDataType::PullRev, WsDataType::Conflict, - WsDataType::NewDocUser, ]; values } @@ -317,31 +314,28 @@ impl ::protobuf::reflect::ProtobufValue for WsDataType { static file_descriptor_proto_data: &'static [u8] = b"\ \n\x08ws.proto\"X\n\x0eWsDocumentData\x12\x15\n\x06doc_id\x18\x01\x20\ \x01(\tR\x05docId\x12\x1b\n\x02ty\x18\x02\x20\x01(\x0e2\x0b.WsDataTypeR\ - \x02ty\x12\x12\n\x04data\x18\x03\x20\x01(\x0cR\x04data*O\n\nWsDataType\ + \x02ty\x12\x12\n\x04data\x18\x03\x20\x01(\x0cR\x04data*?\n\nWsDataType\ \x12\t\n\x05Acked\x10\0\x12\x0b\n\x07PushRev\x10\x01\x12\x0b\n\x07PullRe\ - v\x10\x02\x12\x0c\n\x08Conflict\x10\x03\x12\x0e\n\nNewDocUser\x10\x04J\ - \xb4\x03\n\x06\x12\x04\0\0\r\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\ - \x02\x04\0\x12\x04\x02\0\x06\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x16\ - \n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x16\n\x0c\n\x05\x04\0\x02\0\x05\ - \x12\x03\x03\x04\n\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\x0b\x11\n\x0c\ - \n\x05\x04\0\x02\0\x03\x12\x03\x03\x14\x15\n\x0b\n\x04\x04\0\x02\x01\x12\ - \x03\x04\x04\x16\n\x0c\n\x05\x04\0\x02\x01\x06\x12\x03\x04\x04\x0e\n\x0c\ - \n\x05\x04\0\x02\x01\x01\x12\x03\x04\x0f\x11\n\x0c\n\x05\x04\0\x02\x01\ - \x03\x12\x03\x04\x14\x15\n\x0b\n\x04\x04\0\x02\x02\x12\x03\x05\x04\x13\n\ - \x0c\n\x05\x04\0\x02\x02\x05\x12\x03\x05\x04\t\n\x0c\n\x05\x04\0\x02\x02\ - \x01\x12\x03\x05\n\x0e\n\x0c\n\x05\x04\0\x02\x02\x03\x12\x03\x05\x11\x12\ - \n\n\n\x02\x05\0\x12\x04\x07\0\r\x01\n\n\n\x03\x05\0\x01\x12\x03\x07\x05\ - \x0f\n\x0b\n\x04\x05\0\x02\0\x12\x03\x08\x04\x0e\n\x0c\n\x05\x05\0\x02\0\ - \x01\x12\x03\x08\x04\t\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x08\x0c\r\n\ - \x0b\n\x04\x05\0\x02\x01\x12\x03\t\x04\x10\n\x0c\n\x05\x05\0\x02\x01\x01\ - \x12\x03\t\x04\x0b\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\t\x0e\x0f\n\x0b\ - \n\x04\x05\0\x02\x02\x12\x03\n\x04\x10\n\x0c\n\x05\x05\0\x02\x02\x01\x12\ - \x03\n\x04\x0b\n\x0c\n\x05\x05\0\x02\x02\x02\x12\x03\n\x0e\x0f\n\x0b\n\ - \x04\x05\0\x02\x03\x12\x03\x0b\x04\x11\n\x0c\n\x05\x05\0\x02\x03\x01\x12\ - \x03\x0b\x04\x0c\n\x0c\n\x05\x05\0\x02\x03\x02\x12\x03\x0b\x0f\x10\n\x0b\ - \n\x04\x05\0\x02\x04\x12\x03\x0c\x04\x13\n\x0c\n\x05\x05\0\x02\x04\x01\ - \x12\x03\x0c\x04\x0e\n\x0c\n\x05\x05\0\x02\x04\x02\x12\x03\x0c\x11\x12b\ - \x06proto3\ + v\x10\x02\x12\x0c\n\x08Conflict\x10\x03J\x8b\x03\n\x06\x12\x04\0\0\x0c\ + \x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x06\ + \x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x16\n\x0b\n\x04\x04\0\x02\0\x12\ + \x03\x03\x04\x16\n\x0c\n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\n\n\x0c\n\ + \x05\x04\0\x02\0\x01\x12\x03\x03\x0b\x11\n\x0c\n\x05\x04\0\x02\0\x03\x12\ + \x03\x03\x14\x15\n\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04\x16\n\x0c\n\ + \x05\x04\0\x02\x01\x06\x12\x03\x04\x04\x0e\n\x0c\n\x05\x04\0\x02\x01\x01\ + \x12\x03\x04\x0f\x11\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04\x14\x15\n\ + \x0b\n\x04\x04\0\x02\x02\x12\x03\x05\x04\x13\n\x0c\n\x05\x04\0\x02\x02\ + \x05\x12\x03\x05\x04\t\n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03\x05\n\x0e\n\ + \x0c\n\x05\x04\0\x02\x02\x03\x12\x03\x05\x11\x12\n\n\n\x02\x05\0\x12\x04\ + \x07\0\x0c\x01\n\n\n\x03\x05\0\x01\x12\x03\x07\x05\x0f\n\x0b\n\x04\x05\0\ + \x02\0\x12\x03\x08\x04\x0e\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x08\x04\t\ + \n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x08\x0c\r\n\x0b\n\x04\x05\0\x02\x01\ + \x12\x03\t\x04\x10\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\t\x04\x0b\n\x0c\ + \n\x05\x05\0\x02\x01\x02\x12\x03\t\x0e\x0f\n\x0b\n\x04\x05\0\x02\x02\x12\ + \x03\n\x04\x10\n\x0c\n\x05\x05\0\x02\x02\x01\x12\x03\n\x04\x0b\n\x0c\n\ + \x05\x05\0\x02\x02\x02\x12\x03\n\x0e\x0f\n\x0b\n\x04\x05\0\x02\x03\x12\ + \x03\x0b\x04\x11\n\x0c\n\x05\x05\0\x02\x03\x01\x12\x03\x0b\x04\x0c\n\x0c\ + \n\x05\x05\0\x02\x03\x02\x12\x03\x0b\x0f\x10b\x06proto3\ "; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; diff --git a/shared-lib/flowy-collaboration/src/protobuf/proto/ws.proto b/shared-lib/flowy-collaboration/src/protobuf/proto/ws.proto index 4c7865fd1b..423a3e3869 100644 --- a/shared-lib/flowy-collaboration/src/protobuf/proto/ws.proto +++ b/shared-lib/flowy-collaboration/src/protobuf/proto/ws.proto @@ -10,5 +10,4 @@ enum WsDataType { PushRev = 1; PullRev = 2; Conflict = 3; - NewDocUser = 4; } diff --git a/shared-lib/lib-ot/src/revision/cache.rs b/shared-lib/lib-ot/src/revision/cache.rs deleted file mode 100644 index 4e2dd40673..0000000000 --- a/shared-lib/lib-ot/src/revision/cache.rs +++ /dev/null @@ -1,140 +0,0 @@ -use crate::{ - errors::OTError, - revision::{Revision, RevisionRange}, -}; -use dashmap::DashMap; -use std::{collections::VecDeque, fmt::Debug, sync::Arc}; -use tokio::sync::{broadcast, RwLock}; - -pub trait RevisionDiskCache: Sync + Send { - type Error: Debug; - fn create_revisions(&self, revisions: Vec) -> Result<(), Self::Error>; - fn revisions_in_range(&self, doc_id: &str, range: &RevisionRange) -> Result, Self::Error>; - fn read_revision(&self, doc_id: &str, rev_id: i64) -> Result, Self::Error>; - fn read_revisions(&self, doc_id: &str) -> Result, Self::Error>; -} - -pub struct RevisionMemoryCache { - revs_map: Arc>, - pending_revs: Arc>>, -} - -impl std::default::Default for RevisionMemoryCache { - fn default() -> Self { - let pending_revs = Arc::new(RwLock::new(VecDeque::new())); - RevisionMemoryCache { - revs_map: Arc::new(DashMap::new()), - pending_revs, - } - } -} - -impl RevisionMemoryCache { - pub fn new() -> Self { RevisionMemoryCache::default() } - - pub async fn add_revision(&self, revision: Revision) -> Result<(), OTError> { - // The last revision's rev_id must be greater than the new one. - if let Some(rev_id) = self.pending_revs.read().await.back() { - if *rev_id >= revision.rev_id { - return Err(OTError::revision_id_conflict() - .context(format!("The new revision's id must be greater than {}", rev_id))); - } - } - - self.pending_revs.write().await.push_back(revision.rev_id); - self.revs_map.insert(revision.rev_id, RevisionRecord::new(revision)); - Ok(()) - } - - pub fn remove_revisions(&self, ids: Vec) { self.revs_map.retain(|k, _| !ids.contains(k)); } - - pub async fn ack_revision(&self, rev_id: &i64) { - if let Some(mut m_revision) = self.revs_map.get_mut(rev_id) { - m_revision.value_mut().ack(); - match self.pending_revs.write().await.pop_front() { - None => log::error!("The pending_revs should not be empty"), - Some(cache_rev_id) => { - assert_eq!(&cache_rev_id, rev_id); - }, - } - } else { - log::error!("Can't find revision with id {}", rev_id); - } - } - - pub async fn revisions_in_range(&self, range: &RevisionRange) -> Result, OTError> { - let revs = range - .iter() - .flat_map(|rev_id| match self.revs_map.get(&rev_id) { - None => None, - Some(rev) => Some(rev.revision.clone()), - }) - .collect::>(); - - if revs.len() == range.len() as usize { - Ok(revs) - } else { - Ok(vec![]) - } - } - - pub fn contains(&self, rev_id: &i64) -> bool { self.revs_map.contains_key(rev_id) } - - pub fn is_empty(&self) -> bool { self.revs_map.is_empty() } - - pub fn revisions(&self) -> (Vec, Vec) { - let mut records: Vec = vec![]; - let mut ids: Vec = vec![]; - - self.revs_map.iter().for_each(|kv| { - records.push(kv.value().clone()); - ids.push(*kv.key()); - }); - (ids, records) - } - - pub async fn query_revision(&self, rev_id: &i64) -> Option { - self.revs_map.get(&rev_id).map(|r| r.value().clone()) - } - - pub async fn front_revision(&self) -> Option<(i64, RevisionRecord)> { - match self.pending_revs.read().await.front() { - None => None, - Some(rev_id) => self.revs_map.get(rev_id).map(|r| (*r.key(), r.value().clone())), - } - } - - pub async fn front_rev_id(&self) -> Option { self.pending_revs.read().await.front().copied() } -} - -pub type RevIdReceiver = broadcast::Receiver; -pub type RevIdSender = broadcast::Sender; - -#[derive(Debug, Clone, Eq, PartialEq)] -pub enum RevState { - Local = 0, - Acked = 1, -} - -#[derive(Clone)] -pub struct RevisionRecord { - pub revision: Revision, - pub state: RevState, -} - -impl RevisionRecord { - pub fn new(revision: Revision) -> Self { - Self { - revision, - state: RevState::Local, - } - } - - pub fn ack(&mut self) { self.state = RevState::Acked; } -} - -#[cfg(feature = "flowy_unit_test")] -impl RevisionMemoryCache { - pub fn revs_map(&self) -> Arc> { self.revs_map.clone() } - pub fn pending_revs(&self) -> Arc>> { self.pending_revs.clone() } -} diff --git a/shared-lib/lib-ot/src/revision/mod.rs b/shared-lib/lib-ot/src/revision/mod.rs index 9848b59af1..4a7ebf60c1 100644 --- a/shared-lib/lib-ot/src/revision/mod.rs +++ b/shared-lib/lib-ot/src/revision/mod.rs @@ -1,5 +1,3 @@ -mod cache; mod model; -pub use cache::*; pub use model::*; diff --git a/shared-lib/lib-ot/src/revision/model.rs b/shared-lib/lib-ot/src/revision/model.rs index 63a56fc314..71b0a4e7d5 100644 --- a/shared-lib/lib-ot/src/revision/model.rs +++ b/shared-lib/lib-ot/src/revision/model.rs @@ -31,6 +31,8 @@ impl Revision { pub fn pair_rev_id(&self) -> (i64, i64) { (self.base_rev_id, self.rev_id) } + pub fn is_initial(&self) -> bool { self.rev_id == 0 } + // pub fn from_pb(pb: &mut crate::protobuf::Revision) -> Self { // pb.try_into().unwrap() } @@ -155,3 +157,9 @@ pub fn md5>(data: T) -> String { let md5 = format!("{:x}", md5::compute(data)); md5 } + +#[derive(Debug, Clone, Eq, PartialEq)] +pub enum RevState { + Local = 0, + Acked = 1, +}