diff --git a/backend/src/services/document/ws_actor.rs b/backend/src/services/document/ws_actor.rs index cb97162ed3..9de71869e8 100644 --- a/backend/src/services/document/ws_actor.rs +++ b/backend/src/services/document/ws_actor.rs @@ -13,7 +13,8 @@ use flowy_collaboration::{ ClientRevisionWSDataType as ClientRevisionWSDataTypePB, Revision as RevisionPB, }, - server_document::{RevisionSyncResponse, RevisionUser, ServerDocumentManager}, + server_document::ServerDocumentManager, + synchronizer::{RevisionSyncResponse, RevisionUser}, }; use futures::stream::StreamExt; use std::sync::Arc; diff --git a/backend/src/services/folder/ws_actor.rs b/backend/src/services/folder/ws_actor.rs index 1fa1beaf8b..a76bb11bb4 100644 --- a/backend/src/services/folder/ws_actor.rs +++ b/backend/src/services/folder/ws_actor.rs @@ -11,7 +11,8 @@ use flowy_collaboration::{ ClientRevisionWSData as ClientRevisionWSDataPB, ClientRevisionWSDataType as ClientRevisionWSDataTypePB, }, - server_document::{RevisionSyncResponse, RevisionUser, ServerDocumentManager}, + server_document::ServerDocumentManager, + synchronizer::{RevisionSyncResponse, RevisionUser}, }; use futures::stream::StreamExt; use std::sync::Arc; diff --git a/backend/src/services/web_socket/entities/message.rs b/backend/src/services/web_socket/entities/message.rs index ac0e64e2af..47d6e3da9c 100644 --- a/backend/src/services/web_socket/entities/message.rs +++ b/backend/src/services/web_socket/entities/message.rs @@ -1,6 +1,6 @@ use actix::Message; use bytes::Bytes; -use flowy_collaboration::entities::ws::{ClientRevisionWSData, ServerRevisionWSData}; +use flowy_collaboration::entities::ws_data::{ClientRevisionWSData, ServerRevisionWSData}; use lib_ws::{WSModule, WebSocketRawMessage}; use std::convert::TryInto; diff --git a/frontend/rust-lib/flowy-document/src/controller.rs b/frontend/rust-lib/flowy-document/src/controller.rs index 72301a58e0..d0c0905a41 100644 --- a/frontend/rust-lib/flowy-document/src/controller.rs +++ b/frontend/rust-lib/flowy-document/src/controller.rs @@ -5,7 +5,7 @@ use dashmap::DashMap; use flowy_collaboration::entities::{ doc::{DocumentDelta, DocumentId}, revision::{md5, RepeatedRevision, Revision}, - ws::ServerRevisionWSData, + ws_data::ServerRevisionWSData, }; use flowy_database::ConnectionPool; use flowy_error::FlowyResult; diff --git a/frontend/rust-lib/flowy-document/src/core/web_socket.rs b/frontend/rust-lib/flowy-document/src/core/web_socket.rs index b68b0e4146..781f2ea2b5 100644 --- a/frontend/rust-lib/flowy-document/src/core/web_socket.rs +++ b/frontend/rust-lib/flowy-document/src/core/web_socket.rs @@ -7,7 +7,7 @@ use bytes::Bytes; use flowy_collaboration::{ entities::{ revision::{RepeatedRevision, Revision, RevisionRange}, - ws::{ClientRevisionWSData, NewDocumentUser, ServerRevisionWSData, ServerRevisionWSDataType}, + ws_data::{ClientRevisionWSData, NewDocumentUser, ServerRevisionWSData, ServerRevisionWSDataType}, }, errors::CollaborateResult, }; diff --git a/frontend/rust-lib/flowy-net/src/local_server/persistence.rs b/frontend/rust-lib/flowy-net/src/local_server/persistence.rs index 7b89537a0c..8784cdf31c 100644 --- a/frontend/rust-lib/flowy-net/src/local_server/persistence.rs +++ b/frontend/rust-lib/flowy-net/src/local_server/persistence.rs @@ -27,25 +27,25 @@ pub trait RevisionCloudStorage: Send + Sync { ) -> BoxResultFuture<(), CollaborateError>; } -pub(crate) struct LocalRevisionCloudPersistence { +pub(crate) struct LocalDocumentCloudPersistence { // For the moment, we use memory to cache the data, it will be implemented with other storage. // Like the Firestore,Dropbox.etc. storage: Arc, } -impl Debug for LocalRevisionCloudPersistence { +impl Debug for LocalDocumentCloudPersistence { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("LocalRevisionCloudPersistence") } } -impl std::default::Default for LocalRevisionCloudPersistence { +impl std::default::Default for LocalDocumentCloudPersistence { fn default() -> Self { - LocalRevisionCloudPersistence { + LocalDocumentCloudPersistence { storage: Arc::new(MemoryDocumentCloudStorage::default()), } } } -impl DocumentCloudPersistence for LocalRevisionCloudPersistence { +impl DocumentCloudPersistence for LocalDocumentCloudPersistence { fn read_document(&self, doc_id: &str) -> BoxResultFuture { let storage = self.storage.clone(); let doc_id = doc_id.to_owned(); diff --git a/frontend/rust-lib/flowy-net/src/local_server/server.rs b/frontend/rust-lib/flowy-net/src/local_server/server.rs index 5d3ed60eac..48b33d7fc5 100644 --- a/frontend/rust-lib/flowy-net/src/local_server/server.rs +++ b/frontend/rust-lib/flowy-net/src/local_server/server.rs @@ -1,15 +1,16 @@ -use crate::local_server::persistence::LocalRevisionCloudPersistence; +use crate::local_server::persistence::LocalDocumentCloudPersistence; use async_stream::stream; use bytes::Bytes; use flowy_collaboration::{ client_document::default::initial_delta_string, entities::{ doc::{CreateDocParams, DocumentId, DocumentInfo, ResetDocumentParams}, - ws::{ClientRevisionWSData, ClientRevisionWSDataType}, + ws_data::{ClientRevisionWSData, ClientRevisionWSDataType}, }, errors::CollaborateError, protobuf::ClientRevisionWSData as ClientRevisionWSDataPB, - server_document::*, + server_document::ServerDocumentManager, + synchronizer::{RevisionSyncResponse, RevisionUser}, }; use flowy_core::module::WorkspaceCloudService; use flowy_error::{internal_error, FlowyError}; @@ -35,7 +36,7 @@ impl LocalServer { client_ws_sender: mpsc::UnboundedSender, client_ws_receiver: broadcast::Sender, ) -> Self { - let persistence = Arc::new(LocalRevisionCloudPersistence::default()); + let persistence = Arc::new(LocalDocumentCloudPersistence::default()); let doc_manager = Arc::new(ServerDocumentManager::new(persistence)); let stop_tx = RwLock::new(None); diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/core_deps.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/core_deps.rs index cabe4de893..c8cbce22d9 100644 --- a/frontend/rust-lib/flowy-sdk/src/deps_resolve/core_deps.rs +++ b/frontend/rust-lib/flowy-sdk/src/deps_resolve/core_deps.rs @@ -1,6 +1,6 @@ use backend_service::configuration::ClientServerConfiguration; use bytes::Bytes; -use flowy_collaboration::entities::ws::ClientRevisionWSData; +use flowy_collaboration::entities::ws_data::ClientRevisionWSData; use flowy_core::{ controller::FolderManager, errors::{internal_error, FlowyError}, diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs index 12b094635f..1456a939e5 100644 --- a/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs +++ b/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs @@ -1,6 +1,6 @@ use backend_service::configuration::ClientServerConfiguration; use bytes::Bytes; -use flowy_collaboration::entities::ws::ClientRevisionWSData; +use flowy_collaboration::entities::ws_data::ClientRevisionWSData; use flowy_database::ConnectionPool; use flowy_document::{ context::{DocumentContext, DocumentUser}, diff --git a/frontend/rust-lib/flowy-sync/src/ws_manager.rs b/frontend/rust-lib/flowy-sync/src/ws_manager.rs index 577a079664..943bb1db11 100644 --- a/frontend/rust-lib/flowy-sync/src/ws_manager.rs +++ b/frontend/rust-lib/flowy-sync/src/ws_manager.rs @@ -2,7 +2,7 @@ use async_stream::stream; use bytes::Bytes; use flowy_collaboration::entities::{ revision::{RevId, RevisionRange}, - ws::{ClientRevisionWSData, NewDocumentUser, ServerRevisionWSData, ServerRevisionWSDataType}, + ws_data::{ClientRevisionWSData, NewDocumentUser, ServerRevisionWSData, ServerRevisionWSDataType}, }; use flowy_error::{internal_error, FlowyError, FlowyResult}; use futures_util::stream::StreamExt; diff --git a/shared-lib/flowy-collaboration/src/entities/mod.rs b/shared-lib/flowy-collaboration/src/entities/mod.rs index b79cf36fe5..dfdd6d5a5a 100644 --- a/shared-lib/flowy-collaboration/src/entities/mod.rs +++ b/shared-lib/flowy-collaboration/src/entities/mod.rs @@ -1,4 +1,4 @@ pub mod doc; pub mod parser; pub mod revision; -pub mod ws; +pub mod ws_data; diff --git a/shared-lib/flowy-collaboration/src/entities/ws.rs b/shared-lib/flowy-collaboration/src/entities/ws_data.rs similarity index 100% rename from shared-lib/flowy-collaboration/src/entities/ws.rs rename to shared-lib/flowy-collaboration/src/entities/ws_data.rs diff --git a/shared-lib/flowy-collaboration/src/lib.rs b/shared-lib/flowy-collaboration/src/lib.rs index b9f8b20c2e..a043dfcd26 100644 --- a/shared-lib/flowy-collaboration/src/lib.rs +++ b/shared-lib/flowy-collaboration/src/lib.rs @@ -3,5 +3,7 @@ pub mod entities; pub mod errors; pub mod protobuf; pub mod server_document; +pub mod synchronizer; pub mod util; + pub use lib_ot::rich_text::RichTextDelta; diff --git a/shared-lib/flowy-collaboration/src/server_document/document_manager.rs b/shared-lib/flowy-collaboration/src/server_document/document_manager.rs index b9168342b5..3ed154b48e 100644 --- a/shared-lib/flowy-collaboration/src/server_document/document_manager.rs +++ b/shared-lib/flowy-collaboration/src/server_document/document_manager.rs @@ -1,20 +1,23 @@ use crate::{ - entities::{doc::DocumentInfo, ws::ServerRevisionWSDataBuilder}, + entities::{doc::DocumentInfo, ws_data::ServerRevisionWSDataBuilder}, errors::{internal_error, CollaborateError, CollaborateResult}, protobuf::{ClientRevisionWSData, RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB}, - server_document::{document_pad::ServerDocument, RevisionSyncResponse, RevisionSynchronizer, RevisionUser}, + server_document::document_pad::ServerDocument, + synchronizer::{RevisionSyncPersistence, RevisionSyncResponse, RevisionSynchronizer, RevisionUser}, }; use async_stream::stream; use dashmap::DashMap; use futures::stream::StreamExt; use lib_infra::future::BoxResultFuture; -use lib_ot::rich_text::RichTextDelta; +use lib_ot::rich_text::{RichTextAttributes, RichTextDelta}; use std::{collections::HashMap, fmt::Debug, sync::Arc}; use tokio::{ sync::{mpsc, oneshot, RwLock}, task::spawn_blocking, }; +type RichTextRevisionSynchronizer = RevisionSynchronizer; + pub trait DocumentCloudPersistence: Send + Sync + Debug { fn read_document(&self, doc_id: &str) -> BoxResultFuture; @@ -173,6 +176,28 @@ struct OpenDocHandle { users: DashMap>, } +impl RevisionSyncPersistence for Arc { + fn read_revisions( + &self, + object_id: &str, + rev_ids: Option>, + ) -> BoxResultFuture, CollaborateError> { + (**self).read_revisions(object_id, rev_ids) + } + + fn save_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> { + (**self).save_revisions(repeated_revision) + } + + fn reset_object( + &self, + object_id: &str, + repeated_revision: RepeatedRevisionPB, + ) -> BoxResultFuture<(), CollaborateError> { + (**self).reset_document(object_id, repeated_revision) + } +} + impl OpenDocHandle { fn new(doc: DocumentInfo, persistence: Arc) -> Result { let doc_id = doc.doc_id.clone(); @@ -180,12 +205,8 @@ impl OpenDocHandle { let users = DashMap::new(); let delta = RichTextDelta::from_bytes(&doc.text)?; - let synchronizer = Arc::new(RevisionSynchronizer::new( - &doc.doc_id, - doc.rev_id, - ServerDocument::from_delta(delta), - persistence, - )); + let sync_object = ServerDocument::from_delta(&doc_id, delta); + let synchronizer = Arc::new(RichTextRevisionSynchronizer::new(doc.rev_id, sync_object, persistence)); let queue = DocumentCommandQueue::new(&doc.doc_id, receiver, synchronizer)?; tokio::task::spawn(queue.run()); @@ -263,14 +284,14 @@ enum DocumentCommand { struct DocumentCommandQueue { pub doc_id: String, receiver: Option>, - synchronizer: Arc, + synchronizer: Arc, } impl DocumentCommandQueue { fn new( doc_id: &str, receiver: mpsc::Receiver, - synchronizer: Arc, + synchronizer: Arc, ) -> Result { Ok(Self { doc_id: doc_id.to_owned(), diff --git a/shared-lib/flowy-collaboration/src/server_document/document_pad.rs b/shared-lib/flowy-collaboration/src/server_document/document_pad.rs index eaa4491fb0..984d7f1850 100644 --- a/shared-lib/flowy-collaboration/src/server_document/document_pad.rs +++ b/shared-lib/flowy-collaboration/src/server_document/document_pad.rs @@ -1,39 +1,42 @@ -use crate::{client_document::InitialDocumentText, errors::CollaborateError}; -use lib_ot::{core::*, rich_text::RichTextDelta}; +use crate::{client_document::InitialDocumentText, errors::CollaborateError, synchronizer::RevisionSyncObject}; +use lib_ot::{ + core::*, + rich_text::{RichTextAttributes, RichTextDelta}, +}; pub struct ServerDocument { + doc_id: String, delta: RichTextDelta, } impl ServerDocument { - pub fn new() -> Self { Self::from_delta(C::initial_delta()) } + #[allow(dead_code)] + pub fn new(doc_id: &str) -> Self { Self::from_delta(doc_id, C::initial_delta()) } - pub fn from_delta(delta: RichTextDelta) -> Self { ServerDocument { delta } } - - pub fn from_json(json: &str) -> Result { - let delta = RichTextDelta::from_json(json)?; - Ok(Self::from_delta(delta)) - } - - pub fn to_json(&self) -> String { self.delta.to_json() } - - pub fn to_bytes(&self) -> Vec { self.delta.clone().to_bytes().to_vec() } - - pub fn to_plain_string(&self) -> String { self.delta.apply("").unwrap() } - - pub fn delta(&self) -> &RichTextDelta { &self.delta } - - pub fn md5(&self) -> String { - let bytes = self.to_bytes(); - format!("{:x}", md5::compute(bytes)) - } - - pub fn compose_delta(&mut self, delta: RichTextDelta) -> Result<(), CollaborateError> { - // tracing::trace!("{} compose {}", &self.delta.to_json(), delta.to_json()); - let composed_delta = self.delta.compose(&delta)?; - self.delta = composed_delta; - Ok(()) + pub fn from_delta(doc_id: &str, delta: RichTextDelta) -> Self { + let doc_id = doc_id.to_owned(); + ServerDocument { doc_id, delta } } pub fn is_empty(&self) -> bool { self.delta == C::initial_delta() } } + +impl RevisionSyncObject for ServerDocument { + fn id(&self) -> &str { &self.doc_id } + + fn compose(&mut self, other: &RichTextDelta) -> Result<(), CollaborateError> { + tracing::trace!("{} compose {}", &self.delta.to_json(), other.to_json()); + let new_delta = self.delta.compose(other)?; + self.delta = new_delta; + Ok(()) + } + + fn transform(&self, other: &RichTextDelta) -> Result<(RichTextDelta, RichTextDelta), CollaborateError> { + let value = self.delta.transform(other)?; + Ok(value) + } + + fn to_json(&self) -> String { self.delta.to_json() } + + fn set_delta(&mut self, new_delta: Delta) { self.delta = new_delta; } +} diff --git a/shared-lib/flowy-collaboration/src/server_document/mod.rs b/shared-lib/flowy-collaboration/src/server_document/mod.rs index 8a9b254844..413933015a 100644 --- a/shared-lib/flowy-collaboration/src/server_document/mod.rs +++ b/shared-lib/flowy-collaboration/src/server_document/mod.rs @@ -1,6 +1,4 @@ mod document_manager; mod document_pad; -mod revision_sync; pub use document_manager::*; -pub use revision_sync::*; diff --git a/shared-lib/flowy-collaboration/src/server_document/revision_sync.rs b/shared-lib/flowy-collaboration/src/server_document/revision_sync.rs index d85c3584e1..4ed469d479 100644 --- a/shared-lib/flowy-collaboration/src/server_document/revision_sync.rs +++ b/shared-lib/flowy-collaboration/src/server_document/revision_sync.rs @@ -1,13 +1,14 @@ use crate::{ entities::{ revision::RevisionRange, - ws::{ServerRevisionWSData, ServerRevisionWSDataBuilder}, + ws_data::{ServerRevisionWSData, ServerRevisionWSDataBuilder}, }, errors::CollaborateError, protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB}, server_document::{document_pad::ServerDocument, DocumentCloudPersistence}, util::*, }; +use lib_infra::future::BoxResultFuture; use lib_ot::{core::OperationTransformable, rich_text::RichTextDelta}; use parking_lot::RwLock; use std::{ @@ -25,6 +26,16 @@ pub trait RevisionUser: Send + Sync + Debug { fn receive(&self, resp: RevisionSyncResponse); } +pub trait RevisionSyncObject { + type SyncObject; + + fn read_revisions(&self, rev_ids: Option>) -> BoxResultFuture, CollaborateError>; + + fn save_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError>; + + fn reset_object(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError>; +} + pub enum RevisionSyncResponse { Pull(ServerRevisionWSData), Push(ServerRevisionWSData), diff --git a/shared-lib/flowy-collaboration/src/synchronizer.rs b/shared-lib/flowy-collaboration/src/synchronizer.rs new file mode 100644 index 0000000000..855833c53d --- /dev/null +++ b/shared-lib/flowy-collaboration/src/synchronizer.rs @@ -0,0 +1,260 @@ +use crate::{ + entities::{ + revision::RevisionRange, + ws_data::{ServerRevisionWSData, ServerRevisionWSDataBuilder}, + }, + errors::CollaborateError, + protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB}, + util::*, +}; +use lib_infra::future::BoxResultFuture; +use lib_ot::core::{Attributes, Delta, OperationTransformable}; +use parking_lot::RwLock; +use serde::de::DeserializeOwned; +use std::{ + cmp::Ordering, + fmt::Debug, + sync::{ + atomic::{AtomicI64, Ordering::SeqCst}, + Arc, + }, + time::Duration, +}; + +pub trait RevisionUser: Send + Sync + Debug { + fn user_id(&self) -> String; + fn receive(&self, resp: RevisionSyncResponse); +} + +pub trait RevisionSyncPersistence: Send + Sync + 'static { + fn read_revisions( + &self, + object_id: &str, + rev_ids: Option>, + ) -> BoxResultFuture, CollaborateError>; + + fn save_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError>; + + fn reset_object( + &self, + object_id: &str, + repeated_revision: RepeatedRevisionPB, + ) -> BoxResultFuture<(), CollaborateError>; +} + +pub trait RevisionSyncObject: Send + Sync + 'static { + fn id(&self) -> &str; + fn compose(&mut self, other: &Delta) -> Result<(), CollaborateError>; + fn transform(&self, other: &Delta) -> Result<(Delta, Delta), CollaborateError>; + fn to_json(&self) -> String; + fn set_delta(&mut self, new_delta: Delta); +} + +pub enum RevisionSyncResponse { + Pull(ServerRevisionWSData), + Push(ServerRevisionWSData), + Ack(ServerRevisionWSData), +} + +pub struct RevisionSynchronizer { + object_id: String, + rev_id: AtomicI64, + object: Arc>>, + persistence: Arc, +} + +impl RevisionSynchronizer +where + T: Attributes + DeserializeOwned + serde::Serialize + 'static, +{ + pub fn new(rev_id: i64, sync_object: S, persistence: P) -> RevisionSynchronizer + where + S: RevisionSyncObject, + P: RevisionSyncPersistence, + { + let object = Arc::new(RwLock::new(sync_object)); + let persistence = Arc::new(persistence); + let object_id = object.read().id().to_owned(); + RevisionSynchronizer { + object_id, + rev_id: AtomicI64::new(rev_id), + object, + persistence, + } + } + + #[tracing::instrument(level = "debug", skip(self, user, repeated_revision), err)] + pub async fn sync_revisions( + &self, + user: Arc, + repeated_revision: RepeatedRevisionPB, + ) -> Result<(), CollaborateError> { + let doc_id = self.object_id.clone(); + if repeated_revision.get_items().is_empty() { + // Return all the revisions to client + let revisions = self.persistence.read_revisions(&doc_id, None).await?; + let repeated_revision = repeated_revision_from_revision_pbs(revisions)?; + let data = ServerRevisionWSDataBuilder::build_push_message(&doc_id, repeated_revision); + user.receive(RevisionSyncResponse::Push(data)); + return Ok(()); + } + + let server_base_rev_id = self.rev_id.load(SeqCst); + let first_revision = repeated_revision.get_items().first().unwrap().clone(); + if self.is_applied_before(&first_revision, &self.persistence).await { + // Server has received this revision before, so ignore the following revisions + return Ok(()); + } + + match server_base_rev_id.cmp(&first_revision.rev_id) { + Ordering::Less => { + let server_rev_id = next(server_base_rev_id); + if server_base_rev_id == first_revision.base_rev_id || server_rev_id == first_revision.rev_id { + // The rev is in the right order, just compose it. + for revision in repeated_revision.get_items() { + let _ = self.compose_revision(revision)?; + } + let _ = self.persistence.save_revisions(repeated_revision).await?; + } else { + // The server document is outdated, pull the missing revision from the client. + let range = RevisionRange { + object_id: self.object_id.clone(), + start: server_rev_id, + end: first_revision.rev_id, + }; + let msg = ServerRevisionWSDataBuilder::build_pull_message(&self.object_id, range); + user.receive(RevisionSyncResponse::Pull(msg)); + } + }, + Ordering::Equal => { + // Do nothing + tracing::warn!("Applied revision rev_id is the same as cur_rev_id"); + }, + Ordering::Greater => { + // The client document is outdated. Transform the client revision delta and then + // send the prime delta to the client. Client should compose the this prime + // delta. + let from_rev_id = first_revision.rev_id; + let to_rev_id = server_base_rev_id; + let _ = self.push_revisions_to_user(user, from_rev_id, to_rev_id).await; + }, + } + Ok(()) + } + + #[tracing::instrument(level = "trace", skip(self, user), fields(server_rev_id), err)] + pub async fn pong(&self, user: Arc, client_rev_id: i64) -> Result<(), CollaborateError> { + let doc_id = self.object_id.clone(); + let server_rev_id = self.rev_id(); + tracing::Span::current().record("server_rev_id", &server_rev_id); + + match server_rev_id.cmp(&client_rev_id) { + Ordering::Less => { + tracing::error!("Client should not send ping and the server should pull the revisions from the client") + }, + Ordering::Equal => tracing::trace!("{} is up to date.", doc_id), + Ordering::Greater => { + // The client document is outdated. Transform the client revision delta and then + // send the prime delta to the client. Client should compose the this prime + // delta. + let from_rev_id = client_rev_id; + let to_rev_id = server_rev_id; + tracing::trace!("Push revisions to user"); + let _ = self.push_revisions_to_user(user, from_rev_id, to_rev_id).await; + }, + } + Ok(()) + } + + #[tracing::instrument(level = "debug", skip(self, repeated_revision), fields(doc_id), err)] + pub async fn reset(&self, repeated_revision: RepeatedRevisionPB) -> Result<(), CollaborateError> { + let doc_id = self.object_id.clone(); + tracing::Span::current().record("doc_id", &doc_id.as_str()); + let revisions: Vec = repeated_revision.get_items().to_vec(); + let (_, rev_id) = pair_rev_id_from_revision_pbs(&revisions); + let delta = make_delta_from_revision_pb(revisions)?; + let _ = self.persistence.reset_object(&doc_id, repeated_revision).await?; + self.object.write().set_delta(delta); + let _ = self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(rev_id)); + Ok(()) + } + + pub fn object_json(&self) -> String { self.object.read().to_json() } + + fn compose_revision(&self, revision: &RevisionPB) -> Result<(), CollaborateError> { + let delta = Delta::::from_bytes(&revision.delta_data)?; + let _ = self.compose_delta(delta)?; + let _ = self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(revision.rev_id)); + Ok(()) + } + + #[tracing::instrument(level = "debug", skip(self, revision))] + fn transform_revision(&self, revision: &RevisionPB) -> Result<(Delta, Delta), CollaborateError> { + let cli_delta = Delta::::from_bytes(&revision.delta_data)?; + let result = self.object.read().transform(&cli_delta)?; + Ok(result) + } + + fn compose_delta(&self, delta: Delta) -> Result<(), CollaborateError> { + if delta.is_empty() { + log::warn!("Composed delta is empty"); + } + + match self.object.try_write_for(Duration::from_millis(300)) { + None => log::error!("Failed to acquire write lock of document"), + Some(mut write_guard) => { + let _ = write_guard.compose(&delta)?; + }, + } + Ok(()) + } + + pub(crate) fn rev_id(&self) -> i64 { self.rev_id.load(SeqCst) } + + async fn is_applied_before( + &self, + new_revision: &RevisionPB, + persistence: &Arc, + ) -> bool { + let rev_ids = Some(vec![new_revision.rev_id]); + if let Ok(revisions) = persistence.read_revisions(&self.object_id, rev_ids).await { + if let Some(revision) = revisions.first() { + if revision.md5 == new_revision.md5 { + return true; + } + } + }; + + false + } + + async fn push_revisions_to_user(&self, user: Arc, from: i64, to: i64) { + let rev_ids: Vec = (from..=to).collect(); + let revisions = match self.persistence.read_revisions(&self.object_id, Some(rev_ids)).await { + Ok(revisions) => { + assert_eq!( + revisions.is_empty(), + false, + "revisions should not be empty if the doc exists" + ); + revisions + }, + Err(e) => { + tracing::error!("{}", e); + vec![] + }, + }; + + tracing::debug!("Push revision: {} -> {} to client", from, to); + match repeated_revision_from_revision_pbs(revisions) { + Ok(repeated_revision) => { + let data = ServerRevisionWSDataBuilder::build_push_message(&self.object_id, repeated_revision); + user.receive(RevisionSyncResponse::Push(data)); + }, + Err(e) => tracing::error!("{}", e), + } + } +} + +#[inline] +fn next(rev_id: i64) -> i64 { rev_id + 1 } diff --git a/shared-lib/flowy-collaboration/src/util.rs b/shared-lib/flowy-collaboration/src/util.rs index a5d3890c4e..c18d045cf0 100644 --- a/shared-lib/flowy-collaboration/src/util.rs +++ b/shared-lib/flowy-collaboration/src/util.rs @@ -12,6 +12,8 @@ use std::{ convert::TryInto, sync::atomic::{AtomicI64, Ordering::SeqCst}, }; +use serde::de::DeserializeOwned; +use lib_ot::core::{Attributes, Delta}; #[inline] pub fn find_newline(s: &str) -> Option { s.find(NEW_LINE) } @@ -57,10 +59,10 @@ pub fn make_delta_from_revisions(revisions: Vec) -> CollaborateResult< Ok(delta) } -pub fn make_delta_from_revision_pb(revisions: Vec) -> CollaborateResult { - let mut new_delta = RichTextDelta::new(); +pub fn make_delta_from_revision_pb(revisions: Vec) -> CollaborateResult> where T: Attributes + DeserializeOwned { + let mut new_delta = Delta::::new(); for revision in revisions { - let delta = RichTextDelta::from_bytes(revision.delta_data).map_err(|e| { + let delta = Delta::::from_bytes(revision.delta_data).map_err(|e| { let err_msg = format!("Deserialize remote revision failed: {:?}", e); CollaborateError::internal().context(err_msg) })?;