From 0231ad3adf6fea1f8c5a8dddc136d630ca866336 Mon Sep 17 00:00:00 2001 From: appflowy Date: Sun, 12 Dec 2021 16:02:58 +0800 Subject: [PATCH] add doc persistence trait --- backend/src/services/doc/editor.rs | 4 +-- backend/src/services/doc/manager.rs | 20 +++++++++-- backend/src/services/doc/ws_actor.rs | 12 +++---- backend/tests/document/helper.rs | 4 +-- frontend/rust-lib/lib-infra/Cargo.toml | 3 +- .../flowy-collaboration/src/core/sync/mod.rs | 4 +-- .../src/core/sync/server_editor.rs | 36 ++++++++++--------- .../sync/{rev_sync.rs => synchronizer.rs} | 0 8 files changed, 50 insertions(+), 33 deletions(-) rename shared-lib/flowy-collaboration/src/core/sync/{rev_sync.rs => synchronizer.rs} (100%) diff --git a/backend/src/services/doc/editor.rs b/backend/src/services/doc/editor.rs index f797f52e35..33aac0be76 100644 --- a/backend/src/services/doc/editor.rs +++ b/backend/src/services/doc/editor.rs @@ -14,13 +14,13 @@ use sqlx::PgPool; use std::sync::Arc; #[derive(Clone, Debug)] -pub struct DocUser { +pub struct ServerDocUser { pub user: Arc, pub(crate) socket: Socket, pub pg_pool: Data, } -impl RevisionUser for DocUser { +impl RevisionUser for ServerDocUser { fn user_id(&self) -> String { self.user.id().to_string() } fn recv(&self, resp: SyncResponse) { diff --git a/backend/src/services/doc/manager.rs b/backend/src/services/doc/manager.rs index a8315a3629..749099b6a2 100644 --- a/backend/src/services/doc/manager.rs +++ b/backend/src/services/doc/manager.rs @@ -3,20 +3,25 @@ use crate::{ web_socket::{WsBizHandler, WsClientData}, }; use actix_web::web::Data; -use flowy_collaboration::core::sync::DocManager; +use flowy_collaboration::{ + core::sync::{ServerDocManager, ServerDocPersistence}, + entities::doc::Doc, + errors::CollaborateResult, +}; +use lib_ot::rich_text::RichTextDelta; use sqlx::PgPool; use std::sync::Arc; use tokio::sync::{mpsc, oneshot}; pub struct DocumentCore { - pub manager: Arc, + pub manager: Arc, ws_sender: mpsc::Sender, pg_pool: Data, } impl DocumentCore { pub fn new(pg_pool: Data) -> Self { - let manager = Arc::new(DocManager::new()); + let manager = Arc::new(ServerDocManager::new(Arc::new(DocPersistenceImpl()))); let (ws_sender, rx) = mpsc::channel(100); let actor = DocWsActor::new(rx, manager.clone()); tokio::task::spawn(actor.run()); @@ -51,3 +56,12 @@ impl WsBizHandler for DocumentCore { }); } } + +struct DocPersistenceImpl(); +impl ServerDocPersistence for DocPersistenceImpl { + fn create_doc(&self, doc_id: &str, delta: RichTextDelta) -> CollaborateResult<()> { unimplemented!() } + + fn update_doc(&self, doc_id: &str, delta: RichTextDelta) -> CollaborateResult<()> { unimplemented!() } + + fn read_doc(&self, doc_id: &str) -> CollaborateResult { unimplemented!() } +} diff --git a/backend/src/services/doc/ws_actor.rs b/backend/src/services/doc/ws_actor.rs index 4fc1fa2789..0e851b5f8e 100644 --- a/backend/src/services/doc/ws_actor.rs +++ b/backend/src/services/doc/ws_actor.rs @@ -1,6 +1,6 @@ use crate::{ services::{ - doc::{editor::DocUser, read_doc}, + doc::{editor::ServerDocUser, read_doc}, util::{md5, parse_from_bytes}, }, web_socket::{entities::Socket, WsClientData, WsUser}, @@ -10,7 +10,7 @@ use actix_web::web::Data; use async_stream::stream; use backend_service::errors::{internal_error, Result as DocResult, ServerError}; use flowy_collaboration::{ - core::sync::{DocManager, OpenDocHandle}, + core::sync::{OpenDocHandle, ServerDocManager}, protobuf::{DocIdentifier, NewDocUser, WsDataType, WsDocumentData}, }; use futures::stream::StreamExt; @@ -29,11 +29,11 @@ pub enum DocWsMsg { pub struct DocWsActor { receiver: Option>, - doc_manager: Arc, + doc_manager: Arc, } impl DocWsActor { - pub fn new(receiver: mpsc::Receiver, manager: Arc) -> Self { + pub fn new(receiver: mpsc::Receiver, manager: Arc) -> Self { Self { receiver: Some(receiver), doc_manager: manager, @@ -100,7 +100,7 @@ impl DocWsActor { .await .map_err(internal_error)??; if let Some(handle) = self.get_doc_handle(&doc_user.doc_id, pg_pool.clone()).await { - let user = Arc::new(DocUser { user, socket, pg_pool }); + let user = Arc::new(ServerDocUser { user, socket, pg_pool }); handle.add_user(user, doc_user.rev_id).await.map_err(internal_error)?; } Ok(()) @@ -121,7 +121,7 @@ impl DocWsActor { .await .map_err(internal_error)??; if let Some(handle) = self.get_doc_handle(&revision.doc_id, pg_pool.clone()).await { - let user = Arc::new(DocUser { user, socket, pg_pool }); + let user = Arc::new(ServerDocUser { user, socket, pg_pool }); let revision = (&mut revision).try_into().map_err(internal_error).unwrap(); handle.apply_revision(user, revision).await.map_err(internal_error)?; } diff --git a/backend/tests/document/helper.rs b/backend/tests/document/helper.rs index 66b85d6d52..182bd32448 100644 --- a/backend/tests/document/helper.rs +++ b/backend/tests/document/helper.rs @@ -15,7 +15,7 @@ use flowy_collaboration::{entities::doc::DocIdentifier, protobuf::UpdateDocParam use lib_ot::rich_text::{RichTextAttribute, RichTextDelta}; use parking_lot::RwLock; use lib_ot::core::Interval; -use flowy_collaboration::core::sync::DocManager; +use flowy_collaboration::core::sync::ServerDocManager; pub struct DocumentTest { server: TestServer, @@ -53,7 +53,7 @@ struct ScriptContext { client_edit_context: Option>, client_sdk: FlowySDKTest, client_user_session: Arc, - server_doc_manager: Arc, + server_doc_manager: Arc, server_pg_pool: Data, doc_id: String, } diff --git a/frontend/rust-lib/lib-infra/Cargo.toml b/frontend/rust-lib/lib-infra/Cargo.toml index c9a9c8808a..ce43059378 100644 --- a/frontend/rust-lib/lib-infra/Cargo.toml +++ b/frontend/rust-lib/lib-infra/Cargo.toml @@ -20,4 +20,5 @@ bytes = { version = "1.0" } pin-project = "1.0" futures-core = { version = "0.3", default-features = false } tokio = { version = "1.0", features = ["time", "rt"] } -rand = "0.8.3" \ No newline at end of file +rand = "0.8.3" + diff --git a/shared-lib/flowy-collaboration/src/core/sync/mod.rs b/shared-lib/flowy-collaboration/src/core/sync/mod.rs index df599d3adc..69dd8a5114 100644 --- a/shared-lib/flowy-collaboration/src/core/sync/mod.rs +++ b/shared-lib/flowy-collaboration/src/core/sync/mod.rs @@ -1,5 +1,5 @@ -mod rev_sync; mod server_editor; +mod synchronizer; -pub use rev_sync::*; pub use server_editor::*; +pub use synchronizer::*; diff --git a/shared-lib/flowy-collaboration/src/core/sync/server_editor.rs b/shared-lib/flowy-collaboration/src/core/sync/server_editor.rs index ac5699b57c..16b18c58ac 100644 --- a/shared-lib/flowy-collaboration/src/core/sync/server_editor.rs +++ b/shared-lib/flowy-collaboration/src/core/sync/server_editor.rs @@ -19,12 +19,17 @@ use tokio::{ task::spawn_blocking, }; +pub trait ServerDocPersistence: Send + Sync { + fn create_doc(&self, doc_id: &str, delta: RichTextDelta) -> CollaborateResult<()>; + fn update_doc(&self, doc_id: &str, delta: RichTextDelta) -> CollaborateResult<()>; + fn read_doc(&self, doc_id: &str) -> CollaborateResult; +} + #[rustfmt::skip] -// ┌────────────┐ -// │ DocManager │ -// └────────────┘ +// ┌─────────────────┐ +// │ServerDocManager │ +// └─────────────────┘ // │ 1 -// │ // ▼ n // ┌───────────────┐ // │ OpenDocHandle │ @@ -33,29 +38,26 @@ use tokio::{ // ▼ // ┌──────────────────┐ // │ DocCommandQueue │ -// └──────────────────┘ ┌──────────────────────┐ ┌────────────┐ -// │ ┌────▶│ RevisionSynchronizer │────▶│ Document │ -// ▼ │ └──────────────────────┘ └────────────┘ -// ┌────────────────┐ │ +// └──────────────────┘ +// │ ┌──────────────────────┐ ┌────────────┐ +// ▼ ┌────▶│ RevisionSynchronizer │────▶│ Document │ +// ┌────────────────┐ │ └──────────────────────┘ └────────────┘ // │ServerDocEditor │─────┤ -// └────────────────┘ │ -// │ -// │ ┌────────┐ ┌────────────┐ +// └────────────────┘ │ ┌────────┐ ┌────────────┐ // └────▶│ Users │◆──────│RevisionUser│ // └────────┘ └────────────┘ -pub struct DocManager { +pub struct ServerDocManager { open_doc_map: DashMap>, + persistence: Arc, } -impl std::default::Default for DocManager { - fn default() -> Self { +impl ServerDocManager { + pub fn new(persistence: Arc) -> Self { Self { open_doc_map: DashMap::new(), + persistence, } } -} -impl DocManager { - pub fn new() -> Self { DocManager::default() } pub fn get(&self, doc_id: &str) -> Option> { self.open_doc_map.get(doc_id).map(|ctx| ctx.clone()) diff --git a/shared-lib/flowy-collaboration/src/core/sync/rev_sync.rs b/shared-lib/flowy-collaboration/src/core/sync/synchronizer.rs similarity index 100% rename from shared-lib/flowy-collaboration/src/core/sync/rev_sync.rs rename to shared-lib/flowy-collaboration/src/core/sync/synchronizer.rs