diff --git a/frontend/rust-lib/flowy-folder/src/manager.rs b/frontend/rust-lib/flowy-folder/src/manager.rs index 3181c14625..035555576a 100644 --- a/frontend/rust-lib/flowy-folder/src/manager.rs +++ b/frontend/rust-lib/flowy-folder/src/manager.rs @@ -11,11 +11,12 @@ use crate::{ use bytes::Bytes; use flowy_sync::client_document::default::{initial_quill_delta_string, initial_read_me}; +use crate::services::folder_editor::FolderRevisionCompactor; use flowy_error::FlowyError; use flowy_folder_data_model::entities::view::ViewDataType; use flowy_folder_data_model::user_default; use flowy_revision::disk::SQLiteTextBlockRevisionPersistence; -use flowy_revision::{RevisionManager, RevisionPersistence, RevisionWebSocket}; +use flowy_revision::{RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionHistoryPersistence}; use flowy_sync::{client_folder::FolderPad, entities::ws_data::ServerRevisionWSData}; use lazy_static::lazy_static; use lib_infra::future::FutureResult; @@ -162,9 +163,17 @@ impl FolderManager { let _ = self.persistence.initialize(user_id, &folder_id).await?; let pool = self.persistence.db_pool()?; - let disk_cache = Arc::new(SQLiteTextBlockRevisionPersistence::new(user_id, pool)); - let rev_persistence = Arc::new(RevisionPersistence::new(user_id, folder_id.as_ref(), disk_cache)); - let rev_manager = RevisionManager::new(user_id, folder_id.as_ref(), rev_persistence); + let disk_cache = SQLiteTextBlockRevisionPersistence::new(user_id, pool.clone()); + let rev_persistence = RevisionPersistence::new(user_id, folder_id.as_ref(), disk_cache); + let rev_compactor = FolderRevisionCompactor(); + let history_persistence = SQLiteRevisionHistoryPersistence::new(pool); + let rev_manager = RevisionManager::new( + user_id, + folder_id.as_ref(), + rev_persistence, + rev_compactor, + history_persistence, + ); let folder_editor = FolderEditor::new(user_id, &folder_id, token, rev_manager, self.web_socket.clone()).await?; *self.folder_editor.write().await = Some(Arc::new(folder_editor)); diff --git a/frontend/rust-lib/flowy-folder/src/services/folder_editor.rs b/frontend/rust-lib/flowy-folder/src/services/folder_editor.rs index ef94c1d316..6661530213 100644 --- a/frontend/rust-lib/flowy-folder/src/services/folder_editor.rs +++ b/frontend/rust-lib/flowy-folder/src/services/folder_editor.rs @@ -91,11 +91,7 @@ impl FolderEditor { &self.user_id, md5, ); - let _ = futures::executor::block_on(async { - self.rev_manager - .add_local_revision(&revision, Box::new(FolderRevisionCompactor())) - .await - })?; + let _ = futures::executor::block_on(async { self.rev_manager.add_local_revision(&revision).await })?; Ok(()) } @@ -135,7 +131,7 @@ impl FolderEditor { } } -struct FolderRevisionCompactor(); +pub struct FolderRevisionCompactor(); impl RevisionCompactor for FolderRevisionCompactor { fn bytes_from_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> { let delta = make_delta_from_revisions::<PlainTextAttributes>(revisions)?; diff --git a/frontend/rust-lib/flowy-folder/src/services/persistence/migration.rs b/frontend/rust-lib/flowy-folder/src/services/persistence/migration.rs index bc8838059a..ee8731c870 100644 --- a/frontend/rust-lib/flowy-folder/src/services/persistence/migration.rs +++ b/frontend/rust-lib/flowy-folder/src/services/persistence/migration.rs @@ -88,7 +88,7 @@ impl FolderMigration { return Ok(None); } let pool = self.database.db_pool()?; - let disk_cache = Arc::new(SQLiteTextBlockRevisionPersistence::new(user_id, pool)); + let disk_cache = SQLiteTextBlockRevisionPersistence::new(user_id, pool); let rev_persistence = Arc::new(RevisionPersistence::new(user_id, folder_id.as_ref(), disk_cache)); let (revisions, _) = RevisionLoader { object_id: folder_id.as_ref().to_owned(), diff --git a/frontend/rust-lib/flowy-grid/src/manager.rs b/frontend/rust-lib/flowy-grid/src/manager.rs index 08d9559d41..8d8171ee02 100644 --- a/frontend/rust-lib/flowy-grid/src/manager.rs +++ b/frontend/rust-lib/flowy-grid/src/manager.rs @@ -1,5 +1,5 @@ use crate::services::block::make_grid_block_meta_rev_manager; -use crate::services::grid_meta_editor::GridMetaEditor; +use crate::services::grid_meta_editor::{GridMetaEditor, GridMetaRevisionCompactor}; use crate::services::persistence::block_index::BlockIndexCache; use crate::services::persistence::kv::GridKVPersistence; use crate::services::persistence::GridDatabase; @@ -9,7 +9,7 @@ use flowy_database::ConnectionPool; use flowy_error::{FlowyError, FlowyResult}; use flowy_grid_data_model::entities::{BuildGridContext, GridMeta}; use flowy_revision::disk::SQLiteGridRevisionPersistence; -use flowy_revision::{RevisionManager, RevisionPersistence, RevisionWebSocket}; +use flowy_revision::{RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionHistoryPersistence}; use flowy_sync::client_grid::{make_block_meta_delta, make_grid_delta}; use flowy_sync::entities::revision::{RepeatedRevision, Revision}; use std::sync::Arc; @@ -128,9 +128,11 @@ impl GridManager { pub fn make_grid_rev_manager(&self, grid_id: &str, pool: Arc<ConnectionPool>) -> FlowyResult<RevisionManager> { let user_id = self.grid_user.user_id()?; - let disk_cache = Arc::new(SQLiteGridRevisionPersistence::new(&user_id, pool)); - let rev_persistence = Arc::new(RevisionPersistence::new(&user_id, grid_id, disk_cache)); - let rev_manager = RevisionManager::new(&user_id, grid_id, rev_persistence); + let disk_cache = SQLiteGridRevisionPersistence::new(&user_id, pool.clone()); + let rev_persistence = RevisionPersistence::new(&user_id, grid_id, disk_cache); + let history_persistence = SQLiteRevisionHistoryPersistence::new(pool); + let rev_compactor = GridMetaRevisionCompactor(); + let rev_manager = RevisionManager::new(&user_id, grid_id, rev_persistence, rev_compactor, history_persistence); Ok(rev_manager) } } diff --git a/frontend/rust-lib/flowy-grid/src/services/block/block_meta_editor.rs b/frontend/rust-lib/flowy-grid/src/services/block/block_meta_editor.rs index 81c23f9c69..ee43aebf10 100644 --- a/frontend/rust-lib/flowy-grid/src/services/block/block_meta_editor.rs +++ b/frontend/rust-lib/flowy-grid/src/services/block/block_meta_editor.rs @@ -169,10 +169,7 @@ impl GridBlockMetaEditor { &user_id, md5, ); - let _ = self - .rev_manager - .add_local_revision(&revision, Box::new(GridBlockMetaRevisionCompactor())) - .await?; + let _ = self.rev_manager.add_local_revision(&revision).await?; Ok(()) } } @@ -199,7 +196,7 @@ impl RevisionObjectBuilder for GridBlockMetaPadBuilder { } } -struct GridBlockMetaRevisionCompactor(); +pub struct GridBlockMetaRevisionCompactor(); impl RevisionCompactor for GridBlockMetaRevisionCompactor { fn bytes_from_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> { let delta = make_delta_from_revisions::<PlainTextAttributes>(revisions)?; diff --git a/frontend/rust-lib/flowy-grid/src/services/block/block_meta_manager.rs b/frontend/rust-lib/flowy-grid/src/services/block/block_meta_manager.rs index a1d9e27f4a..417c158cff 100644 --- a/frontend/rust-lib/flowy-grid/src/services/block/block_meta_manager.rs +++ b/frontend/rust-lib/flowy-grid/src/services/block/block_meta_manager.rs @@ -1,6 +1,6 @@ use crate::dart_notification::{send_dart_notification, GridNotification}; use crate::manager::GridUser; -use crate::services::block::GridBlockMetaEditor; +use crate::services::block::{GridBlockMetaEditor, GridBlockMetaRevisionCompactor}; use crate::services::persistence::block_index::BlockIndexCache; use crate::services::row::{group_row_orders, GridBlockSnapshot}; use dashmap::DashMap; @@ -10,7 +10,7 @@ use flowy_grid_data_model::entities::{ RowMeta, RowMetaChangeset, RowOrder, UpdatedRowOrder, }; use flowy_revision::disk::SQLiteGridBlockMetaRevisionPersistence; -use flowy_revision::{RevisionManager, RevisionPersistence}; +use flowy_revision::{RevisionManager, RevisionPersistence, SQLiteRevisionHistoryPersistence}; use std::borrow::Cow; use std::collections::HashMap; use std::sync::Arc; @@ -278,7 +278,16 @@ pub fn make_grid_block_meta_rev_manager(user: &Arc<dyn GridUser>, block_id: &str let user_id = user.user_id()?; let pool = user.db_pool()?; - let disk_cache = Arc::new(SQLiteGridBlockMetaRevisionPersistence::new(&user_id, pool)); - let rev_persistence = Arc::new(RevisionPersistence::new(&user_id, block_id, disk_cache)); - Ok(RevisionManager::new(&user_id, block_id, rev_persistence)) + let disk_cache = SQLiteGridBlockMetaRevisionPersistence::new(&user_id, pool.clone()); + let rev_persistence = RevisionPersistence::new(&user_id, block_id, disk_cache); + let rev_compactor = GridBlockMetaRevisionCompactor(); + let history_persistence = SQLiteRevisionHistoryPersistence::new(pool); + + Ok(RevisionManager::new( + &user_id, + block_id, + rev_persistence, + rev_compactor, + history_persistence, + )) } diff --git a/frontend/rust-lib/flowy-grid/src/services/grid_meta_editor.rs b/frontend/rust-lib/flowy-grid/src/services/grid_meta_editor.rs index d584e1dc31..4f4b21ad4c 100644 --- a/frontend/rust-lib/flowy-grid/src/services/grid_meta_editor.rs +++ b/frontend/rust-lib/flowy-grid/src/services/grid_meta_editor.rs @@ -547,10 +547,7 @@ impl GridMetaEditor { &user_id, md5, ); - let _ = self - .rev_manager - .add_local_revision(&revision, Box::new(GridRevisionCompactor())) - .await?; + let _ = self.rev_manager.add_local_revision(&revision).await?; Ok(()) } @@ -629,8 +626,8 @@ impl RevisionCloudService for GridRevisionCloudService { } } -struct GridRevisionCompactor(); -impl RevisionCompactor for GridRevisionCompactor { +pub struct GridMetaRevisionCompactor(); +impl RevisionCompactor for GridMetaRevisionCompactor { fn bytes_from_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> { let delta = make_delta_from_revisions::<PlainTextAttributes>(revisions)?; Ok(delta.to_delta_bytes()) diff --git a/frontend/rust-lib/flowy-revision/src/cache/disk/grid_meta_rev_impl.rs b/frontend/rust-lib/flowy-revision/src/cache/disk/grid_block_meta_rev_impl.rs similarity index 99% rename from frontend/rust-lib/flowy-revision/src/cache/disk/grid_meta_rev_impl.rs rename to frontend/rust-lib/flowy-revision/src/cache/disk/grid_block_meta_rev_impl.rs index e850119c30..93af25e7ca 100644 --- a/frontend/rust-lib/flowy-revision/src/cache/disk/grid_meta_rev_impl.rs +++ b/frontend/rust-lib/flowy-revision/src/cache/disk/grid_block_meta_rev_impl.rs @@ -1,6 +1,5 @@ use crate::cache::disk::RevisionDiskCache; use crate::disk::{RevisionChangeset, RevisionRecord, RevisionState}; - use bytes::Bytes; use diesel::{sql_types::Integer, update, SqliteConnection}; use flowy_database::{ diff --git a/frontend/rust-lib/flowy-revision/src/cache/disk/mod.rs b/frontend/rust-lib/flowy-revision/src/cache/disk/mod.rs index 945c6d707f..814b591f15 100644 --- a/frontend/rust-lib/flowy-revision/src/cache/disk/mod.rs +++ b/frontend/rust-lib/flowy-revision/src/cache/disk/mod.rs @@ -1,10 +1,10 @@ mod folder_rev_impl; -mod grid_meta_rev_impl; +mod grid_block_meta_rev_impl; mod grid_rev_impl; mod text_rev_impl; pub use folder_rev_impl::*; -pub use grid_meta_rev_impl::*; +pub use grid_block_meta_rev_impl::*; pub use grid_rev_impl::*; pub use text_rev_impl::*; diff --git a/frontend/rust-lib/flowy-revision/src/history/mod.rs b/frontend/rust-lib/flowy-revision/src/history/mod.rs index be42f6f9be..9de42831ed 100644 --- a/frontend/rust-lib/flowy-revision/src/history/mod.rs +++ b/frontend/rust-lib/flowy-revision/src/history/mod.rs @@ -1,4 +1,5 @@ mod persistence; mod rev_history; +pub use persistence::*; pub use rev_history::*; diff --git a/frontend/rust-lib/flowy-revision/src/history/persistence.rs b/frontend/rust-lib/flowy-revision/src/history/persistence.rs index c548e04940..01d22fe6a4 100644 --- a/frontend/rust-lib/flowy-revision/src/history/persistence.rs +++ b/frontend/rust-lib/flowy-revision/src/history/persistence.rs @@ -1,22 +1,59 @@ use crate::history::RevisionHistoryDiskCache; -use flowy_error::FlowyError; +use diesel::{sql_types::Integer, update, SqliteConnection}; +use flowy_database::{ + prelude::*, + schema::{rev_history, rev_history::dsl}, + ConnectionPool, +}; +use flowy_error::{FlowyError, FlowyResult}; use flowy_sync::entities::revision::Revision; +use std::sync::Arc; -pub struct SQLiteRevisionHistoryPersistence {} +pub struct SQLiteRevisionHistoryPersistence { + pool: Arc<ConnectionPool>, +} impl SQLiteRevisionHistoryPersistence { - pub fn new() -> Self { - Self {} + pub fn new(pool: Arc<ConnectionPool>) -> Self { + Self { pool } } } impl RevisionHistoryDiskCache for SQLiteRevisionHistoryPersistence { - type Error = FlowyError; + fn save_revision(&self, revision: Revision) -> FlowyResult<()> { + todo!() + } - fn save_revision(&self, revision: Revision) -> Result<(), Self::Error> { + fn read_revision(&self, rev_id: i64) -> FlowyResult<Revision> { + todo!() + } + + fn clear(&self) -> FlowyResult<()> { todo!() } } struct RevisionHistorySql(); -impl RevisionHistorySql {} +impl RevisionHistorySql { + fn read_revision(object_id: &str, rev_id: i64, conn: &SqliteConnection) -> Result<Revision, FlowyError> { + let records: Vec<RevisionRecord> = dsl::rev_history + .filter(dsl::start_rev_id.lt(rev_id)) + .filter(dsl::end_rev_id.ge(rev_id)) + .filter(dsl::object_id.eq(object_id)) + .load::<RevisionRecord>(conn)?; + + debug_assert_eq!(records.len(), 1); + + todo!() + } +} + +#[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)] +#[table_name = "rev_history"] +struct RevisionRecord { + id: i32, + object_id: String, + start_rev_id: i64, + end_rev_id: i64, + data: Vec<u8>, +} diff --git a/frontend/rust-lib/flowy-revision/src/history/rev_history.rs b/frontend/rust-lib/flowy-revision/src/history/rev_history.rs index 8c0b10e071..faa35a9ede 100644 --- a/frontend/rust-lib/flowy-revision/src/history/rev_history.rs +++ b/frontend/rust-lib/flowy-revision/src/history/rev_history.rs @@ -1,78 +1,195 @@ use crate::history::persistence::SQLiteRevisionHistoryPersistence; -use flowy_error::FlowyError; +use crate::RevisionCompactor; +use async_stream::stream; +use flowy_database::ConnectionPool; +use flowy_error::{FlowyError, FlowyResult}; use flowy_sync::entities::revision::Revision; +use futures_util::future::BoxFuture; +use futures_util::stream::StreamExt; +use futures_util::FutureExt; use std::fmt::Debug; use std::sync::Arc; -use tokio::sync::RwLock; +use std::time::Duration; +use tokio::sync::mpsc::error::SendError; +use tokio::sync::mpsc::Sender; +use tokio::sync::{mpsc, oneshot, RwLock}; +use tokio::time::interval; pub trait RevisionHistoryDiskCache: Send + Sync { - type Error: Debug; + fn save_revision(&self, revision: Revision) -> FlowyResult<()>; - fn save_revision(&self, revision: Revision) -> Result<(), Self::Error>; + fn read_revision(&self, rev_id: i64) -> FlowyResult<Revision>; + + fn clear(&self) -> FlowyResult<()>; } pub struct RevisionHistory { + stop_timer: mpsc::Sender<()>, config: RevisionHistoryConfig, - checkpoint: Arc<RwLock<HistoryCheckpoint>>, - disk_cache: Arc<dyn RevisionHistoryDiskCache<Error = FlowyError>>, + revisions: Arc<RwLock<Vec<Revision>>>, + disk_cache: Arc<dyn RevisionHistoryDiskCache>, } impl RevisionHistory { - pub fn new(config: RevisionHistoryConfig) -> Self { - let disk_cache = Arc::new(SQLiteRevisionHistoryPersistence::new()); + pub fn new( + user_id: &str, + object_id: &str, + config: RevisionHistoryConfig, + disk_cache: Arc<dyn RevisionHistoryDiskCache>, + rev_compactor: Arc<dyn RevisionCompactor>, + ) -> Self { + let user_id = user_id.to_string(); + let object_id = object_id.to_string(); let cloned_disk_cache = disk_cache.clone(); - let checkpoint = HistoryCheckpoint::from_config(&config, move |revision| { - let _ = cloned_disk_cache.save_revision(revision); - }); - let checkpoint = Arc::new(RwLock::new(checkpoint)); + let (stop_timer, stop_rx) = mpsc::channel(1); + let (checkpoint_tx, checkpoint_rx) = mpsc::channel(1); + let revisions = Arc::new(RwLock::new(vec![])); + let fix_duration_checkpoint_tx = FixedDurationCheckpointSender { + user_id, + object_id, + checkpoint_tx, + disk_cache: cloned_disk_cache, + revisions: revisions.clone(), + rev_compactor, + duration: config.check_duration, + }; + + tokio::spawn(CheckpointRunner::new(stop_rx, checkpoint_rx).run()); + tokio::spawn(fix_duration_checkpoint_tx.run()); Self { + stop_timer, config, - checkpoint, + revisions, disk_cache, } } - pub async fn save_revision(&self, revision: &Revision) { - self.checkpoint.write().await.add_revision(revision); + pub async fn add_revision(&self, revision: &Revision) { + self.revisions.write().await.push(revision.clone()); + } + + pub async fn reset_history(&self) { + self.revisions.write().await.clear(); + match self.disk_cache.clear() { + Ok(_) => {} + Err(e) => tracing::error!("Clear history failed: {:?}", e), + } } } pub struct RevisionHistoryConfig { - check_when_close: bool, - check_interval: i64, + check_duration: Duration, } impl std::default::Default for RevisionHistoryConfig { fn default() -> Self { Self { - check_when_close: true, - check_interval: 19, + check_duration: Duration::from_secs(5), } } } +struct CheckpointRunner { + stop_rx: Option<mpsc::Receiver<()>>, + checkpoint_rx: Option<mpsc::Receiver<HistoryCheckpoint>>, +} + +impl CheckpointRunner { + fn new(stop_rx: mpsc::Receiver<()>, checkpoint_rx: mpsc::Receiver<HistoryCheckpoint>) -> Self { + Self { + stop_rx: Some(stop_rx), + checkpoint_rx: Some(checkpoint_rx), + } + } + + async fn run(mut self) { + let mut stop_rx = self.stop_rx.take().expect("It should only run once"); + let mut checkpoint_rx = self.checkpoint_rx.take().expect("It should only run once"); + let stream = stream! { + loop { + tokio::select! { + result = checkpoint_rx.recv() => { + match result { + Some(checkpoint) => yield checkpoint, + None => {}, + } + }, + _ = stop_rx.recv() => { + tracing::trace!("Checkpoint runner exit"); + break + }, + }; + } + }; + + stream + .for_each(|checkpoint| async move { + checkpoint.write().await; + }) + .await; + } +} + struct HistoryCheckpoint { - interval: i64, + user_id: String, + object_id: String, revisions: Vec<Revision>, - on_check: Box<dyn Fn(Revision) + Send + Sync + 'static>, + disk_cache: Arc<dyn RevisionHistoryDiskCache>, + rev_compactor: Arc<dyn RevisionCompactor>, } impl HistoryCheckpoint { - fn from_config<F>(config: &RevisionHistoryConfig, on_check: F) -> Self - where - F: Fn(Revision) + Send + Sync + 'static, - { - Self { - interval: config.check_interval, - revisions: vec![], - on_check: Box::new(on_check), + async fn write(self) { + if self.revisions.is_empty() { + return; + } + + let result = || { + let revision = self + .rev_compactor + .compact(&self.user_id, &self.object_id, self.revisions)?; + let _ = self.disk_cache.save_revision(revision)?; + Ok::<(), FlowyError>(()) + }; + + match result() { + Ok(_) => {} + Err(e) => tracing::error!("Write history checkout failed: {:?}", e), } } - - fn check(&mut self) -> Revision { - todo!() - } - - fn add_revision(&mut self, revision: &Revision) {} +} + +struct FixedDurationCheckpointSender { + user_id: String, + object_id: String, + checkpoint_tx: mpsc::Sender<HistoryCheckpoint>, + disk_cache: Arc<dyn RevisionHistoryDiskCache>, + revisions: Arc<RwLock<Vec<Revision>>>, + rev_compactor: Arc<dyn RevisionCompactor>, + duration: Duration, +} + +impl FixedDurationCheckpointSender { + fn run(self) -> BoxFuture<'static, ()> { + async move { + let mut interval = interval(self.duration); + let checkpoint_revisions: Vec<Revision> = revisions.write().await.drain(..).collect(); + let checkpoint = HistoryCheckpoint { + user_id: self.user_id.clone(), + object_id: self.object_id.clone(), + revisions: checkpoint_revisions, + disk_cache: self.disk_cache.clone(), + rev_compactor: self.rev_compactor.clone(), + }; + match checkpoint_tx.send(checkpoint).await { + Ok(_) => { + interval.tick().await; + self.run(); + } + Err(_) => {} + } + } + .boxed() + } } diff --git a/frontend/rust-lib/flowy-revision/src/lib.rs b/frontend/rust-lib/flowy-revision/src/lib.rs index 6c708afab2..14f00568f2 100644 --- a/frontend/rust-lib/flowy-revision/src/lib.rs +++ b/frontend/rust-lib/flowy-revision/src/lib.rs @@ -7,6 +7,7 @@ mod ws_manager; pub use cache::*; pub use conflict_resolve::*; +pub use history::*; pub use rev_manager::*; pub use rev_persistence::*; pub use ws_manager::*; diff --git a/frontend/rust-lib/flowy-revision/src/rev_manager.rs b/frontend/rust-lib/flowy-revision/src/rev_manager.rs index 15f37af456..bed942b7b4 100644 --- a/frontend/rust-lib/flowy-revision/src/rev_manager.rs +++ b/frontend/rust-lib/flowy-revision/src/rev_manager.rs @@ -1,5 +1,5 @@ use crate::disk::RevisionState; -use crate::history::{RevisionHistory, RevisionHistoryConfig}; +use crate::history::{RevisionHistory, RevisionHistoryConfig, RevisionHistoryDiskCache}; use crate::{RevisionPersistence, WSDataProviderDataSource}; use bytes::Bytes; use flowy_error::{FlowyError, FlowyResult}; @@ -47,15 +47,36 @@ pub struct RevisionManager { rev_id_counter: RevIdCounter, rev_persistence: Arc<RevisionPersistence>, rev_history: Arc<RevisionHistory>, + rev_compactor: Arc<dyn RevisionCompactor>, #[cfg(feature = "flowy_unit_test")] rev_ack_notifier: tokio::sync::broadcast::Sender<i64>, } impl RevisionManager { - pub fn new(user_id: &str, object_id: &str, rev_persistence: Arc<RevisionPersistence>) -> Self { + pub fn new<P, C>( + user_id: &str, + object_id: &str, + rev_persistence: RevisionPersistence, + rev_compactor: C, + history_persistence: P, + ) -> Self + where + P: 'static + RevisionHistoryDiskCache, + C: 'static + RevisionCompactor, + { let rev_id_counter = RevIdCounter::new(0); + let rev_compactor = Arc::new(rev_compactor); + let history_persistence = Arc::new(history_persistence); let rev_history_config = RevisionHistoryConfig::default(); - let rev_history = Arc::new(RevisionHistory::new(rev_history_config)); + let rev_persistence = Arc::new(rev_persistence); + + let rev_history = Arc::new(RevisionHistory::new( + user_id, + object_id, + rev_history_config, + history_persistence, + rev_compactor.clone(), + )); #[cfg(feature = "flowy_unit_test")] let (revision_ack_notifier, _) = tokio::sync::broadcast::channel(1); @@ -65,7 +86,7 @@ impl RevisionManager { rev_id_counter, rev_persistence, rev_history, - + rev_compactor, #[cfg(feature = "flowy_unit_test")] rev_ack_notifier: revision_ack_notifier, } @@ -93,6 +114,7 @@ impl RevisionManager { pub async fn reset_object(&self, revisions: RepeatedRevision) -> FlowyResult<()> { let rev_id = pair_rev_id_from_revisions(&revisions).1; let _ = self.rev_persistence.reset(revisions.into_inner()).await?; + self.rev_history.reset_history().await; self.rev_id_counter.set(rev_id); Ok(()) } @@ -104,20 +126,21 @@ impl RevisionManager { } let _ = self.rev_persistence.add_ack_revision(revision).await?; + self.rev_history.add_revision(revision).await; self.rev_id_counter.set(revision.rev_id); Ok(()) } #[tracing::instrument(level = "debug", skip_all, err)] - pub async fn add_local_revision<'a>( - &'a self, - revision: &Revision, - compactor: Box<dyn RevisionCompactor + 'a>, - ) -> Result<(), FlowyError> { + pub async fn add_local_revision(&self, revision: &Revision) -> Result<(), FlowyError> { if revision.delta_data.is_empty() { return Err(FlowyError::internal().context("Delta data should be empty")); } - let rev_id = self.rev_persistence.add_sync_revision(revision, compactor).await?; + let rev_id = self + .rev_persistence + .add_sync_revision(revision, &self.rev_compactor) + .await?; + self.rev_history.add_revision(revision).await; self.rev_id_counter.set(rev_id); Ok(()) } diff --git a/frontend/rust-lib/flowy-revision/src/rev_persistence.rs b/frontend/rust-lib/flowy-revision/src/rev_persistence.rs index 6412b26ec6..98bc89ba24 100644 --- a/frontend/rust-lib/flowy-revision/src/rev_persistence.rs +++ b/frontend/rust-lib/flowy-revision/src/rev_persistence.rs @@ -24,13 +24,13 @@ pub struct RevisionPersistence { } impl RevisionPersistence { - pub fn new( - user_id: &str, - object_id: &str, - disk_cache: Arc<dyn RevisionDiskCache<Error = FlowyError>>, - ) -> RevisionPersistence { + pub fn new<C>(user_id: &str, object_id: &str, disk_cache: C) -> RevisionPersistence + where + C: 'static + RevisionDiskCache<Error = FlowyError>, + { let object_id = object_id.to_owned(); let user_id = user_id.to_owned(); + let disk_cache = Arc::new(disk_cache) as Arc<dyn RevisionDiskCache<Error = FlowyError>>; let sync_seq = RwLock::new(RevisionSyncSequence::new()); let memory_cache = Arc::new(RevisionMemoryCache::new(&object_id, Arc::new(disk_cache.clone()))); Self { @@ -63,7 +63,7 @@ impl RevisionPersistence { pub(crate) async fn add_sync_revision<'a>( &'a self, revision: &'a Revision, - compactor: Box<dyn RevisionCompactor + 'a>, + compactor: &Arc<dyn RevisionCompactor + 'a>, ) -> FlowyResult<i64> { let result = self.sync_seq.read().await.compact(); match result { diff --git a/frontend/rust-lib/flowy-revision/src/ws_manager.rs b/frontend/rust-lib/flowy-revision/src/ws_manager.rs index 42ad39c617..eb7539c380 100644 --- a/frontend/rust-lib/flowy-revision/src/ws_manager.rs +++ b/frontend/rust-lib/flowy-revision/src/ws_manager.rs @@ -1,6 +1,5 @@ use crate::ConflictRevisionSink; use async_stream::stream; - use bytes::Bytes; use flowy_error::{FlowyError, FlowyResult}; use flowy_sync::entities::{ diff --git a/frontend/rust-lib/flowy-text-block/src/manager.rs b/frontend/rust-lib/flowy-text-block/src/manager.rs index 9325fe6001..a6cdda1220 100644 --- a/frontend/rust-lib/flowy-text-block/src/manager.rs +++ b/frontend/rust-lib/flowy-text-block/src/manager.rs @@ -1,10 +1,13 @@ +use crate::queue::TextBlockRevisionCompactor; use crate::{editor::TextBlockEditor, errors::FlowyError, BlockCloudService}; use bytes::Bytes; use dashmap::DashMap; use flowy_database::ConnectionPool; use flowy_error::FlowyResult; use flowy_revision::disk::SQLiteTextBlockRevisionPersistence; -use flowy_revision::{RevisionCloudService, RevisionManager, RevisionPersistence, RevisionWebSocket}; +use flowy_revision::{ + RevisionCloudService, RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionHistoryPersistence, +}; use flowy_sync::entities::{ revision::{md5, RepeatedRevision, Revision}, text_block_info::{TextBlockDelta, TextBlockId}, @@ -139,9 +142,18 @@ impl TextBlockManager { fn make_rev_manager(&self, doc_id: &str, pool: Arc<ConnectionPool>) -> Result<RevisionManager, FlowyError> { let user_id = self.user.user_id()?; - let disk_cache = Arc::new(SQLiteTextBlockRevisionPersistence::new(&user_id, pool)); - let rev_persistence = Arc::new(RevisionPersistence::new(&user_id, doc_id, disk_cache)); - Ok(RevisionManager::new(&user_id, doc_id, rev_persistence)) + let disk_cache = SQLiteTextBlockRevisionPersistence::new(&user_id, pool.clone()); + let rev_persistence = RevisionPersistence::new(&user_id, doc_id, disk_cache); + let history_persistence = SQLiteRevisionHistoryPersistence::new(pool); + let rev_compactor = TextBlockRevisionCompactor(); + + Ok(RevisionManager::new( + &user_id, + doc_id, + rev_persistence, + rev_compactor, + history_persistence, + )) } } diff --git a/frontend/rust-lib/flowy-text-block/src/queue.rs b/frontend/rust-lib/flowy-text-block/src/queue.rs index 7c11afd0ff..343ad11e7b 100644 --- a/frontend/rust-lib/flowy-text-block/src/queue.rs +++ b/frontend/rust-lib/flowy-text-block/src/queue.rs @@ -186,10 +186,7 @@ impl EditBlockQueue { &user_id, md5, ); - let _ = self - .rev_manager - .add_local_revision(&revision, Box::new(TextBlockRevisionCompactor())) - .await?; + let _ = self.rev_manager.add_local_revision(&revision).await?; Ok(rev_id.into()) } } diff --git a/shared-lib/flowy-sync/src/client_grid/grid_meta_pad.rs b/shared-lib/flowy-sync/src/client_grid/grid_meta_pad.rs index 1d800961e3..3ef9251e4a 100644 --- a/shared-lib/flowy-sync/src/client_grid/grid_meta_pad.rs +++ b/shared-lib/flowy-sync/src/client_grid/grid_meta_pad.rs @@ -25,12 +25,7 @@ pub trait JsonDeserializer { impl GridMetaPad { pub async fn duplicate_grid_meta(&self) -> (Vec<FieldMeta>, Vec<GridBlockMetaSnapshot>) { - let fields = self - .grid_meta - .fields - .iter() - .map(|field| field.clone()) - .collect::<Vec<FieldMeta>>(); + let fields = self.grid_meta.fields.to_vec(); let blocks = self .grid_meta