From af6afafd0f7b767f0cba198a14e0f7b0c8e0468a Mon Sep 17 00:00:00 2001 From: appflowy Date: Sat, 2 Oct 2021 17:19:54 +0800 Subject: [PATCH] build doc from local revision or fetch from remote --- backend/src/application.rs | 2 +- backend/src/service/doc/doc.rs | 20 +- backend/src/service/doc/edit/context.rs | 194 ----------- .../doc/edit/{actor.rs => edit_actor.rs} | 25 +- backend/src/service/doc/edit/edit_doc.rs | 228 ++++++++++--- backend/src/service/doc/edit/mod.rs | 6 +- backend/src/service/doc/edit/open_handle.rs | 57 ++++ backend/src/service/doc/mod.rs | 2 +- backend/src/service/doc/router.rs | 1 + .../src/service/doc/{actor.rs => ws_actor.rs} | 4 +- backend/tests/document/helper.rs | 2 +- .../src/entities/doc/revision.rs | 19 +- rust-lib/flowy-document/src/module.rs | 4 +- rust-lib/flowy-document/src/services/cache.rs | 12 +- .../src/services/doc/doc_controller.rs | 62 +++- .../src/services/doc/document/document.rs | 12 +- .../doc/edit/{context.rs => edit_doc.rs} | 22 +- .../src/services/doc/edit/mod.rs | 4 +- .../flowy-document/src/services/doc/mod.rs | 2 +- .../src/services/doc/rev_manager/mod.rs | 5 - .../src/services/doc/rev_manager/store.rs | 191 ----------- .../rev_manager.rs => revision/manager.rs} | 73 +++-- .../src/services/doc/revision/mod.rs | 5 + .../src/services/doc/revision/store.rs | 301 ++++++++++++++++++ .../doc/{rev_manager => revision}/util.rs | 0 .../flowy-document/src/sql_tables/doc/mod.rs | 8 +- .../doc/{doc_op_sql.rs => rev_sql.rs} | 31 +- .../doc/{doc_op_table.rs => rev_table.rs} | 0 rust-lib/flowy-net/src/request/request.rs | 12 +- 29 files changed, 764 insertions(+), 540 deletions(-) delete mode 100644 backend/src/service/doc/edit/context.rs rename backend/src/service/doc/edit/{actor.rs => edit_actor.rs} (74%) create mode 100644 backend/src/service/doc/edit/open_handle.rs rename backend/src/service/doc/{actor.rs => ws_actor.rs} (98%) rename rust-lib/flowy-document/src/services/doc/edit/{context.rs => edit_doc.rs} (93%) delete mode 100644 rust-lib/flowy-document/src/services/doc/rev_manager/mod.rs delete mode 100644 rust-lib/flowy-document/src/services/doc/rev_manager/store.rs rename rust-lib/flowy-document/src/services/doc/{rev_manager/rev_manager.rs => revision/manager.rs} (50%) create mode 100644 rust-lib/flowy-document/src/services/doc/revision/mod.rs create mode 100644 rust-lib/flowy-document/src/services/doc/revision/store.rs rename rust-lib/flowy-document/src/services/doc/{rev_manager => revision}/util.rs (100%) rename rust-lib/flowy-document/src/sql_tables/doc/{doc_op_sql.rs => rev_sql.rs} (82%) rename rust-lib/flowy-document/src/sql_tables/doc/{doc_op_table.rs => rev_table.rs} (100%) diff --git a/backend/src/application.rs b/backend/src/application.rs index 561e1d77b7..517bb51640 100644 --- a/backend/src/application.rs +++ b/backend/src/application.rs @@ -114,7 +114,7 @@ fn user_scope() -> Scope { .route(web::patch().to(view::update_handler)) ) .service(web::resource("/doc") - .route(web::get().to(doc::create_handler)) + .route(web::post().to(doc::create_handler)) .route(web::get().to(doc::read_handler)) .route(web::patch().to(doc::update_handler)) ) diff --git a/backend/src/service/doc/doc.rs b/backend/src/service/doc/doc.rs index ec08799534..ab1f8e01b5 100644 --- a/backend/src/service/doc/doc.rs +++ b/backend/src/service/doc/doc.rs @@ -1,8 +1,8 @@ use crate::service::{ doc::{ - actor::{DocWsMsg, DocWsMsgActor}, - edit::EditDoc, + edit::DocHandle, read_doc, + ws_actor::{DocWsActor, DocWsMsg}, }, ws::{WsBizHandler, WsClientData}, }; @@ -27,7 +27,7 @@ impl DocBiz { pub fn new(pg_pool: Data) -> Self { let manager = Arc::new(DocManager::new()); let (tx, rx) = mpsc::channel(100); - let actor = DocWsMsgActor::new(rx, manager.clone()); + let actor = DocWsActor::new(rx, manager.clone()); tokio::task::spawn(actor.run()); Self { manager, @@ -58,7 +58,7 @@ impl WsBizHandler for DocBiz { } pub struct DocManager { - docs_map: DashMap>, + docs_map: DashMap>, } impl DocManager { @@ -68,7 +68,7 @@ impl DocManager { } } - pub async fn get(&self, doc_id: &str, pg_pool: Data) -> Result>, ServerError> { + pub async fn get(&self, doc_id: &str, pg_pool: Data) -> Result>, ServerError> { match self.docs_map.get(doc_id) { None => { let params = QueryDocParams { @@ -76,10 +76,12 @@ impl DocManager { ..Default::default() }; let doc = read_doc(pg_pool.get_ref(), params).await?; - let edit_doc = spawn_blocking(|| EditDoc::new(doc)).await.map_err(internal_error)?; - let edit_doc = Arc::new(edit_doc?); - self.docs_map.insert(doc_id.to_string(), edit_doc.clone()); - Ok(Some(edit_doc)) + let handle = spawn_blocking(|| DocHandle::new(doc, pg_pool)) + .await + .map_err(internal_error)?; + let handle = Arc::new(handle?); + self.docs_map.insert(doc_id.to_string(), handle.clone()); + Ok(Some(handle)) }, Some(ctx) => Ok(Some(ctx.clone())), } diff --git a/backend/src/service/doc/edit/context.rs b/backend/src/service/doc/edit/context.rs deleted file mode 100644 index 65fae935e1..0000000000 --- a/backend/src/service/doc/edit/context.rs +++ /dev/null @@ -1,194 +0,0 @@ -use crate::service::{doc::edit::actor::EditUser, util::md5, ws::WsMessageAdaptor}; -use byteorder::{BigEndian, WriteBytesExt}; -use bytes::Bytes; -use dashmap::DashMap; -use flowy_document::{ - entities::ws::{WsDataType, WsDocumentData}, - protobuf::{Doc, RevType, Revision, RevisionRange, UpdateDocParams}, - services::doc::Document, -}; -use flowy_net::errors::{internal_error, ServerError}; -use flowy_ot::{ - core::{Delta, OperationTransformable}, - errors::OTError, -}; -use flowy_ws::WsMessage; -use parking_lot::RwLock; -use protobuf::Message; -use std::{ - convert::TryInto, - sync::{ - atomic::{AtomicI64, Ordering::SeqCst}, - Arc, - }, - time::Duration, -}; -pub struct EditDocContext { - doc_id: String, - rev_id: AtomicI64, - document: Arc>, - users: DashMap, -} - -impl EditDocContext { - pub fn new(doc: Doc) -> Result { - let delta = Delta::from_bytes(&doc.data).map_err(internal_error)?; - let document = Arc::new(RwLock::new(Document::from_delta(delta))); - let users = DashMap::new(); - Ok(Self { - doc_id: doc.id.clone(), - rev_id: AtomicI64::new(doc.rev_id), - document, - users, - }) - } - - pub fn document_json(&self) -> String { self.document.read().to_json() } - - pub async fn apply_revision(&self, user: EditUser, revision: Revision) -> Result<(), ServerError> { - // Opti: find out another way to keep the user socket available. - self.users.insert(user.id(), user.clone()); - log::debug!( - "cur_base_rev_id: {}, expect_base_rev_id: {} rev_id: {}", - self.rev_id.load(SeqCst), - revision.base_rev_id, - revision.rev_id - ); - - let cur_rev_id = self.rev_id.load(SeqCst); - if cur_rev_id > revision.rev_id { - // The client document is outdated. Transform the client revision delta and then - // send the prime delta to the client. Client should compose the this prime - // delta. - - let (cli_prime, server_prime) = self.transform(&revision.delta_data).map_err(internal_error)?; - let _ = self.update_document_delta(server_prime)?; - - log::debug!("{} client delta: {}", self.doc_id, cli_prime.to_json()); - let cli_revision = self.mk_revision(revision.rev_id, cli_prime); - let ws_cli_revision = mk_push_rev_ws_message(&self.doc_id, cli_revision); - user.socket.do_send(ws_cli_revision).map_err(internal_error)?; - Ok(()) - } else if cur_rev_id < revision.rev_id { - if cur_rev_id != revision.base_rev_id { - // The server document is outdated, try to get the missing revision from the - // client. - user.socket - .do_send(mk_pull_rev_ws_message(&self.doc_id, cur_rev_id, revision.rev_id)) - .map_err(internal_error)?; - } else { - let delta = Delta::from_bytes(&revision.delta_data).map_err(internal_error)?; - let _ = self.update_document_delta(delta)?; - user.socket - .do_send(mk_acked_ws_message(&revision)) - .map_err(internal_error)?; - self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(revision.rev_id)); - let _ = self.save_revision(&revision).await?; - } - - Ok(()) - } else { - log::error!("Client rev_id should not equal to server rev_id"); - Ok(()) - } - } - - fn mk_revision(&self, base_rev_id: i64, delta: Delta) -> Revision { - let delta_data = delta.to_bytes().to_vec(); - let md5 = md5(&delta_data); - let revision = Revision { - base_rev_id, - rev_id: self.rev_id.load(SeqCst), - delta_data, - md5, - doc_id: self.doc_id.to_string(), - ty: RevType::Remote, - ..Default::default() - }; - revision - } - - #[tracing::instrument(level = "debug", skip(self, delta_data))] - fn transform(&self, delta_data: &Vec) -> Result<(Delta, Delta), OTError> { - log::debug!("Document: {}", self.document.read().to_json()); - let doc_delta = self.document.read().delta().clone(); - let cli_delta = Delta::from_bytes(delta_data)?; - - log::debug!("Compose delta: {}", cli_delta); - let (cli_prime, server_prime) = doc_delta.transform(&cli_delta)?; - - Ok((cli_prime, server_prime)) - } - - #[tracing::instrument(level = "debug", skip(self, delta))] - fn update_document_delta(&self, delta: Delta) -> Result<(), ServerError> { - // Opti: push each revision into queue and process it one by one. - match self.document.try_write_for(Duration::from_millis(300)) { - None => { - log::error!("Failed to acquire write lock of document"); - }, - Some(mut write_guard) => { - let _ = write_guard.compose_delta(&delta).map_err(internal_error)?; - log::debug!("Document: {}", write_guard.to_json()); - }, - } - Ok(()) - } - - #[tracing::instrument(level = "debug", skip(self, revision))] - async fn save_revision(&self, revision: &Revision) -> Result<(), ServerError> { - // Opti: save with multiple revisions - let mut params = UpdateDocParams::new(); - params.set_doc_id(self.doc_id.clone()); - params.set_data(self.document.read().to_json()); - params.set_rev_id(revision.rev_id); - // let _ = update_doc(self.pg_pool.get_ref(), params).await?; - Ok(()) - } -} - -fn mk_push_rev_ws_message(doc_id: &str, revision: Revision) -> WsMessageAdaptor { - let bytes = revision.write_to_bytes().unwrap(); - let data = WsDocumentData { - id: doc_id.to_string(), - ty: WsDataType::PushRev, - data: bytes, - }; - mk_ws_message(data) -} - -fn mk_pull_rev_ws_message(doc_id: &str, from_rev_id: i64, to_rev_id: i64) -> WsMessageAdaptor { - let range = RevisionRange { - doc_id: doc_id.to_string(), - from_rev_id, - to_rev_id, - ..Default::default() - }; - - let bytes = range.write_to_bytes().unwrap(); - let data = WsDocumentData { - id: doc_id.to_string(), - ty: WsDataType::PullRev, - data: bytes, - }; - mk_ws_message(data) -} - -fn mk_acked_ws_message(revision: &Revision) -> WsMessageAdaptor { - let mut wtr = vec![]; - let _ = wtr.write_i64::(revision.rev_id); - - let data = WsDocumentData { - id: revision.doc_id.clone(), - ty: WsDataType::Acked, - data: wtr, - }; - - mk_ws_message(data) -} - -fn mk_ws_message>(data: T) -> WsMessageAdaptor { - let msg: WsMessage = data.into(); - let bytes: Bytes = msg.try_into().unwrap(); - WsMessageAdaptor(bytes) -} diff --git a/backend/src/service/doc/edit/actor.rs b/backend/src/service/doc/edit/edit_actor.rs similarity index 74% rename from backend/src/service/doc/edit/actor.rs rename to backend/src/service/doc/edit/edit_actor.rs index 0ffa0b34ba..94455f4c44 100644 --- a/backend/src/service/doc/edit/actor.rs +++ b/backend/src/service/doc/edit/edit_actor.rs @@ -1,11 +1,13 @@ use crate::service::{ - doc::edit::EditDocContext, + doc::edit::ServerEditDoc, ws::{entities::Socket, WsUser}, }; +use actix_web::web::Data; use async_stream::stream; -use flowy_document::protobuf::Revision; -use flowy_net::errors::{internal_error, Result as DocResult}; +use flowy_document::protobuf::{Doc, Revision}; +use flowy_net::errors::{internal_error, Result as DocResult, ServerError}; use futures::stream::StreamExt; +use sqlx::PgPool; use std::sync::Arc; use tokio::{ sync::{mpsc, oneshot}, @@ -37,15 +39,18 @@ pub enum EditMsg { pub struct EditDocActor { receiver: Option>, - edit_context: Arc, + edit_doc: Arc, + pg_pool: Data, } impl EditDocActor { - pub fn new(receiver: mpsc::Receiver, edit_context: Arc) -> Self { - Self { + pub fn new(receiver: mpsc::Receiver, doc: Doc, pg_pool: Data) -> Result { + let edit_doc = Arc::new(ServerEditDoc::new(doc)?); + Ok(Self { receiver: Some(receiver), - edit_context, - } + edit_doc, + pg_pool, + }) } pub async fn run(mut self) { @@ -78,10 +83,10 @@ impl EditDocActor { user: user.clone(), socket: socket.clone(), }; - let _ = ret.send(self.edit_context.apply_revision(user, revision).await); + let _ = ret.send(self.edit_doc.apply_revision(user, revision, self.pg_pool.clone()).await); }, EditMsg::DocumentJson { ret } => { - let edit_context = self.edit_context.clone(); + let edit_context = self.edit_doc.clone(); let json = spawn_blocking(move || edit_context.document_json()) .await .map_err(internal_error); diff --git a/backend/src/service/doc/edit/edit_doc.rs b/backend/src/service/doc/edit/edit_doc.rs index ceca90a19a..b35dd1f853 100644 --- a/backend/src/service/doc/edit/edit_doc.rs +++ b/backend/src/service/doc/edit/edit_doc.rs @@ -1,56 +1,206 @@ use crate::service::{ - doc::edit::{ - actor::{EditDocActor, EditMsg}, - EditDocContext, - }, - ws::{entities::Socket, WsUser}, + doc::{edit::edit_actor::EditUser, update_doc}, + util::md5, + ws::WsMessageAdaptor, }; -use flowy_document::protobuf::{Doc, Revision}; -use flowy_net::errors::{internal_error, Result as DocResult, ServerError}; -use std::sync::Arc; -use tokio::sync::{mpsc, oneshot}; -pub struct EditDoc { - sender: mpsc::Sender, +use actix_web::web::Data; +use byteorder::{BigEndian, WriteBytesExt}; +use bytes::Bytes; +use dashmap::DashMap; +use flowy_document::{ + entities::ws::{WsDataType, WsDocumentData}, + protobuf::{Doc, RevType, Revision, RevisionRange, UpdateDocParams}, + services::doc::Document, +}; +use flowy_net::errors::{internal_error, ServerError}; +use flowy_ot::{ + core::{Delta, OperationTransformable}, + errors::OTError, +}; +use flowy_ws::WsMessage; +use parking_lot::RwLock; +use protobuf::Message; +use sqlx::PgPool; +use std::{ + convert::TryInto, + sync::{ + atomic::{AtomicI64, Ordering::SeqCst}, + Arc, + }, + time::Duration, +}; + +pub struct ServerEditDoc { + doc_id: String, + rev_id: AtomicI64, + document: Arc>, + users: DashMap, } -impl EditDoc { +impl ServerEditDoc { pub fn new(doc: Doc) -> Result { - let (sender, receiver) = mpsc::channel(100); - let edit_context = Arc::new(EditDocContext::new(doc)?); - - let actor = EditDocActor::new(receiver, edit_context); - tokio::task::spawn(actor.run()); - - Ok(Self { sender }) + let delta = Delta::from_bytes(&doc.data).map_err(internal_error)?; + let document = Arc::new(RwLock::new(Document::from_delta(delta))); + let users = DashMap::new(); + Ok(Self { + doc_id: doc.id.clone(), + rev_id: AtomicI64::new(doc.rev_id), + document, + users, + }) } - #[tracing::instrument(level = "debug", skip(self, user, socket, revision))] + pub fn document_json(&self) -> String { self.document.read().to_json() } + pub async fn apply_revision( &self, - user: Arc, - socket: Socket, + user: EditUser, revision: Revision, + pg_pool: Data, ) -> Result<(), ServerError> { - let (ret, rx) = oneshot::channel(); - let msg = EditMsg::Revision { - user, - socket, - revision, - ret, + // Opti: find out another way to keep the user socket available. + self.users.insert(user.id(), user.clone()); + log::debug!( + "cur_base_rev_id: {}, expect_base_rev_id: {} rev_id: {}", + self.rev_id.load(SeqCst), + revision.base_rev_id, + revision.rev_id + ); + + let cur_rev_id = self.rev_id.load(SeqCst); + if cur_rev_id > revision.rev_id { + // The client document is outdated. Transform the client revision delta and then + // send the prime delta to the client. Client should compose the this prime + // delta. + + let (cli_prime, server_prime) = self.transform(&revision.delta_data).map_err(internal_error)?; + let _ = self.update_document_delta(server_prime)?; + + log::debug!("{} client delta: {}", self.doc_id, cli_prime.to_json()); + let cli_revision = self.mk_revision(revision.rev_id, cli_prime); + let ws_cli_revision = mk_push_rev_ws_message(&self.doc_id, cli_revision); + user.socket.do_send(ws_cli_revision).map_err(internal_error)?; + Ok(()) + } else if cur_rev_id < revision.rev_id { + if cur_rev_id != revision.base_rev_id { + // The server document is outdated, try to get the missing revision from the + // client. + user.socket + .do_send(mk_pull_rev_ws_message(&self.doc_id, cur_rev_id, revision.rev_id)) + .map_err(internal_error)?; + } else { + let delta = Delta::from_bytes(&revision.delta_data).map_err(internal_error)?; + let _ = self.update_document_delta(delta)?; + user.socket + .do_send(mk_acked_ws_message(&revision)) + .map_err(internal_error)?; + self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(revision.rev_id)); + let _ = self.save_revision(&revision, pg_pool).await?; + } + + Ok(()) + } else { + log::error!("Client rev_id should not equal to server rev_id"); + Ok(()) + } + } + + fn mk_revision(&self, base_rev_id: i64, delta: Delta) -> Revision { + let delta_data = delta.to_bytes().to_vec(); + let md5 = md5(&delta_data); + let revision = Revision { + base_rev_id, + rev_id: self.rev_id.load(SeqCst), + delta_data, + md5, + doc_id: self.doc_id.to_string(), + ty: RevType::Remote, + ..Default::default() }; - let _ = self.send(msg, rx).await?; + revision + } + + #[tracing::instrument(level = "debug", skip(self, delta_data))] + fn transform(&self, delta_data: &Vec) -> Result<(Delta, Delta), OTError> { + log::debug!("Document: {}", self.document.read().to_json()); + let doc_delta = self.document.read().delta().clone(); + let cli_delta = Delta::from_bytes(delta_data)?; + + log::debug!("Compose delta: {}", cli_delta); + let (cli_prime, server_prime) = doc_delta.transform(&cli_delta)?; + + Ok((cli_prime, server_prime)) + } + + #[tracing::instrument(level = "debug", skip(self), err)] + fn update_document_delta(&self, delta: Delta) -> Result<(), ServerError> { + // Opti: push each revision into queue and process it one by one. + match self.document.try_write_for(Duration::from_millis(300)) { + None => { + log::error!("Failed to acquire write lock of document"); + }, + Some(mut write_guard) => { + let _ = write_guard.compose_delta(&delta).map_err(internal_error)?; + log::debug!("Document: {}", write_guard.to_json()); + }, + } Ok(()) } - pub async fn document_json(&self) -> DocResult { - let (ret, rx) = oneshot::channel(); - let msg = EditMsg::DocumentJson { ret }; - self.send(msg, rx).await? - } - - async fn send(&self, msg: EditMsg, rx: oneshot::Receiver) -> DocResult { - let _ = self.sender.send(msg).await.map_err(internal_error)?; - let result = rx.await?; - Ok(result) + #[tracing::instrument(level = "debug", skip(self, pg_pool), err)] + async fn save_revision(&self, revision: &Revision, pg_pool: Data) -> Result<(), ServerError> { + // Opti: save with multiple revisions + let mut params = UpdateDocParams::new(); + params.set_doc_id(self.doc_id.clone()); + params.set_data(self.document.read().to_json()); + params.set_rev_id(revision.rev_id); + let _ = update_doc(pg_pool.get_ref(), params).await?; + Ok(()) } } + +fn mk_push_rev_ws_message(doc_id: &str, revision: Revision) -> WsMessageAdaptor { + let bytes = revision.write_to_bytes().unwrap(); + let data = WsDocumentData { + id: doc_id.to_string(), + ty: WsDataType::PushRev, + data: bytes, + }; + mk_ws_message(data) +} + +fn mk_pull_rev_ws_message(doc_id: &str, from_rev_id: i64, to_rev_id: i64) -> WsMessageAdaptor { + let range = RevisionRange { + doc_id: doc_id.to_string(), + from_rev_id, + to_rev_id, + ..Default::default() + }; + + let bytes = range.write_to_bytes().unwrap(); + let data = WsDocumentData { + id: doc_id.to_string(), + ty: WsDataType::PullRev, + data: bytes, + }; + mk_ws_message(data) +} + +fn mk_acked_ws_message(revision: &Revision) -> WsMessageAdaptor { + let mut wtr = vec![]; + let _ = wtr.write_i64::(revision.rev_id); + + let data = WsDocumentData { + id: revision.doc_id.clone(), + ty: WsDataType::Acked, + data: wtr, + }; + + mk_ws_message(data) +} + +fn mk_ws_message>(data: T) -> WsMessageAdaptor { + let msg: WsMessage = data.into(); + let bytes: Bytes = msg.try_into().unwrap(); + WsMessageAdaptor(bytes) +} diff --git a/backend/src/service/doc/edit/mod.rs b/backend/src/service/doc/edit/mod.rs index 69c0bb84fd..7c279b1ac8 100644 --- a/backend/src/service/doc/edit/mod.rs +++ b/backend/src/service/doc/edit/mod.rs @@ -1,6 +1,6 @@ -mod actor; -mod context; +mod edit_actor; mod edit_doc; +mod open_handle; -pub use context::*; pub use edit_doc::*; +pub use open_handle::*; diff --git a/backend/src/service/doc/edit/open_handle.rs b/backend/src/service/doc/edit/open_handle.rs new file mode 100644 index 0000000000..928c5cda23 --- /dev/null +++ b/backend/src/service/doc/edit/open_handle.rs @@ -0,0 +1,57 @@ +use crate::service::{ + doc::edit::{ + edit_actor::{EditDocActor, EditMsg}, + ServerEditDoc, + }, + ws::{entities::Socket, WsUser}, +}; +use actix_web::web::Data; +use flowy_document::protobuf::{Doc, Revision}; +use flowy_net::errors::{internal_error, Result as DocResult, ServerError}; +use sqlx::PgPool; +use std::sync::Arc; +use tokio::sync::{mpsc, oneshot}; + +pub struct DocHandle { + sender: mpsc::Sender, +} + +impl DocHandle { + pub fn new(doc: Doc, pg_pool: Data) -> Result { + let (sender, receiver) = mpsc::channel(100); + let actor = EditDocActor::new(receiver, doc, pg_pool)?; + tokio::task::spawn(actor.run()); + + Ok(Self { sender }) + } + + #[tracing::instrument(level = "debug", skip(self, user, socket, revision))] + pub async fn apply_revision( + &self, + user: Arc, + socket: Socket, + revision: Revision, + ) -> Result<(), ServerError> { + let (ret, rx) = oneshot::channel(); + let msg = EditMsg::Revision { + user, + socket, + revision, + ret, + }; + let _ = self.send(msg, rx).await?; + Ok(()) + } + + pub async fn document_json(&self) -> DocResult { + let (ret, rx) = oneshot::channel(); + let msg = EditMsg::DocumentJson { ret }; + self.send(msg, rx).await? + } + + async fn send(&self, msg: EditMsg, rx: oneshot::Receiver) -> DocResult { + let _ = self.sender.send(msg).await.map_err(internal_error)?; + let result = rx.await?; + Ok(result) + } +} diff --git a/backend/src/service/doc/mod.rs b/backend/src/service/doc/mod.rs index 8e775df131..7a9f6c2753 100644 --- a/backend/src/service/doc/mod.rs +++ b/backend/src/service/doc/mod.rs @@ -1,8 +1,8 @@ pub(crate) use crud::*; pub use router::*; -mod actor; pub mod crud; pub mod doc; mod edit; pub mod router; +mod ws_actor; diff --git a/backend/src/service/doc/router.rs b/backend/src/service/doc/router.rs index 3ae46431bc..3c919c3be9 100644 --- a/backend/src/service/doc/router.rs +++ b/backend/src/service/doc/router.rs @@ -29,6 +29,7 @@ pub async fn create_handler(payload: Payload, pool: Data) -> Result) -> Result { let params: QueryDocParams = parse_from_payload(payload).await?; let doc = read_doc(pool.get_ref(), params).await?; diff --git a/backend/src/service/doc/actor.rs b/backend/src/service/doc/ws_actor.rs similarity index 98% rename from backend/src/service/doc/actor.rs rename to backend/src/service/doc/ws_actor.rs index 99222df908..ee69ed5a1a 100644 --- a/backend/src/service/doc/actor.rs +++ b/backend/src/service/doc/ws_actor.rs @@ -21,12 +21,12 @@ pub enum DocWsMsg { }, } -pub struct DocWsMsgActor { +pub struct DocWsActor { receiver: Option>, doc_manager: Arc, } -impl DocWsMsgActor { +impl DocWsActor { pub fn new(receiver: mpsc::Receiver, manager: Arc) -> Self { Self { receiver: Some(receiver), diff --git a/backend/tests/document/helper.rs b/backend/tests/document/helper.rs index 2b254bebed..9687df773d 100644 --- a/backend/tests/document/helper.rs +++ b/backend/tests/document/helper.rs @@ -6,7 +6,7 @@ use sqlx::PgPool; use tokio::time::{sleep, Duration}; use backend::service::doc::doc::DocManager; -use flowy_document::{entities::doc::QueryDocParams, services::doc::edit::EditDocContext as ClientEditDocContext}; +use flowy_document::{entities::doc::QueryDocParams, services::doc::edit::ClientEditDoc as ClientEditDocContext}; use flowy_net::config::ServerConfig; use flowy_test::{workspace::ViewTest, FlowyTest}; use flowy_user::services::user::UserSession; diff --git a/rust-lib/flowy-document/src/entities/doc/revision.rs b/rust-lib/flowy-document/src/entities/doc/revision.rs index be4a353426..56514ee005 100644 --- a/rust-lib/flowy-document/src/entities/doc/revision.rs +++ b/rust-lib/flowy-document/src/entities/doc/revision.rs @@ -1,5 +1,6 @@ use crate::services::util::md5; use flowy_derive::{ProtoBuf, ProtoBuf_Enum}; +use flowy_ot::core::Delta; #[derive(Debug, ProtoBuf_Enum, Clone, Eq, PartialEq)] pub enum RevType { @@ -11,7 +12,7 @@ impl std::default::Default for RevType { fn default() -> Self { RevType::Local } } -#[derive(Debug, Clone, Default, ProtoBuf)] +#[derive(Clone, Default, ProtoBuf)] pub struct Revision { #[pb(index = 1)] pub base_rev_id: i64, @@ -32,6 +33,22 @@ pub struct Revision { pub ty: RevType, } +impl std::fmt::Debug for Revision { + fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { + f.write_fmt(format_args!("doc_id {}, ", self.doc_id)); + f.write_fmt(format_args!("rev_id {}, ", self.rev_id)); + match Delta::from_bytes(&self.delta_data) { + Ok(delta) => { + f.write_fmt(format_args!("delta {:?}", delta.to_json())); + }, + Err(e) => { + f.write_fmt(format_args!("delta {:?}", e)); + }, + } + Ok(()) + } +} + impl Revision { pub fn new(base_rev_id: i64, rev_id: i64, delta_data: Vec, doc_id: &str, ty: RevType) -> Revision { let md5 = md5(&delta_data); diff --git a/rust-lib/flowy-document/src/module.rs b/rust-lib/flowy-document/src/module.rs index 2c9aa8b540..4e6fc45b52 100644 --- a/rust-lib/flowy-document/src/module.rs +++ b/rust-lib/flowy-document/src/module.rs @@ -10,7 +10,7 @@ use crate::{ entities::doc::{CreateDocParams, Doc, DocDelta, QueryDocParams}, errors::DocError, services::{ - doc::{doc_controller::DocController, edit::EditDocContext}, + doc::{doc_controller::DocController, edit::ClientEditDoc}, server::construct_doc_server, ws::WsDocumentManager, }, @@ -51,7 +51,7 @@ impl FlowyDocument { &self, params: QueryDocParams, pool: Arc, - ) -> Result, DocError> { + ) -> Result, DocError> { let edit_context = self.doc_ctrl.open(params, pool).await?; Ok(edit_context) } diff --git a/rust-lib/flowy-document/src/services/cache.rs b/rust-lib/flowy-document/src/services/cache.rs index cf214e17fa..c2d38029d4 100644 --- a/rust-lib/flowy-document/src/services/cache.rs +++ b/rust-lib/flowy-document/src/services/cache.rs @@ -4,25 +4,25 @@ use dashmap::DashMap; use crate::{ errors::DocError, - services::doc::edit::{DocId, EditDocContext}, + services::doc::edit::{ClientEditDoc, DocId}, }; pub(crate) struct DocCache { - inner: DashMap>, + inner: DashMap>, } impl DocCache { pub(crate) fn new() -> Self { Self { inner: DashMap::new() } } #[allow(dead_code)] - pub(crate) fn all_docs(&self) -> Vec> { + pub(crate) fn all_docs(&self) -> Vec> { self.inner .iter() .map(|kv| kv.value().clone()) - .collect::>>() + .collect::>>() } - pub(crate) fn set(&self, doc: Arc) { + pub(crate) fn set(&self, doc: Arc) { let doc_id = doc.doc_id.clone(); if self.inner.contains_key(&doc_id) { log::warn!("Doc:{} already exists in cache", &doc_id); @@ -32,7 +32,7 @@ impl DocCache { pub(crate) fn is_opened(&self, doc_id: &str) -> bool { self.inner.get(doc_id).is_some() } - pub(crate) fn get(&self, doc_id: &str) -> Result, DocError> { + pub(crate) fn get(&self, doc_id: &str) -> Result, DocError> { if !self.is_opened(&doc_id) { return Err(doc_not_found()); } diff --git a/rust-lib/flowy-document/src/services/doc/doc_controller.rs b/rust-lib/flowy-document/src/services/doc/doc_controller.rs index ed70035735..007a1e0a3d 100644 --- a/rust-lib/flowy-document/src/services/doc/doc_controller.rs +++ b/rust-lib/flowy-document/src/services/doc/doc_controller.rs @@ -5,15 +5,24 @@ use parking_lot::RwLock; use tokio::time::{interval, Duration}; use flowy_database::{ConnectionPool, SqliteConnection}; -use flowy_infra::future::{wrap_future, FnFuture}; +use flowy_infra::future::{wrap_future, FnFuture, ResultFuture}; use crate::{ entities::doc::{CreateDocParams, Doc, DocDelta, QueryDocParams}, - errors::{internal_error, DocError}, + errors::{internal_error, DocError, DocResult}, module::DocumentUser, - services::{cache::DocCache, doc::edit::EditDocContext, server::Server, ws::WsDocumentManager}, + services::{ + cache::DocCache, + doc::{ + edit::ClientEditDoc, + revision::{DocRevision, RevisionServer}, + }, + server::Server, + ws::WsDocumentManager, + }, sql_tables::doc::{DocTable, DocTableSql}, }; +use flowy_ot::core::Delta; pub(crate) struct DocController { server: Server, @@ -53,7 +62,7 @@ impl DocController { &self, params: QueryDocParams, pool: Arc, - ) -> Result, DocError> { + ) -> Result, DocError> { if self.cache.is_opened(¶ms.doc_id) == false { let edit_ctx = self.make_edit_context(¶ms.doc_id, pool.clone()).await?; return Ok(edit_ctx); @@ -105,21 +114,24 @@ impl DocController { Ok(()) } - async fn make_edit_context( - &self, - doc_id: &str, - pool: Arc, - ) -> Result, DocError> { + async fn make_edit_context(&self, doc_id: &str, pool: Arc) -> Result, DocError> { // Opti: require upgradable_read lock and then upgrade to write lock using // RwLockUpgradableReadGuard::upgrade(xx) of ws - let doc = self.read_doc(doc_id, pool.clone()).await?; + // let doc = self.read_doc(doc_id, pool.clone()).await?; let ws_sender = self.ws.read().sender(); - let edit_ctx = Arc::new(EditDocContext::new(doc, pool, ws_sender).await?); + let token = self.user.token()?; + let server = Arc::new(RevisionServerImpl { + token, + server: self.server.clone(), + }); + + let edit_ctx = Arc::new(ClientEditDoc::new(doc_id, pool, ws_sender, server).await?); self.ws.write().register_handler(doc_id, edit_ctx.clone()); self.cache.set(edit_ctx.clone()); Ok(edit_ctx) } + #[allow(dead_code)] #[tracing::instrument(level = "debug", skip(self, pool), err)] async fn read_doc(&self, doc_id: &str, pool: Arc) -> Result { match self.doc_sql.read_doc_table(doc_id, pool.clone()) { @@ -146,6 +158,34 @@ impl DocController { } } +struct RevisionServerImpl { + token: String, + server: Server, +} + +impl RevisionServer for RevisionServerImpl { + fn fetch_document_from_remote(&self, doc_id: &str) -> ResultFuture { + let params = QueryDocParams { + doc_id: doc_id.to_string(), + }; + let server = self.server.clone(); + let token = self.token.clone(); + + ResultFuture::new(async move { + match server.read_doc(&token, params).await? { + None => Err(DocError::not_found()), + Some(doc) => { + let delta = Delta::from_bytes(doc.data)?; + Ok(DocRevision { + rev_id: doc.rev_id, + delta, + }) + }, + } + }) + } +} + #[allow(dead_code)] fn event_loop(_cache: Arc) -> FnFuture<()> { let mut i = interval(Duration::from_secs(3)); diff --git a/rust-lib/flowy-document/src/services/doc/document/document.rs b/rust-lib/flowy-document/src/services/doc/document/document.rs index ec012338a0..2792aa2798 100644 --- a/rust-lib/flowy-document/src/services/doc/document/document.rs +++ b/rust-lib/flowy-document/src/services/doc/document/document.rs @@ -152,7 +152,7 @@ impl Document { match self.history.undo() { None => Err(DocError::undo().context("Undo stack is empty")), Some(undo_delta) => { - let (new_delta, inverted_delta) = self.invert_change(&undo_delta)?; + let (new_delta, inverted_delta) = self.invert(&undo_delta)?; let result = UndoResult::success(new_delta.target_len as usize); self.set_delta(new_delta); self.history.add_redo(inverted_delta); @@ -166,7 +166,7 @@ impl Document { match self.history.redo() { None => Err(DocError::redo()), Some(redo_delta) => { - let (new_delta, inverted_delta) = self.invert_change(&redo_delta)?; + let (new_delta, inverted_delta) = self.invert(&redo_delta)?; let result = UndoResult::success(new_delta.target_len as usize); self.set_delta(new_delta); @@ -178,13 +178,13 @@ impl Document { } impl Document { - fn invert_change(&self, change: &Delta) -> Result<(Delta, Delta), DocError> { + fn invert(&self, delta: &Delta) -> Result<(Delta, Delta), DocError> { // c = a.compose(b) // d = b.invert(a) // a = c.compose(d) - log::trace!("👉invert change {}", change); - let new_delta = self.delta.compose(change)?; - let inverted_delta = change.invert(&self.delta); + log::trace!("Invert {}", delta); + let new_delta = self.delta.compose(delta)?; + let inverted_delta = delta.invert(&self.delta); Ok((new_delta, inverted_delta)) } } diff --git a/rust-lib/flowy-document/src/services/doc/edit/context.rs b/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs similarity index 93% rename from rust-lib/flowy-document/src/services/doc/edit/context.rs rename to rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs index 6f5f686d84..6411ff0803 100644 --- a/rust-lib/flowy-document/src/services/doc/edit/context.rs +++ b/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs @@ -7,9 +7,10 @@ use crate::{ services::{ doc::{ edit::{actor::DocumentEditActor, message::EditMsg}, - rev_manager::RevisionManager, + revision::{RevisionManager, RevisionServer}, UndoResult, }, + server::Server, util::bytes_to_rev_id, ws::{WsDocumentHandler, WsDocumentSender}, }, @@ -22,27 +23,28 @@ use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot}; pub type DocId = String; -pub struct EditDocContext { +pub struct ClientEditDoc { pub doc_id: DocId, rev_manager: Arc, document: UnboundedSender, pool: Arc, } -impl EditDocContext { +impl ClientEditDoc { pub(crate) async fn new( - doc: Doc, + doc_id: &str, pool: Arc, ws_sender: Arc, - ) -> Result { - let delta = Delta::from_bytes(doc.data)?; + server: Arc, + ) -> DocResult { + let (rev_manager, delta) = RevisionManager::new(doc_id, pool.clone(), ws_sender, server).await?; + let rev_manager = Arc::new(rev_manager); let (sender, receiver) = mpsc::unbounded_channel::(); - let edit_actor = DocumentEditActor::new(&doc.id, delta, pool.clone(), receiver); + let edit_actor = DocumentEditActor::new(doc_id, delta, pool.clone(), receiver); tokio::spawn(edit_actor.run()); - let rev_manager = Arc::new(RevisionManager::new(&doc.id, doc.rev_id, pool.clone(), ws_sender)); let edit_context = Self { - doc_id: doc.id, + doc_id: doc_id.to_string(), rev_manager, document: sender, pool, @@ -166,7 +168,7 @@ impl EditDocContext { } } -impl WsDocumentHandler for EditDocContext { +impl WsDocumentHandler for ClientEditDoc { fn receive(&self, doc_data: WsDocumentData) { let document = self.document.clone(); let rev_manager = self.rev_manager.clone(); diff --git a/rust-lib/flowy-document/src/services/doc/edit/mod.rs b/rust-lib/flowy-document/src/services/doc/edit/mod.rs index ab421fea8e..7c75a6f473 100644 --- a/rust-lib/flowy-document/src/services/doc/edit/mod.rs +++ b/rust-lib/flowy-document/src/services/doc/edit/mod.rs @@ -1,5 +1,5 @@ mod actor; -mod context; +mod edit_doc; mod message; -pub use context::*; +pub use edit_doc::*; diff --git a/rust-lib/flowy-document/src/services/doc/mod.rs b/rust-lib/flowy-document/src/services/doc/mod.rs index 4e7ee2b6bc..022f9b28e9 100644 --- a/rust-lib/flowy-document/src/services/doc/mod.rs +++ b/rust-lib/flowy-document/src/services/doc/mod.rs @@ -9,4 +9,4 @@ mod view; pub(crate) mod doc_controller; pub mod edit; pub mod extensions; -mod rev_manager; +mod revision; diff --git a/rust-lib/flowy-document/src/services/doc/rev_manager/mod.rs b/rust-lib/flowy-document/src/services/doc/rev_manager/mod.rs deleted file mode 100644 index f03ccb5ea8..0000000000 --- a/rust-lib/flowy-document/src/services/doc/rev_manager/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -mod rev_manager; -mod store; -mod util; - -pub use rev_manager::*; diff --git a/rust-lib/flowy-document/src/services/doc/rev_manager/store.rs b/rust-lib/flowy-document/src/services/doc/rev_manager/store.rs deleted file mode 100644 index eeadd81e64..0000000000 --- a/rust-lib/flowy-document/src/services/doc/rev_manager/store.rs +++ /dev/null @@ -1,191 +0,0 @@ -use crate::{ - entities::doc::{Revision, RevisionRange}, - errors::{internal_error, DocError, DocResult}, - services::doc::rev_manager::util::RevisionOperation, - sql_tables::{OpTableSql, RevChangeset, RevState}, -}; -use async_stream::stream; -use dashmap::DashMap; -use flowy_database::ConnectionPool; -use futures::stream::StreamExt; - -use std::{cell::RefCell, sync::Arc, time::Duration}; -use tokio::{ - sync::{mpsc, oneshot, RwLock}, - task::JoinHandle, -}; - -pub enum StoreMsg { - Revision { - revision: Revision, - }, - AckRevision { - rev_id: i64, - }, - SendRevisions { - range: RevisionRange, - ret: oneshot::Sender>>, - }, -} - -pub struct Store { - doc_id: String, - op_sql: Arc, - pool: Arc, - revs: Arc>, - delay_save: RwLock>>, - receiver: Option>, -} - -impl Store { - pub fn new(doc_id: &str, pool: Arc, receiver: mpsc::Receiver) -> Store { - let op_sql = Arc::new(OpTableSql {}); - let revs = Arc::new(DashMap::new()); - let doc_id = doc_id.to_owned(); - - Self { - doc_id, - op_sql, - pool, - revs, - delay_save: RwLock::new(None), - receiver: Some(receiver), - } - } - - pub async fn run(mut self) { - let mut receiver = self.receiver.take().expect("Should only call once"); - let stream = stream! { - loop { - match receiver.recv().await { - Some(msg) => yield msg, - None => break, - } - } - }; - stream.for_each(|msg| self.handle_message(msg)).await; - } - - async fn handle_message(&self, msg: StoreMsg) { - match msg { - StoreMsg::Revision { revision } => { - self.handle_new_revision(revision).await; - }, - StoreMsg::AckRevision { rev_id } => { - self.handle_revision_acked(rev_id).await; - }, - StoreMsg::SendRevisions { range: _, ret: _ } => { - unimplemented!() - }, - } - } - - async fn handle_new_revision(&self, revision: Revision) { - let mut operation = RevisionOperation::new(&revision); - let _receiver = operation.receiver(); - self.revs.insert(revision.rev_id, operation); - self.save_revisions().await; - } - - async fn handle_revision_acked(&self, rev_id: i64) { - match self.revs.get_mut(&rev_id) { - None => {}, - Some(mut rev) => rev.value_mut().finish(), - } - self.save_revisions().await; - } - - pub fn revs_in_range(&self, _range: RevisionRange) -> DocResult> { unimplemented!() } - - async fn save_revisions(&self) { - if let Some(handler) = self.delay_save.write().await.take() { - handler.abort(); - } - - if self.revs.is_empty() { - return; - } - - let revs = self.revs.clone(); - let pool = self.pool.clone(); - let op_sql = self.op_sql.clone(); - - *self.delay_save.write().await = Some(tokio::spawn(async move { - tokio::time::sleep(Duration::from_millis(300)).await; - - let ids = revs.iter().map(|kv| kv.key().clone()).collect::>(); - let revisions = revs - .iter() - .map(|kv| ((*kv.value()).clone(), kv.state)) - .collect::>(); - - let conn = &*pool.get().map_err(internal_error).unwrap(); - let result = conn.immediate_transaction::<_, DocError, _>(|| { - let _ = op_sql.create_rev_table(revisions, conn).unwrap(); - Ok(()) - }); - - match result { - Ok(_) => revs.retain(|k, _| !ids.contains(k)), - Err(e) => log::error!("Save revision failed: {:?}", e), - } - })); - } - // fn update_revisions(&self) { - // let rev_ids = self - // .revs - // .iter() - // .flat_map(|kv| match kv.state == RevState::Acked { - // true => None, - // false => Some(kv.key().clone()), - // }) - // .collect::>(); - // - // if rev_ids.is_empty() { - // return; - // } - // - // log::debug!("Try to update {:?} state", rev_ids); - // match self.update(&rev_ids) { - // Ok(_) => { - // self.revs.retain(|k, _| !rev_ids.contains(k)); - // }, - // Err(e) => log::error!("Save revision failed: {:?}", e), - // } - // } - // - // fn update(&self, rev_ids: &Vec) -> Result<(), DocError> { - // let conn = &*self.pool.get().map_err(internal_error).unwrap(); - // let result = conn.immediate_transaction::<_, DocError, _>(|| { - // for rev_id in rev_ids { - // let changeset = RevChangeset { - // doc_id: self.doc_id.clone(), - // rev_id: rev_id.clone(), - // state: RevState::Acked, - // }; - // let _ = self.op_sql.update_rev_table(changeset, conn)?; - // } - // Ok(()) - // }); - // - // result - // } - - // fn delete_revision(&self, rev_id: i64) { - // let op_sql = self.op_sql.clone(); - // let pool = self.pool.clone(); - // let doc_id = self.doc_id.clone(); - // tokio::spawn(async move { - // let conn = &*pool.get().map_err(internal_error).unwrap(); - // let result = conn.immediate_transaction::<_, DocError, _>(|| { - // let _ = op_sql.delete_rev_table(&doc_id, rev_id, conn)?; - // Ok(()) - // }); - // - // match result { - // Ok(_) => {}, - // Err(e) => log::error!("Delete revision failed: {:?}", e), - // } - // }); - // } -} diff --git a/rust-lib/flowy-document/src/services/doc/rev_manager/rev_manager.rs b/rust-lib/flowy-document/src/services/doc/revision/manager.rs similarity index 50% rename from rust-lib/flowy-document/src/services/doc/rev_manager/rev_manager.rs rename to rust-lib/flowy-document/src/services/doc/revision/manager.rs index 64d4407117..e6fccb6374 100644 --- a/rust-lib/flowy-document/src/services/doc/rev_manager/rev_manager.rs +++ b/rust-lib/flowy-document/src/services/doc/revision/manager.rs @@ -2,56 +2,76 @@ use crate::{ entities::doc::{RevType, Revision, RevisionRange}, errors::DocError, services::{ - doc::rev_manager::store::{Store, StoreMsg}, + doc::revision::store::{RevisionStore, StoreCmd}, util::RevIdCounter, ws::WsDocumentSender, }, }; +use crate::{entities::doc::Doc, errors::DocResult, services::server::Server}; use flowy_database::ConnectionPool; +use flowy_infra::future::ResultFuture; +use flowy_ot::core::Delta; use parking_lot::RwLock; use std::{collections::VecDeque, sync::Arc}; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError}; + +pub struct DocRevision { + pub rev_id: i64, + pub delta: Delta, +} + +pub trait RevisionServer: Send + Sync { + fn fetch_document_from_remote(&self, doc_id: &str) -> ResultFuture; +} pub struct RevisionManager { doc_id: String, rev_id_counter: RevIdCounter, - ws_sender: Arc, - store_sender: mpsc::Sender, + ws: Arc, + store: mpsc::Sender, pending_revs: RwLock>, } -// tokio::time::timeout + impl RevisionManager { - pub fn new(doc_id: &str, rev_id: i64, pool: Arc, ws_sender: Arc) -> Self { - let (sender, receiver) = mpsc::channel::(50); - let store = Store::new(doc_id, pool, receiver); + pub async fn new( + doc_id: &str, + pool: Arc, + ws_sender: Arc, + server: Arc, + ) -> DocResult<(Self, Delta)> { + let (sender, receiver) = mpsc::channel::(50); + let store = RevisionStore::new(doc_id, pool, receiver, server); tokio::spawn(store.run()); + let DocRevision { rev_id, delta } = fetch_document(sender.clone()).await?; + let doc_id = doc_id.to_string(); let rev_id_counter = RevIdCounter::new(rev_id); let pending_revs = RwLock::new(VecDeque::new()); - Self { + let manager = Self { doc_id, rev_id_counter, - ws_sender, + ws: ws_sender, pending_revs, - store_sender: sender, - } + store: sender, + }; + Ok((manager, delta)) } pub fn push_compose_revision(&self, revision: Revision) { self.pending_revs.write().push_front(revision); } pub fn next_compose_revision(&self) -> Option { self.pending_revs.write().pop_front() } - #[tracing::instrument(level = "debug", skip(self, revision))] + #[tracing::instrument(level = "debug", skip(self))] pub async fn add_revision(&self, revision: Revision) -> Result<(), DocError> { - let msg = StoreMsg::Revision { + let cmd = StoreCmd::Revision { revision: revision.clone(), }; - let _ = self.store_sender.send(msg).await; + let _ = self.store.send(cmd).await; match revision.ty { - RevType::Local => match self.ws_sender.send(revision.into()) { + RevType::Local => match self.ws.send(revision.into()) { Ok(_) => {}, Err(e) => log::error!("Send delta failed: {:?}", e), }, @@ -64,9 +84,9 @@ impl RevisionManager { } pub fn ack_rev(&self, rev_id: i64) -> Result<(), DocError> { - let sender = self.store_sender.clone(); + let sender = self.store.clone(); tokio::spawn(async move { - let _ = sender.send(StoreMsg::AckRevision { rev_id }).await; + let _ = sender.send(StoreCmd::AckRevision { rev_id }).await; }); Ok(()) } @@ -82,12 +102,25 @@ impl RevisionManager { pub fn send_revisions(&self, range: RevisionRange) -> Result<(), DocError> { debug_assert!(&range.doc_id == &self.doc_id); let (ret, _rx) = oneshot::channel(); - let sender = self.store_sender.clone(); + let sender = self.store.clone(); tokio::spawn(async move { - let _ = sender.send(StoreMsg::SendRevisions { range, ret }).await; + let _ = sender.send(StoreCmd::SendRevisions { range, ret }).await; }); unimplemented!() } } + +async fn fetch_document(sender: mpsc::Sender) -> DocResult { + let (ret, rx) = oneshot::channel(); + let _ = sender.send(StoreCmd::DocumentDelta { ret }).await; + + match rx.await { + Ok(result) => Ok(result?), + Err(e) => { + log::error!("fetch_document: {}", e); + Err(DocError::internal().context(format!("fetch_document: {}", e))) + }, + } +} diff --git a/rust-lib/flowy-document/src/services/doc/revision/mod.rs b/rust-lib/flowy-document/src/services/doc/revision/mod.rs new file mode 100644 index 0000000000..bc2ddffc57 --- /dev/null +++ b/rust-lib/flowy-document/src/services/doc/revision/mod.rs @@ -0,0 +1,5 @@ +mod manager; +mod store; +mod util; + +pub use manager::*; diff --git a/rust-lib/flowy-document/src/services/doc/revision/store.rs b/rust-lib/flowy-document/src/services/doc/revision/store.rs new file mode 100644 index 0000000000..f7226ee54a --- /dev/null +++ b/rust-lib/flowy-document/src/services/doc/revision/store.rs @@ -0,0 +1,301 @@ +use crate::{ + entities::doc::{Doc, Revision, RevisionRange}, + errors::{internal_error, DocError, DocResult}, + services::{ + doc::revision::{util::RevisionOperation, DocRevision, RevisionServer}, + server::Server, + }, + sql_tables::{DocTableSql, RevChangeset, RevState, RevTableSql}, +}; +use async_stream::stream; +use dashmap::DashMap; +use flowy_database::{ConnectionPool, SqliteConnection}; +use flowy_ot::{ + core::{Attributes, Delta, OperationTransformable}, + errors::OTError, +}; +use futures::{stream::StreamExt, TryFutureExt}; +use std::{cell::RefCell, sync::Arc, time::Duration}; +use tokio::{ + sync::{mpsc, oneshot, RwLock}, + task::{spawn_blocking, JoinHandle}, +}; + +pub enum StoreCmd { + Revision { + revision: Revision, + }, + AckRevision { + rev_id: i64, + }, + SendRevisions { + range: RevisionRange, + ret: oneshot::Sender>>, + }, + DocumentDelta { + ret: oneshot::Sender>, + }, +} + +pub struct RevisionStore { + doc_id: String, + persistence: Arc, + revs: Arc>, + delay_save: RwLock>>, + receiver: Option>, + server: Arc, +} + +impl RevisionStore { + pub fn new( + doc_id: &str, + pool: Arc, + receiver: mpsc::Receiver, + server: Arc, + ) -> RevisionStore { + let persistence = Arc::new(Persistence::new(pool)); + let revs = Arc::new(DashMap::new()); + let doc_id = doc_id.to_owned(); + + Self { + doc_id, + persistence, + revs, + delay_save: RwLock::new(None), + receiver: Some(receiver), + server, + } + } + + pub async fn run(mut self) { + let mut receiver = self.receiver.take().expect("Should only call once"); + let stream = stream! { + loop { + match receiver.recv().await { + Some(msg) => yield msg, + None => break, + } + } + }; + stream.for_each(|msg| self.handle_message(msg)).await; + } + + async fn handle_message(&self, cmd: StoreCmd) { + match cmd { + StoreCmd::Revision { revision } => { + self.handle_new_revision(revision).await; + }, + StoreCmd::AckRevision { rev_id } => { + self.handle_revision_acked(rev_id).await; + }, + StoreCmd::SendRevisions { range, ret } => { + let result = revs_in_range(&self.doc_id, self.persistence.clone(), range).await; + let _ = ret.send(result); + }, + StoreCmd::DocumentDelta { ret } => { + let delta = fetch_document(&self.doc_id, self.server.clone(), self.persistence.clone()).await; + let _ = ret.send(delta); + }, + } + } + + async fn handle_new_revision(&self, revision: Revision) { + let mut operation = RevisionOperation::new(&revision); + let _receiver = operation.receiver(); + self.revs.insert(revision.rev_id, operation); + self.save_revisions().await; + } + + async fn handle_revision_acked(&self, rev_id: i64) { + match self.revs.get_mut(&rev_id) { + None => {}, + Some(mut rev) => rev.value_mut().finish(), + } + self.save_revisions().await; + } + + async fn save_revisions(&self) { + if let Some(handler) = self.delay_save.write().await.take() { + handler.abort(); + } + + if self.revs.is_empty() { + return; + } + + let revs = self.revs.clone(); + let persistence = self.persistence.clone(); + + *self.delay_save.write().await = Some(tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(300)).await; + + let ids = revs.iter().map(|kv| kv.key().clone()).collect::>(); + let revisions = revs + .iter() + .map(|kv| ((*kv.value()).clone(), kv.state)) + .collect::>(); + + // TODO: Ok to unwrap? + let conn = &*persistence.pool.get().map_err(internal_error).unwrap(); + + let result = conn.immediate_transaction::<_, DocError, _>(|| { + let _ = persistence.rev_sql.create_rev_table(revisions, conn).unwrap(); + Ok(()) + }); + + match result { + Ok(_) => revs.retain(|k, _| !ids.contains(k)), + Err(e) => log::error!("Save revision failed: {:?}", e), + } + })); + } +} + +async fn fetch_document( + doc_id: &str, + server: Arc, + persistence: Arc, +) -> DocResult { + let fetch_from_remote = server.fetch_document_from_remote(doc_id).or_else(|result| { + log::error!( + "Fetch document delta from remote failed: {:?}, try to fetch from local", + result + ); + fetch_from_local(doc_id, persistence.clone()) + }); + + let fetch_from_local = fetch_from_local(doc_id, persistence.clone()).or_else(|result| async move { + log::error!( + "Fetch document delta from local failed: {:?}, try to fetch from remote", + result + ); + server.fetch_document_from_remote(doc_id).await + }); + + tokio::select! { + result = fetch_from_remote => { + log::debug!("Finish fetching document from remote"); + result + }, + result = fetch_from_local => { + log::debug!("Finish fetching document from local"); + result + }, + } +} + +async fn fetch_from_local(doc_id: &str, persistence: Arc) -> DocResult { + let doc_id = doc_id.to_owned(); + spawn_blocking(move || { + // tokio::time::timeout + let conn = &*persistence.pool.get().map_err(internal_error)?; + let revisions = persistence.rev_sql.read_rev_tables(&doc_id, None, conn)?; + if revisions.is_empty() { + return Err(DocError::not_found()); + } + + let rev_id = revisions.last().unwrap().rev_id; + let mut delta = Delta::new(); + for revision in revisions { + match Delta::from_bytes(revision.delta_data) { + Ok(local_delta) => { + delta = delta.compose(&local_delta)?; + }, + Err(e) => { + log::error!("Deserialize delta from revision failed: {}", e); + }, + } + } + + delta.insert("\n", Attributes::default()); + + Result::::Ok(DocRevision { rev_id, delta }) + }) + .await + .map_err(internal_error)? +} + +async fn revs_in_range(doc_id: &str, persistence: Arc, range: RevisionRange) -> DocResult> { + let doc_id = doc_id.to_owned(); + let result = spawn_blocking(move || { + let conn = &*persistence.pool.get().map_err(internal_error)?; + let revisions = persistence.rev_sql.read_rev_tables_with_range(&doc_id, range, conn)?; + Ok(revisions) + }) + .await + .map_err(internal_error)?; + + result +} + +struct Persistence { + rev_sql: Arc, + doc_sql: Arc, + pool: Arc, +} + +impl Persistence { + fn new(pool: Arc) -> Self { + let rev_sql = Arc::new(RevTableSql {}); + let doc_sql = Arc::new(DocTableSql {}); + Self { rev_sql, doc_sql, pool } + } +} + +// fn update_revisions(&self) { +// let rev_ids = self +// .revs +// .iter() +// .flat_map(|kv| match kv.state == RevState::Acked { +// true => None, +// false => Some(kv.key().clone()), +// }) +// .collect::>(); +// +// if rev_ids.is_empty() { +// return; +// } +// +// log::debug!("Try to update {:?} state", rev_ids); +// match self.update(&rev_ids) { +// Ok(_) => { +// self.revs.retain(|k, _| !rev_ids.contains(k)); +// }, +// Err(e) => log::error!("Save revision failed: {:?}", e), +// } +// } +// +// fn update(&self, rev_ids: &Vec) -> Result<(), DocError> { +// let conn = &*self.pool.get().map_err(internal_error).unwrap(); +// let result = conn.immediate_transaction::<_, DocError, _>(|| { +// for rev_id in rev_ids { +// let changeset = RevChangeset { +// doc_id: self.doc_id.clone(), +// rev_id: rev_id.clone(), +// state: RevState::Acked, +// }; +// let _ = self.op_sql.update_rev_table(changeset, conn)?; +// } +// Ok(()) +// }); +// +// result +// } + +// fn delete_revision(&self, rev_id: i64) { +// let op_sql = self.op_sql.clone(); +// let pool = self.pool.clone(); +// let doc_id = self.doc_id.clone(); +// tokio::spawn(async move { +// let conn = &*pool.get().map_err(internal_error).unwrap(); +// let result = conn.immediate_transaction::<_, DocError, _>(|| { +// let _ = op_sql.delete_rev_table(&doc_id, rev_id, conn)?; +// Ok(()) +// }); +// +// match result { +// Ok(_) => {}, +// Err(e) => log::error!("Delete revision failed: {:?}", e), +// } +// }); +// } diff --git a/rust-lib/flowy-document/src/services/doc/rev_manager/util.rs b/rust-lib/flowy-document/src/services/doc/revision/util.rs similarity index 100% rename from rust-lib/flowy-document/src/services/doc/rev_manager/util.rs rename to rust-lib/flowy-document/src/services/doc/revision/util.rs diff --git a/rust-lib/flowy-document/src/sql_tables/doc/mod.rs b/rust-lib/flowy-document/src/sql_tables/doc/mod.rs index 448b9fa140..202bf740a3 100644 --- a/rust-lib/flowy-document/src/sql_tables/doc/mod.rs +++ b/rust-lib/flowy-document/src/sql_tables/doc/mod.rs @@ -1,9 +1,9 @@ -mod doc_op_sql; -mod doc_op_table; mod doc_sql; mod doc_table; +mod rev_sql; +mod rev_table; -pub(crate) use doc_op_sql::*; -pub(crate) use doc_op_table::*; pub(crate) use doc_sql::*; pub(crate) use doc_table::*; +pub(crate) use rev_sql::*; +pub(crate) use rev_table::*; diff --git a/rust-lib/flowy-document/src/sql_tables/doc/doc_op_sql.rs b/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs similarity index 82% rename from rust-lib/flowy-document/src/sql_tables/doc/doc_op_sql.rs rename to rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs index ce1e44cf9d..c39ce3b51f 100644 --- a/rust-lib/flowy-document/src/sql_tables/doc/doc_op_sql.rs +++ b/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs @@ -1,5 +1,5 @@ use crate::{ - entities::doc::Revision, + entities::doc::{Revision, RevisionRange}, errors::DocError, sql_tables::{doc::RevTable, RevChangeset, RevState, RevTableType}, }; @@ -11,9 +11,9 @@ use flowy_database::{ SqliteConnection, }; -pub struct OpTableSql {} +pub struct RevTableSql {} -impl OpTableSql { +impl RevTableSql { pub(crate) fn create_rev_table( &self, revisions: Vec<(Revision, RevState)>, @@ -49,16 +49,22 @@ impl OpTableSql { Ok(()) } - pub(crate) fn read_rev_table( + pub(crate) fn read_rev_tables( &self, doc_id_s: &str, - rev_id_s: i64, + rev_id_s: Option, conn: &SqliteConnection, ) -> Result, DocError> { - let rev_tables: Vec = dsl::rev_table - .filter(rev_id.eq(rev_id_s)) + let mut filter = dsl::rev_table .filter(doc_id.eq(doc_id_s)) - .load::(conn)?; + .order(rev_id.asc()) + .into_boxed(); + + if let Some(rev_id_s) = rev_id_s { + filter = filter.filter(rev_id.eq(rev_id_s)) + } + + let rev_tables = filter.load::(conn)?; let revisions = rev_tables .into_iter() @@ -67,16 +73,15 @@ impl OpTableSql { Ok(revisions) } - pub(crate) fn read_revs_table( + pub(crate) fn read_rev_tables_with_range( &self, doc_id_s: &str, - from_rev_id: i64, - to_rev_id: i64, + range: RevisionRange, conn: &SqliteConnection, ) -> Result, DocError> { let rev_tables = dsl::rev_table - .filter(rev_id.ge(from_rev_id)) - .filter(rev_id.lt(to_rev_id)) + .filter(rev_id.ge(range.from_rev_id)) + .filter(rev_id.lt(range.to_rev_id)) .filter(doc_id.eq(doc_id_s)) .load::(conn)?; diff --git a/rust-lib/flowy-document/src/sql_tables/doc/doc_op_table.rs b/rust-lib/flowy-document/src/sql_tables/doc/rev_table.rs similarity index 100% rename from rust-lib/flowy-document/src/sql_tables/doc/doc_op_table.rs rename to rust-lib/flowy-document/src/sql_tables/doc/rev_table.rs diff --git a/rust-lib/flowy-net/src/request/request.rs b/rust-lib/flowy-net/src/request/request.rs index 3376ea6ee9..ba978bdf7a 100644 --- a/rust-lib/flowy-net/src/request/request.rs +++ b/rust-lib/flowy-net/src/request/request.rs @@ -2,13 +2,13 @@ use crate::{config::HEADER_TOKEN, errors::ServerError, response::FlowyResponse}; use bytes::Bytes; use hyper::http; use protobuf::ProtobufError; -use reqwest::{header::HeaderMap, Client, Method, Response}; +use reqwest::{header::HeaderMap, Client, Error, Method, Response}; use std::{ convert::{TryFrom, TryInto}, sync::Arc, time::Duration, }; -use tokio::sync::oneshot; +use tokio::sync::{oneshot, oneshot::error::RecvError}; pub trait ResponseMiddleware { fn receive_response(&self, token: &Option, response: &FlowyResponse); @@ -144,15 +144,11 @@ impl HttpRequestBuilder { } let response = builder.send().await; - match tx.send(response) { - Ok(_) => {}, - Err(e) => { - log::error!("[{}] Send http request failed: {:?}", method, e); - }, - } + let _ = tx.send(response); }); let response = rx.await??; + log::trace!("Http Response: {:?}", response); let flowy_response = flowy_response_from(response).await?; let token = self.token(); self.middleware.iter().for_each(|middleware| {