diff --git a/frontend/rust-lib/flowy-database/migrations/2023-01-06-032859_folder_rev_snapshot/down.sql b/frontend/rust-lib/flowy-database/migrations/2023-01-06-032859_folder_rev_snapshot/down.sql new file mode 100644 index 0000000000..a1b31de5bf --- /dev/null +++ b/frontend/rust-lib/flowy-database/migrations/2023-01-06-032859_folder_rev_snapshot/down.sql @@ -0,0 +1,2 @@ +-- This file should undo anything in `up.sql` +DROP TABLE folder_rev_snapshot; \ No newline at end of file diff --git a/frontend/rust-lib/flowy-database/migrations/2023-01-06-032859_folder_rev_snapshot/up.sql b/frontend/rust-lib/flowy-database/migrations/2023-01-06-032859_folder_rev_snapshot/up.sql new file mode 100644 index 0000000000..784b15295c --- /dev/null +++ b/frontend/rust-lib/flowy-database/migrations/2023-01-06-032859_folder_rev_snapshot/up.sql @@ -0,0 +1,9 @@ +-- Your SQL goes here +CREATE TABLE folder_rev_snapshot ( + snapshot_id TEXT NOT NULL PRIMARY KEY DEFAULT '', + object_id TEXT NOT NULL DEFAULT '', + rev_id BIGINT NOT NULL DEFAULT 0, + base_rev_id BIGINT NOT NULL DEFAULT 0, + timestamp BIGINT NOT NULL DEFAULT 0, + data BLOB NOT NULL DEFAULT (x'') +); \ No newline at end of file diff --git a/frontend/rust-lib/flowy-database/src/schema.rs b/frontend/rust-lib/flowy-database/src/schema.rs index 27f45e86d4..d45d20a938 100644 --- a/frontend/rust-lib/flowy-database/src/schema.rs +++ b/frontend/rust-lib/flowy-database/src/schema.rs @@ -1,6 +1,4 @@ -// @generated automatically by Diesel CLI. - -diesel::table! { +table! { app_table (id) { id -> Text, workspace_id -> Text, @@ -15,7 +13,7 @@ diesel::table! { } } -diesel::table! { +table! { document_rev_table (id) { id -> Integer, document_id -> Text, @@ -26,14 +24,25 @@ diesel::table! { } } -diesel::table! { +table! { + folder_rev_snapshot (snapshot_id) { + snapshot_id -> Text, + object_id -> Text, + rev_id -> BigInt, + base_rev_id -> BigInt, + timestamp -> BigInt, + data -> Binary, + } +} + +table! { grid_block_index_table (row_id) { row_id -> Text, block_id -> Text, } } -diesel::table! { +table! { grid_meta_rev_table (id) { id -> Integer, object_id -> Text, @@ -44,7 +53,7 @@ diesel::table! { } } -diesel::table! { +table! { grid_rev_snapshot (snapshot_id) { snapshot_id -> Text, object_id -> Text, @@ -55,7 +64,7 @@ diesel::table! { } } -diesel::table! { +table! { grid_rev_table (id) { id -> Integer, object_id -> Text, @@ -66,7 +75,7 @@ diesel::table! { } } -diesel::table! { +table! { grid_view_rev_table (id) { id -> Integer, object_id -> Text, @@ -77,14 +86,14 @@ diesel::table! { } } -diesel::table! { +table! { kv_table (key) { key -> Text, value -> Binary, } } -diesel::table! { +table! { rev_snapshot (id) { id -> Integer, object_id -> Text, @@ -93,7 +102,7 @@ diesel::table! { } } -diesel::table! { +table! { rev_table (id) { id -> Integer, doc_id -> Text, @@ -105,7 +114,7 @@ diesel::table! { } } -diesel::table! { +table! { trash_table (id) { id -> Text, name -> Text, @@ -116,7 +125,7 @@ diesel::table! { } } -diesel::table! { +table! { user_table (id) { id -> Text, name -> Text, @@ -127,7 +136,7 @@ diesel::table! { } } -diesel::table! { +table! { view_table (id) { id -> Text, belong_to_id -> Text, @@ -143,7 +152,7 @@ diesel::table! { } } -diesel::table! { +table! { workspace_table (id) { id -> Text, name -> Text, @@ -155,9 +164,10 @@ diesel::table! { } } -diesel::allow_tables_to_appear_in_same_query!( +allow_tables_to_appear_in_same_query!( app_table, document_rev_table, + folder_rev_snapshot, grid_block_index_table, grid_meta_rev_table, grid_rev_snapshot, diff --git a/frontend/rust-lib/flowy-folder/src/manager.rs b/frontend/rust-lib/flowy-folder/src/manager.rs index b4e9d53858..05ec736157 100644 --- a/frontend/rust-lib/flowy-folder/src/manager.rs +++ b/frontend/rust-lib/flowy-folder/src/manager.rs @@ -23,7 +23,9 @@ use lazy_static::lazy_static; use lib_infra::future::FutureResult; use crate::services::clear_current_workspace; -use crate::services::persistence::rev_sqlite::SQLiteFolderRevisionPersistence; +use crate::services::persistence::rev_sqlite::{ + SQLiteFolderRevisionPersistence, SQLiteFolderRevisionSnapshotPersistence, +}; use flowy_http_model::ws_data::ServerRevisionWSData; use flowy_sync::client_folder::FolderPad; use std::convert::TryFrom; @@ -174,12 +176,15 @@ impl FolderManager { let configuration = RevisionPersistenceConfiguration::new(100, false); let rev_persistence = RevisionPersistence::new(user_id, object_id, disk_cache, configuration); let rev_compactor = FolderRevisionMergeable(); + + let snapshot_object_id = format!("folder:{}", object_id); + let snapshot_persistence = SQLiteFolderRevisionSnapshotPersistence::new(&snapshot_object_id, pool); let rev_manager = RevisionManager::new( user_id, folder_id.as_ref(), rev_persistence, rev_compactor, - PhantomSnapshotPersistence(), + snapshot_persistence, ); let folder_editor = FolderEditor::new(user_id, &folder_id, token, rev_manager, self.web_socket.clone()).await?; diff --git a/frontend/rust-lib/flowy-folder/src/services/persistence/rev_sqlite/folder_snapshot_sqlite_impl.rs b/frontend/rust-lib/flowy-folder/src/services/persistence/rev_sqlite/folder_snapshot_sqlite_impl.rs new file mode 100644 index 0000000000..09d2334fcb --- /dev/null +++ b/frontend/rust-lib/flowy-folder/src/services/persistence/rev_sqlite/folder_snapshot_sqlite_impl.rs @@ -0,0 +1,96 @@ +use bytes::Bytes; +use flowy_database::{ + prelude::*, + schema::{folder_rev_snapshot, folder_rev_snapshot::dsl}, + ConnectionPool, +}; +use flowy_error::{internal_error, FlowyResult}; +use flowy_revision::{RevisionSnapshot, RevisionSnapshotDiskCache}; +use lib_infra::util::timestamp; +use std::sync::Arc; + +pub struct SQLiteFolderRevisionSnapshotPersistence { + object_id: String, + pool: Arc, +} + +impl SQLiteFolderRevisionSnapshotPersistence { + pub fn new(object_id: &str, pool: Arc) -> Self { + Self { + object_id: object_id.to_string(), + pool, + } + } + + fn gen_snapshot_id(&self, rev_id: i64) -> String { + format!("{}:{}", self.object_id, rev_id) + } +} + +impl RevisionSnapshotDiskCache for SQLiteFolderRevisionSnapshotPersistence { + fn should_generate_snapshot_from_range(&self, start_rev_id: i64, current_rev_id: i64) -> bool { + (current_rev_id - start_rev_id) >= 2 + } + + fn write_snapshot(&self, rev_id: i64, data: Vec) -> FlowyResult<()> { + let conn = self.pool.get().map_err(internal_error)?; + let snapshot_id = self.gen_snapshot_id(rev_id); + let timestamp = timestamp(); + let record = ( + dsl::snapshot_id.eq(&snapshot_id), + dsl::object_id.eq(&self.object_id), + dsl::rev_id.eq(rev_id), + dsl::base_rev_id.eq(rev_id), + dsl::timestamp.eq(timestamp), + dsl::data.eq(data), + ); + let _ = insert_or_ignore_into(dsl::folder_rev_snapshot) + .values(record) + .execute(&*conn)?; + Ok(()) + } + + fn read_snapshot(&self, rev_id: i64) -> FlowyResult> { + let conn = self.pool.get().map_err(internal_error)?; + let snapshot_id = self.gen_snapshot_id(rev_id); + let record = dsl::folder_rev_snapshot + .filter(dsl::snapshot_id.eq(&snapshot_id)) + .first::(&*conn)?; + + Ok(Some(record.into())) + } + + fn read_last_snapshot(&self) -> FlowyResult> { + let conn = self.pool.get().map_err(internal_error)?; + let latest_record = dsl::folder_rev_snapshot + .filter(dsl::object_id.eq(&self.object_id)) + .order(dsl::rev_id.desc()) + // .select(max(dsl::rev_id)) + // .select((dsl::id, dsl::object_id, dsl::rev_id, dsl::data)) + .first::(&*conn)?; + Ok(Some(latest_record.into())) + } +} + +#[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)] +#[table_name = "folder_rev_snapshot"] +#[primary_key("snapshot_id")] +struct FolderSnapshotRecord { + snapshot_id: String, + object_id: String, + rev_id: i64, + base_rev_id: i64, + timestamp: i64, + data: Vec, +} + +impl std::convert::From for RevisionSnapshot { + fn from(record: FolderSnapshotRecord) -> Self { + RevisionSnapshot { + rev_id: record.rev_id, + base_rev_id: record.base_rev_id, + timestamp: record.timestamp, + data: Bytes::from(record.data), + } + } +} diff --git a/frontend/rust-lib/flowy-folder/src/services/persistence/rev_sqlite/mod.rs b/frontend/rust-lib/flowy-folder/src/services/persistence/rev_sqlite/mod.rs index ff986cb0a9..754848674a 100644 --- a/frontend/rust-lib/flowy-folder/src/services/persistence/rev_sqlite/mod.rs +++ b/frontend/rust-lib/flowy-folder/src/services/persistence/rev_sqlite/mod.rs @@ -1,2 +1,5 @@ mod folder_rev_sqlite; +mod folder_snapshot_sqlite_impl; + pub use folder_rev_sqlite::*; +pub use folder_snapshot_sqlite_impl::*; diff --git a/frontend/rust-lib/flowy-grid/src/services/persistence/rev_sqlite/grid_snapshot_sqlite_impl.rs b/frontend/rust-lib/flowy-grid/src/services/persistence/rev_sqlite/grid_snapshot_sqlite_impl.rs index 0abf98f398..7aa7bc630a 100644 --- a/frontend/rust-lib/flowy-grid/src/services/persistence/rev_sqlite/grid_snapshot_sqlite_impl.rs +++ b/frontend/rust-lib/flowy-grid/src/services/persistence/rev_sqlite/grid_snapshot_sqlite_impl.rs @@ -115,35 +115,3 @@ impl std::convert::From for RevisionSnapshot { } } } - -// pub(crate) fn get_latest_rev_id_from(rev_ids: Vec, anchor: i64) -> Option { -// let mut target_rev_id = None; -// let mut old_step: Option = None; -// for rev_id in rev_ids { -// let step = (rev_id - anchor).abs(); -// if let Some(old_step) = &mut old_step { -// if *old_step > step { -// *old_step = step; -// target_rev_id = Some(rev_id); -// } -// } else { -// old_step = Some(step); -// target_rev_id = Some(rev_id); -// } -// } -// target_rev_id -// } - -// #[cfg(test)] -// mod tests { -// use crate::services::persistence::rev_sqlite::get_latest_rev_id_from; -// -// #[test] -// fn test_latest_rev_id() { -// let ids = vec![1, 2, 3, 4, 5, 6]; -// for (anchor, expected_value) in vec![(3, 3), (7, 6), (1, 1)] { -// let value = get_latest_rev_id_from(ids.clone(), anchor).unwrap(); -// assert_eq!(value, expected_value); -// } -// } -// } diff --git a/frontend/rust-lib/flowy-revision/src/rev_snapshot.rs b/frontend/rust-lib/flowy-revision/src/rev_snapshot.rs index 7726880561..6c1df77280 100644 --- a/frontend/rust-lib/flowy-revision/src/rev_snapshot.rs +++ b/frontend/rust-lib/flowy-revision/src/rev_snapshot.rs @@ -10,14 +10,21 @@ use std::sync::atomic::Ordering::SeqCst; use std::sync::Arc; pub trait RevisionSnapshotDiskCache: Send + Sync { + fn should_generate_snapshot_from_range(&self, start_rev_id: i64, current_rev_id: i64) -> bool { + (current_rev_id - start_rev_id) >= AUTO_GEN_SNAPSHOT_PER_10_REVISION + } + fn write_snapshot(&self, rev_id: i64, data: Vec) -> FlowyResult<()>; + fn read_snapshot(&self, rev_id: i64) -> FlowyResult>; + fn read_last_snapshot(&self) -> FlowyResult>; } /// Do nothing but just used to clam the rust compiler about the generic parameter `SP` of `RevisionManager` /// pub struct PhantomSnapshotPersistence(); + impl RevisionSnapshotDiskCache for PhantomSnapshotPersistence { fn write_snapshot(&self, rev_id: i64, data: Vec) -> FlowyResult<()> { Ok(()) @@ -37,7 +44,7 @@ const AUTO_GEN_SNAPSHOT_PER_10_REVISION: i64 = 10; pub struct RevisionSnapshotController { user_id: String, object_id: String, - disk_cache: Arc, + rev_snapshot_persistence: Arc, rev_id_counter: Arc, rev_persistence: Arc>, rev_compress: Arc, @@ -63,7 +70,7 @@ where Self { user_id: user_id.to_string(), object_id: object_id.to_string(), - disk_cache, + rev_snapshot_persistence: disk_cache, rev_id_counter, start_rev_id: AtomicI64::new(0), rev_persistence: revision_persistence, @@ -73,7 +80,7 @@ where pub async fn generate_snapshot(&self) { if let Some((rev_id, bytes)) = self.generate_snapshot_data() { - if let Err(e) = self.disk_cache.write_snapshot(rev_id, bytes.to_vec()) { + if let Err(e) = self.rev_snapshot_persistence.write_snapshot(rev_id, bytes.to_vec()) { tracing::error!("Save snapshot failed: {}", e); } } @@ -85,7 +92,7 @@ where B: RevisionObjectDeserializer, { tracing::trace!("Try to find if {} has snapshot", self.object_id); - let snapshot = self.disk_cache.read_last_snapshot().ok()??; + let snapshot = self.rev_snapshot_persistence.read_last_snapshot().ok()??; let snapshot_rev_id = snapshot.rev_id; let revision = Revision::new( &self.object_id, @@ -115,10 +122,12 @@ where if current_rev_id <= start_rev_id { return; } - - if (current_rev_id - start_rev_id) >= AUTO_GEN_SNAPSHOT_PER_10_REVISION { + if self + .rev_snapshot_persistence + .should_generate_snapshot_from_range(start_rev_id, current_rev_id) + { if let Some((rev_id, bytes)) = self.generate_snapshot_data() { - let disk_cache = self.disk_cache.clone(); + let disk_cache = self.rev_snapshot_persistence.clone(); tokio::spawn(async move { let _ = disk_cache.write_snapshot(rev_id, bytes.to_vec()); }); @@ -161,7 +170,7 @@ impl std::ops::Deref for RevisionSnapshotController { type Target = Arc; fn deref(&self) -> &Self::Target { - &self.disk_cache + &self.rev_snapshot_persistence } }