diff --git a/backend/src/services/document/controller.rs b/backend/src/services/document/controller.rs index 78527c8016..4098b37f8e 100644 --- a/backend/src/services/document/controller.rs +++ b/backend/src/services/document/controller.rs @@ -3,7 +3,7 @@ use crate::services::{ persistence::{create_doc, read_doc}, ws_actor::{DocumentWebSocketActor, WSActorMessage}, }, - web_socket::{WebSocketReceiver, WSClientData}, + web_socket::{WSClientData, WebSocketReceiver}, }; use crate::context::FlowyPersistence; @@ -18,13 +18,13 @@ use flowy_collaboration::{ }; use lib_infra::future::FutureResultSend; +use flowy_collaboration::sync::{DocumentPersistence, ServerDocumentManager}; use std::{ convert::TryInto, fmt::{Debug, Formatter}, sync::Arc, }; use tokio::sync::{mpsc, oneshot}; -use flowy_collaboration::sync::{DocumentPersistence, ServerDocumentManager}; pub fn make_document_ws_receiver(persistence: Arc) -> Arc { let document_persistence = Arc::new(DocumentPersistenceImpl(persistence.clone())); @@ -95,13 +95,13 @@ impl DocumentPersistence for DocumentPersistenceImpl { }) } - fn create_doc(&self, revision: Revision) -> FutureResultSend { + fn create_doc(&self, doc_id: &str, revisions: Vec) -> FutureResultSend { let kv_store = self.0.kv_store(); + let doc_id = doc_id.to_owned(); FutureResultSend::new(async move { - let doc: DocumentInfo = revision.clone().try_into()?; - let doc_id = revision.doc_id.clone(); - let revisions = RepeatedRevision { items: vec![revision] }; - + let doc = DocumentInfo::from_revisions(&doc_id, revisions.clone())?; + let doc_id = doc_id.to_owned(); + let revisions = RepeatedRevision::new(revisions); let params = CreateDocParams { id: doc_id, revisions }; let pb_params: flowy_collaboration::protobuf::CreateDocParams = params.try_into().unwrap(); let _ = create_doc(&kv_store, pb_params) @@ -125,6 +125,19 @@ impl DocumentPersistence for DocumentPersistenceImpl { FutureResultSend::new(async move { f().await.map_err(server_error_to_collaborate_error) }) } + + fn get_doc_revisions(&self, doc_id: &str) -> FutureResultSend, CollaborateError> { + let kv_store = self.0.kv_store(); + let doc_id = doc_id.to_owned(); + let f = || async move { + let mut pb = kv_store.get_doc_revisions(&doc_id).await?; + let repeated_revision: RepeatedRevision = (&mut pb).try_into()?; + let revisions = repeated_revision.into_inner(); + Ok(revisions) + }; + + FutureResultSend::new(async move { f().await.map_err(server_error_to_collaborate_error) }) + } } fn server_error_to_collaborate_error(error: ServerError) -> CollaborateError { diff --git a/backend/src/services/document/persistence/kv_store.rs b/backend/src/services/document/persistence/kv_store.rs index 24b6722f4f..6ed5a29d0a 100644 --- a/backend/src/services/document/persistence/kv_store.rs +++ b/backend/src/services/document/persistence/kv_store.rs @@ -49,6 +49,11 @@ impl DocumentKVPersistence { Ok(()) } + pub(crate) async fn get_doc_revisions(&self, doc_id: &str) -> Result { + let items = self.inner.batch_get_start_with(doc_id).await?; + Ok(key_value_items_to_revisions(items)) + } + pub(crate) async fn batch_get_revisions>>>( &self, doc_id: &str, @@ -56,7 +61,7 @@ impl DocumentKVPersistence { ) -> Result { let rev_ids = rev_ids.into(); let items = match rev_ids { - None => self.inner.batch_get_key_start_with(doc_id).await?, + None => self.inner.batch_get_start_with(doc_id).await?, Some(rev_ids) => { let keys = rev_ids .into_iter() @@ -66,17 +71,7 @@ impl DocumentKVPersistence { }, }; - let mut revisions = items - .into_iter() - .filter_map(|kv| parse_from_bytes::(&kv.value).ok()) - .collect::>(); - - // TODO: optimize sort - revisions.sort_by(|a, b| a.rev_id.cmp(&b.rev_id)); - - let mut repeated_revision = RepeatedRevision::new(); - repeated_revision.set_items(revisions.into()); - Ok(repeated_revision) + Ok(key_value_items_to_revisions(items)) } pub(crate) async fn batch_delete_revisions>>>( @@ -101,5 +96,18 @@ impl DocumentKVPersistence { } } +#[inline] +fn key_value_items_to_revisions(items: Vec) -> RepeatedRevision { + let mut revisions = items + .into_iter() + .filter_map(|kv| parse_from_bytes::(&kv.value).ok()) + .collect::>(); + + revisions.sort_by(|a, b| a.rev_id.cmp(&b.rev_id)); + let mut repeated_revision = RepeatedRevision::new(); + repeated_revision.set_items(revisions.into()); + repeated_revision +} + #[inline] fn make_revision_key(doc_id: &str, rev_id: i64) -> String { format!("{}:{}", doc_id, rev_id) } diff --git a/backend/src/services/document/persistence/postgres.rs b/backend/src/services/document/persistence/postgres.rs index 57f941fb00..283b01587c 100644 --- a/backend/src/services/document/persistence/postgres.rs +++ b/backend/src/services/document/persistence/postgres.rs @@ -57,7 +57,7 @@ fn make_doc_from_revisions(doc_id: &str, mut revisions: RepeatedRevision) -> Res if revisions.is_empty() { return Err(ServerError::record_not_found().context(format!("{} not exist", doc_id))); } - + let mut document_delta = RichTextDelta::new(); let mut base_rev_id = 0; let mut rev_id = 0; @@ -70,7 +70,7 @@ fn make_doc_from_revisions(doc_id: &str, mut revisions: RepeatedRevision) -> Res } let text = document_delta.to_json(); let mut document_info = DocumentInfo::new(); - document_info.set_id(doc_id.to_owned()); + document_info.set_doc_id(doc_id.to_owned()); document_info.set_text(text); document_info.set_base_rev_id(base_rev_id); document_info.set_rev_id(rev_id); diff --git a/backend/src/services/document/ws_actor.rs b/backend/src/services/document/ws_actor.rs index 91425eb0c8..9eb42e6617 100644 --- a/backend/src/services/document/ws_actor.rs +++ b/backend/src/services/document/ws_actor.rs @@ -6,8 +6,9 @@ use crate::{ use actix_rt::task::spawn_blocking; use async_stream::stream; use backend_service::errors::{internal_error, Result, ServerError}; + use flowy_collaboration::{ - protobuf::{DocumentClientWSData, DocumentClientWSDataType, RepeatedRevision, Revision}, + protobuf::{DocumentClientWSData, DocumentClientWSDataType, Revision}, sync::{RevisionUser, ServerDocumentManager, SyncResponse}, }; use futures::stream::StreamExt; @@ -67,18 +68,15 @@ impl DocumentWebSocketActor { async fn handle_client_data(&self, client_data: WSClientData, persistence: Arc) -> Result<()> { let WSClientData { user, socket, data } = client_data; - let document_data = spawn_blocking(move || { - let document_data: DocumentClientWSData = parse_from_bytes(&data)?; - Result::Ok(document_data) - }) - .await - .map_err(internal_error)??; + let document_client_data = spawn_blocking(move || parse_from_bytes::(&data)) + .await + .map_err(internal_error)??; tracing::debug!( - "[HTTP_SERVER_WS]: receive client data: {}:{}, {:?}", - document_data.doc_id, - document_data.id, - document_data.ty + "[DocumentWebSocketActor]: receive client data: {}:{}, {:?}", + document_client_data.doc_id, + document_client_data.id, + document_client_data.ty ); let user = Arc::new(ServerDocUser { @@ -87,33 +85,26 @@ impl DocumentWebSocketActor { persistence, }); - match match &document_data.ty { - DocumentClientWSDataType::ClientPushRev => self.handle_pushed_rev(user, document_data.data).await, - } { + match self.handle_revision(user, document_client_data).await { Ok(_) => {}, Err(e) => { - tracing::error!("[HTTP_SERVER_WS]: process client data error {:?}", e); + tracing::error!("[DocumentWebSocketActor]: process client data error {:?}", e); }, } Ok(()) } - async fn handle_pushed_rev(&self, user: Arc, data: Vec) -> Result<()> { - let repeated_revision = spawn_blocking(move || parse_from_bytes::(&data)) - .await - .map_err(internal_error)??; - self.handle_revision(user, repeated_revision).await - } + async fn handle_revision(&self, user: Arc, client_data: DocumentClientWSData) -> Result<()> { + match &client_data.ty { + DocumentClientWSDataType::ClientPushRev => { + let _ = self + .doc_manager + .apply_revisions(user, client_data) + .await + .map_err(internal_error)?; + }, + } - async fn handle_revision(&self, user: Arc, mut revisions: RepeatedRevision) -> Result<()> { - let repeated_revision: flowy_collaboration::entities::revision::RepeatedRevision = - (&mut revisions).try_into().map_err(internal_error)?; - let revisions = repeated_revision.into_inner(); - let _ = self - .doc_manager - .apply_revisions(user, revisions) - .await - .map_err(internal_error)?; Ok(()) } } diff --git a/backend/src/services/kv/kv.rs b/backend/src/services/kv/kv.rs index 7fca6d9fbd..5220ad42d9 100644 --- a/backend/src/services/kv/kv.rs +++ b/backend/src/services/kv/kv.rs @@ -158,9 +158,9 @@ impl KVStore for PostgresKV { }) } - fn batch_get_key_start_with(&self, prefix: &str) -> FutureResultSend, ServerError> { + fn batch_get_start_with(&self, key: &str) -> FutureResultSend, ServerError> { let pg_pool = self.pg_pool.clone(); - let prefix = prefix.to_owned(); + let prefix = key.to_owned(); FutureResultSend::new(async move { let mut transaction = pg_pool .begin() diff --git a/backend/src/services/kv/mod.rs b/backend/src/services/kv/mod.rs index ee2220a080..82b816d692 100644 --- a/backend/src/services/kv/mod.rs +++ b/backend/src/services/kv/mod.rs @@ -19,7 +19,7 @@ pub trait KVStore: Send + Sync { fn delete(&self, key: &str) -> FutureResultSend<(), ServerError>; fn batch_set(&self, kvs: Vec) -> FutureResultSend<(), ServerError>; fn batch_get(&self, keys: Vec) -> FutureResultSend, ServerError>; - fn batch_get_key_start_with(&self, keyword: &str) -> FutureResultSend, ServerError>; + fn batch_get_start_with(&self, key: &str) -> FutureResultSend, ServerError>; fn batch_delete(&self, keys: Vec) -> FutureResultSend<(), ServerError>; fn batch_delete_key_start_with(&self, keyword: &str) -> FutureResultSend<(), ServerError>; diff --git a/backend/tests/api_test/workspace_test.rs b/backend/tests/api_test/workspace_test.rs index 4a6f621c66..69acda6fa9 100644 --- a/backend/tests/api_test/workspace_test.rs +++ b/backend/tests/api_test/workspace_test.rs @@ -258,7 +258,7 @@ async fn doc_create() { let params = CreateDocParams { id: doc_id.clone(), - revisions: RepeatedRevision { items: revisions }, + revisions: RepeatedRevision::new(revisions), }; server.create_doc(params).await; diff --git a/frontend/app_flowy/packages/flowy_infra/pubspec.lock b/frontend/app_flowy/packages/flowy_infra/pubspec.lock index 0b9eaad115..4cfc4b3c40 100644 --- a/frontend/app_flowy/packages/flowy_infra/pubspec.lock +++ b/frontend/app_flowy/packages/flowy_infra/pubspec.lock @@ -211,7 +211,7 @@ packages: name: vector_math url: "https://pub.dartlang.org" source: hosted - version: "2.1.0" + version: "2.1.1" xml: dependency: transitive description: @@ -220,5 +220,5 @@ packages: source: hosted version: "5.2.0" sdks: - dart: ">=2.13.0 <3.0.0" + dart: ">=2.14.0 <3.0.0" flutter: ">=1.24.0-7.0" diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/doc.pb.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/doc.pb.dart index 9c55344cff..f85bce986d 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/doc.pb.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/doc.pb.dart @@ -77,7 +77,7 @@ class CreateDocParams extends $pb.GeneratedMessage { class DocumentInfo extends $pb.GeneratedMessage { static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'DocumentInfo', createEmptyInstance: create) - ..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'id') + ..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'docId') ..aOS(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'text') ..aInt64(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'revId') ..aInt64(4, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'baseRevId') @@ -86,14 +86,14 @@ class DocumentInfo extends $pb.GeneratedMessage { DocumentInfo._() : super(); factory DocumentInfo({ - $core.String? id, + $core.String? docId, $core.String? text, $fixnum.Int64? revId, $fixnum.Int64? baseRevId, }) { final _result = create(); - if (id != null) { - _result.id = id; + if (docId != null) { + _result.docId = docId; } if (text != null) { _result.text = text; @@ -128,13 +128,13 @@ class DocumentInfo extends $pb.GeneratedMessage { static DocumentInfo? _defaultInstance; @$pb.TagNumber(1) - $core.String get id => $_getSZ(0); + $core.String get docId => $_getSZ(0); @$pb.TagNumber(1) - set id($core.String v) { $_setString(0, v); } + set docId($core.String v) { $_setString(0, v); } @$pb.TagNumber(1) - $core.bool hasId() => $_has(0); + $core.bool hasDocId() => $_has(0); @$pb.TagNumber(1) - void clearId() => clearField(1); + void clearDocId() => clearField(1); @$pb.TagNumber(2) $core.String get text => $_getSZ(1); diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/doc.pbjson.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/doc.pbjson.dart index 5afa8cbcb7..061b00bb55 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/doc.pbjson.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/doc.pbjson.dart @@ -23,7 +23,7 @@ final $typed_data.Uint8List createDocParamsDescriptor = $convert.base64Decode('C const DocumentInfo$json = const { '1': 'DocumentInfo', '2': const [ - const {'1': 'id', '3': 1, '4': 1, '5': 9, '10': 'id'}, + const {'1': 'doc_id', '3': 1, '4': 1, '5': 9, '10': 'docId'}, const {'1': 'text', '3': 2, '4': 1, '5': 9, '10': 'text'}, const {'1': 'rev_id', '3': 3, '4': 1, '5': 3, '10': 'revId'}, const {'1': 'base_rev_id', '3': 4, '4': 1, '5': 3, '10': 'baseRevId'}, @@ -31,7 +31,7 @@ const DocumentInfo$json = const { }; /// Descriptor for `DocumentInfo`. Decode as a `google.protobuf.DescriptorProto`. -final $typed_data.Uint8List documentInfoDescriptor = $convert.base64Decode('CgxEb2N1bWVudEluZm8SDgoCaWQYASABKAlSAmlkEhIKBHRleHQYAiABKAlSBHRleHQSFQoGcmV2X2lkGAMgASgDUgVyZXZJZBIeCgtiYXNlX3Jldl9pZBgEIAEoA1IJYmFzZVJldklk'); +final $typed_data.Uint8List documentInfoDescriptor = $convert.base64Decode('CgxEb2N1bWVudEluZm8SFQoGZG9jX2lkGAEgASgJUgVkb2NJZBISCgR0ZXh0GAIgASgJUgR0ZXh0EhUKBnJldl9pZBgDIAEoA1IFcmV2SWQSHgoLYmFzZV9yZXZfaWQYBCABKANSCWJhc2VSZXZJZA=='); @$core.Deprecated('Use resetDocumentParamsDescriptor instead') const ResetDocumentParams$json = const { '1': 'ResetDocumentParams', diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pb.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pb.dart index aa1c64afac..e4492d6d29 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pb.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pb.dart @@ -9,6 +9,8 @@ import 'dart:core' as $core; import 'package:protobuf/protobuf.dart' as $pb; +import 'revision.pb.dart' as $0; + import 'ws.pbenum.dart'; export 'ws.pbenum.dart'; @@ -17,7 +19,7 @@ class DocumentClientWSData extends $pb.GeneratedMessage { static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'DocumentClientWSData', createEmptyInstance: create) ..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'docId') ..e(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'ty', $pb.PbFieldType.OE, defaultOrMaker: DocumentClientWSDataType.ClientPushRev, valueOf: DocumentClientWSDataType.valueOf, enumValues: DocumentClientWSDataType.values) - ..a<$core.List<$core.int>>(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'data', $pb.PbFieldType.OY) + ..aOM<$0.RepeatedRevision>(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'revisions', subBuilder: $0.RepeatedRevision.create) ..aOS(4, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'id') ..hasRequiredFields = false ; @@ -26,7 +28,7 @@ class DocumentClientWSData extends $pb.GeneratedMessage { factory DocumentClientWSData({ $core.String? docId, DocumentClientWSDataType? ty, - $core.List<$core.int>? data, + $0.RepeatedRevision? revisions, $core.String? id, }) { final _result = create(); @@ -36,8 +38,8 @@ class DocumentClientWSData extends $pb.GeneratedMessage { if (ty != null) { _result.ty = ty; } - if (data != null) { - _result.data = data; + if (revisions != null) { + _result.revisions = revisions; } if (id != null) { _result.id = id; @@ -84,13 +86,15 @@ class DocumentClientWSData extends $pb.GeneratedMessage { void clearTy() => clearField(2); @$pb.TagNumber(3) - $core.List<$core.int> get data => $_getN(2); + $0.RepeatedRevision get revisions => $_getN(2); @$pb.TagNumber(3) - set data($core.List<$core.int> v) { $_setBytes(2, v); } + set revisions($0.RepeatedRevision v) { setField(3, v); } @$pb.TagNumber(3) - $core.bool hasData() => $_has(2); + $core.bool hasRevisions() => $_has(2); @$pb.TagNumber(3) - void clearData() => clearField(3); + void clearRevisions() => clearField(3); + @$pb.TagNumber(3) + $0.RepeatedRevision ensureRevisions() => $_ensure(2); @$pb.TagNumber(4) $core.String get id => $_getSZ(3); @@ -107,7 +111,6 @@ class DocumentServerWSData extends $pb.GeneratedMessage { ..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'docId') ..e(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'ty', $pb.PbFieldType.OE, defaultOrMaker: DocumentServerWSDataType.ServerAck, valueOf: DocumentServerWSDataType.valueOf, enumValues: DocumentServerWSDataType.values) ..a<$core.List<$core.int>>(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'data', $pb.PbFieldType.OY) - ..aOS(4, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'id') ..hasRequiredFields = false ; @@ -116,7 +119,6 @@ class DocumentServerWSData extends $pb.GeneratedMessage { $core.String? docId, DocumentServerWSDataType? ty, $core.List<$core.int>? data, - $core.String? id, }) { final _result = create(); if (docId != null) { @@ -128,9 +130,6 @@ class DocumentServerWSData extends $pb.GeneratedMessage { if (data != null) { _result.data = data; } - if (id != null) { - _result.id = id; - } return _result; } factory DocumentServerWSData.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r); @@ -180,15 +179,6 @@ class DocumentServerWSData extends $pb.GeneratedMessage { $core.bool hasData() => $_has(2); @$pb.TagNumber(3) void clearData() => clearField(3); - - @$pb.TagNumber(4) - $core.String get id => $_getSZ(3); - @$pb.TagNumber(4) - set id($core.String v) { $_setString(3, v); } - @$pb.TagNumber(4) - $core.bool hasId() => $_has(3); - @$pb.TagNumber(4) - void clearId() => clearField(4); } class NewDocumentUser extends $pb.GeneratedMessage { diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbjson.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbjson.dart index 78d6000e1b..129523e47e 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbjson.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbjson.dart @@ -37,13 +37,13 @@ const DocumentClientWSData$json = const { '2': const [ const {'1': 'doc_id', '3': 1, '4': 1, '5': 9, '10': 'docId'}, const {'1': 'ty', '3': 2, '4': 1, '5': 14, '6': '.DocumentClientWSDataType', '10': 'ty'}, - const {'1': 'data', '3': 3, '4': 1, '5': 12, '10': 'data'}, + const {'1': 'revisions', '3': 3, '4': 1, '5': 11, '6': '.RepeatedRevision', '10': 'revisions'}, const {'1': 'id', '3': 4, '4': 1, '5': 9, '10': 'id'}, ], }; /// Descriptor for `DocumentClientWSData`. Decode as a `google.protobuf.DescriptorProto`. -final $typed_data.Uint8List documentClientWSDataDescriptor = $convert.base64Decode('ChREb2N1bWVudENsaWVudFdTRGF0YRIVCgZkb2NfaWQYASABKAlSBWRvY0lkEikKAnR5GAIgASgOMhkuRG9jdW1lbnRDbGllbnRXU0RhdGFUeXBlUgJ0eRISCgRkYXRhGAMgASgMUgRkYXRhEg4KAmlkGAQgASgJUgJpZA=='); +final $typed_data.Uint8List documentClientWSDataDescriptor = $convert.base64Decode('ChREb2N1bWVudENsaWVudFdTRGF0YRIVCgZkb2NfaWQYASABKAlSBWRvY0lkEikKAnR5GAIgASgOMhkuRG9jdW1lbnRDbGllbnRXU0RhdGFUeXBlUgJ0eRIvCglyZXZpc2lvbnMYAyABKAsyES5SZXBlYXRlZFJldmlzaW9uUglyZXZpc2lvbnMSDgoCaWQYBCABKAlSAmlk'); @$core.Deprecated('Use documentServerWSDataDescriptor instead') const DocumentServerWSData$json = const { '1': 'DocumentServerWSData', @@ -51,12 +51,11 @@ const DocumentServerWSData$json = const { const {'1': 'doc_id', '3': 1, '4': 1, '5': 9, '10': 'docId'}, const {'1': 'ty', '3': 2, '4': 1, '5': 14, '6': '.DocumentServerWSDataType', '10': 'ty'}, const {'1': 'data', '3': 3, '4': 1, '5': 12, '10': 'data'}, - const {'1': 'id', '3': 4, '4': 1, '5': 9, '10': 'id'}, ], }; /// Descriptor for `DocumentServerWSData`. Decode as a `google.protobuf.DescriptorProto`. -final $typed_data.Uint8List documentServerWSDataDescriptor = $convert.base64Decode('ChREb2N1bWVudFNlcnZlcldTRGF0YRIVCgZkb2NfaWQYASABKAlSBWRvY0lkEikKAnR5GAIgASgOMhkuRG9jdW1lbnRTZXJ2ZXJXU0RhdGFUeXBlUgJ0eRISCgRkYXRhGAMgASgMUgRkYXRhEg4KAmlkGAQgASgJUgJpZA=='); +final $typed_data.Uint8List documentServerWSDataDescriptor = $convert.base64Decode('ChREb2N1bWVudFNlcnZlcldTRGF0YRIVCgZkb2NfaWQYASABKAlSBWRvY0lkEikKAnR5GAIgASgOMhkuRG9jdW1lbnRTZXJ2ZXJXU0RhdGFUeXBlUgJ0eRISCgRkYXRhGAMgASgMUgRkYXRh'); @$core.Deprecated('Use newDocumentUserDescriptor instead') const NewDocumentUser$json = const { '1': 'NewDocumentUser', diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs index 0d24cc4c2f..15152c8f09 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs @@ -115,7 +115,7 @@ impl RevisionLoader { let delta_data = Bytes::from(doc.text.clone()); let doc_md5 = md5(&delta_data); let revision = Revision::new( - &doc.id, + &doc.doc_id, doc.base_rev_id, doc.rev_id, delta_data, @@ -158,7 +158,7 @@ fn mk_doc_from_revisions(doc_id: &str, revisions: Vec) -> FlowyResult< correct_delta_if_need(&mut delta); Result::::Ok(DocumentInfo { - id: doc_id.to_owned(), + doc_id: doc_id.to_owned(), text: delta.to_json(), rev_id, base_rev_id, diff --git a/frontend/rust-lib/flowy-document/src/services/doc/web_socket/http_ws_impl.rs b/frontend/rust-lib/flowy-document/src/services/doc/web_socket/http_ws_impl.rs index a9903b0859..7d14872823 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/web_socket/http_ws_impl.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/web_socket/http_ws_impl.rs @@ -5,7 +5,7 @@ use crate::services::{ use async_stream::stream; use bytes::Bytes; use flowy_collaboration::entities::{ - revision::RevisionRange, + revision::{RevId, RevisionRange}, ws::{DocumentClientWSData, DocumentServerWSData, DocumentServerWSDataType, NewDocumentUser}, }; use flowy_error::{internal_error, FlowyError, FlowyResult}; @@ -172,12 +172,7 @@ impl DocumentWSStream { } async fn handle_message(&self, msg: DocumentServerWSData) -> FlowyResult<()> { - let DocumentServerWSData { - doc_id: _, - ty, - data, - id, - } = msg; + let DocumentServerWSData { doc_id: _, ty, data } = msg; let bytes = spawn_blocking(move || Bytes::from(data)) .await .map_err(internal_error)?; @@ -186,14 +181,14 @@ impl DocumentWSStream { match ty { DocumentServerWSDataType::ServerPushRev => { let _ = self.consumer.receive_push_revision(bytes).await?; - let _ = self.consumer.receive_ack(id, ty).await; }, DocumentServerWSDataType::ServerPullRev => { let range = RevisionRange::try_from(bytes)?; let _ = self.consumer.pull_revisions_in_range(range).await?; }, DocumentServerWSDataType::ServerAck => { - let _ = self.consumer.receive_ack(id, ty).await; + let rev_id = RevId::try_from(bytes).unwrap().value; + let _ = self.consumer.receive_ack(rev_id.to_string(), ty).await; }, DocumentServerWSDataType::UserConnect => { let new_user = NewDocumentUser::try_from(bytes)?; diff --git a/frontend/rust-lib/flowy-document/src/services/doc/web_socket/web_socket.rs b/frontend/rust-lib/flowy-document/src/services/doc/web_socket/web_socket.rs index 099e4bba69..167dc5a75a 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/web_socket/web_socket.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/web_socket/web_socket.rs @@ -1,10 +1,5 @@ use crate::services::doc::{ - web_socket::{ - local_ws_impl::LocalWebSocketManager, - DocumentWSSinkDataProvider, - DocumentWSSteamConsumer, - HttpWebSocketManager, - }, + web_socket::{DocumentWSSinkDataProvider, DocumentWSSteamConsumer, HttpWebSocketManager}, DocumentMD5, DocumentWSReceiver, DocumentWebSocket, @@ -40,6 +35,33 @@ pub(crate) async fn make_document_ws_manager( rev_manager: Arc, ws: Arc, ) -> Arc { + // if cfg!(feature = "http_server") { + // let shared_sink = + // Arc::new(SharedWSSinkDataProvider::new(rev_manager.clone())); + // let ws_stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter { + // doc_id: doc_id.clone(), + // user_id: user_id.clone(), + // editor_edit_queue: editor_edit_queue.clone(), + // rev_manager: rev_manager.clone(), + // shared_sink: shared_sink.clone(), + // }); + // let ws_stream_provider = + // DocumentWSSinkDataProviderAdapter(shared_sink.clone()); + // let ws_manager = Arc::new(HttpWebSocketManager::new( + // &doc_id, + // ws.clone(), + // Arc::new(ws_stream_provider), + // ws_stream_consumer, + // )); + // notify_user_has_connected(&user_id, &doc_id, rev_manager.clone(), + // shared_sink).await; listen_document_ws_state(&user_id, &doc_id, + // ws_manager.scribe_state(), rev_manager.clone()); + // + // Arc::new(ws_manager) + // } else { + // Arc::new(Arc::new(LocalWebSocketManager {})) + // } + let shared_sink = Arc::new(SharedWSSinkDataProvider::new(rev_manager.clone())); let ws_stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter { doc_id: doc_id.clone(), @@ -124,7 +146,8 @@ impl DocumentWSSteamConsumer for DocumentWebSocketSteamConsumerAdapter { if let Some(server_composed_revision) = handle_push_rev(&doc_id, &user_id, edit_cmd_tx, rev_manager, bytes).await? { - shared_sink.push_back(server_composed_revision.into()).await; + let data = DocumentClientWSData::from_revisions(&doc_id, vec![server_composed_revision]); + shared_sink.push_back(data).await; } Ok(()) }) @@ -143,15 +166,11 @@ impl DocumentWSSteamConsumer for DocumentWebSocketSteamConsumerAdapter { fn pull_revisions_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError> { let rev_manager = self.rev_manager.clone(); let shared_sink = self.shared_sink.clone(); + let doc_id = self.doc_id.clone(); FutureResult::new(async move { - let data = rev_manager - .get_revisions_in_range(range) - .await? - .into_iter() - .map(|revision| revision.into()) - .collect::>(); - - shared_sink.append(data).await; + let revisions = rev_manager.get_revisions_in_range(range).await?; + let data = DocumentClientWSData::from_revisions(&doc_id, revisions); + shared_sink.push_back(data).await; Ok(()) }) } @@ -260,18 +279,11 @@ impl SharedWSSinkDataProvider { } } - // TODO: return Option<&DocumentWSData> would be better - pub(crate) async fn front(&self) -> Option { self.shared.read().await.front().cloned() } - + #[allow(dead_code)] pub(crate) async fn push_front(&self, data: DocumentClientWSData) { self.shared.write().await.push_front(data); } async fn push_back(&self, data: DocumentClientWSData) { self.shared.write().await.push_back(data); } - async fn append(&self, data: Vec) { - let mut buf: VecDeque<_> = data.into_iter().collect(); - self.shared.write().await.append(&mut buf); - } - async fn next(&self) -> FlowyResult> { let source_ty = self.source_ty.read().await.clone(); match source_ty { @@ -281,7 +293,7 @@ impl SharedWSSinkDataProvider { Ok(None) }, Some(data) => { - tracing::debug!("[DocumentSinkDataProvider]: {}:{:?}", data.doc_id, data.ty); + tracing::debug!("[SharedWSSinkDataProvider]: {}:{:?}", data.doc_id, data.ty); Ok(Some(data.clone())) }, }, @@ -293,8 +305,9 @@ impl SharedWSSinkDataProvider { match self.rev_manager.next_sync_revision().await? { Some(rev) => { - tracing::debug!("[DocumentSinkDataProvider]: {}:{:?}", rev.doc_id, rev.rev_id); - Ok(Some(rev.into())) + tracing::debug!("[SharedWSSinkDataProvider]: {}:{:?}", rev.doc_id, rev.rev_id); + let doc_id = rev.doc_id.clone(); + Ok(Some(DocumentClientWSData::from_revisions(&doc_id, vec![rev]))) }, None => Ok(None), } @@ -310,10 +323,11 @@ impl SharedWSSinkDataProvider { let should_pop = match self.shared.read().await.front() { None => false, Some(val) => { - if val.id == id { + let expected_id = val.id(); + if expected_id == id { true } else { - tracing::error!("The front element's {} is not equal to the {}", val.id, id); + tracing::error!("The front element's {} is not equal to the {}", expected_id, id); false } }, diff --git a/frontend/rust-lib/flowy-document/src/services/server/server_api_mock.rs b/frontend/rust-lib/flowy-document/src/services/server/server_api_mock.rs index 6a95a9adab..4bfa2896b3 100644 --- a/frontend/rust-lib/flowy-document/src/services/server/server_api_mock.rs +++ b/frontend/rust-lib/flowy-document/src/services/server/server_api_mock.rs @@ -15,7 +15,7 @@ impl DocumentServerAPI for DocServerMock { fn read_doc(&self, _token: &str, params: DocIdentifier) -> FutureResult, FlowyError> { let doc = DocumentInfo { - id: params.doc_id, + doc_id: params.doc_id, text: initial_delta_string(), rev_id: 0, base_rev_id: 0, diff --git a/frontend/rust-lib/flowy-virtual-net/src/mock/server.rs b/frontend/rust-lib/flowy-virtual-net/src/mock/server.rs index 25afd7f792..ee1be4cdb2 100644 --- a/frontend/rust-lib/flowy-virtual-net/src/mock/server.rs +++ b/frontend/rust-lib/flowy-virtual-net/src/mock/server.rs @@ -1,11 +1,11 @@ use bytes::Bytes; use dashmap::DashMap; use flowy_collaboration::{entities::prelude::*, errors::CollaborateError, sync::*}; -use flowy_net::services::ws::*; +// use flowy_net::services::ws::*; use lib_infra::future::FutureResultSend; use lib_ws::{WSModule, WebSocketRawMessage}; use std::{ - convert::{TryFrom, TryInto}, + convert::TryInto, fmt::{Debug, Formatter}, sync::Arc, }; @@ -24,21 +24,20 @@ impl std::default::Default for MockDocServer { } impl MockDocServer { - pub async fn handle_ws_data(&self, ws_data: DocumentClientWSData) -> Option> { - let bytes = Bytes::from(ws_data.data); - match ws_data.ty { + pub async fn handle_client_data( + &self, + client_data: DocumentClientWSData, + ) -> Option> { + match client_data.ty { DocumentClientWSDataType::ClientPushRev => { - let revisions = RepeatedRevision::try_from(bytes).unwrap().into_inner(); - if revisions.is_empty() { - return None; - } - let first_revision = revisions.first().unwrap(); let (tx, rx) = mpsc::channel(1); let user = Arc::new(MockDocUser { - user_id: first_revision.user_id.clone(), + user_id: "fake_user_id".to_owned(), tx, }); - self.manager.apply_revisions(user, revisions).await.unwrap(); + let pb_client_data: flowy_collaboration::protobuf::DocumentClientWSData = + client_data.try_into().unwrap(); + self.manager.apply_revisions(user, pb_client_data).await.unwrap(); Some(rx) }, } @@ -79,16 +78,16 @@ impl DocumentPersistence for MockDocServerPersistence { }) } - fn create_doc(&self, revision: Revision) -> FutureResultSend { - FutureResultSend::new(async move { - let document_info: DocumentInfo = revision.try_into().unwrap(); - Ok(document_info) - }) + fn create_doc(&self, doc_id: &str, revisions: Vec) -> FutureResultSend { + let doc_id = doc_id.to_owned(); + FutureResultSend::new(async move { DocumentInfo::from_revisions(&doc_id, revisions) }) } fn get_revisions(&self, _doc_id: &str, _rev_ids: Vec) -> FutureResultSend, CollaborateError> { - unimplemented!() + FutureResultSend::new(async move { Ok(vec![]) }) } + + fn get_doc_revisions(&self, _doc_id: &str) -> FutureResultSend, CollaborateError> { unimplemented!() } } #[derive(Debug)] diff --git a/frontend/rust-lib/flowy-virtual-net/src/mock/ws_local.rs b/frontend/rust-lib/flowy-virtual-net/src/mock/ws_local.rs index 9c042c1bdb..2af11e6241 100644 --- a/frontend/rust-lib/flowy-virtual-net/src/mock/ws_local.rs +++ b/frontend/rust-lib/flowy-virtual-net/src/mock/ws_local.rs @@ -47,7 +47,7 @@ impl FlowyWebSocket for MockWebSocket { } else { let ws_data = DocumentClientWSData::try_from(Bytes::from(message.data.clone())).unwrap(); - if let Some(mut rx) = server.handle_ws_data(ws_data).await { + if let Some(mut rx) = server.handle_client_data(ws_data).await { let new_ws_message = rx.recv().await.unwrap(); match receivers.get(&new_ws_message.module) { None => tracing::error!("Can't find any handler for message: {:?}", new_ws_message), diff --git a/shared-lib/flowy-collaboration/src/entities/doc.rs b/shared-lib/flowy-collaboration/src/entities/doc.rs index d34905fc6c..1665d04e1d 100644 --- a/shared-lib/flowy-collaboration/src/entities/doc.rs +++ b/shared-lib/flowy-collaboration/src/entities/doc.rs @@ -1,9 +1,9 @@ use crate::{ entities::revision::{RepeatedRevision, Revision}, - errors::CollaborateError, + errors::{internal_error, CollaborateError}, }; use flowy_derive::ProtoBuf; -use lib_ot::{errors::OTError, rich_text::RichTextDelta}; +use lib_ot::{core::OperationTransformable, errors::OTError, rich_text::RichTextDelta}; #[derive(ProtoBuf, Default, Debug, Clone)] pub struct CreateDocParams { @@ -17,7 +17,7 @@ pub struct CreateDocParams { #[derive(ProtoBuf, Default, Debug, Clone, Eq, PartialEq)] pub struct DocumentInfo { #[pb(index = 1)] - pub id: String, + pub doc_id: String, #[pb(index = 2)] pub text: String, @@ -34,6 +34,27 @@ impl DocumentInfo { let delta = RichTextDelta::from_bytes(&self.text)?; Ok(delta) } + + pub fn from_revisions(doc_id: &str, revisions: Vec) -> Result { + let mut document_delta = RichTextDelta::new(); + let mut base_rev_id = 0; + let mut rev_id = 0; + + for revision in revisions { + base_rev_id = revision.base_rev_id; + rev_id = revision.rev_id; + let delta = RichTextDelta::from_bytes(revision.delta_data).map_err(internal_error)?; + document_delta = document_delta.compose(&delta).map_err(internal_error)?; + } + let text = document_delta.to_json(); + + Ok(DocumentInfo { + doc_id: doc_id.to_string(), + text, + rev_id, + base_rev_id, + }) + } } impl std::convert::TryFrom for DocumentInfo { @@ -49,7 +70,7 @@ impl std::convert::TryFrom for DocumentInfo { let doc_json = delta.to_json(); Ok(DocumentInfo { - id: revision.doc_id, + doc_id: revision.doc_id, text: doc_json, rev_id: revision.rev_id, base_rev_id: revision.base_rev_id, diff --git a/shared-lib/flowy-collaboration/src/entities/revision.rs b/shared-lib/flowy-collaboration/src/entities/revision.rs index 942b3a26e9..74ba24345d 100644 --- a/shared-lib/flowy-collaboration/src/entities/revision.rs +++ b/shared-lib/flowy-collaboration/src/entities/revision.rs @@ -100,7 +100,7 @@ impl std::fmt::Debug for Revision { #[derive(PartialEq, Debug, Default, ProtoBuf, Clone)] pub struct RepeatedRevision { #[pb(index = 1)] - pub items: Vec, + items: Vec, } impl std::ops::Deref for RepeatedRevision { @@ -114,6 +114,16 @@ impl std::ops::DerefMut for RepeatedRevision { } impl RepeatedRevision { + pub fn new(items: Vec) -> Self { + if cfg!(debug_assertions) { + let mut sorted_items = items.clone(); + sorted_items.sort_by(|a, b| a.rev_id.cmp(&b.rev_id)); + assert_eq!(sorted_items, items, "The items passed in should be sorted") + } + + Self { items } + } + pub fn into_inner(self) -> Vec { self.items } } diff --git a/shared-lib/flowy-collaboration/src/entities/ws.rs b/shared-lib/flowy-collaboration/src/entities/ws.rs index 9fbb762f44..dd51b800c5 100644 --- a/shared-lib/flowy-collaboration/src/entities/ws.rs +++ b/shared-lib/flowy-collaboration/src/entities/ws.rs @@ -1,5 +1,5 @@ use crate::{ - entities::revision::{RepeatedRevision, Revision, RevisionRange}, + entities::revision::{RepeatedRevision, RevId, Revision, RevisionRange}, errors::CollaborateError, }; use bytes::Bytes; @@ -33,24 +33,28 @@ pub struct DocumentClientWSData { pub ty: DocumentClientWSDataType, #[pb(index = 3)] - pub data: Vec, + pub revisions: RepeatedRevision, #[pb(index = 4)] - pub id: String, + id: String, } -impl std::convert::From for DocumentClientWSData { - fn from(revision: Revision) -> Self { - let doc_id = revision.doc_id.clone(); - let rev_id = revision.rev_id; - let bytes: Bytes = revision.try_into().unwrap(); +impl DocumentClientWSData { + pub fn from_revisions(doc_id: &str, revisions: Vec) -> Self { + let rev_id = match revisions.first() { + None => 0, + Some(revision) => revision.rev_id, + }; + Self { - doc_id, + doc_id: doc_id.to_owned(), ty: DocumentClientWSDataType::ClientPushRev, - data: bytes.to_vec(), + revisions: RepeatedRevision::new(revisions), id: rev_id.to_string(), } } + + pub fn id(&self) -> String { self.id.clone() } } #[derive(Debug, Clone, ProtoBuf_Enum, Eq, PartialEq, Hash)] @@ -75,57 +79,38 @@ pub struct DocumentServerWSData { #[pb(index = 3)] pub data: Vec, - - #[pb(index = 4)] - pub id: String, } pub struct DocumentServerWSDataBuilder(); impl DocumentServerWSDataBuilder { - // DocumentWSDataType::PushRev -> Revision - pub fn build_push_message(doc_id: &str, revisions: Vec, id: &str) -> DocumentServerWSData { - let repeated_revision = RepeatedRevision { items: revisions }; + pub fn build_push_message(doc_id: &str, revisions: Vec) -> DocumentServerWSData { + let repeated_revision = RepeatedRevision::new(revisions); let bytes: Bytes = repeated_revision.try_into().unwrap(); DocumentServerWSData { doc_id: doc_id.to_string(), ty: DocumentServerWSDataType::ServerPushRev, data: bytes.to_vec(), - id: id.to_string(), } } - // DocumentWSDataType::PullRev -> RevisionRange - pub fn build_pull_message(doc_id: &str, range: RevisionRange, rev_id: i64) -> DocumentServerWSData { + pub fn build_pull_message(doc_id: &str, range: RevisionRange) -> DocumentServerWSData { let bytes: Bytes = range.try_into().unwrap(); DocumentServerWSData { doc_id: doc_id.to_string(), ty: DocumentServerWSDataType::ServerPullRev, data: bytes.to_vec(), - id: rev_id.to_string(), } } - // DocumentWSDataType::Ack -> RevId - pub fn build_ack_message(doc_id: &str, id: &str) -> DocumentServerWSData { + pub fn build_ack_message(doc_id: &str, rev_id: i64) -> DocumentServerWSData { + let rev_id: RevId = rev_id.into(); + let bytes: Bytes = rev_id.try_into().unwrap(); DocumentServerWSData { doc_id: doc_id.to_string(), ty: DocumentServerWSDataType::ServerAck, - data: vec![], - id: id.to_string(), + data: bytes.to_vec(), } } - - // DocumentWSDataType::UserConnect -> DocumentConnected - // pub fn build_new_document_user_message(doc_id: &str, new_document_user: - // NewDocumentUser) -> DocumentServerWSData { let id = - // new_document_user.user_id.clone(); let bytes: Bytes = - // new_document_user.try_into().unwrap(); DocumentServerWSData { - // doc_id: doc_id.to_string(), - // ty: DocumentServerWSDataType::UserConnect, - // data: bytes.to_vec(), - // id, - // } - // } } #[derive(ProtoBuf, Default, Debug, Clone)] diff --git a/shared-lib/flowy-collaboration/src/protobuf/model/doc.rs b/shared-lib/flowy-collaboration/src/protobuf/model/doc.rs index 9655a108fb..7a6349bf90 100644 --- a/shared-lib/flowy-collaboration/src/protobuf/model/doc.rs +++ b/shared-lib/flowy-collaboration/src/protobuf/model/doc.rs @@ -242,7 +242,7 @@ impl ::protobuf::reflect::ProtobufValue for CreateDocParams { #[derive(PartialEq,Clone,Default)] pub struct DocumentInfo { // message fields - pub id: ::std::string::String, + pub doc_id: ::std::string::String, pub text: ::std::string::String, pub rev_id: i64, pub base_rev_id: i64, @@ -262,30 +262,30 @@ impl DocumentInfo { ::std::default::Default::default() } - // string id = 1; + // string doc_id = 1; - pub fn get_id(&self) -> &str { - &self.id + pub fn get_doc_id(&self) -> &str { + &self.doc_id } - pub fn clear_id(&mut self) { - self.id.clear(); + pub fn clear_doc_id(&mut self) { + self.doc_id.clear(); } // Param is passed by value, moved - pub fn set_id(&mut self, v: ::std::string::String) { - self.id = v; + pub fn set_doc_id(&mut self, v: ::std::string::String) { + self.doc_id = v; } // Mutable pointer to the field. // If field is not initialized, it is initialized with default value first. - pub fn mut_id(&mut self) -> &mut ::std::string::String { - &mut self.id + pub fn mut_doc_id(&mut self) -> &mut ::std::string::String { + &mut self.doc_id } // Take field - pub fn take_id(&mut self) -> ::std::string::String { - ::std::mem::replace(&mut self.id, ::std::string::String::new()) + pub fn take_doc_id(&mut self) -> ::std::string::String { + ::std::mem::replace(&mut self.doc_id, ::std::string::String::new()) } // string text = 2; @@ -355,7 +355,7 @@ impl ::protobuf::Message for DocumentInfo { let (field_number, wire_type) = is.read_tag_unpack()?; match field_number { 1 => { - ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.id)?; + ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.doc_id)?; }, 2 => { ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.text)?; @@ -386,8 +386,8 @@ impl ::protobuf::Message for DocumentInfo { #[allow(unused_variables)] fn compute_size(&self) -> u32 { let mut my_size = 0; - if !self.id.is_empty() { - my_size += ::protobuf::rt::string_size(1, &self.id); + if !self.doc_id.is_empty() { + my_size += ::protobuf::rt::string_size(1, &self.doc_id); } if !self.text.is_empty() { my_size += ::protobuf::rt::string_size(2, &self.text); @@ -404,8 +404,8 @@ impl ::protobuf::Message for DocumentInfo { } fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> { - if !self.id.is_empty() { - os.write_string(1, &self.id)?; + if !self.doc_id.is_empty() { + os.write_string(1, &self.doc_id)?; } if !self.text.is_empty() { os.write_string(2, &self.text)?; @@ -455,9 +455,9 @@ impl ::protobuf::Message for DocumentInfo { descriptor.get(|| { let mut fields = ::std::vec::Vec::new(); fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( - "id", - |m: &DocumentInfo| { &m.id }, - |m: &mut DocumentInfo| { &mut m.id }, + "doc_id", + |m: &DocumentInfo| { &m.doc_id }, + |m: &mut DocumentInfo| { &mut m.doc_id }, )); fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( "text", @@ -490,7 +490,7 @@ impl ::protobuf::Message for DocumentInfo { impl ::protobuf::Clear for DocumentInfo { fn clear(&mut self) { - self.id.clear(); + self.doc_id.clear(); self.text.clear(); self.rev_id = 0; self.base_rev_id = 0; @@ -1325,64 +1325,64 @@ impl ::protobuf::reflect::ProtobufValue for DocIdentifier { static file_descriptor_proto_data: &'static [u8] = b"\ \n\tdoc.proto\x1a\x0erevision.proto\"R\n\x0fCreateDocParams\x12\x0e\n\ \x02id\x18\x01\x20\x01(\tR\x02id\x12/\n\trevisions\x18\x02\x20\x01(\x0b2\ - \x11.RepeatedRevisionR\trevisions\"i\n\x0cDocumentInfo\x12\x0e\n\x02id\ - \x18\x01\x20\x01(\tR\x02id\x12\x12\n\x04text\x18\x02\x20\x01(\tR\x04text\ - \x12\x15\n\x06rev_id\x18\x03\x20\x01(\x03R\x05revId\x12\x1e\n\x0bbase_re\ - v_id\x18\x04\x20\x01(\x03R\tbaseRevId\"]\n\x13ResetDocumentParams\x12\ - \x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\x12/\n\trevisions\x18\x02\ - \x20\x01(\x0b2\x11.RepeatedRevisionR\trevisions\":\n\rDocumentDelta\x12\ - \x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\x12\x12\n\x04text\x18\x02\ - \x20\x01(\tR\x04text\"S\n\nNewDocUser\x12\x17\n\x07user_id\x18\x01\x20\ - \x01(\tR\x06userId\x12\x15\n\x06rev_id\x18\x02\x20\x01(\x03R\x05revId\ - \x12\x15\n\x06doc_id\x18\x03\x20\x01(\tR\x05docId\"&\n\rDocIdentifier\ - \x12\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docIdJ\xaf\x07\n\x06\x12\x04\ - \0\0\x1c\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\t\n\x02\x03\0\x12\x03\x01\ - \0\x18\n\n\n\x02\x04\0\x12\x04\x03\0\x06\x01\n\n\n\x03\x04\0\x01\x12\x03\ - \x03\x08\x17\n\x0b\n\x04\x04\0\x02\0\x12\x03\x04\x04\x12\n\x0c\n\x05\x04\ - \0\x02\0\x05\x12\x03\x04\x04\n\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x04\ - \x0b\r\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x04\x10\x11\n\x0b\n\x04\x04\0\ - \x02\x01\x12\x03\x05\x04#\n\x0c\n\x05\x04\0\x02\x01\x06\x12\x03\x05\x04\ - \x14\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x05\x15\x1e\n\x0c\n\x05\x04\0\ - \x02\x01\x03\x12\x03\x05!\"\n\n\n\x02\x04\x01\x12\x04\x07\0\x0c\x01\n\n\ - \n\x03\x04\x01\x01\x12\x03\x07\x08\x14\n\x0b\n\x04\x04\x01\x02\0\x12\x03\ - \x08\x04\x12\n\x0c\n\x05\x04\x01\x02\0\x05\x12\x03\x08\x04\n\n\x0c\n\x05\ - \x04\x01\x02\0\x01\x12\x03\x08\x0b\r\n\x0c\n\x05\x04\x01\x02\0\x03\x12\ - \x03\x08\x10\x11\n\x0b\n\x04\x04\x01\x02\x01\x12\x03\t\x04\x14\n\x0c\n\ - \x05\x04\x01\x02\x01\x05\x12\x03\t\x04\n\n\x0c\n\x05\x04\x01\x02\x01\x01\ - \x12\x03\t\x0b\x0f\n\x0c\n\x05\x04\x01\x02\x01\x03\x12\x03\t\x12\x13\n\ - \x0b\n\x04\x04\x01\x02\x02\x12\x03\n\x04\x15\n\x0c\n\x05\x04\x01\x02\x02\ - \x05\x12\x03\n\x04\t\n\x0c\n\x05\x04\x01\x02\x02\x01\x12\x03\n\n\x10\n\ - \x0c\n\x05\x04\x01\x02\x02\x03\x12\x03\n\x13\x14\n\x0b\n\x04\x04\x01\x02\ - \x03\x12\x03\x0b\x04\x1a\n\x0c\n\x05\x04\x01\x02\x03\x05\x12\x03\x0b\x04\ - \t\n\x0c\n\x05\x04\x01\x02\x03\x01\x12\x03\x0b\n\x15\n\x0c\n\x05\x04\x01\ - \x02\x03\x03\x12\x03\x0b\x18\x19\n\n\n\x02\x04\x02\x12\x04\r\0\x10\x01\n\ - \n\n\x03\x04\x02\x01\x12\x03\r\x08\x1b\n\x0b\n\x04\x04\x02\x02\0\x12\x03\ - \x0e\x04\x16\n\x0c\n\x05\x04\x02\x02\0\x05\x12\x03\x0e\x04\n\n\x0c\n\x05\ - \x04\x02\x02\0\x01\x12\x03\x0e\x0b\x11\n\x0c\n\x05\x04\x02\x02\0\x03\x12\ - \x03\x0e\x14\x15\n\x0b\n\x04\x04\x02\x02\x01\x12\x03\x0f\x04#\n\x0c\n\ - \x05\x04\x02\x02\x01\x06\x12\x03\x0f\x04\x14\n\x0c\n\x05\x04\x02\x02\x01\ - \x01\x12\x03\x0f\x15\x1e\n\x0c\n\x05\x04\x02\x02\x01\x03\x12\x03\x0f!\"\ - \n\n\n\x02\x04\x03\x12\x04\x11\0\x14\x01\n\n\n\x03\x04\x03\x01\x12\x03\ - \x11\x08\x15\n\x0b\n\x04\x04\x03\x02\0\x12\x03\x12\x04\x16\n\x0c\n\x05\ - \x04\x03\x02\0\x05\x12\x03\x12\x04\n\n\x0c\n\x05\x04\x03\x02\0\x01\x12\ - \x03\x12\x0b\x11\n\x0c\n\x05\x04\x03\x02\0\x03\x12\x03\x12\x14\x15\n\x0b\ - \n\x04\x04\x03\x02\x01\x12\x03\x13\x04\x14\n\x0c\n\x05\x04\x03\x02\x01\ - \x05\x12\x03\x13\x04\n\n\x0c\n\x05\x04\x03\x02\x01\x01\x12\x03\x13\x0b\ - \x0f\n\x0c\n\x05\x04\x03\x02\x01\x03\x12\x03\x13\x12\x13\n\n\n\x02\x04\ - \x04\x12\x04\x15\0\x19\x01\n\n\n\x03\x04\x04\x01\x12\x03\x15\x08\x12\n\ - \x0b\n\x04\x04\x04\x02\0\x12\x03\x16\x04\x17\n\x0c\n\x05\x04\x04\x02\0\ - \x05\x12\x03\x16\x04\n\n\x0c\n\x05\x04\x04\x02\0\x01\x12\x03\x16\x0b\x12\ - \n\x0c\n\x05\x04\x04\x02\0\x03\x12\x03\x16\x15\x16\n\x0b\n\x04\x04\x04\ - \x02\x01\x12\x03\x17\x04\x15\n\x0c\n\x05\x04\x04\x02\x01\x05\x12\x03\x17\ - \x04\t\n\x0c\n\x05\x04\x04\x02\x01\x01\x12\x03\x17\n\x10\n\x0c\n\x05\x04\ - \x04\x02\x01\x03\x12\x03\x17\x13\x14\n\x0b\n\x04\x04\x04\x02\x02\x12\x03\ - \x18\x04\x16\n\x0c\n\x05\x04\x04\x02\x02\x05\x12\x03\x18\x04\n\n\x0c\n\ - \x05\x04\x04\x02\x02\x01\x12\x03\x18\x0b\x11\n\x0c\n\x05\x04\x04\x02\x02\ - \x03\x12\x03\x18\x14\x15\n\n\n\x02\x04\x05\x12\x04\x1a\0\x1c\x01\n\n\n\ - \x03\x04\x05\x01\x12\x03\x1a\x08\x15\n\x0b\n\x04\x04\x05\x02\0\x12\x03\ - \x1b\x04\x16\n\x0c\n\x05\x04\x05\x02\0\x05\x12\x03\x1b\x04\n\n\x0c\n\x05\ - \x04\x05\x02\0\x01\x12\x03\x1b\x0b\x11\n\x0c\n\x05\x04\x05\x02\0\x03\x12\ - \x03\x1b\x14\x15b\x06proto3\ + \x11.RepeatedRevisionR\trevisions\"p\n\x0cDocumentInfo\x12\x15\n\x06doc_\ + id\x18\x01\x20\x01(\tR\x05docId\x12\x12\n\x04text\x18\x02\x20\x01(\tR\ + \x04text\x12\x15\n\x06rev_id\x18\x03\x20\x01(\x03R\x05revId\x12\x1e\n\ + \x0bbase_rev_id\x18\x04\x20\x01(\x03R\tbaseRevId\"]\n\x13ResetDocumentPa\ + rams\x12\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\x12/\n\trevisions\ + \x18\x02\x20\x01(\x0b2\x11.RepeatedRevisionR\trevisions\":\n\rDocumentDe\ + lta\x12\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\x12\x12\n\x04text\ + \x18\x02\x20\x01(\tR\x04text\"S\n\nNewDocUser\x12\x17\n\x07user_id\x18\ + \x01\x20\x01(\tR\x06userId\x12\x15\n\x06rev_id\x18\x02\x20\x01(\x03R\x05\ + revId\x12\x15\n\x06doc_id\x18\x03\x20\x01(\tR\x05docId\"&\n\rDocIdentifi\ + er\x12\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docIdJ\xaf\x07\n\x06\x12\ + \x04\0\0\x1c\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\t\n\x02\x03\0\x12\x03\ + \x01\0\x18\n\n\n\x02\x04\0\x12\x04\x03\0\x06\x01\n\n\n\x03\x04\0\x01\x12\ + \x03\x03\x08\x17\n\x0b\n\x04\x04\0\x02\0\x12\x03\x04\x04\x12\n\x0c\n\x05\ + \x04\0\x02\0\x05\x12\x03\x04\x04\n\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\ + \x04\x0b\r\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x04\x10\x11\n\x0b\n\x04\ + \x04\0\x02\x01\x12\x03\x05\x04#\n\x0c\n\x05\x04\0\x02\x01\x06\x12\x03\ + \x05\x04\x14\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x05\x15\x1e\n\x0c\n\ + \x05\x04\0\x02\x01\x03\x12\x03\x05!\"\n\n\n\x02\x04\x01\x12\x04\x07\0\ + \x0c\x01\n\n\n\x03\x04\x01\x01\x12\x03\x07\x08\x14\n\x0b\n\x04\x04\x01\ + \x02\0\x12\x03\x08\x04\x16\n\x0c\n\x05\x04\x01\x02\0\x05\x12\x03\x08\x04\ + \n\n\x0c\n\x05\x04\x01\x02\0\x01\x12\x03\x08\x0b\x11\n\x0c\n\x05\x04\x01\ + \x02\0\x03\x12\x03\x08\x14\x15\n\x0b\n\x04\x04\x01\x02\x01\x12\x03\t\x04\ + \x14\n\x0c\n\x05\x04\x01\x02\x01\x05\x12\x03\t\x04\n\n\x0c\n\x05\x04\x01\ + \x02\x01\x01\x12\x03\t\x0b\x0f\n\x0c\n\x05\x04\x01\x02\x01\x03\x12\x03\t\ + \x12\x13\n\x0b\n\x04\x04\x01\x02\x02\x12\x03\n\x04\x15\n\x0c\n\x05\x04\ + \x01\x02\x02\x05\x12\x03\n\x04\t\n\x0c\n\x05\x04\x01\x02\x02\x01\x12\x03\ + \n\n\x10\n\x0c\n\x05\x04\x01\x02\x02\x03\x12\x03\n\x13\x14\n\x0b\n\x04\ + \x04\x01\x02\x03\x12\x03\x0b\x04\x1a\n\x0c\n\x05\x04\x01\x02\x03\x05\x12\ + \x03\x0b\x04\t\n\x0c\n\x05\x04\x01\x02\x03\x01\x12\x03\x0b\n\x15\n\x0c\n\ + \x05\x04\x01\x02\x03\x03\x12\x03\x0b\x18\x19\n\n\n\x02\x04\x02\x12\x04\r\ + \0\x10\x01\n\n\n\x03\x04\x02\x01\x12\x03\r\x08\x1b\n\x0b\n\x04\x04\x02\ + \x02\0\x12\x03\x0e\x04\x16\n\x0c\n\x05\x04\x02\x02\0\x05\x12\x03\x0e\x04\ + \n\n\x0c\n\x05\x04\x02\x02\0\x01\x12\x03\x0e\x0b\x11\n\x0c\n\x05\x04\x02\ + \x02\0\x03\x12\x03\x0e\x14\x15\n\x0b\n\x04\x04\x02\x02\x01\x12\x03\x0f\ + \x04#\n\x0c\n\x05\x04\x02\x02\x01\x06\x12\x03\x0f\x04\x14\n\x0c\n\x05\ + \x04\x02\x02\x01\x01\x12\x03\x0f\x15\x1e\n\x0c\n\x05\x04\x02\x02\x01\x03\ + \x12\x03\x0f!\"\n\n\n\x02\x04\x03\x12\x04\x11\0\x14\x01\n\n\n\x03\x04\ + \x03\x01\x12\x03\x11\x08\x15\n\x0b\n\x04\x04\x03\x02\0\x12\x03\x12\x04\ + \x16\n\x0c\n\x05\x04\x03\x02\0\x05\x12\x03\x12\x04\n\n\x0c\n\x05\x04\x03\ + \x02\0\x01\x12\x03\x12\x0b\x11\n\x0c\n\x05\x04\x03\x02\0\x03\x12\x03\x12\ + \x14\x15\n\x0b\n\x04\x04\x03\x02\x01\x12\x03\x13\x04\x14\n\x0c\n\x05\x04\ + \x03\x02\x01\x05\x12\x03\x13\x04\n\n\x0c\n\x05\x04\x03\x02\x01\x01\x12\ + \x03\x13\x0b\x0f\n\x0c\n\x05\x04\x03\x02\x01\x03\x12\x03\x13\x12\x13\n\n\ + \n\x02\x04\x04\x12\x04\x15\0\x19\x01\n\n\n\x03\x04\x04\x01\x12\x03\x15\ + \x08\x12\n\x0b\n\x04\x04\x04\x02\0\x12\x03\x16\x04\x17\n\x0c\n\x05\x04\ + \x04\x02\0\x05\x12\x03\x16\x04\n\n\x0c\n\x05\x04\x04\x02\0\x01\x12\x03\ + \x16\x0b\x12\n\x0c\n\x05\x04\x04\x02\0\x03\x12\x03\x16\x15\x16\n\x0b\n\ + \x04\x04\x04\x02\x01\x12\x03\x17\x04\x15\n\x0c\n\x05\x04\x04\x02\x01\x05\ + \x12\x03\x17\x04\t\n\x0c\n\x05\x04\x04\x02\x01\x01\x12\x03\x17\n\x10\n\ + \x0c\n\x05\x04\x04\x02\x01\x03\x12\x03\x17\x13\x14\n\x0b\n\x04\x04\x04\ + \x02\x02\x12\x03\x18\x04\x16\n\x0c\n\x05\x04\x04\x02\x02\x05\x12\x03\x18\ + \x04\n\n\x0c\n\x05\x04\x04\x02\x02\x01\x12\x03\x18\x0b\x11\n\x0c\n\x05\ + \x04\x04\x02\x02\x03\x12\x03\x18\x14\x15\n\n\n\x02\x04\x05\x12\x04\x1a\0\ + \x1c\x01\n\n\n\x03\x04\x05\x01\x12\x03\x1a\x08\x15\n\x0b\n\x04\x04\x05\ + \x02\0\x12\x03\x1b\x04\x16\n\x0c\n\x05\x04\x05\x02\0\x05\x12\x03\x1b\x04\ + \n\n\x0c\n\x05\x04\x05\x02\0\x01\x12\x03\x1b\x0b\x11\n\x0c\n\x05\x04\x05\ + \x02\0\x03\x12\x03\x1b\x14\x15b\x06proto3\ "; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; diff --git a/shared-lib/flowy-collaboration/src/protobuf/model/ws.rs b/shared-lib/flowy-collaboration/src/protobuf/model/ws.rs index 0d24c128d4..b54df062bb 100644 --- a/shared-lib/flowy-collaboration/src/protobuf/model/ws.rs +++ b/shared-lib/flowy-collaboration/src/protobuf/model/ws.rs @@ -28,7 +28,7 @@ pub struct DocumentClientWSData { // message fields pub doc_id: ::std::string::String, pub ty: DocumentClientWSDataType, - pub data: ::std::vec::Vec, + pub revisions: ::protobuf::SingularPtrField, pub id: ::std::string::String, // special fields pub unknown_fields: ::protobuf::UnknownFields, @@ -87,30 +87,37 @@ impl DocumentClientWSData { self.ty = v; } - // bytes data = 3; + // .RepeatedRevision revisions = 3; - pub fn get_data(&self) -> &[u8] { - &self.data + pub fn get_revisions(&self) -> &super::revision::RepeatedRevision { + self.revisions.as_ref().unwrap_or_else(|| ::default_instance()) } - pub fn clear_data(&mut self) { - self.data.clear(); + pub fn clear_revisions(&mut self) { + self.revisions.clear(); + } + + pub fn has_revisions(&self) -> bool { + self.revisions.is_some() } // Param is passed by value, moved - pub fn set_data(&mut self, v: ::std::vec::Vec) { - self.data = v; + pub fn set_revisions(&mut self, v: super::revision::RepeatedRevision) { + self.revisions = ::protobuf::SingularPtrField::some(v); } // Mutable pointer to the field. // If field is not initialized, it is initialized with default value first. - pub fn mut_data(&mut self) -> &mut ::std::vec::Vec { - &mut self.data + pub fn mut_revisions(&mut self) -> &mut super::revision::RepeatedRevision { + if self.revisions.is_none() { + self.revisions.set_default(); + } + self.revisions.as_mut().unwrap() } // Take field - pub fn take_data(&mut self) -> ::std::vec::Vec { - ::std::mem::replace(&mut self.data, ::std::vec::Vec::new()) + pub fn take_revisions(&mut self) -> super::revision::RepeatedRevision { + self.revisions.take().unwrap_or_else(|| super::revision::RepeatedRevision::new()) } // string id = 4; @@ -142,6 +149,11 @@ impl DocumentClientWSData { impl ::protobuf::Message for DocumentClientWSData { fn is_initialized(&self) -> bool { + for v in &self.revisions { + if !v.is_initialized() { + return false; + } + }; true } @@ -156,7 +168,7 @@ impl ::protobuf::Message for DocumentClientWSData { ::protobuf::rt::read_proto3_enum_with_unknown_fields_into(wire_type, is, &mut self.ty, 2, &mut self.unknown_fields)? }, 3 => { - ::protobuf::rt::read_singular_proto3_bytes_into(wire_type, is, &mut self.data)?; + ::protobuf::rt::read_singular_message_into(wire_type, is, &mut self.revisions)?; }, 4 => { ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.id)?; @@ -179,8 +191,9 @@ impl ::protobuf::Message for DocumentClientWSData { if self.ty != DocumentClientWSDataType::ClientPushRev { my_size += ::protobuf::rt::enum_size(2, self.ty); } - if !self.data.is_empty() { - my_size += ::protobuf::rt::bytes_size(3, &self.data); + if let Some(ref v) = self.revisions.as_ref() { + let len = v.compute_size(); + my_size += 1 + ::protobuf::rt::compute_raw_varint32_size(len) + len; } if !self.id.is_empty() { my_size += ::protobuf::rt::string_size(4, &self.id); @@ -197,8 +210,10 @@ impl ::protobuf::Message for DocumentClientWSData { if self.ty != DocumentClientWSDataType::ClientPushRev { os.write_enum(2, ::protobuf::ProtobufEnum::value(&self.ty))?; } - if !self.data.is_empty() { - os.write_bytes(3, &self.data)?; + if let Some(ref v) = self.revisions.as_ref() { + os.write_tag(3, ::protobuf::wire_format::WireTypeLengthDelimited)?; + os.write_raw_varint32(v.get_cached_size())?; + v.write_to_with_cached_sizes(os)?; } if !self.id.is_empty() { os.write_string(4, &self.id)?; @@ -251,10 +266,10 @@ impl ::protobuf::Message for DocumentClientWSData { |m: &DocumentClientWSData| { &m.ty }, |m: &mut DocumentClientWSData| { &mut m.ty }, )); - fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>( - "data", - |m: &DocumentClientWSData| { &m.data }, - |m: &mut DocumentClientWSData| { &mut m.data }, + fields.push(::protobuf::reflect::accessor::make_singular_ptr_field_accessor::<_, ::protobuf::types::ProtobufTypeMessage>( + "revisions", + |m: &DocumentClientWSData| { &m.revisions }, + |m: &mut DocumentClientWSData| { &mut m.revisions }, )); fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( "id", @@ -279,7 +294,7 @@ impl ::protobuf::Clear for DocumentClientWSData { fn clear(&mut self) { self.doc_id.clear(); self.ty = DocumentClientWSDataType::ClientPushRev; - self.data.clear(); + self.revisions.clear(); self.id.clear(); self.unknown_fields.clear(); } @@ -303,7 +318,6 @@ pub struct DocumentServerWSData { pub doc_id: ::std::string::String, pub ty: DocumentServerWSDataType, pub data: ::std::vec::Vec, - pub id: ::std::string::String, // special fields pub unknown_fields: ::protobuf::UnknownFields, pub cached_size: ::protobuf::CachedSize, @@ -386,32 +400,6 @@ impl DocumentServerWSData { pub fn take_data(&mut self) -> ::std::vec::Vec { ::std::mem::replace(&mut self.data, ::std::vec::Vec::new()) } - - // string id = 4; - - - pub fn get_id(&self) -> &str { - &self.id - } - pub fn clear_id(&mut self) { - self.id.clear(); - } - - // Param is passed by value, moved - pub fn set_id(&mut self, v: ::std::string::String) { - self.id = v; - } - - // Mutable pointer to the field. - // If field is not initialized, it is initialized with default value first. - pub fn mut_id(&mut self) -> &mut ::std::string::String { - &mut self.id - } - - // Take field - pub fn take_id(&mut self) -> ::std::string::String { - ::std::mem::replace(&mut self.id, ::std::string::String::new()) - } } impl ::protobuf::Message for DocumentServerWSData { @@ -432,9 +420,6 @@ impl ::protobuf::Message for DocumentServerWSData { 3 => { ::protobuf::rt::read_singular_proto3_bytes_into(wire_type, is, &mut self.data)?; }, - 4 => { - ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.id)?; - }, _ => { ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; }, @@ -456,9 +441,6 @@ impl ::protobuf::Message for DocumentServerWSData { if !self.data.is_empty() { my_size += ::protobuf::rt::bytes_size(3, &self.data); } - if !self.id.is_empty() { - my_size += ::protobuf::rt::string_size(4, &self.id); - } my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); self.cached_size.set(my_size); my_size @@ -474,9 +456,6 @@ impl ::protobuf::Message for DocumentServerWSData { if !self.data.is_empty() { os.write_bytes(3, &self.data)?; } - if !self.id.is_empty() { - os.write_string(4, &self.id)?; - } os.write_unknown_fields(self.get_unknown_fields())?; ::std::result::Result::Ok(()) } @@ -530,11 +509,6 @@ impl ::protobuf::Message for DocumentServerWSData { |m: &DocumentServerWSData| { &m.data }, |m: &mut DocumentServerWSData| { &mut m.data }, )); - fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( - "id", - |m: &DocumentServerWSData| { &m.id }, - |m: &mut DocumentServerWSData| { &mut m.id }, - )); ::protobuf::reflect::MessageDescriptor::new_pb_name::( "DocumentServerWSData", fields, @@ -554,7 +528,6 @@ impl ::protobuf::Clear for DocumentServerWSData { self.doc_id.clear(); self.ty = DocumentServerWSDataType::ServerAck; self.data.clear(); - self.id.clear(); self.unknown_fields.clear(); } } @@ -918,66 +891,64 @@ impl ::protobuf::reflect::ProtobufValue for DocumentServerWSDataType { } static file_descriptor_proto_data: &'static [u8] = b"\ - \n\x08ws.proto\"|\n\x14DocumentClientWSData\x12\x15\n\x06doc_id\x18\x01\ - \x20\x01(\tR\x05docId\x12)\n\x02ty\x18\x02\x20\x01(\x0e2\x19.DocumentCli\ - entWSDataTypeR\x02ty\x12\x12\n\x04data\x18\x03\x20\x01(\x0cR\x04data\x12\ - \x0e\n\x02id\x18\x04\x20\x01(\tR\x02id\"|\n\x14DocumentServerWSData\x12\ - \x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\x12)\n\x02ty\x18\x02\x20\ - \x01(\x0e2\x19.DocumentServerWSDataTypeR\x02ty\x12\x12\n\x04data\x18\x03\ - \x20\x01(\x0cR\x04data\x12\x0e\n\x02id\x18\x04\x20\x01(\tR\x02id\"f\n\ - \x0fNewDocumentUser\x12\x17\n\x07user_id\x18\x01\x20\x01(\tR\x06userId\ - \x12\x15\n\x06doc_id\x18\x02\x20\x01(\tR\x05docId\x12#\n\rrevision_data\ - \x18\x03\x20\x01(\x0cR\x0crevisionData*-\n\x18DocumentClientWSDataType\ - \x12\x11\n\rClientPushRev\x10\0*`\n\x18DocumentServerWSDataType\x12\r\n\ - \tServerAck\x10\0\x12\x11\n\rServerPushRev\x10\x01\x12\x11\n\rServerPull\ - Rev\x10\x02\x12\x0f\n\x0bUserConnect\x10\x03J\xb4\x07\n\x06\x12\x04\0\0\ - \x1b\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\ - \x07\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x1c\n\x0b\n\x04\x04\0\x02\0\ - \x12\x03\x03\x04\x16\n\x0c\n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\n\n\x0c\ - \n\x05\x04\0\x02\0\x01\x12\x03\x03\x0b\x11\n\x0c\n\x05\x04\0\x02\0\x03\ - \x12\x03\x03\x14\x15\n\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04$\n\x0c\n\ - \x05\x04\0\x02\x01\x06\x12\x03\x04\x04\x1c\n\x0c\n\x05\x04\0\x02\x01\x01\ - \x12\x03\x04\x1d\x1f\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04\"#\n\x0b\ - \n\x04\x04\0\x02\x02\x12\x03\x05\x04\x13\n\x0c\n\x05\x04\0\x02\x02\x05\ - \x12\x03\x05\x04\t\n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03\x05\n\x0e\n\x0c\ - \n\x05\x04\0\x02\x02\x03\x12\x03\x05\x11\x12\n\x0b\n\x04\x04\0\x02\x03\ - \x12\x03\x06\x04\x12\n\x0c\n\x05\x04\0\x02\x03\x05\x12\x03\x06\x04\n\n\ - \x0c\n\x05\x04\0\x02\x03\x01\x12\x03\x06\x0b\r\n\x0c\n\x05\x04\0\x02\x03\ - \x03\x12\x03\x06\x10\x11\n\n\n\x02\x04\x01\x12\x04\x08\0\r\x01\n\n\n\x03\ - \x04\x01\x01\x12\x03\x08\x08\x1c\n\x0b\n\x04\x04\x01\x02\0\x12\x03\t\x04\ - \x16\n\x0c\n\x05\x04\x01\x02\0\x05\x12\x03\t\x04\n\n\x0c\n\x05\x04\x01\ - \x02\0\x01\x12\x03\t\x0b\x11\n\x0c\n\x05\x04\x01\x02\0\x03\x12\x03\t\x14\ - \x15\n\x0b\n\x04\x04\x01\x02\x01\x12\x03\n\x04$\n\x0c\n\x05\x04\x01\x02\ - \x01\x06\x12\x03\n\x04\x1c\n\x0c\n\x05\x04\x01\x02\x01\x01\x12\x03\n\x1d\ - \x1f\n\x0c\n\x05\x04\x01\x02\x01\x03\x12\x03\n\"#\n\x0b\n\x04\x04\x01\ - \x02\x02\x12\x03\x0b\x04\x13\n\x0c\n\x05\x04\x01\x02\x02\x05\x12\x03\x0b\ - \x04\t\n\x0c\n\x05\x04\x01\x02\x02\x01\x12\x03\x0b\n\x0e\n\x0c\n\x05\x04\ - \x01\x02\x02\x03\x12\x03\x0b\x11\x12\n\x0b\n\x04\x04\x01\x02\x03\x12\x03\ - \x0c\x04\x12\n\x0c\n\x05\x04\x01\x02\x03\x05\x12\x03\x0c\x04\n\n\x0c\n\ - \x05\x04\x01\x02\x03\x01\x12\x03\x0c\x0b\r\n\x0c\n\x05\x04\x01\x02\x03\ - \x03\x12\x03\x0c\x10\x11\n\n\n\x02\x04\x02\x12\x04\x0e\0\x12\x01\n\n\n\ - \x03\x04\x02\x01\x12\x03\x0e\x08\x17\n\x0b\n\x04\x04\x02\x02\0\x12\x03\ - \x0f\x04\x17\n\x0c\n\x05\x04\x02\x02\0\x05\x12\x03\x0f\x04\n\n\x0c\n\x05\ - \x04\x02\x02\0\x01\x12\x03\x0f\x0b\x12\n\x0c\n\x05\x04\x02\x02\0\x03\x12\ - \x03\x0f\x15\x16\n\x0b\n\x04\x04\x02\x02\x01\x12\x03\x10\x04\x16\n\x0c\n\ - \x05\x04\x02\x02\x01\x05\x12\x03\x10\x04\n\n\x0c\n\x05\x04\x02\x02\x01\ - \x01\x12\x03\x10\x0b\x11\n\x0c\n\x05\x04\x02\x02\x01\x03\x12\x03\x10\x14\ - \x15\n\x0b\n\x04\x04\x02\x02\x02\x12\x03\x11\x04\x1c\n\x0c\n\x05\x04\x02\ - \x02\x02\x05\x12\x03\x11\x04\t\n\x0c\n\x05\x04\x02\x02\x02\x01\x12\x03\ - \x11\n\x17\n\x0c\n\x05\x04\x02\x02\x02\x03\x12\x03\x11\x1a\x1b\n\n\n\x02\ - \x05\0\x12\x04\x13\0\x15\x01\n\n\n\x03\x05\0\x01\x12\x03\x13\x05\x1d\n\ - \x0b\n\x04\x05\0\x02\0\x12\x03\x14\x04\x16\n\x0c\n\x05\x05\0\x02\0\x01\ - \x12\x03\x14\x04\x11\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x14\x14\x15\n\n\ - \n\x02\x05\x01\x12\x04\x16\0\x1b\x01\n\n\n\x03\x05\x01\x01\x12\x03\x16\ - \x05\x1d\n\x0b\n\x04\x05\x01\x02\0\x12\x03\x17\x04\x12\n\x0c\n\x05\x05\ - \x01\x02\0\x01\x12\x03\x17\x04\r\n\x0c\n\x05\x05\x01\x02\0\x02\x12\x03\ - \x17\x10\x11\n\x0b\n\x04\x05\x01\x02\x01\x12\x03\x18\x04\x16\n\x0c\n\x05\ - \x05\x01\x02\x01\x01\x12\x03\x18\x04\x11\n\x0c\n\x05\x05\x01\x02\x01\x02\ - \x12\x03\x18\x14\x15\n\x0b\n\x04\x05\x01\x02\x02\x12\x03\x19\x04\x16\n\ - \x0c\n\x05\x05\x01\x02\x02\x01\x12\x03\x19\x04\x11\n\x0c\n\x05\x05\x01\ - \x02\x02\x02\x12\x03\x19\x14\x15\n\x0b\n\x04\x05\x01\x02\x03\x12\x03\x1a\ - \x04\x14\n\x0c\n\x05\x05\x01\x02\x03\x01\x12\x03\x1a\x04\x0f\n\x0c\n\x05\ - \x05\x01\x02\x03\x02\x12\x03\x1a\x12\x13b\x06proto3\ + \n\x08ws.proto\x1a\x0erevision.proto\"\x99\x01\n\x14DocumentClientWSData\ + \x12\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\x12)\n\x02ty\x18\x02\ + \x20\x01(\x0e2\x19.DocumentClientWSDataTypeR\x02ty\x12/\n\trevisions\x18\ + \x03\x20\x01(\x0b2\x11.RepeatedRevisionR\trevisions\x12\x0e\n\x02id\x18\ + \x04\x20\x01(\tR\x02id\"l\n\x14DocumentServerWSData\x12\x15\n\x06doc_id\ + \x18\x01\x20\x01(\tR\x05docId\x12)\n\x02ty\x18\x02\x20\x01(\x0e2\x19.Doc\ + umentServerWSDataTypeR\x02ty\x12\x12\n\x04data\x18\x03\x20\x01(\x0cR\x04\ + data\"f\n\x0fNewDocumentUser\x12\x17\n\x07user_id\x18\x01\x20\x01(\tR\ + \x06userId\x12\x15\n\x06doc_id\x18\x02\x20\x01(\tR\x05docId\x12#\n\rrevi\ + sion_data\x18\x03\x20\x01(\x0cR\x0crevisionData*-\n\x18DocumentClientWSD\ + ataType\x12\x11\n\rClientPushRev\x10\0*`\n\x18DocumentServerWSDataType\ + \x12\r\n\tServerAck\x10\0\x12\x11\n\rServerPushRev\x10\x01\x12\x11\n\rSe\ + rverPullRev\x10\x02\x12\x0f\n\x0bUserConnect\x10\x03J\x88\x07\n\x06\x12\ + \x04\0\0\x1b\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\t\n\x02\x03\0\x12\x03\ + \x01\0\x18\n\n\n\x02\x04\0\x12\x04\x03\0\x08\x01\n\n\n\x03\x04\0\x01\x12\ + \x03\x03\x08\x1c\n\x0b\n\x04\x04\0\x02\0\x12\x03\x04\x04\x16\n\x0c\n\x05\ + \x04\0\x02\0\x05\x12\x03\x04\x04\n\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\ + \x04\x0b\x11\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x04\x14\x15\n\x0b\n\x04\ + \x04\0\x02\x01\x12\x03\x05\x04$\n\x0c\n\x05\x04\0\x02\x01\x06\x12\x03\ + \x05\x04\x1c\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x05\x1d\x1f\n\x0c\n\ + \x05\x04\0\x02\x01\x03\x12\x03\x05\"#\n\x0b\n\x04\x04\0\x02\x02\x12\x03\ + \x06\x04#\n\x0c\n\x05\x04\0\x02\x02\x06\x12\x03\x06\x04\x14\n\x0c\n\x05\ + \x04\0\x02\x02\x01\x12\x03\x06\x15\x1e\n\x0c\n\x05\x04\0\x02\x02\x03\x12\ + \x03\x06!\"\n\x0b\n\x04\x04\0\x02\x03\x12\x03\x07\x04\x12\n\x0c\n\x05\ + \x04\0\x02\x03\x05\x12\x03\x07\x04\n\n\x0c\n\x05\x04\0\x02\x03\x01\x12\ + \x03\x07\x0b\r\n\x0c\n\x05\x04\0\x02\x03\x03\x12\x03\x07\x10\x11\n\n\n\ + \x02\x04\x01\x12\x04\t\0\r\x01\n\n\n\x03\x04\x01\x01\x12\x03\t\x08\x1c\n\ + \x0b\n\x04\x04\x01\x02\0\x12\x03\n\x04\x16\n\x0c\n\x05\x04\x01\x02\0\x05\ + \x12\x03\n\x04\n\n\x0c\n\x05\x04\x01\x02\0\x01\x12\x03\n\x0b\x11\n\x0c\n\ + \x05\x04\x01\x02\0\x03\x12\x03\n\x14\x15\n\x0b\n\x04\x04\x01\x02\x01\x12\ + \x03\x0b\x04$\n\x0c\n\x05\x04\x01\x02\x01\x06\x12\x03\x0b\x04\x1c\n\x0c\ + \n\x05\x04\x01\x02\x01\x01\x12\x03\x0b\x1d\x1f\n\x0c\n\x05\x04\x01\x02\ + \x01\x03\x12\x03\x0b\"#\n\x0b\n\x04\x04\x01\x02\x02\x12\x03\x0c\x04\x13\ + \n\x0c\n\x05\x04\x01\x02\x02\x05\x12\x03\x0c\x04\t\n\x0c\n\x05\x04\x01\ + \x02\x02\x01\x12\x03\x0c\n\x0e\n\x0c\n\x05\x04\x01\x02\x02\x03\x12\x03\ + \x0c\x11\x12\n\n\n\x02\x04\x02\x12\x04\x0e\0\x12\x01\n\n\n\x03\x04\x02\ + \x01\x12\x03\x0e\x08\x17\n\x0b\n\x04\x04\x02\x02\0\x12\x03\x0f\x04\x17\n\ + \x0c\n\x05\x04\x02\x02\0\x05\x12\x03\x0f\x04\n\n\x0c\n\x05\x04\x02\x02\0\ + \x01\x12\x03\x0f\x0b\x12\n\x0c\n\x05\x04\x02\x02\0\x03\x12\x03\x0f\x15\ + \x16\n\x0b\n\x04\x04\x02\x02\x01\x12\x03\x10\x04\x16\n\x0c\n\x05\x04\x02\ + \x02\x01\x05\x12\x03\x10\x04\n\n\x0c\n\x05\x04\x02\x02\x01\x01\x12\x03\ + \x10\x0b\x11\n\x0c\n\x05\x04\x02\x02\x01\x03\x12\x03\x10\x14\x15\n\x0b\n\ + \x04\x04\x02\x02\x02\x12\x03\x11\x04\x1c\n\x0c\n\x05\x04\x02\x02\x02\x05\ + \x12\x03\x11\x04\t\n\x0c\n\x05\x04\x02\x02\x02\x01\x12\x03\x11\n\x17\n\ + \x0c\n\x05\x04\x02\x02\x02\x03\x12\x03\x11\x1a\x1b\n\n\n\x02\x05\0\x12\ + \x04\x13\0\x15\x01\n\n\n\x03\x05\0\x01\x12\x03\x13\x05\x1d\n\x0b\n\x04\ + \x05\0\x02\0\x12\x03\x14\x04\x16\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x14\ + \x04\x11\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x14\x14\x15\n\n\n\x02\x05\ + \x01\x12\x04\x16\0\x1b\x01\n\n\n\x03\x05\x01\x01\x12\x03\x16\x05\x1d\n\ + \x0b\n\x04\x05\x01\x02\0\x12\x03\x17\x04\x12\n\x0c\n\x05\x05\x01\x02\0\ + \x01\x12\x03\x17\x04\r\n\x0c\n\x05\x05\x01\x02\0\x02\x12\x03\x17\x10\x11\ + \n\x0b\n\x04\x05\x01\x02\x01\x12\x03\x18\x04\x16\n\x0c\n\x05\x05\x01\x02\ + \x01\x01\x12\x03\x18\x04\x11\n\x0c\n\x05\x05\x01\x02\x01\x02\x12\x03\x18\ + \x14\x15\n\x0b\n\x04\x05\x01\x02\x02\x12\x03\x19\x04\x16\n\x0c\n\x05\x05\ + \x01\x02\x02\x01\x12\x03\x19\x04\x11\n\x0c\n\x05\x05\x01\x02\x02\x02\x12\ + \x03\x19\x14\x15\n\x0b\n\x04\x05\x01\x02\x03\x12\x03\x1a\x04\x14\n\x0c\n\ + \x05\x05\x01\x02\x03\x01\x12\x03\x1a\x04\x0f\n\x0c\n\x05\x05\x01\x02\x03\ + \x02\x12\x03\x1a\x12\x13b\x06proto3\ "; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; diff --git a/shared-lib/flowy-collaboration/src/protobuf/proto/doc.proto b/shared-lib/flowy-collaboration/src/protobuf/proto/doc.proto index d20903a5f9..ae6be50ee8 100644 --- a/shared-lib/flowy-collaboration/src/protobuf/proto/doc.proto +++ b/shared-lib/flowy-collaboration/src/protobuf/proto/doc.proto @@ -6,7 +6,7 @@ message CreateDocParams { RepeatedRevision revisions = 2; } message DocumentInfo { - string id = 1; + string doc_id = 1; string text = 2; int64 rev_id = 3; int64 base_rev_id = 4; diff --git a/shared-lib/flowy-collaboration/src/protobuf/proto/ws.proto b/shared-lib/flowy-collaboration/src/protobuf/proto/ws.proto index 5919fe2868..e638ecd504 100644 --- a/shared-lib/flowy-collaboration/src/protobuf/proto/ws.proto +++ b/shared-lib/flowy-collaboration/src/protobuf/proto/ws.proto @@ -1,16 +1,16 @@ syntax = "proto3"; +import "revision.proto"; message DocumentClientWSData { string doc_id = 1; DocumentClientWSDataType ty = 2; - bytes data = 3; + RepeatedRevision revisions = 3; string id = 4; } message DocumentServerWSData { string doc_id = 1; DocumentServerWSDataType ty = 2; bytes data = 3; - string id = 4; } message NewDocumentUser { string user_id = 1; diff --git a/shared-lib/flowy-collaboration/src/sync/server.rs b/shared-lib/flowy-collaboration/src/sync/server.rs index 3f2415a27d..98cd8cf26e 100644 --- a/shared-lib/flowy-collaboration/src/sync/server.rs +++ b/shared-lib/flowy-collaboration/src/sync/server.rs @@ -1,15 +1,20 @@ use crate::{ document::Document, - entities::{doc::DocumentInfo, revision::Revision}, + entities::{ + doc::DocumentInfo, + revision::{RepeatedRevision, Revision}, + ws::DocumentServerWSDataBuilder, + }, errors::{internal_error, CollaborateError, CollaborateResult}, - sync::{RevisionSynchronizer, RevisionUser}, + protobuf::DocumentClientWSData, + sync::{RevisionSynchronizer, RevisionUser, SyncResponse}, }; use async_stream::stream; use dashmap::DashMap; use futures::stream::StreamExt; use lib_infra::future::FutureResultSend; use lib_ot::rich_text::RichTextDelta; -use std::{fmt::Debug, sync::Arc}; +use std::{convert::TryFrom, fmt::Debug, sync::Arc}; use tokio::{ sync::{mpsc, oneshot}, task::spawn_blocking, @@ -17,8 +22,9 @@ use tokio::{ pub trait DocumentPersistence: Send + Sync + Debug { fn read_doc(&self, doc_id: &str) -> FutureResultSend; - fn create_doc(&self, revision: Revision) -> FutureResultSend; + fn create_doc(&self, doc_id: &str, revisions: Vec) -> FutureResultSend; fn get_revisions(&self, doc_id: &str, rev_ids: Vec) -> FutureResultSend, CollaborateError>; + fn get_doc_revisions(&self, doc_id: &str) -> FutureResultSend, CollaborateError>; } pub struct ServerDocumentManager { @@ -37,22 +43,43 @@ impl ServerDocumentManager { pub async fn apply_revisions( &self, user: Arc, - revisions: Vec, + mut client_data: DocumentClientWSData, ) -> Result<(), CollaborateError> { - if revisions.is_empty() { - return Ok(()); - } - let revision = revisions.first().unwrap(); - let handler = match self.get_document_handler(&revision.doc_id).await { + let mut pb = client_data.take_revisions(); + let cloned_user = user.clone(); + let ack_id = client_data.id.clone().parse::().map_err(|e| { + CollaborateError::internal().context(format!("Parse rev_id from {} failed. {}", &client_data.id, e)) + })?; + let doc_id = client_data.doc_id; + + let revisions = spawn_blocking(move || { + let repeated_revision = RepeatedRevision::try_from(&mut pb)?; + let revisions = repeated_revision.into_inner(); + Ok::, CollaborateError>(revisions) + }) + .await + .map_err(internal_error)??; + + let result = match self.get_document_handler(&doc_id).await { None => { - // Create the document if it doesn't exist - self.create_document(revision.clone()).await.map_err(internal_error)? + let _ = self.create_document(&doc_id, revisions).await.map_err(internal_error)?; + Ok(()) + }, + Some(handler) => { + let _ = handler + .apply_revisions(doc_id.clone(), user, revisions) + .await + .map_err(internal_error)?; + Ok(()) }, - Some(handler) => handler, }; - handler.apply_revisions(user, revisions).await.map_err(internal_error)?; - Ok(()) + if result.is_ok() { + cloned_user.receive(SyncResponse::Ack(DocumentServerWSDataBuilder::build_ack_message( + &doc_id, ack_id, + ))); + } + result } async fn get_document_handler(&self, doc_id: &str) -> Option> { @@ -75,14 +102,18 @@ impl ServerDocumentManager { } } - async fn create_document(&self, revision: Revision) -> Result, CollaborateError> { - let doc = self.persistence.create_doc(revision).await?; + async fn create_document( + &self, + doc_id: &str, + revisions: Vec, + ) -> Result, CollaborateError> { + let doc = self.persistence.create_doc(doc_id, revisions).await?; let handler = self.cache_document(doc).await?; Ok(handler) } async fn cache_document(&self, doc: DocumentInfo) -> Result, CollaborateError> { - let doc_id = doc.id.clone(); + let doc_id = doc.doc_id.clone(); let persistence = self.persistence.clone(); let handle = spawn_blocking(|| OpenDocHandle::new(doc, persistence)) .await @@ -114,6 +145,7 @@ impl OpenDocHandle { async fn apply_revisions( &self, + doc_id: String, user: Arc, revisions: Vec, ) -> Result<(), CollaborateError> { @@ -121,6 +153,7 @@ impl OpenDocHandle { let persistence = self.persistence.clone(); self.users.insert(user.user_id(), user.clone()); let msg = DocumentCommand::ApplyRevisions { + doc_id, user, revisions, persistence, @@ -130,12 +163,6 @@ impl OpenDocHandle { Ok(()) } - pub async fn document_json(&self) -> CollaborateResult { - let (ret, rx) = oneshot::channel(); - let msg = DocumentCommand::GetDocumentJson { ret }; - self.send(msg, rx).await? - } - async fn send(&self, msg: DocumentCommand, rx: oneshot::Receiver) -> CollaborateResult { let _ = self.sender.send(msg).await.map_err(internal_error)?; let result = rx.await.map_err(internal_error)?; @@ -146,14 +173,12 @@ impl OpenDocHandle { #[derive(Debug)] enum DocumentCommand { ApplyRevisions { + doc_id: String, user: Arc, revisions: Vec, persistence: Arc, ret: oneshot::Sender>, }, - GetDocumentJson { - ret: oneshot::Sender>, - }, } struct DocumentCommandQueue { @@ -166,13 +191,13 @@ impl DocumentCommandQueue { fn new(receiver: mpsc::Receiver, doc: DocumentInfo) -> Result { let delta = RichTextDelta::from_bytes(&doc.text)?; let synchronizer = Arc::new(RevisionSynchronizer::new( - &doc.id, + &doc.doc_id, doc.rev_id, Document::from_delta(delta), )); Ok(Self { - doc_id: doc.id, + doc_id: doc.doc_id, receiver: Some(receiver), synchronizer, }) @@ -198,24 +223,18 @@ impl DocumentCommandQueue { async fn handle_message(&self, msg: DocumentCommand) { match msg { DocumentCommand::ApplyRevisions { + doc_id, user, revisions, persistence, ret, } => { self.synchronizer - .apply_revisions(user, revisions, persistence) + .apply_revisions(doc_id, user, revisions, persistence) .await .unwrap(); let _ = ret.send(Ok(())); }, - DocumentCommand::GetDocumentJson { ret } => { - let synchronizer = self.synchronizer.clone(); - let json = spawn_blocking(move || synchronizer.doc_json()) - .await - .map_err(internal_error); - let _ = ret.send(json); - }, } } } diff --git a/shared-lib/flowy-collaboration/src/sync/synchronizer.rs b/shared-lib/flowy-collaboration/src/sync/synchronizer.rs index fdba534d04..8199badae9 100644 --- a/shared-lib/flowy-collaboration/src/sync/synchronizer.rs +++ b/shared-lib/flowy-collaboration/src/sync/synchronizer.rs @@ -6,6 +6,7 @@ use crate::{ }, sync::DocumentPersistence, }; +use futures::TryFutureExt; use lib_ot::{core::OperationTransformable, errors::OTError, rich_text::RichTextDelta}; use parking_lot::RwLock; use std::{ @@ -49,12 +50,19 @@ impl RevisionSynchronizer { #[tracing::instrument(level = "debug", skip(self, user, revisions, persistence), err)] pub async fn apply_revisions( &self, + doc_id: String, user: Arc, revisions: Vec, persistence: Arc, ) -> Result<(), OTError> { if revisions.is_empty() { - tracing::warn!("Receive empty revisions"); + // Return all the revisions to client + let revisions = persistence + .get_doc_revisions(&doc_id) + .map_err(|e| OTError::internal().context(e)) + .await?; + let data = DocumentServerWSDataBuilder::build_push_message(&doc_id, revisions); + user.receive(SyncResponse::Push(data)); return Ok(()); } @@ -83,8 +91,7 @@ impl RevisionSynchronizer { start: server_rev_id, end: first_revision.rev_id, }; - let msg = - DocumentServerWSDataBuilder::build_pull_message(&self.doc_id, range, first_revision.rev_id); + let msg = DocumentServerWSDataBuilder::build_pull_message(&self.doc_id, range); user.receive(SyncResponse::Pull(msg)); } }, @@ -96,7 +103,6 @@ impl RevisionSynchronizer { // The client document is outdated. Transform the client revision delta and then // send the prime delta to the client. Client should compose the this prime // delta. - let id = first_revision.rev_id.to_string(); let from_rev_id = first_revision.rev_id; let to_rev_id = server_base_rev_id; let rev_ids: Vec = (from_rev_id..=to_rev_id).collect(); @@ -111,15 +117,10 @@ impl RevisionSynchronizer { }, }; - let data = DocumentServerWSDataBuilder::build_push_message(&self.doc_id, revisions, &id); + let data = DocumentServerWSDataBuilder::build_push_message(&self.doc_id, revisions); user.receive(SyncResponse::Push(data)); }, } - - user.receive(SyncResponse::Ack(DocumentServerWSDataBuilder::build_ack_message( - &first_revision.doc_id, - &first_revision.rev_id.to_string(), - ))); Ok(()) } diff --git a/shared-lib/lib-ws/src/connect.rs b/shared-lib/lib-ws/src/connect.rs index b3fb181d37..5623d0d3f0 100644 --- a/shared-lib/lib-ws/src/connect.rs +++ b/shared-lib/lib-ws/src/connect.rs @@ -137,7 +137,7 @@ impl WSStream { } impl fmt::Debug for WSStream { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("WsStream").finish() } + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("WSStream").finish() } } impl Future for WSStream {