diff --git a/backend/src/application.rs b/backend/src/application.rs index 00c5b52deb..81518fd9dc 100644 --- a/backend/src/application.rs +++ b/backend/src/application.rs @@ -57,6 +57,7 @@ pub fn run(listener: TcpListener, app_ctx: AppContext) -> Result>, pub persistence: Data>, pub ws_receivers: Data, + pub document_manager: Data>, } impl AppContext { @@ -25,12 +30,16 @@ impl AppContext { let kv_store = make_document_kv_store(pg_pool.clone()); let persistence = Arc::new(FlowyPersistence { pg_pool, kv_store }); - let document_ws_receiver = make_document_ws_receiver(persistence.clone()); + let document_persistence = Arc::new(DocumentPersistenceImpl(persistence.clone())); + let document_manager = Arc::new(ServerDocumentManager::new(document_persistence)); + + let document_ws_receiver = make_document_ws_receiver(persistence.clone(), document_manager.clone()); ws_receivers.set(WSModule::Doc, document_ws_receiver); AppContext { ws_server, persistence: Data::new(persistence), ws_receivers: Data::new(ws_receivers), + document_manager: Data::new(document_manager), } } } diff --git a/backend/src/services/document/persistence.rs b/backend/src/services/document/persistence.rs index a48779f63b..006df85b60 100644 --- a/backend/src/services/document/persistence.rs +++ b/backend/src/services/document/persistence.rs @@ -15,7 +15,9 @@ use flowy_collaboration::protobuf::{ }; use lib_ot::{core::OperationTransformable, rich_text::RichTextDelta}; use protobuf::Message; +use std::convert::TryInto; +use flowy_collaboration::sync::ServerDocumentManager; use std::sync::Arc; use uuid::Uuid; @@ -39,23 +41,23 @@ pub async fn read_document( make_doc_from_revisions(¶ms.doc_id, revisions) } -#[tracing::instrument(level = "debug", skip(kv_store, params), fields(delta), err)] +#[tracing::instrument(level = "debug", skip(document_manager, params), fields(delta), err)] pub async fn reset_document( - kv_store: &Arc, + document_manager: &Arc, mut params: ResetDocumentParams, ) -> Result<(), ServerError> { - let revisions = params.take_revisions().take_items(); - let doc_id = params.take_doc_id(); - kv_store - .transaction(|mut transaction| { - Box::pin(async move { - let _ = transaction.batch_delete_key_start_with(&doc_id).await?; - let items = revisions_to_key_value_items(revisions.into())?; - let _ = transaction.batch_set(items).await?; - Ok(()) - }) - }) + let params: flowy_collaboration::entities::doc::ResetDocumentParams = (&mut params).try_into().unwrap(); + let mut revisions = params.revisions.into_inner(); + if revisions.is_empty() { + return Err(ServerError::payload_none().context("Revisions should not be empty when reset the document")); + } + let doc_id = params.doc_id.clone(); + revisions.sort_by(|a, b| a.rev_id.cmp(&b.rev_id)); + let _ = document_manager + .handle_document_reset(&doc_id, revisions) .await + .map_err(internal_error)?; + Ok(()) } #[tracing::instrument(level = "debug", skip(kv_store), err)] @@ -152,7 +154,7 @@ impl DocumentKVPersistence { } #[inline] -fn revisions_to_key_value_items(revisions: Vec) -> Result, ServerError> { +pub fn revisions_to_key_value_items(revisions: Vec) -> Result, ServerError> { let mut items = vec![]; for revision in revisions { let key = make_revision_key(&revision.doc_id, revision.rev_id); @@ -193,7 +195,7 @@ fn make_doc_from_revisions(doc_id: &str, mut revisions: RepeatedRevision) -> Res let mut document_delta = RichTextDelta::new(); let mut base_rev_id = 0; let mut rev_id = 0; - // TODO: generate delta from revision should be wrapped into function. + // TODO: replace with make_delta_from_revisions for revision in revisions { base_rev_id = revision.base_rev_id; rev_id = revision.rev_id; diff --git a/backend/src/services/document/router.rs b/backend/src/services/document/router.rs index 69e74df086..dfd2cda1ca 100644 --- a/backend/src/services/document/router.rs +++ b/backend/src/services/document/router.rs @@ -10,6 +10,7 @@ use actix_web::{ use backend_service::{errors::ServerError, response::FlowyResponse}; use flowy_collaboration::protobuf::{CreateDocParams, DocumentId, ResetDocumentParams}; +use flowy_collaboration::sync::ServerDocumentManager; use std::sync::Arc; pub async fn create_document_handler( @@ -36,10 +37,9 @@ pub async fn read_document_handler( pub async fn reset_document_handler( payload: Payload, - persistence: Data>, + document_manager: Data>, ) -> Result { let params: ResetDocumentParams = parse_from_payload(payload).await?; - let kv_store = persistence.kv_store(); - let _ = reset_document(&kv_store, params).await?; + let _ = reset_document(document_manager.get_ref(), params).await?; Ok(FlowyResponse::success().into()) } diff --git a/backend/src/services/document/ws_receiver.rs b/backend/src/services/document/ws_receiver.rs index ca91596a82..1d6ecc0f77 100644 --- a/backend/src/services/document/ws_receiver.rs +++ b/backend/src/services/document/ws_receiver.rs @@ -6,7 +6,7 @@ use crate::services::{ web_socket::{WSClientData, WebSocketReceiver}, }; -use crate::context::FlowyPersistence; +use crate::{context::FlowyPersistence, services::document::persistence::revisions_to_key_value_items}; use backend_service::errors::ServerError; use flowy_collaboration::{ entities::{ @@ -25,10 +25,10 @@ use std::{ }; use tokio::sync::{mpsc, oneshot}; -pub fn make_document_ws_receiver(persistence: Arc) -> Arc { - let document_persistence = Arc::new(DocumentPersistenceImpl(persistence.clone())); - let document_manager = Arc::new(ServerDocumentManager::new(document_persistence)); - +pub fn make_document_ws_receiver( + persistence: Arc, + document_manager: Arc, +) -> Arc { let (ws_sender, rx) = tokio::sync::mpsc::channel(100); let actor = DocumentWebSocketActor::new(rx, document_manager); tokio::task::spawn(actor.run()); @@ -72,7 +72,7 @@ impl WebSocketReceiver for DocumentWebSocketReceiver { } } -struct DocumentPersistenceImpl(Arc); +pub struct DocumentPersistenceImpl(pub Arc); impl Debug for DocumentPersistenceImpl { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("DocumentPersistenceImpl") } } @@ -83,9 +83,9 @@ impl DocumentPersistence for DocumentPersistenceImpl { doc_id: doc_id.to_string(), ..Default::default() }; - let persistence = self.0.kv_store(); + let kv_store = self.0.kv_store(); Box::pin(async move { - let mut pb_doc = read_document(&persistence, params) + let mut pb_doc = read_document(&kv_store, params) .await .map_err(server_error_to_collaborate_error)?; let doc = (&mut pb_doc) @@ -136,6 +136,24 @@ impl DocumentPersistence for DocumentPersistenceImpl { Box::pin(async move { f().await.map_err(server_error_to_collaborate_error) }) } + + fn reset_document(&self, doc_id: &str, revisions: Vec) -> BoxResultFuture<(), CollaborateError> { + let kv_store = self.0.kv_store(); + let doc_id = doc_id.to_owned(); + let f = || async move { + kv_store + .transaction(|mut transaction| { + Box::pin(async move { + let _ = transaction.batch_delete_key_start_with(&doc_id).await?; + // let items = revisions_to_key_value_items(vec![])?; + let _ = transaction.batch_set(vec![]).await?; + Ok(()) + }) + }) + .await + }; + Box::pin(async move { f().await.map_err(server_error_to_collaborate_error) }) + } } fn server_error_to_collaborate_error(error: ServerError) -> CollaborateError { diff --git a/backend/tests/document_test/edit_script.rs b/backend/tests/document_test/edit_script.rs index a2501e025a..b966b940a9 100644 --- a/backend/tests/document_test/edit_script.rs +++ b/backend/tests/document_test/edit_script.rs @@ -14,8 +14,9 @@ use crate::util::helper::{spawn_server, TestServer}; use flowy_collaboration::{entities::doc::DocumentId, protobuf::ResetDocumentParams}; use lib_ot::rich_text::{RichTextAttribute, RichTextDelta}; use parking_lot::RwLock; -use backend::services::document::persistence::{DocumentKVPersistence, read_document, reset_document}; +use backend::services::document::persistence::{read_document, reset_document}; use flowy_collaboration::entities::revision::{RepeatedRevision, Revision}; +use flowy_collaboration::sync::ServerDocumentManager; use lib_ot::core::Interval; use flowy_net::services::ws::FlowyWSConnect; @@ -146,8 +147,8 @@ async fn run_scripts(context: Arc>, scripts: Vec { @@ -182,10 +183,10 @@ async fn create_doc(flowy_test: &FlowySDKTest) -> String { view_test.view.id } -async fn reset_doc(doc_id: &str, repeated_revision: RepeatedRevision, kv_store: &Arc) { +async fn reset_doc(doc_id: &str, repeated_revision: RepeatedRevision, document_manager: &Arc) { let pb: flowy_collaboration::protobuf::RepeatedRevision = repeated_revision.try_into().unwrap(); let mut params = ResetDocumentParams::new(); params.set_doc_id(doc_id.to_owned()); params.set_revisions(pb); - let _ = reset_document(kv_store, params).await.unwrap(); + let _ = reset_document(document_manager, params).await.unwrap(); } diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/queue.rs b/frontend/rust-lib/flowy-document/src/services/doc/edit/queue.rs index 50e482b78d..8730318c30 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/edit/queue.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/edit/queue.rs @@ -4,6 +4,7 @@ use flowy_collaboration::{ document::{history::UndoResult, Document, NewlineDoc}, entities::revision::Revision, errors::CollaborateError, + util::make_delta_from_revisions, }; use flowy_error::FlowyError; use futures::stream::StreamExt; @@ -77,20 +78,7 @@ impl EditorCommandQueue { }, EditorCommand::TransformRevision { revisions, ret } => { let f = || async { - let mut new_delta = RichTextDelta::new(); - for revision in revisions { - match RichTextDelta::from_bytes(revision.delta_data) { - Ok(delta) => { - new_delta = new_delta.compose(&delta)?; - }, - Err(e) => { - let err_msg = format!("Deserialize remote revision failed: {:?}", e); - log::error!("{}", err_msg); - return Err(CollaborateError::internal().context(err_msg)); - }, - } - } - + let new_delta = make_delta_from_revisions(revisions)?; let read_guard = self.document.read().await; let mut server_prime: Option = None; let client_prime: RichTextDelta; diff --git a/frontend/rust-lib/flowy-document/src/services/doc/web_socket/web_socket.rs b/frontend/rust-lib/flowy-document/src/services/doc/web_socket/web_socket.rs index 868f5e2d97..54dde7b901 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/web_socket/web_socket.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/web_socket/web_socket.rs @@ -19,7 +19,7 @@ use flowy_error::{internal_error, FlowyError, FlowyResult}; use lib_infra::future::FutureResult; use crate::services::doc::web_socket::local_ws_impl::LocalWebSocketManager; -use flowy_collaboration::entities::{revision::pair_rev_id_from_revisions, ws::DocumentServerWSDataType}; +use flowy_collaboration::entities::ws::DocumentServerWSDataType; use lib_ot::rich_text::RichTextDelta; use lib_ws::WSConnectState; use std::{collections::VecDeque, convert::TryFrom, sync::Arc}; diff --git a/frontend/rust-lib/flowy-virtual-net/src/mock/server.rs b/frontend/rust-lib/flowy-virtual-net/src/mock/server.rs index 2550de42b3..e6fb33c6ad 100644 --- a/frontend/rust-lib/flowy-virtual-net/src/mock/server.rs +++ b/frontend/rust-lib/flowy-virtual-net/src/mock/server.rs @@ -94,6 +94,10 @@ impl DocumentPersistence for MockDocServerPersistence { } fn get_doc_revisions(&self, _doc_id: &str) -> BoxResultFuture, CollaborateError> { unimplemented!() } + + fn reset_document(&self, _doc_id: &str, _revisions: Vec) -> BoxResultFuture<(), CollaborateError> { + unimplemented!() + } } #[derive(Debug)] diff --git a/shared-lib/flowy-collaboration/src/entities/revision.rs b/shared-lib/flowy-collaboration/src/entities/revision.rs index 9843bdc850..c973493e69 100644 --- a/shared-lib/flowy-collaboration/src/entities/revision.rs +++ b/shared-lib/flowy-collaboration/src/entities/revision.rs @@ -107,10 +107,9 @@ impl std::ops::DerefMut for RepeatedRevision { } impl RepeatedRevision { - pub fn new(items: Vec) -> Self { - let mut sorted_items = items.clone(); - sorted_items.sort_by(|a, b| a.rev_id.cmp(&b.rev_id)); - Self { items: sorted_items } + pub fn new(mut items: Vec) -> Self { + items.sort_by(|a, b| a.rev_id.cmp(&b.rev_id)); + Self { items } } pub fn empty() -> Self { RepeatedRevision { items: vec![] } } diff --git a/shared-lib/flowy-collaboration/src/sync/server.rs b/shared-lib/flowy-collaboration/src/sync/server.rs index dbfbf9185f..03ce39054f 100644 --- a/shared-lib/flowy-collaboration/src/sync/server.rs +++ b/shared-lib/flowy-collaboration/src/sync/server.rs @@ -25,6 +25,7 @@ pub trait DocumentPersistence: Send + Sync + Debug { fn create_doc(&self, doc_id: &str, revisions: Vec) -> BoxResultFuture; fn get_revisions(&self, doc_id: &str, rev_ids: Vec) -> BoxResultFuture, CollaborateError>; fn get_doc_revisions(&self, doc_id: &str) -> BoxResultFuture, CollaborateError>; + fn reset_document(&self, doc_id: &str, revisions: Vec) -> BoxResultFuture<(), CollaborateError>; } pub struct ServerDocumentManager { @@ -66,7 +67,7 @@ impl ServerDocumentManager { Ok(()) }, Some(handler) => { - let _ = handler.apply_revisions(doc_id.clone(), user, revisions).await?; + let _ = handler.apply_revisions(user, revisions).await?; Ok(()) }, }; @@ -86,14 +87,26 @@ impl ServerDocumentManager { ) -> Result<(), CollaborateError> { let rev_id = rev_id_from_str(&client_data.id)?; let doc_id = client_data.doc_id.clone(); - match self.get_document_handler(&doc_id).await { None => { tracing::warn!("Document:{} doesn't exist, ignore pinging", doc_id); Ok(()) }, Some(handler) => { - let _ = handler.apply_ping(doc_id.clone(), rev_id, user).await?; + let _ = handler.apply_ping(rev_id, user).await?; + Ok(()) + }, + } + } + + pub async fn handle_document_reset(&self, doc_id: &str, revisions: Vec) -> Result<(), CollaborateError> { + match self.get_document_handler(doc_id).await { + None => { + tracing::warn!("Document:{} doesn't exist, ignore document reset", doc_id); + Ok(()) + }, + Some(handler) => { + let _ = handler.apply_document_reset(revisions).await?; Ok(()) }, } @@ -167,7 +180,6 @@ impl OpenDocHandle { #[tracing::instrument(level = "debug", skip(self, user, revisions), err)] async fn apply_revisions( &self, - doc_id: String, user: Arc, revisions: Vec, ) -> Result<(), CollaborateError> { @@ -175,7 +187,6 @@ impl OpenDocHandle { let persistence = self.persistence.clone(); self.users.insert(user.user_id(), user.clone()); let msg = DocumentCommand::ApplyRevisions { - doc_id, user, revisions, persistence, @@ -187,17 +198,11 @@ impl OpenDocHandle { } #[tracing::instrument(level = "debug", skip(self, user), err)] - async fn apply_ping( - &self, - doc_id: String, - rev_id: i64, - user: Arc, - ) -> Result<(), CollaborateError> { + async fn apply_ping(&self, rev_id: i64, user: Arc) -> Result<(), CollaborateError> { let (ret, rx) = oneshot::channel(); self.users.insert(user.user_id(), user.clone()); let persistence = self.persistence.clone(); let msg = DocumentCommand::Ping { - doc_id, user, persistence, rev_id, @@ -207,6 +212,19 @@ impl OpenDocHandle { Ok(()) } + #[tracing::instrument(level = "debug", skip(self, revisions), err)] + async fn apply_document_reset(&self, revisions: Vec) -> Result<(), CollaborateError> { + let (ret, rx) = oneshot::channel(); + let persistence = self.persistence.clone(); + let msg = DocumentCommand::Reset { + persistence, + revisions, + ret, + }; + let _ = self.send(msg, rx).await?; + Ok(()) + } + async fn send(&self, msg: DocumentCommand, rx: oneshot::Receiver) -> CollaborateResult { let _ = self .sender @@ -226,19 +244,22 @@ impl std::ops::Drop for OpenDocHandle { // #[derive(Debug)] enum DocumentCommand { ApplyRevisions { - doc_id: String, user: Arc, revisions: Vec, persistence: Arc, ret: oneshot::Sender>, }, Ping { - doc_id: String, user: Arc, persistence: Arc, rev_id: i64, ret: oneshot::Sender>, }, + Reset { + persistence: Arc, + revisions: Vec, + ret: oneshot::Sender>, + }, } struct DocumentCommandQueue { @@ -283,7 +304,6 @@ impl DocumentCommandQueue { async fn handle_message(&self, msg: DocumentCommand) { match msg { DocumentCommand::ApplyRevisions { - doc_id, user, revisions, persistence, @@ -291,13 +311,12 @@ impl DocumentCommandQueue { } => { let result = self .synchronizer - .sync_revisions(doc_id, user, revisions, persistence) + .sync_revisions(user, revisions, persistence) .await .map_err(internal_error); let _ = ret.send(result); }, DocumentCommand::Ping { - doc_id, user, persistence, rev_id, @@ -305,7 +324,19 @@ impl DocumentCommandQueue { } => { let result = self .synchronizer - .pong(doc_id, user, persistence, rev_id) + .pong(user, persistence, rev_id) + .await + .map_err(internal_error); + let _ = ret.send(result); + }, + DocumentCommand::Reset { + persistence, + revisions, + ret, + } => { + let result = self + .synchronizer + .reset(persistence, revisions) .await .map_err(internal_error); let _ = ret.send(result); diff --git a/shared-lib/flowy-collaboration/src/sync/synchronizer.rs b/shared-lib/flowy-collaboration/src/sync/synchronizer.rs index 7f17c5bfcd..550c50ef15 100644 --- a/shared-lib/flowy-collaboration/src/sync/synchronizer.rs +++ b/shared-lib/flowy-collaboration/src/sync/synchronizer.rs @@ -8,6 +8,7 @@ use crate::{ sync::DocumentPersistence, }; +use crate::util::make_delta_from_revisions; use lib_ot::{core::OperationTransformable, rich_text::RichTextDelta}; use parking_lot::RwLock; use std::{ @@ -51,11 +52,11 @@ impl RevisionSynchronizer { #[tracing::instrument(level = "debug", skip(self, user, revisions, persistence), err)] pub async fn sync_revisions( &self, - doc_id: String, user: Arc, revisions: Vec, persistence: Arc, ) -> Result<(), CollaborateError> { + let doc_id = self.doc_id.clone(); if revisions.is_empty() { // Return all the revisions to client let revisions = persistence.get_doc_revisions(&doc_id).await?; @@ -112,11 +113,11 @@ impl RevisionSynchronizer { #[tracing::instrument(level = "debug", skip(self, user, persistence), err)] pub async fn pong( &self, - doc_id: String, user: Arc, persistence: Arc, rev_id: i64, ) -> Result<(), CollaborateError> { + let doc_id = self.doc_id.clone(); let server_base_rev_id = self.rev_id.load(SeqCst); match server_base_rev_id.cmp(&rev_id) { Ordering::Less => tracing::error!( @@ -138,6 +139,21 @@ impl RevisionSynchronizer { Ok(()) } + #[tracing::instrument(level = "debug", skip(self, revisions), err)] + pub async fn reset( + &self, + persistence: Arc, + revisions: Vec, + ) -> Result<(), CollaborateError> { + let doc_id = self.doc_id.clone(); + let _ = persistence.reset_document(&doc_id, revisions.clone()).await?; + let delta = make_delta_from_revisions(revisions)?; + let new_document = Document::from_delta(delta); + *self.document.write() = new_document; + + Ok(()) + } + pub fn doc_json(&self) -> String { self.document.read().to_json() } fn compose_revision(&self, revision: &Revision) -> Result<(), CollaborateError> { diff --git a/shared-lib/flowy-collaboration/src/util.rs b/shared-lib/flowy-collaboration/src/util.rs index 0aa22b2bc1..ae232252ac 100644 --- a/shared-lib/flowy-collaboration/src/util.rs +++ b/shared-lib/flowy-collaboration/src/util.rs @@ -1,4 +1,11 @@ -use lib_ot::core::{NEW_LINE, WHITESPACE}; +use crate::{ + entities::revision::Revision, + errors::{CollaborateError, CollaborateResult}, +}; +use lib_ot::{ + core::{OperationTransformable, NEW_LINE, WHITESPACE}, + rich_text::RichTextDelta, +}; use std::sync::atomic::{AtomicI64, Ordering::SeqCst}; #[inline] @@ -32,3 +39,15 @@ impl RevIdCounter { pub fn set(&self, n: i64) { let _ = self.0.fetch_update(SeqCst, SeqCst, |_| Some(n)); } } + +pub fn make_delta_from_revisions(revisions: Vec) -> CollaborateResult { + let mut new_delta = RichTextDelta::new(); + for revision in revisions { + let delta = RichTextDelta::from_bytes(revision.delta_data).map_err(|e| { + let err_msg = format!("Deserialize remote revision failed: {:?}", e); + CollaborateError::internal().context(err_msg) + })?; + new_delta = new_delta.compose(&delta)?; + } + Ok(new_delta) +}