refactor: md5 of revision (rename Document::md5 to document_md5, RevisionRecord to SyncRecord, OperationsMD5 to RevisionMD5; move md5 helper to flowy_sync::util; replace RepeatedRevision with Vec<Revision>)

This commit is contained in:
nathan 2022-11-02 10:21:10 +08:00
parent cd8e4ddf00
commit 608a08eb76
34 changed files with 300 additions and 235 deletions

View File

@ -28,9 +28,9 @@ impl Document {
}
}
pub fn md5(&self) -> String {
// format!("{:x}", md5::compute(bytes))
"".to_owned()
pub fn document_md5(&self) -> String {
let bytes = self.tree.to_bytes();
format!("{:x}", md5::compute(&bytes))
}
pub fn get_tree(&self) -> &NodeTree {

View File

@ -63,7 +63,7 @@ impl DocumentQueue {
Command::ComposeTransaction { transaction, ret } => {
self.document.write().await.apply_transaction(transaction.clone())?;
let _ = self
.save_local_operations(transaction, self.document.read().await.md5())
.save_local_operations(transaction, self.document.read().await.document_md5())
.await?;
let _ = ret.send(Ok(()));
}

View File

@ -12,11 +12,8 @@ use flowy_revision::{
RevisionCloudService, RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionSnapshotPersistence,
};
use flowy_sync::client_document::initial_delta_document_content;
use flowy_sync::entities::{
document::DocumentIdPB,
revision::{md5, RepeatedRevision, Revision},
ws_data::ServerRevisionWSData,
};
use flowy_sync::entities::{document::DocumentIdPB, revision::Revision, ws_data::ServerRevisionWSData};
use flowy_sync::util::md5;
use lib_infra::future::FutureResult;
use lib_ws::WSConnectState;
use std::any::Any;
@ -139,7 +136,7 @@ impl DocumentManager {
Ok(())
}
pub async fn create_document<T: AsRef<str>>(&self, doc_id: T, revisions: RepeatedRevision) -> FlowyResult<()> {
pub async fn create_document<T: AsRef<str>>(&self, doc_id: T, revisions: Vec<Revision>) -> FlowyResult<()> {
let doc_id = doc_id.as_ref().to_owned();
let db_pool = self.persistence.database.db_pool()?;
// Maybe we could save the document to disk without creating the RevisionManager

View File

@ -3,7 +3,7 @@ use crate::DocumentUser;
use async_stream::stream;
use flowy_database::ConnectionPool;
use flowy_error::FlowyError;
use flowy_revision::{OperationsMD5, RevisionManager, TransformOperations};
use flowy_revision::{RevisionMD5, RevisionManager, TransformOperations};
use flowy_sync::{
client_document::{history::UndoResult, ClientDocument},
entities::revision::{RevId, Revision},
@ -70,7 +70,7 @@ impl EditDocumentQueue {
EditorCommand::ComposeLocalOperations { operations, ret } => {
let mut document = self.document.write().await;
let _ = document.compose_operations(operations.clone())?;
let md5 = document.md5();
let md5 = document.document_md5();
drop(document);
let _ = self.save_local_operations(operations, md5).await?;
let _ = ret.send(Ok(()));
@ -78,16 +78,16 @@ impl EditDocumentQueue {
EditorCommand::ComposeRemoteOperation { client_operations, ret } => {
let mut document = self.document.write().await;
let _ = document.compose_operations(client_operations.clone())?;
let md5 = document.md5();
let md5 = document.document_md5();
drop(document);
let _ = ret.send(Ok(md5));
let _ = ret.send(Ok(md5.into()));
}
EditorCommand::ResetOperations { operations, ret } => {
let mut document = self.document.write().await;
let _ = document.set_operations(operations);
let md5 = document.md5();
let md5 = document.document_md5();
drop(document);
let _ = ret.send(Ok(md5));
let _ = ret.send(Ok(md5.into()));
}
EditorCommand::TransformOperations { operations, ret } => {
let f = || async {
@ -114,14 +114,14 @@ impl EditDocumentQueue {
EditorCommand::Insert { index, data, ret } => {
let mut write_guard = self.document.write().await;
let operations = write_guard.insert(index, data)?;
let md5 = write_guard.md5();
let md5 = write_guard.document_md5();
let _ = self.save_local_operations(operations, md5).await?;
let _ = ret.send(Ok(()));
}
EditorCommand::Delete { interval, ret } => {
let mut write_guard = self.document.write().await;
let operations = write_guard.delete(interval)?;
let md5 = write_guard.md5();
let md5 = write_guard.document_md5();
let _ = self.save_local_operations(operations, md5).await?;
let _ = ret.send(Ok(()));
}
@ -132,14 +132,14 @@ impl EditDocumentQueue {
} => {
let mut write_guard = self.document.write().await;
let operations = write_guard.format(interval, attribute)?;
let md5 = write_guard.md5();
let md5 = write_guard.document_md5();
let _ = self.save_local_operations(operations, md5).await?;
let _ = ret.send(Ok(()));
}
EditorCommand::Replace { interval, data, ret } => {
let mut write_guard = self.document.write().await;
let operations = write_guard.replace(interval, data)?;
let md5 = write_guard.md5();
let md5 = write_guard.document_md5();
let _ = self.save_local_operations(operations, md5).await?;
let _ = ret.send(Ok(()));
}
@ -152,14 +152,14 @@ impl EditDocumentQueue {
EditorCommand::Undo { ret } => {
let mut write_guard = self.document.write().await;
let UndoResult { operations } = write_guard.undo()?;
let md5 = write_guard.md5();
let md5 = write_guard.document_md5();
let _ = self.save_local_operations(operations, md5).await?;
let _ = ret.send(Ok(()));
}
EditorCommand::Redo { ret } => {
let mut write_guard = self.document.write().await;
let UndoResult { operations } = write_guard.redo()?;
let md5 = write_guard.md5();
let md5 = write_guard.document_md5();
let _ = self.save_local_operations(operations, md5).await?;
let _ = ret.send(Ok(()));
}
@ -197,11 +197,11 @@ pub(crate) enum EditorCommand {
},
ComposeRemoteOperation {
client_operations: DeltaTextOperations,
ret: Ret<OperationsMD5>,
ret: Ret<RevisionMD5>,
},
ResetOperations {
operations: DeltaTextOperations,
ret: Ret<OperationsMD5>,
ret: Ret<RevisionMD5>,
},
TransformOperations {
operations: DeltaTextOperations,

View File

@ -136,7 +136,7 @@ impl ConflictResolver<DeltaDocumentResolveOperations> for DocumentConflictResolv
fn compose_operations(
&self,
operations: DeltaDocumentResolveOperations,
) -> BoxResultFuture<OperationsMD5, FlowyError> {
) -> BoxResultFuture<RevisionMD5, FlowyError> {
let tx = self.edit_cmd_tx.clone();
let operations = operations.into_inner();
Box::pin(async move {
@ -172,10 +172,7 @@ impl ConflictResolver<DeltaDocumentResolveOperations> for DocumentConflictResolv
})
}
fn reset_operations(
&self,
operations: DeltaDocumentResolveOperations,
) -> BoxResultFuture<OperationsMD5, FlowyError> {
fn reset_operations(&self, operations: DeltaDocumentResolveOperations) -> BoxResultFuture<RevisionMD5, FlowyError> {
let tx = self.edit_cmd_tx.clone();
let operations = operations.into_inner();
Box::pin(async move {

View File

@ -4,9 +4,9 @@ use crate::DocumentDatabase;
use bytes::Bytes;
use flowy_database::kv::KV;
use flowy_error::FlowyResult;
use flowy_revision::disk::{RevisionDiskCache, RevisionRecord};
use flowy_sync::entities::revision::{md5, Revision};
use flowy_sync::util::make_operations_from_revisions;
use flowy_revision::disk::{RevisionDiskCache, SyncRecord};
use flowy_sync::entities::revision::Revision;
use flowy_sync::util::{make_operations_from_revisions, md5};
use std::sync::Arc;
const V1_MIGRATION: &str = "DOCUMENT_V1_MIGRATION";
@ -44,7 +44,7 @@ impl DocumentMigration {
let bytes = Bytes::from(transaction.to_bytes()?);
let md5 = format!("{:x}", md5::compute(&bytes));
let revision = Revision::new(&document_id, 0, 1, bytes, &self.user_id, md5);
let record = RevisionRecord::new(revision);
let record = SyncRecord::new(revision);
match disk_cache.create_revision_records(vec![record]) {
Ok(_) => {}
Err(err) => {

View File

@ -7,7 +7,7 @@ use flowy_database::{
ConnectionPool,
};
use flowy_error::{internal_error, FlowyError, FlowyResult};
use flowy_revision::disk::{RevisionChangeset, RevisionDiskCache, RevisionRecord, RevisionState};
use flowy_revision::disk::{RevisionChangeset, RevisionDiskCache, RevisionState, SyncRecord};
use flowy_sync::{
entities::revision::{RevType, Revision, RevisionRange},
util::md5,
@ -23,7 +23,7 @@ pub struct SQLiteDeltaDocumentRevisionPersistence {
impl RevisionDiskCache<Arc<ConnectionPool>> for SQLiteDeltaDocumentRevisionPersistence {
type Error = FlowyError;
fn create_revision_records(&self, revision_records: Vec<RevisionRecord>) -> Result<(), Self::Error> {
fn create_revision_records(&self, revision_records: Vec<SyncRecord>) -> Result<(), Self::Error> {
let conn = self.pool.get().map_err(internal_error)?;
let _ = DeltaRevisionSql::create(revision_records, &*conn)?;
Ok(())
@ -37,7 +37,7 @@ impl RevisionDiskCache<Arc<ConnectionPool>> for SQLiteDeltaDocumentRevisionPersi
&self,
object_id: &str,
rev_ids: Option<Vec<i64>>,
) -> Result<Vec<RevisionRecord>, Self::Error> {
) -> Result<Vec<SyncRecord>, Self::Error> {
let conn = self.pool.get().map_err(internal_error)?;
let records = DeltaRevisionSql::read(&self.user_id, object_id, rev_ids, &*conn)?;
Ok(records)
@ -47,7 +47,7 @@ impl RevisionDiskCache<Arc<ConnectionPool>> for SQLiteDeltaDocumentRevisionPersi
&self,
object_id: &str,
range: &RevisionRange,
) -> Result<Vec<RevisionRecord>, Self::Error> {
) -> Result<Vec<SyncRecord>, Self::Error> {
let conn = &*self.pool.get().map_err(internal_error)?;
let revisions = DeltaRevisionSql::read_with_range(&self.user_id, object_id, range.clone(), conn)?;
Ok(revisions)
@ -74,7 +74,7 @@ impl RevisionDiskCache<Arc<ConnectionPool>> for SQLiteDeltaDocumentRevisionPersi
&self,
object_id: &str,
deleted_rev_ids: Option<Vec<i64>>,
inserted_records: Vec<RevisionRecord>,
inserted_records: Vec<SyncRecord>,
) -> Result<(), Self::Error> {
let conn = self.pool.get().map_err(internal_error)?;
conn.immediate_transaction::<_, FlowyError, _>(|| {
@ -97,7 +97,7 @@ impl SQLiteDeltaDocumentRevisionPersistence {
pub struct DeltaRevisionSql {}
impl DeltaRevisionSql {
fn create(revision_records: Vec<RevisionRecord>, conn: &SqliteConnection) -> Result<(), FlowyError> {
fn create(revision_records: Vec<SyncRecord>, conn: &SqliteConnection) -> Result<(), FlowyError> {
// Batch insert: https://diesel.rs/guides/all-about-inserts.html
let records = revision_records
@ -143,7 +143,7 @@ impl DeltaRevisionSql {
object_id: &str,
rev_ids: Option<Vec<i64>>,
conn: &SqliteConnection,
) -> Result<Vec<RevisionRecord>, FlowyError> {
) -> Result<Vec<SyncRecord>, FlowyError> {
let mut sql = dsl::rev_table.filter(dsl::doc_id.eq(object_id)).into_boxed();
if let Some(rev_ids) = rev_ids {
sql = sql.filter(dsl::rev_id.eq_any(rev_ids));
@ -162,7 +162,7 @@ impl DeltaRevisionSql {
object_id: &str,
range: RevisionRange,
conn: &SqliteConnection,
) -> Result<Vec<RevisionRecord>, FlowyError> {
) -> Result<Vec<SyncRecord>, FlowyError> {
let rev_tables = dsl::rev_table
.filter(dsl::rev_id.ge(range.start))
.filter(dsl::rev_id.le(range.end))
@ -244,7 +244,7 @@ impl std::default::Default for TextRevisionState {
}
}
fn mk_revision_record_from_table(user_id: &str, table: RevisionTable) -> RevisionRecord {
fn mk_revision_record_from_table(user_id: &str, table: RevisionTable) -> SyncRecord {
let md5 = md5(&table.data);
let revision = Revision::new(
&table.doc_id,
@ -254,7 +254,7 @@ fn mk_revision_record_from_table(user_id: &str, table: RevisionTable) -> Revisio
user_id,
md5,
);
RevisionRecord {
SyncRecord {
revision,
state: table.state.into(),
write_to_disk: false,

View File

@ -7,7 +7,7 @@ use flowy_database::{
ConnectionPool,
};
use flowy_error::{internal_error, FlowyError, FlowyResult};
use flowy_revision::disk::{RevisionChangeset, RevisionDiskCache, RevisionRecord, RevisionState};
use flowy_revision::disk::{RevisionChangeset, RevisionDiskCache, RevisionState, SyncRecord};
use flowy_sync::{
entities::revision::{Revision, RevisionRange},
util::md5,
@ -22,7 +22,7 @@ pub struct SQLiteDocumentRevisionPersistence {
impl RevisionDiskCache<Arc<ConnectionPool>> for SQLiteDocumentRevisionPersistence {
type Error = FlowyError;
fn create_revision_records(&self, revision_records: Vec<RevisionRecord>) -> Result<(), Self::Error> {
fn create_revision_records(&self, revision_records: Vec<SyncRecord>) -> Result<(), Self::Error> {
let conn = self.pool.get().map_err(internal_error)?;
let _ = DocumentRevisionSql::create(revision_records, &*conn)?;
Ok(())
@ -36,7 +36,7 @@ impl RevisionDiskCache<Arc<ConnectionPool>> for SQLiteDocumentRevisionPersistenc
&self,
object_id: &str,
rev_ids: Option<Vec<i64>>,
) -> Result<Vec<RevisionRecord>, Self::Error> {
) -> Result<Vec<SyncRecord>, Self::Error> {
let conn = self.pool.get().map_err(internal_error)?;
let records = DocumentRevisionSql::read(&self.user_id, object_id, rev_ids, &*conn)?;
Ok(records)
@ -46,7 +46,7 @@ impl RevisionDiskCache<Arc<ConnectionPool>> for SQLiteDocumentRevisionPersistenc
&self,
object_id: &str,
range: &RevisionRange,
) -> Result<Vec<RevisionRecord>, Self::Error> {
) -> Result<Vec<SyncRecord>, Self::Error> {
let conn = &*self.pool.get().map_err(internal_error)?;
let revisions = DocumentRevisionSql::read_with_range(&self.user_id, object_id, range.clone(), conn)?;
Ok(revisions)
@ -73,7 +73,7 @@ impl RevisionDiskCache<Arc<ConnectionPool>> for SQLiteDocumentRevisionPersistenc
&self,
object_id: &str,
deleted_rev_ids: Option<Vec<i64>>,
inserted_records: Vec<RevisionRecord>,
inserted_records: Vec<SyncRecord>,
) -> Result<(), Self::Error> {
let conn = self.pool.get().map_err(internal_error)?;
conn.immediate_transaction::<_, FlowyError, _>(|| {
@ -96,7 +96,7 @@ impl SQLiteDocumentRevisionPersistence {
struct DocumentRevisionSql {}
impl DocumentRevisionSql {
fn create(revision_records: Vec<RevisionRecord>, conn: &SqliteConnection) -> Result<(), FlowyError> {
fn create(revision_records: Vec<SyncRecord>, conn: &SqliteConnection) -> Result<(), FlowyError> {
// Batch insert: https://diesel.rs/guides/all-about-inserts.html
let records = revision_records
.into_iter()
@ -142,7 +142,7 @@ impl DocumentRevisionSql {
object_id: &str,
rev_ids: Option<Vec<i64>>,
conn: &SqliteConnection,
) -> Result<Vec<RevisionRecord>, FlowyError> {
) -> Result<Vec<SyncRecord>, FlowyError> {
let mut sql = dsl::document_rev_table
.filter(dsl::document_id.eq(object_id))
.into_boxed();
@ -163,7 +163,7 @@ impl DocumentRevisionSql {
object_id: &str,
range: RevisionRange,
conn: &SqliteConnection,
) -> Result<Vec<RevisionRecord>, FlowyError> {
) -> Result<Vec<SyncRecord>, FlowyError> {
let rev_tables = dsl::document_rev_table
.filter(dsl::rev_id.ge(range.start))
.filter(dsl::rev_id.le(range.end))
@ -220,7 +220,7 @@ impl std::default::Default for DocumentRevisionState {
}
}
fn mk_revision_record_from_table(user_id: &str, table: DocumentRevisionTable) -> RevisionRecord {
fn mk_revision_record_from_table(user_id: &str, table: DocumentRevisionTable) -> SyncRecord {
let md5 = md5(&table.data);
let revision = Revision::new(
&table.document_id,
@ -230,7 +230,7 @@ fn mk_revision_record_from_table(user_id: &str, table: DocumentRevisionTable) ->
user_id,
md5,
);
RevisionRecord {
SyncRecord {
revision,
state: table.state.into(),
write_to_disk: false,

View File

@ -9,11 +9,12 @@ use flowy_error::{FlowyError, FlowyResult};
use flowy_folder_data_model::revision::{AppRevision, FolderRevision, ViewRevision, WorkspaceRevision};
use flowy_revision::reset::{RevisionResettable, RevisionStructReset};
use flowy_sync::client_folder::make_folder_rev_json_str;
use flowy_sync::client_folder::FolderPad;
use flowy_sync::entities::revision::Revision;
use flowy_sync::server_folder::FolderOperationsBuilder;
use flowy_sync::{client_folder::FolderPad, entities::revision::md5};
use crate::services::persistence::rev_sqlite::SQLiteFolderRevisionPersistence;
use flowy_sync::util::md5;
use std::sync::Arc;
const V1_MIGRATION: &str = "FOLDER_V1_MIGRATION";

View File

@ -11,7 +11,7 @@ use crate::{
use flowy_database::ConnectionPool;
use flowy_error::{FlowyError, FlowyResult};
use flowy_folder_data_model::revision::{AppRevision, TrashRevision, ViewRevision, WorkspaceRevision};
use flowy_revision::disk::{RevisionDiskCache, RevisionRecord, RevisionState};
use flowy_revision::disk::{RevisionDiskCache, RevisionState, SyncRecord};
use flowy_sync::{client_folder::FolderPad, entities::revision::Revision};
use crate::services::persistence::rev_sqlite::SQLiteFolderRevisionPersistence;
@ -112,7 +112,7 @@ impl FolderPersistence {
let json = folder.to_json()?;
let delta_data = FolderOperationsBuilder::new().insert(&json).build().json_bytes();
let revision = Revision::initial_revision(user_id, folder_id.as_ref(), delta_data);
let record = RevisionRecord {
let record = SyncRecord {
revision,
state: RevisionState::Sync,
write_to_disk: true,

View File

@ -7,7 +7,7 @@ use flowy_database::{
ConnectionPool,
};
use flowy_error::{internal_error, FlowyError, FlowyResult};
use flowy_revision::disk::{RevisionChangeset, RevisionDiskCache, RevisionRecord, RevisionState};
use flowy_revision::disk::{RevisionChangeset, RevisionDiskCache, RevisionState, SyncRecord};
use flowy_sync::{
entities::revision::{RevType, Revision, RevisionRange},
util::md5,
@ -23,7 +23,7 @@ pub struct SQLiteFolderRevisionPersistence {
impl RevisionDiskCache<Arc<ConnectionPool>> for SQLiteFolderRevisionPersistence {
type Error = FlowyError;
fn create_revision_records(&self, revision_records: Vec<RevisionRecord>) -> Result<(), Self::Error> {
fn create_revision_records(&self, revision_records: Vec<SyncRecord>) -> Result<(), Self::Error> {
let conn = self.pool.get().map_err(internal_error)?;
let _ = FolderRevisionSql::create(revision_records, &*conn)?;
Ok(())
@ -37,7 +37,7 @@ impl RevisionDiskCache<Arc<ConnectionPool>> for SQLiteFolderRevisionPersistence
&self,
object_id: &str,
rev_ids: Option<Vec<i64>>,
) -> Result<Vec<RevisionRecord>, Self::Error> {
) -> Result<Vec<SyncRecord>, Self::Error> {
let conn = self.pool.get().map_err(internal_error)?;
let records = FolderRevisionSql::read(&self.user_id, object_id, rev_ids, &*conn)?;
Ok(records)
@ -47,7 +47,7 @@ impl RevisionDiskCache<Arc<ConnectionPool>> for SQLiteFolderRevisionPersistence
&self,
object_id: &str,
range: &RevisionRange,
) -> Result<Vec<RevisionRecord>, Self::Error> {
) -> Result<Vec<SyncRecord>, Self::Error> {
let conn = &*self.pool.get().map_err(internal_error)?;
let revisions = FolderRevisionSql::read_with_range(&self.user_id, object_id, range.clone(), conn)?;
Ok(revisions)
@ -74,7 +74,7 @@ impl RevisionDiskCache<Arc<ConnectionPool>> for SQLiteFolderRevisionPersistence
&self,
object_id: &str,
deleted_rev_ids: Option<Vec<i64>>,
inserted_records: Vec<RevisionRecord>,
inserted_records: Vec<SyncRecord>,
) -> Result<(), Self::Error> {
let conn = self.pool.get().map_err(internal_error)?;
conn.immediate_transaction::<_, FlowyError, _>(|| {
@ -97,7 +97,7 @@ impl SQLiteFolderRevisionPersistence {
struct FolderRevisionSql {}
impl FolderRevisionSql {
fn create(revision_records: Vec<RevisionRecord>, conn: &SqliteConnection) -> Result<(), FlowyError> {
fn create(revision_records: Vec<SyncRecord>, conn: &SqliteConnection) -> Result<(), FlowyError> {
// Batch insert: https://diesel.rs/guides/all-about-inserts.html
let records = revision_records
@ -143,7 +143,7 @@ impl FolderRevisionSql {
object_id: &str,
rev_ids: Option<Vec<i64>>,
conn: &SqliteConnection,
) -> Result<Vec<RevisionRecord>, FlowyError> {
) -> Result<Vec<SyncRecord>, FlowyError> {
let mut sql = dsl::rev_table.filter(dsl::doc_id.eq(object_id)).into_boxed();
if let Some(rev_ids) = rev_ids {
sql = sql.filter(dsl::rev_id.eq_any(rev_ids));
@ -162,7 +162,7 @@ impl FolderRevisionSql {
object_id: &str,
range: RevisionRange,
conn: &SqliteConnection,
) -> Result<Vec<RevisionRecord>, FlowyError> {
) -> Result<Vec<SyncRecord>, FlowyError> {
let rev_tables = dsl::rev_table
.filter(dsl::rev_id.ge(range.start))
.filter(dsl::rev_id.le(range.end))
@ -220,7 +220,7 @@ impl std::default::Default for TextRevisionState {
}
}
fn mk_revision_record_from_table(user_id: &str, table: RevisionTable) -> RevisionRecord {
fn mk_revision_record_from_table(user_id: &str, table: RevisionTable) -> SyncRecord {
let md5 = md5(&table.data);
let revision = Revision::new(
&table.doc_id,
@ -230,7 +230,7 @@ fn mk_revision_record_from_table(user_id: &str, table: RevisionTable) -> Revisio
user_id,
md5,
);
RevisionRecord {
SyncRecord {
revision,
state: table.state.into(),
write_to_disk: false,

View File

@ -78,12 +78,12 @@ struct FolderConflictResolver {
}
impl ConflictResolver<FolderResolveOperations> for FolderConflictResolver {
fn compose_operations(&self, operations: FolderResolveOperations) -> BoxResultFuture<OperationsMD5, FlowyError> {
fn compose_operations(&self, operations: FolderResolveOperations) -> BoxResultFuture<RevisionMD5, FlowyError> {
let operations = operations.into_inner();
let folder_pad = self.folder_pad.clone();
Box::pin(async move {
let md5 = folder_pad.write().compose_remote_operations(operations)?;
Ok(md5)
Ok(md5.into())
})
}
@ -113,11 +113,11 @@ impl ConflictResolver<FolderResolveOperations> for FolderConflictResolver {
})
}
fn reset_operations(&self, operations: FolderResolveOperations) -> BoxResultFuture<OperationsMD5, FlowyError> {
fn reset_operations(&self, operations: FolderResolveOperations) -> BoxResultFuture<RevisionMD5, FlowyError> {
let folder_pad = self.folder_pad.clone();
Box::pin(async move {
let md5 = folder_pad.write().reset_folder(operations.into_inner())?;
Ok(md5)
Ok(md5.into())
})
}
}

View File

@ -15,7 +15,7 @@ use flowy_error::{FlowyError, FlowyResult};
use flowy_grid_data_model::revision::{BuildGridContext, GridRevision, GridViewRevision};
use flowy_revision::{RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionSnapshotPersistence};
use flowy_sync::client_grid::{make_grid_block_operations, make_grid_operations, make_grid_view_operations};
use flowy_sync::entities::revision::{RepeatedRevision, Revision};
use flowy_sync::entities::revision::Revision;
use std::sync::Arc;
use tokio::sync::RwLock;
@ -67,7 +67,7 @@ impl GridManager {
}
#[tracing::instrument(level = "debug", skip_all, err)]
pub async fn create_grid<T: AsRef<str>>(&self, grid_id: T, revisions: RepeatedRevision) -> FlowyResult<()> {
pub async fn create_grid<T: AsRef<str>>(&self, grid_id: T, revisions: Vec<Revision>) -> FlowyResult<()> {
let grid_id = grid_id.as_ref();
let db_pool = self.grid_user.db_pool()?;
let rev_manager = self.make_grid_rev_manager(grid_id, db_pool)?;
@ -77,7 +77,7 @@ impl GridManager {
}
#[tracing::instrument(level = "debug", skip_all, err)]
async fn create_grid_view<T: AsRef<str>>(&self, view_id: T, revisions: RepeatedRevision) -> FlowyResult<()> {
async fn create_grid_view<T: AsRef<str>>(&self, view_id: T, revisions: Vec<Revision>) -> FlowyResult<()> {
let view_id = view_id.as_ref();
let rev_manager = make_grid_view_rev_manager(&self.grid_user, view_id).await?;
let _ = rev_manager.reset_object(revisions).await?;
@ -85,7 +85,7 @@ impl GridManager {
}
#[tracing::instrument(level = "debug", skip_all, err)]
pub async fn create_grid_block<T: AsRef<str>>(&self, block_id: T, revisions: RepeatedRevision) -> FlowyResult<()> {
pub async fn create_grid_block<T: AsRef<str>>(&self, block_id: T, revisions: Vec<Revision>) -> FlowyResult<()> {
let block_id = block_id.as_ref();
let db_pool = self.grid_user.db_pool()?;
let rev_manager = self.make_grid_block_rev_manager(block_id, db_pool)?;
@ -208,9 +208,8 @@ pub async fn make_grid_view_data(
// Create grid's block
let grid_block_delta = make_grid_block_operations(block_meta_data);
let block_delta_data = grid_block_delta.json_bytes();
let repeated_revision: RepeatedRevision =
Revision::initial_revision(user_id, block_id, block_delta_data).into();
let _ = grid_manager.create_grid_block(&block_id, repeated_revision).await?;
let revision = Revision::initial_revision(user_id, block_id, block_delta_data);
let _ = grid_manager.create_grid_block(&block_id, vec![revision]).await?;
}
// Will replace the grid_id with the value returned by the gen_grid_id()
@ -220,9 +219,8 @@ pub async fn make_grid_view_data(
// Create grid
let grid_rev_delta = make_grid_operations(&grid_rev);
let grid_rev_delta_bytes = grid_rev_delta.json_bytes();
let repeated_revision: RepeatedRevision =
Revision::initial_revision(user_id, &grid_id, grid_rev_delta_bytes.clone()).into();
let _ = grid_manager.create_grid(&grid_id, repeated_revision).await?;
let revision = Revision::initial_revision(user_id, &grid_id, grid_rev_delta_bytes.clone());
let _ = grid_manager.create_grid(&grid_id, vec![revision]).await?;
// Create grid view
let grid_view = if grid_view_revision_data.is_empty() {
@ -232,9 +230,8 @@ pub async fn make_grid_view_data(
};
let grid_view_delta = make_grid_view_operations(&grid_view);
let grid_view_delta_bytes = grid_view_delta.json_bytes();
let repeated_revision: RepeatedRevision =
Revision::initial_revision(user_id, view_id, grid_view_delta_bytes).into();
let _ = grid_manager.create_grid_view(view_id, repeated_revision).await?;
let revision = Revision::initial_revision(user_id, view_id, grid_view_delta_bytes);
let _ = grid_manager.create_grid_view(view_id, vec![revision]).await?;
Ok(grid_rev_delta_bytes)
}

View File

@ -7,7 +7,7 @@ use flowy_database::{
ConnectionPool,
};
use flowy_error::{internal_error, FlowyError, FlowyResult};
use flowy_revision::disk::{RevisionChangeset, RevisionDiskCache, RevisionRecord, RevisionState};
use flowy_revision::disk::{RevisionChangeset, RevisionDiskCache, RevisionState, SyncRecord};
use flowy_sync::{
entities::revision::{Revision, RevisionRange},
util::md5,
@ -22,7 +22,7 @@ pub struct SQLiteGridBlockRevisionPersistence {
impl RevisionDiskCache<Arc<ConnectionPool>> for SQLiteGridBlockRevisionPersistence {
type Error = FlowyError;
fn create_revision_records(&self, revision_records: Vec<RevisionRecord>) -> Result<(), Self::Error> {
fn create_revision_records(&self, revision_records: Vec<SyncRecord>) -> Result<(), Self::Error> {
let conn = self.pool.get().map_err(internal_error)?;
let _ = GridMetaRevisionSql::create(revision_records, &*conn)?;
Ok(())
@ -36,7 +36,7 @@ impl RevisionDiskCache<Arc<ConnectionPool>> for SQLiteGridBlockRevisionPersisten
&self,
object_id: &str,
rev_ids: Option<Vec<i64>>,
) -> Result<Vec<RevisionRecord>, Self::Error> {
) -> Result<Vec<SyncRecord>, Self::Error> {
let conn = self.pool.get().map_err(internal_error)?;
let records = GridMetaRevisionSql::read(&self.user_id, object_id, rev_ids, &*conn)?;
Ok(records)
@ -46,7 +46,7 @@ impl RevisionDiskCache<Arc<ConnectionPool>> for SQLiteGridBlockRevisionPersisten
&self,
object_id: &str,
range: &RevisionRange,
) -> Result<Vec<RevisionRecord>, Self::Error> {
) -> Result<Vec<SyncRecord>, Self::Error> {
let conn = &*self.pool.get().map_err(internal_error)?;
let revisions = GridMetaRevisionSql::read_with_range(&self.user_id, object_id, range.clone(), conn)?;
Ok(revisions)
@ -73,7 +73,7 @@ impl RevisionDiskCache<Arc<ConnectionPool>> for SQLiteGridBlockRevisionPersisten
&self,
object_id: &str,
deleted_rev_ids: Option<Vec<i64>>,
inserted_records: Vec<RevisionRecord>,
inserted_records: Vec<SyncRecord>,
) -> Result<(), Self::Error> {
let conn = self.pool.get().map_err(internal_error)?;
conn.immediate_transaction::<_, FlowyError, _>(|| {
@ -95,7 +95,7 @@ impl SQLiteGridBlockRevisionPersistence {
struct GridMetaRevisionSql();
impl GridMetaRevisionSql {
fn create(revision_records: Vec<RevisionRecord>, conn: &SqliteConnection) -> Result<(), FlowyError> {
fn create(revision_records: Vec<SyncRecord>, conn: &SqliteConnection) -> Result<(), FlowyError> {
// Batch insert: https://diesel.rs/guides/all-about-inserts.html
let records = revision_records
@ -142,7 +142,7 @@ impl GridMetaRevisionSql {
object_id: &str,
rev_ids: Option<Vec<i64>>,
conn: &SqliteConnection,
) -> Result<Vec<RevisionRecord>, FlowyError> {
) -> Result<Vec<SyncRecord>, FlowyError> {
let mut sql = dsl::grid_meta_rev_table
.filter(dsl::object_id.eq(object_id))
.into_boxed();
@ -163,7 +163,7 @@ impl GridMetaRevisionSql {
object_id: &str,
range: RevisionRange,
conn: &SqliteConnection,
) -> Result<Vec<RevisionRecord>, FlowyError> {
) -> Result<Vec<SyncRecord>, FlowyError> {
let rev_tables = dsl::grid_meta_rev_table
.filter(dsl::rev_id.ge(range.start))
.filter(dsl::rev_id.le(range.end))
@ -219,7 +219,7 @@ impl std::default::Default for GridBlockRevisionState {
}
}
fn mk_revision_record_from_table(user_id: &str, table: GridBlockRevisionTable) -> RevisionRecord {
fn mk_revision_record_from_table(user_id: &str, table: GridBlockRevisionTable) -> SyncRecord {
let md5 = md5(&table.data);
let revision = Revision::new(
&table.object_id,
@ -229,7 +229,7 @@ fn mk_revision_record_from_table(user_id: &str, table: GridBlockRevisionTable) -
user_id,
md5,
);
RevisionRecord {
SyncRecord {
revision,
state: table.state.into(),
write_to_disk: false,

View File

@ -7,7 +7,7 @@ use flowy_database::{
ConnectionPool,
};
use flowy_error::{internal_error, FlowyError, FlowyResult};
use flowy_revision::disk::{RevisionChangeset, RevisionDiskCache, RevisionRecord, RevisionState};
use flowy_revision::disk::{RevisionChangeset, RevisionDiskCache, RevisionState, SyncRecord};
use flowy_sync::{
entities::revision::{Revision, RevisionRange},
util::md5,
@ -22,7 +22,7 @@ pub struct SQLiteGridRevisionPersistence {
impl RevisionDiskCache<Arc<ConnectionPool>> for SQLiteGridRevisionPersistence {
type Error = FlowyError;
fn create_revision_records(&self, revision_records: Vec<RevisionRecord>) -> Result<(), Self::Error> {
fn create_revision_records(&self, revision_records: Vec<SyncRecord>) -> Result<(), Self::Error> {
let conn = self.pool.get().map_err(internal_error)?;
let _ = GridRevisionSql::create(revision_records, &*conn)?;
Ok(())
@ -36,7 +36,7 @@ impl RevisionDiskCache<Arc<ConnectionPool>> for SQLiteGridRevisionPersistence {
&self,
object_id: &str,
rev_ids: Option<Vec<i64>>,
) -> Result<Vec<RevisionRecord>, Self::Error> {
) -> Result<Vec<SyncRecord>, Self::Error> {
let conn = self.pool.get().map_err(internal_error)?;
let records = GridRevisionSql::read(&self.user_id, object_id, rev_ids, &*conn)?;
Ok(records)
@ -46,7 +46,7 @@ impl RevisionDiskCache<Arc<ConnectionPool>> for SQLiteGridRevisionPersistence {
&self,
object_id: &str,
range: &RevisionRange,
) -> Result<Vec<RevisionRecord>, Self::Error> {
) -> Result<Vec<SyncRecord>, Self::Error> {
let conn = &*self.pool.get().map_err(internal_error)?;
let revisions = GridRevisionSql::read_with_range(&self.user_id, object_id, range.clone(), conn)?;
Ok(revisions)
@ -73,7 +73,7 @@ impl RevisionDiskCache<Arc<ConnectionPool>> for SQLiteGridRevisionPersistence {
&self,
object_id: &str,
deleted_rev_ids: Option<Vec<i64>>,
inserted_records: Vec<RevisionRecord>,
inserted_records: Vec<SyncRecord>,
) -> Result<(), Self::Error> {
let conn = self.pool.get().map_err(internal_error)?;
conn.immediate_transaction::<_, FlowyError, _>(|| {
@ -95,7 +95,7 @@ impl SQLiteGridRevisionPersistence {
struct GridRevisionSql();
impl GridRevisionSql {
fn create(revision_records: Vec<RevisionRecord>, conn: &SqliteConnection) -> Result<(), FlowyError> {
fn create(revision_records: Vec<SyncRecord>, conn: &SqliteConnection) -> Result<(), FlowyError> {
// Batch insert: https://diesel.rs/guides/all-about-inserts.html
let records = revision_records
.into_iter()
@ -141,7 +141,7 @@ impl GridRevisionSql {
object_id: &str,
rev_ids: Option<Vec<i64>>,
conn: &SqliteConnection,
) -> Result<Vec<RevisionRecord>, FlowyError> {
) -> Result<Vec<SyncRecord>, FlowyError> {
let mut sql = dsl::grid_rev_table.filter(dsl::object_id.eq(object_id)).into_boxed();
if let Some(rev_ids) = rev_ids {
sql = sql.filter(dsl::rev_id.eq_any(rev_ids));
@ -160,7 +160,7 @@ impl GridRevisionSql {
object_id: &str,
range: RevisionRange,
conn: &SqliteConnection,
) -> Result<Vec<RevisionRecord>, FlowyError> {
) -> Result<Vec<SyncRecord>, FlowyError> {
let rev_tables = dsl::grid_rev_table
.filter(dsl::rev_id.ge(range.start))
.filter(dsl::rev_id.le(range.end))
@ -217,7 +217,7 @@ impl std::default::Default for GridRevisionState {
}
}
fn mk_revision_record_from_table(user_id: &str, table: GridRevisionTable) -> RevisionRecord {
fn mk_revision_record_from_table(user_id: &str, table: GridRevisionTable) -> SyncRecord {
let md5 = md5(&table.data);
let revision = Revision::new(
&table.object_id,
@ -227,7 +227,7 @@ fn mk_revision_record_from_table(user_id: &str, table: GridRevisionTable) -> Rev
user_id,
md5,
);
RevisionRecord {
SyncRecord {
revision,
state: table.state.into(),
write_to_disk: false,

View File

@ -7,7 +7,7 @@ use flowy_database::{
ConnectionPool,
};
use flowy_error::{internal_error, FlowyError, FlowyResult};
use flowy_revision::disk::{RevisionChangeset, RevisionDiskCache, RevisionRecord, RevisionState};
use flowy_revision::disk::{RevisionChangeset, RevisionDiskCache, RevisionState, SyncRecord};
use flowy_sync::{
entities::revision::{Revision, RevisionRange},
util::md5,
@ -31,7 +31,7 @@ impl SQLiteGridViewRevisionPersistence {
impl RevisionDiskCache<Arc<ConnectionPool>> for SQLiteGridViewRevisionPersistence {
type Error = FlowyError;
fn create_revision_records(&self, revision_records: Vec<RevisionRecord>) -> Result<(), Self::Error> {
fn create_revision_records(&self, revision_records: Vec<SyncRecord>) -> Result<(), Self::Error> {
let conn = self.pool.get().map_err(internal_error)?;
let _ = GridViewRevisionSql::create(revision_records, &*conn)?;
Ok(())
@ -45,7 +45,7 @@ impl RevisionDiskCache<Arc<ConnectionPool>> for SQLiteGridViewRevisionPersistenc
&self,
object_id: &str,
rev_ids: Option<Vec<i64>>,
) -> Result<Vec<RevisionRecord>, Self::Error> {
) -> Result<Vec<SyncRecord>, Self::Error> {
let conn = self.pool.get().map_err(internal_error)?;
let records = GridViewRevisionSql::read(&self.user_id, object_id, rev_ids, &*conn)?;
Ok(records)
@ -55,7 +55,7 @@ impl RevisionDiskCache<Arc<ConnectionPool>> for SQLiteGridViewRevisionPersistenc
&self,
object_id: &str,
range: &RevisionRange,
) -> Result<Vec<RevisionRecord>, Self::Error> {
) -> Result<Vec<SyncRecord>, Self::Error> {
let conn = &*self.pool.get().map_err(internal_error)?;
let revisions = GridViewRevisionSql::read_with_range(&self.user_id, object_id, range.clone(), conn)?;
Ok(revisions)
@ -82,7 +82,7 @@ impl RevisionDiskCache<Arc<ConnectionPool>> for SQLiteGridViewRevisionPersistenc
&self,
object_id: &str,
deleted_rev_ids: Option<Vec<i64>>,
inserted_records: Vec<RevisionRecord>,
inserted_records: Vec<SyncRecord>,
) -> Result<(), Self::Error> {
let conn = self.pool.get().map_err(internal_error)?;
conn.immediate_transaction::<_, FlowyError, _>(|| {
@ -95,7 +95,7 @@ impl RevisionDiskCache<Arc<ConnectionPool>> for SQLiteGridViewRevisionPersistenc
struct GridViewRevisionSql();
impl GridViewRevisionSql {
fn create(revision_records: Vec<RevisionRecord>, conn: &SqliteConnection) -> Result<(), FlowyError> {
fn create(revision_records: Vec<SyncRecord>, conn: &SqliteConnection) -> Result<(), FlowyError> {
// Batch insert: https://diesel.rs/guides/all-about-inserts.html
let records = revision_records
.into_iter()
@ -141,7 +141,7 @@ impl GridViewRevisionSql {
object_id: &str,
rev_ids: Option<Vec<i64>>,
conn: &SqliteConnection,
) -> Result<Vec<RevisionRecord>, FlowyError> {
) -> Result<Vec<SyncRecord>, FlowyError> {
let mut sql = dsl::grid_view_rev_table
.filter(dsl::object_id.eq(object_id))
.into_boxed();
@ -162,7 +162,7 @@ impl GridViewRevisionSql {
object_id: &str,
range: RevisionRange,
conn: &SqliteConnection,
) -> Result<Vec<RevisionRecord>, FlowyError> {
) -> Result<Vec<SyncRecord>, FlowyError> {
let rev_tables = dsl::grid_view_rev_table
.filter(dsl::rev_id.ge(range.start))
.filter(dsl::rev_id.le(range.end))
@ -219,7 +219,7 @@ impl std::default::Default for GridViewRevisionState {
}
}
fn mk_revision_record_from_table(user_id: &str, table: GridViewRevisionTable) -> RevisionRecord {
fn mk_revision_record_from_table(user_id: &str, table: GridViewRevisionTable) -> SyncRecord {
let md5 = md5(&table.data);
let revision = Revision::new(
&table.object_id,
@ -229,7 +229,7 @@ fn mk_revision_record_from_table(user_id: &str, table: GridViewRevisionTable) ->
user_id,
md5,
);
RevisionRecord {
SyncRecord {
revision,
state: table.state.into(),
write_to_disk: false,

View File

@ -5,23 +5,20 @@ use std::sync::Arc;
pub trait RevisionDiskCache<Connection>: Sync + Send {
type Error: Debug;
fn create_revision_records(&self, revision_records: Vec<RevisionRecord>) -> Result<(), Self::Error>;
fn create_revision_records(&self, revision_records: Vec<SyncRecord>) -> Result<(), Self::Error>;
fn get_connection(&self) -> Result<Connection, Self::Error>;
// Read all the records if the rev_ids is None
fn read_revision_records(
&self,
object_id: &str,
rev_ids: Option<Vec<i64>>,
) -> Result<Vec<RevisionRecord>, Self::Error>;
fn read_revision_records(&self, object_id: &str, rev_ids: Option<Vec<i64>>)
-> Result<Vec<SyncRecord>, Self::Error>;
// Read the revision which rev_id >= range.start && rev_id <= range.end
fn read_revision_records_with_range(
&self,
object_id: &str,
range: &RevisionRange,
) -> Result<Vec<RevisionRecord>, Self::Error>;
) -> Result<Vec<SyncRecord>, Self::Error>;
fn update_revision_record(&self, changesets: Vec<RevisionChangeset>) -> FlowyResult<()>;
@ -34,7 +31,7 @@ pub trait RevisionDiskCache<Connection>: Sync + Send {
&self,
object_id: &str,
deleted_rev_ids: Option<Vec<i64>>,
inserted_records: Vec<RevisionRecord>,
inserted_records: Vec<SyncRecord>,
) -> Result<(), Self::Error>;
}
@ -44,7 +41,7 @@ where
{
type Error = FlowyError;
fn create_revision_records(&self, revision_records: Vec<RevisionRecord>) -> Result<(), Self::Error> {
fn create_revision_records(&self, revision_records: Vec<SyncRecord>) -> Result<(), Self::Error> {
(**self).create_revision_records(revision_records)
}
@ -56,7 +53,7 @@ where
&self,
object_id: &str,
rev_ids: Option<Vec<i64>>,
) -> Result<Vec<RevisionRecord>, Self::Error> {
) -> Result<Vec<SyncRecord>, Self::Error> {
(**self).read_revision_records(object_id, rev_ids)
}
@ -64,7 +61,7 @@ where
&self,
object_id: &str,
range: &RevisionRange,
) -> Result<Vec<RevisionRecord>, Self::Error> {
) -> Result<Vec<SyncRecord>, Self::Error> {
(**self).read_revision_records_with_range(object_id, range)
}
@ -80,20 +77,20 @@ where
&self,
object_id: &str,
deleted_rev_ids: Option<Vec<i64>>,
inserted_records: Vec<RevisionRecord>,
inserted_records: Vec<SyncRecord>,
) -> Result<(), Self::Error> {
(**self).delete_and_insert_records(object_id, deleted_rev_ids, inserted_records)
}
}
#[derive(Clone, Debug)]
pub struct RevisionRecord {
pub struct SyncRecord {
pub revision: Revision,
pub state: RevisionState,
pub write_to_disk: bool,
}
impl RevisionRecord {
impl SyncRecord {
pub fn new(revision: Revision) -> Self {
Self {
revision,

View File

@ -1,4 +1,4 @@
use crate::disk::RevisionRecord;
use crate::disk::SyncRecord;
use crate::REVISION_WRITE_INTERVAL_IN_MILLIS;
use dashmap::DashMap;
use flowy_error::{FlowyError, FlowyResult};
@ -7,15 +7,15 @@ use std::{borrow::Cow, sync::Arc, time::Duration};
use tokio::{sync::RwLock, task::JoinHandle};
pub(crate) trait RevisionMemoryCacheDelegate: Send + Sync {
fn checkpoint_tick(&self, records: Vec<RevisionRecord>) -> FlowyResult<()>;
fn send_sync(&self, records: Vec<SyncRecord>) -> FlowyResult<()>;
fn receive_ack(&self, object_id: &str, rev_id: i64);
}
pub(crate) struct RevisionMemoryCache {
object_id: String,
revs_map: Arc<DashMap<i64, RevisionRecord>>,
revs_map: Arc<DashMap<i64, SyncRecord>>,
delegate: Arc<dyn RevisionMemoryCacheDelegate>,
pending_write_revs: Arc<RwLock<Vec<i64>>>,
defer_write_revs: Arc<RwLock<Vec<i64>>>,
defer_save: RwLock<Option<JoinHandle<()>>>,
}
@ -25,7 +25,7 @@ impl RevisionMemoryCache {
object_id: object_id.to_owned(),
revs_map: Arc::new(DashMap::new()),
delegate,
pending_write_revs: Arc::new(RwLock::new(vec![])),
defer_write_revs: Arc::new(RwLock::new(vec![])),
defer_save: RwLock::new(None),
}
}
@ -34,7 +34,7 @@ impl RevisionMemoryCache {
self.revs_map.contains_key(rev_id)
}
pub(crate) async fn add<'a>(&'a self, record: Cow<'a, RevisionRecord>) {
pub(crate) async fn add<'a>(&'a self, record: Cow<'a, SyncRecord>) {
let record = match record {
Cow::Borrowed(record) => record.clone(),
Cow::Owned(record) => record,
@ -43,11 +43,11 @@ impl RevisionMemoryCache {
let rev_id = record.revision.rev_id;
self.revs_map.insert(rev_id, record);
let mut write_guard = self.pending_write_revs.write().await;
let mut write_guard = self.defer_write_revs.write().await;
if !write_guard.contains(&rev_id) {
write_guard.push(rev_id);
drop(write_guard);
self.make_checkpoint().await;
self.tick_checkpoint().await;
}
}
@ -57,8 +57,8 @@ impl RevisionMemoryCache {
Some(mut record) => record.ack(),
}
if self.pending_write_revs.read().await.contains(rev_id) {
self.make_checkpoint().await;
if self.defer_write_revs.read().await.contains(rev_id) {
self.tick_checkpoint().await;
} else {
// The revision must be saved on disk if the pending_write_revs
// doesn't contains the rev_id.
@ -66,7 +66,7 @@ impl RevisionMemoryCache {
}
}
pub(crate) async fn get(&self, rev_id: &i64) -> Option<RevisionRecord> {
pub(crate) async fn get(&self, rev_id: &i64) -> Option<SyncRecord> {
self.revs_map.get(rev_id).map(|r| r.value().clone())
}
@ -80,21 +80,21 @@ impl RevisionMemoryCache {
}
}
pub(crate) async fn get_with_range(&self, range: &RevisionRange) -> Result<Vec<RevisionRecord>, FlowyError> {
pub(crate) async fn get_with_range(&self, range: &RevisionRange) -> Result<Vec<SyncRecord>, FlowyError> {
let revs = range
.iter()
.flat_map(|rev_id| self.revs_map.get(&rev_id).map(|record| record.clone()))
.collect::<Vec<RevisionRecord>>();
.collect::<Vec<SyncRecord>>();
Ok(revs)
}
pub(crate) async fn reset_with_revisions(&self, revision_records: Vec<RevisionRecord>) {
pub(crate) async fn reset_with_revisions(&self, revision_records: Vec<SyncRecord>) {
self.revs_map.clear();
if let Some(handler) = self.defer_save.write().await.take() {
handler.abort();
}
let mut write_guard = self.pending_write_revs.write().await;
let mut write_guard = self.defer_write_revs.write().await;
write_guard.clear();
for record in revision_records {
write_guard.push(record.revision.rev_id);
@ -102,21 +102,21 @@ impl RevisionMemoryCache {
}
drop(write_guard);
self.make_checkpoint().await;
self.tick_checkpoint().await;
}
async fn make_checkpoint(&self) {
async fn tick_checkpoint(&self) {
// https://github.com/async-graphql/async-graphql/blob/ed8449beec3d9c54b94da39bab33cec809903953/src/dataloader/mod.rs#L362
if let Some(handler) = self.defer_save.write().await.take() {
handler.abort();
}
if self.pending_write_revs.read().await.is_empty() {
if self.defer_write_revs.read().await.is_empty() {
return;
}
let rev_map = self.revs_map.clone();
let pending_write_revs = self.pending_write_revs.clone();
let pending_write_revs = self.defer_write_revs.clone();
let delegate = self.delegate.clone();
*self.defer_save.write().await = Some(tokio::spawn(async move {
@ -128,7 +128,7 @@ impl RevisionMemoryCache {
//
// Use saturating_sub and split_off ?
// https://stackoverflow.com/questions/28952411/what-is-the-idiomatic-way-to-pop-the-last-n-elements-in-a-mutable-vec
let mut save_records: Vec<RevisionRecord> = vec![];
let mut save_records: Vec<SyncRecord> = vec![];
revs_write_guard.iter().for_each(|rev_id| match rev_map.get(rev_id) {
None => {}
Some(value) => {
@ -136,7 +136,7 @@ impl RevisionMemoryCache {
}
});
if delegate.checkpoint_tick(save_records).is_ok() {
if delegate.send_sync(save_records).is_ok() {
revs_write_guard.clear();
drop(revs_write_guard);
}

View File

@ -1,4 +1,4 @@
use crate::disk::{RevisionDiskCache, RevisionRecord};
use crate::disk::{RevisionDiskCache, SyncRecord};
use crate::{RevisionLoader, RevisionPersistence};
use bytes::Bytes;
use flowy_error::{FlowyError, FlowyResult};
@ -76,7 +76,7 @@ where
let bytes = self.target.reset_data(revisions)?;
let revision = Revision::initial_revision(&self.user_id, self.target.target_id(), bytes);
let record = RevisionRecord::new(revision);
let record = SyncRecord::new(revision);
tracing::trace!("Reset {} revision record object", self.target.target_id());
let _ = self

View File

@ -1,4 +1,4 @@
use crate::RevisionManager;
use crate::{RevisionMD5, RevisionManager};
use bytes::Bytes;
use flowy_error::{FlowyError, FlowyResult};
use flowy_sync::entities::{
@ -8,8 +8,6 @@ use flowy_sync::entities::{
use lib_infra::future::BoxResultFuture;
use std::{convert::TryFrom, sync::Arc};
pub type OperationsMD5 = String;
pub struct TransformOperations<Operations> {
pub client_operations: Operations,
pub server_operations: Option<Operations>,
@ -28,12 +26,12 @@ pub trait ConflictResolver<Operations>
where
Operations: Send + Sync,
{
fn compose_operations(&self, operations: Operations) -> BoxResultFuture<OperationsMD5, FlowyError>;
fn compose_operations(&self, operations: Operations) -> BoxResultFuture<RevisionMD5, FlowyError>;
fn transform_operations(
&self,
operations: Operations,
) -> BoxResultFuture<TransformOperations<Operations>, FlowyError>;
fn reset_operations(&self, operations: Operations) -> BoxResultFuture<OperationsMD5, FlowyError>;
fn reset_operations(&self, operations: Operations) -> BoxResultFuture<RevisionMD5, FlowyError>;
}
pub trait ConflictRevisionSink: Send + Sync + 'static {
@ -129,9 +127,8 @@ where
// The server_prime is None means the client local revisions conflict with the
// // server, and it needs to override the client delta.
let md5 = self.resolver.reset_operations(client_operations).await?;
let repeated_revision = RepeatedRevision::new(revisions);
assert_eq!(repeated_revision.last().unwrap().md5, md5);
let _ = self.rev_manager.reset_object(repeated_revision).await?;
debug_assert!(md5.is_equal(&revisions.last().unwrap().md5));
let _ = self.rev_manager.reset_object(revisions).await?;
Ok(None)
}
Some(server_operations) => {
@ -158,7 +155,7 @@ fn make_client_and_server_revision<Operations, Connection>(
rev_manager: &Arc<RevisionManager<Connection>>,
client_operations: Operations,
server_operations: Option<Operations>,
md5: String,
md5: RevisionMD5,
) -> (Revision, Option<Revision>)
where
Operations: OperationsSerializer,

View File

@ -3,8 +3,8 @@ use crate::{RevisionPersistence, RevisionSnapshotDiskCache, RevisionSnapshotMana
use bytes::Bytes;
use flowy_error::{FlowyError, FlowyResult};
use flowy_sync::{
entities::revision::{RepeatedRevision, Revision, RevisionRange},
util::{pair_rev_id_from_revisions, RevIdCounter},
entities::revision::{Revision, RevisionRange},
util::{md5, pair_rev_id_from_revisions, RevIdCounter},
};
use lib_infra::future::FutureResult;
use std::sync::Arc;
@ -143,9 +143,9 @@ impl<Connection: 'static> RevisionManager<Connection> {
}
#[tracing::instrument(level = "debug", skip(self, revisions), err)]
pub async fn reset_object(&self, revisions: RepeatedRevision) -> FlowyResult<()> {
pub async fn reset_object(&self, revisions: Vec<Revision>) -> FlowyResult<()> {
let rev_id = pair_rev_id_from_revisions(&revisions).1;
let _ = self.rev_persistence.reset(revisions.into_inner()).await?;
let _ = self.rev_persistence.reset(revisions).await?;
self.rev_id_counter.set(rev_id);
Ok(())
}
@ -191,7 +191,7 @@ impl<Connection: 'static> RevisionManager<Connection> {
pub fn next_rev_id_pair(&self) -> (i64, i64) {
let cur = self.rev_id_counter.value();
let next = self.rev_id_counter.next();
let next = self.rev_id_counter.next_id();
(cur, next)
}
@ -283,3 +283,56 @@ impl<Connection: 'static> RevisionLoader<Connection> {
Ok(revisions)
}
}
/// Represents as the md5 of the revision object after applying the
/// revision. For example, RevisionMD5 will be the md5 of the document
/// content.
#[derive(Debug, Clone)]
pub struct RevisionMD5(String);
impl RevisionMD5 {
pub fn from_bytes<T: AsRef<[u8]>>(bytes: T) -> Result<Self, FlowyError> {
Ok(RevisionMD5(md5(bytes)))
}
pub fn into_inner(self) -> String {
self.0
}
pub fn is_equal(&self, s: &str) -> bool {
self.0 == s
}
}
impl std::convert::From<RevisionMD5> for String {
fn from(md5: RevisionMD5) -> Self {
md5.0
}
}
impl std::convert::From<&str> for RevisionMD5 {
fn from(s: &str) -> Self {
Self(s.to_owned())
}
}
impl std::convert::From<String> for RevisionMD5 {
fn from(s: String) -> Self {
Self(s)
}
}
impl std::ops::Deref for RevisionMD5 {
type Target = String;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl PartialEq<Self> for RevisionMD5 {
fn eq(&self, other: &Self) -> bool {
self.0 == other.0
}
}
impl std::cmp::Eq for RevisionMD5 {}

View File

@ -2,10 +2,9 @@ use crate::cache::{
disk::{RevisionChangeset, RevisionDiskCache},
memory::RevisionMemoryCacheDelegate,
};
use crate::disk::{RevisionRecord, RevisionState};
use crate::disk::{RevisionState, SyncRecord};
use crate::memory::RevisionMemoryCache;
use crate::RevisionCompress;
use flowy_error::{internal_error, FlowyError, FlowyResult};
use flowy_sync::entities::revision::{Revision, RevisionRange};
use std::collections::VecDeque;
@ -20,10 +19,13 @@ pub struct RevisionPersistence<Connection> {
object_id: String,
disk_cache: Arc<dyn RevisionDiskCache<Connection, Error = FlowyError>>,
memory_cache: Arc<RevisionMemoryCache>,
sync_seq: RwLock<RevisionSyncSequence>,
sync_seq: RwLock<DeferSyncSequence>,
}
impl<Connection: 'static> RevisionPersistence<Connection> {
impl<Connection> RevisionPersistence<Connection>
where
Connection: 'static,
{
pub fn new<C>(user_id: &str, object_id: &str, disk_cache: C) -> RevisionPersistence<Connection>
where
C: 'static + RevisionDiskCache<Connection, Error = FlowyError>,
@ -39,7 +41,7 @@ impl<Connection: 'static> RevisionPersistence<Connection> {
) -> RevisionPersistence<Connection> {
let object_id = object_id.to_owned();
let user_id = user_id.to_owned();
let sync_seq = RwLock::new(RevisionSyncSequence::new());
let sync_seq = RwLock::new(DeferSyncSequence::new());
let memory_cache = Arc::new(RevisionMemoryCache::new(&object_id, Arc::new(disk_cache.clone())));
Self {
user_id,
@ -131,7 +133,7 @@ impl<Connection: 'static> RevisionPersistence<Connection> {
pub(crate) async fn reset(&self, revisions: Vec<Revision>) -> FlowyResult<()> {
let records = revisions
.into_iter()
.map(|revision| RevisionRecord {
.map(|revision| SyncRecord {
revision,
state: RevisionState::Sync,
write_to_disk: false,
@ -151,7 +153,7 @@ impl<Connection: 'static> RevisionPersistence<Connection> {
tracing::warn!("Duplicate revision: {}:{}-{:?}", self.object_id, revision.rev_id, state);
return Ok(());
}
let record = RevisionRecord {
let record = SyncRecord {
revision,
state,
write_to_disk,
@ -172,7 +174,7 @@ impl<Connection: 'static> RevisionPersistence<Connection> {
Ok(())
}
pub async fn get(&self, rev_id: i64) -> Option<RevisionRecord> {
pub async fn get(&self, rev_id: i64) -> Option<SyncRecord> {
match self.memory_cache.get(&rev_id).await {
None => match self
.disk_cache
@ -192,7 +194,7 @@ impl<Connection: 'static> RevisionPersistence<Connection> {
}
}
pub fn batch_get(&self, doc_id: &str) -> FlowyResult<Vec<RevisionRecord>> {
pub fn batch_get(&self, doc_id: &str) -> FlowyResult<Vec<SyncRecord>> {
self.disk_cache.read_revision_records(doc_id, None)
}
@ -225,7 +227,7 @@ impl<Connection: 'static> RevisionPersistence<Connection> {
}
impl<C> RevisionMemoryCacheDelegate for Arc<dyn RevisionDiskCache<C, Error = FlowyError>> {
fn checkpoint_tick(&self, mut records: Vec<RevisionRecord>) -> FlowyResult<()> {
fn send_sync(&self, mut records: Vec<SyncRecord>) -> FlowyResult<()> {
records.retain(|record| record.write_to_disk);
if !records.is_empty() {
tracing::Span::current().record(
@ -251,10 +253,10 @@ impl<C> RevisionMemoryCacheDelegate for Arc<dyn RevisionDiskCache<C, Error = Flo
}
#[derive(Default)]
struct RevisionSyncSequence(VecDeque<i64>);
impl RevisionSyncSequence {
struct DeferSyncSequence(VecDeque<i64>);
impl DeferSyncSequence {
fn new() -> Self {
RevisionSyncSequence::default()
DeferSyncSequence::default()
}
fn add(&mut self, new_rev_id: i64) -> FlowyResult<()> {

View File

@ -28,7 +28,7 @@ pub trait RevisionWSDataStream: Send + Sync {
}
// The sink provides the data that will be sent through the web socket to the
// backend.
// server.
pub trait RevisionWebSocketSink: Send + Sync {
fn next(&self) -> FutureResult<Option<ClientRevisionWSData>, FlowyError>;
}

View File

@ -0,0 +1 @@

View File

@ -18,7 +18,7 @@ use flowy_net::{
http_server::folder::FolderHttpCloudService, local_server::LocalServer, ws::connection::FlowyWebSocketConnect,
};
use flowy_revision::{RevisionWebSocket, WSStateReceiver};
use flowy_sync::entities::revision::{RepeatedRevision, Revision};
use flowy_sync::entities::revision::Revision;
use flowy_sync::entities::ws_data::ClientRevisionWSData;
use flowy_user::services::UserSession;
use futures_core::future::BoxFuture;
@ -151,12 +151,12 @@ impl ViewDataProcessor for DocumentViewDataProcessor {
) -> FutureResult<(), FlowyError> {
// Only accept Document type
debug_assert_eq!(layout, ViewLayoutTypePB::Document);
let repeated_revision: RepeatedRevision = Revision::initial_revision(user_id, view_id, view_data).into();
let revision = Revision::initial_revision(user_id, view_id, view_data);
let view_id = view_id.to_string();
let manager = self.0.clone();
FutureResult::new(async move {
let _ = manager.create_document(view_id, repeated_revision).await?;
let _ = manager.create_document(view_id, vec![revision]).await?;
Ok(())
})
}
@ -194,9 +194,8 @@ impl ViewDataProcessor for DocumentViewDataProcessor {
let document_content = self.0.initial_document_content();
FutureResult::new(async move {
let delta_data = Bytes::from(document_content);
let repeated_revision: RepeatedRevision =
Revision::initial_revision(&user_id, &view_id, delta_data.clone()).into();
let _ = manager.create_document(view_id, repeated_revision).await?;
let revision = Revision::initial_revision(&user_id, &view_id, delta_data.clone());
let _ = manager.create_document(view_id, vec![revision]).await?;
Ok(delta_data)
})
}
@ -226,11 +225,11 @@ impl ViewDataProcessor for GridViewDataProcessor {
_layout: ViewLayoutTypePB,
delta_data: Bytes,
) -> FutureResult<(), FlowyError> {
let repeated_revision: RepeatedRevision = Revision::initial_revision(user_id, view_id, delta_data).into();
let revision = Revision::initial_revision(user_id, view_id, delta_data);
let view_id = view_id.to_string();
let grid_manager = self.0.clone();
FutureResult::new(async move {
let _ = grid_manager.create_grid(view_id, repeated_revision).await?;
let _ = grid_manager.create_grid(view_id, vec![revision]).await?;
Ok(())
})
}

View File

@ -1,3 +1,4 @@
use crate::util::md5;
use crate::{
client_document::{
history::{History, UndoResult},
@ -77,9 +78,9 @@ impl ClientDocument {
&self.operations
}
pub fn md5(&self) -> String {
pub fn document_md5(&self) -> String {
let bytes = self.to_bytes();
format!("{:x}", md5::compute(bytes))
md5(&bytes)
}
pub fn set_notify(&mut self, notify: mpsc::UnboundedSender<()>) {

View File

@ -1,9 +1,9 @@
use crate::errors::internal_error;
use crate::server_folder::{FolderOperations, FolderOperationsBuilder};
use crate::util::cal_diff;
use crate::util::{cal_diff, md5};
use crate::{
client_folder::builder::FolderPadBuilder,
entities::revision::{md5, Revision},
entities::revision::Revision,
errors::{CollaborateError, CollaborateResult},
};
use flowy_folder_data_model::revision::{AppRevision, FolderRevision, TrashRevision, ViewRevision, WorkspaceRevision};
@ -61,7 +61,7 @@ impl FolderPad {
self.folder_rev = folder.folder_rev;
self.operations = folder.operations;
Ok(self.md5())
Ok(self.folder_md5())
}
pub fn compose_remote_operations(&mut self, operations: FolderOperations) -> CollaborateResult<String> {
@ -313,7 +313,7 @@ impl FolderPad {
}
}
pub fn md5(&self) -> String {
pub fn folder_md5(&self) -> String {
md5(&self.operations.json_bytes())
}
@ -345,7 +345,7 @@ impl FolderPad {
self.operations = self.operations.compose(&operations)?;
Ok(Some(FolderChangeset {
operations,
md5: self.md5(),
md5: self.folder_md5(),
}))
}
}
@ -383,7 +383,7 @@ impl FolderPad {
self.operations = self.operations.compose(&operations)?;
Ok(Some(FolderChangeset {
operations,
md5: self.md5(),
md5: self.folder_md5(),
}))
}
}

View File

@ -1,6 +1,6 @@
use crate::entities::revision::{md5, RepeatedRevision, Revision};
use crate::entities::revision::{RepeatedRevision, Revision};
use crate::errors::{CollaborateError, CollaborateResult};
use crate::util::{cal_diff, make_operations_from_revisions};
use crate::util::{cal_diff, make_operations_from_revisions, md5};
use flowy_grid_data_model::revision::{
gen_block_id, gen_row_id, CellRevision, GridBlockRevision, RowChangeset, RowRevision,
};

View File

@ -1,6 +1,6 @@
use crate::entities::revision::{md5, RepeatedRevision, Revision};
use crate::entities::revision::{RepeatedRevision, Revision};
use crate::errors::{internal_error, CollaborateError, CollaborateResult};
use crate::util::{cal_diff, make_operations_from_revisions};
use crate::util::{cal_diff, make_operations_from_revisions, md5};
use flowy_grid_data_model::revision::{
gen_block_id, gen_grid_id, FieldRevision, FieldTypeRevision, GridBlockMetaRevision, GridBlockMetaRevisionChangeset,
@ -315,7 +315,7 @@ impl GridRevisionPad {
})
}
pub fn md5(&self) -> String {
pub fn grid_md5(&self) -> String {
md5(&self.operations.json_bytes())
}
@ -343,7 +343,7 @@ impl GridRevisionPad {
self.operations = self.operations.compose(&operations)?;
Ok(Some(GridRevisionChangeset {
operations,
md5: self.md5(),
md5: self.grid_md5(),
}))
}
}

View File

@ -1,6 +1,6 @@
use crate::entities::revision::{md5, Revision};
use crate::entities::revision::Revision;
use crate::errors::{internal_error, CollaborateError, CollaborateResult};
use crate::util::{cal_diff, make_operations_from_revisions};
use crate::util::{cal_diff, make_operations_from_revisions, md5};
use flowy_grid_data_model::revision::{
FieldRevision, FieldTypeRevision, FilterConfigurationRevision, FilterConfigurationsByFieldId, GridViewRevision,
GroupConfigurationRevision, GroupConfigurationsByFieldId, LayoutRevision,

View File

@ -1,3 +1,4 @@
use crate::util::md5;
use bytes::Bytes;
use flowy_derive::{ProtoBuf, ProtoBuf_Enum};
use std::{convert::TryFrom, fmt::Formatter, ops::RangeInclusive};
@ -36,6 +37,34 @@ impl std::convert::From<Vec<u8>> for Revision {
}
impl Revision {
pub fn new<T: Into<String>>(
object_id: &str,
base_rev_id: i64,
rev_id: i64,
bytes: Bytes,
user_id: &str,
md5: T,
) -> Revision {
let user_id = user_id.to_owned();
let object_id = object_id.to_owned();
let bytes = bytes.to_vec();
let base_rev_id = base_rev_id;
let rev_id = rev_id;
if base_rev_id != 0 {
debug_assert!(base_rev_id != rev_id);
}
Self {
base_rev_id,
rev_id,
bytes,
md5: md5.into(),
object_id,
ty: RevType::DeprecatedLocal,
user_id,
}
}
pub fn is_empty(&self) -> bool {
self.base_rev_id == self.rev_id
}
@ -52,28 +81,6 @@ impl Revision {
let md5 = md5(&bytes);
Self::new(object_id, 0, 0, bytes, user_id, md5)
}
pub fn new(object_id: &str, base_rev_id: i64, rev_id: i64, bytes: Bytes, user_id: &str, md5: String) -> Revision {
let user_id = user_id.to_owned();
let object_id = object_id.to_owned();
let bytes = bytes.to_vec();
let base_rev_id = base_rev_id;
let rev_id = rev_id;
if base_rev_id != 0 {
debug_assert!(base_rev_id != rev_id);
}
Self {
base_rev_id,
rev_id,
bytes,
md5,
object_id,
ty: RevType::DeprecatedLocal,
user_id,
}
}
}
impl std::fmt::Debug for Revision {
@ -209,12 +216,6 @@ impl RevisionRange {
}
}
#[inline]
pub fn md5<T: AsRef<[u8]>>(data: T) -> String {
let md5 = format!("{:x}", md5::compute(data));
md5
}
#[derive(Debug, ProtoBuf_Enum, Clone, Eq, PartialEq)]
pub enum RevType {
DeprecatedLocal = 0,

View File

@ -49,7 +49,8 @@ impl RevIdCounter {
pub fn new(n: i64) -> Self {
Self(AtomicI64::new(n))
}
pub fn next(&self) -> i64 {
pub fn next_id(&self) -> i64 {
let _ = self.0.fetch_add(1, SeqCst);
self.value()
}

View File

@ -35,9 +35,19 @@ impl NodeTree {
Ok(tree)
}
pub fn from_bytes(bytes: Vec<u8>, context: NodeTreeContext) -> Result<Self, OTError> {
let operations = NodeOperations::from_bytes(bytes)?;
Self::from_operations(operations, context)
pub fn from_bytes(bytes: &[u8]) -> Result<Self, OTError> {
let tree: NodeTree = serde_json::from_slice(bytes).map_err(|e| OTError::serde().context(e))?;
Ok(tree)
}
pub fn to_bytes(&self) -> Vec<u8> {
match serde_json::to_vec(self) {
Ok(bytes) => bytes,
Err(e) => {
tracing::error!("{}", e);
vec![]
}
}
}
pub fn from_operations<T: Into<NodeOperations>>(operations: T, context: NodeTreeContext) -> Result<Self, OTError> {

View File

@ -1,4 +1,6 @@
use lib_ot::core::{AttributeBuilder, Changeset, NodeData, NodeDataBuilder, NodeOperation, NodeTree, Path};
use lib_ot::core::{
AttributeBuilder, Changeset, NodeData, NodeDataBuilder, NodeOperation, NodeTree, NodeTreeContext, Path,
};
use lib_ot::text_delta::DeltaTextOperationBuilder;
#[test]
@ -26,6 +28,7 @@ fn operation_insert_node_with_children_serde_test() {
r#"{"op":"insert","path":[0,1],"nodes":[{"type":"text","children":[{"type":"sub_text"}]}]}"#
);
}
#[test]
fn operation_update_node_attributes_serde_test() {
let operation = NodeOperation::Update {
@ -102,6 +105,14 @@ fn node_tree_serialize_test() {
assert_eq!(json, TREE_JSON);
}
#[test]
fn node_tree_serde_test() {
let tree: NodeTree = serde_json::from_str(TREE_JSON).unwrap();
let bytes = tree.to_bytes();
let tree = NodeTree::from_bytes(&bytes).unwrap();
assert_eq!(bytes, tree.to_bytes());
}
#[allow(dead_code)]
const TREE_JSON: &str = r#"{
"type": "editor",