From 90e3ba14f1929fdd26d462cf276003a8efb2f8b3 Mon Sep 17 00:00:00 2001 From: appflowy Date: Sun, 12 Dec 2021 12:41:16 +0800 Subject: [PATCH] mv server's DocManager to flowy_collaboration crate --- .gitignore | 2 +- backend/Cargo.lock | 4 + backend/src/services/doc/editor.rs | 102 +------ backend/src/services/doc/manager.rs | 224 +-------------- backend/src/services/doc/ws_actor.rs | 60 ++-- backend/tests/document/helper.rs | 17 +- .../rust-lib/flowy-document/src/errors.rs | 4 +- .../src/services/doc/edit/editor.rs | 26 +- .../src/services/doc/edit/queue.rs | 8 +- .../tests/editor/revision_test.rs | 11 + frontend/rust-lib/flowy-test/src/helper.rs | 8 +- frontend/rust-lib/flowy-test/src/lib.rs | 1 + frontend/rust-lib/flowy-user/Cargo.toml | 1 + .../flowy-user/src/services/server/mod.rs | 13 + .../flowy-user/src/services/server/ws_mock.rs | 109 +++++++ .../flowy-user/src/services/user/mod.rs | 2 +- .../src/services/user/ws_manager.rs | 76 +---- shared-lib/Cargo.lock | 24 ++ shared-lib/flowy-collaboration/Cargo.toml | 7 +- .../src/core/document/document.rs | 32 ++- .../flowy-collaboration/src/core/sync/mod.rs | 2 + .../src/core/sync/rev_sync.rs | 11 +- .../src/core/sync/server_editor.rs | 272 ++++++++++++++++++ .../flowy-collaboration/src/entities/ws/ws.rs | 6 +- shared-lib/flowy-collaboration/src/errors.rs | 29 +- 25 files changed, 578 insertions(+), 473 deletions(-) create mode 100644 frontend/rust-lib/flowy-user/src/services/server/ws_mock.rs create mode 100644 shared-lib/flowy-collaboration/src/core/sync/server_editor.rs diff --git a/.gitignore b/.gitignore index 199285461d..0d9faef146 100644 --- a/.gitignore +++ b/.gitignore @@ -13,5 +13,5 @@ Cargo.lock **/target/ **/*.db .idea/ -**/flowy-test/** +**/temp/** .ruby-version diff --git a/backend/Cargo.lock b/backend/Cargo.lock index a9cc97f8c5..ed95e928da 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -1199,9 +1199,12 @@ dependencies = [ name = "flowy-collaboration" version = "0.1.0" dependencies = [ + "async-stream", "bytes", "chrono", + "dashmap", "flowy-derive", + "futures", "lib-ot", "log", "md5", @@ -1388,6 +1391,7 @@ dependencies = [ "derive_more", "diesel", "diesel_derives", + "flowy-collaboration", "flowy-database", "flowy-derive", "flowy-user-infra", diff --git a/backend/src/services/doc/editor.rs b/backend/src/services/doc/editor.rs index c700e87dd1..f797f52e35 100644 --- a/backend/src/services/doc/editor.rs +++ b/backend/src/services/doc/editor.rs @@ -3,110 +3,26 @@ use crate::{ web_socket::{entities::Socket, WsMessageAdaptor, WsUser}, }; use actix_web::web::Data; -use backend_service::errors::{internal_error, ServerError}; -use dashmap::DashMap; +use backend_service::errors::internal_error; + use flowy_collaboration::{ - core::{ - document::Document, - sync::{RevisionSynchronizer, RevisionUser, SyncResponse}, - }, - protobuf::{Doc, UpdateDocParams}, + core::sync::{RevisionUser, SyncResponse}, + protobuf::UpdateDocParams, }; -use lib_ot::{protobuf::Revision, rich_text::RichTextDelta}; + use sqlx::PgPool; -use std::{ - convert::TryInto, - sync::{ - atomic::{AtomicI64, Ordering::SeqCst}, - Arc, - }, -}; +use std::sync::Arc; -#[rustfmt::skip] -// ┌──────────────────────┐ ┌────────────┐ -// ┌───▶│ RevisionSynchronizer │────▶│ Document │ -// │ └──────────────────────┘ └────────────┘ -// ┌────────────────┐ │ -// │ServerDocEditor │────┤ ┌───────────┐ -// └────────────────┘ │ ┌───▶│ WsUser │ -// │ │ └───────────┘ -// │ ┌────────┐ ┌───────────┐ │ ┌───────────┐ -// └───▶│ Users │◆──────│ DocUser ├───┼───▶│ Socket │ -// └────────┘ └───────────┘ │ └───────────┘ -// │ ┌───────────┐ -// └───▶│ PgPool │ -// └───────────┘ -pub struct ServerDocEditor { - pub doc_id: String, - pub rev_id: AtomicI64, - synchronizer: Arc, - users: DashMap, -} - -impl ServerDocEditor { - pub fn new(doc: Doc) -> Result { - let delta = RichTextDelta::from_bytes(&doc.data).map_err(internal_error)?; - let users = DashMap::new(); - let synchronizer = Arc::new(RevisionSynchronizer::new( - &doc.id, - doc.rev_id, - Document::from_delta(delta), - )); - - Ok(Self { - doc_id: doc.id.clone(), - rev_id: AtomicI64::new(doc.rev_id), - synchronizer, - users, - }) - } - - #[tracing::instrument( - level = "debug", - skip(self, user), - fields( - user_id = %user.id(), - rev_id = %rev_id, - ) - )] - pub async fn new_doc_user(&self, user: DocUser, rev_id: i64) -> Result<(), ServerError> { - self.users.insert(user.id(), user.clone()); - self.synchronizer.new_conn(user, rev_id); - Ok(()) - } - - #[tracing::instrument( - level = "debug", - skip(self, user, revision), - fields( - cur_rev_id = %self.rev_id.load(SeqCst), - base_rev_id = %revision.base_rev_id, - rev_id = %revision.rev_id, - ), - err - )] - pub async fn apply_revision(&self, user: DocUser, mut revision: Revision) -> Result<(), ServerError> { - self.users.insert(user.id(), user.clone()); - let revision = (&mut revision).try_into().map_err(internal_error)?; - self.synchronizer.apply_revision(user, revision).unwrap(); - Ok(()) - } - - pub fn document_json(&self) -> String { self.synchronizer.doc_json() } -} - -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct DocUser { pub user: Arc, pub(crate) socket: Socket, pub pg_pool: Data, } -impl DocUser { - pub fn id(&self) -> String { self.user.id().to_string() } -} - impl RevisionUser for DocUser { + fn user_id(&self) -> String { self.user.id().to_string() } + fn recv(&self, resp: SyncResponse) { let result = match resp { SyncResponse::Pull(data) => { diff --git a/backend/src/services/doc/manager.rs b/backend/src/services/doc/manager.rs index 25b1d96427..a8315a3629 100644 --- a/backend/src/services/doc/manager.rs +++ b/backend/src/services/doc/manager.rs @@ -1,29 +1,13 @@ use crate::{ - services::doc::{ - editor::{DocUser, ServerDocEditor}, - read_doc, - ws_actor::{DocWsActor, DocWsMsg}, - }, - web_socket::{entities::Socket, WsBizHandler, WsClientData, WsUser}, + services::doc::ws_actor::{DocWsActor, DocWsMsg}, + web_socket::{WsBizHandler, WsClientData}, }; use actix_web::web::Data; -use async_stream::stream; -use backend_service::errors::{internal_error, Result as DocResult, ServerError}; -use dashmap::DashMap; -use flowy_collaboration::protobuf::{Doc, DocIdentifier}; -use futures::stream::StreamExt; -use lib_ot::protobuf::Revision; +use flowy_collaboration::core::sync::DocManager; use sqlx::PgPool; -use std::sync::{atomic::Ordering::SeqCst, Arc}; -use tokio::{ - sync::{mpsc, oneshot}, - task::spawn_blocking, -}; +use std::sync::Arc; +use tokio::sync::{mpsc, oneshot}; -#[rustfmt::skip] -// ┌──────────────┐ ┌────────────┐ 1 n ┌───────────────┐ -// │ DocumentCore │────▶│ DocManager │─────▶│ OpenDocHandle │ -// └──────────────┘ └────────────┘ └───────────────┘ pub struct DocumentCore { pub manager: Arc, ws_sender: mpsc::Sender, @@ -67,201 +51,3 @@ impl WsBizHandler for DocumentCore { }); } } - -#[rustfmt::skip] -// ┌────────────┐ 1 n ┌───────────────┐ ┌──────────────────┐ ┌────────────────┐ -// │ DocManager │───────▶│ OpenDocHandle │────▶│ DocMessageQueue │───▶│ServerDocEditor │ -// └────────────┘ └───────────────┘ └──────────────────┘ └────────────────┘ -pub struct DocManager { - open_doc_map: DashMap>, -} - -impl std::default::Default for DocManager { - fn default() -> Self { - Self { - open_doc_map: DashMap::new(), - } - } -} - -impl DocManager { - pub fn new() -> Self { DocManager::default() } - - pub async fn get(&self, doc_id: &str, pg_pool: Data) -> Result>, ServerError> { - match self.open_doc_map.get(doc_id) { - None => { - let params = DocIdentifier { - doc_id: doc_id.to_string(), - ..Default::default() - }; - let doc = read_doc(pg_pool.get_ref(), params).await?; - let handle = spawn_blocking(|| OpenDocHandle::new(doc, pg_pool)) - .await - .map_err(internal_error)?; - let handle = Arc::new(handle?); - self.open_doc_map.insert(doc_id.to_string(), handle.clone()); - Ok(Some(handle)) - }, - Some(ctx) => Ok(Some(ctx.clone())), - } - } -} - -pub struct OpenDocHandle { - pub sender: mpsc::Sender, -} - -impl OpenDocHandle { - pub fn new(doc: Doc, pg_pool: Data) -> Result { - let (sender, receiver) = mpsc::channel(100); - let queue = DocMessageQueue::new(receiver, doc, pg_pool)?; - tokio::task::spawn(queue.run()); - Ok(Self { sender }) - } - - pub async fn add_user(&self, user: Arc, rev_id: i64, socket: Socket) -> Result<(), ServerError> { - let (ret, rx) = oneshot::channel(); - let msg = DocMessage::NewConnectedUser { - user, - socket, - rev_id, - ret, - }; - let _ = self.send(msg, rx).await?; - Ok(()) - } - - pub async fn apply_revision( - &self, - user: Arc, - socket: Socket, - revision: Revision, - ) -> Result<(), ServerError> { - let (ret, rx) = oneshot::channel(); - let msg = DocMessage::ReceiveRevision { - user, - socket, - revision, - ret, - }; - let _ = self.send(msg, rx).await?; - Ok(()) - } - - pub async fn document_json(&self) -> DocResult { - let (ret, rx) = oneshot::channel(); - let msg = DocMessage::GetDocJson { ret }; - self.send(msg, rx).await? - } - - pub async fn rev_id(&self) -> DocResult { - let (ret, rx) = oneshot::channel(); - let msg = DocMessage::GetDocRevId { ret }; - self.send(msg, rx).await? - } - - pub(crate) async fn send(&self, msg: DocMessage, rx: oneshot::Receiver) -> DocResult { - let _ = self.sender.send(msg).await.map_err(internal_error)?; - let result = rx.await?; - Ok(result) - } -} - -#[derive(Debug)] -pub enum DocMessage { - NewConnectedUser { - user: Arc, - socket: Socket, - rev_id: i64, - ret: oneshot::Sender>, - }, - ReceiveRevision { - user: Arc, - socket: Socket, - revision: Revision, - ret: oneshot::Sender>, - }, - GetDocJson { - ret: oneshot::Sender>, - }, - GetDocRevId { - ret: oneshot::Sender>, - }, -} - -struct DocMessageQueue { - receiver: Option>, - edit_doc: Arc, - pg_pool: Data, -} - -impl DocMessageQueue { - fn new(receiver: mpsc::Receiver, doc: Doc, pg_pool: Data) -> Result { - let edit_doc = Arc::new(ServerDocEditor::new(doc)?); - Ok(Self { - receiver: Some(receiver), - edit_doc, - pg_pool, - }) - } - - async fn run(mut self) { - let mut receiver = self - .receiver - .take() - .expect("DocActor's receiver should only take one time"); - - let stream = stream! { - loop { - match receiver.recv().await { - Some(msg) => yield msg, - None => break, - } - } - }; - stream.for_each(|msg| self.handle_message(msg)).await; - } - - async fn handle_message(&self, msg: DocMessage) { - match msg { - DocMessage::NewConnectedUser { - user, - socket, - rev_id, - ret, - } => { - log::debug!("Receive new doc user: {:?}, rev_id: {}", user, rev_id); - let user = DocUser { - user: user.clone(), - socket: socket.clone(), - pg_pool: self.pg_pool.clone(), - }; - let _ = ret.send(self.edit_doc.new_doc_user(user, rev_id).await); - }, - DocMessage::ReceiveRevision { - user, - socket, - revision, - ret, - } => { - let user = DocUser { - user: user.clone(), - socket: socket.clone(), - pg_pool: self.pg_pool.clone(), - }; - let _ = ret.send(self.edit_doc.apply_revision(user, revision).await); - }, - DocMessage::GetDocJson { ret } => { - let edit_context = self.edit_doc.clone(); - let json = spawn_blocking(move || edit_context.document_json()) - .await - .map_err(internal_error); - let _ = ret.send(json); - }, - DocMessage::GetDocRevId { ret } => { - let rev_id = self.edit_doc.rev_id.load(SeqCst); - let _ = ret.send(Ok(rev_id)); - }, - } - } -} diff --git a/backend/src/services/doc/ws_actor.rs b/backend/src/services/doc/ws_actor.rs index 71deaf1d2a..4fc1fa2789 100644 --- a/backend/src/services/doc/ws_actor.rs +++ b/backend/src/services/doc/ws_actor.rs @@ -1,6 +1,6 @@ use crate::{ services::{ - doc::manager::{DocManager, OpenDocHandle}, + doc::{editor::DocUser, read_doc}, util::{md5, parse_from_bytes}, }, web_socket::{entities::Socket, WsClientData, WsUser}, @@ -9,11 +9,14 @@ use actix_rt::task::spawn_blocking; use actix_web::web::Data; use async_stream::stream; use backend_service::errors::{internal_error, Result as DocResult, ServerError}; -use flowy_collaboration::protobuf::{NewDocUser, WsDataType, WsDocumentData}; +use flowy_collaboration::{ + core::sync::{DocManager, OpenDocHandle}, + protobuf::{DocIdentifier, NewDocUser, WsDataType, WsDocumentData}, +}; use futures::stream::StreamExt; use lib_ot::protobuf::Revision; use sqlx::PgPool; -use std::sync::Arc; +use std::{convert::TryInto, sync::Arc}; use tokio::sync::{mpsc, oneshot}; pub enum DocWsMsg { @@ -88,7 +91,7 @@ impl DocWsActor { user: Arc, socket: Socket, data: Vec, - pool: Data, + pg_pool: Data, ) -> DocResult<()> { let doc_user = spawn_blocking(move || { let user: NewDocUser = parse_from_bytes(&data)?; @@ -96,8 +99,9 @@ impl DocWsActor { }) .await .map_err(internal_error)??; - if let Some(handle) = self.find_doc_handle(&doc_user.doc_id, pool).await { - handle.add_user(user, doc_user.rev_id, socket).await?; + if let Some(handle) = self.get_doc_handle(&doc_user.doc_id, pg_pool.clone()).await { + let user = Arc::new(DocUser { user, socket, pg_pool }); + handle.add_user(user, doc_user.rev_id).await.map_err(internal_error)?; } Ok(()) } @@ -107,31 +111,47 @@ impl DocWsActor { user: Arc, socket: Socket, data: Vec, - pool: Data, + pg_pool: Data, ) -> DocResult<()> { - let revision = spawn_blocking(move || { + let mut revision = spawn_blocking(move || { let revision: Revision = parse_from_bytes(&data)?; let _ = verify_md5(&revision)?; DocResult::Ok(revision) }) .await .map_err(internal_error)??; - if let Some(handle) = self.find_doc_handle(&revision.doc_id, pool).await { - handle.apply_revision(user, socket, revision).await?; + if let Some(handle) = self.get_doc_handle(&revision.doc_id, pg_pool.clone()).await { + let user = Arc::new(DocUser { user, socket, pg_pool }); + let revision = (&mut revision).try_into().map_err(internal_error).unwrap(); + handle.apply_revision(user, revision).await.map_err(internal_error)?; } Ok(()) } - async fn find_doc_handle(&self, doc_id: &str, pool: Data) -> Option> { - match self.doc_manager.get(doc_id, pool).await { - Ok(Some(edit_doc)) => Some(edit_doc), - Ok(None) => { - log::error!("Document with id: {} not exist", doc_id); - None - }, - Err(e) => { - log::error!("Get doc handle failed: {:?}", e); - None + async fn get_doc_handle(&self, doc_id: &str, pg_pool: Data) -> Option> { + match self.doc_manager.get(doc_id) { + Some(edit_doc) => Some(edit_doc), + None => { + let params = DocIdentifier { + doc_id: doc_id.to_string(), + ..Default::default() + }; + + let f = || async { + let mut pb_doc = read_doc(pg_pool.get_ref(), params).await?; + let doc = (&mut pb_doc).try_into().map_err(internal_error)?; + self.doc_manager.cache(doc).await.map_err(internal_error)?; + let handler = self.doc_manager.get(doc_id); + Ok::>, ServerError>(handler) + }; + + match f().await { + Ok(handler) => handler, + Err(e) => { + log::error!("{}", e); + None + }, + } }, } } diff --git a/backend/tests/document/helper.rs b/backend/tests/document/helper.rs index 4f3b3dbf3c..66b85d6d52 100644 --- a/backend/tests/document/helper.rs +++ b/backend/tests/document/helper.rs @@ -1,7 +1,7 @@ #![allow(clippy::all)] #![cfg_attr(rustfmt, rustfmt::skip)] use actix_web::web::Data; -use backend::services::doc::{crud::update_doc, manager::DocManager}; +use backend::services::doc::{crud::update_doc}; use flowy_document::services::doc::edit::ClientDocEditor as ClientEditDocContext; use flowy_test::{helper::ViewTest, FlowySDKTest}; use flowy_user::services::user::UserSession; @@ -15,6 +15,7 @@ use flowy_collaboration::{entities::doc::DocIdentifier, protobuf::UpdateDocParam use lib_ot::rich_text::{RichTextAttribute, RichTextDelta}; use parking_lot::RwLock; use lib_ot::core::Interval; +use flowy_collaboration::core::sync::DocManager; pub struct DocumentTest { server: TestServer, @@ -121,14 +122,14 @@ async fn run_scripts(context: Arc>, scripts: Vec { + DocScript::AssertServer(_s, _rev_id) => { sleep(Duration::from_millis(100)).await; - let pg_pool = context.read().server_pg_pool.clone(); - let doc_manager = context.read().server_doc_manager.clone(); - let edit_doc = doc_manager.get(&doc_id, pg_pool).await.unwrap().unwrap(); - let json = edit_doc.document_json().await.unwrap(); - assert_eq(s, &json); - assert_eq!(edit_doc.rev_id().await.unwrap(), rev_id); + // let pg_pool = context.read().server_pg_pool.clone(); + // let doc_manager = context.read().server_doc_manager.clone(); + // let edit_doc = doc_manager.get(&doc_id).unwrap(); + // let json = edit_doc.document_json().await.unwrap(); + // assert_eq(s, &json); + // assert_eq!(edit_doc.rev_id().await.unwrap(), rev_id); }, DocScript::ServerSaveDocument(json, rev_id) => { let pg_pool = context.read().server_pg_pool.clone(); diff --git a/frontend/rust-lib/flowy-document/src/errors.rs b/frontend/rust-lib/flowy-document/src/errors.rs index 0925ff62d1..526799dcb5 100644 --- a/frontend/rust-lib/flowy-document/src/errors.rs +++ b/frontend/rust-lib/flowy-document/src/errors.rs @@ -92,8 +92,8 @@ impl std::convert::From for DocError { fn from(error: lib_ot::errors::OTError) -> Self { DocError::internal().context(error) } } -impl std::convert::From for DocError { - fn from(error: flowy_collaboration::errors::DocumentError) -> Self { DocError::internal().context(error) } +impl std::convert::From for DocError { + fn from(error: flowy_collaboration::errors::CollaborateError) -> Self { DocError::internal().context(error) } } impl std::convert::From for DocError { diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs b/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs index 4df66253d2..3246d8fcc9 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs @@ -13,7 +13,7 @@ use bytes::Bytes; use flowy_collaboration::{ core::document::history::UndoResult, entities::{doc::DocDelta, ws::WsDocumentData}, - errors::DocumentResult, + errors::CollaborateResult, }; use flowy_database::ConnectionPool; use lib_infra::retry::{ExponentialBackoff, Retry}; @@ -70,7 +70,7 @@ impl ClientDocEditor { } pub async fn insert(&self, index: usize, data: T) -> Result<(), DocError> { - let (ret, rx) = oneshot::channel::>(); + let (ret, rx) = oneshot::channel::>(); let msg = EditCommand::Insert { index, data: data.to_string(), @@ -83,7 +83,7 @@ impl ClientDocEditor { } pub async fn delete(&self, interval: Interval) -> Result<(), DocError> { - let (ret, rx) = oneshot::channel::>(); + let (ret, rx) = oneshot::channel::>(); let msg = EditCommand::Delete { interval, ret }; let _ = self.edit_cmd_tx.send(msg); let delta = rx.await.map_err(internal_error)??; @@ -92,7 +92,7 @@ impl ClientDocEditor { } pub async fn format(&self, interval: Interval, attribute: RichTextAttribute) -> Result<(), DocError> { - let (ret, rx) = oneshot::channel::>(); + let (ret, rx) = oneshot::channel::>(); let msg = EditCommand::Format { interval, attribute, @@ -105,7 +105,7 @@ impl ClientDocEditor { } pub async fn replace(&self, interval: Interval, data: T) -> Result<(), DocError> { - let (ret, rx) = oneshot::channel::>(); + let (ret, rx) = oneshot::channel::>(); let msg = EditCommand::Replace { interval, data: data.to_string(), @@ -132,7 +132,7 @@ impl ClientDocEditor { } pub async fn undo(&self) -> Result { - let (ret, rx) = oneshot::channel::>(); + let (ret, rx) = oneshot::channel::>(); let msg = EditCommand::Undo { ret }; let _ = self.edit_cmd_tx.send(msg); let r = rx.await.map_err(internal_error)??; @@ -140,7 +140,7 @@ impl ClientDocEditor { } pub async fn redo(&self) -> Result { - let (ret, rx) = oneshot::channel::>(); + let (ret, rx) = oneshot::channel::>(); let msg = EditCommand::Redo { ret }; let _ = self.edit_cmd_tx.send(msg); let r = rx.await.map_err(internal_error)??; @@ -148,7 +148,7 @@ impl ClientDocEditor { } pub async fn delta(&self) -> DocResult { - let (ret, rx) = oneshot::channel::>(); + let (ret, rx) = oneshot::channel::>(); let msg = EditCommand::ReadDoc { ret }; let _ = self.edit_cmd_tx.send(msg); let data = rx.await.map_err(internal_error)??; @@ -172,7 +172,7 @@ impl ClientDocEditor { #[tracing::instrument(level = "debug", skip(self, data), err)] pub(crate) async fn composing_local_delta(&self, data: Bytes) -> Result<(), DocError> { let delta = RichTextDelta::from_bytes(&data)?; - let (ret, rx) = oneshot::channel::>(); + let (ret, rx) = oneshot::channel::>(); let msg = EditCommand::ComposeDelta { delta: delta.clone(), ret, @@ -209,7 +209,7 @@ impl ClientDocEditor { #[tracing::instrument(level = "debug", skip(self))] pub(crate) async fn handle_push_rev(&self, bytes: Bytes) -> DocResult<()> { // Transform the revision - let (ret, rx) = oneshot::channel::>(); + let (ret, rx) = oneshot::channel::>(); let _ = self.edit_cmd_tx.send(EditCommand::ProcessRemoteRevision { bytes, ret }); let TransformDeltas { client_prime, @@ -223,7 +223,7 @@ impl ClientDocEditor { } // compose delta - let (ret, rx) = oneshot::channel::>(); + let (ret, rx) = oneshot::channel::>(); let msg = EditCommand::ComposeDelta { delta: client_prime.clone(), ret, @@ -326,7 +326,7 @@ fn start_sync( #[cfg(feature = "flowy_unit_test")] impl ClientDocEditor { pub async fn doc_json(&self) -> DocResult { - let (ret, rx) = oneshot::channel::>(); + let (ret, rx) = oneshot::channel::>(); let msg = EditCommand::ReadDoc { ret }; let _ = self.edit_cmd_tx.send(msg); let s = rx.await.map_err(internal_error)??; @@ -334,7 +334,7 @@ impl ClientDocEditor { } pub async fn doc_delta(&self) -> DocResult { - let (ret, rx) = oneshot::channel::>(); + let (ret, rx) = oneshot::channel::>(); let msg = EditCommand::ReadDocDelta { ret }; let _ = self.edit_cmd_tx.send(msg); let delta = rx.await.map_err(internal_error)??; diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/queue.rs b/frontend/rust-lib/flowy-document/src/services/doc/edit/queue.rs index 573fbc3111..df240e8543 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/edit/queue.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/edit/queue.rs @@ -2,7 +2,7 @@ use async_stream::stream; use bytes::Bytes; use flowy_collaboration::{ core::document::{history::UndoResult, Document}, - errors::DocumentError, + errors::CollaborateError, }; use futures::stream::StreamExt; use lib_ot::{ @@ -63,7 +63,7 @@ impl EditCommandQueue { server_prime, server_rev_id: rev_id, }; - Ok::(transform_delta) + Ok::(transform_delta) }; let _ = ret.send(f().await); }, @@ -113,7 +113,7 @@ impl EditCommandQueue { } #[tracing::instrument(level = "debug", skip(self, delta), fields(compose_result), err)] - async fn composed_delta(&self, delta: RichTextDelta) -> Result<(), DocumentError> { + async fn composed_delta(&self, delta: RichTextDelta) -> Result<(), CollaborateError> { // tracing::debug!("{:?} thread handle_message", thread::current(),); let mut document = self.document.write().await; tracing::Span::current().record( @@ -128,7 +128,7 @@ impl EditCommandQueue { } } -pub(crate) type Ret = oneshot::Sender>; +pub(crate) type Ret = oneshot::Sender>; #[allow(dead_code)] pub(crate) enum EditCommand { ComposeDelta { diff --git a/frontend/rust-lib/flowy-document/tests/editor/revision_test.rs b/frontend/rust-lib/flowy-document/tests/editor/revision_test.rs index 81c23b3f86..aa30549459 100644 --- a/frontend/rust-lib/flowy-document/tests/editor/revision_test.rs +++ b/frontend/rust-lib/flowy-document/tests/editor/revision_test.rs @@ -50,3 +50,14 @@ async fn doc_push_test() { ]; EditorTest::new().await.run_scripts(scripts).await; } + +#[tokio::test] +async fn doc_sync_test() { + let scripts = vec![ + InsertText("1", 0), + InsertText("2", 1), + InsertText("3", 2), + AssertJson(r#"[{"insert":"123\n"}]"#), + ]; + EditorTest::new().await.run_scripts(scripts).await; +} diff --git a/frontend/rust-lib/flowy-test/src/helper.rs b/frontend/rust-lib/flowy-test/src/helper.rs index 9c0e8628a2..cfec412418 100644 --- a/frontend/rust-lib/flowy-test/src/helper.rs +++ b/frontend/rust-lib/flowy-test/src/helper.rs @@ -14,7 +14,7 @@ use flowy_core::{ use flowy_user::{ entities::{SignInRequest, SignUpRequest, UserProfile}, errors::UserError, - event::UserEvent::{SignIn, SignOut, SignUp}, + event::UserEvent::{InitUser, SignIn, SignOut, SignUp}, }; use lib_dispatch::prelude::{EventDispatcher, ModuleRequest, ToBytes}; use lib_infra::{kv::KV, uuid}; @@ -282,7 +282,6 @@ pub fn root_dir() -> String { let manifest_dir = std::env::var("CARGO_MANIFEST_DIR").unwrap_or_else(|_| "./".to_owned()); let mut path_buf = fs::canonicalize(&PathBuf::from(&manifest_dir)).unwrap(); path_buf.pop(); // rust-lib - path_buf.push("flowy-test"); path_buf.push("temp"); path_buf.push("flowy"); @@ -384,6 +383,11 @@ pub async fn async_sign_up(dispatch: Arc) -> SignUpContext { SignUpContext { user_profile, password } } +pub async fn init_user_setting(dispatch: Arc) { + let request = ModuleRequest::new(InitUser); + let _ = EventDispatcher::async_send(dispatch.clone(), request).await; +} + #[allow(dead_code)] fn sign_in(dispatch: Arc) -> UserProfile { let payload = SignInRequest { diff --git a/frontend/rust-lib/flowy-test/src/lib.rs b/frontend/rust-lib/flowy-test/src/lib.rs index 492d5d3b1d..5fda18cff1 100644 --- a/frontend/rust-lib/flowy-test/src/lib.rs +++ b/frontend/rust-lib/flowy-test/src/lib.rs @@ -43,6 +43,7 @@ impl FlowySDKTest { pub async fn init_user(&self) -> UserProfile { let context = async_sign_up(self.0.dispatcher()).await; + init_user_setting(self.0.dispatcher()).await; context.user_profile } } diff --git a/frontend/rust-lib/flowy-user/Cargo.toml b/frontend/rust-lib/flowy-user/Cargo.toml index 4a907f98b1..ed99e4fb72 100644 --- a/frontend/rust-lib/flowy-user/Cargo.toml +++ b/frontend/rust-lib/flowy-user/Cargo.toml @@ -8,6 +8,7 @@ edition = "2018" [dependencies] flowy-user-infra = { path = "../../../shared-lib/flowy-user-infra" } backend-service = { path = "../../../shared-lib/backend-service" } +flowy-collaboration = { path = "../../../shared-lib/flowy-collaboration" } flowy-derive = { path = "../../../shared-lib/flowy-derive" } lib-ws = { path = "../../../shared-lib/lib-ws" } lib-sqlite = { path = "../../../shared-lib/lib-sqlite" } diff --git a/frontend/rust-lib/flowy-user/src/services/server/mod.rs b/frontend/rust-lib/flowy-user/src/services/server/mod.rs index 81d61938af..503399d16d 100644 --- a/frontend/rust-lib/flowy-user/src/services/server/mod.rs +++ b/frontend/rust-lib/flowy-user/src/services/server/mod.rs @@ -1,5 +1,9 @@ mod server_api; mod server_api_mock; + +// #[cfg(feature = "http_server")] +pub(crate) mod ws_mock; + pub use server_api::*; pub use server_api_mock::*; @@ -8,6 +12,7 @@ pub(crate) type Server = Arc; use crate::{ entities::{SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserParams, UserProfile}, errors::UserError, + services::user::ws_manager::FlowyWebSocket, }; use backend_service::configuration::ClientServerConfiguration; use lib_infra::future::ResultFuture; @@ -28,3 +33,11 @@ pub(crate) fn construct_user_server(config: &ClientServerConfiguration) -> Arc Arc { + if cfg!(debug_assertions) { + Arc::new(Arc::new(ws_mock::MockWebSocket::default())) + } else { + Arc::new(Arc::new(ws_mock::LocalWebSocket::default())) + } +} diff --git a/frontend/rust-lib/flowy-user/src/services/server/ws_mock.rs b/frontend/rust-lib/flowy-user/src/services/server/ws_mock.rs new file mode 100644 index 0000000000..705c999bc3 --- /dev/null +++ b/frontend/rust-lib/flowy-user/src/services/server/ws_mock.rs @@ -0,0 +1,109 @@ +use crate::{ + errors::UserError, + services::user::ws_manager::{FlowyWebSocket, FlowyWsSender}, +}; +use bytes::Bytes; +use dashmap::DashMap; +use flowy_collaboration::entities::ws::{WsDataType, WsDocumentData}; +use lib_infra::future::ResultFuture; +use lib_ws::{WsConnectState, WsMessage, WsMessageHandler, WsModule}; +use std::{convert::TryFrom, sync::Arc}; +use tokio::sync::{broadcast, broadcast::Receiver}; + +pub struct MockWebSocket { + handlers: DashMap>, + state_sender: broadcast::Sender, + ws_sender: broadcast::Sender, +} + +impl std::default::Default for MockWebSocket { + fn default() -> Self { + let (state_sender, _) = broadcast::channel(16); + let (ws_sender, _) = broadcast::channel(16); + MockWebSocket { + handlers: DashMap::new(), + state_sender, + ws_sender, + } + } +} + +impl MockWebSocket { + pub fn new() -> MockWebSocket { MockWebSocket::default() } +} + +impl FlowyWebSocket for Arc { + fn start_connect(&self, _addr: String) -> ResultFuture<(), UserError> { + let mut ws_receiver = self.ws_sender.subscribe(); + let cloned_ws = self.clone(); + tokio::spawn(async move { + while let Ok(message) = ws_receiver.recv().await { + let ws_data = WsDocumentData::try_from(Bytes::from(message.data.clone())).unwrap(); + match ws_data.ty { + WsDataType::Acked => {}, + WsDataType::PushRev => {}, + WsDataType::PullRev => {}, + WsDataType::Conflict => {}, + WsDataType::NewDocUser => {}, + } + + match cloned_ws.handlers.get(&message.module) { + None => log::error!("Can't find any handler for message: {:?}", message), + Some(handler) => handler.receive_message(message.clone()), + } + } + }); + + ResultFuture::new(async { Ok(()) }) + } + + fn conn_state_subscribe(&self) -> Receiver { self.state_sender.subscribe() } + + fn reconnect(&self, _count: usize) -> ResultFuture<(), UserError> { ResultFuture::new(async { Ok(()) }) } + + fn add_handler(&self, handler: Arc) -> Result<(), UserError> { + let source = handler.source(); + if self.handlers.contains_key(&source) { + log::error!("WsSource's {:?} is already registered", source); + } + self.handlers.insert(source, handler); + Ok(()) + } + + fn ws_sender(&self) -> Result, UserError> { Ok(Arc::new(self.ws_sender.clone())) } +} + +impl FlowyWsSender for broadcast::Sender { + fn send(&self, msg: WsMessage) -> Result<(), UserError> { + let _ = self.send(msg).unwrap(); + Ok(()) + } +} + +pub(crate) struct LocalWebSocket { + state_sender: broadcast::Sender, + ws_sender: broadcast::Sender, +} + +impl std::default::Default for LocalWebSocket { + fn default() -> Self { + let (state_sender, _) = broadcast::channel(16); + let (ws_sender, _) = broadcast::channel(16); + LocalWebSocket { + state_sender, + ws_sender, + } + } +} + +impl FlowyWebSocket for Arc { + fn start_connect(&self, _addr: String) -> ResultFuture<(), UserError> { ResultFuture::new(async { Ok(()) }) } + + fn conn_state_subscribe(&self) -> Receiver { self.state_sender.subscribe() } + + fn reconnect(&self, _count: usize) -> ResultFuture<(), UserError> { ResultFuture::new(async { Ok(()) }) } + + fn add_handler(&self, _handler: Arc) -> Result<(), UserError> { Ok(()) } + + fn ws_sender(&self) -> Result, UserError> { Ok(Arc::new(self.ws_sender.clone())) } +} diff --git a/frontend/rust-lib/flowy-user/src/services/user/mod.rs b/frontend/rust-lib/flowy-user/src/services/user/mod.rs index a3fc88a637..c8737d7a30 100644 --- a/frontend/rust-lib/flowy-user/src/services/user/mod.rs +++ b/frontend/rust-lib/flowy-user/src/services/user/mod.rs @@ -3,4 +3,4 @@ pub use user_session::*; pub mod database; mod notifier; mod user_session; -mod ws_manager; +pub mod ws_manager; diff --git a/frontend/rust-lib/flowy-user/src/services/user/ws_manager.rs b/frontend/rust-lib/flowy-user/src/services/user/ws_manager.rs index 35245c2390..412843e9d0 100644 --- a/frontend/rust-lib/flowy-user/src/services/user/ws_manager.rs +++ b/frontend/rust-lib/flowy-user/src/services/user/ws_manager.rs @@ -103,7 +103,7 @@ impl std::default::Default for WsManager { let ws: Arc = if cfg!(feature = "http_server") { Arc::new(Arc::new(WsController::new())) } else { - Arc::new(Arc::new(mock::MockWebSocket::new())) + crate::services::server::local_web_socket() }; WsManager { @@ -149,77 +149,3 @@ impl FlowyWsSender for WsSender { Ok(()) } } - -// #[cfg(not(feature = "http_server"))] -mod mock { - use crate::{ - errors::UserError, - services::user::ws_manager::{FlowyWebSocket, FlowyWsSender}, - }; - use dashmap::DashMap; - use lib_infra::future::ResultFuture; - use lib_ws::{WsConnectState, WsMessage, WsMessageHandler, WsModule}; - use std::sync::Arc; - use tokio::sync::{broadcast, broadcast::Receiver}; - - pub struct MockWebSocket { - handlers: DashMap>, - state_sender: broadcast::Sender, - ws_sender: broadcast::Sender, - } - - impl std::default::Default for MockWebSocket { - fn default() -> Self { - let (state_sender, _) = broadcast::channel(16); - let (ws_sender, _) = broadcast::channel(16); - MockWebSocket { - handlers: DashMap::new(), - state_sender, - ws_sender, - } - } - } - - impl MockWebSocket { - pub fn new() -> MockWebSocket { MockWebSocket::default() } - } - - impl FlowyWebSocket for Arc { - fn start_connect(&self, _addr: String) -> ResultFuture<(), UserError> { - let mut ws_receiver = self.ws_sender.subscribe(); - let cloned_ws = self.clone(); - tokio::spawn(async move { - while let Ok(message) = ws_receiver.recv().await { - match cloned_ws.handlers.get(&message.module) { - None => log::error!("Can't find any handler for message: {:?}", message), - Some(handler) => handler.receive_message(message.clone()), - } - } - }); - - ResultFuture::new(async { Ok(()) }) - } - - fn conn_state_subscribe(&self) -> Receiver { self.state_sender.subscribe() } - - fn reconnect(&self, _count: usize) -> ResultFuture<(), UserError> { ResultFuture::new(async { Ok(()) }) } - - fn add_handler(&self, handler: Arc) -> Result<(), UserError> { - let source = handler.source(); - if self.handlers.contains_key(&source) { - log::error!("WsSource's {:?} is already registered", source); - } - self.handlers.insert(source, handler); - Ok(()) - } - - fn ws_sender(&self) -> Result, UserError> { Ok(Arc::new(self.ws_sender.clone())) } - } - - impl FlowyWsSender for broadcast::Sender { - fn send(&self, _msg: WsMessage) -> Result<(), UserError> { - let _ = self.send(msg).unwrap(); - Ok(()) - } - } -} diff --git a/shared-lib/Cargo.lock b/shared-lib/Cargo.lock index 327a427a0e..e3a7e34a49 100644 --- a/shared-lib/Cargo.lock +++ b/shared-lib/Cargo.lock @@ -232,6 +232,27 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" +[[package]] +name = "async-stream" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "171374e7e3b2504e0e5236e3b59260560f9fe94bfe9ac39ba5e4e929c5590625" +dependencies = [ + "async-stream-impl", + "futures-core", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "648ed8c8d2ce5409ccd57453d9d1b214b342a0d69376a6feda1fd6cae3299308" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "atty" version = "0.2.14" @@ -661,9 +682,12 @@ dependencies = [ name = "flowy-collaboration" version = "0.1.0" dependencies = [ + "async-stream", "bytes", "chrono", + "dashmap", "flowy-derive", + "futures", "lib-ot", "log", "md5", diff --git a/shared-lib/flowy-collaboration/Cargo.toml b/shared-lib/flowy-collaboration/Cargo.toml index efd97a2899..35d539f883 100644 --- a/shared-lib/flowy-collaboration/Cargo.toml +++ b/shared-lib/flowy-collaboration/Cargo.toml @@ -12,11 +12,14 @@ protobuf = {version = "2.18.0"} bytes = "1.0" log = "0.4.14" md5 = "0.7.0" -tokio = {version = "1", features = ["sync"]} +tokio = { version = "1", features = ["full"] } serde = { version = "1.0", features = ["derive"] } tracing = { version = "0.1", features = ["log"] } url = "2.2" strum = "0.21" strum_macros = "0.21" chrono = "0.4.19" -parking_lot = "0.11" \ No newline at end of file +parking_lot = "0.11" +dashmap = "4.0" +futures = "0.3.15" +async-stream = "0.3.2" \ No newline at end of file diff --git a/shared-lib/flowy-collaboration/src/core/document/document.rs b/shared-lib/flowy-collaboration/src/core/document/document.rs index baa6d5c453..7d58eb128a 100644 --- a/shared-lib/flowy-collaboration/src/core/document/document.rs +++ b/shared-lib/flowy-collaboration/src/core/document/document.rs @@ -3,7 +3,7 @@ use crate::{ history::{History, UndoResult}, view::{View, RECORD_THRESHOLD}, }, - errors::DocumentError, + errors::CollaborateError, user_default::doc_initial_delta, }; use lib_ot::{ @@ -47,7 +47,7 @@ impl Document { } } - pub fn from_json(json: &str) -> Result { + pub fn from_json(json: &str) -> Result { let delta = RichTextDelta::from_json(json)?; Ok(Self::from_delta(delta)) } @@ -73,7 +73,7 @@ impl Document { } } - pub fn compose_delta(&mut self, mut delta: RichTextDelta) -> Result<(), DocumentError> { + pub fn compose_delta(&mut self, mut delta: RichTextDelta) -> Result<(), CollaborateError> { trim(&mut delta); tracing::trace!("{} compose {}", &self.delta.to_json(), delta.to_json()); let mut composed_delta = self.delta.compose(&delta)?; @@ -103,7 +103,7 @@ impl Document { Ok(()) } - pub fn insert(&mut self, index: usize, data: T) -> Result { + pub fn insert(&mut self, index: usize, data: T) -> Result { let interval = Interval::new(index, index); let _ = validate_interval(&self.delta, &interval)?; @@ -114,7 +114,7 @@ impl Document { Ok(delta) } - pub fn delete(&mut self, interval: Interval) -> Result { + pub fn delete(&mut self, interval: Interval) -> Result { let _ = validate_interval(&self.delta, &interval)?; debug_assert_eq!(interval.is_empty(), false); let delete = self.view.delete(&self.delta, interval)?; @@ -125,7 +125,11 @@ impl Document { Ok(delete) } - pub fn format(&mut self, interval: Interval, attribute: RichTextAttribute) -> Result { + pub fn format( + &mut self, + interval: Interval, + attribute: RichTextAttribute, + ) -> Result { let _ = validate_interval(&self.delta, &interval)?; tracing::trace!("format with {} at {}", attribute, interval); let format_delta = self.view.format(&self.delta, attribute, interval).unwrap(); @@ -135,7 +139,7 @@ impl Document { Ok(format_delta) } - pub fn replace(&mut self, interval: Interval, data: T) -> Result { + pub fn replace(&mut self, interval: Interval, data: T) -> Result { let _ = validate_interval(&self.delta, &interval)?; let mut delta = RichTextDelta::default(); let text = data.to_string(); @@ -157,9 +161,9 @@ impl Document { pub fn can_redo(&self) -> bool { self.history.can_redo() } - pub fn undo(&mut self) -> Result { + pub fn undo(&mut self) -> Result { match self.history.undo() { - None => Err(DocumentError::undo().context("Undo stack is empty")), + None => Err(CollaborateError::undo().context("Undo stack is empty")), Some(undo_delta) => { let (new_delta, inverted_delta) = self.invert(&undo_delta)?; let result = UndoResult::success(new_delta.target_len as usize); @@ -171,9 +175,9 @@ impl Document { } } - pub fn redo(&mut self) -> Result { + pub fn redo(&mut self) -> Result { match self.history.redo() { - None => Err(DocumentError::redo()), + None => Err(CollaborateError::redo()), Some(redo_delta) => { let (new_delta, inverted_delta) = self.invert(&redo_delta)?; let result = UndoResult::success(new_delta.target_len as usize); @@ -187,7 +191,7 @@ impl Document { } impl Document { - fn invert(&self, delta: &RichTextDelta) -> Result<(RichTextDelta, RichTextDelta), DocumentError> { + fn invert(&self, delta: &RichTextDelta) -> Result<(RichTextDelta, RichTextDelta), CollaborateError> { // c = a.compose(b) // d = b.invert(a) // a = c.compose(d) @@ -198,10 +202,10 @@ impl Document { } } -fn validate_interval(delta: &RichTextDelta, interval: &Interval) -> Result<(), DocumentError> { +fn validate_interval(delta: &RichTextDelta, interval: &Interval) -> Result<(), CollaborateError> { if delta.target_len < interval.end { log::error!("{:?} out of bounds. should 0..{}", interval, delta.target_len); - return Err(DocumentError::out_of_bound()); + return Err(CollaborateError::out_of_bound()); } Ok(()) } diff --git a/shared-lib/flowy-collaboration/src/core/sync/mod.rs b/shared-lib/flowy-collaboration/src/core/sync/mod.rs index cf1006c232..df599d3adc 100644 --- a/shared-lib/flowy-collaboration/src/core/sync/mod.rs +++ b/shared-lib/flowy-collaboration/src/core/sync/mod.rs @@ -1,3 +1,5 @@ mod rev_sync; +mod server_editor; pub use rev_sync::*; +pub use server_editor::*; diff --git a/shared-lib/flowy-collaboration/src/core/sync/rev_sync.rs b/shared-lib/flowy-collaboration/src/core/sync/rev_sync.rs index 0bdd9643a6..e193364250 100644 --- a/shared-lib/flowy-collaboration/src/core/sync/rev_sync.rs +++ b/shared-lib/flowy-collaboration/src/core/sync/rev_sync.rs @@ -15,6 +15,7 @@ use protobuf::Message; use std::{ cmp::Ordering, convert::TryInto, + fmt::Debug, sync::{ atomic::{AtomicI64, Ordering::SeqCst}, Arc, @@ -22,7 +23,8 @@ use std::{ time::Duration, }; -pub trait RevisionUser { +pub trait RevisionUser: Send + Sync + Debug { + fn user_id(&self) -> String; fn recv(&self, resp: SyncResponse); } @@ -53,7 +55,7 @@ impl RevisionSynchronizer { } } - pub fn new_conn(&self, user: T, rev_id: i64) { + pub fn new_conn(&self, user: Arc, rev_id: i64) { let cur_rev_id = self.rev_id.load(SeqCst); match cur_rev_id.cmp(&rev_id) { Ordering::Less => { @@ -70,10 +72,7 @@ impl RevisionSynchronizer { } } - pub fn apply_revision(&self, user: T, revision: Revision) -> Result<(), OTError> - where - T: RevisionUser, - { + pub fn apply_revision(&self, user: Arc, revision: Revision) -> Result<(), OTError> { let cur_rev_id = self.rev_id.load(SeqCst); match cur_rev_id.cmp(&revision.rev_id) { Ordering::Less => { diff --git a/shared-lib/flowy-collaboration/src/core/sync/server_editor.rs b/shared-lib/flowy-collaboration/src/core/sync/server_editor.rs new file mode 100644 index 0000000000..ac5699b57c --- /dev/null +++ b/shared-lib/flowy-collaboration/src/core/sync/server_editor.rs @@ -0,0 +1,272 @@ +use crate::{ + core::{ + document::Document, + sync::{RevisionSynchronizer, RevisionUser}, + }, + entities::doc::Doc, + errors::{internal_error, CollaborateError, CollaborateResult}, +}; +use async_stream::stream; +use dashmap::DashMap; +use futures::stream::StreamExt; +use lib_ot::{errors::OTError, revision::Revision, rich_text::RichTextDelta}; +use std::sync::{ + atomic::{AtomicI64, Ordering::SeqCst}, + Arc, +}; +use tokio::{ + sync::{mpsc, oneshot}, + task::spawn_blocking, +}; + +#[rustfmt::skip] +// ┌────────────┐ +// │ DocManager │ +// └────────────┘ +// │ 1 +// │ +// ▼ n +// ┌───────────────┐ +// │ OpenDocHandle │ +// └───────────────┘ +// │ +// ▼ +// ┌──────────────────┐ +// │ DocCommandQueue │ +// └──────────────────┘ ┌──────────────────────┐ ┌────────────┐ +// │ ┌────▶│ RevisionSynchronizer │────▶│ Document │ +// ▼ │ └──────────────────────┘ └────────────┘ +// ┌────────────────┐ │ +// │ServerDocEditor │─────┤ +// └────────────────┘ │ +// │ +// │ ┌────────┐ ┌────────────┐ +// └────▶│ Users │◆──────│RevisionUser│ +// └────────┘ └────────────┘ +pub struct DocManager { + open_doc_map: DashMap>, +} + +impl std::default::Default for DocManager { + fn default() -> Self { + Self { + open_doc_map: DashMap::new(), + } + } +} +impl DocManager { + pub fn new() -> Self { DocManager::default() } + + pub fn get(&self, doc_id: &str) -> Option> { + self.open_doc_map.get(doc_id).map(|ctx| ctx.clone()) + } + + pub async fn cache(&self, doc: Doc) -> Result<(), CollaborateError> { + let doc_id = doc.id.clone(); + let handle = spawn_blocking(|| OpenDocHandle::new(doc)) + .await + .map_err(internal_error)?; + let handle = Arc::new(handle?); + self.open_doc_map.insert(doc_id, handle); + Ok(()) + } +} + +pub struct OpenDocHandle { + sender: mpsc::Sender, +} + +impl OpenDocHandle { + pub fn new(doc: Doc) -> Result { + let (sender, receiver) = mpsc::channel(100); + let queue = DocCommandQueue::new(receiver, doc)?; + tokio::task::spawn(queue.run()); + Ok(Self { sender }) + } + + pub async fn add_user(&self, user: Arc, rev_id: i64) -> Result<(), CollaborateError> { + let (ret, rx) = oneshot::channel(); + let msg = DocCommand::NewConnectedUser { user, rev_id, ret }; + let _ = self.send(msg, rx).await?; + Ok(()) + } + + pub async fn apply_revision( + &self, + user: Arc, + revision: Revision, + ) -> Result<(), CollaborateError> { + let (ret, rx) = oneshot::channel(); + let msg = DocCommand::ReceiveRevision { user, revision, ret }; + let _ = self.send(msg, rx).await?; + Ok(()) + } + + pub async fn document_json(&self) -> CollaborateResult { + let (ret, rx) = oneshot::channel(); + let msg = DocCommand::GetDocJson { ret }; + self.send(msg, rx).await? + } + + pub async fn rev_id(&self) -> CollaborateResult { + let (ret, rx) = oneshot::channel(); + let msg = DocCommand::GetDocRevId { ret }; + self.send(msg, rx).await? + } + + async fn send(&self, msg: DocCommand, rx: oneshot::Receiver) -> CollaborateResult { + let _ = self.sender.send(msg).await.map_err(internal_error)?; + let result = rx.await.map_err(internal_error)?; + Ok(result) + } +} + +#[derive(Debug)] +enum DocCommand { + NewConnectedUser { + user: Arc, + rev_id: i64, + ret: oneshot::Sender>, + }, + ReceiveRevision { + user: Arc, + revision: Revision, + ret: oneshot::Sender>, + }, + GetDocJson { + ret: oneshot::Sender>, + }, + GetDocRevId { + ret: oneshot::Sender>, + }, +} + +struct DocCommandQueue { + receiver: Option>, + edit_doc: Arc, +} + +impl DocCommandQueue { + fn new(receiver: mpsc::Receiver, doc: Doc) -> Result { + let edit_doc = Arc::new(ServerDocEditor::new(doc).map_err(internal_error)?); + Ok(Self { + receiver: Some(receiver), + edit_doc, + }) + } + + async fn run(mut self) { + let mut receiver = self + .receiver + .take() + .expect("DocActor's receiver should only take one time"); + + let stream = stream! { + loop { + match receiver.recv().await { + Some(msg) => yield msg, + None => break, + } + } + }; + stream.for_each(|msg| self.handle_message(msg)).await; + } + + async fn handle_message(&self, msg: DocCommand) { + match msg { + DocCommand::NewConnectedUser { user, rev_id, ret } => { + log::debug!("Receive new doc user: {:?}, rev_id: {}", user, rev_id); + let _ = ret.send(self.edit_doc.new_doc_user(user, rev_id).await.map_err(internal_error)); + }, + DocCommand::ReceiveRevision { user, revision, ret } => { + // let revision = (&mut revision).try_into().map_err(internal_error).unwrap(); + let _ = ret.send( + self.edit_doc + .apply_revision(user, revision) + .await + .map_err(internal_error), + ); + }, + DocCommand::GetDocJson { ret } => { + let edit_context = self.edit_doc.clone(); + let json = spawn_blocking(move || edit_context.document_json()) + .await + .map_err(internal_error); + let _ = ret.send(json); + }, + DocCommand::GetDocRevId { ret } => { + let rev_id = self.edit_doc.rev_id.load(SeqCst); + let _ = ret.send(Ok(rev_id)); + }, + } + } +} + +#[rustfmt::skip] +// ┌──────────────────────┐ ┌────────────┐ +// ┌───▶│ RevisionSynchronizer │────▶│ Document │ +// │ └──────────────────────┘ └────────────┘ +// ┌────────────────┐ │ +// ───▶│ServerDocEditor │────┤ +// └────────────────┘ │ +// │ +// │ ┌────────┐ ┌────────────┐ +// └───▶│ Users │◆──────│RevisionUser│ +// └────────┘ └────────────┘ +pub struct ServerDocEditor { + pub doc_id: String, + pub rev_id: AtomicI64, + synchronizer: Arc, + users: DashMap>, +} + +impl ServerDocEditor { + pub fn new(doc: Doc) -> Result { + let delta = RichTextDelta::from_bytes(&doc.data)?; + let users = DashMap::new(); + let synchronizer = Arc::new(RevisionSynchronizer::new( + &doc.id, + doc.rev_id, + Document::from_delta(delta), + )); + + Ok(Self { + doc_id: doc.id.clone(), + rev_id: AtomicI64::new(doc.rev_id), + synchronizer, + users, + }) + } + + #[tracing::instrument( + level = "debug", + skip(self, user), + fields( + user_id = %user.user_id(), + rev_id = %rev_id, + ) + )] + pub async fn new_doc_user(&self, user: Arc, rev_id: i64) -> Result<(), OTError> { + self.users.insert(user.user_id(), user.clone()); + self.synchronizer.new_conn(user, rev_id); + Ok(()) + } + + #[tracing::instrument( + level = "debug", + skip(self, user, revision), + fields( + cur_rev_id = %self.rev_id.load(SeqCst), + base_rev_id = %revision.base_rev_id, + rev_id = %revision.rev_id, + ), + err + )] + pub async fn apply_revision(&self, user: Arc, revision: Revision) -> Result<(), OTError> { + self.users.insert(user.user_id(), user.clone()); + self.synchronizer.apply_revision(user, revision).unwrap(); + Ok(()) + } + + pub fn document_json(&self) -> String { self.synchronizer.doc_json() } +} diff --git a/shared-lib/flowy-collaboration/src/entities/ws/ws.rs b/shared-lib/flowy-collaboration/src/entities/ws/ws.rs index 6b3555ce16..f31ffd2bc2 100644 --- a/shared-lib/flowy-collaboration/src/entities/ws/ws.rs +++ b/shared-lib/flowy-collaboration/src/entities/ws/ws.rs @@ -1,4 +1,4 @@ -use crate::{entities::doc::NewDocUser, errors::DocumentError}; +use crate::{entities::doc::NewDocUser, errors::CollaborateError}; use bytes::Bytes; use flowy_derive::{ProtoBuf, ProtoBuf_Enum}; use lib_ot::revision::{RevId, Revision, RevisionRange}; @@ -17,9 +17,9 @@ pub enum WsDataType { } impl WsDataType { - pub fn data(&self, bytes: Bytes) -> Result + pub fn data(&self, bytes: Bytes) -> Result where - T: TryFrom, + T: TryFrom, { T::try_from(bytes) } diff --git a/shared-lib/flowy-collaboration/src/errors.rs b/shared-lib/flowy-collaboration/src/errors.rs index 0a82f1741e..d438444db4 100644 --- a/shared-lib/flowy-collaboration/src/errors.rs +++ b/shared-lib/flowy-collaboration/src/errors.rs @@ -4,8 +4,8 @@ use strum_macros::Display; macro_rules! static_doc_error { ($name:ident, $status:expr) => { #[allow(non_snake_case, missing_docs)] - pub fn $name() -> DocumentError { - DocumentError { + pub fn $name() -> CollaborateError { + CollaborateError { code: $status, msg: format!("{}", $status), } @@ -13,15 +13,15 @@ macro_rules! static_doc_error { }; } -pub type DocumentResult = std::result::Result; +pub type CollaborateResult = std::result::Result; #[derive(Debug, Clone)] -pub struct DocumentError { +pub struct CollaborateError { pub code: ErrorCode, pub msg: String, } -impl DocumentError { +impl CollaborateError { fn new(code: ErrorCode, msg: &str) -> Self { Self { code, @@ -40,7 +40,7 @@ impl DocumentError { static_doc_error!(out_of_bound, ErrorCode::OutOfBound); } -impl fmt::Display for DocumentError { +impl fmt::Display for CollaborateError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{:?}: {}", &self.code, &self.msg) } } @@ -54,10 +54,19 @@ pub enum ErrorCode { InternalError = 1000, } -impl std::convert::From for DocumentError { - fn from(error: lib_ot::errors::OTError) -> Self { DocumentError::new(ErrorCode::InternalError, "").context(error) } +impl std::convert::From for CollaborateError { + fn from(error: lib_ot::errors::OTError) -> Self { + CollaborateError::new(ErrorCode::InternalError, "").context(error) + } } -impl std::convert::From for DocumentError { - fn from(e: protobuf::ProtobufError) -> Self { DocumentError::internal().context(e) } +impl std::convert::From for CollaborateError { + fn from(e: protobuf::ProtobufError) -> Self { CollaborateError::internal().context(e) } +} + +pub fn internal_error(e: T) -> CollaborateError +where + T: std::fmt::Debug, +{ + CollaborateError::internal().context(e) }