chore: merge with config setting

nathan 2022-11-06 09:59:53 +08:00
parent 2c71e4f885
commit ff7aab73cc
29 changed files with 317 additions and 176 deletions

View File

@@ -1,6 +1,6 @@
use bytes::Bytes;
use flowy_error::{FlowyError, FlowyResult};
use flowy_revision::{RevisionCompress, RevisionObjectDeserializer, RevisionObjectSerializer};
use flowy_revision::{RevisionMergeable, RevisionObjectDeserializer, RevisionObjectSerializer};
use flowy_sync::entities::revision::Revision;
use lib_ot::core::{Extension, NodeDataBuilder, NodeOperation, NodeTree, NodeTreeContext, Selection, Transaction};
use lib_ot::text_delta::DeltaTextOperationBuilder;
@@ -96,7 +96,7 @@ impl RevisionObjectSerializer for DocumentRevisionSerde {
}
pub(crate) struct DocumentRevisionCompress();
impl RevisionCompress for DocumentRevisionCompress {
impl RevisionMergeable for DocumentRevisionCompress {
fn combine_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
DocumentRevisionSerde::combine_revisions(revisions)
}

View File

@@ -9,7 +9,8 @@ use dashmap::DashMap;
use flowy_database::ConnectionPool;
use flowy_error::FlowyResult;
use flowy_revision::{
RevisionCloudService, RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionSnapshotPersistence,
RevisionCloudService, RevisionManager, RevisionPersistence, RevisionPersistenceConfiguration, RevisionWebSocket,
SQLiteRevisionSnapshotPersistence,
};
use flowy_sync::client_document::initial_delta_document_content;
use flowy_sync::entities::{document::DocumentIdPB, revision::Revision, ws_data::ServerRevisionWSData};
@@ -246,7 +247,8 @@ impl DocumentManager {
) -> Result<RevisionManager<Arc<ConnectionPool>>, FlowyError> {
let user_id = self.user.user_id()?;
let disk_cache = SQLiteDocumentRevisionPersistence::new(&user_id, pool.clone());
let rev_persistence = RevisionPersistence::new(&user_id, doc_id, disk_cache);
let configuration = RevisionPersistenceConfiguration::default();
let rev_persistence = RevisionPersistence::new(&user_id, doc_id, disk_cache, configuration);
// let history_persistence = SQLiteRevisionHistoryPersistence::new(doc_id, pool.clone());
let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(doc_id, pool);
Ok(RevisionManager::new(
@@ -266,7 +268,8 @@ impl DocumentManager {
) -> Result<RevisionManager<Arc<ConnectionPool>>, FlowyError> {
let user_id = self.user.user_id()?;
let disk_cache = SQLiteDeltaDocumentRevisionPersistence::new(&user_id, pool.clone());
let rev_persistence = RevisionPersistence::new(&user_id, doc_id, disk_cache);
let configuration = RevisionPersistenceConfiguration::default();
let rev_persistence = RevisionPersistence::new(&user_id, doc_id, disk_cache, configuration);
// let history_persistence = SQLiteRevisionHistoryPersistence::new(doc_id, pool.clone());
let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(doc_id, pool);
Ok(RevisionManager::new(
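The recurring change across these call sites is that RevisionPersistence::new now takes a RevisionPersistenceConfiguration as a fourth argument. A minimal sketch of the new shape, using local stand-in types rather than the real flowy_revision items:

struct RevisionPersistenceConfiguration {
    merge_threshold: usize,
}

impl Default for RevisionPersistenceConfiguration {
    // Mirrors the default introduced by this commit: merge once more than
    // two revisions are waiting to be synced.
    fn default() -> Self {
        Self { merge_threshold: 2 }
    }
}

struct DiskCacheStub; // stand-in for SQLiteDocumentRevisionPersistence

struct RevisionPersistence {
    configuration: RevisionPersistenceConfiguration,
}

impl RevisionPersistence {
    // The configuration is threaded through as a new fourth parameter.
    fn new(
        _user_id: &str,
        _object_id: &str,
        _disk_cache: DiskCacheStub,
        configuration: RevisionPersistenceConfiguration,
    ) -> Self {
        Self { configuration }
    }
}

fn main() {
    let rev_persistence = RevisionPersistence::new(
        "user_id",
        "doc_id",
        DiskCacheStub,
        RevisionPersistenceConfiguration::default(),
    );
    assert_eq!(rev_persistence.configuration.merge_threshold, 2);
}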

View File

@@ -6,7 +6,7 @@ use bytes::Bytes;
use flowy_database::ConnectionPool;
use flowy_error::{internal_error, FlowyResult};
use flowy_revision::{
RevisionCloudService, RevisionCompress, RevisionManager, RevisionObjectDeserializer, RevisionObjectSerializer,
RevisionCloudService, RevisionManager, RevisionMergeable, RevisionObjectDeserializer, RevisionObjectSerializer,
RevisionWebSocket,
};
use flowy_sync::entities::ws_data::ServerRevisionWSData;
@@ -270,7 +270,7 @@ impl RevisionObjectSerializer for DeltaDocumentRevisionSerde {
}
pub(crate) struct DeltaDocumentRevisionCompress();
impl RevisionCompress for DeltaDocumentRevisionCompress {
impl RevisionMergeable for DeltaDocumentRevisionCompress {
fn combine_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
DeltaDocumentRevisionSerde::combine_revisions(revisions)
}

View File

@@ -23,6 +23,7 @@ use tokio::sync::{oneshot, RwLock};
// serial.
pub(crate) struct EditDocumentQueue {
document: Arc<RwLock<ClientDocument>>,
#[allow(dead_code)]
user: Arc<dyn DocumentUser>,
rev_manager: Arc<RevisionManager<Arc<ConnectionPool>>>,
receiver: Option<EditorCommandReceiver>,

View File

@@ -244,7 +244,7 @@ impl std::default::Default for TextRevisionState {
}
}
fn mk_revision_record_from_table(user_id: &str, table: RevisionTable) -> SyncRecord {
fn mk_revision_record_from_table(_user_id: &str, table: RevisionTable) -> SyncRecord {
let md5 = md5(&table.data);
let revision = Revision::new(
&table.doc_id,

View File

@@ -220,7 +220,7 @@ impl std::default::Default for DocumentRevisionState {
}
}
fn mk_revision_record_from_table(user_id: &str, table: DocumentRevisionTable) -> SyncRecord {
fn mk_revision_record_from_table(_user_id: &str, table: DocumentRevisionTable) -> SyncRecord {
let md5 = md5(&table.data);
let revision = Revision::new(
&table.document_id,

View File

@@ -15,7 +15,10 @@ use bytes::Bytes;
use flowy_document::editor::initial_read_me;
use flowy_error::FlowyError;
use flowy_folder_data_model::user_default;
use flowy_revision::{RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionSnapshotPersistence};
use flowy_revision::{
RevisionManager, RevisionPersistence, RevisionPersistenceConfiguration, RevisionWebSocket,
SQLiteRevisionSnapshotPersistence,
};
use flowy_sync::{client_folder::FolderPad, entities::ws_data::ServerRevisionWSData};
use lazy_static::lazy_static;
use lib_infra::future::FutureResult;
@@ -165,7 +168,8 @@ impl FolderManager {
let pool = self.persistence.db_pool()?;
let object_id = folder_id.as_ref();
let disk_cache = SQLiteFolderRevisionPersistence::new(user_id, pool.clone());
let rev_persistence = RevisionPersistence::new(user_id, object_id, disk_cache);
let configuration = RevisionPersistenceConfiguration::new(50);
let rev_persistence = RevisionPersistence::new(user_id, object_id, disk_cache, configuration);
let rev_compactor = FolderRevisionCompress();
// let history_persistence = SQLiteRevisionHistoryPersistence::new(object_id, pool.clone());
let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(object_id, pool);
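Unlike the document and grid call sites, which take RevisionPersistenceConfiguration::default() (a threshold of 2), the folder editor batches far more aggressively with a threshold of 50, presumably because folder revisions are small and frequent. The constructor added later in this commit clamps anything below 2; a stand-alone rendering of that behavior:

struct RevisionPersistenceConfiguration {
    merge_threshold: usize,
}

impl RevisionPersistenceConfiguration {
    // Same clamping as the constructor added by this commit: a threshold
    // of 0 or 1 silently falls back to 2.
    fn new(merge_threshold: usize) -> Self {
        if merge_threshold > 1 {
            Self { merge_threshold }
        } else {
            Self { merge_threshold: 2 }
        }
    }
}

fn main() {
    assert_eq!(RevisionPersistenceConfiguration::new(50).merge_threshold, 50);
    assert_eq!(RevisionPersistenceConfiguration::new(1).merge_threshold, 2); // clamped
}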

View File

@@ -2,7 +2,7 @@ use crate::manager::FolderId;
use bytes::Bytes;
use flowy_error::{FlowyError, FlowyResult};
use flowy_revision::{
RevisionCloudService, RevisionCompress, RevisionManager, RevisionObjectDeserializer, RevisionObjectSerializer,
RevisionCloudService, RevisionManager, RevisionMergeable, RevisionObjectDeserializer, RevisionObjectSerializer,
RevisionWebSocket,
};
use flowy_sync::util::make_operations_from_revisions;
@@ -18,9 +18,8 @@ use parking_lot::RwLock;
use std::sync::Arc;
pub struct FolderEditor {
user_id: String,
#[allow(dead_code)]
pub(crate) folder_id: FolderId,
user_id: String,
pub(crate) folder: Arc<RwLock<FolderPad>>,
rev_manager: Arc<RevisionManager<Arc<ConnectionPool>>>,
#[cfg(feature = "sync")]
@@ -56,7 +55,6 @@ impl FolderEditor {
let folder_id = folder_id.to_owned();
Ok(Self {
user_id,
folder_id,
folder,
rev_manager,
#[cfg(feature = "sync")]
@@ -113,7 +111,7 @@ impl RevisionObjectSerializer for FolderRevisionSerde {
}
pub struct FolderRevisionCompress();
impl RevisionCompress for FolderRevisionCompress {
impl RevisionMergeable for FolderRevisionCompress {
fn combine_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
FolderRevisionSerde::combine_revisions(revisions)
}

View File

@@ -220,7 +220,7 @@ impl std::default::Default for TextRevisionState {
}
}
fn mk_revision_record_from_table(user_id: &str, table: RevisionTable) -> SyncRecord {
fn mk_revision_record_from_table(_user_id: &str, table: RevisionTable) -> SyncRecord {
let md5 = md5(&table.data);
let revision = Revision::new(
&table.doc_id,

View File

@@ -13,7 +13,10 @@ use dashmap::DashMap;
use flowy_database::ConnectionPool;
use flowy_error::{FlowyError, FlowyResult};
use flowy_grid_data_model::revision::{BuildGridContext, GridRevision, GridViewRevision};
use flowy_revision::{RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionSnapshotPersistence};
use flowy_revision::{
RevisionManager, RevisionPersistence, RevisionPersistenceConfiguration, RevisionWebSocket,
SQLiteRevisionSnapshotPersistence,
};
use flowy_sync::client_grid::{make_grid_block_operations, make_grid_operations, make_grid_view_operations};
use flowy_sync::entities::revision::Revision;
use std::sync::Arc;
@@ -161,7 +164,8 @@ impl GridManager {
) -> FlowyResult<RevisionManager<Arc<ConnectionPool>>> {
let user_id = self.grid_user.user_id()?;
let disk_cache = SQLiteGridRevisionPersistence::new(&user_id, pool.clone());
let rev_persistence = RevisionPersistence::new(&user_id, grid_id, disk_cache);
let configuration = RevisionPersistenceConfiguration::default();
let rev_persistence = RevisionPersistence::new(&user_id, grid_id, disk_cache, configuration);
let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(grid_id, pool);
let rev_compactor = GridRevisionCompress();
let rev_manager = RevisionManager::new(&user_id, grid_id, rev_persistence, rev_compactor, snapshot_persistence);
@@ -175,7 +179,8 @@ impl GridManager {
) -> FlowyResult<RevisionManager<Arc<ConnectionPool>>> {
let user_id = self.grid_user.user_id()?;
let disk_cache = SQLiteGridBlockRevisionPersistence::new(&user_id, pool.clone());
let rev_persistence = RevisionPersistence::new(&user_id, block_id, disk_cache);
let configuration = RevisionPersistenceConfiguration::default();
let rev_persistence = RevisionPersistence::new(&user_id, block_id, disk_cache, configuration);
let rev_compactor = GridBlockRevisionCompress();
let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(block_id, pool);
let rev_manager =
@@ -185,7 +190,7 @@ impl GridManager {
}
pub async fn make_grid_view_data(
user_id: &str,
_user_id: &str,
view_id: &str,
layout: GridLayout,
grid_manager: Arc<GridManager>,

View File

@@ -3,7 +3,7 @@ use bytes::Bytes;
use flowy_error::{FlowyError, FlowyResult};
use flowy_grid_data_model::revision::{CellRevision, GridBlockRevision, RowChangeset, RowRevision};
use flowy_revision::{
RevisionCloudService, RevisionCompress, RevisionManager, RevisionObjectDeserializer, RevisionObjectSerializer,
RevisionCloudService, RevisionManager, RevisionMergeable, RevisionObjectDeserializer, RevisionObjectSerializer,
};
use flowy_sync::client_grid::{GridBlockRevisionChangeset, GridBlockRevisionPad};
use flowy_sync::entities::revision::Revision;
@@ -17,6 +17,7 @@ use std::sync::Arc;
use tokio::sync::RwLock;
pub struct GridBlockRevisionEditor {
#[allow(dead_code)]
user_id: String,
pub block_id: String,
pad: Arc<RwLock<GridBlockRevisionPad>>,
@@ -204,7 +205,7 @@ impl RevisionObjectSerializer for GridBlockRevisionSerde {
}
pub struct GridBlockRevisionCompress();
impl RevisionCompress for GridBlockRevisionCompress {
impl RevisionMergeable for GridBlockRevisionCompress {
fn combine_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
GridBlockRevisionSerde::combine_revisions(revisions)
}

View File

@@ -10,7 +10,9 @@ use flowy_error::FlowyResult;
use flowy_grid_data_model::revision::{
GridBlockMetaRevision, GridBlockMetaRevisionChangeset, RowChangeset, RowRevision,
};
use flowy_revision::{RevisionManager, RevisionPersistence, SQLiteRevisionSnapshotPersistence};
use flowy_revision::{
RevisionManager, RevisionPersistence, RevisionPersistenceConfiguration, SQLiteRevisionSnapshotPersistence,
};
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;
@@ -273,7 +275,8 @@ async fn make_block_editor(user: &Arc<dyn GridUser>, block_id: &str) -> FlowyRes
let pool = user.db_pool()?;
let disk_cache = SQLiteGridBlockRevisionPersistence::new(&user_id, pool.clone());
let rev_persistence = RevisionPersistence::new(&user_id, block_id, disk_cache);
let configuration = RevisionPersistenceConfiguration::default();
let rev_persistence = RevisionPersistence::new(&user_id, block_id, disk_cache, configuration);
let rev_compactor = GridBlockRevisionCompress();
let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(block_id, pool);
let rev_manager = RevisionManager::new(&user_id, block_id, rev_persistence, rev_compactor, snapshot_persistence);

View File

@@ -17,7 +17,7 @@ use bytes::Bytes;
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
use flowy_grid_data_model::revision::*;
use flowy_revision::{
RevisionCloudService, RevisionCompress, RevisionManager, RevisionObjectDeserializer, RevisionObjectSerializer,
RevisionCloudService, RevisionManager, RevisionMergeable, RevisionObjectDeserializer, RevisionObjectSerializer,
};
use flowy_sync::client_grid::{GridRevisionChangeset, GridRevisionPad, JsonDeserializer};
use flowy_sync::entities::revision::Revision;
@@ -33,6 +33,7 @@ use tokio::sync::RwLock;
pub struct GridRevisionEditor {
pub grid_id: String,
#[allow(dead_code)]
user: Arc<dyn GridUser>,
grid_pad: Arc<RwLock<GridRevisionPad>>,
view_manager: Arc<GridViewManager>,
@@ -846,7 +847,7 @@ impl RevisionCloudService for GridRevisionCloudService {
pub struct GridRevisionCompress();
impl RevisionCompress for GridRevisionCompress {
impl RevisionMergeable for GridRevisionCompress {
fn combine_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
GridRevisionSerde::combine_revisions(revisions)
}

View File

@@ -19,7 +19,7 @@ use flowy_grid_data_model::revision::{
RowChangeset, RowRevision,
};
use flowy_revision::{
RevisionCloudService, RevisionCompress, RevisionManager, RevisionObjectDeserializer, RevisionObjectSerializer,
RevisionCloudService, RevisionManager, RevisionMergeable, RevisionObjectDeserializer, RevisionObjectSerializer,
};
use flowy_sync::client_grid::{GridViewRevisionChangeset, GridViewRevisionPad};
use flowy_sync::entities::revision::Revision;
@@ -454,7 +454,7 @@ async fn new_group_controller_with_field_rev(
}
async fn apply_change(
user_id: &str,
_user_id: &str,
rev_manager: Arc<RevisionManager<Arc<ConnectionPool>>>,
change: GridViewRevisionChangeset,
) -> FlowyResult<()> {
@@ -496,7 +496,7 @@ impl RevisionObjectSerializer for GridViewRevisionSerde {
}
pub struct GridViewRevisionCompress();
impl RevisionCompress for GridViewRevisionCompress {
impl RevisionMergeable for GridViewRevisionCompress {
fn combine_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
GridViewRevisionSerde::combine_revisions(revisions)
}

View File

@@ -11,7 +11,9 @@ use dashmap::DashMap;
use flowy_database::ConnectionPool;
use flowy_error::FlowyResult;
use flowy_grid_data_model::revision::{FieldRevision, RowChangeset, RowRevision};
use flowy_revision::{RevisionManager, RevisionPersistence, SQLiteRevisionSnapshotPersistence};
use flowy_revision::{
RevisionManager, RevisionPersistence, RevisionPersistenceConfiguration, SQLiteRevisionSnapshotPersistence,
};
use lib_infra::future::AFFuture;
use std::sync::Arc;
@@ -253,7 +255,8 @@ pub async fn make_grid_view_rev_manager(
let pool = user.db_pool()?;
let disk_cache = SQLiteGridViewRevisionPersistence::new(&user_id, pool.clone());
let rev_persistence = RevisionPersistence::new(&user_id, view_id, disk_cache);
let configuration = RevisionPersistenceConfiguration::default();
let rev_persistence = RevisionPersistence::new(&user_id, view_id, disk_cache, configuration);
let rev_compactor = GridViewRevisionCompress();
let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(view_id, pool);

View File

@@ -219,7 +219,7 @@ impl std::default::Default for GridBlockRevisionState {
}
}
fn mk_revision_record_from_table(user_id: &str, table: GridBlockRevisionTable) -> SyncRecord {
fn mk_revision_record_from_table(_user_id: &str, table: GridBlockRevisionTable) -> SyncRecord {
let md5 = md5(&table.data);
let revision = Revision::new(
&table.object_id,

View File

@@ -217,7 +217,7 @@ impl std::default::Default for GridRevisionState {
}
}
fn mk_revision_record_from_table(user_id: &str, table: GridRevisionTable) -> SyncRecord {
fn mk_revision_record_from_table(_user_id: &str, table: GridRevisionTable) -> SyncRecord {
let md5 = md5(&table.data);
let revision = Revision::new(
&table.object_id,

View File

@@ -219,7 +219,7 @@ impl std::default::Default for GridViewRevisionState {
}
}
fn mk_revision_record_from_table(user_id: &str, table: GridViewRevisionTable) -> SyncRecord {
fn mk_revision_record_from_table(_user_id: &str, table: GridViewRevisionTable) -> SyncRecord {
let md5 = md5(&table.data);
let revision = Revision::new(
&table.object_id,

View File

@@ -88,6 +88,10 @@ impl RevisionMemoryCache {
Ok(revs)
}
pub(crate) fn number_of_sync_records(&self) -> usize {
self.revs_map.len()
}
pub(crate) async fn reset_with_revisions(&self, revision_records: Vec<SyncRecord>) {
self.revs_map.clear();
if let Some(handler) = self.defer_save.write().await.take() {

View File

@@ -1,5 +1,5 @@
use crate::disk::{RevisionDiskCache, SyncRecord};
use crate::{RevisionLoader, RevisionPersistence};
use crate::{RevisionLoader, RevisionPersistence, RevisionPersistenceConfiguration};
use bytes::Bytes;
use flowy_error::{FlowyError, FlowyResult};
use flowy_sync::entities::revision::Revision;
@@ -60,10 +60,12 @@ where
}
async fn reset_object(&self) -> FlowyResult<()> {
let configuration = RevisionPersistenceConfiguration::new(2);
let rev_persistence = Arc::new(RevisionPersistence::from_disk_cache(
&self.user_id,
self.target.target_id(),
self.disk_cache.clone(),
configuration,
));
let (revisions, _) = RevisionLoader {
object_id: self.target.target_id().to_owned(),

View File

@@ -151,7 +151,7 @@ where
}
fn make_client_and_server_revision<Operations, Connection>(
user_id: &str,
_user_id: &str,
rev_manager: &Arc<RevisionManager<Connection>>,
client_operations: Operations,
server_operations: Option<Operations>,

View File

@@ -42,13 +42,8 @@ pub trait RevisionObjectSerializer: Send + Sync {
/// `RevisionMergeable` is used to merge multiple revisions into one revision
///
pub trait RevisionCompress: Send + Sync {
fn compress_revisions(
&self,
user_id: &str,
object_id: &str,
mut revisions: Vec<Revision>,
) -> FlowyResult<Revision> {
pub trait RevisionMergeable: Send + Sync {
fn merge_revisions(&self, _user_id: &str, object_id: &str, mut revisions: Vec<Revision>) -> FlowyResult<Revision> {
if revisions.is_empty() {
return Err(FlowyError::internal().context("Can't compact the empty revisions"));
}
@@ -69,18 +64,6 @@ pub trait RevisionCompress: Send + Sync {
fn combine_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes>;
}
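Under the new name, implementors only supply combine_revisions; merge_revisions is a provided method that folds the pending revisions into a single record. A self-contained sketch with stand-in types (the real trait works on flowy's Revision, Bytes, and FlowyResult), modeled on the concatenating RevisionCompressMock used by this commit's tests:

// Stand-ins for flowy_sync's Revision and bytes::Bytes.
#[derive(Clone)]
struct Revision {
    rev_id: i64,
    data: Vec<u8>,
}

trait RevisionMergeable {
    // The only method an implementor has to provide.
    fn combine_revisions(&self, revisions: Vec<Revision>) -> Result<Vec<u8>, String>;

    // Provided: collapse the pending revisions into one record. Id handling is
    // simplified here; the tests below expect the merged record to keep the
    // first pending rev_id.
    fn merge_revisions(&self, revisions: Vec<Revision>) -> Result<Revision, String> {
        if revisions.is_empty() {
            return Err("Can't merge empty revisions".to_owned());
        }
        let rev_id = revisions.first().unwrap().rev_id;
        let data = self.combine_revisions(revisions)?;
        Ok(Revision { rev_id, data })
    }
}

// A concatenating merger in the spirit of the tests' RevisionCompressMock:
// the merged payload is simply the individual payloads glued together.
struct ConcatMerger;

impl RevisionMergeable for ConcatMerger {
    fn combine_revisions(&self, revisions: Vec<Revision>) -> Result<Vec<u8>, String> {
        Ok(revisions.into_iter().flat_map(|r| r.data).collect())
    }
}

fn main() {
    let revs: Vec<Revision> = (1i64..=4)
        .map(|i| Revision { rev_id: i, data: i.to_string().into_bytes() })
        .collect();
    let merged = ConcatMerger.merge_revisions(revs).unwrap();
    assert_eq!(merged.data, b"1234".to_vec()); // same "1234" the tests assert
    assert_eq!(merged.rev_id, 1);
}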
pub struct RevisionConfiguration {
merge_when_excess_number_of_version: i64,
}
impl std::default::Default for RevisionConfiguration {
fn default() -> Self {
Self {
merge_when_excess_number_of_version: 100,
}
}
}
pub struct RevisionManager<Connection> {
pub object_id: String,
user_id: String,
@@ -88,10 +71,9 @@ pub struct RevisionManager<Connection> {
rev_persistence: Arc<RevisionPersistence<Connection>>,
#[allow(dead_code)]
rev_snapshot: Arc<RevisionSnapshotManager>,
rev_compress: Arc<dyn RevisionCompress>,
rev_compress: Arc<dyn RevisionMergeable>,
#[cfg(feature = "flowy_unit_test")]
rev_ack_notifier: tokio::sync::broadcast::Sender<i64>,
// configuration: RevisionConfiguration,
}
impl<Connection: 'static> RevisionManager<Connection> {
@@ -104,7 +86,7 @@ impl<Connection: 'static> RevisionManager<Connection> {
) -> Self
where
SP: 'static + RevisionSnapshotDiskCache,
C: 'static + RevisionCompress,
C: 'static + RevisionMergeable,
{
let rev_id_counter = RevIdCounter::new(0);
let rev_compress = Arc::new(rev_compress);
@@ -213,6 +195,10 @@ impl<Connection: 'static> RevisionManager<Connection> {
(cur, next)
}
pub fn number_of_sync_revisions(&self) -> usize {
self.rev_persistence.number_of_sync_records()
}
pub async fn get_revisions_in_range(&self, range: RevisionRange) -> Result<Vec<Revision>, FlowyError> {
let revisions = self.rev_persistence.revisions_in_range(&range).await?;
Ok(revisions)

View File

@@ -4,7 +4,7 @@ use crate::cache::{
};
use crate::disk::{RevisionState, SyncRecord};
use crate::memory::RevisionMemoryCache;
use crate::RevisionCompress;
use crate::RevisionMergeable;
use flowy_error::{internal_error, FlowyError, FlowyResult};
use flowy_sync::entities::revision::{Revision, RevisionRange};
use std::collections::VecDeque;
@@ -14,30 +14,58 @@ use tokio::task::spawn_blocking;
pub const REVISION_WRITE_INTERVAL_IN_MILLIS: u64 = 600;
pub struct RevisionPersistenceConfiguration {
merge_threshold: usize,
}
impl RevisionPersistenceConfiguration {
pub fn new(merge_threshold: usize) -> Self {
debug_assert!(merge_threshold > 1);
if merge_threshold > 1 {
Self { merge_threshold }
} else {
Self { merge_threshold: 2 }
}
}
}
impl std::default::Default for RevisionPersistenceConfiguration {
fn default() -> Self {
Self { merge_threshold: 2 }
}
}
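Read together with add_sync_revision below, the threshold works out to a simple cadence (an observation from this diff, not documented behavior): revisions pile up until their count exceeds merge_threshold, and the revision that arrives after that is merged together with them into one record. Each merge therefore bundles merge_threshold + 2 revisions:

// How many local revisions end up in one merged record, given the logic in
// add_sync_revision: a merge fires once `step` (pushed revisions) exceeds the
// threshold, and it also swallows the incoming revision that triggered it.
fn revisions_per_bundle(merge_threshold: usize) -> usize {
    (merge_threshold + 1) + 1
}

fn main() {
    // Threshold 2 (the default): the tests assert merged contents "1234"/"abcd".
    assert_eq!(revisions_per_bundle(2), 4);
    // Threshold 4: 20 revisions -> 3 bundles of 6 plus 2 still pending,
    // matching AssertNumberOfSyncRevisions { num: 5 } in the new test.
    assert_eq!(revisions_per_bundle(4), 6);
}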
pub struct RevisionPersistence<Connection> {
user_id: String,
object_id: String,
disk_cache: Arc<dyn RevisionDiskCache<Connection, Error = FlowyError>>,
memory_cache: Arc<RevisionMemoryCache>,
sync_seq: RwLock<DeferSyncSequence>,
configuration: RevisionPersistenceConfiguration,
}
impl<Connection> RevisionPersistence<Connection>
where
Connection: 'static,
{
pub fn new<C>(user_id: &str, object_id: &str, disk_cache: C) -> RevisionPersistence<Connection>
pub fn new<C>(
user_id: &str,
object_id: &str,
disk_cache: C,
configuration: RevisionPersistenceConfiguration,
) -> RevisionPersistence<Connection>
where
C: 'static + RevisionDiskCache<Connection, Error = FlowyError>,
{
let disk_cache = Arc::new(disk_cache) as Arc<dyn RevisionDiskCache<Connection, Error = FlowyError>>;
Self::from_disk_cache(user_id, object_id, disk_cache)
Self::from_disk_cache(user_id, object_id, disk_cache, configuration)
}
pub fn from_disk_cache(
user_id: &str,
object_id: &str,
disk_cache: Arc<dyn RevisionDiskCache<Connection, Error = FlowyError>>,
configuration: RevisionPersistenceConfiguration,
) -> RevisionPersistence<Connection> {
let object_id = object_id.to_owned();
let user_id = user_id.to_owned();
@@ -49,6 +77,7 @@ where
disk_cache,
memory_cache,
sync_seq,
configuration,
}
}
@@ -64,7 +93,7 @@ where
pub(crate) async fn sync_revision(&self, revision: &Revision) -> FlowyResult<()> {
tracing::Span::current().record("rev_id", &revision.rev_id);
self.add(revision.clone(), RevisionState::Sync, false).await?;
self.sync_seq.write().await.add(revision.rev_id)?;
self.sync_seq.write().await.dry_push(revision.rev_id)?;
Ok(())
}
@@ -72,44 +101,39 @@
#[tracing::instrument(level = "trace", skip_all, fields(rev_id, compact_range, object_id=%self.object_id), err)]
pub(crate) async fn add_sync_revision<'a>(
&'a self,
revision: &'a Revision,
rev_compress: &Arc<dyn RevisionCompress + 'a>,
new_revision: &'a Revision,
rev_compress: &Arc<dyn RevisionMergeable + 'a>,
) -> FlowyResult<i64> {
let mut sync_seq_write_guard = self.sync_seq.write().await;
let result = sync_seq_write_guard.compact();
match result {
None => {
tracing::Span::current().record("rev_id", &revision.rev_id);
self.add(revision.clone(), RevisionState::Sync, true).await?;
sync_seq_write_guard.add(revision.rev_id)?;
Ok(revision.rev_id)
}
Some((range, mut compact_seq)) => {
tracing::Span::current().record("compact_range", &format!("{}", range).as_str());
let mut revisions = self.revisions_in_range(&range).await?;
if range.to_rev_ids().len() != revisions.len() {
debug_assert_eq!(range.to_rev_ids().len(), revisions.len());
}
if sync_seq_write_guard.step > self.configuration.merge_threshold {
let compact_seq = sync_seq_write_guard.compact();
let range = RevisionRange {
start: *compact_seq.front().unwrap(),
end: *compact_seq.back().unwrap(),
};
// append the new revision
revisions.push(revision.clone());
tracing::Span::current().record("compact_range", &format!("{}", range).as_str());
let mut revisions = self.revisions_in_range(&range).await?;
debug_assert_eq!(range.len() as usize, revisions.len());
// append the new revision
revisions.push(new_revision.clone());
// compact multiple revisions into one
let compact_revision = rev_compress.compress_revisions(&self.user_id, &self.object_id, revisions)?;
let rev_id = compact_revision.rev_id;
tracing::Span::current().record("rev_id", &rev_id);
// compact multiple revisions into one
let compact_revision = rev_compress.merge_revisions(&self.user_id, &self.object_id, revisions)?;
let rev_id = compact_revision.rev_id;
tracing::Span::current().record("rev_id", &rev_id);
// insert new revision
compact_seq.push_back(rev_id);
// insert new revision
let _ = sync_seq_write_guard.dry_push(rev_id)?;
// replace the revisions in range with compact revision
self.compact(&range, compact_revision).await?;
//
debug_assert_eq!(compact_seq.len(), 2);
debug_assert_eq!(sync_seq_write_guard.len(), compact_seq.len());
sync_seq_write_guard.reset(compact_seq);
Ok(rev_id)
}
// replace the revisions in range with compact revision
self.compact(&range, compact_revision).await?;
Ok(rev_id)
} else {
tracing::Span::current().record("rev_id", &new_revision.rev_id);
self.add(new_revision.clone(), RevisionState::Sync, true).await?;
sync_seq_write_guard.push(new_revision.rev_id)?;
Ok(new_revision.rev_id)
}
}
@@ -132,6 +156,10 @@ where
self.sync_seq.read().await.next_rev_id()
}
pub(crate) fn number_of_sync_records(&self) -> usize {
self.memory_cache.number_of_sync_records()
}
/// The cache gets reset while it conflicts with the remote revisions.
#[tracing::instrument(level = "trace", skip(self, revisions), err)]
pub(crate) async fn reset(&self, revisions: Vec<Revision>) -> FlowyResult<()> {
@@ -257,27 +285,42 @@ impl<C> RevisionMemoryCacheDelegate for Arc<dyn RevisionDiskCache<C, Error = Flo
}
#[derive(Default)]
struct DeferSyncSequence(VecDeque<i64>);
struct DeferSyncSequence {
rev_ids: VecDeque<i64>,
start: Option<usize>,
step: usize,
}
impl DeferSyncSequence {
fn new() -> Self {
DeferSyncSequence::default()
}
fn add(&mut self, new_rev_id: i64) -> FlowyResult<()> {
fn push(&mut self, new_rev_id: i64) -> FlowyResult<()> {
let _ = self.dry_push(new_rev_id)?;
self.step += 1;
if self.start.is_none() && !self.rev_ids.is_empty() {
self.start = Some(self.rev_ids.len() - 1);
}
Ok(())
}
fn dry_push(&mut self, new_rev_id: i64) -> FlowyResult<()> {
// The new revision's rev_id must be greater than the last one in the queue.
if let Some(rev_id) = self.0.back() {
if let Some(rev_id) = self.rev_ids.back() {
if *rev_id >= new_rev_id {
return Err(
FlowyError::internal().context(format!("The new revision's id must be greater than {}", rev_id))
);
}
}
self.0.push_back(new_rev_id);
self.rev_ids.push_back(new_rev_id);
Ok(())
}
fn ack(&mut self, rev_id: &i64) -> FlowyResult<()> {
let cur_rev_id = self.0.front().cloned();
let cur_rev_id = self.rev_ids.front().cloned();
if let Some(pop_rev_id) = cur_rev_id {
if &pop_rev_id != rev_id {
let desc = format!(
@@ -286,38 +329,38 @@ impl DeferSyncSequence {
);
return Err(FlowyError::internal().context(desc));
}
let _ = self.0.pop_front();
let _ = self.rev_ids.pop_front();
}
Ok(())
}
fn next_rev_id(&self) -> Option<i64> {
self.0.front().cloned()
}
fn reset(&mut self, new_seq: VecDeque<i64>) {
self.0 = new_seq;
self.rev_ids.front().cloned()
}
fn clear(&mut self) {
self.0.clear();
}
fn len(&self) -> usize {
self.0.len()
self.start = None;
self.step = 0;
self.rev_ids.clear();
}
// Compact the rev_ids into one except the current synchronizing rev_id.
fn compact(&self) -> Option<(RevisionRange, VecDeque<i64>)> {
// Make sure there are two rev_id going to sync. No need to compact if there is only
// one rev_id in queue.
self.next_rev_id()?;
fn compact(&mut self) -> VecDeque<i64> {
if self.start.is_none() {
return VecDeque::default();
}
let mut new_seq = self.0.clone();
let mut drained = new_seq.drain(1..).collect::<VecDeque<_>>();
let start = self.start.unwrap();
let compact_seq = self.rev_ids.split_off(start);
self.start = None;
self.step = 0;
compact_seq
let start = drained.pop_front()?;
let end = drained.pop_back().unwrap_or(start);
Some((RevisionRange { start, end }, new_seq))
// let mut new_seq = self.rev_ids.clone();
// let mut drained = new_seq.drain(1..).collect::<VecDeque<_>>();
//
// let start = drained.pop_front()?;
// let end = drained.pop_back().unwrap_or(start);
// Some((RevisionRange { start, end }, new_seq))
}
}
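The reworked DeferSyncSequence replaces the old clone-and-drain compact() with two counters: start remembers where the un-merged run begins, and step counts how many revisions were pushed since the last merge. A self-contained replica of that bookkeeping (same logic, error handling elided):

use std::collections::VecDeque;

#[derive(Default)]
struct Seq {
    rev_ids: VecDeque<i64>,
    start: Option<usize>,
    step: usize,
}

impl Seq {
    // dry_push records a rev_id without counting it toward the merge step,
    // which is how a freshly merged revision re-enters the queue.
    fn dry_push(&mut self, rev_id: i64) {
        self.rev_ids.push_back(rev_id);
    }

    fn push(&mut self, rev_id: i64) {
        self.dry_push(rev_id);
        self.step += 1;
        if self.start.is_none() {
            self.start = Some(self.rev_ids.len() - 1);
        }
    }

    // Drain every rev_id pushed since `start` and reset both counters.
    fn compact(&mut self) -> VecDeque<i64> {
        match self.start.take() {
            None => VecDeque::new(),
            Some(start) => {
                self.step = 0;
                self.rev_ids.split_off(start)
            }
        }
    }
}

fn main() {
    let merge_threshold = 2;
    let mut seq = Seq::default();
    for rev_id in 1..=3 {
        seq.push(rev_id);
    }
    // A fourth revision arrives and step (3) now exceeds the threshold (2),
    // so the pending run is drained and merged together with it.
    assert!(seq.step > merge_threshold);
    assert_eq!(seq.compact(), VecDeque::from([1, 2, 3]));
    seq.dry_push(1); // the merged revision, re-queued without bumping step
    assert_eq!(seq.rev_ids.len(), 1); // one sync record left, as the tests assert
}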

View File

@@ -20,7 +20,7 @@ async fn revision_sync_test() {
#[tokio::test]
async fn revision_sync_multiple_revisions() {
let test = RevisionTest::new().await;
let test = RevisionTest::new_with_configuration(2).await;
let (base_rev_id, rev_id_1) = test.next_rev_id_pair();
test.run_script(AddLocalRevision {
@@ -49,21 +49,20 @@ async fn revision_sync_multiple_revisions() {
}
#[tokio::test]
async fn revision_compress_two_revisions_test() {
let test = RevisionTest::new().await;
async fn revision_compress_three_revisions_test() {
let test = RevisionTest::new_with_configuration(2).await;
let (base_rev_id, rev_id_1) = test.next_rev_id_pair();
test.run_script(AddLocalRevision {
content: "123".to_string(),
content: "1".to_string(),
base_rev_id,
rev_id: rev_id_1,
})
.await;
// rev_id_2 will eventually be merged into rev_id_1 together with rev_id_3 and rev_id_4
let (base_rev_id, rev_id_2) = test.next_rev_id_pair();
test.run_script(AddLocalRevision {
content: "456".to_string(),
content: "2".to_string(),
base_rev_id,
rev_id: rev_id_2,
})
@@ -71,36 +70,129 @@ async fn revision_compress_two_revisions_test() {
let (base_rev_id, rev_id_3) = test.next_rev_id_pair();
test.run_script(AddLocalRevision {
content: "789".to_string(),
content: "3".to_string(),
base_rev_id,
rev_id: rev_id_3,
})
.await;
let (base_rev_id, rev_id_4) = test.next_rev_id_pair();
test.run_script(AddLocalRevision {
content: "4".to_string(),
base_rev_id,
rev_id: rev_id_4,
})
.await;
// rev_id_2, rev_id_3, and rev_id_4 will be merged into rev_id_1
test.run_scripts(vec![
Wait {
milliseconds: REVISION_WRITE_INTERVAL_IN_MILLIS,
},
AssertNumberOfSyncRevisions { num: 1 },
AssertNextSyncRevisionId { rev_id: Some(rev_id_1) },
AckRevision { rev_id: rev_id_1 },
AssertNextSyncRevisionId { rev_id: Some(rev_id_2) },
AssertNextSyncRevisionContent {
expected: "456789".to_string(),
expected: "1234".to_string(),
},
AckRevision { rev_id: rev_id_1 },
AssertNextSyncRevisionId { rev_id: None },
])
.await;
}
#[tokio::test]
async fn revision_compress_multiple_revisions_test() {
let test = RevisionTest::new().await;
let mut expected = "".to_owned();
async fn revision_compress_three_revisions_test2() {
let test = RevisionTest::new_with_configuration(2).await;
let (base_rev_id, rev_id_1) = test.next_rev_id_pair();
for i in 0..100 {
test.run_script(AddLocalRevision {
content: "1".to_string(),
base_rev_id,
rev_id: rev_id_1,
})
.await;
let (base_rev_id, rev_id_2) = test.next_rev_id_pair();
test.run_script(AddLocalRevision {
content: "2".to_string(),
base_rev_id,
rev_id: rev_id_2,
})
.await;
let (base_rev_id, rev_id_3) = test.next_rev_id_pair();
test.run_script(AddLocalRevision {
content: "3".to_string(),
base_rev_id,
rev_id: rev_id_3,
})
.await;
let (base_rev_id, rev_id_4) = test.next_rev_id_pair();
test.run_script(AddLocalRevision {
content: "4".to_string(),
base_rev_id,
rev_id: rev_id_4,
})
.await;
let (base_rev_id, rev_id_a) = test.next_rev_id_pair();
test.run_script(AddLocalRevision {
content: "a".to_string(),
base_rev_id,
rev_id: rev_id_a,
})
.await;
let (base_rev_id, rev_id_b) = test.next_rev_id_pair();
test.run_script(AddLocalRevision {
content: "b".to_string(),
base_rev_id,
rev_id: rev_id_b,
})
.await;
let (base_rev_id, rev_id_c) = test.next_rev_id_pair();
test.run_script(AddLocalRevision {
content: "c".to_string(),
base_rev_id,
rev_id: rev_id_c,
})
.await;
let (base_rev_id, rev_id_d) = test.next_rev_id_pair();
test.run_script(AddLocalRevision {
content: "d".to_string(),
base_rev_id,
rev_id: rev_id_d,
})
.await;
test.run_scripts(vec![
// Wait {
// milliseconds: REVISION_WRITE_INTERVAL_IN_MILLIS,
// },
AssertNumberOfSyncRevisions { num: 2 },
AssertNextSyncRevisionId { rev_id: Some(rev_id_1) },
AssertNextSyncRevisionContent {
expected: "1234".to_string(),
},
AckRevision { rev_id: rev_id_1 },
AssertNextSyncRevisionId { rev_id: Some(rev_id_a) },
AssertNextSyncRevisionContent {
expected: "abcd".to_string(),
},
AckRevision { rev_id: rev_id_a },
AssertNextSyncRevisionId { rev_id: None },
])
.await;
}
#[tokio::test]
async fn revision_merge_per_5_revision_test() {
let test = RevisionTest::new_with_configuration(4).await;
for i in 0..20 {
let content = format!("{}", i);
if i != 0 {
expected.push_str(&content);
}
let (base_rev_id, rev_id) = test.next_rev_id_pair();
test.run_script(AddLocalRevision {
content,
@@ -110,14 +202,5 @@ async fn revision_compress_multiple_revisions_test() {
.await;
}
test.run_scripts(vec![
Wait {
milliseconds: REVISION_WRITE_INTERVAL_IN_MILLIS,
},
AssertNextSyncRevisionId { rev_id: Some(1) },
AckRevision { rev_id: 1 },
AssertNextSyncRevisionId { rev_id: Some(2) },
AssertNextSyncRevisionContent { expected },
])
.await;
test.run_scripts(vec![AssertNumberOfSyncRevisions { num: 5 }]).await;
}

View File

@@ -2,7 +2,8 @@ use bytes::Bytes;
use flowy_error::{FlowyError, FlowyResult};
use flowy_revision::disk::{RevisionChangeset, RevisionDiskCache, SyncRecord};
use flowy_revision::{
RevisionCompress, RevisionManager, RevisionPersistence, RevisionSnapshotDiskCache, RevisionSnapshotInfo,
RevisionManager, RevisionMergeable, RevisionPersistence, RevisionPersistenceConfiguration,
RevisionSnapshotDiskCache, RevisionSnapshotInfo,
};
use flowy_sync::entities::revision::{Revision, RevisionRange};
use flowy_sync::util::md5;
@@ -11,7 +12,6 @@ use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::interval;
pub enum RevisionScript {
AddLocalRevision {
@@ -25,14 +25,15 @@ pub enum RevisionScript {
AssertNextSyncRevisionId {
rev_id: Option<i64>,
},
AssertNumberOfSyncRevisions {
num: usize,
},
AssertNextSyncRevisionContent {
expected: String,
},
Wait {
milliseconds: u64,
},
AssertNextSyncRevision(Option<Revision>),
}
pub struct RevisionTest {
@@ -41,9 +42,14 @@ pub struct RevisionTest {
impl RevisionTest {
pub async fn new() -> Self {
Self::new_with_configuration(2).await
}
pub async fn new_with_configuration(merge_when_excess_number_of_version: i64) -> Self {
let user_id = nanoid!(10);
let object_id = nanoid!(6);
let persistence = RevisionPersistence::new(&user_id, &object_id, RevisionDiskCacheMock::new());
let configuration = RevisionPersistenceConfiguration::new(merge_when_excess_number_of_version as usize);
let persistence = RevisionPersistence::new(&user_id, &object_id, RevisionDiskCacheMock::new(), configuration);
let compress = RevisionCompressMock {};
let snapshot = RevisionSnapshotMock {};
let rev_manager = RevisionManager::new(&user_id, &object_id, persistence, compress, snapshot);
@@ -51,6 +57,7 @@ impl RevisionTest {
rev_manager: Arc::new(rev_manager),
}
}
pub async fn run_scripts(&self, scripts: Vec<RevisionScript>) {
for script in scripts {
self.run_script(script).await;
@@ -87,6 +94,9 @@ impl RevisionTest {
RevisionScript::AssertNextSyncRevisionId { rev_id } => {
assert_eq!(self.rev_manager.next_sync_rev_id().await, rev_id)
}
RevisionScript::AssertNumberOfSyncRevisions { num } => {
assert_eq!(self.rev_manager.number_of_sync_revisions(), num)
}
RevisionScript::AssertNextSyncRevisionContent { expected } => {
//
let rev_id = self.rev_manager.next_sync_rev_id().await.unwrap();
@@ -95,14 +105,8 @@ impl RevisionTest {
assert_eq!(object.content, expected);
}
RevisionScript::Wait { milliseconds } => {
// let mut interval = interval(Duration::from_millis(milliseconds));
// interval.tick().await;
tokio::time::sleep(Duration::from_millis(milliseconds)).await;
}
RevisionScript::AssertNextSyncRevision(expected) => {
let next_revision = self.rev_manager.next_sync_revision().await.unwrap();
assert_eq!(next_revision, expected);
}
}
}
}
@@ -133,16 +137,16 @@ impl RevisionDiskCache<RevisionConnectionMock> for RevisionDiskCacheMock {
fn read_revision_records(
&self,
object_id: &str,
rev_ids: Option<Vec<i64>>,
_object_id: &str,
_rev_ids: Option<Vec<i64>>,
) -> Result<Vec<SyncRecord>, Self::Error> {
todo!()
}
fn read_revision_records_with_range(
&self,
object_id: &str,
range: &RevisionRange,
_object_id: &str,
_range: &RevisionRange,
) -> Result<Vec<SyncRecord>, Self::Error> {
todo!()
}
@@ -161,7 +165,7 @@ impl RevisionDiskCache<RevisionConnectionMock> for RevisionDiskCacheMock {
Ok(())
}
fn delete_revision_records(&self, object_id: &str, rev_ids: Option<Vec<i64>>) -> Result<(), Self::Error> {
fn delete_revision_records(&self, _object_id: &str, rev_ids: Option<Vec<i64>>) -> Result<(), Self::Error> {
match rev_ids {
None => {}
Some(rev_ids) => {
@@ -182,9 +186,9 @@ impl RevisionDiskCache<RevisionConnectionMock> for RevisionDiskCacheMock {
fn delete_and_insert_records(
&self,
object_id: &str,
deleted_rev_ids: Option<Vec<i64>>,
inserted_records: Vec<SyncRecord>,
_object_id: &str,
_deleted_rev_ids: Option<Vec<i64>>,
_inserted_records: Vec<SyncRecord>,
) -> Result<(), Self::Error> {
todo!()
}
@@ -195,18 +199,18 @@ pub struct RevisionConnectionMock {}
pub struct RevisionSnapshotMock {}
impl RevisionSnapshotDiskCache for RevisionSnapshotMock {
fn write_snapshot(&self, object_id: &str, rev_id: i64, data: Vec<u8>) -> FlowyResult<()> {
fn write_snapshot(&self, _object_id: &str, _rev_id: i64, _data: Vec<u8>) -> FlowyResult<()> {
todo!()
}
fn read_snapshot(&self, object_id: &str, rev_id: i64) -> FlowyResult<RevisionSnapshotInfo> {
fn read_snapshot(&self, _object_id: &str, _rev_id: i64) -> FlowyResult<RevisionSnapshotInfo> {
todo!()
}
}
pub struct RevisionCompressMock {}
impl RevisionCompress for RevisionCompressMock {
impl RevisionMergeable for RevisionCompressMock {
fn combine_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
let mut object = RevisionObjectMock::new("");
for revision in revisions {

View File

@@ -144,7 +144,7 @@ struct DocumentViewDataProcessor(Arc<DocumentManager>);
impl ViewDataProcessor for DocumentViewDataProcessor {
fn create_view(
&self,
user_id: &str,
_user_id: &str,
view_id: &str,
layout: ViewLayoutTypePB,
view_data: Bytes,
@@ -188,7 +188,7 @@ impl ViewDataProcessor for DocumentViewDataProcessor {
_data_format: ViewDataFormatPB,
) -> FutureResult<Bytes, FlowyError> {
debug_assert_eq!(layout, ViewLayoutTypePB::Document);
let user_id = user_id.to_string();
let _user_id = user_id.to_string();
let view_id = view_id.to_string();
let manager = self.0.clone();
let document_content = self.0.initial_document_content();
@@ -220,7 +220,7 @@ struct GridViewDataProcessor(Arc<GridManager>);
impl ViewDataProcessor for GridViewDataProcessor {
fn create_view(
&self,
user_id: &str,
_user_id: &str,
view_id: &str,
_layout: ViewLayoutTypePB,
delta_data: Bytes,

View File

@@ -256,7 +256,7 @@ pub fn make_grid_block_operations(block_rev: &GridBlockRevision) -> GridBlockOpe
GridBlockOperationsBuilder::new().insert(&json).build()
}
pub fn make_grid_block_revisions(user_id: &str, grid_block_meta_data: &GridBlockRevision) -> RepeatedRevision {
pub fn make_grid_block_revisions(_user_id: &str, grid_block_meta_data: &GridBlockRevision) -> RepeatedRevision {
let operations = make_grid_block_operations(grid_block_meta_data);
let bytes = operations.json_bytes();
let revision = Revision::initial_revision(&grid_block_meta_data.block_id, bytes);

View File

@@ -409,7 +409,7 @@ pub fn make_grid_operations(grid_rev: &GridRevision) -> GridOperations {
GridOperationsBuilder::new().insert(&json).build()
}
pub fn make_grid_revisions(user_id: &str, grid_rev: &GridRevision) -> RepeatedRevision {
pub fn make_grid_revisions(_user_id: &str, grid_rev: &GridRevision) -> RepeatedRevision {
let operations = make_grid_operations(grid_rev);
let bytes = operations.json_bytes();
let revision = Revision::initial_revision(&grid_rev.grid_id, bytes);

View File

@@ -178,10 +178,10 @@ impl std::fmt::Display for RevisionRange {
}
impl RevisionRange {
pub fn len(&self) -> i64 {
pub fn len(&self) -> u64 {
debug_assert!(self.end >= self.start);
if self.end >= self.start {
self.end - self.start + 1
(self.end - self.start + 1) as u64
} else {
0
}
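The signature change makes the non-negativity explicit and feeds the new debug_assert_eq!(range.len() as usize, revisions.len()) in add_sync_revision above. A stand-alone rendering of the new method:

struct RevisionRange {
    start: i64,
    end: i64,
}

impl RevisionRange {
    // Inclusive count of revisions covered by the range; a length cannot be
    // negative, hence the switch from i64 to u64.
    pub fn len(&self) -> u64 {
        debug_assert!(self.end >= self.start);
        if self.end >= self.start {
            (self.end - self.start + 1) as u64
        } else {
            0
        }
    }
}

fn main() {
    assert_eq!(RevisionRange { start: 2, end: 4 }.len(), 3); // revisions 2, 3, 4
}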