From 4450d4410b801f98c075d96d06e8b6396b7fa945 Mon Sep 17 00:00:00 2001 From: appflowy Date: Wed, 8 Dec 2021 14:17:40 +0800 Subject: [PATCH] send revision periodically --- backend/tests/document/helper.rs | 2 +- .../rust-lib/flowy-document/src/module.rs | 2 +- .../flowy-document/src/services/cache.rs | 2 +- .../src/services/doc/edit/editor.rs | 5 +- .../flowy-document/src/services/doc/mod.rs | 6 +- .../src/services/doc/revision/cache.rs | 263 ++++++++++++ .../src/services/doc/revision/manager.rs | 24 +- .../src/services/doc/revision/mod.rs | 7 +- .../src/services/doc/revision/model.rs | 45 --- .../src/services/doc/revision/persistence.rs | 380 ------------------ .../src/services/doc/revision/sync.rs | 76 ++++ .../src/sql_tables/doc/rev_sql.rs | 38 +- .../src/sql_tables/doc/rev_table.rs | 39 +- shared-lib/lib-ot/src/revision/cache.rs | 56 ++- 14 files changed, 449 insertions(+), 496 deletions(-) create mode 100644 frontend/rust-lib/flowy-document/src/services/doc/revision/cache.rs delete mode 100644 frontend/rust-lib/flowy-document/src/services/doc/revision/model.rs delete mode 100644 frontend/rust-lib/flowy-document/src/services/doc/revision/persistence.rs create mode 100644 frontend/rust-lib/flowy-document/src/services/doc/revision/sync.rs diff --git a/backend/tests/document/helper.rs b/backend/tests/document/helper.rs index 3c173b2028..20e622ee14 100644 --- a/backend/tests/document/helper.rs +++ b/backend/tests/document/helper.rs @@ -2,7 +2,7 @@ #![cfg_attr(rustfmt, rustfmt::skip)] use actix_web::web::Data; use backend::services::doc::{crud::update_doc, manager::DocManager}; -use flowy_document::services::doc::ClientDocEditor as ClientEditDocContext; +use flowy_document::services::doc::edit::ClientDocEditor as ClientEditDocContext; use flowy_test::{workspace::ViewTest, FlowyTest}; use flowy_user::services::user::UserSession; use futures_util::{stream, stream::StreamExt}; diff --git a/frontend/rust-lib/flowy-document/src/module.rs 
b/frontend/rust-lib/flowy-document/src/module.rs index b696b97b30..27a4fa62d2 100644 --- a/frontend/rust-lib/flowy-document/src/module.rs +++ b/frontend/rust-lib/flowy-document/src/module.rs @@ -1,7 +1,7 @@ use crate::{ errors::DocError, services::{ - doc::{doc_controller::DocController, ClientDocEditor}, + doc::{doc_controller::DocController, edit::ClientDocEditor}, server::construct_doc_server, ws::WsDocumentManager, }, diff --git a/frontend/rust-lib/flowy-document/src/services/cache.rs b/frontend/rust-lib/flowy-document/src/services/cache.rs index 6b633dc338..712cde6774 100644 --- a/frontend/rust-lib/flowy-document/src/services/cache.rs +++ b/frontend/rust-lib/flowy-document/src/services/cache.rs @@ -4,7 +4,7 @@ use dashmap::DashMap; use crate::{ errors::DocError, - services::doc::{ClientDocEditor, DocId}, + services::doc::edit::{ClientDocEditor, DocId}, }; pub(crate) struct DocCache { diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs b/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs index 119a2335cf..4b2e5cb1c0 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs @@ -2,7 +2,10 @@ use crate::{ errors::{internal_error, DocError, DocResult}, module::DocumentUser, services::{ - doc::{EditCommand, EditCommandQueue, OpenDocAction, RevisionManager, RevisionServer, TransformDeltas}, + doc::{ + edit::{EditCommand, EditCommandQueue, OpenDocAction, TransformDeltas}, + revision::{RevisionManager, RevisionServer}, + }, ws::{DocumentWebSocket, WsDocumentHandler}, }, }; diff --git a/frontend/rust-lib/flowy-document/src/services/doc/mod.rs b/frontend/rust-lib/flowy-document/src/services/doc/mod.rs index 31583599f1..48a4bbd187 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/mod.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/mod.rs @@ -1,6 +1,4 @@ -mod edit; -mod revision; +pub mod edit; +pub mod revision; 
pub(crate) mod doc_controller; -pub use edit::*; -pub(crate) use revision::*; diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache.rs new file mode 100644 index 0000000000..8117178e11 --- /dev/null +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache.rs @@ -0,0 +1,263 @@ +use crate::{ + errors::{internal_error, DocError, DocResult}, + services::doc::revision::RevisionServer, + sql_tables::RevTableSql, +}; +use flowy_database::ConnectionPool; +use flowy_document_infra::entities::doc::Doc; +use lib_infra::future::ResultFuture; +use lib_ot::{ + core::{Operation, OperationTransformable}, + revision::{ + RevId, + RevState, + RevType, + Revision, + RevisionDiskCache, + RevisionMemoryCache, + RevisionRange, + RevisionRecord, + }, + rich_text::RichTextDelta, +}; +use std::{sync::Arc, time::Duration}; +use tokio::{ + sync::RwLock, + task::{spawn_blocking, JoinHandle}, +}; + +pub trait RevisionIterator: Send + Sync { + fn next(&self) -> ResultFuture, DocError>; +} + +type DocRevisionDeskCache = dyn RevisionDiskCache; + +pub struct RevisionCache { + doc_id: String, + dish_cache: Arc, + memory_cache: Arc, + defer_save: RwLock>>, + server: Arc, +} + +impl RevisionCache { + pub fn new(doc_id: &str, pool: Arc, server: Arc) -> RevisionCache { + let doc_id = doc_id.to_owned(); + let dish_cache = Arc::new(Persistence::new(pool)); + let memory_cache = Arc::new(RevisionMemoryCache::new()); + Self { + doc_id, + dish_cache, + memory_cache, + defer_save: RwLock::new(None), + server, + } + } + + #[tracing::instrument(level = "debug", skip(self, revision))] + pub async fn add_revision(&self, revision: Revision) -> DocResult<()> { + if self.memory_cache.contains(&revision.rev_id) { + return Err(DocError::duplicate_rev().context(format!("Duplicate revision id: {}", revision.rev_id))); + } + self.memory_cache.add_revision(revision.clone()).await?; + self.save_revisions().await; + 
Ok(()) + } + + #[tracing::instrument(level = "debug", skip(self, rev_id), fields(rev_id = %rev_id.as_ref()))] + pub async fn ack_revision(&self, rev_id: RevId) { + let rev_id = rev_id.value; + self.memory_cache.mut_revision(&rev_id, |mut rev| rev.value_mut().ack()); + self.save_revisions().await; + } + + async fn save_revisions(&self) { + if let Some(handler) = self.defer_save.write().await.take() { + handler.abort(); + } + + if self.memory_cache.is_empty() { + return; + } + + let memory_cache = self.memory_cache.clone(); + let disk_cache = self.dish_cache.clone(); + *self.defer_save.write().await = Some(tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(300)).await; + let (ids, records) = memory_cache.revisions(); + match disk_cache.create_revisions(records) { + Ok(_) => { + memory_cache.remove_revisions(ids); + }, + Err(e) => log::error!("Save revision failed: {:?}", e), + } + })); + } + + pub async fn revisions_in_range(&self, range: RevisionRange) -> DocResult> { + let revs = self.memory_cache.revisions_in_range(&range).await?; + if revs.len() == range.len() as usize { + Ok(revs) + } else { + let doc_id = self.doc_id.clone(); + let disk_cache = self.dish_cache.clone(); + spawn_blocking(move || disk_cache.revisions_in_range(&doc_id, &range)) + .await + .map_err(internal_error)? 
+ } + } + + pub async fn fetch_document(&self) -> DocResult { + let result = fetch_from_local(&self.doc_id, self.dish_cache.clone()).await; + if result.is_ok() { + return result; + } + + let doc = self.server.fetch_document_from_remote(&self.doc_id).await?; + let delta_data = doc.data.as_bytes(); + let revision = Revision::new( + doc.base_rev_id, + doc.rev_id, + delta_data.to_owned(), + &doc.id, + RevType::Remote, + ); + let record = RevisionRecord { + revision, + state: RevState::Acked, + }; + let _ = self.dish_cache.create_revisions(vec![record])?; + Ok(doc) + } +} + +impl RevisionIterator for RevisionCache { + fn next(&self) -> ResultFuture, DocError> { + let memory_cache = self.memory_cache.clone(); + let disk_cache = self.dish_cache.clone(); + let doc_id = self.doc_id.clone(); + ResultFuture::new(async move { + match memory_cache.front_revision().await { + None => { + // + match memory_cache.front_rev_id().await { + None => Ok(None), + Some(rev_id) => match disk_cache.read_revision(&doc_id, rev_id)? 
{ + None => Ok(None), + Some(revision) => Ok(Some(RevisionRecord::new(revision))), + }, + } + }, + Some((_, record)) => Ok(Some(record)), + } + }) + } +} + +async fn fetch_from_local(doc_id: &str, disk_cache: Arc) -> DocResult { + let doc_id = doc_id.to_owned(); + spawn_blocking(move || { + let revisions = disk_cache.read_revisions(&doc_id)?; + if revisions.is_empty() { + return Err(DocError::record_not_found().context("Local doesn't have this document")); + } + + let base_rev_id: RevId = revisions.last().unwrap().base_rev_id.into(); + let rev_id: RevId = revisions.last().unwrap().rev_id.into(); + let mut delta = RichTextDelta::new(); + for (_, revision) in revisions.into_iter().enumerate() { + match RichTextDelta::from_bytes(revision.delta_data) { + Ok(local_delta) => { + delta = delta.compose(&local_delta)?; + }, + Err(e) => { + log::error!("Deserialize delta from revision failed: {}", e); + }, + } + } + + #[cfg(debug_assertions)] + validate_delta(&doc_id, disk_cache, &delta); + + match delta.ops.last() { + None => {}, + Some(op) => { + let data = op.get_data(); + if !data.ends_with('\n') { + delta.ops.push(Operation::Insert("\n".into())) + } + }, + } + + Result::::Ok(Doc { + id: doc_id, + data: delta.to_json(), + rev_id: rev_id.into(), + base_rev_id: base_rev_id.into(), + }) + }) + .await + .map_err(internal_error)? 
+} + +#[cfg(debug_assertions)] +fn validate_delta(doc_id: &str, disk_cache: Arc, delta: &RichTextDelta) { + if delta.ops.last().is_none() { + return; + } + + let data = delta.ops.last().as_ref().unwrap().get_data(); + if !data.ends_with('\n') { + log::error!("The op must end with newline"); + let result = || { + let revisions = disk_cache.read_revisions(&doc_id)?; + for revision in revisions { + let delta = RichTextDelta::from_bytes(revision.delta_data)?; + log::error!("Invalid revision: {}:{}", revision.rev_id, delta.to_json()); + } + Ok::<(), DocError>(()) + }; + match result() { + Ok(_) => {}, + Err(e) => log::error!("{}", e), + } + } +} + +pub(crate) struct Persistence { + pub(crate) pool: Arc, +} + +impl RevisionDiskCache for Persistence { + type Error = DocError; + + fn create_revisions(&self, revisions: Vec) -> Result<(), Self::Error> { + let conn = &*self.pool.get().map_err(internal_error)?; + conn.immediate_transaction::<_, DocError, _>(|| { + let _ = RevTableSql::create_rev_table(revisions, conn)?; + Ok(()) + }) + } + + fn revisions_in_range(&self, doc_id: &str, range: &RevisionRange) -> Result, Self::Error> { + let conn = &*self.pool.get().map_err(internal_error).unwrap(); + let revisions = RevTableSql::read_rev_tables_with_range(doc_id, range.clone(), conn)?; + Ok(revisions) + } + + fn read_revision(&self, doc_id: &str, rev_id: i64) -> Result, Self::Error> { + let conn = self.pool.get().map_err(internal_error)?; + let some = RevTableSql::read_rev_table(doc_id, &rev_id, &*conn)?; + Ok(some) + } + + fn read_revisions(&self, doc_id: &str) -> Result, Self::Error> { + let conn = self.pool.get().map_err(internal_error)?; + let some = RevTableSql::read_rev_tables(doc_id, &*conn)?; + Ok(some) + } +} + +impl Persistence { + pub(crate) fn new(pool: Arc) -> Self { Self { pool } } +} diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs index 84f9679227..1ff1f4bd86 
100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs @@ -1,6 +1,6 @@ use crate::{ errors::{DocError, DocResult}, - services::doc::revision::RevisionStore, + services::doc::revision::{RevisionCache, RevisionUploadStream}, }; use flowy_database::ConnectionPool; use flowy_document_infra::{entities::doc::Doc, util::RevIdCounter}; @@ -20,7 +20,7 @@ pub trait RevisionServer: Send + Sync { pub struct RevisionManager { doc_id: String, rev_id_counter: RevIdCounter, - rev_store: Arc, + cache: Arc, } impl RevisionManager { @@ -28,30 +28,32 @@ impl RevisionManager { doc_id: &str, pool: Arc, server: Arc, - pending_rev_sender: mpsc::UnboundedSender, + ws_sender: mpsc::UnboundedSender, ) -> Self { - let rev_store = RevisionStore::new(doc_id, pool, server, pending_rev_sender); + let cache = Arc::new(RevisionCache::new(doc_id, pool, server)); + spawn_upload_stream(cache.clone(), ws_sender); let rev_id_counter = RevIdCounter::new(0); Self { doc_id: doc_id.to_string(), rev_id_counter, - rev_store, + cache, } } pub async fn load_document(&mut self) -> DocResult { - let doc = self.rev_store.fetch_document().await?; + let doc = self.cache.fetch_document().await?; self.update_rev_id_counter_value(doc.rev_id); Ok(doc.delta()?) 
} pub async fn add_revision(&self, revision: &Revision) -> Result<(), DocError> { - let _ = self.rev_store.add_revision(revision.clone()).await?; + let _ = self.cache.add_revision(revision.clone()).await?; + Ok(()) } pub async fn ack_revision(&self, rev_id: RevId) -> Result<(), DocError> { - self.rev_store.ack_revision(rev_id).await; + self.cache.ack_revision(rev_id).await; Ok(()) } @@ -67,7 +69,7 @@ impl RevisionManager { pub async fn mk_revisions(&self, range: RevisionRange) -> Result { debug_assert!(range.doc_id == self.doc_id); - let revisions = self.rev_store.revs_in_range(range.clone()).await?; + let revisions = self.cache.revisions_in_range(range.clone()).await?; let mut new_delta = RichTextDelta::new(); for revision in revisions { match RichTextDelta::from_bytes(revision.delta_data) { @@ -90,3 +92,7 @@ impl RevisionManager { Ok(revision) } } + +fn spawn_upload_stream(cache: Arc, ws_sender: mpsc::UnboundedSender) { + tokio::spawn(RevisionUploadStream::new(cache, ws_sender).run()); +} diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/mod.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/mod.rs index 7288a5301b..867c30173c 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/mod.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/mod.rs @@ -1,6 +1,7 @@ +mod cache; mod manager; -mod model; -mod persistence; +mod sync; +pub use cache::*; pub use manager::*; -pub use persistence::*; +pub(crate) use sync::*; diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/model.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/model.rs deleted file mode 100644 index 51e2d68fe3..0000000000 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/model.rs +++ /dev/null @@ -1,45 +0,0 @@ -use crate::{ - errors::{internal_error, DocError, DocResult}, - sql_tables::{RevTableSql, SqlRevState}, -}; -use flowy_database::ConnectionPool; -use lib_infra::future::ResultFuture; 
-use lib_ot::revision::{Revision, RevisionRange}; -use std::sync::Arc; -use tokio::sync::broadcast; - -pub(crate) struct Persistence { - pub(crate) rev_sql: Arc, - pub(crate) pool: Arc, -} - -impl Persistence { - pub(crate) fn new(pool: Arc) -> Self { - let rev_sql = Arc::new(RevTableSql {}); - Self { rev_sql, pool } - } - - pub(crate) fn create_revs(&self, revisions: Vec<(Revision, SqlRevState)>) -> DocResult<()> { - let conn = &*self.pool.get().map_err(internal_error)?; - conn.immediate_transaction::<_, DocError, _>(|| { - let _ = self.rev_sql.create_rev_table(revisions, conn)?; - Ok(()) - }) - } - - pub(crate) fn read_rev_with_range(&self, doc_id: &str, range: RevisionRange) -> DocResult> { - let conn = &*self.pool.get().map_err(internal_error).unwrap(); - let revisions = self.rev_sql.read_rev_tables_with_range(doc_id, range, conn)?; - Ok(revisions) - } - - pub(crate) fn read_rev(&self, doc_id: &str, rev_id: &i64) -> DocResult> { - let conn = self.pool.get().map_err(internal_error)?; - let some = self.rev_sql.read_rev_table(&doc_id, rev_id, &*conn)?; - Ok(some) - } -} - -pub trait RevisionIterator: Send + Sync { - fn next(&self) -> ResultFuture, DocError>; -} diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/persistence.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/persistence.rs deleted file mode 100644 index 5b866181a9..0000000000 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/persistence.rs +++ /dev/null @@ -1,380 +0,0 @@ -use crate::{ - errors::{internal_error, DocError, DocResult}, - services::doc::revision::{model::*, RevisionServer}, - sql_tables::SqlRevState, -}; -use async_stream::stream; -use dashmap::DashMap; -use flowy_database::{ConnectionPool, SqliteConnection}; -use flowy_document_infra::entities::doc::Doc; -use futures::stream::StreamExt; -use lib_infra::future::ResultFuture; -use lib_ot::{ - core::{Operation, OperationTransformable}, - revision::{PendingRevId, RevId, RevIdReceiver, 
RevType, Revision, RevisionRange, RevisionRecord}, - rich_text::RichTextDelta, -}; -use std::{collections::VecDeque, sync::Arc, time::Duration}; -use tokio::{ - sync::{broadcast, mpsc, RwLock}, - task::{spawn_blocking, JoinHandle}, -}; - -pub struct RevisionStore { - doc_id: String, - persistence: Arc, - revs_map: Arc>, - pending_tx: PendingSender, - pending_revs: Arc>>, - defer_save: RwLock>>, - server: Arc, -} - -impl RevisionStore { - pub fn new( - doc_id: &str, - pool: Arc, - server: Arc, - ws_revision_sender: mpsc::UnboundedSender, - ) -> Arc { - let doc_id = doc_id.to_owned(); - let persistence = Arc::new(Persistence::new(pool)); - let revs_map = Arc::new(DashMap::new()); - let (pending_tx, pending_rx) = mpsc::unbounded_channel(); - let pending_revs = Arc::new(RwLock::new(VecDeque::new())); - - let store = Arc::new(Self { - doc_id, - persistence, - revs_map, - pending_revs, - pending_tx, - defer_save: RwLock::new(None), - server, - }); - - tokio::spawn(RevisionUploadStream::new(store.clone(), pending_rx, ws_revision_sender).run()); - - store - } - - #[tracing::instrument(level = "debug", skip(self, revision))] - pub async fn add_revision(&self, revision: Revision) -> DocResult<()> { - if self.revs_map.contains_key(&revision.rev_id) { - return Err(DocError::duplicate_rev().context(format!("Duplicate revision id: {}", revision.rev_id))); - } - - let (sender, receiver) = broadcast::channel(2); - let revs_map = self.revs_map.clone(); - let mut rx = sender.subscribe(); - tokio::spawn(async move { - if let Ok(rev_id) = rx.recv().await { - match revs_map.get_mut(&rev_id) { - None => {}, - Some(mut rev) => rev.value_mut().state = SqlRevState::Acked.into(), - } - } - }); - - let pending_rev = PendingRevId::new(revision.rev_id, sender); - self.pending_revs.write().await.push_back(pending_rev); - self.revs_map.insert(revision.rev_id, RevisionRecord::new(revision)); - - let _ = self.pending_tx.send(PendingMsg::Revision { ret: receiver }); - self.save_revisions().await; - 
Ok(()) - } - - #[tracing::instrument(level = "debug", skip(self, rev_id), fields(rev_id = %rev_id.as_ref()))] - pub async fn ack_revision(&self, rev_id: RevId) { - let rev_id = rev_id.value; - self.pending_revs - .write() - .await - .retain(|pending| !pending.finish(rev_id)); - - self.save_revisions().await; - } - - async fn save_revisions(&self) { - if let Some(handler) = self.defer_save.write().await.take() { - handler.abort(); - } - - if self.revs_map.is_empty() { - return; - } - - let revs_map = self.revs_map.clone(); - let persistence = self.persistence.clone(); - - *self.defer_save.write().await = Some(tokio::spawn(async move { - tokio::time::sleep(Duration::from_millis(300)).await; - let ids = revs_map.iter().map(|kv| *kv.key()).collect::>(); - let revisions_state = revs_map - .iter() - .map(|kv| (kv.revision.clone(), kv.state)) - .collect::>(); - - match persistence.create_revs(revisions_state.clone()) { - Ok(_) => { - tracing::debug!( - "Revision State Changed: {:?}", - revisions_state.iter().map(|s| (s.0.rev_id, s.1)).collect::>() - ); - revs_map.retain(|k, _| !ids.contains(k)); - }, - Err(e) => log::error!("Save revision failed: {:?}", e), - } - })); - } - - pub async fn revs_in_range(&self, range: RevisionRange) -> DocResult> { - let revs = range - .iter() - .flat_map(|rev_id| match self.revs_map.get(&rev_id) { - None => None, - Some(rev) => Some(rev.revision.clone()), - }) - .collect::>(); - - if revs.len() == range.len() as usize { - Ok(revs) - } else { - let doc_id = self.doc_id.clone(); - let persistence = self.persistence.clone(); - let result = spawn_blocking(move || persistence.read_rev_with_range(&doc_id, range)) - .await - .map_err(internal_error)?; - result - } - } - - pub async fn fetch_document(&self) -> DocResult { - let result = fetch_from_local(&self.doc_id, self.persistence.clone()).await; - if result.is_ok() { - return result; - } - - let doc = self.server.fetch_document_from_remote(&self.doc_id).await?; - let revision = 
revision_from_doc(doc.clone(), RevType::Remote); - let _ = self.persistence.create_revs(vec![(revision, SqlRevState::Acked)])?; - Ok(doc) - } -} - -pub fn revision_from_doc(doc: Doc, ty: RevType) -> Revision { - let delta_data = doc.data.as_bytes(); - Revision::new(doc.base_rev_id, doc.rev_id, delta_data.to_owned(), &doc.id, ty) -} - -impl RevisionIterator for RevisionStore { - fn next(&self) -> ResultFuture, DocError> { - let pending_revs = self.pending_revs.clone(); - let revs_map = self.revs_map.clone(); - let persistence = self.persistence.clone(); - let doc_id = self.doc_id.clone(); - ResultFuture::new(async move { - match pending_revs.read().await.front() { - None => Ok(None), - Some(pending) => match revs_map.get(&pending.rev_id) { - None => persistence.read_rev(&doc_id, &pending.rev_id), - Some(context) => Ok(Some(context.revision.clone())), - }, - } - }) - } -} - -async fn fetch_from_local(doc_id: &str, persistence: Arc) -> DocResult { - let doc_id = doc_id.to_owned(); - spawn_blocking(move || { - let conn = &*persistence.pool.get().map_err(internal_error)?; - let revisions = persistence.rev_sql.read_rev_tables(&doc_id, conn)?; - if revisions.is_empty() { - return Err(DocError::record_not_found().context("Local doesn't have this document")); - } - - let base_rev_id: RevId = revisions.last().unwrap().base_rev_id.into(); - let rev_id: RevId = revisions.last().unwrap().rev_id.into(); - let mut delta = RichTextDelta::new(); - for (_, revision) in revisions.into_iter().enumerate() { - match RichTextDelta::from_bytes(revision.delta_data) { - Ok(local_delta) => { - delta = delta.compose(&local_delta)?; - }, - Err(e) => { - log::error!("Deserialize delta from revision failed: {}", e); - }, - } - } - - #[cfg(debug_assertions)] - validate_delta(&doc_id, persistence, conn, &delta); - - match delta.ops.last() { - None => {}, - Some(op) => { - let data = op.get_data(); - if !data.ends_with('\n') { - delta.ops.push(Operation::Insert("\n".into())) - } - }, - } - - 
Result::::Ok(Doc { - id: doc_id, - data: delta.to_json(), - rev_id: rev_id.into(), - base_rev_id: base_rev_id.into(), - }) - }) - .await - .map_err(internal_error)? -} - -#[cfg(debug_assertions)] -fn validate_delta(doc_id: &str, persistence: Arc, conn: &SqliteConnection, delta: &RichTextDelta) { - if delta.ops.last().is_none() { - return; - } - - let data = delta.ops.last().as_ref().unwrap().get_data(); - if !data.ends_with('\n') { - log::error!("The op must end with newline"); - let result = || { - let revisions = persistence.rev_sql.read_rev_tables(&doc_id, conn)?; - for revision in revisions { - let delta = RichTextDelta::from_bytes(revision.delta_data)?; - log::error!("Invalid revision: {}:{}", revision.rev_id, delta.to_json()); - } - Ok::<(), DocError>(()) - }; - match result() { - Ok(_) => {}, - Err(e) => log::error!("{}", e), - } - } -} - -// fn update_revisions(&self) { -// let rev_ids = self -// .revs -// .iter() -// .flat_map(|kv| match kv.state == RevState::Acked { -// true => None, -// false => Some(kv.key().clone()), -// }) -// .collect::>(); -// -// if rev_ids.is_empty() { -// return; -// } -// -// tracing::debug!("Try to update {:?} state", rev_ids); -// match self.update(&rev_ids) { -// Ok(_) => { -// self.revs.retain(|k, _| !rev_ids.contains(k)); -// }, -// Err(e) => log::error!("Save revision failed: {:?}", e), -// } -// } -// -// fn update(&self, rev_ids: &Vec) -> Result<(), DocError> { -// let conn = &*self.pool.get().map_err(internal_error).unwrap(); -// let result = conn.immediate_transaction::<_, DocError, _>(|| { -// for rev_id in rev_ids { -// let changeset = RevChangeset { -// doc_id: self.doc_id.clone(), -// rev_id: rev_id.clone(), -// state: RevState::Acked, -// }; -// let _ = self.op_sql.update_rev_table(changeset, conn)?; -// } -// Ok(()) -// }); -// -// result -// } - -// fn delete_revision(&self, rev_id: RevId) { -// let op_sql = self.op_sql.clone(); -// let pool = self.pool.clone(); -// let doc_id = self.doc_id.clone(); -// 
tokio::spawn(async move { -// let conn = &*pool.get().map_err(internal_error).unwrap(); -// let result = conn.immediate_transaction::<_, DocError, _>(|| { -// let _ = op_sql.delete_rev_table(&doc_id, rev_id, conn)?; -// Ok(()) -// }); -// -// match result { -// Ok(_) => {}, -// Err(e) => log::error!("Delete revision failed: {:?}", e), -// } -// }); -// } - -pub(crate) enum PendingMsg { - Revision { ret: RevIdReceiver }, -} - -pub(crate) type PendingSender = mpsc::UnboundedSender; -pub(crate) type PendingReceiver = mpsc::UnboundedReceiver; - -pub(crate) struct RevisionUploadStream { - revisions: Arc, - receiver: Option, - ws_revision_sender: mpsc::UnboundedSender, -} - -impl RevisionUploadStream { - pub(crate) fn new( - revisions: Arc, - pending_rx: PendingReceiver, - ws_revision_sender: mpsc::UnboundedSender, - ) -> Self { - Self { - revisions, - receiver: Some(pending_rx), - ws_revision_sender, - } - } - - pub async fn run(mut self) { - let mut receiver = self.receiver.take().expect("Should only call once"); - let stream = stream! { - loop { - match receiver.recv().await { - Some(msg) => yield msg, - None => break, - } - } - }; - stream - .for_each(|msg| async { - match self.handle_msg(msg).await { - Ok(_) => {}, - Err(e) => log::error!("{:?}", e), - } - }) - .await; - } - - async fn handle_msg(&self, msg: PendingMsg) -> DocResult<()> { - match msg { - PendingMsg::Revision { ret } => self.prepare_next_pending_rev(ret).await, - } - } - - async fn prepare_next_pending_rev(&self, mut ret: RevIdReceiver) -> DocResult<()> { - match self.revisions.next().await? 
{ - None => Ok(()), - Some(revision) => { - let _ = self.ws_revision_sender.send(revision).map_err(internal_error); - let _ = tokio::time::timeout(Duration::from_millis(2000), ret.recv()).await; - Ok(()) - }, - } - } -} diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/sync.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/sync.rs new file mode 100644 index 0000000000..ac225ce9a0 --- /dev/null +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/sync.rs @@ -0,0 +1,76 @@ +use crate::{ + errors::{internal_error, DocResult}, + services::doc::revision::RevisionIterator, +}; +use async_stream::stream; +use futures::stream::StreamExt; +use lib_ot::revision::Revision; +use std::sync::Arc; +use tokio::{ + sync::mpsc, + time::{interval, Duration}, +}; + +pub(crate) enum RevisionMsg { + Tick, +} + +pub(crate) struct RevisionUploadStream { + revisions: Arc, + ws_sender: mpsc::UnboundedSender, +} + +impl RevisionUploadStream { + pub(crate) fn new(revisions: Arc, ws_sender: mpsc::UnboundedSender) -> Self { + Self { revisions, ws_sender } + } + + pub async fn run(self) { + let (tx, mut rx) = mpsc::unbounded_channel(); + tokio::spawn(tick(tx)); + let stream = stream! { + loop { + match rx.recv().await { + Some(msg) => yield msg, + None => break, + } + } + }; + stream + .for_each(|msg| async { + match self.handle_msg(msg).await { + Ok(_) => {}, + Err(e) => log::error!("{:?}", e), + } + }) + .await; + } + + async fn handle_msg(&self, msg: RevisionMsg) -> DocResult<()> { + match msg { + RevisionMsg::Tick => self.send_next_revision().await, + } + } + + async fn send_next_revision(&self) -> DocResult<()> { + match self.revisions.next().await? 
{
+            None => Ok(()),
+            Some(record) => {
+                let _ = self.ws_sender.send(record.revision).map_err(internal_error);
+                // let _ = tokio::time::timeout(Duration::from_millis(2000), ret.recv()).await;
+                Ok(())
+            },
+        }
+    }
+}
+
+async fn tick(sender: mpsc::UnboundedSender<RevisionMsg>) {
+    let mut i = interval(Duration::from_secs(2));
+    loop {
+        match sender.send(RevisionMsg::Tick) {
+            Ok(_) => {},
+            Err(e) => log::error!("RevisionUploadStream tick error: {}", e),
+        }
+        i.tick().await;
+    }
+}
diff --git a/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs b/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs
index 2098693535..051473048c 100644
--- a/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs
+++ b/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs
@@ -1,30 +1,27 @@
 use crate::{
     errors::DocError,
-    sql_tables::{doc::RevTable, RevChangeset, RevTableType, SqlRevState},
+    sql_tables::{doc::RevTable, RevChangeset, RevTableState, RevTableType},
 };
 use diesel::update;
 use flowy_database::{insert_or_ignore_into, prelude::*, schema::rev_table::dsl, SqliteConnection};
-use lib_ot::revision::{Revision, RevisionRange};
+use lib_ot::revision::{Revision, RevisionRange, RevisionRecord};
 
 pub struct RevTableSql {}
 
 impl RevTableSql {
-    pub(crate) fn create_rev_table(
-        &self,
-        revisions: Vec<(Revision, SqlRevState)>,
-        conn: &SqliteConnection,
-    ) -> Result<(), DocError> {
+    pub(crate) fn create_rev_table(revisions: Vec<RevisionRecord>, conn: &SqliteConnection) -> Result<(), DocError> {
         // Batch insert: https://diesel.rs/guides/all-about-inserts.html
         let records = revisions
             .into_iter()
-            .map(|(revision, new_state)| {
-                let rev_ty: RevTableType = revision.ty.into();
+            .map(|record| {
+                let rev_ty: RevTableType = record.revision.ty.into();
+                let rev_state: RevTableState = record.state.into();
                 (
-                    dsl::doc_id.eq(revision.doc_id),
-                    dsl::base_rev_id.eq(revision.base_rev_id),
-                    dsl::rev_id.eq(revision.rev_id),
-                    dsl::data.eq(revision.delta_data),
-                    dsl::state.eq(new_state),
+                    dsl::doc_id.eq(record.revision.doc_id),
+                    dsl::base_rev_id.eq(record.revision.base_rev_id),
+                    dsl::rev_id.eq(record.revision.rev_id),
+                    dsl::data.eq(record.revision.delta_data),
+                    dsl::state.eq(rev_state),
                     dsl::ty.eq(rev_ty),
                 )
             })
@@ -35,7 +32,7 @@ impl RevTableSql {
     }
 
     #[allow(dead_code)]
-    pub(crate) fn update_rev_table(&self, changeset: RevChangeset, conn: &SqliteConnection) -> Result<(), DocError> {
+    pub(crate) fn update_rev_table(changeset: RevChangeset, conn: &SqliteConnection) -> Result<(), DocError> {
         let filter = dsl::rev_table
             .filter(dsl::rev_id.eq(changeset.rev_id.as_ref()))
             .filter(dsl::doc_id.eq(changeset.doc_id));
@@ -44,7 +41,7 @@ impl RevTableSql {
         Ok(())
     }
 
-    pub(crate) fn read_rev_tables(&self, doc_id: &str, conn: &SqliteConnection) -> Result<Vec<Revision>, DocError> {
+    pub(crate) fn read_rev_tables(doc_id: &str, conn: &SqliteConnection) -> Result<Vec<Revision>, DocError> {
         let filter = dsl::rev_table
             .filter(dsl::doc_id.eq(doc_id))
             .order(dsl::rev_id.asc())
@@ -58,7 +55,6 @@ impl RevTableSql {
     }
 
     pub(crate) fn read_rev_table(
-        &self,
         doc_id: &str,
         revision_id: &i64,
         conn: &SqliteConnection,
@@ -76,7 +72,6 @@ impl RevTableSql {
     }
 
     pub(crate) fn read_rev_tables_with_range(
-        &self,
         doc_id_s: &str,
         range: RevisionRange,
         conn: &SqliteConnection,
@@ -96,12 +91,7 @@ impl RevTableSql {
     }
 
     #[allow(dead_code)]
-    pub(crate) fn delete_rev_table(
-        &self,
-        doc_id_s: &str,
-        rev_id_s: i64,
-        conn: &SqliteConnection,
-    ) -> Result<(), DocError> {
+    pub(crate) fn delete_rev_table(doc_id_s: &str, rev_id_s: i64, conn: &SqliteConnection) -> Result<(), DocError> {
         let filter = dsl::rev_table
             .filter(dsl::rev_id.eq(rev_id_s))
             .filter(dsl::doc_id.eq(doc_id_s));
diff --git a/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_table.rs b/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_table.rs
index 6d5ff19052..d652833978 100644
--- a/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_table.rs
+++ b/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_table.rs
@@ -11,45 +11,54 @@ pub(crate) struct RevTable {
     pub(crate) base_rev_id: i64,
     pub(crate) rev_id: i64,
     pub(crate) data: Vec<u8>,
-    pub(crate) state: SqlRevState,
+    pub(crate) state: RevTableState,
     pub(crate) ty: RevTableType,
 }
 
 #[derive(Clone, Copy, PartialEq, Eq, Debug, Hash, FromSqlRow, AsExpression)]
 #[repr(i32)]
 #[sql_type = "Integer"]
-pub enum SqlRevState {
+pub enum RevTableState {
     Local = 0,
     Acked = 1,
 }
 
-impl std::default::Default for SqlRevState {
-    fn default() -> Self { SqlRevState::Local }
+impl std::default::Default for RevTableState {
+    fn default() -> Self { RevTableState::Local }
 }
 
-impl std::convert::From<i32> for SqlRevState {
+impl std::convert::From<i32> for RevTableState {
     fn from(value: i32) -> Self {
         match value {
-            0 => SqlRevState::Local,
-            1 => SqlRevState::Acked,
+            0 => RevTableState::Local,
+            1 => RevTableState::Acked,
             o => {
                 log::error!("Unsupported rev state {}, fallback to RevState::Local", o);
-                SqlRevState::Local
+                RevTableState::Local
             },
         }
     }
 }
 
-impl SqlRevState {
+impl RevTableState {
     pub fn value(&self) -> i32 { *self as i32 }
 }
-impl_sql_integer_expression!(RevState);
+impl_sql_integer_expression!(RevTableState);
 
-impl std::convert::From<SqlRevState> for RevState {
-    fn from(s: SqlRevState) -> Self {
+impl std::convert::From<RevTableState> for RevState {
+    fn from(s: RevTableState) -> Self {
         match s {
-            SqlRevState::Local => RevState::Local,
-            SqlRevState::Acked => RevState::Acked,
+            RevTableState::Local => RevState::Local,
+            RevTableState::Acked => RevState::Acked,
+        }
+    }
+}
+
+impl std::convert::From<RevState> for RevTableState {
+    fn from(s: RevState) -> Self {
+        match s {
+            RevState::Local => RevTableState::Local,
+            RevState::Acked => RevTableState::Acked,
         }
     }
 }
@@ -119,5 +128,5 @@ impl_sql_integer_expression!(RevTableType);
 pub(crate) struct RevChangeset {
     pub(crate) doc_id: String,
     pub(crate) rev_id: RevId,
-    pub(crate) state: SqlRevState,
+    pub(crate) state: RevTableState,
 }
diff --git a/shared-lib/lib-ot/src/revision/cache.rs b/shared-lib/lib-ot/src/revision/cache.rs
index 0a4023b4d5..ede3a4298c 100644
--- a/shared-lib/lib-ot/src/revision/cache.rs
+++ b/shared-lib/lib-ot/src/revision/cache.rs
@@ -1,15 +1,17 @@
 use crate::{
     errors::OTError,
-    revision::{RevId, Revision, RevisionRange},
+    revision::{Revision, RevisionRange},
 };
 use dashmap::{mapref::one::RefMut, DashMap};
-use std::{collections::VecDeque, sync::Arc};
+use std::{collections::VecDeque, fmt::Debug, sync::Arc};
 use tokio::sync::{broadcast, RwLock};
 
-pub trait RevisionDiskCache {
-    fn create_revision(&self, revision: &Revision) -> Result<(), OTError>;
-    fn revisions_in_range(&self, range: RevisionRange) -> Result<Option<Vec<Revision>>, OTError>;
-    fn read_revision(&self, rev_id: i64) -> Result<Option<Revision>, OTError>;
+pub trait RevisionDiskCache: Sync + Send {
+    type Error: Debug;
+    fn create_revisions(&self, revisions: Vec<RevisionRecord>) -> Result<(), Self::Error>;
+    fn revisions_in_range(&self, doc_id: &str, range: &RevisionRange) -> Result<Vec<Revision>, Self::Error>;
+    fn read_revision(&self, doc_id: &str, rev_id: i64) -> Result<Option<Revision>, Self::Error>;
+    fn read_revisions(&self, doc_id: &str) -> Result<Vec<RevisionRecord>, Self::Error>;
 }
 
 pub struct RevisionMemoryCache {
@@ -32,7 +34,7 @@ impl RevisionMemoryCache {
 
     pub async fn add_revision(&self, revision: Revision) -> Result<(), OTError> {
         if self.revs_map.contains_key(&revision.rev_id) {
-            return Err(OTError::duplicate_revision().context(format!("Duplicate revision id: {}", revision.rev_id)));
+            return Ok(());
         }
 
         self.pending_revs.write().await.push_back(revision.rev_id);
@@ -40,18 +42,20 @@ impl RevisionMemoryCache {
         Ok(())
     }
 
-    pub async fn mut_revision<F>(&self, rev_id: i64, f: F)
+    pub fn remove_revisions(&self, ids: Vec<i64>) { self.revs_map.retain(|k, _| !ids.contains(k)); }
+
+    pub fn mut_revision<F>(&self, rev_id: &i64, f: F)
     where
         F: Fn(RefMut<i64, RevisionRecord>),
     {
-        if let Some(m_revision) = self.revs_map.get_mut(&rev_id) {
+        if let Some(m_revision) = self.revs_map.get_mut(rev_id) {
             f(m_revision)
         } else {
             log::error!("Can't find revision with id {}", rev_id);
         }
     }
 
-    pub async fn revisions_in_range(&self, range: RevisionRange) -> Result<Option<Vec<Revision>>, OTError> {
+    pub async fn revisions_in_range(&self, range: &RevisionRange) -> Result<Vec<Revision>, OTError> {
         let revs = range
            .iter()
            .flat_map(|rev_id| match self.revs_map.get(&rev_id) {
@@ -61,21 +65,47 @@ impl RevisionMemoryCache {
             .collect::<Vec<Revision>>();
 
         if revs.len() == range.len() as usize {
-            Ok(Some(revs))
+            Ok(revs)
         } else {
-            Ok(None)
+            Ok(vec![])
         }
     }
+
+    pub fn contains(&self, rev_id: &i64) -> bool { self.revs_map.contains_key(rev_id) }
+
+    pub fn is_empty(&self) -> bool { self.revs_map.is_empty() }
+
+    pub fn revisions(&self) -> (Vec<i64>, Vec<RevisionRecord>) {
+        let mut records: Vec<RevisionRecord> = vec![];
+        let mut ids: Vec<i64> = vec![];
+
+        self.revs_map.iter().for_each(|kv| {
+            records.push(kv.value().clone());
+            ids.push(*kv.key());
+        });
+        (ids, records)
+    }
+
+    pub async fn front_revision(&self) -> Option<(i64, RevisionRecord)> {
+        match self.pending_revs.read().await.front() {
+            None => None,
+            Some(rev_id) => self.revs_map.get(rev_id).map(|r| (*r.key(), r.value().clone())),
+        }
+    }
+
+    pub async fn front_rev_id(&self) -> Option<i64> { self.pending_revs.read().await.front().copied() }
 }
 
 pub type RevIdReceiver = broadcast::Receiver<i64>;
 pub type RevIdSender = broadcast::Sender<i64>;
 
+#[derive(Clone, Eq, PartialEq)]
 pub enum RevState {
     Local = 0,
     Acked = 1,
 }
 
+#[derive(Clone)]
 pub struct RevisionRecord {
     pub revision: Revision,
     pub state: RevState,
@@ -88,6 +118,8 @@ impl RevisionRecord {
             state: RevState::Local,
         }
     }
+
+    pub fn ack(&mut self) { self.state = RevState::Acked; }
 }
 
 pub struct PendingRevId {