chore: config revision history

This commit is contained in:
appflowy
2022-06-07 16:38:00 +08:00
parent ab63ce7bce
commit f6ade11eb2
9 changed files with 150 additions and 9 deletions

View File

@ -0,0 +1,2 @@
-- This file should undo anything in `up.sql`
DROP TABLE rev_history;

View File

@ -0,0 +1,8 @@
-- Your SQL goes here
CREATE TABLE rev_history (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
object_id TEXT NOT NULL DEFAULT '',
start_rev_id BIGINT NOT NULL DEFAULT 0,
end_rev_id BIGINT NOT NULL DEFAULT 0,
data BLOB NOT NULL DEFAULT (x'')
);

View File

@ -57,6 +57,16 @@ table! {
} }
} }
table! {
rev_history (id) {
id -> Integer,
object_id -> Text,
start_rev_id -> BigInt,
end_rev_id -> BigInt,
data -> Binary,
}
}
table! { table! {
rev_table (id) { rev_table (id) {
id -> Integer, id -> Integer,
@ -124,6 +134,7 @@ allow_tables_to_appear_in_same_query!(
grid_meta_rev_table, grid_meta_rev_table,
grid_rev_table, grid_rev_table,
kv_table, kv_table,
rev_history,
rev_table, rev_table,
trash_table, trash_table,
user_table, user_table,

View File

@ -0,0 +1,4 @@
mod persistence;
mod rev_history;
pub use rev_history::*;

View File

@ -0,0 +1,22 @@
use crate::history::RevisionHistoryDiskCache;
use flowy_error::FlowyError;
use flowy_sync::entities::revision::Revision;
pub struct SQLiteRevisionHistoryPersistence {}
impl SQLiteRevisionHistoryPersistence {
pub fn new() -> Self {
Self {}
}
}
impl RevisionHistoryDiskCache for SQLiteRevisionHistoryPersistence {
type Error = FlowyError;
fn save_revision(&self, revision: Revision) -> Result<(), Self::Error> {
todo!()
}
}
struct RevisionHistorySql();
impl RevisionHistorySql {}

View File

@ -0,0 +1,78 @@
use crate::history::persistence::SQLiteRevisionHistoryPersistence;
use flowy_error::FlowyError;
use flowy_sync::entities::revision::Revision;
use std::fmt::Debug;
use std::sync::Arc;
use tokio::sync::RwLock;
pub trait RevisionHistoryDiskCache: Send + Sync {
type Error: Debug;
fn save_revision(&self, revision: Revision) -> Result<(), Self::Error>;
}
pub struct RevisionHistory {
config: RevisionHistoryConfig,
checkpoint: Arc<RwLock<HistoryCheckpoint>>,
disk_cache: Arc<dyn RevisionHistoryDiskCache<Error = FlowyError>>,
}
impl RevisionHistory {
pub fn new(config: RevisionHistoryConfig) -> Self {
let disk_cache = Arc::new(SQLiteRevisionHistoryPersistence::new());
let cloned_disk_cache = disk_cache.clone();
let checkpoint = HistoryCheckpoint::from_config(&config, move |revision| {
let _ = cloned_disk_cache.save_revision(revision);
});
let checkpoint = Arc::new(RwLock::new(checkpoint));
Self {
config,
checkpoint,
disk_cache,
}
}
pub async fn save_revision(&self, revision: &Revision) {
self.checkpoint.write().await.add_revision(revision);
}
}
pub struct RevisionHistoryConfig {
check_when_close: bool,
check_interval: i64,
}
impl std::default::Default for RevisionHistoryConfig {
fn default() -> Self {
Self {
check_when_close: true,
check_interval: 19,
}
}
}
struct HistoryCheckpoint {
interval: i64,
revisions: Vec<Revision>,
on_check: Box<dyn Fn(Revision) + Send + Sync + 'static>,
}
impl HistoryCheckpoint {
fn from_config<F>(config: &RevisionHistoryConfig, on_check: F) -> Self
where
F: Fn(Revision) + Send + Sync + 'static,
{
Self {
interval: config.check_interval,
revisions: vec![],
on_check: Box::new(on_check),
}
}
fn check(&mut self) -> Revision {
todo!()
}
fn add_revision(&mut self, revision: &Revision) {}
}

View File

@ -1,5 +1,6 @@
mod cache; mod cache;
mod conflict_resolve; mod conflict_resolve;
mod history;
mod rev_manager; mod rev_manager;
mod rev_persistence; mod rev_persistence;
mod ws_manager; mod ws_manager;

View File

@ -1,4 +1,5 @@
use crate::disk::RevisionState; use crate::disk::RevisionState;
use crate::history::{RevisionHistory, RevisionHistoryConfig};
use crate::{RevisionPersistence, WSDataProviderDataSource}; use crate::{RevisionPersistence, WSDataProviderDataSource};
use bytes::Bytes; use bytes::Bytes;
use flowy_error::{FlowyError, FlowyResult}; use flowy_error::{FlowyError, FlowyResult};
@ -45,7 +46,7 @@ pub struct RevisionManager {
user_id: String, user_id: String,
rev_id_counter: RevIdCounter, rev_id_counter: RevIdCounter,
rev_persistence: Arc<RevisionPersistence>, rev_persistence: Arc<RevisionPersistence>,
rev_history: Arc<RevisionHistory>,
#[cfg(feature = "flowy_unit_test")] #[cfg(feature = "flowy_unit_test")]
rev_ack_notifier: tokio::sync::broadcast::Sender<i64>, rev_ack_notifier: tokio::sync::broadcast::Sender<i64>,
} }
@ -53,6 +54,8 @@ pub struct RevisionManager {
impl RevisionManager { impl RevisionManager {
pub fn new(user_id: &str, object_id: &str, rev_persistence: Arc<RevisionPersistence>) -> Self { pub fn new(user_id: &str, object_id: &str, rev_persistence: Arc<RevisionPersistence>) -> Self {
let rev_id_counter = RevIdCounter::new(0); let rev_id_counter = RevIdCounter::new(0);
let rev_history_config = RevisionHistoryConfig::default();
let rev_history = Arc::new(RevisionHistory::new(rev_history_config));
#[cfg(feature = "flowy_unit_test")] #[cfg(feature = "flowy_unit_test")]
let (revision_ack_notifier, _) = tokio::sync::broadcast::channel(1); let (revision_ack_notifier, _) = tokio::sync::broadcast::channel(1);
@ -61,6 +64,7 @@ impl RevisionManager {
user_id: user_id.to_owned(), user_id: user_id.to_owned(),
rev_id_counter, rev_id_counter,
rev_persistence, rev_persistence,
rev_history,
#[cfg(feature = "flowy_unit_test")] #[cfg(feature = "flowy_unit_test")]
rev_ack_notifier: revision_ack_notifier, rev_ack_notifier: revision_ack_notifier,

View File

@ -61,7 +61,7 @@ impl GridBlockMetaPad {
&mut self, &mut self,
row: RowMeta, row: RowMeta,
start_row_id: Option<String>, start_row_id: Option<String>,
) -> CollaborateResult<Option<GridBlockMetaChange>> { ) -> CollaborateResult<Option<GridBlockMetaDeltaChangeset>> {
self.modify(|rows| { self.modify(|rows| {
if let Some(start_row_id) = start_row_id { if let Some(start_row_id) = start_row_id {
if !start_row_id.is_empty() { if !start_row_id.is_empty() {
@ -77,7 +77,10 @@ impl GridBlockMetaPad {
}) })
} }
pub fn delete_rows(&mut self, row_ids: Vec<Cow<'_, String>>) -> CollaborateResult<Option<GridBlockMetaChange>> { pub fn delete_rows(
&mut self,
row_ids: Vec<Cow<'_, String>>,
) -> CollaborateResult<Option<GridBlockMetaDeltaChangeset>> {
self.modify(|rows| { self.modify(|rows| {
rows.retain(|row| !row_ids.contains(&Cow::Borrowed(&row.id))); rows.retain(|row| !row_ids.contains(&Cow::Borrowed(&row.id)));
Ok(Some(())) Ok(Some(()))
@ -141,7 +144,10 @@ impl GridBlockMetaPad {
.map(|index| index as i32) .map(|index| index as i32)
} }
pub fn update_row(&mut self, changeset: RowMetaChangeset) -> CollaborateResult<Option<GridBlockMetaChange>> { pub fn update_row(
&mut self,
changeset: RowMetaChangeset,
) -> CollaborateResult<Option<GridBlockMetaDeltaChangeset>> {
let row_id = changeset.row_id.clone(); let row_id = changeset.row_id.clone();
self.modify_row(&row_id, |row| { self.modify_row(&row_id, |row| {
let mut is_changed = None; let mut is_changed = None;
@ -166,7 +172,12 @@ impl GridBlockMetaPad {
}) })
} }
pub fn move_row(&mut self, row_id: &str, from: usize, to: usize) -> CollaborateResult<Option<GridBlockMetaChange>> { pub fn move_row(
&mut self,
row_id: &str,
from: usize,
to: usize,
) -> CollaborateResult<Option<GridBlockMetaDeltaChangeset>> {
self.modify(|row_metas| { self.modify(|row_metas| {
if let Some(position) = row_metas.iter().position(|row_meta| row_meta.id == row_id) { if let Some(position) = row_metas.iter().position(|row_meta| row_meta.id == row_id) {
debug_assert_eq!(from, position); debug_assert_eq!(from, position);
@ -179,7 +190,7 @@ impl GridBlockMetaPad {
}) })
} }
pub fn modify<F>(&mut self, f: F) -> CollaborateResult<Option<GridBlockMetaChange>> pub fn modify<F>(&mut self, f: F) -> CollaborateResult<Option<GridBlockMetaDeltaChangeset>>
where where
F: for<'a> FnOnce(&'a mut Vec<Arc<RowMeta>>) -> CollaborateResult<Option<()>>, F: for<'a> FnOnce(&'a mut Vec<Arc<RowMeta>>) -> CollaborateResult<Option<()>>,
{ {
@ -198,14 +209,14 @@ impl GridBlockMetaPad {
// self.delta.to_str().unwrap_or_else(|_| "".to_string()) // self.delta.to_str().unwrap_or_else(|_| "".to_string())
// ); // );
self.delta = self.delta.compose(&delta)?; self.delta = self.delta.compose(&delta)?;
Ok(Some(GridBlockMetaChange { delta, md5: self.md5() })) Ok(Some(GridBlockMetaDeltaChangeset { delta, md5: self.md5() }))
} }
} }
} }
} }
} }
fn modify_row<F>(&mut self, row_id: &str, f: F) -> CollaborateResult<Option<GridBlockMetaChange>> fn modify_row<F>(&mut self, row_id: &str, f: F) -> CollaborateResult<Option<GridBlockMetaDeltaChangeset>>
where where
F: FnOnce(&mut RowMeta) -> CollaborateResult<Option<()>>, F: FnOnce(&mut RowMeta) -> CollaborateResult<Option<()>>,
{ {
@ -233,7 +244,7 @@ impl GridBlockMetaPad {
} }
} }
pub struct GridBlockMetaChange { pub struct GridBlockMetaDeltaChangeset {
pub delta: GridBlockMetaDelta, pub delta: GridBlockMetaDelta,
/// md5: the md5 of the grid after applying the change. /// md5: the md5 of the grid after applying the change.
pub md5: String, pub md5: String,