diff --git a/backend/src/application.rs b/backend/src/application.rs index f05b44767d..b4958a76d4 100644 --- a/backend/src/application.rs +++ b/backend/src/application.rs @@ -59,7 +59,7 @@ pub fn run(listener: TcpListener, app_ctx: AppContext) -> Result>, pub pg_pool: Data, pub ws_bizs: Data, - pub doc_biz: Data>, + pub document_core: Data>, } impl AppContext { @@ -22,14 +22,14 @@ impl AppContext { let pg_pool = Data::new(db_pool); let mut ws_bizs = WsBizHandlers::new(); - let doc_biz = Arc::new(DocBiz::new(pg_pool.clone())); - ws_bizs.register(WsModule::Doc, doc_biz.clone()); + let document_core = Arc::new(DocumentCore::new(pg_pool.clone())); + ws_bizs.register(WsModule::Doc, document_core.clone()); AppContext { ws_server, pg_pool, ws_bizs: Data::new(ws_bizs), - doc_biz: Data::new(doc_biz), + document_core: Data::new(document_core), } } } diff --git a/backend/src/services/doc/manager.rs b/backend/src/services/doc/manager.rs index f0de848da1..4b3159f9f9 100644 --- a/backend/src/services/doc/manager.rs +++ b/backend/src/services/doc/manager.rs @@ -18,30 +18,34 @@ use tokio::{ task::spawn_blocking, }; -pub struct DocBiz { +#[rustfmt::skip] +// ┌──────────────┐ ┌────────────┐ 1 n ┌───────────────┐ +// │ DocumentCore │────▶│ DocManager │─────▶│ OpenDocHandle │ +// └──────────────┘ └────────────┘ └───────────────┘ +pub struct DocumentCore { pub manager: Arc, - sender: mpsc::Sender, + ws_sender: mpsc::Sender, pg_pool: Data, } -impl DocBiz { +impl DocumentCore { pub fn new(pg_pool: Data) -> Self { let manager = Arc::new(DocManager::new()); - let (tx, rx) = mpsc::channel(100); + let (ws_sender, rx) = mpsc::channel(100); let actor = DocWsActor::new(rx, manager.clone()); tokio::task::spawn(actor.run()); Self { manager, - sender: tx, + ws_sender, pg_pool, } } } -impl WsBizHandler for DocBiz { +impl WsBizHandler for DocumentCore { fn receive_data(&self, client_data: WsClientData) { let (ret, rx) = oneshot::channel(); - let sender = self.sender.clone(); + let sender = self.ws_sender.clone(); let pool = self.pg_pool.clone(); actix_rt::spawn(async move { @@ -58,14 +62,25 @@ impl WsBizHandler for DocBiz { } } +#[rustfmt::skip] +// EditDocActor +// ┌────────────────────────────────────┐ +// │ ServerDocEditor │ +// │ ┌──────────────────────────────┐ │ +// ┌────────────┐ 1 n ┌───────────────┐ │ │ ┌──────────┐ ┌──────────┐ │ │ +// │ DocManager │─────▶│ OpenDocHandle │──────▶│ │ │ Document │ │ Users │ │ │ +// └────────────┘ └───────────────┘ │ │ └──────────┘ └──────────┘ │ │ +// │ └──────────────────────────────┘ │ +// │ │ +// └────────────────────────────────────┘ pub struct DocManager { - docs_map: DashMap>, + open_doc_map: DashMap>, } impl std::default::Default for DocManager { fn default() -> Self { Self { - docs_map: DashMap::new(), + open_doc_map: DashMap::new(), } } } @@ -73,19 +88,19 @@ impl std::default::Default for DocManager { impl DocManager { pub fn new() -> Self { DocManager::default() } - pub async fn get(&self, doc_id: &str, pg_pool: Data) -> Result>, ServerError> { - match self.docs_map.get(doc_id) { + pub async fn get(&self, doc_id: &str, pg_pool: Data) -> Result>, ServerError> { + match self.open_doc_map.get(doc_id) { None => { let params = DocIdentifier { doc_id: doc_id.to_string(), ..Default::default() }; let doc = read_doc(pg_pool.get_ref(), params).await?; - let handle = spawn_blocking(|| DocOpenHandle::new(doc, pg_pool)) + let handle = spawn_blocking(|| OpenDocHandle::new(doc, pg_pool)) .await .map_err(internal_error)?; let handle = Arc::new(handle?); - self.docs_map.insert(doc_id.to_string(), handle.clone()); + self.open_doc_map.insert(doc_id.to_string(), handle.clone()); Ok(Some(handle)) }, Some(ctx) => Ok(Some(ctx.clone())), @@ -93,11 +108,11 @@ impl DocManager { } } -pub struct DocOpenHandle { +pub struct OpenDocHandle { pub sender: mpsc::Sender, } -impl DocOpenHandle { +impl OpenDocHandle { pub fn new(doc: Doc, pg_pool: Data) -> Result { let (sender, receiver) = mpsc::channel(100); let actor = EditDocActor::new(receiver, doc, pg_pool)?; diff --git a/backend/src/services/doc/ws_actor.rs b/backend/src/services/doc/ws_actor.rs index 1a8fa60de0..b6542588d5 100644 --- a/backend/src/services/doc/ws_actor.rs +++ b/backend/src/services/doc/ws_actor.rs @@ -1,6 +1,6 @@ use crate::{ services::{ - doc::manager::{DocManager, DocOpenHandle}, + doc::manager::{DocManager, OpenDocHandle}, util::{md5, parse_from_bytes}, }, web_socket::{entities::Socket, WsClientData, WsUser}, @@ -122,7 +122,7 @@ impl DocWsActor { Ok(()) } - async fn find_doc_handle(&self, doc_id: &str, pool: Data) -> Option> { + async fn find_doc_handle(&self, doc_id: &str, pool: Data) -> Option> { match self.doc_manager.get(doc_id, pool).await { Ok(Some(edit_doc)) => Some(edit_doc), Ok(None) => { diff --git a/backend/src/services/view/router.rs b/backend/src/services/view/router.rs index ee67637973..c4a4ddd69d 100644 --- a/backend/src/services/view/router.rs +++ b/backend/src/services/view/router.rs @@ -1,5 +1,5 @@ use crate::services::{ - doc::manager::DocBiz, + doc::manager::DocumentCore, user::LoggedUser, util::parse_from_payload, view::{create_view, delete_view, read_view, sql_builder::check_view_ids, update_view}, @@ -23,7 +23,7 @@ use std::sync::Arc; pub async fn create_handler( payload: Payload, pool: Data, - _doc_biz: Data>, + _doc_biz: Data>, ) -> Result { let params: CreateViewParams = parse_from_payload(payload).await?; let mut transaction = pool diff --git a/frontend/rust-lib/flowy-document/src/services/cache.rs b/frontend/rust-lib/flowy-document/src/services/cache.rs deleted file mode 100644 index dae1fa7338..0000000000 --- a/frontend/rust-lib/flowy-document/src/services/cache.rs +++ /dev/null @@ -1,53 +0,0 @@ -use std::sync::Arc; - -use dashmap::DashMap; - -use crate::{ - errors::DocError, - services::doc::edit::{ClientDocEditor, DocId}, -}; - -pub(crate) struct DocCache { - inner: DashMap>, -} - -impl DocCache { - pub(crate) fn new() -> Self { Self { inner: DashMap::new() } } - - #[allow(dead_code)] - pub(crate) fn all_docs(&self) -> Vec> { - self.inner - .iter() - .map(|kv| kv.value().clone()) - .collect::>>() - } - - pub(crate) fn set(&self, doc: Arc) { - let doc_id = doc.doc_id.clone(); - if self.inner.contains_key(&doc_id) { - log::warn!("Doc:{} already exists in cache", &doc_id); - } - self.inner.insert(doc_id, doc); - } - - pub(crate) fn contains(&self, doc_id: &str) -> bool { self.inner.get(doc_id).is_some() } - - pub(crate) fn get(&self, doc_id: &str) -> Result, DocError> { - if !self.contains(&doc_id) { - return Err(doc_not_found()); - } - let opened_doc = self.inner.get(doc_id).unwrap(); - Ok(opened_doc.clone()) - } - - pub(crate) fn remove(&self, id: &str) { - let doc_id: DocId = id.into(); - match self.get(id) { - Ok(editor) => editor.stop_sync(), - Err(e) => log::error!("{}", e), - } - self.inner.remove(&doc_id); - } -} - -fn doc_not_found() -> DocError { DocError::doc_not_found().context("Doc is close or you should call open first") } diff --git a/frontend/rust-lib/flowy-document/src/services/doc/controller.rs b/frontend/rust-lib/flowy-document/src/services/doc/controller.rs index fabc15668b..2a1fd5db46 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/controller.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/controller.rs @@ -2,7 +2,6 @@ use crate::{ errors::{DocError, DocResult}, module::DocumentUser, services::{ - cache::DocCache, doc::{ edit::{ClientDocEditor, EditDocWsHandler}, revision::{RevisionCache, RevisionManager, RevisionServer}, @@ -12,6 +11,7 @@ use crate::{ }, }; use bytes::Bytes; +use dashmap::DashMap; use flowy_database::ConnectionPool; use flowy_document_infra::entities::doc::{Doc, DocDelta, DocIdentifier}; use lib_infra::future::{wrap_future, FnFuture, ResultFuture}; @@ -21,18 +21,18 @@ use tokio::time::{interval, Duration}; pub(crate) struct DocController { server: Server, ws_manager: Arc, - cache: Arc, + open_cache: Arc, user: Arc, } impl DocController { pub(crate) fn new(server: Server, user: Arc, ws: Arc) -> Self { - let cache = Arc::new(DocCache::new()); + let open_cache = Arc::new(OpenDocCache::new()); Self { server, user, ws_manager: ws, - cache, + open_cache, } } @@ -46,18 +46,18 @@ impl DocController { params: DocIdentifier, pool: Arc, ) -> Result, DocError> { - if !self.cache.contains(¶ms.doc_id) { + if !self.open_cache.contains(¶ms.doc_id) { let edit_ctx = self.make_edit_context(¶ms.doc_id, pool.clone()).await?; return Ok(edit_ctx); } - let edit_doc_ctx = self.cache.get(¶ms.doc_id)?; + let edit_doc_ctx = self.open_cache.get(¶ms.doc_id)?; Ok(edit_doc_ctx) } pub(crate) fn close(&self, doc_id: &str) -> Result<(), DocError> { log::debug!("Close doc {}", doc_id); - self.cache.remove(doc_id); + self.open_cache.remove(doc_id); self.ws_manager.remove_handler(doc_id); Ok(()) } @@ -65,7 +65,7 @@ impl DocController { #[tracing::instrument(level = "debug", skip(self), err)] pub(crate) fn delete(&self, params: DocIdentifier) -> Result<(), DocError> { let doc_id = ¶ms.doc_id; - self.cache.remove(doc_id); + self.open_cache.remove(doc_id); self.ws_manager.remove_handler(doc_id); Ok(()) } @@ -80,12 +80,12 @@ impl DocController { delta: DocDelta, db_pool: Arc, ) -> Result { - if !self.cache.contains(&delta.doc_id) { + if !self.open_cache.contains(&delta.doc_id) { let doc_identifier: DocIdentifier = delta.doc_id.clone().into(); let _ = self.open(doc_identifier, db_pool).await?; } - let edit_doc_ctx = self.cache.get(&delta.doc_id)?; + let edit_doc_ctx = self.open_cache.get(&delta.doc_id)?; let _ = edit_doc_ctx.composing_local_delta(Bytes::from(delta.data)).await?; Ok(edit_doc_ctx.delta().await?) } @@ -102,7 +102,7 @@ impl DocController { let edit_ctx = ClientDocEditor::new(doc_id, user, pool, rev_manager, self.ws_manager.ws()).await?; let ws_handler = Arc::new(EditDocWsHandler(edit_ctx.clone())); self.ws_manager.register_handler(doc_id, ws_handler); - self.cache.set(edit_ctx.clone()); + self.open_cache.set(edit_ctx.clone()); Ok(edit_ctx) } @@ -145,13 +145,39 @@ impl RevisionServer for RevisionServerImpl { } } -#[allow(dead_code)] -fn event_loop(_cache: Arc) -> FnFuture<()> { - let mut i = interval(Duration::from_secs(3)); - wrap_future(async move { - loop { - // cache.all_docs().iter().for_each(|doc| doc.tick()); - i.tick().await; - } - }) +pub struct OpenDocCache { + inner: DashMap>, } + +impl OpenDocCache { + fn new() -> Self { Self { inner: DashMap::new() } } + + pub(crate) fn set(&self, doc: Arc) { + let doc_id = doc.doc_id.clone(); + if self.inner.contains_key(&doc_id) { + log::warn!("Doc:{} already exists in cache", &doc_id); + } + self.inner.insert(doc_id, doc); + } + + pub(crate) fn contains(&self, doc_id: &str) -> bool { self.inner.get(doc_id).is_some() } + + pub(crate) fn get(&self, doc_id: &str) -> Result, DocError> { + if !self.contains(&doc_id) { + return Err(doc_not_found()); + } + let opened_doc = self.inner.get(doc_id).unwrap(); + Ok(opened_doc.clone()) + } + + pub(crate) fn remove(&self, id: &str) { + let doc_id = id.to_string(); + match self.get(id) { + Ok(editor) => editor.stop_sync(), + Err(e) => log::error!("{}", e), + } + self.inner.remove(&doc_id); + } +} + +fn doc_not_found() -> DocError { DocError::doc_not_found().context("Doc is close or you should call open first") } diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/queue.rs b/frontend/rust-lib/flowy-document/src/services/doc/edit/queue.rs index 01a2927d26..ed65d9ccf5 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/edit/queue.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/edit/queue.rs @@ -129,6 +129,7 @@ impl EditCommandQueue { } pub(crate) type Ret = oneshot::Sender>; +#[allow(dead_code)] pub(crate) enum EditCommand { ComposeDelta { delta: RichTextDelta, diff --git a/frontend/rust-lib/flowy-document/src/services/mod.rs b/frontend/rust-lib/flowy-document/src/services/mod.rs index c26732203a..a9a801a0b8 100644 --- a/frontend/rust-lib/flowy-document/src/services/mod.rs +++ b/frontend/rust-lib/flowy-document/src/services/mod.rs @@ -1,4 +1,3 @@ -mod cache; pub mod doc; pub mod server; pub mod ws; diff --git a/frontend/rust-lib/flowy-document/tests/editor/revision_test.rs b/frontend/rust-lib/flowy-document/tests/editor/revision_test.rs index 81c23b3f86..d565e2a109 100644 --- a/frontend/rust-lib/flowy-document/tests/editor/revision_test.rs +++ b/frontend/rust-lib/flowy-document/tests/editor/revision_test.rs @@ -1,3 +1,4 @@ +use flowy_document_infra::core::{Document, FlowyDoc}; use flowy_test::editor::{EditorScript::*, *}; use lib_ot::{revision::RevState, rich_text::RichTextDeltaBuilder}; @@ -50,3 +51,16 @@ async fn doc_push_test() { ]; EditorTest::new().await.run_scripts(scripts).await; } + +#[tokio::test] +async fn doc_push_test2() { + let mut document = Document::new::(); + let delta_1 = document.insert(0, "123").unwrap(); + let json = document.to_json(); + + let scripts = vec![ + SimulatePushRevisionMessageWithDelta(delta_1), + AssertJson(r#"[{"insert":"\n123"}]"#), + ]; + EditorTest::new().await.run_scripts(scripts).await; +} diff --git a/frontend/rust-lib/flowy-test/src/editor.rs b/frontend/rust-lib/flowy-test/src/editor.rs index 3dd4b6de5d..7953577b89 100644 --- a/frontend/rust-lib/flowy-test/src/editor.rs +++ b/frontend/rust-lib/flowy-test/src/editor.rs @@ -47,7 +47,7 @@ impl EditorTest { self.run_script(script).await; } - sleep(Duration::from_secs(5)).await; + sleep(Duration::from_secs(10)).await; } async fn run_script(&mut self, script: EditorScript) {