chore: remove invalid revisions (#1816)

This commit is contained in:
Nathan.fooo 2023-02-07 14:30:25 +08:00 committed by GitHub
parent 5f760ad578
commit e77fef3a19
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 132 additions and 98 deletions

1
.gitignore vendored
View File

@ -21,7 +21,6 @@ node_modules
**/resources/proto **/resources/proto
frontend/.vscode/*
!frontend/.vscode/settings.json !frontend/.vscode/settings.json
!frontend/.vscode/tasks.json !frontend/.vscode/tasks.json
!frontend/.vscode/launch.json !frontend/.vscode/launch.json

View File

@ -12,7 +12,7 @@
"type": "dart", "type": "dart",
"preLaunchTask": "AF: Build Appflowy Core", "preLaunchTask": "AF: Build Appflowy Core",
"env": { "env": {
"RUST_LOG": "trace" "RUST_LOG": "trace",
// "RUST_LOG": "debug" // "RUST_LOG": "debug"
}, },
"cwd": "${workspaceRoot}/app_flowy" "cwd": "${workspaceRoot}/app_flowy"

View File

@ -32,16 +32,18 @@ pub fn contain_newline(s: &str) -> bool {
pub fn recover_operation_from_revisions<T>( pub fn recover_operation_from_revisions<T>(
revisions: Vec<Revision>, revisions: Vec<Revision>,
validator: impl Fn(&DeltaOperations<T>) -> bool, validator: impl Fn(&DeltaOperations<T>) -> bool,
) -> Option<DeltaOperations<T>> ) -> Option<(DeltaOperations<T>, i64)>
where where
T: OperationAttributes + DeserializeOwned + OperationAttributes, T: OperationAttributes + DeserializeOwned + OperationAttributes,
{ {
let mut new_operations = DeltaOperations::<T>::new(); let mut new_operations = DeltaOperations::<T>::new();
let mut rev_id = 0;
for revision in revisions { for revision in revisions {
if let Ok(operations) = DeltaOperations::<T>::from_bytes(revision.bytes) { if let Ok(operations) = DeltaOperations::<T>::from_bytes(revision.bytes) {
match new_operations.compose(&operations) { match new_operations.compose(&operations) {
Ok(composed_operations) => { Ok(composed_operations) => {
if validator(&composed_operations) { if validator(&composed_operations) {
rev_id = revision.rev_id;
new_operations = composed_operations; new_operations = composed_operations;
} else { } else {
break; break;
@ -56,7 +58,7 @@ where
if new_operations.is_empty() { if new_operations.is_empty() {
None None
} else { } else {
Some(new_operations) Some((new_operations, rev_id))
} }
} }

View File

@ -1,5 +1,7 @@
use crate::entities::DatabaseViewLayout; use crate::entities::DatabaseViewLayout;
use crate::services::grid_editor::{DatabaseRevisionEditor, GridRevisionMergeable}; use crate::services::grid_editor::{
DatabaseRevisionEditor, GridRevisionCloudService, GridRevisionMergeable, GridRevisionSerde,
};
use crate::services::persistence::block_index::BlockIndexCache; use crate::services::persistence::block_index::BlockIndexCache;
use crate::services::persistence::kv::DatabaseKVPersistence; use crate::services::persistence::kv::DatabaseKVPersistence;
use crate::services::persistence::migration::DatabaseMigration; use crate::services::persistence::migration::DatabaseMigration;
@ -144,10 +146,16 @@ impl DatabaseManager {
pool: Arc<ConnectionPool>, pool: Arc<ConnectionPool>,
) -> Result<Arc<DatabaseRevisionEditor>, FlowyError> { ) -> Result<Arc<DatabaseRevisionEditor>, FlowyError> {
let user = self.database_user.clone(); let user = self.database_user.clone();
let rev_manager = self.make_database_rev_manager(database_id, pool.clone())?; let token = user.token()?;
let cloud = Arc::new(GridRevisionCloudService::new(token));
let mut rev_manager = self.make_database_rev_manager(database_id, pool.clone())?;
let database_pad = Arc::new(RwLock::new(
rev_manager.initialize::<GridRevisionSerde>(Some(cloud)).await?,
));
let database_editor = DatabaseRevisionEditor::new( let database_editor = DatabaseRevisionEditor::new(
database_id, database_id,
user, user,
database_pad,
rev_manager, rev_manager,
self.block_index_cache.clone(), self.block_index_cache.clone(),
self.task_scheduler.clone(), self.task_scheduler.clone(),

View File

@ -199,7 +199,7 @@ impl RevisionObjectDeserializer for DatabaseBlockRevisionSerde {
Ok(pad) Ok(pad)
} }
fn recover_operations_from_revisions(_revisions: Vec<Revision>) -> Option<Self::Output> { fn recover_from_revisions(_revisions: Vec<Revision>) -> Option<(Self::Output, i64)> {
None None
} }
} }

View File

@ -57,15 +57,12 @@ impl DatabaseRevisionEditor {
pub async fn new( pub async fn new(
database_id: &str, database_id: &str,
user: Arc<dyn DatabaseUser>, user: Arc<dyn DatabaseUser>,
mut rev_manager: RevisionManager<Arc<ConnectionPool>>, database_pad: Arc<RwLock<DatabaseRevisionPad>>,
rev_manager: RevisionManager<Arc<ConnectionPool>>,
persistence: Arc<BlockIndexCache>, persistence: Arc<BlockIndexCache>,
task_scheduler: Arc<RwLock<TaskDispatcher>>, task_scheduler: Arc<RwLock<TaskDispatcher>>,
) -> FlowyResult<Arc<Self>> { ) -> FlowyResult<Arc<Self>> {
let token = user.token()?;
let cloud = Arc::new(GridRevisionCloudService { token });
let database_pad = rev_manager.initialize::<GridRevisionSerde>(Some(cloud)).await?;
let rev_manager = Arc::new(rev_manager); let rev_manager = Arc::new(rev_manager);
let database_pad = Arc::new(RwLock::new(database_pad));
let cell_data_cache = AnyTypeCache::<u64>::new(); let cell_data_cache = AnyTypeCache::<u64>::new();
// Block manager // Block manager
@ -891,7 +888,7 @@ impl RevisionObjectDeserializer for GridRevisionSerde {
Ok(pad) Ok(pad)
} }
fn recover_operations_from_revisions(_revisions: Vec<Revision>) -> Option<Self::Output> { fn recover_from_revisions(_revisions: Vec<Revision>) -> Option<(Self::Output, i64)> {
None None
} }
} }
@ -901,11 +898,18 @@ impl RevisionObjectSerializer for GridRevisionSerde {
Ok(operations.json_bytes()) Ok(operations.json_bytes())
} }
} }
struct GridRevisionCloudService {
pub struct GridRevisionCloudService {
#[allow(dead_code)] #[allow(dead_code)]
token: String, token: String,
} }
impl GridRevisionCloudService {
pub fn new(token: String) -> Self {
Self { token }
}
}
impl RevisionCloudService for GridRevisionCloudService { impl RevisionCloudService for GridRevisionCloudService {
#[tracing::instrument(level = "trace", skip(self))] #[tracing::instrument(level = "trace", skip(self))]
fn fetch_object(&self, _user_id: &str, _object_id: &str) -> FutureResult<Vec<Revision>, FlowyError> { fn fetch_object(&self, _user_id: &str, _object_id: &str) -> FutureResult<Vec<Revision>, FlowyError> {

View File

@ -42,7 +42,7 @@ impl RevisionObjectDeserializer for GridViewRevisionSerde {
Ok(pad) Ok(pad)
} }
fn recover_operations_from_revisions(_revisions: Vec<Revision>) -> Option<Self::Output> { fn recover_from_revisions(_revisions: Vec<Revision>) -> Option<(Self::Output, i64)> {
None None
} }
} }

View File

@ -87,7 +87,7 @@ impl RevisionObjectDeserializer for DocumentRevisionSerde {
Result::<Document, FlowyError>::Ok(document) Result::<Document, FlowyError>::Ok(document)
} }
fn recover_operations_from_revisions(_revisions: Vec<Revision>) -> Option<Self::Output> { fn recover_from_revisions(_revisions: Vec<Revision>) -> Option<(Self::Output, i64)> {
None None
} }
} }

View File

@ -263,7 +263,7 @@ impl RevisionObjectDeserializer for DeltaDocumentRevisionSerde {
}) })
} }
fn recover_operations_from_revisions(_revisions: Vec<Revision>) -> Option<Self::Output> { fn recover_from_revisions(_revisions: Vec<Revision>) -> Option<(Self::Output, i64)> {
None None
} }
} }

View File

@ -107,12 +107,12 @@ impl RevisionObjectDeserializer for FolderRevisionSerde {
Ok(FolderPad::from_operations(operations)?) Ok(FolderPad::from_operations(operations)?)
} }
fn recover_operations_from_revisions(revisions: Vec<Revision>) -> Option<Self::Output> { fn recover_from_revisions(revisions: Vec<Revision>) -> Option<(Self::Output, i64)> {
if let Some(operations) = recover_operation_from_revisions(revisions, |operations| { if let Some((operations, rev_id)) = recover_operation_from_revisions(revisions, |operations| {
FolderPad::from_operations(operations.clone()).is_ok() FolderPad::from_operations(operations.clone()).is_ok()
}) { }) {
if let Ok(pad) = FolderPad::from_operations(operations) { if let Ok(pad) = FolderPad::from_operations(operations) {
return Some(pad); return Some((pad, rev_id));
} }
} }
None None

View File

@ -35,7 +35,7 @@ pub trait RevisionObjectDeserializer: Send + Sync {
/// ///
fn deserialize_revisions(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output>; fn deserialize_revisions(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output>;
fn recover_operations_from_revisions(revisions: Vec<Revision>) -> Option<Self::Output>; fn recover_from_revisions(revisions: Vec<Revision>) -> Option<(Self::Output, i64)>;
} }
pub trait RevisionObjectSerializer: Send + Sync { pub trait RevisionObjectSerializer: Send + Sync {
@ -58,10 +58,9 @@ pub trait RevisionMergeable: Send + Sync {
return Ok(revisions.pop().unwrap()); return Ok(revisions.pop().unwrap());
} }
let first_revision = revisions.first().unwrap(); // Select the last version, making sure version numbers don't overlap
let last_revision = revisions.last().unwrap(); let last_revision = revisions.last().unwrap();
let (base_rev_id, rev_id) = last_revision.pair_rev_id();
let (base_rev_id, rev_id) = first_revision.pair_rev_id();
let md5 = last_revision.md5.clone(); let md5 = last_revision.md5.clone();
let bytes = self.combine_revisions(revisions)?; let bytes = self.combine_revisions(revisions)?;
Ok(Revision::new(object_id, base_rev_id, rev_id, bytes, md5)) Ok(Revision::new(object_id, base_rev_id, rev_id, bytes, md5))
@ -137,17 +136,33 @@ impl<Connection: 'static> RevisionManager<Connection> {
tracing::Span::current().record("deserializer", std::any::type_name::<De>()); tracing::Span::current().record("deserializer", std::any::type_name::<De>());
let revisions: Vec<Revision> = revision_records.iter().map(|record| record.revision.clone()).collect(); let revisions: Vec<Revision> = revision_records.iter().map(|record| record.revision.clone()).collect();
tracing::Span::current().record("deserialize_revisions", revisions.len()); tracing::Span::current().record("deserialize_revisions", revisions.len());
let current_rev_id = revisions.last().as_ref().map(|revision| revision.rev_id).unwrap_or(0); let last_rev_id = revisions.last().as_ref().map(|revision| revision.rev_id).unwrap_or(0);
match De::deserialize_revisions(&self.object_id, revisions.clone()) { match De::deserialize_revisions(&self.object_id, revisions.clone()) {
Ok(object) => { Ok(object) => {
self.rev_persistence.sync_revision_records(&revision_records).await?; self.rev_persistence.sync_revision_records(&revision_records).await?;
self.rev_id_counter.set(current_rev_id); self.rev_id_counter.set(last_rev_id);
Ok(object) Ok(object)
} }
Err(e) => match self.rev_snapshot.restore_from_snapshot::<De>(current_rev_id) { Err(e) => match self.rev_snapshot.restore_from_snapshot::<De>(last_rev_id) {
None => { None => {
tracing::info!("Restore object from validation revisions"); tracing::info!("[Restore] iterate restore from each revision");
De::recover_operations_from_revisions(revisions).ok_or(e) let (output, recover_rev_id) = De::recover_from_revisions(revisions).ok_or(e)?;
tracing::info!(
"[Restore] last_rev_id:{}, recover_rev_id: {}",
last_rev_id,
recover_rev_id
);
self.rev_id_counter.set(recover_rev_id);
// delete the revisions whose rev_id is greater than recover_rev_id
if recover_rev_id < last_rev_id {
let range = RevisionRange {
start: recover_rev_id + 1,
end: last_rev_id,
};
tracing::info!("[Restore] delete revisions in range: {}", range);
let _ = self.rev_persistence.delete_revisions_from_range(range);
}
Ok(output)
} }
Some((object, snapshot_rev)) => { Some((object, snapshot_rev)) => {
let snapshot_rev_id = snapshot_rev.rev_id; let snapshot_rev_id = snapshot_rev.rev_id;
@ -162,7 +177,7 @@ impl<Connection: 'static> RevisionManager<Connection> {
} }
pub async fn close(&self) { pub async fn close(&self) {
let _ = self.rev_persistence.compact_lagging_revisions(&self.rev_compress).await; let _ = self.rev_persistence.merge_lagging_revisions(&self.rev_compress).await;
} }
pub async fn generate_snapshot(&self) { pub async fn generate_snapshot(&self) {

View File

@ -14,26 +14,26 @@ pub const REVISION_WRITE_INTERVAL_IN_MILLIS: u64 = 600;
#[derive(Clone)] #[derive(Clone)]
pub struct RevisionPersistenceConfiguration { pub struct RevisionPersistenceConfiguration {
// If the number of revisions that didn't sync to the server greater than the merge_threshold // If the number of revisions that didn't sync to the server greater than the max_merge_len
// then these revisions will be merged into one revision. // then these revisions will be merged into one revision.
merge_threshold: usize, max_merge_len: usize,
/// Indicates that the revisions that didn't sync to the server can be merged into one when /// Indicates that the revisions that didn't sync to the server can be merged into one when
/// `compact_lagging_revisions` get called. /// `merge_lagging_revisions` get called.
merge_lagging: bool, merge_lagging: bool,
} }
impl RevisionPersistenceConfiguration { impl RevisionPersistenceConfiguration {
pub fn new(merge_threshold: usize, merge_lagging: bool) -> Self { pub fn new(merge_max_length: usize, merge_lagging: bool) -> Self {
debug_assert!(merge_threshold > 1); debug_assert!(merge_max_length > 1);
if merge_threshold > 1 { if merge_max_length > 1 {
Self { Self {
merge_threshold, max_merge_len: merge_max_length,
merge_lagging, merge_lagging,
} }
} else { } else {
Self { Self {
merge_threshold: 100, max_merge_len: 100,
merge_lagging, merge_lagging,
} }
} }
@ -43,7 +43,7 @@ impl RevisionPersistenceConfiguration {
impl std::default::Default for RevisionPersistenceConfiguration { impl std::default::Default for RevisionPersistenceConfiguration {
fn default() -> Self { fn default() -> Self {
Self { Self {
merge_threshold: 100, max_merge_len: 100,
merge_lagging: false, merge_lagging: false,
} }
} }
@ -106,7 +106,7 @@ where
} }
#[tracing::instrument(level = "trace", skip_all, err)] #[tracing::instrument(level = "trace", skip_all, err)]
pub async fn compact_lagging_revisions<'a>( pub async fn merge_lagging_revisions<'a>(
&'a self, &'a self,
rev_compress: &Arc<dyn RevisionMergeable + 'a>, rev_compress: &Arc<dyn RevisionMergeable + 'a>,
) -> FlowyResult<()> { ) -> FlowyResult<()> {
@ -115,7 +115,7 @@ where
} }
let mut sync_seq = self.sync_seq.write().await; let mut sync_seq = self.sync_seq.write().await;
let compact_seq = sync_seq.compact(); let compact_seq = sync_seq.pop_merge_rev_ids();
if !compact_seq.is_empty() { if !compact_seq.is_empty() {
let range = RevisionRange { let range = RevisionRange {
start: *compact_seq.front().unwrap(), start: *compact_seq.front().unwrap(),
@ -164,18 +164,18 @@ where
) -> FlowyResult<i64> { ) -> FlowyResult<i64> {
let mut sync_seq = self.sync_seq.write().await; let mut sync_seq = self.sync_seq.write().await;
// Before the new_revision is pushed into the sync_seq, we check if the current `compact_length` of the // Before the new_revision is pushed into the sync_seq, we check if the current `merge_length` of the
// sync_seq is less equal to or greater than the merge threshold. If yes, it's needs to merged // sync_seq is less equal to or greater than the `merge_max_length`. If yes, it's needs to merged
// with the new_revision into one revision. // with the new_revision into one revision.
let mut compact_seq = VecDeque::default(); let mut merge_rev_ids = VecDeque::default();
// tracing::info!("{}", compact_seq) // tracing::info!("{}", compact_seq)
if sync_seq.compact_length >= self.configuration.merge_threshold - 1 { if sync_seq.merge_length >= self.configuration.max_merge_len - 1 {
compact_seq.extend(sync_seq.compact()); merge_rev_ids.extend(sync_seq.pop_merge_rev_ids());
} }
if !compact_seq.is_empty() { if !merge_rev_ids.is_empty() {
let range = RevisionRange { let range = RevisionRange {
start: *compact_seq.front().unwrap(), start: *merge_rev_ids.front().unwrap(),
end: *compact_seq.back().unwrap(), end: *merge_rev_ids.back().unwrap(),
}; };
tracing::Span::current().record("compact_range", format!("{}", range).as_str()); tracing::Span::current().record("compact_range", format!("{}", range).as_str());
@ -186,13 +186,13 @@ where
// compact multiple revisions into one // compact multiple revisions into one
let merged_revision = rev_compress.merge_revisions(&self.user_id, &self.object_id, revisions)?; let merged_revision = rev_compress.merge_revisions(&self.user_id, &self.object_id, revisions)?;
let rev_id = merged_revision.rev_id; let new_rev_id = merged_revision.rev_id;
tracing::Span::current().record("rev_id", merged_revision.rev_id); tracing::Span::current().record("rev_id", merged_revision.rev_id);
sync_seq.recv(merged_revision.rev_id)?; sync_seq.recv(new_rev_id)?;
// replace the revisions in range with compact revision // replace the revisions in range with compact revision
self.compact(&range, merged_revision).await?; self.compact(&range, merged_revision).await?;
Ok(rev_id) Ok(new_rev_id)
} else { } else {
let rev_id = new_revision.rev_id; let rev_id = new_revision.rev_id;
tracing::Span::current().record("rev_id", rev_id); tracing::Span::current().record("rev_id", rev_id);
@ -333,7 +333,6 @@ where
.collect::<Vec<Revision>>()) .collect::<Vec<Revision>>())
} }
#[allow(dead_code)]
pub fn delete_revisions_from_range(&self, range: RevisionRange) -> FlowyResult<()> { pub fn delete_revisions_from_range(&self, range: RevisionRange) -> FlowyResult<()> {
self.disk_cache self.disk_cache
.delete_revision_records(&self.object_id, Some(range.to_rev_ids()))?; .delete_revision_records(&self.object_id, Some(range.to_rev_ids()))?;
@ -370,8 +369,8 @@ impl<C> RevisionMemoryCacheDelegate for Arc<dyn RevisionDiskCache<C, Error = Flo
#[derive(Default)] #[derive(Default)]
struct DeferSyncSequence { struct DeferSyncSequence {
rev_ids: VecDeque<i64>, rev_ids: VecDeque<i64>,
compact_index: Option<usize>, merge_start: Option<usize>,
compact_length: usize, merge_length: usize,
} }
impl DeferSyncSequence { impl DeferSyncSequence {
@ -381,14 +380,12 @@ impl DeferSyncSequence {
/// Pushes the new_rev_id to the end of the list and marks this new_rev_id is mergeable. /// Pushes the new_rev_id to the end of the list and marks this new_rev_id is mergeable.
/// ///
/// When calling `compact` method, it will return a list of revision ids started from
/// the `compact_start_pos`, and ends with the `compact_length`.
fn merge_recv(&mut self, new_rev_id: i64) -> FlowyResult<()> { fn merge_recv(&mut self, new_rev_id: i64) -> FlowyResult<()> {
self.recv(new_rev_id)?; self.recv(new_rev_id)?;
self.compact_length += 1; self.merge_length += 1;
if self.compact_index.is_none() && !self.rev_ids.is_empty() { if self.merge_start.is_none() && !self.rev_ids.is_empty() {
self.compact_index = Some(self.rev_ids.len() - 1); self.merge_start = Some(self.rev_ids.len() - 1);
} }
Ok(()) Ok(())
} }
@ -419,14 +416,14 @@ impl DeferSyncSequence {
} }
let mut compact_rev_id = None; let mut compact_rev_id = None;
if let Some(compact_index) = self.compact_index { if let Some(compact_index) = self.merge_start {
compact_rev_id = self.rev_ids.get(compact_index).cloned(); compact_rev_id = self.rev_ids.get(compact_index).cloned();
} }
let pop_rev_id = self.rev_ids.pop_front(); let pop_rev_id = self.rev_ids.pop_front();
if let (Some(compact_rev_id), Some(pop_rev_id)) = (compact_rev_id, pop_rev_id) { if let (Some(compact_rev_id), Some(pop_rev_id)) = (compact_rev_id, pop_rev_id) {
if compact_rev_id <= pop_rev_id && self.compact_length > 0 { if compact_rev_id <= pop_rev_id && self.merge_length > 0 {
self.compact_length -= 1; self.merge_length -= 1;
} }
} }
} }
@ -438,22 +435,22 @@ impl DeferSyncSequence {
} }
fn clear(&mut self) { fn clear(&mut self) {
self.compact_index = None; self.merge_start = None;
self.compact_length = 0; self.merge_length = 0;
self.rev_ids.clear(); self.rev_ids.clear();
} }
// Compact the rev_ids into one except the current synchronizing rev_id. // Returns the rev_ids into one except the current synchronizing rev_id.
fn compact(&mut self) -> VecDeque<i64> { fn pop_merge_rev_ids(&mut self) -> VecDeque<i64> {
let mut compact_seq = VecDeque::with_capacity(self.rev_ids.len()); let mut compact_seq = VecDeque::with_capacity(self.rev_ids.len());
if let Some(start) = self.compact_index { if let Some(start) = self.merge_start {
if start < self.rev_ids.len() { if start < self.rev_ids.len() {
let seq = self.rev_ids.split_off(start); let seq = self.rev_ids.split_off(start);
compact_seq.extend(seq); compact_seq.extend(seq);
} }
} }
self.compact_index = None; self.merge_start = None;
self.compact_length = 0; self.merge_length = 0;
compact_seq compact_seq
} }
} }

View File

@ -63,7 +63,7 @@ where
.for_each(|command| async { .for_each(|command| async {
match self.handle_command(command).await { match self.handle_command(command).await {
Ok(_) => {} Ok(_) => {}
Err(e) => tracing::debug!("[RevQueue]: {}", e), Err(e) => tracing::error!("[RevQueue]: {}", e),
} }
}) })
.await; .await;

View File

@ -19,8 +19,10 @@ pub trait RevisionSnapshotPersistence: Send + Sync {
fn read_snapshot(&self, rev_id: i64) -> FlowyResult<Option<RevisionSnapshotData>>; fn read_snapshot(&self, rev_id: i64) -> FlowyResult<Option<RevisionSnapshotData>>;
fn read_last_snapshot(&self) -> FlowyResult<Option<RevisionSnapshotData>>; fn read_last_snapshot(&self) -> FlowyResult<Option<RevisionSnapshotData>>;
}
// fn generate_snapshot_data(&self) -> Option<RevisionSnapshotData>; pub trait RevisionSnapshotDataGenerator: Send + Sync {
fn generate_snapshot_data(&self) -> Option<RevisionSnapshotData>;
} }
const AUTO_GEN_SNAPSHOT_PER_10_REVISION: i64 = 10; const AUTO_GEN_SNAPSHOT_PER_10_REVISION: i64 = 10;
@ -29,6 +31,7 @@ pub struct RevisionSnapshotController<Connection> {
user_id: String, user_id: String,
object_id: String, object_id: String,
rev_snapshot_persistence: Arc<dyn RevisionSnapshotPersistence>, rev_snapshot_persistence: Arc<dyn RevisionSnapshotPersistence>,
rev_snapshot_data: Option<Arc<dyn RevisionSnapshotDataGenerator>>,
rev_id_counter: Arc<RevIdCounter>, rev_id_counter: Arc<RevIdCounter>,
rev_persistence: Arc<RevisionPersistence<Connection>>, rev_persistence: Arc<RevisionPersistence<Connection>>,
rev_compress: Arc<dyn RevisionMergeable>, rev_compress: Arc<dyn RevisionMergeable>,
@ -57,11 +60,16 @@ where
rev_snapshot_persistence, rev_snapshot_persistence,
rev_id_counter, rev_id_counter,
start_rev_id: AtomicI64::new(0), start_rev_id: AtomicI64::new(0),
rev_snapshot_data: None,
rev_persistence: revision_persistence, rev_persistence: revision_persistence,
rev_compress: revision_compress, rev_compress: revision_compress,
} }
} }
pub async fn set_snapshot_data_generator(&mut self, generator: Arc<dyn RevisionSnapshotDataGenerator>) {
self.rev_snapshot_data = Some(generator);
}
pub async fn generate_snapshot(&self) { pub async fn generate_snapshot(&self) {
if let Some((rev_id, bytes)) = self.generate_snapshot_data() { if let Some((rev_id, bytes)) = self.generate_snapshot_data() {
if let Err(e) = self.rev_snapshot_persistence.write_snapshot(rev_id, bytes.to_vec()) { if let Err(e) = self.rev_snapshot_persistence.write_snapshot(rev_id, bytes.to_vec()) {
@ -76,7 +84,7 @@ where
where where
B: RevisionObjectDeserializer, B: RevisionObjectDeserializer,
{ {
tracing::info!("Try to find if {} has snapshot", self.object_id); tracing::info!("[Restore] Try to find if {} has snapshot", self.object_id);
let snapshot = self.rev_snapshot_persistence.read_last_snapshot().ok()??; let snapshot = self.rev_snapshot_persistence.read_last_snapshot().ok()??;
let snapshot_rev_id = snapshot.rev_id; let snapshot_rev_id = snapshot.rev_id;
let revision = Revision::new( let revision = Revision::new(
@ -87,13 +95,13 @@ where
"".to_owned(), "".to_owned(),
); );
tracing::info!( tracing::info!(
"Try to restore from snapshot: {}, {}", "[Restore] Try to restore from snapshot: {}, {}",
snapshot.base_rev_id, snapshot.base_rev_id,
snapshot.rev_id snapshot.rev_id
); );
let object = B::deserialize_revisions(&self.object_id, vec![revision.clone()]).ok()?; let object = B::deserialize_revisions(&self.object_id, vec![revision.clone()]).ok()?;
tracing::info!( tracing::info!(
"Restore {} from snapshot with rev_id: {}", "[Restore] Restore {} from snapshot with rev_id: {}",
self.object_id, self.object_id,
snapshot_rev_id snapshot_rev_id
); );

View File

@ -28,11 +28,11 @@ async fn revision_compress_2_revisions_with_2_threshold_test() {
.await; .await;
test.run_scripts(vec![ test.run_scripts(vec![
AssertNextSyncRevisionId { rev_id: Some(1) }, AssertNextSyncRevisionId { rev_id: Some(2) },
AssertNextSyncRevisionContent { AssertNextSyncRevisionContent {
expected: "123456".to_string(), expected: "123456".to_string(),
}, },
AckRevision { rev_id: 1 }, AckRevision { rev_id: 2 },
AssertNextSyncRevisionId { rev_id: None }, AssertNextSyncRevisionId { rev_id: None },
]) ])
.await; .await;
@ -41,13 +41,11 @@ async fn revision_compress_2_revisions_with_2_threshold_test() {
#[tokio::test] #[tokio::test]
async fn revision_compress_4_revisions_with_threshold_2_test() { async fn revision_compress_4_revisions_with_threshold_2_test() {
let test = RevisionTest::new_with_configuration(2).await; let test = RevisionTest::new_with_configuration(2).await;
let rev_id_1 = 1;
test.run_script(AddLocalRevision { test.run_script(AddLocalRevision {
content: "1".to_string(), content: "1".to_string(),
}) })
.await; .await;
let rev_id_2 = 2;
test.run_script(AddLocalRevision { test.run_script(AddLocalRevision {
content: "2".to_string(), content: "2".to_string(),
}) })
@ -63,15 +61,14 @@ async fn revision_compress_4_revisions_with_threshold_2_test() {
}) })
.await; .await;
// rev_id_2,rev_id_3,rev_id4 will be merged into rev_id_1
test.run_scripts(vec![ test.run_scripts(vec![
AssertNumberOfSyncRevisions { num: 2 }, AssertNumberOfSyncRevisions { num: 2 },
AssertNextSyncRevisionId { rev_id: Some(rev_id_1) }, AssertNextSyncRevisionId { rev_id: Some(2) },
AssertNextSyncRevisionContent { AssertNextSyncRevisionContent {
expected: "12".to_string(), expected: "12".to_string(),
}, },
AckRevision { rev_id: rev_id_1 }, AckRevision { rev_id: 2 },
AssertNextSyncRevisionId { rev_id: Some(rev_id_2) }, AssertNextSyncRevisionId { rev_id: Some(4) },
AssertNextSyncRevisionContent { AssertNextSyncRevisionContent {
expected: "34".to_string(), expected: "34".to_string(),
}, },
@ -81,8 +78,8 @@ async fn revision_compress_4_revisions_with_threshold_2_test() {
#[tokio::test] #[tokio::test]
async fn revision_compress_8_revisions_with_threshold_4_test() { async fn revision_compress_8_revisions_with_threshold_4_test() {
let test = RevisionTest::new_with_configuration(4).await; let merge_len = 4;
let rev_id_1 = 1; let test = RevisionTest::new_with_configuration(merge_len).await;
test.run_script(AddLocalRevision { test.run_script(AddLocalRevision {
content: "1".to_string(), content: "1".to_string(),
}) })
@ -103,7 +100,6 @@ async fn revision_compress_8_revisions_with_threshold_4_test() {
}) })
.await; .await;
let rev_id_a = 2;
test.run_script(AddLocalRevision { test.run_script(AddLocalRevision {
content: "a".to_string(), content: "a".to_string(),
}) })
@ -126,16 +122,20 @@ async fn revision_compress_8_revisions_with_threshold_4_test() {
test.run_scripts(vec![ test.run_scripts(vec![
AssertNumberOfSyncRevisions { num: 2 }, AssertNumberOfSyncRevisions { num: 2 },
AssertNextSyncRevisionId { rev_id: Some(rev_id_1) }, AssertNextSyncRevisionId {
rev_id: Some(merge_len),
},
AssertNextSyncRevisionContent { AssertNextSyncRevisionContent {
expected: "1234".to_string(), expected: "1234".to_string(),
}, },
AckRevision { rev_id: rev_id_1 }, AckRevision { rev_id: merge_len },
AssertNextSyncRevisionId { rev_id: Some(rev_id_a) }, AssertNextSyncRevisionId {
rev_id: Some(merge_len * 2),
},
AssertNextSyncRevisionContent { AssertNextSyncRevisionContent {
expected: "abcd".to_string(), expected: "abcd".to_string(),
}, },
AckRevision { rev_id: rev_id_a }, AckRevision { rev_id: merge_len * 2 },
AssertNextSyncRevisionId { rev_id: None }, AssertNextSyncRevisionId { rev_id: None },
]) ])
.await; .await;
@ -143,7 +143,8 @@ async fn revision_compress_8_revisions_with_threshold_4_test() {
#[tokio::test] #[tokio::test]
async fn revision_merge_per_5_revision_test() { async fn revision_merge_per_5_revision_test() {
let test = RevisionTest::new_with_configuration(5).await; let merge_len = 5;
let test = RevisionTest::new_with_configuration(merge_len).await;
for i in 0..20 { for i in 0..20 {
let content = format!("{}", i); let content = format!("{}", i);
test.run_script(AddLocalRevision { content }).await; test.run_script(AddLocalRevision { content }).await;
@ -154,19 +155,19 @@ async fn revision_merge_per_5_revision_test() {
AssertNextSyncRevisionContent { AssertNextSyncRevisionContent {
expected: "01234".to_string(), expected: "01234".to_string(),
}, },
AckRevision { rev_id: 1 }, AckRevision { rev_id: merge_len },
AssertNextSyncRevisionContent { AssertNextSyncRevisionContent {
expected: "56789".to_string(), expected: "56789".to_string(),
}, },
AckRevision { rev_id: 2 }, AckRevision { rev_id: merge_len * 2 },
AssertNextSyncRevisionContent { AssertNextSyncRevisionContent {
expected: "1011121314".to_string(), expected: "1011121314".to_string(),
}, },
AckRevision { rev_id: 3 }, AckRevision { rev_id: merge_len * 3 },
AssertNextSyncRevisionContent { AssertNextSyncRevisionContent {
expected: "1516171819".to_string(), expected: "1516171819".to_string(),
}, },
AckRevision { rev_id: 4 }, AckRevision { rev_id: merge_len * 4 },
AssertNextSyncRevisionId { rev_id: None }, AssertNextSyncRevisionId { rev_id: None },
]) ])
.await; .await;

View File

@ -39,10 +39,10 @@ impl RevisionTest {
Self::new_with_configuration(2).await Self::new_with_configuration(2).await
} }
pub async fn new_with_configuration(merge_threshold: i64) -> Self { pub async fn new_with_configuration(max_merge_len: i64) -> Self {
let user_id = nanoid!(10); let user_id = nanoid!(10);
let object_id = nanoid!(6); let object_id = nanoid!(6);
let configuration = RevisionPersistenceConfiguration::new(merge_threshold as usize, false); let configuration = RevisionPersistenceConfiguration::new(max_merge_len as usize, false);
let disk_cache = RevisionDiskCacheMock::new(vec![]); let disk_cache = RevisionDiskCacheMock::new(vec![]);
let persistence = RevisionPersistence::new(&user_id, &object_id, disk_cache, configuration.clone()); let persistence = RevisionPersistence::new(&user_id, &object_id, disk_cache, configuration.clone());
let compress = RevisionMergeableMock {}; let compress = RevisionMergeableMock {};
@ -334,7 +334,7 @@ impl RevisionObjectDeserializer for RevisionObjectMockSerde {
Ok(object) Ok(object)
} }
fn recover_operations_from_revisions(_revisions: Vec<Revision>) -> Option<Self::Output> { fn recover_from_revisions(_revisions: Vec<Revision>) -> Option<(Self::Output, i64)> {
None None
} }
} }