compose delta from remote and sync with client

This commit is contained in:
appflowy 2021-09-25 21:47:02 +08:00
parent a26f588409
commit 9175efa4c6
76 changed files with 854 additions and 828 deletions

View File

@ -18,7 +18,7 @@ export 'package:app_flowy/welcome/domain/i_splash.dart';
class SplashUserImpl implements ISplashUser {
@override
Future<AuthState> currentUserProfile() {
final result = UserEventInitUser().send();
final result = UserEventCheckUser().send();
return result.then((result) {
return result.fold(
(userProfile) {

View File

@ -1,4 +1,5 @@
import 'package:app_flowy/workspace/domain/i_user.dart';
import 'package:flowy_log/flowy_log.dart';
import 'package:flowy_sdk/protobuf/flowy-user/user_profile.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-workspace/errors.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-workspace/workspace_create.pb.dart';
@ -22,6 +23,8 @@ class MenuUserBloc extends Bloc<MenuUserEvent, MenuUserState> {
watch.setProfileCallback(_profileUpdated);
watch.setWorkspacesCallback(_workspacesUpdated);
watch.startWatching();
await _initUser();
},
fetchWorkspaces: (_FetchWorkspaces value) async* {},
);
@ -33,6 +36,11 @@ class MenuUserBloc extends Bloc<MenuUserEvent, MenuUserState> {
super.close();
}
Future<void> _initUser() async {
final result = await iUserImpl.initUser();
result.fold((l) => null, (error) => Log.error(error));
}
void _profileUpdated(Either<UserProfile, UserError> userOrFailed) {}
void _workspacesUpdated(
Either<List<Workspace>, WorkspaceError> workspacesOrFailed) {

View File

@ -14,6 +14,7 @@ abstract class IUser {
Future<Either<List<Workspace>, WorkspaceError>> fetchWorkspaces();
Future<Either<Unit, WorkspaceError>> deleteWorkspace(String workspaceId);
Future<Either<Unit, UserError>> signOut();
Future<Either<Unit, UserError>> initUser();
}
typedef UserProfileUpdateCallback = void Function(

View File

@ -42,6 +42,11 @@ class IUserImpl extends IUser {
Future<Either<List<Workspace>, WorkspaceError>> fetchWorkspaces() {
return repo.getWorkspaces();
}
@override
Future<Either<Unit, UserError>> initUser() {
return repo.initUser();
}
}
class IUserWatchImpl extends IUserWatch {

View File

@ -28,6 +28,11 @@ class UserRepo {
return UserEventSignOut().send();
}
Future<Either<Unit, UserError>> initUser() {
final result = UserEventInitUser().send();
return result;
}
Future<Either<List<Workspace>, WorkspaceError>> getWorkspaces() {
final request = QueryWorkspaceRequest.create();

View File

@ -274,12 +274,12 @@ class WorkspaceEventApplyDocDelta {
class UserEventInitUser {
UserEventInitUser();
Future<Either<UserProfile, UserError>> send() {
Future<Either<Unit, UserError>> send() {
final request = FFIRequest.create()
..event = UserEvent.InitUser.toString();
return Dispatch.asyncRequest(request).then((bytesResult) => bytesResult.fold(
(okBytes) => left(UserProfile.fromBuffer(okBytes)),
(bytes) => left(unit),
(errBytes) => right(UserError.fromBuffer(errBytes)),
));
}
@ -364,3 +364,17 @@ class UserEventGetUserProfile {
}
}
class UserEventCheckUser {
UserEventCheckUser();
Future<Either<UserProfile, UserError>> send() {
final request = FFIRequest.create()
..event = UserEvent.CheckUser.toString();
return Dispatch.asyncRequest(request).then((bytesResult) => bytesResult.fold(
(okBytes) => left(UserProfile.fromBuffer(okBytes)),
(errBytes) => right(UserError.fromBuffer(errBytes)),
));
}
}

View File

@ -75,7 +75,7 @@ class Doc extends $pb.GeneratedMessage {
static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'Doc', createEmptyInstance: create)
..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'id')
..a<$core.List<$core.int>>(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'data', $pb.PbFieldType.OY)
..aInt64(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'revision')
..aInt64(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'revId')
..hasRequiredFields = false
;
@ -83,7 +83,7 @@ class Doc extends $pb.GeneratedMessage {
factory Doc({
$core.String? id,
$core.List<$core.int>? data,
$fixnum.Int64? revision,
$fixnum.Int64? revId,
}) {
final _result = create();
if (id != null) {
@ -92,8 +92,8 @@ class Doc extends $pb.GeneratedMessage {
if (data != null) {
_result.data = data;
}
if (revision != null) {
_result.revision = revision;
if (revId != null) {
_result.revId = revId;
}
return _result;
}
@ -137,19 +137,20 @@ class Doc extends $pb.GeneratedMessage {
void clearData() => clearField(2);
@$pb.TagNumber(3)
$fixnum.Int64 get revision => $_getI64(2);
$fixnum.Int64 get revId => $_getI64(2);
@$pb.TagNumber(3)
set revision($fixnum.Int64 v) { $_setInt64(2, v); }
set revId($fixnum.Int64 v) { $_setInt64(2, v); }
@$pb.TagNumber(3)
$core.bool hasRevision() => $_has(2);
$core.bool hasRevId() => $_has(2);
@$pb.TagNumber(3)
void clearRevision() => clearField(3);
void clearRevId() => clearField(3);
}
class UpdateDocParams extends $pb.GeneratedMessage {
static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'UpdateDocParams', createEmptyInstance: create)
..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'docId')
..a<$core.List<$core.int>>(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'data', $pb.PbFieldType.OY)
..aInt64(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'revId')
..hasRequiredFields = false
;
@ -157,6 +158,7 @@ class UpdateDocParams extends $pb.GeneratedMessage {
factory UpdateDocParams({
$core.String? docId,
$core.List<$core.int>? data,
$fixnum.Int64? revId,
}) {
final _result = create();
if (docId != null) {
@ -165,6 +167,9 @@ class UpdateDocParams extends $pb.GeneratedMessage {
if (data != null) {
_result.data = data;
}
if (revId != null) {
_result.revId = revId;
}
return _result;
}
factory UpdateDocParams.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r);
@ -205,6 +210,15 @@ class UpdateDocParams extends $pb.GeneratedMessage {
$core.bool hasData() => $_has(1);
@$pb.TagNumber(2)
void clearData() => clearField(2);
@$pb.TagNumber(3)
$fixnum.Int64 get revId => $_getI64(2);
@$pb.TagNumber(3)
set revId($fixnum.Int64 v) { $_setInt64(2, v); }
@$pb.TagNumber(3)
$core.bool hasRevId() => $_has(2);
@$pb.TagNumber(3)
void clearRevId() => clearField(3);
}
class DocDelta extends $pb.GeneratedMessage {

View File

@ -25,23 +25,24 @@ const Doc$json = const {
'2': const [
const {'1': 'id', '3': 1, '4': 1, '5': 9, '10': 'id'},
const {'1': 'data', '3': 2, '4': 1, '5': 12, '10': 'data'},
const {'1': 'revision', '3': 3, '4': 1, '5': 3, '10': 'revision'},
const {'1': 'rev_id', '3': 3, '4': 1, '5': 3, '10': 'revId'},
],
};
/// Descriptor for `Doc`. Decode as a `google.protobuf.DescriptorProto`.
final $typed_data.Uint8List docDescriptor = $convert.base64Decode('CgNEb2MSDgoCaWQYASABKAlSAmlkEhIKBGRhdGEYAiABKAxSBGRhdGESGgoIcmV2aXNpb24YAyABKANSCHJldmlzaW9u');
final $typed_data.Uint8List docDescriptor = $convert.base64Decode('CgNEb2MSDgoCaWQYASABKAlSAmlkEhIKBGRhdGEYAiABKAxSBGRhdGESFQoGcmV2X2lkGAMgASgDUgVyZXZJZA==');
@$core.Deprecated('Use updateDocParamsDescriptor instead')
const UpdateDocParams$json = const {
'1': 'UpdateDocParams',
'2': const [
const {'1': 'doc_id', '3': 1, '4': 1, '5': 9, '10': 'docId'},
const {'1': 'data', '3': 2, '4': 1, '5': 12, '10': 'data'},
const {'1': 'rev_id', '3': 3, '4': 1, '5': 3, '10': 'revId'},
],
};
/// Descriptor for `UpdateDocParams`. Decode as a `google.protobuf.DescriptorProto`.
final $typed_data.Uint8List updateDocParamsDescriptor = $convert.base64Decode('Cg9VcGRhdGVEb2NQYXJhbXMSFQoGZG9jX2lkGAEgASgJUgVkb2NJZBISCgRkYXRhGAIgASgMUgRkYXRh');
final $typed_data.Uint8List updateDocParamsDescriptor = $convert.base64Decode('Cg9VcGRhdGVEb2NQYXJhbXMSFQoGZG9jX2lkGAEgASgJUgVkb2NJZBISCgRkYXRhGAIgASgMUgRkYXRhEhUKBnJldl9pZBgDIAEoA1IFcmV2SWQ=');
@$core.Deprecated('Use docDeltaDescriptor instead')
const DocDelta$json = const {
'1': 'DocDelta',

View File

@ -11,11 +11,11 @@ import 'package:protobuf/protobuf.dart' as $pb;
class WsDataType extends $pb.ProtobufEnum {
static const WsDataType Acked = WsDataType._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Acked');
static const WsDataType Delta = WsDataType._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Delta');
static const WsDataType Rev = WsDataType._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Rev');
static const $core.List<WsDataType> values = <WsDataType> [
Acked,
Delta,
Rev,
];
static final $core.Map<$core.int, WsDataType> _byValue = $pb.ProtobufEnum.initByValue(values);

View File

@ -13,12 +13,12 @@ const WsDataType$json = const {
'1': 'WsDataType',
'2': const [
const {'1': 'Acked', '2': 0},
const {'1': 'Delta', '2': 1},
const {'1': 'Rev', '2': 1},
],
};
/// Descriptor for `WsDataType`. Decode as a `google.protobuf.EnumDescriptorProto`.
final $typed_data.Uint8List wsDataTypeDescriptor = $convert.base64Decode('CgpXc0RhdGFUeXBlEgkKBUFja2VkEAASCQoFRGVsdGEQAQ==');
final $typed_data.Uint8List wsDataTypeDescriptor = $convert.base64Decode('CgpXc0RhdGFUeXBlEgkKBUFja2VkEAASBwoDUmV2EAE=');
@$core.Deprecated('Use wsDocumentDataDescriptor instead')
const WsDocumentData$json = const {
'1': 'WsDocumentData',

View File

@ -16,6 +16,7 @@ class UserEvent extends $pb.ProtobufEnum {
static const UserEvent SignOut = UserEvent._(3, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'SignOut');
static const UserEvent UpdateUser = UserEvent._(4, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'UpdateUser');
static const UserEvent GetUserProfile = UserEvent._(5, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'GetUserProfile');
static const UserEvent CheckUser = UserEvent._(6, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'CheckUser');
static const $core.List<UserEvent> values = <UserEvent> [
InitUser,
@ -24,6 +25,7 @@ class UserEvent extends $pb.ProtobufEnum {
SignOut,
UpdateUser,
GetUserProfile,
CheckUser,
];
static final $core.Map<$core.int, UserEvent> _byValue = $pb.ProtobufEnum.initByValue(values);

View File

@ -18,8 +18,9 @@ const UserEvent$json = const {
const {'1': 'SignOut', '2': 3},
const {'1': 'UpdateUser', '2': 4},
const {'1': 'GetUserProfile', '2': 5},
const {'1': 'CheckUser', '2': 6},
],
};
/// Descriptor for `UserEvent`. Decode as a `google.protobuf.EnumDescriptorProto`.
final $typed_data.Uint8List userEventDescriptor = $convert.base64Decode('CglVc2VyRXZlbnQSDAoISW5pdFVzZXIQABIKCgZTaWduSW4QARIKCgZTaWduVXAQAhILCgdTaWduT3V0EAMSDgoKVXBkYXRlVXNlchAEEhIKDkdldFVzZXJQcm9maWxlEAU=');
final $typed_data.Uint8List userEventDescriptor = $convert.base64Decode('CglVc2VyRXZlbnQSDAoISW5pdFVzZXIQABIKCgZTaWduSW4QARIKCgZTaWduVXAQAhILCgdTaWduT3V0EAMSDgoKVXBkYXRlVXNlchAEEhIKDkdldFVzZXJQcm9maWxlEAUSDQoJQ2hlY2tVc2VyEAY=');

View File

@ -319,67 +319,6 @@ class UpdateViewParams extends $pb.GeneratedMessage {
void clearIsTrash() => clearField(5);
}
class SaveViewDataRequest extends $pb.GeneratedMessage {
static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'SaveViewDataRequest', createEmptyInstance: create)
..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'viewId')
..a<$core.List<$core.int>>(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'data', $pb.PbFieldType.OY)
..hasRequiredFields = false
;
SaveViewDataRequest._() : super();
factory SaveViewDataRequest({
$core.String? viewId,
$core.List<$core.int>? data,
}) {
final _result = create();
if (viewId != null) {
_result.viewId = viewId;
}
if (data != null) {
_result.data = data;
}
return _result;
}
factory SaveViewDataRequest.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r);
factory SaveViewDataRequest.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r);
@$core.Deprecated(
'Using this can add significant overhead to your binary. '
'Use [GeneratedMessageGenericExtensions.deepCopy] instead. '
'Will be removed in next major version')
SaveViewDataRequest clone() => SaveViewDataRequest()..mergeFromMessage(this);
@$core.Deprecated(
'Using this can add significant overhead to your binary. '
'Use [GeneratedMessageGenericExtensions.rebuild] instead. '
'Will be removed in next major version')
SaveViewDataRequest copyWith(void Function(SaveViewDataRequest) updates) => super.copyWith((message) => updates(message as SaveViewDataRequest)) as SaveViewDataRequest; // ignore: deprecated_member_use
$pb.BuilderInfo get info_ => _i;
@$core.pragma('dart2js:noInline')
static SaveViewDataRequest create() => SaveViewDataRequest._();
SaveViewDataRequest createEmptyInstance() => create();
static $pb.PbList<SaveViewDataRequest> createRepeated() => $pb.PbList<SaveViewDataRequest>();
@$core.pragma('dart2js:noInline')
static SaveViewDataRequest getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor<SaveViewDataRequest>(create);
static SaveViewDataRequest? _defaultInstance;
@$pb.TagNumber(1)
$core.String get viewId => $_getSZ(0);
@$pb.TagNumber(1)
set viewId($core.String v) { $_setString(0, v); }
@$pb.TagNumber(1)
$core.bool hasViewId() => $_has(0);
@$pb.TagNumber(1)
void clearViewId() => clearField(1);
@$pb.TagNumber(2)
$core.List<$core.int> get data => $_getN(1);
@$pb.TagNumber(2)
set data($core.List<$core.int> v) { $_setBytes(1, v); }
@$pb.TagNumber(2)
$core.bool hasData() => $_has(1);
@$pb.TagNumber(2)
void clearData() => clearField(2);
}
class ApplyChangesetRequest extends $pb.GeneratedMessage {
static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'ApplyChangesetRequest', createEmptyInstance: create)
..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'viewId')

View File

@ -48,17 +48,6 @@ const UpdateViewParams$json = const {
/// Descriptor for `UpdateViewParams`. Decode as a `google.protobuf.DescriptorProto`.
final $typed_data.Uint8List updateViewParamsDescriptor = $convert.base64Decode('ChBVcGRhdGVWaWV3UGFyYW1zEhcKB3ZpZXdfaWQYASABKAlSBnZpZXdJZBIUCgRuYW1lGAIgASgJSABSBG5hbWUSFAoEZGVzYxgDIAEoCUgBUgRkZXNjEh4KCXRodW1ibmFpbBgEIAEoCUgCUgl0aHVtYm5haWwSGwoIaXNfdHJhc2gYBSABKAhIA1IHaXNUcmFzaEINCgtvbmVfb2ZfbmFtZUINCgtvbmVfb2ZfZGVzY0ISChBvbmVfb2ZfdGh1bWJuYWlsQhEKD29uZV9vZl9pc190cmFzaA==');
@$core.Deprecated('Use saveViewDataRequestDescriptor instead')
const SaveViewDataRequest$json = const {
'1': 'SaveViewDataRequest',
'2': const [
const {'1': 'view_id', '3': 1, '4': 1, '5': 9, '10': 'viewId'},
const {'1': 'data', '3': 2, '4': 1, '5': 12, '10': 'data'},
],
};
/// Descriptor for `SaveViewDataRequest`. Decode as a `google.protobuf.DescriptorProto`.
final $typed_data.Uint8List saveViewDataRequestDescriptor = $convert.base64Decode('ChNTYXZlVmlld0RhdGFSZXF1ZXN0EhcKB3ZpZXdfaWQYASABKAlSBnZpZXdJZBISCgRkYXRhGAIgASgMUgRkYXRh');
@$core.Deprecated('Use applyChangesetRequestDescriptor instead')
const ApplyChangesetRequest$json = const {
'1': 'ApplyChangesetRequest',

View File

@ -2,5 +2,6 @@
CREATE TABLE IF NOT EXISTS doc_table(
id uuid NOT NULL,
PRIMARY KEY (id),
data bytea NOT NULL DEFAULT ''
data bytea NOT NULL DEFAULT '',
rev_id bigint NOT NULL DEFAULT 0
);

View File

@ -6,6 +6,7 @@ pub(crate) const DOC_TABLE: &'static str = "doc_table";
pub struct DocTable {
pub(crate) id: uuid::Uuid,
pub(crate) data: Vec<u8>,
pub(crate) rev_id: i64,
}
impl std::convert::Into<Doc> for DocTable {
@ -13,6 +14,7 @@ impl std::convert::Into<Doc> for DocTable {
let mut doc = Doc::new();
doc.set_id(self.id.to_string());
doc.set_data(self.data);
doc.set_rev_id(self.rev_id);
doc
}
}

View File

@ -65,6 +65,7 @@ pub(crate) async fn update_doc(
let (sql, args) = SqlBuilder::update(DOC_TABLE)
.add_some_arg("data", data)
.add_arg("rev_id", params.rev_id)
.and_where_eq("id", doc_id)
.build()?;

View File

@ -1,83 +0,0 @@
use crate::service::{
doc::update_doc,
ws::{entities::Socket, WsClientData, WsMessageAdaptor},
};
use actix_web::web::Data;
use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use bytes::Bytes;
use flowy_document::{
entities::ws::{WsDataType, WsDocumentData},
protobuf::{Doc, Revision, UpdateDocParams},
services::doc::Document,
};
use flowy_net::errors::{internal_error, ServerError};
use flowy_ot::core::Delta;
use flowy_ws::{protobuf::WsModule, WsMessage};
use parking_lot::RwLock;
use protobuf::Message;
use sqlx::PgPool;
use std::{convert::TryInto, sync::Arc, time::Duration};
pub(crate) struct EditDoc {
doc_id: String,
document: Arc<RwLock<Document>>,
pg_pool: Data<PgPool>,
}
impl EditDoc {
pub(crate) fn new(doc: Doc, pg_pool: Data<PgPool>) -> Result<Self, ServerError> {
let delta = Delta::from_bytes(doc.data).map_err(internal_error)?;
let document = Arc::new(RwLock::new(Document::from_delta(delta)));
Ok(Self {
doc_id: doc.id.clone(),
document,
pg_pool,
})
}
#[tracing::instrument(level = "debug", skip(self, socket, revision))]
pub(crate) async fn apply_revision(
&self,
socket: Socket,
revision: Revision,
) -> Result<(), ServerError> {
let delta = Delta::from_bytes(revision.delta).map_err(internal_error)?;
match self.document.try_write_for(Duration::from_millis(300)) {
None => {
log::error!("Failed to acquire write lock of document");
},
Some(mut write_guard) => {
let _ = write_guard.apply_delta(delta).map_err(internal_error)?;
let mut wtr = vec![];
let _ = wtr.write_i64::<BigEndian>(revision.rev_id);
let data = WsDocumentData {
id: self.doc_id.clone(),
ty: WsDataType::Acked,
data: wtr,
};
let msg: WsMessage = data.into();
let bytes: Bytes = msg.try_into().unwrap();
socket.do_send(WsMessageAdaptor(bytes));
},
}
let md5 = format!("{:x}", md5::compute(self.document.read().to_json()));
if md5 != revision.md5 {
log::warn!("Document md5 not match")
}
let mut params = UpdateDocParams::new();
params.set_doc_id(self.doc_id.clone());
params.set_data(self.document.read().to_bytes());
match update_doc(self.pg_pool.get_ref(), params).await {
Ok(_) => {},
Err(e) => {
log::error!("Save doc data failed: {:?}", e);
},
}
Ok(())
}
}

View File

@ -0,0 +1,163 @@
use crate::service::{
doc::update_doc,
util::md5,
ws::{entities::Socket, WsMessageAdaptor},
};
use actix_web::web::Data;
use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use bytes::Bytes;
use flowy_document::{
entities::ws::{WsDataType, WsDocumentData},
protobuf::{Doc, Revision, UpdateDocParams},
services::doc::Document,
};
use flowy_net::errors::{internal_error, ServerError};
use flowy_ot::{
core::{Delta, OperationTransformable},
errors::OTError,
};
use flowy_ws::WsMessage;
use parking_lot::RwLock;
use protobuf::Message;
use sqlx::PgPool;
use std::{convert::TryInto, sync::Arc, time::Duration};
pub(crate) struct EditDocContext {
doc_id: String,
rev_id: i64,
document: Arc<RwLock<Document>>,
pg_pool: Data<PgPool>,
}
impl EditDocContext {
pub(crate) fn new(doc: Doc, pg_pool: Data<PgPool>) -> Result<Self, ServerError> {
let delta = Delta::from_bytes(doc.data).map_err(internal_error)?;
let document = Arc::new(RwLock::new(Document::from_delta(delta)));
Ok(Self {
doc_id: doc.id.clone(),
rev_id: doc.rev_id,
document,
pg_pool,
})
}
#[tracing::instrument(level = "debug", skip(self, socket, revision))]
pub(crate) async fn apply_revision(
&self,
socket: Socket,
revision: Revision,
) -> Result<(), ServerError> {
let _ = self.verify_md5(&revision)?;
if self.rev_id > revision.rev_id {
let (cli_prime, server_prime) = self.compose(revision.delta).map_err(internal_error)?;
let _ = self.update_document_delta(server_prime)?;
log::debug!("{} client delta: {}", self.doc_id, cli_prime.to_json());
let cli_revision = self.mk_revision(revision.rev_id, cli_prime);
let ws_cli_revision = mk_ws_rev_message(&self.doc_id, cli_revision);
socket.do_send(ws_cli_revision).map_err(internal_error)?;
Ok(())
} else {
let delta = Delta::from_bytes(revision.delta.clone()).map_err(internal_error)?;
let _ = self.update_document_delta(delta)?;
socket.do_send(mk_ws_acked_message(&revision));
// Opti: save with multiple revisions
let _ = self.save_doc_to_disk(&revision).await?;
Ok(())
}
}
fn mk_revision(&self, base_rev_id: i64, delta: Delta) -> Revision {
let delta_data = delta.into_bytes();
let md5 = md5(&delta_data);
let revision = Revision {
base_rev_id,
rev_id: self.rev_id,
delta: delta_data,
md5,
doc_id: self.doc_id.to_string(),
..Default::default()
};
revision
}
#[tracing::instrument(level = "debug", skip(self, delta_data))]
fn compose(&self, delta_data: Vec<u8>) -> Result<(Delta, Delta), OTError> {
log::debug!(
"{} document data: {}",
self.doc_id,
self.document.read().to_json()
);
let doc_delta = self.document.read().delta().clone();
let cli_delta = Delta::from_bytes(delta_data)?;
let (a, b) = doc_delta.transform(&cli_delta)?;
Ok((a, b))
}
#[tracing::instrument(level = "debug", skip(self, delta))]
fn update_document_delta(&self, delta: Delta) -> Result<(), ServerError> {
// Opti: push each revision into queue and process it one by one.
match self.document.try_write_for(Duration::from_millis(300)) {
None => {
log::error!("Failed to acquire write lock of document");
},
Some(mut write_guard) => {
let _ = write_guard
.apply_delta(delta.clone())
.map_err(internal_error)?;
log::debug!("Document: {}", write_guard.to_plain_string());
},
}
Ok(())
}
fn verify_md5(&self, revision: &Revision) -> Result<(), ServerError> {
if md5(&revision.delta) != revision.md5 {
return Err(ServerError::internal().context("Delta md5 not match"));
}
Ok(())
}
async fn save_doc_to_disk(&self, revision: &Revision) -> Result<(), ServerError> {
let mut params = UpdateDocParams::new();
params.set_doc_id(self.doc_id.clone());
params.set_data(self.document.read().to_bytes());
params.set_rev_id(revision.rev_id);
let _ = update_doc(self.pg_pool.get_ref(), params).await?;
Ok(())
}
}
fn mk_ws_rev_message(doc_id: &str, revision: Revision) -> WsMessageAdaptor {
let bytes = revision.write_to_bytes().unwrap();
let data = WsDocumentData {
id: doc_id.to_string(),
ty: WsDataType::Rev,
data: bytes,
};
let msg: WsMessage = data.into();
let bytes: Bytes = msg.try_into().unwrap();
WsMessageAdaptor(bytes)
}
fn mk_ws_acked_message(revision: &Revision) -> WsMessageAdaptor {
let mut wtr = vec![];
let _ = wtr.write_i64::<BigEndian>(revision.rev_id);
let data = WsDocumentData {
id: revision.doc_id.clone(),
ty: WsDataType::Acked,
data: wtr,
};
let msg: WsMessage = data.into();
let bytes: Bytes = msg.try_into().unwrap();
WsMessageAdaptor(bytes)
}

View File

@ -1,5 +1,5 @@
mod doc;
mod edit_doc;
mod edit_doc_context;
pub mod router;
mod sql_builder;
pub mod ws_handler;

View File

@ -14,7 +14,11 @@ pub struct NewDocSqlBuilder {
impl NewDocSqlBuilder {
pub fn new(id: Uuid) -> Self {
let table = DocTable { id, data: vec![] };
let table = DocTable {
id,
data: vec![],
rev_id: 0,
};
Self { table }
}
@ -27,6 +31,7 @@ impl NewDocSqlBuilder {
let (sql, args) = SqlBuilder::create(DOC_TABLE)
.add_arg("id", self.table.id)
.add_arg("data", self.table.data)
.add_arg("rev_id", self.table.rev_id)
.build()?;
Ok((sql, args))

View File

@ -1,17 +1,14 @@
use super::edit_doc::EditDoc;
use super::edit_doc_context::EditDocContext;
use crate::service::{
doc::read_doc,
util::parse_from_bytes,
ws::{WsBizHandler, WsClientData},
};
use actix_web::web::Data;
use bytes::Bytes;
use flowy_document::{
protobuf::{QueryDocParams, Revision, WsDataType, WsDocumentData},
services::doc::Document,
};
use flowy_document::protobuf::{QueryDocParams, Revision, WsDataType, WsDocumentData};
use flowy_net::errors::ServerError;
use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard};
use parking_lot::{RwLock, RwLockUpgradableReadGuard};
use protobuf::Message;
use sqlx::PgPool;
use std::{collections::HashMap, sync::Arc};
@ -43,7 +40,7 @@ impl WsBizHandler for DocWsBizHandler {
struct EditDocManager {
pg_pool: Data<PgPool>,
edit_docs: RwLock<HashMap<String, Arc<EditDoc>>>,
edit_docs: RwLock<HashMap<String, Arc<EditDocContext>>>,
}
impl EditDocManager {
@ -59,7 +56,7 @@ impl EditDocManager {
match document_data.ty {
WsDataType::Acked => {},
WsDataType::Delta => {
WsDataType::Rev => {
let revision: Revision = parse_from_bytes(&document_data.data)?;
let edited_doc = self.get_edit_doc(&revision.doc_id).await?;
tokio::spawn(async move {
@ -77,7 +74,7 @@ impl EditDocManager {
Ok(())
}
async fn get_edit_doc(&self, doc_id: &str) -> Result<Arc<EditDoc>, ServerError> {
async fn get_edit_doc(&self, doc_id: &str) -> Result<Arc<EditDocContext>, ServerError> {
// Opti: using lock free map instead?
let edit_docs = self.edit_docs.upgradable_read();
if let Some(doc) = edit_docs.get(doc_id) {
@ -91,7 +88,7 @@ impl EditDocManager {
};
let doc = read_doc(pg_pool.get_ref(), params).await?;
let edit_doc = Arc::new(EditDoc::new(doc, self.pg_pool.clone())?);
let edit_doc = Arc::new(EditDocContext::new(doc, self.pg_pool.clone())?);
edit_docs.insert(doc_id.to_string(), edit_doc.clone());
Ok(edit_doc)
}

View File

@ -17,6 +17,12 @@ pub async fn parse_from_dev_payload<T: Message>(
parse_from_bytes(&bytes)
}
#[inline]
pub fn md5<T: AsRef<[u8]>>(data: T) -> String {
let md5 = format!("{:x}", md5::compute(data));
md5
}
pub fn parse_from_bytes<T: Message>(bytes: &[u8]) -> Result<T, ServerError> {
let result: ProtobufResult<T> = Message::parse_from_bytes(&bytes);
match result {

View File

@ -1,5 +1,5 @@
use crate::service::ws::WsClientData;
use bytes::Bytes;
use flowy_ws::WsModule;
use std::{collections::HashMap, sync::Arc};

View File

@ -1,7 +1,5 @@
use crate::service::ws::entities::SessionId;
use actix::Message;
use bytes::Bytes;
use std::fmt::Formatter;
#[derive(Debug, Message, Clone)]
#[rtype(result = "()")]

View File

@ -1,7 +1,8 @@
-- Your SQL goes here
CREATE TABLE op_table (
doc_id TEXT NOT NULL PRIMARY KEY,
base_rev_id BIGINT NOT NULL DEFAULT 0,
rev_id BIGINT NOT NULL PRIMARY KEY,
rev_id BIGINT NOT NULL DEFAULT 0,
data BLOB NOT NULL DEFAULT (x''),
md5 TEXT NOT NULL DEFAULT '',
state INTEGER NOT NULL DEFAULT 0

View File

@ -22,7 +22,8 @@ table! {
}
table! {
op_table (rev_id) {
op_table (doc_id) {
doc_id -> Text,
base_rev_id -> BigInt,
rev_id -> BigInt,
data -> Binary,

View File

@ -41,7 +41,6 @@ pub fn category_from_str(type_str: &str) -> TypeCategory {
| "CurrentWorkspace"
| "UpdateViewRequest"
| "UpdateViewParams"
| "SaveViewDataRequest"
| "ApplyChangesetRequest"
| "DeleteViewRequest"
| "DeleteViewParams"

View File

@ -22,7 +22,7 @@ pub struct Doc {
pub data: Vec<u8>,
#[pb(index = 3)]
pub revision: i64,
pub rev_id: i64,
}
#[derive(ProtoBuf, Default, Debug, Clone)]
@ -32,6 +32,9 @@ pub struct UpdateDocParams {
#[pb(index = 2)]
pub data: Vec<u8>,
#[pb(index = 3)]
pub rev_id: i64,
}
#[derive(ProtoBuf, Default, Debug, Clone)]

View File

@ -7,7 +7,7 @@ use std::convert::TryInto;
#[derive(Debug, Clone, ProtoBuf_Enum, Eq, PartialEq, Hash)]
pub enum WsDataType {
Acked = 0,
Delta = 1,
Rev = 1,
}
impl std::default::Default for WsDataType {
@ -33,7 +33,7 @@ impl std::convert::From<Revision> for WsDocumentData {
let data = bytes.to_vec();
Self {
id,
ty: WsDataType::Delta,
ty: WsDataType::Rev,
data,
}
}

View File

@ -43,10 +43,10 @@ impl FlowyDocument {
Ok(open_doc.doc())
}
pub async fn apply_doc_delta(&self, params: DocDelta, pool: Arc<ConnectionPool>) -> Result<Doc, DocError> {
pub async fn apply_doc_delta(&self, params: DocDelta) -> Result<Doc, DocError> {
// workaround: compare the rust's delta with flutter's delta. Will be removed
// very soon
let doc = self.doc_ctrl.edit_doc(params.clone(), pool)?;
let doc = self.doc_ctrl.edit_doc(params.clone())?;
Ok(doc)
}
}

View File

@ -229,7 +229,7 @@ pub struct Doc {
// message fields
pub id: ::std::string::String,
pub data: ::std::vec::Vec<u8>,
pub revision: i64,
pub rev_id: i64,
// special fields
pub unknown_fields: ::protobuf::UnknownFields,
pub cached_size: ::protobuf::CachedSize,
@ -298,19 +298,19 @@ impl Doc {
::std::mem::replace(&mut self.data, ::std::vec::Vec::new())
}
// int64 revision = 3;
// int64 rev_id = 3;
pub fn get_revision(&self) -> i64 {
self.revision
pub fn get_rev_id(&self) -> i64 {
self.rev_id
}
pub fn clear_revision(&mut self) {
self.revision = 0;
pub fn clear_rev_id(&mut self) {
self.rev_id = 0;
}
// Param is passed by value, moved
pub fn set_revision(&mut self, v: i64) {
self.revision = v;
pub fn set_rev_id(&mut self, v: i64) {
self.rev_id = v;
}
}
@ -334,7 +334,7 @@ impl ::protobuf::Message for Doc {
return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type));
}
let tmp = is.read_int64()?;
self.revision = tmp;
self.rev_id = tmp;
},
_ => {
::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
@ -354,8 +354,8 @@ impl ::protobuf::Message for Doc {
if !self.data.is_empty() {
my_size += ::protobuf::rt::bytes_size(2, &self.data);
}
if self.revision != 0 {
my_size += ::protobuf::rt::value_size(3, self.revision, ::protobuf::wire_format::WireTypeVarint);
if self.rev_id != 0 {
my_size += ::protobuf::rt::value_size(3, self.rev_id, ::protobuf::wire_format::WireTypeVarint);
}
my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
self.cached_size.set(my_size);
@ -369,8 +369,8 @@ impl ::protobuf::Message for Doc {
if !self.data.is_empty() {
os.write_bytes(2, &self.data)?;
}
if self.revision != 0 {
os.write_int64(3, self.revision)?;
if self.rev_id != 0 {
os.write_int64(3, self.rev_id)?;
}
os.write_unknown_fields(self.get_unknown_fields())?;
::std::result::Result::Ok(())
@ -421,9 +421,9 @@ impl ::protobuf::Message for Doc {
|m: &mut Doc| { &mut m.data },
));
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeInt64>(
"revision",
|m: &Doc| { &m.revision },
|m: &mut Doc| { &mut m.revision },
"rev_id",
|m: &Doc| { &m.rev_id },
|m: &mut Doc| { &mut m.rev_id },
));
::protobuf::reflect::MessageDescriptor::new_pb_name::<Doc>(
"Doc",
@ -443,7 +443,7 @@ impl ::protobuf::Clear for Doc {
fn clear(&mut self) {
self.id.clear();
self.data.clear();
self.revision = 0;
self.rev_id = 0;
self.unknown_fields.clear();
}
}
@ -465,6 +465,7 @@ pub struct UpdateDocParams {
// message fields
pub doc_id: ::std::string::String,
pub data: ::std::vec::Vec<u8>,
pub rev_id: i64,
// special fields
pub unknown_fields: ::protobuf::UnknownFields,
pub cached_size: ::protobuf::CachedSize,
@ -532,6 +533,21 @@ impl UpdateDocParams {
pub fn take_data(&mut self) -> ::std::vec::Vec<u8> {
::std::mem::replace(&mut self.data, ::std::vec::Vec::new())
}
// int64 rev_id = 3;
pub fn get_rev_id(&self) -> i64 {
self.rev_id
}
pub fn clear_rev_id(&mut self) {
self.rev_id = 0;
}
// Param is passed by value, moved
pub fn set_rev_id(&mut self, v: i64) {
self.rev_id = v;
}
}
impl ::protobuf::Message for UpdateDocParams {
@ -549,6 +565,13 @@ impl ::protobuf::Message for UpdateDocParams {
2 => {
::protobuf::rt::read_singular_proto3_bytes_into(wire_type, is, &mut self.data)?;
},
3 => {
if wire_type != ::protobuf::wire_format::WireTypeVarint {
return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type));
}
let tmp = is.read_int64()?;
self.rev_id = tmp;
},
_ => {
::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
},
@ -567,6 +590,9 @@ impl ::protobuf::Message for UpdateDocParams {
if !self.data.is_empty() {
my_size += ::protobuf::rt::bytes_size(2, &self.data);
}
if self.rev_id != 0 {
my_size += ::protobuf::rt::value_size(3, self.rev_id, ::protobuf::wire_format::WireTypeVarint);
}
my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
self.cached_size.set(my_size);
my_size
@ -579,6 +605,9 @@ impl ::protobuf::Message for UpdateDocParams {
if !self.data.is_empty() {
os.write_bytes(2, &self.data)?;
}
if self.rev_id != 0 {
os.write_int64(3, self.rev_id)?;
}
os.write_unknown_fields(self.get_unknown_fields())?;
::std::result::Result::Ok(())
}
@ -627,6 +656,11 @@ impl ::protobuf::Message for UpdateDocParams {
|m: &UpdateDocParams| { &m.data },
|m: &mut UpdateDocParams| { &mut m.data },
));
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeInt64>(
"rev_id",
|m: &UpdateDocParams| { &m.rev_id },
|m: &mut UpdateDocParams| { &mut m.rev_id },
));
::protobuf::reflect::MessageDescriptor::new_pb_name::<UpdateDocParams>(
"UpdateDocParams",
fields,
@ -645,6 +679,7 @@ impl ::protobuf::Clear for UpdateDocParams {
fn clear(&mut self) {
self.doc_id.clear();
self.data.clear();
self.rev_id = 0;
self.unknown_fields.clear();
}
}
@ -1023,48 +1058,51 @@ impl ::protobuf::reflect::ProtobufValue for QueryDocParams {
static file_descriptor_proto_data: &'static [u8] = b"\
\n\tdoc.proto\"5\n\x0fCreateDocParams\x12\x0e\n\x02id\x18\x01\x20\x01(\t\
R\x02id\x12\x12\n\x04data\x18\x02\x20\x01(\x0cR\x04data\"E\n\x03Doc\x12\
R\x02id\x12\x12\n\x04data\x18\x02\x20\x01(\x0cR\x04data\"@\n\x03Doc\x12\
\x0e\n\x02id\x18\x01\x20\x01(\tR\x02id\x12\x12\n\x04data\x18\x02\x20\x01\
(\x0cR\x04data\x12\x1a\n\x08revision\x18\x03\x20\x01(\x03R\x08revision\"\
<\n\x0fUpdateDocParams\x12\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\
\x12\x12\n\x04data\x18\x02\x20\x01(\x0cR\x04data\"5\n\x08DocDelta\x12\
\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\x12\x12\n\x04data\x18\x02\
\x20\x01(\x0cR\x04data\"'\n\x0eQueryDocParams\x12\x15\n\x06doc_id\x18\
\x01\x20\x01(\tR\x05docIdJ\xb0\x05\n\x06\x12\x04\0\0\x15\x01\n\x08\n\x01\
\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x05\x01\n\n\n\x03\x04\
\0\x01\x12\x03\x02\x08\x17\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x12\n\
\x0c\n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\n\n\x0c\n\x05\x04\0\x02\0\x01\
\x12\x03\x03\x0b\r\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x10\x11\n\x0b\
\n\x04\x04\0\x02\x01\x12\x03\x04\x04\x13\n\x0c\n\x05\x04\0\x02\x01\x05\
\x12\x03\x04\x04\t\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\n\x0e\n\x0c\
\n\x05\x04\0\x02\x01\x03\x12\x03\x04\x11\x12\n\n\n\x02\x04\x01\x12\x04\
\x06\0\n\x01\n\n\n\x03\x04\x01\x01\x12\x03\x06\x08\x0b\n\x0b\n\x04\x04\
\x01\x02\0\x12\x03\x07\x04\x12\n\x0c\n\x05\x04\x01\x02\0\x05\x12\x03\x07\
\x04\n\n\x0c\n\x05\x04\x01\x02\0\x01\x12\x03\x07\x0b\r\n\x0c\n\x05\x04\
\x01\x02\0\x03\x12\x03\x07\x10\x11\n\x0b\n\x04\x04\x01\x02\x01\x12\x03\
\x08\x04\x13\n\x0c\n\x05\x04\x01\x02\x01\x05\x12\x03\x08\x04\t\n\x0c\n\
\x05\x04\x01\x02\x01\x01\x12\x03\x08\n\x0e\n\x0c\n\x05\x04\x01\x02\x01\
\x03\x12\x03\x08\x11\x12\n\x0b\n\x04\x04\x01\x02\x02\x12\x03\t\x04\x17\n\
\x0c\n\x05\x04\x01\x02\x02\x05\x12\x03\t\x04\t\n\x0c\n\x05\x04\x01\x02\
\x02\x01\x12\x03\t\n\x12\n\x0c\n\x05\x04\x01\x02\x02\x03\x12\x03\t\x15\
\x16\n\n\n\x02\x04\x02\x12\x04\x0b\0\x0e\x01\n\n\n\x03\x04\x02\x01\x12\
\x03\x0b\x08\x17\n\x0b\n\x04\x04\x02\x02\0\x12\x03\x0c\x04\x16\n\x0c\n\
\x05\x04\x02\x02\0\x05\x12\x03\x0c\x04\n\n\x0c\n\x05\x04\x02\x02\0\x01\
\x12\x03\x0c\x0b\x11\n\x0c\n\x05\x04\x02\x02\0\x03\x12\x03\x0c\x14\x15\n\
\x0b\n\x04\x04\x02\x02\x01\x12\x03\r\x04\x13\n\x0c\n\x05\x04\x02\x02\x01\
\x05\x12\x03\r\x04\t\n\x0c\n\x05\x04\x02\x02\x01\x01\x12\x03\r\n\x0e\n\
\x0c\n\x05\x04\x02\x02\x01\x03\x12\x03\r\x11\x12\n\n\n\x02\x04\x03\x12\
\x04\x0f\0\x12\x01\n\n\n\x03\x04\x03\x01\x12\x03\x0f\x08\x10\n\x0b\n\x04\
\x04\x03\x02\0\x12\x03\x10\x04\x16\n\x0c\n\x05\x04\x03\x02\0\x05\x12\x03\
\x10\x04\n\n\x0c\n\x05\x04\x03\x02\0\x01\x12\x03\x10\x0b\x11\n\x0c\n\x05\
\x04\x03\x02\0\x03\x12\x03\x10\x14\x15\n\x0b\n\x04\x04\x03\x02\x01\x12\
\x03\x11\x04\x13\n\x0c\n\x05\x04\x03\x02\x01\x05\x12\x03\x11\x04\t\n\x0c\
\n\x05\x04\x03\x02\x01\x01\x12\x03\x11\n\x0e\n\x0c\n\x05\x04\x03\x02\x01\
\x03\x12\x03\x11\x11\x12\n\n\n\x02\x04\x04\x12\x04\x13\0\x15\x01\n\n\n\
\x03\x04\x04\x01\x12\x03\x13\x08\x16\n\x0b\n\x04\x04\x04\x02\0\x12\x03\
\x14\x04\x16\n\x0c\n\x05\x04\x04\x02\0\x05\x12\x03\x14\x04\n\n\x0c\n\x05\
\x04\x04\x02\0\x01\x12\x03\x14\x0b\x11\n\x0c\n\x05\x04\x04\x02\0\x03\x12\
\x03\x14\x14\x15b\x06proto3\
(\x0cR\x04data\x12\x15\n\x06rev_id\x18\x03\x20\x01(\x03R\x05revId\"S\n\
\x0fUpdateDocParams\x12\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\x12\
\x12\n\x04data\x18\x02\x20\x01(\x0cR\x04data\x12\x15\n\x06rev_id\x18\x03\
\x20\x01(\x03R\x05revId\"5\n\x08DocDelta\x12\x15\n\x06doc_id\x18\x01\x20\
\x01(\tR\x05docId\x12\x12\n\x04data\x18\x02\x20\x01(\x0cR\x04data\"'\n\
\x0eQueryDocParams\x12\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docIdJ\xe7\
\x05\n\x06\x12\x04\0\0\x16\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\
\x04\0\x12\x04\x02\0\x05\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x17\n\
\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x12\n\x0c\n\x05\x04\0\x02\0\x05\
\x12\x03\x03\x04\n\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\x0b\r\n\x0c\n\
\x05\x04\0\x02\0\x03\x12\x03\x03\x10\x11\n\x0b\n\x04\x04\0\x02\x01\x12\
\x03\x04\x04\x13\n\x0c\n\x05\x04\0\x02\x01\x05\x12\x03\x04\x04\t\n\x0c\n\
\x05\x04\0\x02\x01\x01\x12\x03\x04\n\x0e\n\x0c\n\x05\x04\0\x02\x01\x03\
\x12\x03\x04\x11\x12\n\n\n\x02\x04\x01\x12\x04\x06\0\n\x01\n\n\n\x03\x04\
\x01\x01\x12\x03\x06\x08\x0b\n\x0b\n\x04\x04\x01\x02\0\x12\x03\x07\x04\
\x12\n\x0c\n\x05\x04\x01\x02\0\x05\x12\x03\x07\x04\n\n\x0c\n\x05\x04\x01\
\x02\0\x01\x12\x03\x07\x0b\r\n\x0c\n\x05\x04\x01\x02\0\x03\x12\x03\x07\
\x10\x11\n\x0b\n\x04\x04\x01\x02\x01\x12\x03\x08\x04\x13\n\x0c\n\x05\x04\
\x01\x02\x01\x05\x12\x03\x08\x04\t\n\x0c\n\x05\x04\x01\x02\x01\x01\x12\
\x03\x08\n\x0e\n\x0c\n\x05\x04\x01\x02\x01\x03\x12\x03\x08\x11\x12\n\x0b\
\n\x04\x04\x01\x02\x02\x12\x03\t\x04\x15\n\x0c\n\x05\x04\x01\x02\x02\x05\
\x12\x03\t\x04\t\n\x0c\n\x05\x04\x01\x02\x02\x01\x12\x03\t\n\x10\n\x0c\n\
\x05\x04\x01\x02\x02\x03\x12\x03\t\x13\x14\n\n\n\x02\x04\x02\x12\x04\x0b\
\0\x0f\x01\n\n\n\x03\x04\x02\x01\x12\x03\x0b\x08\x17\n\x0b\n\x04\x04\x02\
\x02\0\x12\x03\x0c\x04\x16\n\x0c\n\x05\x04\x02\x02\0\x05\x12\x03\x0c\x04\
\n\n\x0c\n\x05\x04\x02\x02\0\x01\x12\x03\x0c\x0b\x11\n\x0c\n\x05\x04\x02\
\x02\0\x03\x12\x03\x0c\x14\x15\n\x0b\n\x04\x04\x02\x02\x01\x12\x03\r\x04\
\x13\n\x0c\n\x05\x04\x02\x02\x01\x05\x12\x03\r\x04\t\n\x0c\n\x05\x04\x02\
\x02\x01\x01\x12\x03\r\n\x0e\n\x0c\n\x05\x04\x02\x02\x01\x03\x12\x03\r\
\x11\x12\n\x0b\n\x04\x04\x02\x02\x02\x12\x03\x0e\x04\x15\n\x0c\n\x05\x04\
\x02\x02\x02\x05\x12\x03\x0e\x04\t\n\x0c\n\x05\x04\x02\x02\x02\x01\x12\
\x03\x0e\n\x10\n\x0c\n\x05\x04\x02\x02\x02\x03\x12\x03\x0e\x13\x14\n\n\n\
\x02\x04\x03\x12\x04\x10\0\x13\x01\n\n\n\x03\x04\x03\x01\x12\x03\x10\x08\
\x10\n\x0b\n\x04\x04\x03\x02\0\x12\x03\x11\x04\x16\n\x0c\n\x05\x04\x03\
\x02\0\x05\x12\x03\x11\x04\n\n\x0c\n\x05\x04\x03\x02\0\x01\x12\x03\x11\
\x0b\x11\n\x0c\n\x05\x04\x03\x02\0\x03\x12\x03\x11\x14\x15\n\x0b\n\x04\
\x04\x03\x02\x01\x12\x03\x12\x04\x13\n\x0c\n\x05\x04\x03\x02\x01\x05\x12\
\x03\x12\x04\t\n\x0c\n\x05\x04\x03\x02\x01\x01\x12\x03\x12\n\x0e\n\x0c\n\
\x05\x04\x03\x02\x01\x03\x12\x03\x12\x11\x12\n\n\n\x02\x04\x04\x12\x04\
\x14\0\x16\x01\n\n\n\x03\x04\x04\x01\x12\x03\x14\x08\x16\n\x0b\n\x04\x04\
\x04\x02\0\x12\x03\x15\x04\x16\n\x0c\n\x05\x04\x04\x02\0\x05\x12\x03\x15\
\x04\n\n\x0c\n\x05\x04\x04\x02\0\x01\x12\x03\x15\x0b\x11\n\x0c\n\x05\x04\
\x04\x02\0\x03\x12\x03\x15\x14\x15b\x06proto3\
";
static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;

View File

@ -258,7 +258,7 @@ impl ::protobuf::reflect::ProtobufValue for WsDocumentData {
#[derive(Clone,PartialEq,Eq,Debug,Hash)]
pub enum WsDataType {
Acked = 0,
Delta = 1,
Rev = 1,
}
impl ::protobuf::ProtobufEnum for WsDataType {
@ -269,7 +269,7 @@ impl ::protobuf::ProtobufEnum for WsDataType {
fn from_i32(value: i32) -> ::std::option::Option<WsDataType> {
match value {
0 => ::std::option::Option::Some(WsDataType::Acked),
1 => ::std::option::Option::Some(WsDataType::Delta),
1 => ::std::option::Option::Some(WsDataType::Rev),
_ => ::std::option::Option::None
}
}
@ -277,7 +277,7 @@ impl ::protobuf::ProtobufEnum for WsDataType {
fn values() -> &'static [Self] {
static values: &'static [WsDataType] = &[
WsDataType::Acked,
WsDataType::Delta,
WsDataType::Rev,
];
values
}
@ -308,8 +308,8 @@ impl ::protobuf::reflect::ProtobufValue for WsDataType {
static file_descriptor_proto_data: &'static [u8] = b"\
\n\x08ws.proto\"Q\n\x0eWsDocumentData\x12\x0e\n\x02id\x18\x01\x20\x01(\t\
R\x02id\x12\x1b\n\x02ty\x18\x02\x20\x01(\x0e2\x0b.WsDataTypeR\x02ty\x12\
\x12\n\x04data\x18\x03\x20\x01(\x0cR\x04data*\"\n\nWsDataType\x12\t\n\
\x05Acked\x10\0\x12\t\n\x05Delta\x10\x01J\xb9\x02\n\x06\x12\x04\0\0\n\
\x12\n\x04data\x18\x03\x20\x01(\x0cR\x04data*\x20\n\nWsDataType\x12\t\n\
\x05Acked\x10\0\x12\x07\n\x03Rev\x10\x01J\xb9\x02\n\x06\x12\x04\0\0\n\
\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x06\
\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x16\n\x0b\n\x04\x04\0\x02\0\x12\
\x03\x03\x04\x12\n\x0c\n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\n\n\x0c\n\
@ -323,8 +323,8 @@ static file_descriptor_proto_data: &'static [u8] = b"\
\x07\0\n\x01\n\n\n\x03\x05\0\x01\x12\x03\x07\x05\x0f\n\x0b\n\x04\x05\0\
\x02\0\x12\x03\x08\x04\x0e\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x08\x04\t\
\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x08\x0c\r\n\x0b\n\x04\x05\0\x02\x01\
\x12\x03\t\x04\x0e\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\t\x04\t\n\x0c\n\
\x05\x05\0\x02\x01\x02\x12\x03\t\x0c\rb\x06proto3\
\x12\x03\t\x04\x0c\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\t\x04\x07\n\x0c\
\n\x05\x05\0\x02\x01\x02\x12\x03\t\n\x0bb\x06proto3\
";
static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;

View File

@ -7,11 +7,12 @@ message CreateDocParams {
message Doc {
string id = 1;
bytes data = 2;
int64 revision = 3;
int64 rev_id = 3;
}
message UpdateDocParams {
string doc_id = 1;
bytes data = 2;
int64 rev_id = 3;
}
message DocDelta {
string doc_id = 1;

View File

@ -7,5 +7,5 @@ message WsDocumentData {
}
enum WsDataType {
Acked = 0;
Delta = 1;
Rev = 1;
}

View File

@ -4,7 +4,7 @@ use dashmap::DashMap;
use crate::{
errors::DocError,
services::doc::edit_context::{DocId, EditDocContext},
services::doc::edit_doc_context::{DocId, EditDocContext},
};
pub(crate) struct DocCache {
@ -17,22 +17,18 @@ impl DocCache {
pub(crate) fn set(&self, doc: Arc<EditDocContext>) {
let doc_id = doc.id.clone();
if self.inner.contains_key(&doc_id) {
log::warn!("Doc:{} already exists in cache", doc_id.as_ref());
log::warn!("Doc:{} already exists in cache", &doc_id);
}
self.inner.insert(doc.id.clone(), doc);
self.inner.insert(doc_id, doc);
}
pub(crate) fn is_opened(&self, doc_id: &str) -> bool {
let doc_id: DocId = doc_id.into();
self.inner.get(&doc_id).is_some()
}
pub(crate) fn is_opened(&self, doc_id: &str) -> bool { self.inner.get(doc_id).is_some() }
pub(crate) fn get(&self, doc_id: &str) -> Result<Arc<EditDocContext>, DocError> {
if !self.is_opened(&doc_id) {
return Err(doc_not_found());
}
let doc_id: DocId = doc_id.into();
let opened_doc = self.inner.get(&doc_id).unwrap();
let opened_doc = self.inner.get(doc_id).unwrap();
Ok(opened_doc.clone())
}

View File

@ -2,12 +2,14 @@ use crate::{
entities::doc::{CreateDocParams, Doc, DocDelta, QueryDocParams, UpdateDocParams},
errors::{internal_error, DocError},
module::DocumentUser,
services::{cache::DocCache, doc::edit_context::EditDocContext, server::Server, ws::WsDocumentManager},
services::{cache::DocCache, doc::edit_doc_context::EditDocContext, server::Server, ws::WsDocumentManager},
sql_tables::doc::{DocTable, DocTableSql, OpTableSql},
};
use bytes::Bytes;
use flowy_database::{ConnectionPool, SqliteConnection};
use crate::services::doc::rev_manager::RevisionManager;
use flowy_ot::core::Delta;
use parking_lot::RwLock;
use std::sync::Arc;
@ -40,7 +42,7 @@ impl DocController {
let doc = Doc {
id: params.id,
data: params.data,
revision: 0,
rev_id: 0,
};
let _ = self.doc_sql.create_doc_table(DocTable::new(doc), conn)?;
Ok(())
@ -76,10 +78,10 @@ impl DocController {
Ok(())
}
#[tracing::instrument(level = "debug", skip(self, delta, pool), err)]
pub(crate) fn edit_doc(&self, delta: DocDelta, pool: Arc<ConnectionPool>) -> Result<Doc, DocError> {
#[tracing::instrument(level = "debug", skip(self, delta), err)]
pub(crate) fn edit_doc(&self, delta: DocDelta) -> Result<Doc, DocError> {
let edit_doc_ctx = self.cache.get(&delta.doc_id)?;
let _ = edit_doc_ctx.apply_delta(Bytes::from(delta.data), pool)?;
let _ = edit_doc_ctx.apply_local_delta(Bytes::from(delta.data))?;
Ok(edit_doc_ctx.doc())
}
}
@ -107,7 +109,7 @@ impl DocController {
match self.server.read_doc(&token, params).await? {
None => Err(DocError::not_found()),
Some(doc) => {
let edit = self.make_edit_context(doc.clone())?;
let edit = self.make_edit_context(doc.clone(), pool.clone())?;
let conn = &*(pool.get().map_err(internal_error)?);
let _ = self.doc_sql.create_doc_table(doc.into(), conn)?;
Ok(edit)
@ -133,7 +135,7 @@ impl DocController {
async fn _open(&self, params: QueryDocParams, pool: Arc<ConnectionPool>) -> Result<Arc<EditDocContext>, DocError> {
match self.doc_sql.read_doc_table(&params.doc_id, pool.clone()) {
Ok(doc_table) => Ok(self.make_edit_context(doc_table.into())?),
Ok(doc_table) => Ok(self.make_edit_context(doc_table.into(), pool.clone())?),
Err(error) => {
if error.is_record_not_found() {
log::debug!("Doc:{} don't exist, reading from server", params.doc_id);
@ -145,12 +147,15 @@ impl DocController {
}
}
fn make_edit_context(&self, doc: Doc) -> Result<Arc<EditDocContext>, DocError> {
fn make_edit_context(&self, doc: Doc, pool: Arc<ConnectionPool>) -> Result<Arc<EditDocContext>, DocError> {
// Opti: require upgradable_read lock and then upgrade to write lock using
// RwLockUpgradableReadGuard::upgrade(xx) of ws
let ws = self.ws.read().sender();
let edit_ctx = Arc::new(EditDocContext::new(doc, ws, self.op_sql.clone())?);
self.ws.write().register_handler(edit_ctx.id.as_ref(), edit_ctx.clone());
let doc_id = doc.id.clone();
let delta = Delta::from_bytes(doc.data)?;
let ws_sender = self.ws.read().sender();
let rev_manager = RevisionManager::new(&doc_id, doc.rev_id, self.op_sql.clone(), pool, ws_sender);
let edit_ctx = Arc::new(EditDocContext::new(&doc_id, delta, rev_manager)?);
self.ws.write().register_handler(&doc_id, edit_ctx.clone());
self.cache.set(edit_ctx.clone());
Ok(edit_ctx)
}

View File

@ -53,15 +53,14 @@ impl Document {
pub fn to_plain_string(&self) -> String { self.delta.apply("").unwrap() }
pub fn apply_delta_data(&mut self, data: Bytes) -> Result<(), DocError> {
let new_delta = Delta::from_bytes(data.to_vec())?;
self.apply_delta(new_delta)
}
pub fn delta(&self) -> &Delta { &self.delta }
pub fn set_delta(&mut self, data: Delta) { self.delta = data; }
pub fn apply_delta(&mut self, delta: Delta) -> Result<(), DocError> {
log::trace!("Apply delta: {}", delta);
let _ = self.add_delta(&delta)?;
log::trace!("Document: {}", self.to_json());
log::debug!("Document: {}", self.to_json());
Ok(())
}
@ -146,10 +145,6 @@ impl Document {
},
}
}
pub fn data(&self) -> &Delta { &self.delta }
pub fn set_data(&mut self, data: Delta) { self.delta = data; }
}
impl Document {

View File

@ -1,148 +0,0 @@
use crate::{
entities::{
doc::{Doc, Revision},
ws::{WsDataType, WsDocumentData},
},
errors::{internal_error, DocError},
services::{
doc::Document,
ws::{WsDocumentHandler, WsDocumentSender},
},
sql_tables::doc::{OpTable, OpTableSql},
};
use bytes::Bytes;
use flowy_database::ConnectionPool;
use flowy_ot::core::Delta;
use parking_lot::RwLock;
use std::{
convert::TryInto,
sync::{
atomic::{AtomicI64, Ordering::SeqCst},
Arc,
},
};
pub(crate) struct EditDocContext {
pub(crate) id: DocId,
pub(crate) rev_counter: RevCounter,
document: RwLock<Document>,
ws: Arc<dyn WsDocumentSender>,
op_sql: Arc<OpTableSql>,
}
impl EditDocContext {
pub(crate) fn new(doc: Doc, ws: Arc<dyn WsDocumentSender>, op_sql: Arc<OpTableSql>) -> Result<Self, DocError> {
let id: DocId = doc.id.into();
let rev_counter = RevCounter::new(doc.revision);
let delta: Delta = doc.data.try_into()?;
let document = RwLock::new(Document::from_delta(delta));
Ok(Self {
id,
rev_counter,
document,
ws,
op_sql,
})
}
pub(crate) fn doc(&self) -> Doc {
Doc {
id: self.id.clone().into(),
data: self.document.read().to_bytes(),
revision: self.rev_counter.value(),
}
}
pub(crate) fn apply_delta(&self, data: Bytes, pool: Arc<ConnectionPool>) -> Result<(), DocError> {
let mut guard = self.document.write();
let base_rev_id = self.rev_counter.value();
let rev_id = self.rev_counter.next();
let delta = Delta::from_bytes(data.to_vec())?;
let _ = guard.apply_delta(delta)?;
let json = guard.to_json();
drop(guard);
// Opti: it is necessary to save the rev if send success?
let md5 = format!("{:x}", md5::compute(json));
let revision = Revision::new(base_rev_id, rev_id, data.to_vec(), md5, self.id.clone().into());
let _ = self.save_revision(revision.clone(), pool.clone())?;
match self.ws.send(revision.into()) {
Ok(_) => {
// TODO: remove the rev if send success
// let _ = self.delete_revision(rev_id, pool)?;
},
Err(e) => {
log::error!("Send delta failed: {:?}", e);
},
}
Ok(())
}
}
impl EditDocContext {
fn save_revision(&self, revision: Revision, pool: Arc<ConnectionPool>) -> Result<(), DocError> {
let conn = &*pool.get().map_err(internal_error)?;
// conn.immediate_transaction::<_, DocError, _>(|| {
// let op_table: OpTable = revision.into();
// let _ = self.op_sql.create_op_table(op_table, conn)?;
// Ok(())
// })?;
Ok(())
}
fn delete_revision(&self, rev_id: i64, pool: Arc<ConnectionPool>) -> Result<(), DocError> {
let conn = &*pool.get().map_err(internal_error)?;
conn.immediate_transaction::<_, DocError, _>(|| {
let _ = self.op_sql.delete_op_table(rev_id, conn)?;
Ok(())
})?;
Ok(())
}
}
use byteorder::{BigEndian, ReadBytesExt};
use std::io::Cursor;
impl WsDocumentHandler for EditDocContext {
fn receive(&self, doc_data: WsDocumentData) {
match doc_data.ty {
WsDataType::Delta => {},
WsDataType::Acked => {
let mut rdr = Cursor::new(doc_data.data);
let rev = rdr.read_i64::<BigEndian>().unwrap();
},
}
}
}
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct DocId(pub(crate) String);
impl AsRef<str> for DocId {
fn as_ref(&self) -> &str { &self.0 }
}
impl<T> std::convert::From<T> for DocId
where
T: ToString,
{
fn from(s: T) -> Self { DocId(s.to_string()) }
}
impl std::convert::Into<String> for DocId {
fn into(self) -> String { self.0.clone() }
}
#[derive(Debug)]
pub struct RevCounter(pub AtomicI64);
impl RevCounter {
pub fn new(n: i64) -> Self { Self(AtomicI64::new(n)) }
pub fn next(&self) -> i64 {
let _ = self.0.fetch_add(1, SeqCst);
self.value()
}
pub fn value(&self) -> i64 { self.0.load(SeqCst) }
}

View File

@ -0,0 +1,100 @@
use crate::{
entities::{
doc::{Doc, Revision},
ws::{WsDataType, WsDocumentData},
},
errors::*,
services::{
doc::{rev_manager::RevisionManager, Document},
util::{bytes_to_rev_id, md5},
ws::{WsDocumentHandler, WsDocumentSender},
},
sql_tables::{OpTable, OpTableSql},
};
use bytes::Bytes;
use flowy_ot::core::Delta;
use parking_lot::RwLock;
use std::{convert::TryFrom, sync::Arc};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
pub type DocId = String;
pub(crate) struct EditDocContext {
pub(crate) id: DocId,
document: Arc<RwLock<Document>>,
rev_manager: Arc<RevisionManager>,
}
impl EditDocContext {
pub(crate) fn new(doc_id: &str, delta: Delta, rev_manager: RevisionManager) -> Result<Self, DocError> {
let id = doc_id.to_owned();
let rev_manager = Arc::new(rev_manager);
let document = Arc::new(RwLock::new(Document::from_delta(delta)));
let edit_context = Self { id, document, rev_manager };
edit_context.composing_delta();
Ok(edit_context)
}
pub(crate) fn doc(&self) -> Doc {
Doc {
id: self.id.clone(),
data: self.document.read().to_bytes(),
rev_id: self.rev_manager.rev(),
}
}
#[tracing::instrument(level = "debug", skip(self, data), err)]
pub(crate) fn apply_local_delta(&self, data: Bytes) -> Result<(), DocError> {
let doc_id = self.id.clone();
let (base_rev_id, rev_id) = self.rev_manager.next_rev();
let revision = Revision::new(base_rev_id, rev_id, data.to_vec(), md5(&data), doc_id);
let delta = Delta::from_bytes(data.to_vec())?;
self.document.write().apply_delta(delta)?;
self.rev_manager.add_local(revision);
Ok(())
}
fn composing_delta(&self) {
let rev_manager = self.rev_manager.clone();
let document = self.document.clone();
tokio::spawn(async move {
let notified = rev_manager.notified();
tokio::select! {
_ = notified => {
if let Some(delta) = rev_manager.next_compose_delta() {
log::info!("😁receive delta: {:?}", delta);
document.write().apply_delta(delta).unwrap();
log::info!("😁Document: {:?}", document.read().to_plain_string());
}
}
}
});
}
}
impl WsDocumentHandler for EditDocContext {
fn receive(&self, doc_data: WsDocumentData) {
let f = |doc_data: WsDocumentData| {
match doc_data.ty {
WsDataType::Rev => {
let bytes = Bytes::from(doc_data.data);
let revision = Revision::try_from(bytes)?;
self.rev_manager.add_remote(revision);
},
WsDataType::Acked => {
let rev_id = bytes_to_rev_id(doc_data.data)?;
self.rev_manager.remove(rev_id);
},
}
Result::<(), DocError>::Ok(())
};
if let Err(e) = f(doc_data) {
log::error!("{:?}", e);
}
}
}

View File

@ -1,6 +1,7 @@
use crate::services::doc::{extensions::DeleteExt, util::is_newline};
use flowy_ot::core::{plain_attributes, CharMetric, Delta, DeltaBuilder, DeltaIter, Interval, NEW_LINE};
use crate::services::{doc::extensions::DeleteExt, util::is_newline};
pub struct PreserveLineFormatOnMerge {}
impl DeleteExt for PreserveLineFormatOnMerge {
fn ext_name(&self) -> &str { "PreserveLineFormatOnMerge" }

View File

@ -1,4 +1,4 @@
use crate::services::doc::util::find_newline;
use crate::services::util::find_newline;
use flowy_ot::core::{plain_attributes, Attribute, AttributeScope, Delta, Operation};
pub(crate) fn line_break(op: &Operation, attribute: &Attribute, scope: AttributeScope) -> Delta {

View File

@ -1,8 +1,9 @@
use crate::services::doc::{
extensions::{format::helper::line_break, FormatExt},
use flowy_ot::core::{plain_attributes, Attribute, AttributeScope, Delta, DeltaBuilder, DeltaIter, Interval};
use crate::services::{
doc::extensions::{format::helper::line_break, FormatExt},
util::find_newline,
};
use flowy_ot::core::{plain_attributes, Attribute, AttributeScope, Delta, DeltaBuilder, DeltaIter, Interval};
pub struct ResolveBlockFormat {}
impl FormatExt for ResolveBlockFormat {

View File

@ -1,8 +1,9 @@
use crate::services::doc::{
extensions::{format::helper::line_break, FormatExt},
use flowy_ot::core::{Attribute, AttributeScope, Delta, DeltaBuilder, DeltaIter, Interval};
use crate::services::{
doc::extensions::{format::helper::line_break, FormatExt},
util::find_newline,
};
use flowy_ot::core::{Attribute, AttributeScope, Delta, DeltaBuilder, DeltaIter, Interval};
pub struct ResolveInlineFormat {}
impl FormatExt for ResolveInlineFormat {

View File

@ -1,6 +1,7 @@
use crate::services::doc::{extensions::InsertExt, util::is_newline};
use flowy_ot::core::{attributes_except_header, is_empty_line_at_index, AttributeKey, Delta, DeltaBuilder, DeltaIter};
use crate::services::{doc::extensions::InsertExt, util::is_newline};
pub struct AutoExitBlock {}
impl InsertExt for AutoExitBlock {

View File

@ -1,9 +1,12 @@
use crate::services::doc::{extensions::InsertExt, util::is_whitespace};
use bytecount::num_chars;
use flowy_ot::core::{plain_attributes, Attribute, Attributes, Delta, DeltaBuilder, DeltaIter};
use std::cmp::min;
use bytecount::num_chars;
use url::Url;
use flowy_ot::core::{plain_attributes, Attribute, Attributes, Delta, DeltaBuilder, DeltaIter};
use crate::services::{doc::extensions::InsertExt, util::is_whitespace};
pub struct AutoFormatExt {}
impl InsertExt for AutoFormatExt {
fn ext_name(&self) -> &str { std::any::type_name::<AutoFormatExt>() }

View File

@ -1,4 +1,3 @@
use crate::services::doc::{extensions::InsertExt, util::is_newline};
use flowy_ot::core::{
attributes_except_header,
plain_attributes,
@ -11,6 +10,8 @@ use flowy_ot::core::{
NEW_LINE,
};
use crate::services::{doc::extensions::InsertExt, util::is_newline};
pub struct PreserveBlockFormatOnInsert {}
impl InsertExt for PreserveBlockFormatOnInsert {
fn ext_name(&self) -> &str { std::any::type_name::<PreserveBlockFormatOnInsert>() }

View File

@ -1,8 +1,9 @@
use crate::services::doc::{
extensions::InsertExt,
use flowy_ot::core::{plain_attributes, AttributeKey, Delta, DeltaBuilder, DeltaIter, OpNewline, NEW_LINE};
use crate::services::{
doc::extensions::InsertExt,
util::{contain_newline, is_newline},
};
use flowy_ot::core::{plain_attributes, AttributeKey, Delta, DeltaBuilder, DeltaIter, OpNewline, NEW_LINE};
pub struct PreserveInlineFormat {}
impl InsertExt for PreserveInlineFormat {

View File

@ -1,6 +1,7 @@
use crate::services::doc::{extensions::InsertExt, util::is_newline};
use flowy_ot::core::{AttributeKey, Attributes, CharMetric, Delta, DeltaBuilder, DeltaIter, NEW_LINE};
use crate::services::{doc::extensions::InsertExt, util::is_newline};
pub struct ResetLineFormatOnNewLine {}
impl InsertExt for ResetLineFormatOnNewLine {
fn ext_name(&self) -> &str { std::any::type_name::<ResetLineFormatOnNewLine>() }

View File

@ -7,6 +7,6 @@ mod history;
mod view;
pub(crate) mod doc_controller;
pub mod edit_context;
pub mod edit_doc_context;
pub mod extensions;
mod util;
mod rev_manager;

View File

@ -0,0 +1,128 @@
use crate::{
entities::{
doc::Revision,
ws::{WsDataType, WsDocumentData},
},
errors::{internal_error, DocError},
services::{
util::{bytes_to_rev_id, RevIdCounter},
ws::{WsDocumentHandler, WsDocumentSender},
},
sql_tables::{OpTable, OpTableSql},
};
use bytes::Bytes;
use flowy_database::ConnectionPool;
use flowy_infra::future::wrap_future;
use flowy_ot::core::Delta;
use parking_lot::RwLock;
use std::{collections::BTreeMap, sync::Arc};
use tokio::sync::{futures::Notified, Notify};
pub struct RevisionManager {
doc_id: String,
op_sql: Arc<OpTableSql>,
pool: Arc<ConnectionPool>,
rev_id_counter: RevIdCounter,
ws_sender: Arc<dyn WsDocumentSender>,
rev_cache: RwLock<BTreeMap<i64, Revision>>,
notify: Notify,
}
impl RevisionManager {
pub fn new(
doc_id: &str,
rev_id: i64,
op_sql: Arc<OpTableSql>,
pool: Arc<ConnectionPool>,
ws_sender: Arc<dyn WsDocumentSender>,
) -> Self {
let rev_id_counter = RevIdCounter::new(rev_id);
let rev_cache = RwLock::new(BTreeMap::new());
Self {
doc_id: doc_id.to_owned(),
op_sql,
pool,
rev_id_counter,
ws_sender,
rev_cache,
notify: Notify::new(),
}
}
pub fn next_compose_delta(&self) -> Option<Delta> {
// let delta = Delta::from_bytes(revision.delta)?;
//
// log::debug!("Remote delta: {:?}", delta);
}
pub fn notified(&self) -> Notified { self.notify.notified() }
pub fn next_rev(&self) -> (i64, i64) {
let cur = self.rev_id_counter.value();
let next = self.rev_id_counter.next();
(cur, next)
}
pub fn rev(&self) -> i64 { self.rev_id_counter.value() }
pub fn add_local(&self, revision: Revision) -> Result<(), DocError> {
self.rev_cache.write().insert(revision.rev_id, revision.clone());
match self.ws_sender.send(revision.into()) {
Ok(_) => {},
Err(e) => {
log::error!("Send delta failed: {:?}", e);
},
}
// self.save_revision(revision.clone());
Ok(())
}
#[tracing::instrument(level = "debug", skip(self, revision))]
pub fn add_remote(&self, revision: Revision) -> Result<(), DocError> {
self.rev_cache.write().insert(revision.rev_id, revision);
// self.save_revision(revision.clone());
self.notify.notify_waiters();
Ok(())
}
pub fn remove(&self, rev_id: i64) -> Result<(), DocError> {
self.rev_cache.write().remove(&rev_id);
// self.delete_revision(rev_id);
Ok(())
}
fn save_revision(&self, revision: Revision) {
let op_sql = self.op_sql.clone();
let pool = self.pool.clone();
tokio::spawn(async move {
let conn = &*pool.get().map_err(internal_error).unwrap();
let result = conn.immediate_transaction::<_, DocError, _>(|| {
let op_table: OpTable = revision.into();
let _ = op_sql.create_op_table(op_table, conn).unwrap();
Ok(())
});
match result {
Ok(_) => {},
Err(e) => log::error!("Save revision failed: {:?}", e),
}
});
}
fn delete_revision(&self, rev_id: i64) {
let op_sql = self.op_sql.clone();
let pool = self.pool.clone();
tokio::spawn(async move {
let conn = &*pool.get().map_err(internal_error).unwrap();
let result = conn.immediate_transaction::<_, DocError, _>(|| {
let _ = op_sql.delete_op_table(rev_id, conn)?;
Ok(())
});
match result {
Ok(_) => {},
Err(e) => log::error!("Delete revision failed: {:?}", e),
}
});
}
}

View File

@ -1,18 +0,0 @@
use flowy_ot::core::{NEW_LINE, WHITESPACE};
#[inline]
pub fn find_newline(s: &str) -> Option<usize> {
match s.find(NEW_LINE) {
None => None,
Some(line_break) => Some(line_break),
}
}
#[inline]
pub fn is_newline(s: &str) -> bool { s == NEW_LINE }
#[inline]
pub fn is_whitespace(s: &str) -> bool { s == WHITESPACE }
#[inline]
pub fn contain_newline(s: &str) -> bool { s.contains(NEW_LINE) }

View File

@ -1,4 +1,5 @@
mod cache;
pub mod doc;
pub mod server;
mod util;
pub mod ws;

View File

@ -0,0 +1,51 @@
use crate::errors::DocError;
use byteorder::{BigEndian, ReadBytesExt};
use flowy_ot::core::{NEW_LINE, WHITESPACE};
use std::{
io::Cursor,
sync::atomic::{AtomicI64, Ordering::SeqCst},
};
#[inline]
pub fn find_newline(s: &str) -> Option<usize> {
match s.find(NEW_LINE) {
None => None,
Some(line_break) => Some(line_break),
}
}
#[inline]
pub fn is_newline(s: &str) -> bool { s == NEW_LINE }
#[inline]
pub fn is_whitespace(s: &str) -> bool { s == WHITESPACE }
#[inline]
pub fn contain_newline(s: &str) -> bool { s.contains(NEW_LINE) }
#[inline]
pub fn md5<T: AsRef<[u8]>>(data: T) -> String {
let md5 = format!("{:x}", md5::compute(data));
md5
}
#[inline]
pub fn bytes_to_rev_id(bytes: Vec<u8>) -> Result<i64, DocError> {
let mut rdr = Cursor::new(bytes);
match rdr.read_i64::<BigEndian>() {
Ok(rev_id) => Ok(rev_id),
Err(e) => Err(DocError::internal().context(e)),
}
}
#[derive(Debug)]
pub(crate) struct RevIdCounter(pub AtomicI64);
impl RevIdCounter {
pub fn new(n: i64) -> Self { Self(AtomicI64::new(n)) }
pub fn next(&self) -> i64 {
let _ = self.0.fetch_add(1, SeqCst);
self.value()
}
pub fn value(&self) -> i64 { self.0.load(SeqCst) }
}

View File

@ -4,8 +4,9 @@ use flowy_database::schema::op_table;
#[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)]
#[table_name = "op_table"]
#[primary_key(rev_id)]
#[primary_key(doc_id)]
pub(crate) struct OpTable {
pub(crate) doc_id: String,
pub(crate) base_rev_id: i64,
pub(crate) rev_id: i64,
pub(crate) data: Vec<u8>,
@ -18,8 +19,7 @@ pub(crate) struct OpTable {
#[sql_type = "Integer"]
pub enum OpState {
Local = 0,
Sending = 1,
Acked = 2,
Acked = 1,
}
impl std::default::Default for OpState {
@ -30,8 +30,7 @@ impl std::convert::From<i32> for OpState {
fn from(value: i32) -> Self {
match value {
0 => OpState::Local,
1 => OpState::Sending,
2 => OpState::Acked,
1 => OpState::Acked,
o => {
log::error!("Unsupported view type {}, fallback to ViewType::Docs", o);
OpState::Local
@ -48,8 +47,9 @@ impl_sql_integer_expression!(OpState);
#[derive(AsChangeset, Identifiable, Default, Debug)]
#[table_name = "op_table"]
#[primary_key(rev_id)]
#[primary_key(doc_id)]
pub(crate) struct OpChangeset {
pub(crate) doc_id: String,
pub(crate) rev_id: i64,
pub(crate) state: Option<OpState>,
}
@ -57,6 +57,7 @@ pub(crate) struct OpChangeset {
impl std::convert::Into<OpTable> for Revision {
fn into(self) -> OpTable {
OpTable {
doc_id: self.doc_id,
base_rev_id: self.base_rev_id,
rev_id: self.rev_id,
data: self.delta,

View File

@ -40,7 +40,7 @@ impl std::convert::Into<Doc> for DocTable {
Doc {
id: self.id,
data: self.data,
revision: self.revision,
rev_id: self.revision,
}
}
}
@ -50,7 +50,7 @@ impl std::convert::From<Doc> for DocTable {
Self {
id: doc.id,
data: doc.data,
revision: doc.revision,
revision: doc.rev_id,
}
}
}

View File

@ -1 +1,3 @@
pub mod doc;
pub(crate) mod doc;
pub use doc::*;

View File

@ -131,20 +131,20 @@ impl TestBuilder {
},
TestOp::Transform(delta_a_i, delta_b_i) => {
let (a_prime, b_prime) = self.documents[*delta_a_i]
.data()
.transform(&self.documents[*delta_b_i].data())
.delta()
.transform(&self.documents[*delta_b_i].delta())
.unwrap();
log::trace!("a:{:?},b:{:?}", a_prime, b_prime);
let data_left = self.documents[*delta_a_i].data().compose(&b_prime).unwrap();
let data_right = self.documents[*delta_b_i].data().compose(&a_prime).unwrap();
let data_left = self.documents[*delta_a_i].delta().compose(&b_prime).unwrap();
let data_right = self.documents[*delta_b_i].delta().compose(&a_prime).unwrap();
self.documents[*delta_a_i].set_data(data_left);
self.documents[*delta_b_i].set_data(data_right);
self.documents[*delta_a_i].set_delta(data_left);
self.documents[*delta_b_i].set_delta(data_right);
},
TestOp::Invert(delta_a_i, delta_b_i) => {
let delta_a = &self.documents[*delta_a_i].data();
let delta_b = &self.documents[*delta_b_i].data();
let delta_a = &self.documents[*delta_a_i].delta();
let delta_b = &self.documents[*delta_b_i].delta();
log::debug!("Invert: ");
log::debug!("a: {}", delta_a.to_json());
log::debug!("b: {}", delta_b.to_json());
@ -162,7 +162,7 @@ impl TestBuilder {
assert_eq!(delta_a, &&new_delta_after_undo);
self.documents[*delta_a_i].set_data(new_delta_after_undo);
self.documents[*delta_a_i].set_delta(new_delta_after_undo);
},
TestOp::Undo(delta_i) => {
self.documents[*delta_i].undo().unwrap();

View File

@ -495,6 +495,18 @@ fn delta_transform_test() {
r#"[{"retain":3,"attributes":{"bold":true}},{"insert":"456"}]"#,
serde_json::to_string(&b_prime).unwrap()
);
let new_a = a.compose(&b_prime).unwrap();
let new_b = b.compose(&a_prime).unwrap();
assert_eq!(
r#"[{"insert":"123","attributes":{"bold":true}},{"insert":"456"}]"#,
serde_json::to_string(&new_a).unwrap()
);
assert_eq!(
r#"[{"insert":"123","attributes":{"bold":true}},{"insert":"456"}]"#,
serde_json::to_string(&new_b).unwrap()
);
}
#[test]

View File

@ -7,13 +7,20 @@ use std::{
task::{Context, Poll},
};
pub fn wrap_future<T, O>(f: T) -> FnFuture<O>
where
T: Future<Output = O> + Send + Sync + 'static,
{
FnFuture { fut: Box::pin(f) }
}
#[pin_project]
pub struct ClosureFuture<T> {
pub struct FnFuture<T> {
#[pin]
pub fut: Pin<Box<dyn Future<Output = T> + Sync + Send>>,
}
impl<T> Future for ClosureFuture<T>
impl<T> Future for FnFuture<T>
where
T: Send + Sync,
{

View File

@ -9,9 +9,6 @@ pub mod future;
pub mod kv;
mod protobuf;
#[macro_use]
pub mod macros;
#[allow(dead_code)]
pub fn uuid() -> String { uuid::Uuid::new_v4().to_string() }

View File

@ -1,8 +0,0 @@
#[macro_export]
macro_rules! wrap_future_fn {
($fut:expr) => {
ClosureFuture {
fut: Box::pin(async move { $fut.await }),
}
};
}

View File

@ -9,7 +9,7 @@ use flowy_document::entities::ws::WsDocumentData;
use flowy_user::{errors::ErrorCode, services::user::UserSession};
use flowy_ws::{WsMessage, WsMessageHandler, WsModule};
use parking_lot::RwLock;
use std::{convert::TryInto, path::Path, sync::Arc};
use std::{path::Path, sync::Arc};
pub struct DocumentDepsResolver {
user_session: Arc<UserSession>,

View File

@ -4,7 +4,7 @@ use strum_macros::Display;
#[derive(Clone, Copy, PartialEq, Eq, Debug, Display, Hash, ProtoBuf_Enum, Flowy_Event)]
#[event_err = "UserError"]
pub enum UserEvent {
#[event(output = "UserProfile")]
#[event()]
InitUser = 0,
#[event(input = "SignInRequest", output = "UserProfile")]
@ -21,4 +21,7 @@ pub enum UserEvent {
#[event(output = "UserProfile")]
GetUserProfile = 5,
#[event(output = "UserProfile")]
CheckUser = 6,
}

View File

@ -4,8 +4,14 @@ use flowy_dispatch::prelude::*;
use std::{convert::TryInto, sync::Arc};
#[tracing::instrument(skip(session))]
pub async fn init_user_handler(session: Unit<Arc<UserSession>>) -> DataResult<UserProfile, UserError> {
let user_profile = session.init_user().await?;
pub async fn init_user_handler(session: Unit<Arc<UserSession>>) -> Result<(), UserError> {
let _ = session.init_user().await?;
Ok(())
}
#[tracing::instrument(skip(session))]
pub async fn check_user_handler(session: Unit<Arc<UserSession>>) -> DataResult<UserProfile, UserError> {
let user_profile = session.check_user().await?;
data_result(user_profile)
}

View File

@ -13,4 +13,5 @@ pub fn create(user_session: Arc<UserSession>) -> Module {
.event(UserEvent::GetUserProfile, get_user_profile_handler)
.event(UserEvent::SignOut, sign_out)
.event(UserEvent::UpdateUser, update_user_handler)
.event(UserEvent::CheckUser, check_user_handler)
}

View File

@ -31,6 +31,7 @@ pub enum UserEvent {
SignOut = 3,
UpdateUser = 4,
GetUserProfile = 5,
CheckUser = 6,
}
impl ::protobuf::ProtobufEnum for UserEvent {
@ -46,6 +47,7 @@ impl ::protobuf::ProtobufEnum for UserEvent {
3 => ::std::option::Option::Some(UserEvent::SignOut),
4 => ::std::option::Option::Some(UserEvent::UpdateUser),
5 => ::std::option::Option::Some(UserEvent::GetUserProfile),
6 => ::std::option::Option::Some(UserEvent::CheckUser),
_ => ::std::option::Option::None
}
}
@ -58,6 +60,7 @@ impl ::protobuf::ProtobufEnum for UserEvent {
UserEvent::SignOut,
UserEvent::UpdateUser,
UserEvent::GetUserProfile,
UserEvent::CheckUser,
];
values
}
@ -86,24 +89,26 @@ impl ::protobuf::reflect::ProtobufValue for UserEvent {
}
static file_descriptor_proto_data: &'static [u8] = b"\
\n\x0bevent.proto*b\n\tUserEvent\x12\x0c\n\x08InitUser\x10\0\x12\n\n\x06\
\n\x0bevent.proto*q\n\tUserEvent\x12\x0c\n\x08InitUser\x10\0\x12\n\n\x06\
SignIn\x10\x01\x12\n\n\x06SignUp\x10\x02\x12\x0b\n\x07SignOut\x10\x03\
\x12\x0e\n\nUpdateUser\x10\x04\x12\x12\n\x0eGetUserProfile\x10\x05J\xa0\
\x02\n\x06\x12\x04\0\0\t\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\
\x05\0\x12\x04\x02\0\t\x01\n\n\n\x03\x05\0\x01\x12\x03\x02\x05\x0e\n\x0b\
\n\x04\x05\0\x02\0\x12\x03\x03\x04\x11\n\x0c\n\x05\x05\0\x02\0\x01\x12\
\x03\x03\x04\x0c\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x03\x0f\x10\n\x0b\n\
\x04\x05\0\x02\x01\x12\x03\x04\x04\x0f\n\x0c\n\x05\x05\0\x02\x01\x01\x12\
\x03\x04\x04\n\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\x04\r\x0e\n\x0b\n\
\x04\x05\0\x02\x02\x12\x03\x05\x04\x0f\n\x0c\n\x05\x05\0\x02\x02\x01\x12\
\x03\x05\x04\n\n\x0c\n\x05\x05\0\x02\x02\x02\x12\x03\x05\r\x0e\n\x0b\n\
\x04\x05\0\x02\x03\x12\x03\x06\x04\x10\n\x0c\n\x05\x05\0\x02\x03\x01\x12\
\x03\x06\x04\x0b\n\x0c\n\x05\x05\0\x02\x03\x02\x12\x03\x06\x0e\x0f\n\x0b\
\n\x04\x05\0\x02\x04\x12\x03\x07\x04\x13\n\x0c\n\x05\x05\0\x02\x04\x01\
\x12\x03\x07\x04\x0e\n\x0c\n\x05\x05\0\x02\x04\x02\x12\x03\x07\x11\x12\n\
\x0b\n\x04\x05\0\x02\x05\x12\x03\x08\x04\x17\n\x0c\n\x05\x05\0\x02\x05\
\x01\x12\x03\x08\x04\x12\n\x0c\n\x05\x05\0\x02\x05\x02\x12\x03\x08\x15\
\x16b\x06proto3\
\x12\x0e\n\nUpdateUser\x10\x04\x12\x12\n\x0eGetUserProfile\x10\x05\x12\r\
\n\tCheckUser\x10\x06J\xc9\x02\n\x06\x12\x04\0\0\n\x01\n\x08\n\x01\x0c\
\x12\x03\0\0\x12\n\n\n\x02\x05\0\x12\x04\x02\0\n\x01\n\n\n\x03\x05\0\x01\
\x12\x03\x02\x05\x0e\n\x0b\n\x04\x05\0\x02\0\x12\x03\x03\x04\x11\n\x0c\n\
\x05\x05\0\x02\0\x01\x12\x03\x03\x04\x0c\n\x0c\n\x05\x05\0\x02\0\x02\x12\
\x03\x03\x0f\x10\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x04\x04\x0f\n\x0c\n\
\x05\x05\0\x02\x01\x01\x12\x03\x04\x04\n\n\x0c\n\x05\x05\0\x02\x01\x02\
\x12\x03\x04\r\x0e\n\x0b\n\x04\x05\0\x02\x02\x12\x03\x05\x04\x0f\n\x0c\n\
\x05\x05\0\x02\x02\x01\x12\x03\x05\x04\n\n\x0c\n\x05\x05\0\x02\x02\x02\
\x12\x03\x05\r\x0e\n\x0b\n\x04\x05\0\x02\x03\x12\x03\x06\x04\x10\n\x0c\n\
\x05\x05\0\x02\x03\x01\x12\x03\x06\x04\x0b\n\x0c\n\x05\x05\0\x02\x03\x02\
\x12\x03\x06\x0e\x0f\n\x0b\n\x04\x05\0\x02\x04\x12\x03\x07\x04\x13\n\x0c\
\n\x05\x05\0\x02\x04\x01\x12\x03\x07\x04\x0e\n\x0c\n\x05\x05\0\x02\x04\
\x02\x12\x03\x07\x11\x12\n\x0b\n\x04\x05\0\x02\x05\x12\x03\x08\x04\x17\n\
\x0c\n\x05\x05\0\x02\x05\x01\x12\x03\x08\x04\x12\n\x0c\n\x05\x05\0\x02\
\x05\x02\x12\x03\x08\x15\x16\n\x0b\n\x04\x05\0\x02\x06\x12\x03\t\x04\x12\
\n\x0c\n\x05\x05\0\x02\x06\x01\x12\x03\t\x04\r\n\x0c\n\x05\x05\0\x02\x06\
\x02\x12\x03\t\x10\x11b\x06proto3\
";
static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;

View File

@ -7,4 +7,5 @@ enum UserEvent {
SignOut = 3;
UpdateUser = 4;
GetUserProfile = 5;
CheckUser = 6;
}

View File

@ -141,7 +141,14 @@ impl UserSession {
Ok(())
}
pub async fn init_user(&self) -> Result<UserProfile, UserError> {
pub async fn init_user(&self) -> Result<(), UserError> {
let (_, token) = self.get_session()?.into_part();
let _ = self.start_ws_connection(&token)?;
Ok(())
}
pub async fn check_user(&self) -> Result<UserProfile, UserError> {
let (user_id, token) = self.get_session()?.into_part();
let user = dsl::user_table
@ -149,8 +156,6 @@ impl UserSession {
.first::<UserTable>(&*(self.db_connection()?))?;
let _ = self.read_user_profile_on_server(&token)?;
let _ = self.start_ws_connection(&token)?;
Ok(UserProfile::from(user))
}

View File

@ -3,7 +3,7 @@ use crate::{
errors::WorkspaceError,
};
use flowy_derive::ProtoBuf;
use flowy_document::entities::doc::{DocDelta, UpdateDocParams};
use flowy_document::entities::doc::DocDelta;
use std::convert::TryInto;
#[derive(Default, ProtoBuf)]
@ -100,28 +100,6 @@ impl TryInto<UpdateViewParams> for UpdateViewRequest {
}
}
#[derive(Default, ProtoBuf)]
pub struct SaveViewDataRequest {
#[pb(index = 1)]
pub view_id: String,
#[pb(index = 2)]
pub data: Vec<u8>,
}
impl TryInto<UpdateDocParams> for SaveViewDataRequest {
type Error = WorkspaceError;
fn try_into(self) -> Result<UpdateDocParams, Self::Error> {
let view_id = ViewId::parse(self.view_id).map_err(|e| WorkspaceError::view_id().context(e))?.0;
// Opti: Vec<u8> -> Delta -> Vec<u8>
let data = DeltaData::parse(self.data).map_err(|e| WorkspaceError::view_data().context(e))?.0;
Ok(UpdateDocParams { doc_id: view_id, data })
}
}
#[derive(Default, ProtoBuf)]
pub struct ApplyChangesetRequest {
#[pb(index = 1)]

View File

@ -943,207 +943,6 @@ impl ::protobuf::reflect::ProtobufValue for UpdateViewParams {
}
}
#[derive(PartialEq,Clone,Default)]
pub struct SaveViewDataRequest {
// message fields
pub view_id: ::std::string::String,
pub data: ::std::vec::Vec<u8>,
// special fields
pub unknown_fields: ::protobuf::UnknownFields,
pub cached_size: ::protobuf::CachedSize,
}
impl<'a> ::std::default::Default for &'a SaveViewDataRequest {
fn default() -> &'a SaveViewDataRequest {
<SaveViewDataRequest as ::protobuf::Message>::default_instance()
}
}
impl SaveViewDataRequest {
pub fn new() -> SaveViewDataRequest {
::std::default::Default::default()
}
// string view_id = 1;
pub fn get_view_id(&self) -> &str {
&self.view_id
}
pub fn clear_view_id(&mut self) {
self.view_id.clear();
}
// Param is passed by value, moved
pub fn set_view_id(&mut self, v: ::std::string::String) {
self.view_id = v;
}
// Mutable pointer to the field.
// If field is not initialized, it is initialized with default value first.
pub fn mut_view_id(&mut self) -> &mut ::std::string::String {
&mut self.view_id
}
// Take field
pub fn take_view_id(&mut self) -> ::std::string::String {
::std::mem::replace(&mut self.view_id, ::std::string::String::new())
}
// bytes data = 2;
pub fn get_data(&self) -> &[u8] {
&self.data
}
pub fn clear_data(&mut self) {
self.data.clear();
}
// Param is passed by value, moved
pub fn set_data(&mut self, v: ::std::vec::Vec<u8>) {
self.data = v;
}
// Mutable pointer to the field.
// If field is not initialized, it is initialized with default value first.
pub fn mut_data(&mut self) -> &mut ::std::vec::Vec<u8> {
&mut self.data
}
// Take field
pub fn take_data(&mut self) -> ::std::vec::Vec<u8> {
::std::mem::replace(&mut self.data, ::std::vec::Vec::new())
}
}
impl ::protobuf::Message for SaveViewDataRequest {
fn is_initialized(&self) -> bool {
true
}
fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> {
while !is.eof()? {
let (field_number, wire_type) = is.read_tag_unpack()?;
match field_number {
1 => {
::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.view_id)?;
},
2 => {
::protobuf::rt::read_singular_proto3_bytes_into(wire_type, is, &mut self.data)?;
},
_ => {
::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
},
};
}
::std::result::Result::Ok(())
}
// Compute sizes of nested messages
#[allow(unused_variables)]
fn compute_size(&self) -> u32 {
let mut my_size = 0;
if !self.view_id.is_empty() {
my_size += ::protobuf::rt::string_size(1, &self.view_id);
}
if !self.data.is_empty() {
my_size += ::protobuf::rt::bytes_size(2, &self.data);
}
my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
self.cached_size.set(my_size);
my_size
}
fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
if !self.view_id.is_empty() {
os.write_string(1, &self.view_id)?;
}
if !self.data.is_empty() {
os.write_bytes(2, &self.data)?;
}
os.write_unknown_fields(self.get_unknown_fields())?;
::std::result::Result::Ok(())
}
fn get_cached_size(&self) -> u32 {
self.cached_size.get()
}
fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
&self.unknown_fields
}
fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
&mut self.unknown_fields
}
fn as_any(&self) -> &dyn (::std::any::Any) {
self as &dyn (::std::any::Any)
}
fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
self as &mut dyn (::std::any::Any)
}
fn into_any(self: ::std::boxed::Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
self
}
fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
Self::descriptor_static()
}
fn new() -> SaveViewDataRequest {
SaveViewDataRequest::new()
}
fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT;
descriptor.get(|| {
let mut fields = ::std::vec::Vec::new();
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
"view_id",
|m: &SaveViewDataRequest| { &m.view_id },
|m: &mut SaveViewDataRequest| { &mut m.view_id },
));
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>(
"data",
|m: &SaveViewDataRequest| { &m.data },
|m: &mut SaveViewDataRequest| { &mut m.data },
));
::protobuf::reflect::MessageDescriptor::new_pb_name::<SaveViewDataRequest>(
"SaveViewDataRequest",
fields,
file_descriptor_proto()
)
})
}
fn default_instance() -> &'static SaveViewDataRequest {
static instance: ::protobuf::rt::LazyV2<SaveViewDataRequest> = ::protobuf::rt::LazyV2::INIT;
instance.get(SaveViewDataRequest::new)
}
}
impl ::protobuf::Clear for SaveViewDataRequest {
fn clear(&mut self) {
self.view_id.clear();
self.data.clear();
self.unknown_fields.clear();
}
}
impl ::std::fmt::Debug for SaveViewDataRequest {
fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
::protobuf::text_format::fmt(self, f)
}
}
impl ::protobuf::reflect::ProtobufValue for SaveViewDataRequest {
fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
::protobuf::reflect::ReflectValueRef::Message(self)
}
}
#[derive(PartialEq,Clone,Default)]
pub struct ApplyChangesetRequest {
// message fields
@ -1357,64 +1156,55 @@ static file_descriptor_proto_data: &'static [u8] = b"\
\x03\x20\x01(\tH\x01R\x04desc\x12\x1e\n\tthumbnail\x18\x04\x20\x01(\tH\
\x02R\tthumbnail\x12\x1b\n\x08is_trash\x18\x05\x20\x01(\x08H\x03R\x07isT\
rashB\r\n\x0bone_of_nameB\r\n\x0bone_of_descB\x12\n\x10one_of_thumbnailB\
\x11\n\x0fone_of_is_trash\"B\n\x13SaveViewDataRequest\x12\x17\n\x07view_\
id\x18\x01\x20\x01(\tR\x06viewId\x12\x12\n\x04data\x18\x02\x20\x01(\x0cR\
\x04data\"D\n\x15ApplyChangesetRequest\x12\x17\n\x07view_id\x18\x01\x20\
\x01(\tR\x06viewId\x12\x12\n\x04data\x18\x02\x20\x01(\x0cR\x04dataJ\xcc\
\x08\n\x06\x12\x04\0\0\x17\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\
\x04\0\x12\x04\x02\0\x08\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x19\n\
\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x17\n\x0c\n\x05\x04\0\x02\0\x05\
\x12\x03\x03\x04\n\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\x0b\x12\n\x0c\
\n\x05\x04\0\x02\0\x03\x12\x03\x03\x15\x16\n\x0b\n\x04\x04\0\x08\0\x12\
\x03\x04\x04*\n\x0c\n\x05\x04\0\x08\0\x01\x12\x03\x04\n\x15\n\x0b\n\x04\
\x04\0\x02\x01\x12\x03\x04\x18(\n\x0c\n\x05\x04\0\x02\x01\x05\x12\x03\
\x04\x18\x1e\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\x1f#\n\x0c\n\x05\
\x04\0\x02\x01\x03\x12\x03\x04&'\n\x0b\n\x04\x04\0\x08\x01\x12\x03\x05\
\x04*\n\x0c\n\x05\x04\0\x08\x01\x01\x12\x03\x05\n\x15\n\x0b\n\x04\x04\0\
\x02\x02\x12\x03\x05\x18(\n\x0c\n\x05\x04\0\x02\x02\x05\x12\x03\x05\x18\
\x1e\n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03\x05\x1f#\n\x0c\n\x05\x04\0\
\x02\x02\x03\x12\x03\x05&'\n\x0b\n\x04\x04\0\x08\x02\x12\x03\x06\x044\n\
\x0c\n\x05\x04\0\x08\x02\x01\x12\x03\x06\n\x1a\n\x0b\n\x04\x04\0\x02\x03\
\x12\x03\x06\x1d2\n\x0c\n\x05\x04\0\x02\x03\x05\x12\x03\x06\x1d#\n\x0c\n\
\x05\x04\0\x02\x03\x01\x12\x03\x06$-\n\x0c\n\x05\x04\0\x02\x03\x03\x12\
\x03\x0601\n\x0b\n\x04\x04\0\x08\x03\x12\x03\x07\x040\n\x0c\n\x05\x04\0\
\x08\x03\x01\x12\x03\x07\n\x19\n\x0b\n\x04\x04\0\x02\x04\x12\x03\x07\x1c\
.\n\x0c\n\x05\x04\0\x02\x04\x05\x12\x03\x07\x1c\x20\n\x0c\n\x05\x04\0\
\x02\x04\x01\x12\x03\x07!)\n\x0c\n\x05\x04\0\x02\x04\x03\x12\x03\x07,-\n\
\n\n\x02\x04\x01\x12\x04\t\0\x0f\x01\n\n\n\x03\x04\x01\x01\x12\x03\t\x08\
\x18\n\x0b\n\x04\x04\x01\x02\0\x12\x03\n\x04\x17\n\x0c\n\x05\x04\x01\x02\
\0\x05\x12\x03\n\x04\n\n\x0c\n\x05\x04\x01\x02\0\x01\x12\x03\n\x0b\x12\n\
\x0c\n\x05\x04\x01\x02\0\x03\x12\x03\n\x15\x16\n\x0b\n\x04\x04\x01\x08\0\
\x12\x03\x0b\x04*\n\x0c\n\x05\x04\x01\x08\0\x01\x12\x03\x0b\n\x15\n\x0b\
\n\x04\x04\x01\x02\x01\x12\x03\x0b\x18(\n\x0c\n\x05\x04\x01\x02\x01\x05\
\x12\x03\x0b\x18\x1e\n\x0c\n\x05\x04\x01\x02\x01\x01\x12\x03\x0b\x1f#\n\
\x0c\n\x05\x04\x01\x02\x01\x03\x12\x03\x0b&'\n\x0b\n\x04\x04\x01\x08\x01\
\x12\x03\x0c\x04*\n\x0c\n\x05\x04\x01\x08\x01\x01\x12\x03\x0c\n\x15\n\
\x0b\n\x04\x04\x01\x02\x02\x12\x03\x0c\x18(\n\x0c\n\x05\x04\x01\x02\x02\
\x05\x12\x03\x0c\x18\x1e\n\x0c\n\x05\x04\x01\x02\x02\x01\x12\x03\x0c\x1f\
#\n\x0c\n\x05\x04\x01\x02\x02\x03\x12\x03\x0c&'\n\x0b\n\x04\x04\x01\x08\
\x02\x12\x03\r\x044\n\x0c\n\x05\x04\x01\x08\x02\x01\x12\x03\r\n\x1a\n\
\x0b\n\x04\x04\x01\x02\x03\x12\x03\r\x1d2\n\x0c\n\x05\x04\x01\x02\x03\
\x05\x12\x03\r\x1d#\n\x0c\n\x05\x04\x01\x02\x03\x01\x12\x03\r$-\n\x0c\n\
\x05\x04\x01\x02\x03\x03\x12\x03\r01\n\x0b\n\x04\x04\x01\x08\x03\x12\x03\
\x0e\x040\n\x0c\n\x05\x04\x01\x08\x03\x01\x12\x03\x0e\n\x19\n\x0b\n\x04\
\x04\x01\x02\x04\x12\x03\x0e\x1c.\n\x0c\n\x05\x04\x01\x02\x04\x05\x12\
\x03\x0e\x1c\x20\n\x0c\n\x05\x04\x01\x02\x04\x01\x12\x03\x0e!)\n\x0c\n\
\x05\x04\x01\x02\x04\x03\x12\x03\x0e,-\n\n\n\x02\x04\x02\x12\x04\x10\0\
\x13\x01\n\n\n\x03\x04\x02\x01\x12\x03\x10\x08\x1b\n\x0b\n\x04\x04\x02\
\x02\0\x12\x03\x11\x04\x17\n\x0c\n\x05\x04\x02\x02\0\x05\x12\x03\x11\x04\
\n\n\x0c\n\x05\x04\x02\x02\0\x01\x12\x03\x11\x0b\x12\n\x0c\n\x05\x04\x02\
\x02\0\x03\x12\x03\x11\x15\x16\n\x0b\n\x04\x04\x02\x02\x01\x12\x03\x12\
\x04\x13\n\x0c\n\x05\x04\x02\x02\x01\x05\x12\x03\x12\x04\t\n\x0c\n\x05\
\x04\x02\x02\x01\x01\x12\x03\x12\n\x0e\n\x0c\n\x05\x04\x02\x02\x01\x03\
\x12\x03\x12\x11\x12\n\n\n\x02\x04\x03\x12\x04\x14\0\x17\x01\n\n\n\x03\
\x04\x03\x01\x12\x03\x14\x08\x1d\n\x0b\n\x04\x04\x03\x02\0\x12\x03\x15\
\x04\x17\n\x0c\n\x05\x04\x03\x02\0\x05\x12\x03\x15\x04\n\n\x0c\n\x05\x04\
\x03\x02\0\x01\x12\x03\x15\x0b\x12\n\x0c\n\x05\x04\x03\x02\0\x03\x12\x03\
\x15\x15\x16\n\x0b\n\x04\x04\x03\x02\x01\x12\x03\x16\x04\x13\n\x0c\n\x05\
\x04\x03\x02\x01\x05\x12\x03\x16\x04\t\n\x0c\n\x05\x04\x03\x02\x01\x01\
\x12\x03\x16\n\x0e\n\x0c\n\x05\x04\x03\x02\x01\x03\x12\x03\x16\x11\x12b\
\x06proto3\
\x11\n\x0fone_of_is_trash\"D\n\x15ApplyChangesetRequest\x12\x17\n\x07vie\
w_id\x18\x01\x20\x01(\tR\x06viewId\x12\x12\n\x04data\x18\x02\x20\x01(\
\x0cR\x04dataJ\xc6\x07\n\x06\x12\x04\0\0\x13\x01\n\x08\n\x01\x0c\x12\x03\
\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x08\x01\n\n\n\x03\x04\0\x01\x12\
\x03\x02\x08\x19\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x17\n\x0c\n\x05\
\x04\0\x02\0\x05\x12\x03\x03\x04\n\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\
\x03\x0b\x12\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x15\x16\n\x0b\n\x04\
\x04\0\x08\0\x12\x03\x04\x04*\n\x0c\n\x05\x04\0\x08\0\x01\x12\x03\x04\n\
\x15\n\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x18(\n\x0c\n\x05\x04\0\x02\
\x01\x05\x12\x03\x04\x18\x1e\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\
\x1f#\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04&'\n\x0b\n\x04\x04\0\x08\
\x01\x12\x03\x05\x04*\n\x0c\n\x05\x04\0\x08\x01\x01\x12\x03\x05\n\x15\n\
\x0b\n\x04\x04\0\x02\x02\x12\x03\x05\x18(\n\x0c\n\x05\x04\0\x02\x02\x05\
\x12\x03\x05\x18\x1e\n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03\x05\x1f#\n\
\x0c\n\x05\x04\0\x02\x02\x03\x12\x03\x05&'\n\x0b\n\x04\x04\0\x08\x02\x12\
\x03\x06\x044\n\x0c\n\x05\x04\0\x08\x02\x01\x12\x03\x06\n\x1a\n\x0b\n\
\x04\x04\0\x02\x03\x12\x03\x06\x1d2\n\x0c\n\x05\x04\0\x02\x03\x05\x12\
\x03\x06\x1d#\n\x0c\n\x05\x04\0\x02\x03\x01\x12\x03\x06$-\n\x0c\n\x05\
\x04\0\x02\x03\x03\x12\x03\x0601\n\x0b\n\x04\x04\0\x08\x03\x12\x03\x07\
\x040\n\x0c\n\x05\x04\0\x08\x03\x01\x12\x03\x07\n\x19\n\x0b\n\x04\x04\0\
\x02\x04\x12\x03\x07\x1c.\n\x0c\n\x05\x04\0\x02\x04\x05\x12\x03\x07\x1c\
\x20\n\x0c\n\x05\x04\0\x02\x04\x01\x12\x03\x07!)\n\x0c\n\x05\x04\0\x02\
\x04\x03\x12\x03\x07,-\n\n\n\x02\x04\x01\x12\x04\t\0\x0f\x01\n\n\n\x03\
\x04\x01\x01\x12\x03\t\x08\x18\n\x0b\n\x04\x04\x01\x02\0\x12\x03\n\x04\
\x17\n\x0c\n\x05\x04\x01\x02\0\x05\x12\x03\n\x04\n\n\x0c\n\x05\x04\x01\
\x02\0\x01\x12\x03\n\x0b\x12\n\x0c\n\x05\x04\x01\x02\0\x03\x12\x03\n\x15\
\x16\n\x0b\n\x04\x04\x01\x08\0\x12\x03\x0b\x04*\n\x0c\n\x05\x04\x01\x08\
\0\x01\x12\x03\x0b\n\x15\n\x0b\n\x04\x04\x01\x02\x01\x12\x03\x0b\x18(\n\
\x0c\n\x05\x04\x01\x02\x01\x05\x12\x03\x0b\x18\x1e\n\x0c\n\x05\x04\x01\
\x02\x01\x01\x12\x03\x0b\x1f#\n\x0c\n\x05\x04\x01\x02\x01\x03\x12\x03\
\x0b&'\n\x0b\n\x04\x04\x01\x08\x01\x12\x03\x0c\x04*\n\x0c\n\x05\x04\x01\
\x08\x01\x01\x12\x03\x0c\n\x15\n\x0b\n\x04\x04\x01\x02\x02\x12\x03\x0c\
\x18(\n\x0c\n\x05\x04\x01\x02\x02\x05\x12\x03\x0c\x18\x1e\n\x0c\n\x05\
\x04\x01\x02\x02\x01\x12\x03\x0c\x1f#\n\x0c\n\x05\x04\x01\x02\x02\x03\
\x12\x03\x0c&'\n\x0b\n\x04\x04\x01\x08\x02\x12\x03\r\x044\n\x0c\n\x05\
\x04\x01\x08\x02\x01\x12\x03\r\n\x1a\n\x0b\n\x04\x04\x01\x02\x03\x12\x03\
\r\x1d2\n\x0c\n\x05\x04\x01\x02\x03\x05\x12\x03\r\x1d#\n\x0c\n\x05\x04\
\x01\x02\x03\x01\x12\x03\r$-\n\x0c\n\x05\x04\x01\x02\x03\x03\x12\x03\r01\
\n\x0b\n\x04\x04\x01\x08\x03\x12\x03\x0e\x040\n\x0c\n\x05\x04\x01\x08\
\x03\x01\x12\x03\x0e\n\x19\n\x0b\n\x04\x04\x01\x02\x04\x12\x03\x0e\x1c.\
\n\x0c\n\x05\x04\x01\x02\x04\x05\x12\x03\x0e\x1c\x20\n\x0c\n\x05\x04\x01\
\x02\x04\x01\x12\x03\x0e!)\n\x0c\n\x05\x04\x01\x02\x04\x03\x12\x03\x0e,-\
\n\n\n\x02\x04\x02\x12\x04\x10\0\x13\x01\n\n\n\x03\x04\x02\x01\x12\x03\
\x10\x08\x1d\n\x0b\n\x04\x04\x02\x02\0\x12\x03\x11\x04\x17\n\x0c\n\x05\
\x04\x02\x02\0\x05\x12\x03\x11\x04\n\n\x0c\n\x05\x04\x02\x02\0\x01\x12\
\x03\x11\x0b\x12\n\x0c\n\x05\x04\x02\x02\0\x03\x12\x03\x11\x15\x16\n\x0b\
\n\x04\x04\x02\x02\x01\x12\x03\x12\x04\x13\n\x0c\n\x05\x04\x02\x02\x01\
\x05\x12\x03\x12\x04\t\n\x0c\n\x05\x04\x02\x02\x01\x01\x12\x03\x12\n\x0e\
\n\x0c\n\x05\x04\x02\x02\x01\x03\x12\x03\x12\x11\x12b\x06proto3\
";
static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;

View File

@ -14,10 +14,6 @@ message UpdateViewParams {
oneof one_of_thumbnail { string thumbnail = 4; };
oneof one_of_is_trash { bool is_trash = 5; };
}
message SaveViewDataRequest {
string view_id = 1;
bytes data = 2;
}
message ApplyChangesetRequest {
string view_id = 1;
bytes data = 2;

View File

@ -126,8 +126,7 @@ impl ViewController {
}
pub(crate) async fn apply_doc_delta(&self, params: DocDelta) -> Result<Doc, WorkspaceError> {
let pool = self.database.db_pool()?;
let doc = self.document.apply_doc_delta(params, pool).await?;
let doc = self.document.apply_doc_delta(params).await?;
Ok(doc)
}
}

View File

@ -1,8 +1,7 @@
use crate::errors::WsError;
use bytes::Bytes;
use flowy_derive::{ProtoBuf, ProtoBuf_Enum};
use std::convert::{TryFrom, TryInto};
use tokio_tungstenite::tungstenite::{Message as TokioMessage, Message};
use tokio_tungstenite::tungstenite::Message as TokioMessage;
// Opti: using four bytes of the data to represent the source
#[derive(ProtoBuf, Debug, Clone, Default)]

View File

@ -11,7 +11,7 @@ use futures_core::{future::BoxFuture, ready, Stream};
use pin_project::pin_project;
use std::{
collections::HashMap,
convert::{Infallible, TryFrom},
convert::TryFrom,
future::Future,
pin::Pin,
sync::Arc,