chore: config grid rev persistence

This commit is contained in:
appflowy
2022-03-10 22:27:19 +08:00
parent cea7d30a53
commit 9a791974b4
17 changed files with 404 additions and 206 deletions

View File

@ -9,21 +9,6 @@
import 'dart:core' as $core; import 'dart:core' as $core;
import 'package:protobuf/protobuf.dart' as $pb; import 'package:protobuf/protobuf.dart' as $pb;
class RevisionState extends $pb.ProtobufEnum {
static const RevisionState Sync = RevisionState._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Sync');
static const RevisionState Ack = RevisionState._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Ack');
static const $core.List<RevisionState> values = <RevisionState> [
Sync,
Ack,
];
static final $core.Map<$core.int, RevisionState> _byValue = $pb.ProtobufEnum.initByValue(values);
static RevisionState? valueOf($core.int value) => _byValue[value];
const RevisionState._($core.int v, $core.String n) : super(v, n);
}
class RevType extends $pb.ProtobufEnum { class RevType extends $pb.ProtobufEnum {
static const RevType DeprecatedLocal = RevType._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'DeprecatedLocal'); static const RevType DeprecatedLocal = RevType._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'DeprecatedLocal');
static const RevType DeprecatedRemote = RevType._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'DeprecatedRemote'); static const RevType DeprecatedRemote = RevType._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'DeprecatedRemote');

View File

@ -8,17 +8,6 @@
import 'dart:core' as $core; import 'dart:core' as $core;
import 'dart:convert' as $convert; import 'dart:convert' as $convert;
import 'dart:typed_data' as $typed_data; import 'dart:typed_data' as $typed_data;
@$core.Deprecated('Use revisionStateDescriptor instead')
const RevisionState$json = const {
'1': 'RevisionState',
'2': const [
const {'1': 'Sync', '2': 0},
const {'1': 'Ack', '2': 1},
],
};
/// Descriptor for `RevisionState`. Decode as a `google.protobuf.EnumDescriptorProto`.
final $typed_data.Uint8List revisionStateDescriptor = $convert.base64Decode('Cg1SZXZpc2lvblN0YXRlEggKBFN5bmMQABIHCgNBY2sQAQ==');
@$core.Deprecated('Use revTypeDescriptor instead') @$core.Deprecated('Use revTypeDescriptor instead')
const RevType$json = const { const RevType$json = const {
'1': 'RevType', '1': 'RevType',

View File

@ -458,7 +458,7 @@ class AnyData extends $pb.GeneratedMessage {
class RowMeta extends $pb.GeneratedMessage { class RowMeta extends $pb.GeneratedMessage {
static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'RowMeta', createEmptyInstance: create) static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'RowMeta', createEmptyInstance: create)
..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'id') ..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'id')
..aOS(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'gridId') ..aOS(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'blockId')
..m<$core.String, CellMeta>(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'cellByFieldId', entryClassName: 'RowMeta.CellByFieldIdEntry', keyFieldType: $pb.PbFieldType.OS, valueFieldType: $pb.PbFieldType.OM, valueCreator: CellMeta.create) ..m<$core.String, CellMeta>(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'cellByFieldId', entryClassName: 'RowMeta.CellByFieldIdEntry', keyFieldType: $pb.PbFieldType.OS, valueFieldType: $pb.PbFieldType.OM, valueCreator: CellMeta.create)
..a<$core.int>(4, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'height', $pb.PbFieldType.O3) ..a<$core.int>(4, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'height', $pb.PbFieldType.O3)
..aOB(5, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'visibility') ..aOB(5, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'visibility')
@ -468,7 +468,7 @@ class RowMeta extends $pb.GeneratedMessage {
RowMeta._() : super(); RowMeta._() : super();
factory RowMeta({ factory RowMeta({
$core.String? id, $core.String? id,
$core.String? gridId, $core.String? blockId,
$core.Map<$core.String, CellMeta>? cellByFieldId, $core.Map<$core.String, CellMeta>? cellByFieldId,
$core.int? height, $core.int? height,
$core.bool? visibility, $core.bool? visibility,
@ -477,8 +477,8 @@ class RowMeta extends $pb.GeneratedMessage {
if (id != null) { if (id != null) {
_result.id = id; _result.id = id;
} }
if (gridId != null) { if (blockId != null) {
_result.gridId = gridId; _result.blockId = blockId;
} }
if (cellByFieldId != null) { if (cellByFieldId != null) {
_result.cellByFieldId.addAll(cellByFieldId); _result.cellByFieldId.addAll(cellByFieldId);
@ -522,13 +522,13 @@ class RowMeta extends $pb.GeneratedMessage {
void clearId() => clearField(1); void clearId() => clearField(1);
@$pb.TagNumber(2) @$pb.TagNumber(2)
$core.String get gridId => $_getSZ(1); $core.String get blockId => $_getSZ(1);
@$pb.TagNumber(2) @$pb.TagNumber(2)
set gridId($core.String v) { $_setString(1, v); } set blockId($core.String v) { $_setString(1, v); }
@$pb.TagNumber(2) @$pb.TagNumber(2)
$core.bool hasGridId() => $_has(1); $core.bool hasBlockId() => $_has(1);
@$pb.TagNumber(2) @$pb.TagNumber(2)
void clearGridId() => clearField(2); void clearBlockId() => clearField(2);
@$pb.TagNumber(3) @$pb.TagNumber(3)
$core.Map<$core.String, CellMeta> get cellByFieldId => $_getMap(2); $core.Map<$core.String, CellMeta> get cellByFieldId => $_getMap(2);

View File

@ -101,7 +101,7 @@ const RowMeta$json = const {
'1': 'RowMeta', '1': 'RowMeta',
'2': const [ '2': const [
const {'1': 'id', '3': 1, '4': 1, '5': 9, '10': 'id'}, const {'1': 'id', '3': 1, '4': 1, '5': 9, '10': 'id'},
const {'1': 'grid_id', '3': 2, '4': 1, '5': 9, '10': 'gridId'}, const {'1': 'block_id', '3': 2, '4': 1, '5': 9, '10': 'blockId'},
const {'1': 'cell_by_field_id', '3': 3, '4': 3, '5': 11, '6': '.RowMeta.CellByFieldIdEntry', '10': 'cellByFieldId'}, const {'1': 'cell_by_field_id', '3': 3, '4': 3, '5': 11, '6': '.RowMeta.CellByFieldIdEntry', '10': 'cellByFieldId'},
const {'1': 'height', '3': 4, '4': 1, '5': 5, '10': 'height'}, const {'1': 'height', '3': 4, '4': 1, '5': 5, '10': 'height'},
const {'1': 'visibility', '3': 5, '4': 1, '5': 8, '10': 'visibility'}, const {'1': 'visibility', '3': 5, '4': 1, '5': 8, '10': 'visibility'},
@ -120,7 +120,7 @@ const RowMeta_CellByFieldIdEntry$json = const {
}; };
/// Descriptor for `RowMeta`. Decode as a `google.protobuf.DescriptorProto`. /// Descriptor for `RowMeta`. Decode as a `google.protobuf.DescriptorProto`.
final $typed_data.Uint8List rowMetaDescriptor = $convert.base64Decode('CgdSb3dNZXRhEg4KAmlkGAEgASgJUgJpZBIXCgdncmlkX2lkGAIgASgJUgZncmlkSWQSRAoQY2VsbF9ieV9maWVsZF9pZBgDIAMoCzIbLlJvd01ldGEuQ2VsbEJ5RmllbGRJZEVudHJ5Ug1jZWxsQnlGaWVsZElkEhYKBmhlaWdodBgEIAEoBVIGaGVpZ2h0Eh4KCnZpc2liaWxpdHkYBSABKAhSCnZpc2liaWxpdHkaSwoSQ2VsbEJ5RmllbGRJZEVudHJ5EhAKA2tleRgBIAEoCVIDa2V5Eh8KBXZhbHVlGAIgASgLMgkuQ2VsbE1ldGFSBXZhbHVlOgI4AQ=='); final $typed_data.Uint8List rowMetaDescriptor = $convert.base64Decode('CgdSb3dNZXRhEg4KAmlkGAEgASgJUgJpZBIZCghibG9ja19pZBgCIAEoCVIHYmxvY2tJZBJEChBjZWxsX2J5X2ZpZWxkX2lkGAMgAygLMhsuUm93TWV0YS5DZWxsQnlGaWVsZElkRW50cnlSDWNlbGxCeUZpZWxkSWQSFgoGaGVpZ2h0GAQgASgFUgZoZWlnaHQSHgoKdmlzaWJpbGl0eRgFIAEoCFIKdmlzaWJpbGl0eRpLChJDZWxsQnlGaWVsZElkRW50cnkSEAoDa2V5GAEgASgJUgNrZXkSHwoFdmFsdWUYAiABKAsyCS5DZWxsTWV0YVIFdmFsdWU6AjgB');
@$core.Deprecated('Use rowMetaChangesetDescriptor instead') @$core.Deprecated('Use rowMetaChangesetDescriptor instead')
const RowMetaChangeset$json = const { const RowMetaChangeset$json = const {
'1': 'RowMetaChangeset', '1': 'RowMetaChangeset',

View File

@ -7,8 +7,8 @@ edition = "2018"
[lib] [lib]
name = "dart_ffi" name = "dart_ffi"
# this value will change depending on the target os # this value will change depending on the target os
# default cdylib # default staticlib
crate-type = ["cdylib"] crate-type = ["staticlib"]
[dependencies] [dependencies]

View File

@ -21,6 +21,17 @@ table! {
} }
} }
table! {
grid_rev_table (id) {
id -> Integer,
object_id -> Text,
base_rev_id -> BigInt,
rev_id -> BigInt,
data -> Binary,
state -> Integer,
}
}
table! { table! {
kv_table (key) { kv_table (key) {
key -> Text, key -> Text,
@ -91,6 +102,7 @@ table! {
allow_tables_to_appear_in_same_query!( allow_tables_to_appear_in_same_query!(
app_table, app_table,
doc_table, doc_table,
grid_rev_table,
kv_table, kv_table,
rev_table, rev_table,
trash_table, trash_table,

View File

@ -2,20 +2,13 @@ mod migration;
pub mod version_1; pub mod version_1;
mod version_2; mod version_2;
use flowy_collaboration::client_folder::initial_folder_delta;
use flowy_collaboration::{
client_folder::FolderPad,
entities::revision::{Revision, RevisionState},
};
use std::sync::Arc;
use tokio::sync::RwLock;
pub use version_1::{app_sql::*, trash_sql::*, v1_impl::V1Transaction, view_sql::*, workspace_sql::*};
use crate::{ use crate::{
event_map::WorkspaceDatabase, event_map::WorkspaceDatabase,
manager::FolderId, manager::FolderId,
services::{folder_editor::ClientFolderEditor, persistence::migration::FolderMigration}, services::{folder_editor::ClientFolderEditor, persistence::migration::FolderMigration},
}; };
use flowy_collaboration::client_folder::initial_folder_delta;
use flowy_collaboration::{client_folder::FolderPad, entities::revision::Revision};
use flowy_error::{FlowyError, FlowyResult}; use flowy_error::{FlowyError, FlowyResult};
use flowy_folder_data_model::entities::{ use flowy_folder_data_model::entities::{
app::App, app::App,
@ -23,8 +16,12 @@ use flowy_folder_data_model::entities::{
view::View, view::View,
workspace::Workspace, workspace::Workspace,
}; };
use flowy_sync::{mk_revision_disk_cache, RevisionRecord}; use flowy_sync::disk::{RevisionRecord, RevisionState};
use flowy_sync::mk_revision_disk_cache;
use lib_sqlite::ConnectionPool; use lib_sqlite::ConnectionPool;
use std::sync::Arc;
use tokio::sync::RwLock;
pub use version_1::{app_sql::*, trash_sql::*, v1_impl::V1Transaction, view_sql::*, workspace_sql::*};
pub trait FolderPersistenceTransaction { pub trait FolderPersistenceTransaction {
fn create_workspace(&self, user_id: &str, workspace: Workspace) -> FlowyResult<()>; fn create_workspace(&self, user_id: &str, workspace: Workspace) -> FlowyResult<()>;

View File

@ -0,0 +1,272 @@
use crate::cache::disk::RevisionDiskCache;
use crate::disk::{RevisionChangeset, RevisionRecord, RevisionState};
use bytes::Bytes;
use diesel::{sql_types::Integer, update, SqliteConnection};
use flowy_collaboration::{
entities::revision::{RevId, RevType, Revision, RevisionRange},
util::md5,
};
use flowy_database::{
impl_sql_integer_expression, insert_or_ignore_into,
prelude::*,
schema::{grid_rev_table, grid_rev_table::dsl},
ConnectionPool,
};
use flowy_error::{internal_error, FlowyError, FlowyResult};
use std::sync::Arc;
pub struct SQLiteGridRevisionPersistence {
user_id: String,
pub(crate) pool: Arc<ConnectionPool>,
}
impl RevisionDiskCache for SQLiteGridRevisionPersistence {
type Error = FlowyError;
fn create_revision_records(
&self,
revision_records: Vec<RevisionRecord>,
conn: &SqliteConnection,
) -> Result<(), Self::Error> {
let _ = GridRevisionSql::create(revision_records, conn)?;
Ok(())
}
fn read_revision_records(
&self,
object_id: &str,
rev_ids: Option<Vec<i64>>,
) -> Result<Vec<RevisionRecord>, Self::Error> {
let conn = self.pool.get().map_err(internal_error)?;
let records = GridRevisionSql::read(&self.user_id, object_id, rev_ids, &*conn)?;
Ok(records)
}
fn read_revision_records_with_range(
&self,
object_id: &str,
range: &RevisionRange,
) -> Result<Vec<RevisionRecord>, Self::Error> {
let conn = &*self.pool.get().map_err(internal_error)?;
let revisions = GridRevisionSql::read_with_range(&self.user_id, object_id, range.clone(), conn)?;
Ok(revisions)
}
fn update_revision_record(&self, changesets: Vec<RevisionChangeset>) -> FlowyResult<()> {
let conn = &*self.pool.get().map_err(internal_error)?;
let _ = conn.immediate_transaction::<_, FlowyError, _>(|| {
for changeset in changesets {
let _ = GridRevisionSql::update(changeset, conn)?;
}
Ok(())
})?;
Ok(())
}
fn delete_revision_records(&self, object_id: &str, rev_ids: Option<Vec<i64>>) -> Result<(), Self::Error> {
let conn = &*self.pool.get().map_err(internal_error)?;
let _ = GridRevisionSql::delete(object_id, rev_ids, conn)?;
Ok(())
}
fn delete_and_insert_records(
&self,
object_id: &str,
deleted_rev_ids: Option<Vec<i64>>,
inserted_records: Vec<RevisionRecord>,
) -> Result<(), Self::Error> {
let conn = self.pool.get().map_err(internal_error)?;
conn.immediate_transaction::<_, FlowyError, _>(|| {
let _ = GridRevisionSql::delete(object_id, deleted_rev_ids, &*conn)?;
let _ = self.create_revision_records(inserted_records, &*conn)?;
Ok(())
})
}
}
impl SQLiteGridRevisionPersistence {
pub(crate) fn new(user_id: &str, pool: Arc<ConnectionPool>) -> Self {
Self {
user_id: user_id.to_owned(),
pool,
}
}
}
struct GridRevisionSql();
impl GridRevisionSql {
fn create(revision_records: Vec<RevisionRecord>, conn: &SqliteConnection) -> Result<(), FlowyError> {
// Batch insert: https://diesel.rs/guides/all-about-inserts.html
let records = revision_records
.into_iter()
.map(|record| {
tracing::trace!(
"[GridRevisionSql] create revision: {}:{:?}",
record.revision.object_id,
record.revision.rev_id
);
let rev_state: GridRevisionState = record.state.into();
(
dsl::object_id.eq(record.revision.object_id),
dsl::base_rev_id.eq(record.revision.base_rev_id),
dsl::rev_id.eq(record.revision.rev_id),
dsl::data.eq(record.revision.delta_data),
dsl::state.eq(rev_state),
)
})
.collect::<Vec<_>>();
let _ = insert_or_ignore_into(dsl::grid_rev_table)
.values(&records)
.execute(conn)?;
Ok(())
}
fn update(changeset: RevisionChangeset, conn: &SqliteConnection) -> Result<(), FlowyError> {
let state: GridRevisionState = changeset.state.clone().into();
let filter = dsl::grid_rev_table
.filter(dsl::rev_id.eq(changeset.rev_id.as_ref()))
.filter(dsl::object_id.eq(changeset.object_id));
let _ = update(filter).set(dsl::state.eq(state)).execute(conn)?;
tracing::debug!(
"[GridRevisionSql] update revision:{} state:to {:?}",
changeset.rev_id,
changeset.state
);
Ok(())
}
fn read(
user_id: &str,
object_id: &str,
rev_ids: Option<Vec<i64>>,
conn: &SqliteConnection,
) -> Result<Vec<RevisionRecord>, FlowyError> {
let mut sql = dsl::grid_rev_table.filter(dsl::object_id.eq(object_id)).into_boxed();
if let Some(rev_ids) = rev_ids {
sql = sql.filter(dsl::rev_id.eq_any(rev_ids));
}
let rows = sql.order(dsl::rev_id.asc()).load::<GridRevisionTable>(conn)?;
let records = rows
.into_iter()
.map(|row| mk_revision_record_from_table(user_id, row))
.collect::<Vec<_>>();
Ok(records)
}
fn read_with_range(
user_id: &str,
object_id: &str,
range: RevisionRange,
conn: &SqliteConnection,
) -> Result<Vec<RevisionRecord>, FlowyError> {
let rev_tables = dsl::grid_rev_table
.filter(dsl::rev_id.ge(range.start))
.filter(dsl::rev_id.le(range.end))
.filter(dsl::object_id.eq(object_id))
.order(dsl::rev_id.asc())
.load::<GridRevisionTable>(conn)?;
let revisions = rev_tables
.into_iter()
.map(|table| mk_revision_record_from_table(user_id, table))
.collect::<Vec<_>>();
Ok(revisions)
}
fn delete(object_id: &str, rev_ids: Option<Vec<i64>>, conn: &SqliteConnection) -> Result<(), FlowyError> {
let mut sql = diesel::delete(dsl::grid_rev_table).into_boxed();
sql = sql.filter(dsl::object_id.eq(object_id));
if let Some(rev_ids) = rev_ids {
tracing::trace!("[GridRevisionSql] Delete revision: {}:{:?}", object_id, rev_ids);
sql = sql.filter(dsl::rev_id.eq_any(rev_ids));
}
let affected_row = sql.execute(conn)?;
tracing::trace!("[GridRevisionSql] Delete {} rows", affected_row);
Ok(())
}
}
#[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)]
#[table_name = "grid_rev_table"]
pub(crate) struct GridRevisionTable {
id: i32,
pub(crate) object_id: String,
pub(crate) base_rev_id: i64,
pub(crate) rev_id: i64,
pub(crate) data: Vec<u8>,
pub(crate) state: GridRevisionState,
}
#[derive(Clone, Copy, PartialEq, Eq, Debug, Hash, FromSqlRow, AsExpression)]
#[repr(i32)]
#[sql_type = "Integer"]
pub enum GridRevisionState {
Sync = 0,
Ack = 1,
}
impl std::default::Default for GridRevisionState {
fn default() -> Self {
GridRevisionState::Sync
}
}
impl std::convert::From<i32> for GridRevisionState {
fn from(value: i32) -> Self {
match value {
0 => GridRevisionState::Sync,
1 => GridRevisionState::Ack,
o => {
tracing::error!("Unsupported rev state {}, fallback to RevState::Local", o);
GridRevisionState::Sync
}
}
}
}
impl GridRevisionState {
pub fn value(&self) -> i32 {
*self as i32
}
}
impl_sql_integer_expression!(GridRevisionState);
impl std::convert::From<GridRevisionState> for RevisionState {
fn from(s: GridRevisionState) -> Self {
match s {
GridRevisionState::Sync => RevisionState::Sync,
GridRevisionState::Ack => RevisionState::Ack,
}
}
}
impl std::convert::From<RevisionState> for GridRevisionState {
fn from(s: RevisionState) -> Self {
match s {
RevisionState::Sync => GridRevisionState::Sync,
RevisionState::Ack => GridRevisionState::Ack,
}
}
}
fn mk_revision_record_from_table(user_id: &str, table: GridRevisionTable) -> RevisionRecord {
let md5 = md5(&table.data);
let revision = Revision::new(
&table.object_id,
table.base_rev_id,
table.rev_id,
Bytes::from(table.data),
user_id,
md5,
);
RevisionRecord {
revision,
state: table.state.into(),
write_to_disk: false,
}
}

View File

@ -1,14 +1,13 @@
mod folder_rev_impl; mod folder_rev_impl;
mod grid_rev_impl; mod grid_rev_impl;
mod text_block_rev_impl; mod text_rev_impl;
pub use folder_rev_impl::*; pub use folder_rev_impl::*;
pub use grid_rev_impl::*; pub use grid_rev_impl::*;
pub use text_block_rev_impl::*; pub use text_rev_impl::*;
use crate::RevisionRecord;
use diesel::SqliteConnection; use diesel::SqliteConnection;
use flowy_collaboration::entities::revision::RevisionRange; use flowy_collaboration::entities::revision::{RevId, Revision, RevisionRange};
use flowy_error::FlowyResult; use flowy_error::FlowyResult;
use std::fmt::Debug; use std::fmt::Debug;
@ -48,3 +47,43 @@ pub trait RevisionDiskCache: Sync + Send {
inserted_records: Vec<RevisionRecord>, inserted_records: Vec<RevisionRecord>,
) -> Result<(), Self::Error>; ) -> Result<(), Self::Error>;
} }
#[derive(Clone, Debug)]
pub struct RevisionRecord {
pub revision: Revision,
pub state: RevisionState,
pub write_to_disk: bool,
}
impl RevisionRecord {
pub fn ack(&mut self) {
self.state = RevisionState::Ack;
}
}
pub struct RevisionChangeset {
pub(crate) object_id: String,
pub(crate) rev_id: RevId,
pub(crate) state: RevisionState,
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum RevisionState {
Sync = 0,
Ack = 1,
}
impl RevisionState {
pub fn is_need_sync(&self) -> bool {
match self {
RevisionState::Sync => true,
RevisionState::Ack => false,
}
}
}
impl AsRef<RevisionState> for RevisionState {
fn as_ref(&self) -> &RevisionState {
self
}
}

View File

@ -1,8 +1,9 @@
use crate::{cache::disk::RevisionDiskCache, RevisionRecord}; use crate::cache::disk::RevisionDiskCache;
use crate::disk::{RevisionChangeset, RevisionRecord, RevisionState};
use bytes::Bytes; use bytes::Bytes;
use diesel::{sql_types::Integer, update, SqliteConnection}; use diesel::{sql_types::Integer, update, SqliteConnection};
use flowy_collaboration::{ use flowy_collaboration::{
entities::revision::{RevId, RevType, Revision, RevisionRange, RevisionState}, entities::revision::{RevId, RevType, Revision, RevisionRange},
util::md5, util::md5,
}; };
use flowy_database::{ use flowy_database::{
@ -27,7 +28,7 @@ impl RevisionDiskCache for SQLiteTextBlockRevisionPersistence {
revision_records: Vec<RevisionRecord>, revision_records: Vec<RevisionRecord>,
conn: &SqliteConnection, conn: &SqliteConnection,
) -> Result<(), Self::Error> { ) -> Result<(), Self::Error> {
let _ = RevisionTableSql::create(revision_records, conn)?; let _ = TextRevisionSql::create(revision_records, conn)?;
Ok(()) Ok(())
} }
@ -37,7 +38,7 @@ impl RevisionDiskCache for SQLiteTextBlockRevisionPersistence {
rev_ids: Option<Vec<i64>>, rev_ids: Option<Vec<i64>>,
) -> Result<Vec<RevisionRecord>, Self::Error> { ) -> Result<Vec<RevisionRecord>, Self::Error> {
let conn = self.pool.get().map_err(internal_error)?; let conn = self.pool.get().map_err(internal_error)?;
let records = RevisionTableSql::read(&self.user_id, object_id, rev_ids, &*conn)?; let records = TextRevisionSql::read(&self.user_id, object_id, rev_ids, &*conn)?;
Ok(records) Ok(records)
} }
@ -47,7 +48,7 @@ impl RevisionDiskCache for SQLiteTextBlockRevisionPersistence {
range: &RevisionRange, range: &RevisionRange,
) -> Result<Vec<RevisionRecord>, Self::Error> { ) -> Result<Vec<RevisionRecord>, Self::Error> {
let conn = &*self.pool.get().map_err(internal_error)?; let conn = &*self.pool.get().map_err(internal_error)?;
let revisions = RevisionTableSql::read_with_range(&self.user_id, object_id, range.clone(), conn)?; let revisions = TextRevisionSql::read_with_range(&self.user_id, object_id, range.clone(), conn)?;
Ok(revisions) Ok(revisions)
} }
@ -55,7 +56,7 @@ impl RevisionDiskCache for SQLiteTextBlockRevisionPersistence {
let conn = &*self.pool.get().map_err(internal_error)?; let conn = &*self.pool.get().map_err(internal_error)?;
let _ = conn.immediate_transaction::<_, FlowyError, _>(|| { let _ = conn.immediate_transaction::<_, FlowyError, _>(|| {
for changeset in changesets { for changeset in changesets {
let _ = RevisionTableSql::update(changeset, conn)?; let _ = TextRevisionSql::update(changeset, conn)?;
} }
Ok(()) Ok(())
})?; })?;
@ -64,7 +65,7 @@ impl RevisionDiskCache for SQLiteTextBlockRevisionPersistence {
fn delete_revision_records(&self, object_id: &str, rev_ids: Option<Vec<i64>>) -> Result<(), Self::Error> { fn delete_revision_records(&self, object_id: &str, rev_ids: Option<Vec<i64>>) -> Result<(), Self::Error> {
let conn = &*self.pool.get().map_err(internal_error)?; let conn = &*self.pool.get().map_err(internal_error)?;
let _ = RevisionTableSql::delete(object_id, rev_ids, conn)?; let _ = TextRevisionSql::delete(object_id, rev_ids, conn)?;
Ok(()) Ok(())
} }
@ -76,7 +77,7 @@ impl RevisionDiskCache for SQLiteTextBlockRevisionPersistence {
) -> Result<(), Self::Error> { ) -> Result<(), Self::Error> {
let conn = self.pool.get().map_err(internal_error)?; let conn = self.pool.get().map_err(internal_error)?;
conn.immediate_transaction::<_, FlowyError, _>(|| { conn.immediate_transaction::<_, FlowyError, _>(|| {
let _ = RevisionTableSql::delete(object_id, deleted_rev_ids, &*conn)?; let _ = TextRevisionSql::delete(object_id, deleted_rev_ids, &*conn)?;
let _ = self.create_revision_records(inserted_records, &*conn)?; let _ = self.create_revision_records(inserted_records, &*conn)?;
Ok(()) Ok(())
}) })
@ -92,21 +93,21 @@ impl SQLiteTextBlockRevisionPersistence {
} }
} }
pub struct RevisionTableSql {} struct TextRevisionSql {}
impl RevisionTableSql { impl TextRevisionSql {
pub(crate) fn create(revision_records: Vec<RevisionRecord>, conn: &SqliteConnection) -> Result<(), FlowyError> { fn create(revision_records: Vec<RevisionRecord>, conn: &SqliteConnection) -> Result<(), FlowyError> {
// Batch insert: https://diesel.rs/guides/all-about-inserts.html // Batch insert: https://diesel.rs/guides/all-about-inserts.html
let records = revision_records let records = revision_records
.into_iter() .into_iter()
.map(|record| { .map(|record| {
tracing::trace!( tracing::trace!(
"[RevisionTable] create revision: {}:{:?}", "[TextRevisionSql] create revision: {}:{:?}",
record.revision.object_id, record.revision.object_id,
record.revision.rev_id record.revision.rev_id
); );
let rev_state: RevisionTableState = record.state.into(); let rev_state: TextRevisionState = record.state.into();
( (
dsl::doc_id.eq(record.revision.object_id), dsl::doc_id.eq(record.revision.object_id),
dsl::base_rev_id.eq(record.revision.base_rev_id), dsl::base_rev_id.eq(record.revision.base_rev_id),
@ -122,20 +123,21 @@ impl RevisionTableSql {
Ok(()) Ok(())
} }
pub(crate) fn update(changeset: RevisionChangeset, conn: &SqliteConnection) -> Result<(), FlowyError> { fn update(changeset: RevisionChangeset, conn: &SqliteConnection) -> Result<(), FlowyError> {
let state: TextRevisionState = changeset.state.clone().into();
let filter = dsl::rev_table let filter = dsl::rev_table
.filter(dsl::rev_id.eq(changeset.rev_id.as_ref())) .filter(dsl::rev_id.eq(changeset.rev_id.as_ref()))
.filter(dsl::doc_id.eq(changeset.object_id)); .filter(dsl::doc_id.eq(changeset.object_id));
let _ = update(filter).set(dsl::state.eq(changeset.state)).execute(conn)?; let _ = update(filter).set(dsl::state.eq(state)).execute(conn)?;
tracing::debug!( tracing::debug!(
"[RevisionTable] update revision:{} state:to {:?}", "[TextRevisionSql] update revision:{} state:to {:?}",
changeset.rev_id, changeset.rev_id,
changeset.state changeset.state
); );
Ok(()) Ok(())
} }
pub(crate) fn read( fn read(
user_id: &str, user_id: &str,
object_id: &str, object_id: &str,
rev_ids: Option<Vec<i64>>, rev_ids: Option<Vec<i64>>,
@ -154,7 +156,7 @@ impl RevisionTableSql {
Ok(records) Ok(records)
} }
pub(crate) fn read_with_range( fn read_with_range(
user_id: &str, user_id: &str,
object_id: &str, object_id: &str,
range: RevisionRange, range: RevisionRange,
@ -174,90 +176,86 @@ impl RevisionTableSql {
Ok(revisions) Ok(revisions)
} }
pub(crate) fn delete( fn delete(object_id: &str, rev_ids: Option<Vec<i64>>, conn: &SqliteConnection) -> Result<(), FlowyError> {
object_id: &str,
rev_ids: Option<Vec<i64>>,
conn: &SqliteConnection,
) -> Result<(), FlowyError> {
let mut sql = diesel::delete(dsl::rev_table).into_boxed(); let mut sql = diesel::delete(dsl::rev_table).into_boxed();
sql = sql.filter(dsl::doc_id.eq(object_id)); sql = sql.filter(dsl::doc_id.eq(object_id));
if let Some(rev_ids) = rev_ids { if let Some(rev_ids) = rev_ids {
tracing::trace!("[RevisionTable] Delete revision: {}:{:?}", object_id, rev_ids); tracing::trace!("[TextRevisionSql] Delete revision: {}:{:?}", object_id, rev_ids);
sql = sql.filter(dsl::rev_id.eq_any(rev_ids)); sql = sql.filter(dsl::rev_id.eq_any(rev_ids));
} }
let affected_row = sql.execute(conn)?; let affected_row = sql.execute(conn)?;
tracing::trace!("[RevisionTable] Delete {} rows", affected_row); tracing::trace!("[TextRevisionSql] Delete {} rows", affected_row);
Ok(()) Ok(())
} }
} }
#[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)] #[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)]
#[table_name = "rev_table"] #[table_name = "rev_table"]
pub(crate) struct RevisionTable { struct RevisionTable {
id: i32, id: i32,
pub(crate) doc_id: String, doc_id: String,
pub(crate) base_rev_id: i64, base_rev_id: i64,
pub(crate) rev_id: i64, rev_id: i64,
pub(crate) data: Vec<u8>, data: Vec<u8>,
pub(crate) state: RevisionTableState, state: TextRevisionState,
pub(crate) ty: RevTableType, // Deprecated ty: RevTableType, // Deprecated
} }
#[derive(Clone, Copy, PartialEq, Eq, Debug, Hash, FromSqlRow, AsExpression)] #[derive(Clone, Copy, PartialEq, Eq, Debug, Hash, FromSqlRow, AsExpression)]
#[repr(i32)] #[repr(i32)]
#[sql_type = "Integer"] #[sql_type = "Integer"]
pub enum RevisionTableState { enum TextRevisionState {
Sync = 0, Sync = 0,
Ack = 1, Ack = 1,
} }
impl std::default::Default for RevisionTableState { impl std::default::Default for TextRevisionState {
fn default() -> Self { fn default() -> Self {
RevisionTableState::Sync TextRevisionState::Sync
} }
} }
impl std::convert::From<i32> for RevisionTableState { impl std::convert::From<i32> for TextRevisionState {
fn from(value: i32) -> Self { fn from(value: i32) -> Self {
match value { match value {
0 => RevisionTableState::Sync, 0 => TextRevisionState::Sync,
1 => RevisionTableState::Ack, 1 => TextRevisionState::Ack,
o => { o => {
tracing::error!("Unsupported rev state {}, fallback to RevState::Local", o); tracing::error!("Unsupported rev state {}, fallback to RevState::Local", o);
RevisionTableState::Sync TextRevisionState::Sync
} }
} }
} }
} }
impl RevisionTableState { impl TextRevisionState {
pub fn value(&self) -> i32 { pub fn value(&self) -> i32 {
*self as i32 *self as i32
} }
} }
impl_sql_integer_expression!(RevisionTableState); impl_sql_integer_expression!(TextRevisionState);
impl std::convert::From<RevisionTableState> for RevisionState { impl std::convert::From<TextRevisionState> for RevisionState {
fn from(s: RevisionTableState) -> Self { fn from(s: TextRevisionState) -> Self {
match s { match s {
RevisionTableState::Sync => RevisionState::Sync, TextRevisionState::Sync => RevisionState::Sync,
RevisionTableState::Ack => RevisionState::Ack, TextRevisionState::Ack => RevisionState::Ack,
} }
} }
} }
impl std::convert::From<RevisionState> for RevisionTableState { impl std::convert::From<RevisionState> for TextRevisionState {
fn from(s: RevisionState) -> Self { fn from(s: RevisionState) -> Self {
match s { match s {
RevisionState::Sync => RevisionTableState::Sync, RevisionState::Sync => TextRevisionState::Sync,
RevisionState::Ack => RevisionTableState::Ack, RevisionState::Ack => TextRevisionState::Ack,
} }
} }
} }
pub(crate) fn mk_revision_record_from_table(user_id: &str, table: RevisionTable) -> RevisionRecord { fn mk_revision_record_from_table(user_id: &str, table: RevisionTable) -> RevisionRecord {
let md5 = md5(&table.data); let md5 = md5(&table.data);
let revision = Revision::new( let revision = Revision::new(
&table.doc_id, &table.doc_id,
@ -324,9 +322,3 @@ impl std::convert::From<RevTableType> for RevType {
} }
} }
} }
pub struct RevisionChangeset {
pub(crate) object_id: String,
pub(crate) rev_id: RevId,
pub(crate) state: RevisionTableState,
}

View File

@ -1,4 +1,5 @@
use crate::{RevisionRecord, REVISION_WRITE_INTERVAL_IN_MILLIS}; use crate::disk::RevisionRecord;
use crate::REVISION_WRITE_INTERVAL_IN_MILLIS;
use dashmap::DashMap; use dashmap::DashMap;
use flowy_collaboration::entities::revision::RevisionRange; use flowy_collaboration::entities::revision::RevisionRange;
use flowy_error::{FlowyError, FlowyResult}; use flowy_error::{FlowyError, FlowyResult};

View File

@ -1,2 +1,2 @@
pub(crate) mod disk; pub mod disk;
pub(crate) mod memory; pub(crate) mod memory;

View File

@ -1,6 +1,7 @@
use crate::disk::RevisionState;
use crate::{RevisionPersistence, WSDataProviderDataSource}; use crate::{RevisionPersistence, WSDataProviderDataSource};
use flowy_collaboration::{ use flowy_collaboration::{
entities::revision::{RepeatedRevision, Revision, RevisionRange, RevisionState}, entities::revision::{RepeatedRevision, Revision, RevisionRange},
util::{pair_rev_id_from_revisions, RevIdCounter}, util::{pair_rev_id_from_revisions, RevIdCounter},
}; };
use flowy_error::{FlowyError, FlowyResult}; use flowy_error::{FlowyError, FlowyResult};

View File

@ -1,13 +1,12 @@
use crate::cache::{ use crate::cache::{
disk::{RevisionChangeset, RevisionDiskCache, RevisionTableState, SQLiteTextBlockRevisionPersistence}, disk::{RevisionChangeset, RevisionDiskCache, SQLiteTextBlockRevisionPersistence},
memory::{RevisionMemoryCache, RevisionMemoryCacheDelegate}, memory::{RevisionMemoryCache, RevisionMemoryCacheDelegate},
}; };
use crate::disk::{RevisionRecord, RevisionState};
use flowy_collaboration::entities::revision::{Revision, RevisionRange, RevisionState}; use crate::RevisionCompact;
use flowy_collaboration::entities::revision::{Revision, RevisionRange};
use flowy_database::ConnectionPool; use flowy_database::ConnectionPool;
use flowy_error::{internal_error, FlowyError, FlowyResult}; use flowy_error::{internal_error, FlowyError, FlowyResult};
use crate::RevisionCompact;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::{borrow::Cow, sync::Arc}; use std::{borrow::Cow, sync::Arc};
use tokio::sync::RwLock; use tokio::sync::RwLock;
@ -235,7 +234,7 @@ impl RevisionMemoryCacheDelegate for Arc<SQLiteTextBlockRevisionPersistence> {
let changeset = RevisionChangeset { let changeset = RevisionChangeset {
object_id: object_id.to_string(), object_id: object_id.to_string(),
rev_id: rev_id.into(), rev_id: rev_id.into(),
state: RevisionTableState::Ack, state: RevisionState::Ack,
}; };
match self.update_revision_record(vec![changeset]) { match self.update_revision_record(vec![changeset]) {
Ok(_) => {} Ok(_) => {}
@ -244,19 +243,6 @@ impl RevisionMemoryCacheDelegate for Arc<SQLiteTextBlockRevisionPersistence> {
} }
} }
#[derive(Clone, Debug)]
pub struct RevisionRecord {
pub revision: Revision,
pub state: RevisionState,
pub write_to_disk: bool,
}
impl RevisionRecord {
pub fn ack(&mut self) {
self.state = RevisionState::Ack;
}
}
#[derive(Default)] #[derive(Default)]
struct RevisionSyncSequence(VecDeque<i64>); struct RevisionSyncSequence(VecDeque<i64>);
impl RevisionSyncSequence { impl RevisionSyncSequence {

View File

@ -215,27 +215,6 @@ pub fn md5<T: AsRef<[u8]>>(data: T) -> String {
md5 md5
} }
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum RevisionState {
Sync = 0,
Ack = 1,
}
impl RevisionState {
pub fn is_need_sync(&self) -> bool {
match self {
RevisionState::Sync => true,
RevisionState::Ack => false,
}
}
}
impl AsRef<RevisionState> for RevisionState {
fn as_ref(&self) -> &RevisionState {
self
}
}
#[derive(Debug, ProtoBuf_Enum, Clone, Eq, PartialEq)] #[derive(Debug, ProtoBuf_Enum, Clone, Eq, PartialEq)]
pub enum RevType { pub enum RevType {
DeprecatedLocal = 0, DeprecatedLocal = 0,

View File

@ -914,56 +914,6 @@ impl ::protobuf::reflect::ProtobufValue for RevisionRange {
} }
} }
#[derive(Clone,PartialEq,Eq,Debug,Hash)]
pub enum RevisionState {
Sync = 0,
Ack = 1,
}
impl ::protobuf::ProtobufEnum for RevisionState {
fn value(&self) -> i32 {
*self as i32
}
fn from_i32(value: i32) -> ::std::option::Option<RevisionState> {
match value {
0 => ::std::option::Option::Some(RevisionState::Sync),
1 => ::std::option::Option::Some(RevisionState::Ack),
_ => ::std::option::Option::None
}
}
fn values() -> &'static [Self] {
static values: &'static [RevisionState] = &[
RevisionState::Sync,
RevisionState::Ack,
];
values
}
fn enum_descriptor_static() -> &'static ::protobuf::reflect::EnumDescriptor {
static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::EnumDescriptor> = ::protobuf::rt::LazyV2::INIT;
descriptor.get(|| {
::protobuf::reflect::EnumDescriptor::new_pb_name::<RevisionState>("RevisionState", file_descriptor_proto())
})
}
}
impl ::std::marker::Copy for RevisionState {
}
impl ::std::default::Default for RevisionState {
fn default() -> Self {
RevisionState::Sync
}
}
impl ::protobuf::reflect::ProtobufValue for RevisionState {
fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
::protobuf::reflect::ReflectValueRef::Enum(::protobuf::ProtobufEnum::descriptor(self))
}
}
#[derive(Clone,PartialEq,Eq,Debug,Hash)] #[derive(Clone,PartialEq,Eq,Debug,Hash)]
pub enum RevType { pub enum RevType {
DeprecatedLocal = 0, DeprecatedLocal = 0,
@ -1024,8 +974,7 @@ static file_descriptor_proto_data: &'static [u8] = b"\
\x10RepeatedRevision\x12\x1f\n\x05items\x18\x01\x20\x03(\x0b2\t.Revision\ \x10RepeatedRevision\x12\x1f\n\x05items\x18\x01\x20\x03(\x0b2\t.Revision\
R\x05items\"\x1d\n\x05RevId\x12\x14\n\x05value\x18\x01\x20\x01(\x03R\x05\ R\x05items\"\x1d\n\x05RevId\x12\x14\n\x05value\x18\x01\x20\x01(\x03R\x05\
value\"7\n\rRevisionRange\x12\x14\n\x05start\x18\x01\x20\x01(\x03R\x05st\ value\"7\n\rRevisionRange\x12\x14\n\x05start\x18\x01\x20\x01(\x03R\x05st\
art\x12\x10\n\x03end\x18\x02\x20\x01(\x03R\x03end*\"\n\rRevisionState\ art\x12\x10\n\x03end\x18\x02\x20\x01(\x03R\x03end*4\n\x07RevType\x12\x13\
\x12\x08\n\x04Sync\x10\0\x12\x07\n\x03Ack\x10\x01*4\n\x07RevType\x12\x13\
\n\x0fDeprecatedLocal\x10\0\x12\x14\n\x10DeprecatedRemote\x10\x01b\x06pr\ \n\x0fDeprecatedLocal\x10\0\x12\x14\n\x10DeprecatedRemote\x10\x01b\x06pr\
oto3\ oto3\
"; ";

View File

@ -19,10 +19,6 @@ message RevisionRange {
int64 start = 1; int64 start = 1;
int64 end = 2; int64 end = 2;
} }
enum RevisionState {
Sync = 0;
Ack = 1;
}
enum RevType { enum RevType {
DeprecatedLocal = 0; DeprecatedLocal = 0;
DeprecatedRemote = 1; DeprecatedRemote = 1;