From a488944d2c5eed9f9b81ae85af9157506b6426aa Mon Sep 17 00:00:00 2001 From: appflowy Date: Wed, 5 Jan 2022 23:15:55 +0800 Subject: [PATCH] Saving the revision in serial --- backend/src/services/document/persistence.rs | 2 + .../src/services/view/controller.rs | 2 +- .../rust-lib/flowy-document/src/controller.rs | 5 +- .../flowy-document/src/core/edit/editor.rs | 75 +++---- .../flowy-document/src/core/edit/queue.rs | 203 +++++++++++++----- .../rust-lib/flowy-document/src/core/mod.rs | 1 + .../flowy-document/src/core/revision/mod.rs | 1 + .../src/core/revision/snapshot.rs | 1 + .../src/core/web_socket/web_socket.rs | 138 +++--------- .../src/document/history.rs | 30 +-- 10 files changed, 223 insertions(+), 235 deletions(-) create mode 100644 frontend/rust-lib/flowy-document/src/core/revision/snapshot.rs diff --git a/backend/src/services/document/persistence.rs b/backend/src/services/document/persistence.rs index db1ea20055..0552542a6f 100644 --- a/backend/src/services/document/persistence.rs +++ b/backend/src/services/document/persistence.rs @@ -17,6 +17,7 @@ use lib_ot::{core::OperationTransformable, rich_text::RichTextDelta}; use protobuf::Message; use flowy_collaboration::sync::ServerDocumentManager; +use lib_ot::core::trim; use std::sync::Arc; use uuid::Uuid; @@ -204,6 +205,7 @@ fn make_doc_from_revisions(doc_id: &str, mut revisions: RepeatedRevisionPB) -> R let delta = RichTextDelta::from_bytes(revision.delta_data).map_err(internal_error)?; document_delta = document_delta.compose(&delta).map_err(internal_error)?; } + let text = document_delta.to_json(); let mut document_info = DocumentInfo::new(); document_info.set_doc_id(doc_id.to_owned()); diff --git a/frontend/rust-lib/flowy-core/src/services/view/controller.rs b/frontend/rust-lib/flowy-core/src/services/view/controller.rs index 14c3205c48..47a65ba055 100644 --- a/frontend/rust-lib/flowy-core/src/services/view/controller.rs +++ b/frontend/rust-lib/flowy-core/src/services/view/controller.rs @@ -216,7 +216,7 @@ impl ViewController { } pub(crate) async fn receive_document_delta(&self, params: DocumentDelta) -> Result { - let doc = self.document_ctx.controller.apply_document_delta(params).await?; + let doc = self.document_ctx.controller.receive_local_delta(params).await?; Ok(doc) } diff --git a/frontend/rust-lib/flowy-document/src/controller.rs b/frontend/rust-lib/flowy-document/src/controller.rs index dca7408098..55e90cbda2 100644 --- a/frontend/rust-lib/flowy-document/src/controller.rs +++ b/frontend/rust-lib/flowy-document/src/controller.rs @@ -79,7 +79,7 @@ impl DocumentController { } #[tracing::instrument(level = "debug", skip(self, delta), fields(doc_id = %delta.doc_id), err)] - pub async fn apply_document_delta(&self, delta: DocumentDelta) -> Result { + pub async fn receive_local_delta(&self, delta: DocumentDelta) -> Result { let editor = self.get_editor(&delta.doc_id).await?; let _ = editor.compose_local_delta(Bytes::from(delta.delta_json)).await?; let document_json = editor.document_json().await?; @@ -121,8 +121,7 @@ impl DocumentController { token, server: self.server.clone(), }); - let doc_editor = - ClientDocumentEditor::new(doc_id, user, pool, rev_manager, self.ws_sender.clone(), server).await?; + let doc_editor = ClientDocumentEditor::new(doc_id, user, rev_manager, self.ws_sender.clone(), server).await?; self.ws_receivers.add(doc_id, doc_editor.ws_handler()); self.open_cache.insert(&doc_id, &doc_editor); Ok(doc_editor) diff --git a/frontend/rust-lib/flowy-document/src/core/edit/editor.rs b/frontend/rust-lib/flowy-document/src/core/edit/editor.rs index ef16e1c47c..635f4394f4 100644 --- a/frontend/rust-lib/flowy-document/src/core/edit/editor.rs +++ b/frontend/rust-lib/flowy-document/src/core/edit/editor.rs @@ -7,12 +7,7 @@ use crate::{ errors::FlowyError, }; use bytes::Bytes; -use flowy_collaboration::{ - document::history::UndoResult, - entities::revision::{RevId, Revision}, - errors::CollaborateResult, -}; -use flowy_database::ConnectionPool; +use flowy_collaboration::errors::CollaborateResult; use flowy_error::{internal_error, FlowyResult}; use lib_ot::{ core::Interval, @@ -26,24 +21,22 @@ pub struct ClientDocumentEditor { rev_manager: Arc, ws_manager: Arc, edit_queue: UnboundedSender, - user: Arc, } impl ClientDocumentEditor { pub(crate) async fn new( doc_id: &str, user: Arc, - pool: Arc, mut rev_manager: RevisionManager, ws: Arc, server: Arc, ) -> FlowyResult> { let delta = rev_manager.load_document(server).await?; - let edit_queue = spawn_edit_queue(doc_id, delta, pool.clone()); + let rev_manager = Arc::new(rev_manager); let doc_id = doc_id.to_string(); let user_id = user.user_id()?; - let rev_manager = Arc::new(rev_manager); + let edit_queue = spawn_edit_queue(user, rev_manager.clone(), delta); let ws_manager = make_document_ws_manager( doc_id.clone(), user_id.clone(), @@ -57,56 +50,51 @@ impl ClientDocumentEditor { rev_manager, ws_manager, edit_queue, - user, }); Ok(editor) } pub async fn insert(&self, index: usize, data: T) -> Result<(), FlowyError> { - let (ret, rx) = oneshot::channel::>(); + let (ret, rx) = oneshot::channel::>(); let msg = EditorCommand::Insert { index, data: data.to_string(), ret, }; let _ = self.edit_queue.send(msg); - let (delta, md5) = rx.await.map_err(internal_error)??; - let _ = self.save_local_delta(delta, md5).await?; + let _ = rx.await.map_err(internal_error)??; Ok(()) } pub async fn delete(&self, interval: Interval) -> Result<(), FlowyError> { - let (ret, rx) = oneshot::channel::>(); + let (ret, rx) = oneshot::channel::>(); let msg = EditorCommand::Delete { interval, ret }; let _ = self.edit_queue.send(msg); - let (delta, md5) = rx.await.map_err(internal_error)??; - let _ = self.save_local_delta(delta, md5).await?; + let _ = rx.await.map_err(internal_error)??; Ok(()) } pub async fn format(&self, interval: Interval, attribute: RichTextAttribute) -> Result<(), FlowyError> { - let (ret, rx) = oneshot::channel::>(); + let (ret, rx) = oneshot::channel::>(); let msg = EditorCommand::Format { interval, attribute, ret, }; let _ = self.edit_queue.send(msg); - let (delta, md5) = rx.await.map_err(internal_error)??; - let _ = self.save_local_delta(delta, md5).await?; + let _ = rx.await.map_err(internal_error)??; Ok(()) } pub async fn replace(&self, interval: Interval, data: T) -> Result<(), FlowyError> { - let (ret, rx) = oneshot::channel::>(); + let (ret, rx) = oneshot::channel::>(); let msg = EditorCommand::Replace { interval, data: data.to_string(), ret, }; let _ = self.edit_queue.send(msg); - let (delta, md5) = rx.await.map_err(internal_error)??; - let _ = self.save_local_delta(delta, md5).await?; + let _ = rx.await.map_err(internal_error)??; Ok(()) } @@ -124,20 +112,20 @@ impl ClientDocumentEditor { rx.await.unwrap_or(false) } - pub async fn undo(&self) -> Result { - let (ret, rx) = oneshot::channel::>(); + pub async fn undo(&self) -> Result<(), FlowyError> { + let (ret, rx) = oneshot::channel(); let msg = EditorCommand::Undo { ret }; let _ = self.edit_queue.send(msg); - let r = rx.await.map_err(internal_error)??; - Ok(r) + let _ = rx.await.map_err(internal_error)??; + Ok(()) } - pub async fn redo(&self) -> Result { - let (ret, rx) = oneshot::channel::>(); + pub async fn redo(&self) -> Result<(), FlowyError> { + let (ret, rx) = oneshot::channel(); let msg = EditorCommand::Redo { ret }; let _ = self.edit_queue.send(msg); - let r = rx.await.map_err(internal_error)??; - Ok(r) + let _ = rx.await.map_err(internal_error)??; + Ok(()) } pub async fn document_json(&self) -> FlowyResult { @@ -148,27 +136,16 @@ impl ClientDocumentEditor { Ok(json) } - async fn save_local_delta(&self, delta: RichTextDelta, md5: String) -> Result { - let delta_data = delta.to_bytes(); - let (base_rev_id, rev_id) = self.rev_manager.next_rev_id_pair(); - let user_id = self.user.user_id()?; - let revision = Revision::new(&self.doc_id, base_rev_id, rev_id, delta_data, &user_id, md5); - let _ = self.rev_manager.add_local_revision(&revision).await?; - Ok(rev_id.into()) - } - #[tracing::instrument(level = "debug", skip(self, data), err)] pub(crate) async fn compose_local_delta(&self, data: Bytes) -> Result<(), FlowyError> { let delta = RichTextDelta::from_bytes(&data)?; - let (ret, rx) = oneshot::channel::>(); - let msg = EditorCommand::ComposeDelta { + let (ret, rx) = oneshot::channel::>(); + let msg = EditorCommand::ComposeLocalDelta { delta: delta.clone(), ret, }; let _ = self.edit_queue.send(msg); - let md5 = rx.await.map_err(internal_error)??; - - let _ = self.save_local_delta(delta, md5).await?; + let _ = rx.await.map_err(internal_error)??; Ok(()) } @@ -178,9 +155,13 @@ impl ClientDocumentEditor { pub(crate) fn ws_handler(&self) -> Arc { self.ws_manager.receiver() } } -fn spawn_edit_queue(doc_id: &str, delta: RichTextDelta, _pool: Arc) -> UnboundedSender { +fn spawn_edit_queue( + user: Arc, + rev_manager: Arc, + delta: RichTextDelta, +) -> UnboundedSender { let (sender, receiver) = mpsc::unbounded_channel::(); - let actor = EditorCommandQueue::new(doc_id, delta, receiver); + let actor = EditorCommandQueue::new(user, rev_manager, delta, receiver); tokio::spawn(actor.run()); sender } diff --git a/frontend/rust-lib/flowy-document/src/core/edit/queue.rs b/frontend/rust-lib/flowy-document/src/core/edit/queue.rs index a91a12b175..be61333083 100644 --- a/frontend/rust-lib/flowy-document/src/core/edit/queue.rs +++ b/frontend/rust-lib/flowy-document/src/core/edit/queue.rs @@ -1,8 +1,8 @@ +use crate::{context::DocumentUser, core::RevisionManager}; use async_stream::stream; - use flowy_collaboration::{ document::{history::UndoResult, Document, NewlineDoc}, - entities::revision::Revision, + entities::revision::{RepeatedRevision, RevId, Revision}, errors::CollaborateError, util::make_delta_from_revisions, }; @@ -16,18 +16,24 @@ use std::sync::Arc; use tokio::sync::{mpsc, oneshot, RwLock}; pub(crate) struct EditorCommandQueue { - #[allow(dead_code)] - doc_id: String, document: Arc>, + user: Arc, + rev_manager: Arc, receiver: Option>, } impl EditorCommandQueue { - pub(crate) fn new(doc_id: &str, delta: RichTextDelta, receiver: mpsc::UnboundedReceiver) -> Self { + pub(crate) fn new( + user: Arc, + rev_manager: Arc, + delta: RichTextDelta, + receiver: mpsc::UnboundedReceiver, + ) -> Self { let document = Arc::new(RwLock::new(Document::from_delta(delta))); Self { - doc_id: doc_id.to_owned(), document, + user, + rev_manager, receiver: Some(receiver), } } @@ -43,8 +49,8 @@ impl EditorCommandQueue { } }; stream - .for_each(|msg| async { - match self.handle_message(msg).await { + .for_each(|command| async { + match self.handle_command(command).await { Ok(_) => {}, Err(e) => tracing::debug!("[EditCommandQueue]: {}", e), } @@ -52,30 +58,55 @@ impl EditorCommandQueue { .await; } - async fn handle_message(&self, msg: EditorCommand) -> Result<(), FlowyError> { - match msg { - EditorCommand::ComposeDelta { delta, ret } => { - let fut = || async { - let mut document = self.document.write().await; - let _ = document.compose_delta(delta)?; - let md5 = document.md5(); - drop(document); - - Ok::(md5) - }; - - let _ = ret.send(fut().await); + #[tracing::instrument(level = "debug", skip(self), err)] + async fn handle_command(&self, command: EditorCommand) -> Result<(), FlowyError> { + match command { + EditorCommand::ComposeLocalDelta { delta, ret } => { + let mut document = self.document.write().await; + let _ = document.compose_delta(delta.clone())?; + let md5 = document.md5(); + drop(document); + let _ = self.save_local_delta(delta, md5).await?; + let _ = ret.send(Ok(())); }, - EditorCommand::OverrideDelta { delta, ret } => { - let fut = || async { - let mut document = self.document.write().await; - let _ = document.set_delta(delta); - let md5 = document.md5(); - drop(document); - Ok::(md5) - }; + EditorCommand::ComposeRemoteDelta { + revisions, + client_delta, + server_delta, + ret, + } => { + let mut document = self.document.write().await; + let _ = document.compose_delta(client_delta.clone())?; + let md5 = document.md5(); + for revision in &revisions { + let _ = self.rev_manager.add_remote_revision(revision).await?; + } - let _ = ret.send(fut().await); + let (base_rev_id, rev_id) = self.rev_manager.next_rev_id_pair(); + let doc_id = self.rev_manager.doc_id.clone(); + let user_id = self.user.user_id()?; + let (client_revision, server_revision) = make_client_and_server_revision( + &doc_id, + &user_id, + base_rev_id, + rev_id, + client_delta, + Some(server_delta), + md5, + ); + let _ = self.rev_manager.add_remote_revision(&client_revision).await?; + let _ = ret.send(Ok(server_revision)); + }, + EditorCommand::OverrideDelta { revisions, delta, ret } => { + let mut document = self.document.write().await; + let _ = document.set_delta(delta); + let md5 = document.md5(); + drop(document); + + let repeated_revision = RepeatedRevision::new(revisions); + assert_eq!(repeated_revision.last().unwrap().md5, md5); + let _ = self.rev_manager.reset_document(repeated_revision).await?; + let _ = ret.send(Ok(())); }, EditorCommand::TransformRevision { revisions, ret } => { let f = || async { @@ -104,13 +135,15 @@ impl EditorCommandQueue { let mut write_guard = self.document.write().await; let delta = write_guard.insert(index, data)?; let md5 = write_guard.md5(); - let _ = ret.send(Ok((delta, md5))); + let _ = self.save_local_delta(delta, md5).await?; + let _ = ret.send(Ok(())); }, EditorCommand::Delete { interval, ret } => { let mut write_guard = self.document.write().await; let delta = write_guard.delete(interval)?; let md5 = write_guard.md5(); - let _ = ret.send(Ok((delta, md5))); + let _ = self.save_local_delta(delta, md5).await?; + let _ = ret.send(Ok(())); }, EditorCommand::Format { interval, @@ -120,13 +153,15 @@ impl EditorCommandQueue { let mut write_guard = self.document.write().await; let delta = write_guard.format(interval, attribute)?; let md5 = write_guard.md5(); - let _ = ret.send(Ok((delta, md5))); + let _ = self.save_local_delta(delta, md5).await?; + let _ = ret.send(Ok(())); }, EditorCommand::Replace { interval, data, ret } => { let mut write_guard = self.document.write().await; let delta = write_guard.replace(interval, data)?; let md5 = write_guard.md5(); - let _ = ret.send(Ok((delta, md5))); + let _ = self.save_local_delta(delta, md5).await?; + let _ = ret.send(Ok(())); }, EditorCommand::CanUndo { ret } => { let _ = ret.send(self.document.read().await.can_undo()); @@ -135,12 +170,18 @@ impl EditorCommandQueue { let _ = ret.send(self.document.read().await.can_redo()); }, EditorCommand::Undo { ret } => { - let result = self.document.write().await.undo(); - let _ = ret.send(result); + let mut write_guard = self.document.write().await; + let UndoResult { delta } = write_guard.undo()?; + let md5 = write_guard.md5(); + let _ = self.save_local_delta(delta, md5).await?; + let _ = ret.send(Ok(())); }, EditorCommand::Redo { ret } => { - let result = self.document.write().await.redo(); - let _ = ret.send(result); + let mut write_guard = self.document.write().await; + let UndoResult { delta } = write_guard.redo()?; + let md5 = write_guard.md5(); + let _ = self.save_local_delta(delta, md5).await?; + let _ = ret.send(Ok(())); }, EditorCommand::ReadDoc { ret } => { let data = self.document.read().await.to_json(); @@ -153,21 +194,62 @@ impl EditorCommandQueue { } Ok(()) } + + async fn save_local_delta(&self, delta: RichTextDelta, md5: String) -> Result { + let delta_data = delta.to_bytes(); + let (base_rev_id, rev_id) = self.rev_manager.next_rev_id_pair(); + let user_id = self.user.user_id()?; + let revision = Revision::new(&self.rev_manager.doc_id, base_rev_id, rev_id, delta_data, &user_id, md5); + let _ = self.rev_manager.add_local_revision(&revision).await?; + Ok(rev_id.into()) + } +} + +fn make_client_and_server_revision( + doc_id: &str, + user_id: &str, + base_rev_id: i64, + rev_id: i64, + client_delta: RichTextDelta, + server_delta: Option, + md5: DocumentMD5, +) -> (Revision, Option) { + let client_revision = Revision::new( + &doc_id, + base_rev_id, + rev_id, + client_delta.to_bytes(), + &user_id, + md5.clone(), + ); + + match server_delta { + None => (client_revision, None), + Some(server_delta) => { + let server_revision = Revision::new(&doc_id, base_rev_id, rev_id, server_delta.to_bytes(), &user_id, md5); + (client_revision, Some(server_revision)) + }, + } } pub(crate) type Ret = oneshot::Sender>; -pub(crate) type NewDelta = (RichTextDelta, String); pub(crate) type DocumentMD5 = String; -#[allow(dead_code)] pub(crate) enum EditorCommand { - ComposeDelta { + ComposeLocalDelta { delta: RichTextDelta, - ret: Ret, + ret: Ret<()>, + }, + ComposeRemoteDelta { + revisions: Vec, + client_delta: RichTextDelta, + server_delta: RichTextDelta, + ret: Ret>, }, OverrideDelta { + revisions: Vec, delta: RichTextDelta, - ret: Ret, + ret: Ret<()>, }, TransformRevision { revisions: Vec, @@ -176,22 +258,21 @@ pub(crate) enum EditorCommand { Insert { index: usize, data: String, - ret: Ret, + ret: Ret<()>, }, Delete { interval: Interval, - ret: Ret, + ret: Ret<()>, }, Format { interval: Interval, attribute: RichTextAttribute, - ret: Ret, + ret: Ret<()>, }, - Replace { interval: Interval, data: String, - ret: Ret, + ret: Ret<()>, }, CanUndo { ret: oneshot::Sender, @@ -200,10 +281,10 @@ pub(crate) enum EditorCommand { ret: oneshot::Sender, }, Undo { - ret: Ret, + ret: Ret<()>, }, Redo { - ret: Ret, + ret: Ret<()>, }, ReadDoc { ret: Ret, @@ -213,6 +294,28 @@ pub(crate) enum EditorCommand { }, } +impl std::fmt::Debug for EditorCommand { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + let s = match self { + EditorCommand::ComposeLocalDelta { .. } => "ComposeLocalDelta", + EditorCommand::ComposeRemoteDelta { .. } => "ComposeRemoteDelta", + EditorCommand::OverrideDelta { .. } => "OverrideDelta", + EditorCommand::TransformRevision { .. } => "TransformRevision", + EditorCommand::Insert { .. } => "Insert", + EditorCommand::Delete { .. } => "Delete", + EditorCommand::Format { .. } => "Format", + EditorCommand::Replace { .. } => "Replace", + EditorCommand::CanUndo { .. } => "CanUndo", + EditorCommand::CanRedo { .. } => "CanRedo", + EditorCommand::Undo { .. } => "Undo", + EditorCommand::Redo { .. } => "Redo", + EditorCommand::ReadDoc { .. } => "ReadDoc", + EditorCommand::ReadDocDelta { .. } => "ReadDocDelta", + }; + f.write_str(s) + } +} + pub(crate) struct TransformDeltas { pub client_prime: RichTextDelta, pub server_prime: Option, diff --git a/frontend/rust-lib/flowy-document/src/core/mod.rs b/frontend/rust-lib/flowy-document/src/core/mod.rs index d2e604915b..1c02fcc4f2 100644 --- a/frontend/rust-lib/flowy-document/src/core/mod.rs +++ b/frontend/rust-lib/flowy-document/src/core/mod.rs @@ -1,6 +1,7 @@ pub mod edit; pub mod revision; mod web_socket; + pub use crate::ws_receivers::*; pub use edit::*; pub use revision::*; diff --git a/frontend/rust-lib/flowy-document/src/core/revision/mod.rs b/frontend/rust-lib/flowy-document/src/core/revision/mod.rs index 19529199f8..0dc61e57bc 100644 --- a/frontend/rust-lib/flowy-document/src/core/revision/mod.rs +++ b/frontend/rust-lib/flowy-document/src/core/revision/mod.rs @@ -2,6 +2,7 @@ mod cache; mod disk; mod manager; mod memory; +mod snapshot; pub use cache::*; pub use manager::*; diff --git a/frontend/rust-lib/flowy-document/src/core/revision/snapshot.rs b/frontend/rust-lib/flowy-document/src/core/revision/snapshot.rs new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/frontend/rust-lib/flowy-document/src/core/revision/snapshot.rs @@ -0,0 +1 @@ + diff --git a/frontend/rust-lib/flowy-document/src/core/web_socket/web_socket.rs b/frontend/rust-lib/flowy-document/src/core/web_socket/web_socket.rs index 67d1590926..3bb6a9b02c 100644 --- a/frontend/rust-lib/flowy-document/src/core/web_socket/web_socket.rs +++ b/frontend/rust-lib/flowy-document/src/core/web_socket/web_socket.rs @@ -1,6 +1,5 @@ use crate::core::{ web_socket::{DocumentWSSinkDataProvider, DocumentWSSteamConsumer, HttpWebSocketManager}, - DocumentMD5, DocumentWSReceiver, DocumentWebSocket, EditorCommand, @@ -20,7 +19,7 @@ use lib_infra::future::FutureResult; use crate::core::web_socket::local_ws_impl::LocalWebSocketManager; use flowy_collaboration::entities::ws::DocumentServerWSDataType; -use lib_ot::rich_text::RichTextDelta; + use lib_ws::WSConnectState; use std::{collections::VecDeque, convert::TryFrom, sync::Arc}; use tokio::sync::{broadcast, mpsc::UnboundedSender, oneshot, RwLock}; @@ -33,7 +32,7 @@ pub(crate) trait DocumentWebSocketManager: Send + Sync { pub(crate) async fn make_document_ws_manager( doc_id: String, user_id: String, - editor_edit_queue: UnboundedSender, + edit_cmd_tx: UnboundedSender, rev_manager: Arc, ws: Arc, ) -> Arc { @@ -42,52 +41,24 @@ pub(crate) async fn make_document_ws_manager( let ws_stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter { doc_id: doc_id.clone(), user_id: user_id.clone(), - editor_edit_queue: editor_edit_queue.clone(), + edit_cmd_tx, rev_manager: rev_manager.clone(), shared_sink: shared_sink.clone(), }); - let ws_stream_provider = DocumentWSSinkDataProviderAdapter(shared_sink.clone()); + let ws_stream_provider = DocumentWSSinkDataProviderAdapter(shared_sink); let ws_manager = Arc::new(HttpWebSocketManager::new( &doc_id, ws.clone(), Arc::new(ws_stream_provider), ws_stream_consumer, )); - notify_user_has_connected(&user_id, &doc_id, rev_manager.clone(), shared_sink).await; - listen_document_ws_state(&user_id, &doc_id, ws_manager.scribe_state(), rev_manager.clone()); - + listen_document_ws_state(&user_id, &doc_id, ws_manager.scribe_state(), rev_manager); Arc::new(ws_manager) } else { Arc::new(Arc::new(LocalWebSocketManager {})) } } -async fn notify_user_has_connected( - _user_id: &str, - _doc_id: &str, - _rev_manager: Arc, - _shared_sink: Arc, -) { - // let need_notify = match shared_sink.front().await { - // None => true, - // Some(data) => data.ty != DocumentClientWSDataType::UserConnect, - // }; - // - // if need_notify { - // let revision_data: Bytes = - // rev_manager.latest_revision().await.try_into().unwrap(); - // let new_connect = NewDocumentUser { - // user_id: user_id.to_owned(), - // doc_id: doc_id.to_owned(), - // revision_data: revision_data.to_vec(), - // }; - // - // let data = - // DocumentWSDataBuilder::build_new_document_user_message(doc_id, - // new_connect); shared_sink.push_front(data).await; - // } -} - fn listen_document_ws_state( _user_id: &str, _doc_id: &str, @@ -109,22 +80,19 @@ fn listen_document_ws_state( pub(crate) struct DocumentWebSocketSteamConsumerAdapter { pub(crate) doc_id: String, pub(crate) user_id: String, - pub(crate) editor_edit_queue: UnboundedSender, + pub(crate) edit_cmd_tx: UnboundedSender, pub(crate) rev_manager: Arc, pub(crate) shared_sink: Arc, } impl DocumentWSSteamConsumer for DocumentWebSocketSteamConsumerAdapter { fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError> { - let user_id = self.user_id.clone(); let rev_manager = self.rev_manager.clone(); - let edit_cmd_tx = self.editor_edit_queue.clone(); + let edit_cmd_tx = self.edit_cmd_tx.clone(); let shared_sink = self.shared_sink.clone(); let doc_id = self.doc_id.clone(); FutureResult::new(async move { - if let Some(server_composed_revision) = - handle_push_rev(&doc_id, &user_id, edit_cmd_tx, rev_manager, bytes).await? - { + if let Some(server_composed_revision) = handle_remote_revision(edit_cmd_tx, rev_manager, bytes).await? { let data = DocumentClientWSData::from_revisions(&doc_id, vec![server_composed_revision]); shared_sink.push_back(data).await; } @@ -177,58 +145,8 @@ async fn transform_pushed_revisions( Ok(transformed_delta) } -async fn compose_pushed_delta( - delta: RichTextDelta, - edit_cmd: &UnboundedSender, -) -> FlowyResult { - // compose delta - let (ret, rx) = oneshot::channel::>(); - let _ = edit_cmd.send(EditorCommand::ComposeDelta { delta, ret }); - let md5 = rx.await.map_err(internal_error)??; - Ok(md5) -} - -async fn override_client_delta( - delta: RichTextDelta, - edit_cmd: &UnboundedSender, -) -> FlowyResult { - let (ret, rx) = oneshot::channel::>(); - let _ = edit_cmd.send(EditorCommand::OverrideDelta { delta, ret }); - let md5 = rx.await.map_err(internal_error)??; - Ok(md5) -} - -async fn make_client_and_server_revision( - doc_id: &str, - user_id: &str, - base_rev_id: i64, - rev_id: i64, - client_delta: RichTextDelta, - server_delta: Option, - md5: DocumentMD5, -) -> (Revision, Option) { - let client_revision = Revision::new( - &doc_id, - base_rev_id, - rev_id, - client_delta.to_bytes(), - &user_id, - md5.clone(), - ); - - match server_delta { - None => (client_revision, None), - Some(server_delta) => { - let server_revision = Revision::new(&doc_id, base_rev_id, rev_id, server_delta.to_bytes(), &user_id, md5); - (client_revision, Some(server_revision)) - }, - } -} - #[tracing::instrument(level = "debug", skip(edit_cmd_tx, rev_manager, bytes))] -pub(crate) async fn handle_push_rev( - doc_id: &str, - user_id: &str, +pub(crate) async fn handle_remote_revision( edit_cmd_tx: UnboundedSender, rev_manager: Arc, bytes: Bytes, @@ -259,32 +177,24 @@ pub(crate) async fn handle_push_rev( None => { // The server_prime is None means the client local revisions conflict with the // server, and it needs to override the client delta. - let md5 = override_client_delta(client_prime.clone(), &edit_cmd_tx).await?; - let repeated_revision = RepeatedRevision::new(revisions); - assert_eq!(repeated_revision.last().unwrap().md5, md5); - let _ = rev_manager.reset_document(repeated_revision).await?; + let (ret, rx) = oneshot::channel(); + let _ = edit_cmd_tx.send(EditorCommand::OverrideDelta { + revisions, + delta: client_prime, + ret, + }); + let _ = rx.await.map_err(internal_error)??; Ok(None) }, Some(server_prime) => { - let md5 = compose_pushed_delta(client_prime.clone(), &edit_cmd_tx).await?; - for revision in &revisions { - let _ = rev_manager.add_remote_revision(revision).await?; - } - let (base_rev_id, rev_id) = rev_manager.next_rev_id_pair(); - let (client_revision, server_revision) = make_client_and_server_revision( - doc_id, - user_id, - base_rev_id, - rev_id, - client_prime, - Some(server_prime), - md5, - ) - .await; - - // save the client revision - let _ = rev_manager.add_remote_revision(&client_revision).await?; - Ok(server_revision) + let (ret, rx) = oneshot::channel(); + let _ = edit_cmd_tx.send(EditorCommand::ComposeRemoteDelta { + revisions, + client_delta: client_prime, + server_delta: server_prime, + ret, + }); + Ok(rx.await.map_err(internal_error)??) }, } } diff --git a/shared-lib/flowy-collaboration/src/document/history.rs b/shared-lib/flowy-collaboration/src/document/history.rs index a233263824..06aec82252 100644 --- a/shared-lib/flowy-collaboration/src/document/history.rs +++ b/shared-lib/flowy-collaboration/src/document/history.rs @@ -1,27 +1,17 @@ use lib_ot::rich_text::RichTextDelta; -const MAX_UNDOS: usize = 20; +const MAX_UNDOES: usize = 20; #[derive(Debug, Clone)] pub struct UndoResult { - #[allow(dead_code)] - success: bool, - - #[allow(dead_code)] - len: usize, -} - -impl UndoResult { - pub fn fail() -> Self { UndoResult { success: false, len: 0 } } - - pub fn success(len: usize) -> Self { UndoResult { success: true, len } } + pub delta: RichTextDelta, } #[derive(Debug, Clone)] pub struct History { #[allow(dead_code)] cur_undo: usize, - undos: Vec, + undoes: Vec, redoes: Vec, capacity: usize, } @@ -30,9 +20,9 @@ impl std::default::Default for History { fn default() -> Self { History { cur_undo: 1, - undos: Vec::new(), + undoes: Vec::new(), redoes: Vec::new(), - capacity: MAX_UNDOS, + capacity: MAX_UNDOES, } } } @@ -40,11 +30,11 @@ impl std::default::Default for History { impl History { pub fn new() -> Self { History::default() } - pub fn can_undo(&self) -> bool { !self.undos.is_empty() } + pub fn can_undo(&self) -> bool { !self.undoes.is_empty() } pub fn can_redo(&self) -> bool { !self.redoes.is_empty() } - pub fn add_undo(&mut self, delta: RichTextDelta) { self.undos.push(delta); } + pub fn add_undo(&mut self, delta: RichTextDelta) { self.undoes.push(delta); } pub fn add_redo(&mut self, delta: RichTextDelta) { self.redoes.push(delta); } @@ -56,8 +46,8 @@ impl History { self.redoes.clear(); self.add_undo(delta); - if self.undos.len() > self.capacity { - self.undos.remove(0); + if self.undoes.len() > self.capacity { + self.undoes.remove(0); } } @@ -65,7 +55,7 @@ impl History { if !self.can_undo() { return None; } - let delta = self.undos.pop().unwrap(); + let delta = self.undoes.pop().unwrap(); Some(delta) }