chore: add snapshot

This commit is contained in:
appflowy 2022-06-10 22:27:19 +08:00
parent aeb69f307c
commit d5eabe4ea3
15 changed files with 225 additions and 87 deletions

View File

@ -0,0 +1,2 @@
-- This file should undo anything in `up.sql`
DROP TABLE rev_snapshot;

View File

@ -0,0 +1,7 @@
-- Your SQL goes here
CREATE TABLE rev_snapshot (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
object_id TEXT NOT NULL DEFAULT '',
rev_id BIGINT NOT NULL DEFAULT 0,
data BLOB NOT NULL DEFAULT (x'')
);

View File

@ -67,6 +67,15 @@ table! {
}
}
table! {
rev_snapshot (id) {
id -> Integer,
object_id -> Text,
rev_id -> BigInt,
data -> Binary,
}
}
table! {
rev_table (id) {
id -> Integer,
@ -135,6 +144,7 @@ allow_tables_to_appear_in_same_query!(
grid_rev_table,
kv_table,
rev_history,
rev_snapshot,
rev_table,
trash_table,
user_table,

View File

@ -16,7 +16,10 @@ use flowy_error::FlowyError;
use flowy_folder_data_model::entities::view::ViewDataType;
use flowy_folder_data_model::user_default;
use flowy_revision::disk::SQLiteTextBlockRevisionPersistence;
use flowy_revision::{RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionHistoryPersistence};
use flowy_revision::{
RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionHistoryPersistence,
SQLiteRevisionSnapshotPersistence,
};
use flowy_sync::{client_folder::FolderPad, entities::ws_data::ServerRevisionWSData};
use lazy_static::lazy_static;
use lib_infra::future::FutureResult;
@ -163,16 +166,19 @@ impl FolderManager {
let _ = self.persistence.initialize(user_id, &folder_id).await?;
let pool = self.persistence.db_pool()?;
let object_id = folder_id.as_ref();
let disk_cache = SQLiteTextBlockRevisionPersistence::new(user_id, pool.clone());
let rev_persistence = RevisionPersistence::new(user_id, folder_id.as_ref(), disk_cache);
let rev_persistence = RevisionPersistence::new(user_id, object_id, disk_cache);
let rev_compactor = FolderRevisionCompactor();
let history_persistence = SQLiteRevisionHistoryPersistence::new(pool);
let history_persistence = SQLiteRevisionHistoryPersistence::new(object_id, pool.clone());
let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(object_id, pool);
let rev_manager = RevisionManager::new(
user_id,
folder_id.as_ref(),
rev_persistence,
rev_compactor,
history_persistence,
snapshot_persistence,
);
let folder_editor = FolderEditor::new(user_id, &folder_id, token, rev_manager, self.web_socket.clone()).await?;

View File

@ -9,7 +9,10 @@ use flowy_database::ConnectionPool;
use flowy_error::{FlowyError, FlowyResult};
use flowy_grid_data_model::entities::{BuildGridContext, GridMeta};
use flowy_revision::disk::SQLiteGridRevisionPersistence;
use flowy_revision::{RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionHistoryPersistence};
use flowy_revision::{
RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionHistoryPersistence,
SQLiteRevisionSnapshotPersistence,
};
use flowy_sync::client_grid::{make_block_meta_delta, make_grid_delta};
use flowy_sync::entities::revision::{RepeatedRevision, Revision};
use std::sync::Arc;
@ -130,9 +133,17 @@ impl GridManager {
let disk_cache = SQLiteGridRevisionPersistence::new(&user_id, pool.clone());
let rev_persistence = RevisionPersistence::new(&user_id, grid_id, disk_cache);
let history_persistence = SQLiteRevisionHistoryPersistence::new(pool);
let history_persistence = SQLiteRevisionHistoryPersistence::new(grid_id, pool.clone());
let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(grid_id, pool);
let rev_compactor = GridMetaRevisionCompactor();
let rev_manager = RevisionManager::new(&user_id, grid_id, rev_persistence, rev_compactor, history_persistence);
let rev_manager = RevisionManager::new(
&user_id,
grid_id,
rev_persistence,
rev_compactor,
history_persistence,
snapshot_persistence,
);
Ok(rev_manager)
}
}

View File

@ -10,7 +10,9 @@ use flowy_grid_data_model::entities::{
RowMeta, RowMetaChangeset, RowOrder, UpdatedRowOrder,
};
use flowy_revision::disk::SQLiteGridBlockMetaRevisionPersistence;
use flowy_revision::{RevisionManager, RevisionPersistence, SQLiteRevisionHistoryPersistence};
use flowy_revision::{
RevisionManager, RevisionPersistence, SQLiteRevisionHistoryPersistence, SQLiteRevisionSnapshotPersistence,
};
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;
@ -280,8 +282,10 @@ pub fn make_grid_block_meta_rev_manager(user: &Arc<dyn GridUser>, block_id: &str
let disk_cache = SQLiteGridBlockMetaRevisionPersistence::new(&user_id, pool.clone());
let rev_persistence = RevisionPersistence::new(&user_id, block_id, disk_cache);
let rev_compactor = GridBlockMetaRevisionCompactor();
let history_persistence = SQLiteRevisionHistoryPersistence::new(pool);
let history_persistence = SQLiteRevisionHistoryPersistence::new(block_id, pool.clone());
let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(block_id, pool);
Ok(RevisionManager::new(
&user_id,
@ -289,5 +293,6 @@ pub fn make_grid_block_meta_rev_manager(user: &Arc<dyn GridUser>, block_id: &str
rev_persistence,
rev_compactor,
history_persistence,
snapshot_persistence,
))
}

View File

@ -95,7 +95,6 @@ struct GridRevisionSql();
impl GridRevisionSql {
fn create(revision_records: Vec<RevisionRecord>, conn: &SqliteConnection) -> Result<(), FlowyError> {
// Batch insert: https://diesel.rs/guides/all-about-inserts.html
let records = revision_records
.into_iter()
.map(|record| {

View File

@ -1,50 +1,51 @@
use crate::history::RevisionHistoryDiskCache;
use diesel::{sql_types::Integer, update, SqliteConnection};
use flowy_database::{
prelude::*,
schema::{rev_history, rev_history::dsl},
ConnectionPool,
};
use flowy_error::{FlowyError, FlowyResult};
use flowy_error::{internal_error, FlowyResult};
use flowy_sync::entities::revision::Revision;
use std::sync::Arc;
pub struct SQLiteRevisionHistoryPersistence {
object_id: String,
pool: Arc<ConnectionPool>,
}
impl SQLiteRevisionHistoryPersistence {
pub fn new(pool: Arc<ConnectionPool>) -> Self {
Self { pool }
pub fn new(object_id: &str, pool: Arc<ConnectionPool>) -> Self {
let object_id = object_id.to_owned();
Self { object_id, pool }
}
}
impl RevisionHistoryDiskCache for SQLiteRevisionHistoryPersistence {
fn save_revision(&self, revision: Revision) -> FlowyResult<()> {
todo!()
fn write_history(&self, revision: Revision) -> FlowyResult<()> {
let record = (
dsl::object_id.eq(revision.object_id),
dsl::start_rev_id.eq(revision.base_rev_id),
dsl::end_rev_id.eq(revision.rev_id),
dsl::data.eq(revision.delta_data),
);
let conn = self.pool.get().map_err(internal_error)?;
let _ = insert_or_ignore_into(dsl::rev_history)
.values(vec![record])
.execute(&*conn)?;
Ok(())
}
fn read_revision(&self, rev_id: i64) -> FlowyResult<Revision> {
todo!()
}
fn clear(&self) -> FlowyResult<()> {
todo!()
}
}
struct RevisionHistorySql();
impl RevisionHistorySql {
fn read_revision(object_id: &str, rev_id: i64, conn: &SqliteConnection) -> Result<Revision, FlowyError> {
fn read_histories(&self) -> FlowyResult<Vec<RevisionHistory>> {
let conn = self.pool.get().map_err(internal_error)?;
let records: Vec<RevisionRecord> = dsl::rev_history
.filter(dsl::start_rev_id.lt(rev_id))
.filter(dsl::end_rev_id.ge(rev_id))
.filter(dsl::object_id.eq(object_id))
.load::<RevisionRecord>(conn)?;
.filter(dsl::object_id.eq(&self.object_id))
.load::<RevisionRecord>(&*conn)?;
debug_assert_eq!(records.len(), 1);
todo!()
Ok(records
.into_iter()
.map(|record| record.into())
.collect::<Vec<RevisionHistory>>())
}
}
@ -57,3 +58,21 @@ struct RevisionRecord {
end_rev_id: i64,
data: Vec<u8>,
}
pub struct RevisionHistory {
pub object_id: String,
pub start_rev_id: i64,
pub end_rev_id: i64,
pub data: Vec<u8>,
}
impl std::convert::From<RevisionRecord> for RevisionHistory {
fn from(record: RevisionRecord) -> Self {
RevisionHistory {
object_id: record.object_id,
start_rev_id: record.start_rev_id,
end_rev_id: record.end_rev_id,
data: record.data,
}
}
}

View File

@ -1,36 +1,31 @@
use crate::history::persistence::SQLiteRevisionHistoryPersistence;
use crate::RevisionCompactor;
use crate::{RevisionCompactor, RevisionHistory};
use async_stream::stream;
use flowy_database::ConnectionPool;
use flowy_error::{FlowyError, FlowyResult};
use flowy_sync::entities::revision::Revision;
use futures_util::future::BoxFuture;
use futures_util::stream::StreamExt;
use futures_util::FutureExt;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::Sender;
use tokio::sync::{mpsc, oneshot, RwLock};
use tokio::sync::{mpsc, RwLock};
use tokio::time::interval;
pub trait RevisionHistoryDiskCache: Send + Sync {
fn save_revision(&self, revision: Revision) -> FlowyResult<()>;
fn write_history(&self, revision: Revision) -> FlowyResult<()>;
fn read_revision(&self, rev_id: i64) -> FlowyResult<Revision>;
fn clear(&self) -> FlowyResult<()>;
fn read_histories(&self) -> FlowyResult<Vec<RevisionHistory>>;
}
pub struct RevisionHistory {
stop_timer: mpsc::Sender<()>,
pub struct RevisionHistoryManager {
user_id: String,
stop_tx: mpsc::Sender<()>,
config: RevisionHistoryConfig,
revisions: Arc<RwLock<Vec<Revision>>>,
disk_cache: Arc<dyn RevisionHistoryDiskCache>,
}
impl RevisionHistory {
impl RevisionHistoryManager {
pub fn new(
user_id: &str,
object_id: &str,
@ -38,27 +33,13 @@ impl RevisionHistory {
disk_cache: Arc<dyn RevisionHistoryDiskCache>,
rev_compactor: Arc<dyn RevisionCompactor>,
) -> Self {
let user_id = user_id.to_string();
let object_id = object_id.to_string();
let cloned_disk_cache = disk_cache.clone();
let (stop_timer, stop_rx) = mpsc::channel(1);
let (checkpoint_tx, checkpoint_rx) = mpsc::channel(1);
let revisions = Arc::new(RwLock::new(vec![]));
let fix_duration_checkpoint_tx = FixedDurationCheckpointSender {
user_id,
object_id,
checkpoint_tx,
disk_cache: cloned_disk_cache,
revisions: revisions.clone(),
rev_compactor,
duration: config.check_duration,
};
tokio::spawn(CheckpointRunner::new(stop_rx, checkpoint_rx).run());
tokio::spawn(fix_duration_checkpoint_tx.run());
let stop_tx =
spawn_history_checkpoint_runner(user_id, object_id, &disk_cache, &revisions, rev_compactor, &config);
let user_id = user_id.to_owned();
Self {
stop_timer,
user_id,
stop_tx,
config,
revisions,
disk_cache,
@ -69,12 +50,8 @@ impl RevisionHistory {
self.revisions.write().await.push(revision.clone());
}
pub async fn reset_history(&self) {
self.revisions.write().await.clear();
match self.disk_cache.clear() {
Ok(_) => {}
Err(e) => tracing::error!("Clear history failed: {:?}", e),
}
pub async fn read_revision_histories(&self) -> FlowyResult<Vec<RevisionHistory>> {
self.disk_cache.read_histories()
}
}
@ -90,12 +67,41 @@ impl std::default::Default for RevisionHistoryConfig {
}
}
struct CheckpointRunner {
fn spawn_history_checkpoint_runner(
user_id: &str,
object_id: &str,
disk_cache: &Arc<dyn RevisionHistoryDiskCache>,
revisions: &Arc<RwLock<Vec<Revision>>>,
rev_compactor: Arc<dyn RevisionCompactor>,
config: &RevisionHistoryConfig,
) -> mpsc::Sender<()> {
let user_id = user_id.to_string();
let object_id = object_id.to_string();
let disk_cache = disk_cache.clone();
let revisions = revisions.clone();
let (checkpoint_tx, checkpoint_rx) = mpsc::channel(1);
let (stop_tx, stop_rx) = mpsc::channel(1);
let checkpoint_sender = FixedDurationCheckpointSender {
user_id,
object_id,
checkpoint_tx,
disk_cache,
revisions,
rev_compactor,
duration: config.check_duration,
};
tokio::spawn(HistoryCheckpointRunner::new(stop_rx, checkpoint_rx).run());
tokio::spawn(checkpoint_sender.run());
stop_tx
}
struct HistoryCheckpointRunner {
stop_rx: Option<mpsc::Receiver<()>>,
checkpoint_rx: Option<mpsc::Receiver<HistoryCheckpoint>>,
}
impl CheckpointRunner {
impl HistoryCheckpointRunner {
fn new(stop_rx: mpsc::Receiver<()>, checkpoint_rx: mpsc::Receiver<HistoryCheckpoint>) -> Self {
Self {
stop_rx: Some(stop_rx),
@ -149,7 +155,7 @@ impl HistoryCheckpoint {
let revision = self
.rev_compactor
.compact(&self.user_id, &self.object_id, self.revisions)?;
let _ = self.disk_cache.save_revision(revision)?;
let _ = self.disk_cache.write_history(revision)?;
Ok::<(), FlowyError>(())
};
@ -174,7 +180,7 @@ impl FixedDurationCheckpointSender {
fn run(self) -> BoxFuture<'static, ()> {
async move {
let mut interval = interval(self.duration);
let checkpoint_revisions: Vec<Revision> = revisions.write().await.drain(..).collect();
let checkpoint_revisions: Vec<Revision> = self.revisions.write().await.drain(..).collect();
let checkpoint = HistoryCheckpoint {
user_id: self.user_id.clone(),
object_id: self.object_id.clone(),
@ -182,7 +188,7 @@ impl FixedDurationCheckpointSender {
disk_cache: self.disk_cache.clone(),
rev_compactor: self.rev_compactor.clone(),
};
match checkpoint_tx.send(checkpoint).await {
match self.checkpoint_tx.send(checkpoint).await {
Ok(_) => {
interval.tick().await;
self.run();

View File

@ -3,6 +3,7 @@ mod conflict_resolve;
mod history;
mod rev_manager;
mod rev_persistence;
mod snapshot;
mod ws_manager;
pub use cache::*;
@ -10,6 +11,7 @@ pub use conflict_resolve::*;
pub use history::*;
pub use rev_manager::*;
pub use rev_persistence::*;
pub use snapshot::*;
pub use ws_manager::*;
#[macro_use]

View File

@ -1,6 +1,6 @@
use crate::disk::RevisionState;
use crate::history::{RevisionHistory, RevisionHistoryConfig, RevisionHistoryDiskCache};
use crate::{RevisionPersistence, WSDataProviderDataSource};
use crate::history::{RevisionHistoryConfig, RevisionHistoryDiskCache, RevisionHistoryManager};
use crate::{RevisionPersistence, RevisionSnapshotDiskCache, RevisionSnapshotManager, WSDataProviderDataSource};
use bytes::Bytes;
use flowy_error::{FlowyError, FlowyResult};
use flowy_sync::{
@ -46,37 +46,43 @@ pub struct RevisionManager {
user_id: String,
rev_id_counter: RevIdCounter,
rev_persistence: Arc<RevisionPersistence>,
rev_history: Arc<RevisionHistory>,
rev_history: Arc<RevisionHistoryManager>,
rev_snapshot: Arc<RevisionSnapshotManager>,
rev_compactor: Arc<dyn RevisionCompactor>,
#[cfg(feature = "flowy_unit_test")]
rev_ack_notifier: tokio::sync::broadcast::Sender<i64>,
}
impl RevisionManager {
pub fn new<P, C>(
pub fn new<HP, SP, C>(
user_id: &str,
object_id: &str,
rev_persistence: RevisionPersistence,
rev_compactor: C,
history_persistence: P,
history_persistence: HP,
snapshot_persistence: SP,
) -> Self
where
P: 'static + RevisionHistoryDiskCache,
HP: 'static + RevisionHistoryDiskCache,
SP: 'static + RevisionSnapshotDiskCache,
C: 'static + RevisionCompactor,
{
let rev_id_counter = RevIdCounter::new(0);
let rev_compactor = Arc::new(rev_compactor);
let history_persistence = Arc::new(history_persistence);
let rev_history_config = RevisionHistoryConfig::default();
let rev_persistence = Arc::new(rev_persistence);
let rev_history = Arc::new(RevisionHistory::new(
let rev_history = Arc::new(RevisionHistoryManager::new(
user_id,
object_id,
rev_history_config,
history_persistence,
rev_compactor.clone(),
));
let rev_snapshot = Arc::new(RevisionSnapshotManager::new(user_id, object_id, snapshot_persistence));
#[cfg(feature = "flowy_unit_test")]
let (revision_ack_notifier, _) = tokio::sync::broadcast::channel(1);
@ -86,6 +92,7 @@ impl RevisionManager {
rev_id_counter,
rev_persistence,
rev_history,
rev_snapshot,
rev_compactor,
#[cfg(feature = "flowy_unit_test")]
rev_ack_notifier: revision_ack_notifier,
@ -114,7 +121,6 @@ impl RevisionManager {
pub async fn reset_object(&self, revisions: RepeatedRevision) -> FlowyResult<()> {
let rev_id = pair_rev_id_from_revisions(&revisions).1;
let _ = self.rev_persistence.reset(revisions.into_inner()).await?;
self.rev_history.reset_history().await;
self.rev_id_counter.set(rev_id);
Ok(())
}

View File

@ -0,0 +1,5 @@
mod persistence;
mod rev_snapshot;
pub use persistence::*;
pub use rev_snapshot::*;

View File

@ -0,0 +1,28 @@
use crate::{RevisionSnapshotDiskCache, RevisionSnapshotInfo};
use flowy_database::ConnectionPool;
use flowy_error::FlowyResult;
use std::sync::Arc;
pub struct SQLiteRevisionSnapshotPersistence {
object_id: String,
pool: Arc<ConnectionPool>,
}
impl SQLiteRevisionSnapshotPersistence {
pub fn new(object_id: &str, pool: Arc<ConnectionPool>) -> Self {
Self {
object_id: object_id.to_string(),
pool,
}
}
}
impl RevisionSnapshotDiskCache for SQLiteRevisionSnapshotPersistence {
fn write_snapshot(&self, object_id: &str, rev_id: i64, data: Vec<u8>) -> FlowyResult<()> {
todo!()
}
fn read_snapshot(&self, object_id: &str, rev_id: i64) -> FlowyResult<RevisionSnapshotInfo> {
todo!()
}
}

View File

@ -0,0 +1,29 @@
use flowy_error::FlowyResult;
use std::sync::Arc;
pub trait RevisionSnapshotDiskCache: Send + Sync {
fn write_snapshot(&self, object_id: &str, rev_id: i64, data: Vec<u8>) -> FlowyResult<()>;
fn read_snapshot(&self, object_id: &str, rev_id: i64) -> FlowyResult<RevisionSnapshotInfo>;
}
pub struct RevisionSnapshotManager {
user_id: String,
object_id: String,
disk_cache: Arc<dyn RevisionSnapshotDiskCache>,
}
impl RevisionSnapshotManager {
pub fn new<D>(user_id: &str, object_id: &str, disk_cache: D) -> Self
where
D: RevisionSnapshotDiskCache + 'static,
{
let disk_cache = Arc::new(disk_cache);
Self {
user_id: user_id.to_string(),
object_id: object_id.to_string(),
disk_cache,
}
}
}
pub struct RevisionSnapshotInfo {}

View File

@ -7,6 +7,7 @@ use flowy_error::FlowyResult;
use flowy_revision::disk::SQLiteTextBlockRevisionPersistence;
use flowy_revision::{
RevisionCloudService, RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionHistoryPersistence,
SQLiteRevisionSnapshotPersistence,
};
use flowy_sync::entities::{
revision::{md5, RepeatedRevision, Revision},
@ -144,7 +145,8 @@ impl TextBlockManager {
let user_id = self.user.user_id()?;
let disk_cache = SQLiteTextBlockRevisionPersistence::new(&user_id, pool.clone());
let rev_persistence = RevisionPersistence::new(&user_id, doc_id, disk_cache);
let history_persistence = SQLiteRevisionHistoryPersistence::new(pool);
let history_persistence = SQLiteRevisionHistoryPersistence::new(doc_id, pool.clone());
let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(doc_id, pool);
let rev_compactor = TextBlockRevisionCompactor();
Ok(RevisionManager::new(
@ -153,6 +155,7 @@ impl TextBlockManager {
rev_persistence,
rev_compactor,
history_persistence,
snapshot_persistence,
))
}
}