From 4a46bf3fa30fa4caed54b2d748ea7c15d1d797e7 Mon Sep 17 00:00:00 2001 From: appflowy Date: Thu, 16 Dec 2021 21:31:36 +0800 Subject: [PATCH] refcator web socket message sent logic --- backend/src/services/doc/ws_actor.rs | 10 +- .../protobuf/flowy-collaboration/ws.pb.dart | 84 ++++- .../flowy-collaboration/ws.pbenum.dart | 20 +- .../flowy-collaboration/ws.pbjson.dart | 28 +- frontend/rust-lib/flowy-document/Cargo.toml | 2 +- .../rust-lib/flowy-document/src/module.rs | 4 +- .../src/services/doc/controller.rs | 43 +-- .../src/services/doc/edit/editor.rs | 289 +++++++-------- .../src/services/doc/edit/mod.rs | 7 +- .../src/services/doc/edit/model.rs | 1 + .../src/services/doc/edit/queue.rs | 207 ----------- .../flowy-document/src/services/doc/mod.rs | 6 +- .../src/services/doc/revision/cache/cache.rs | 12 +- .../src/services/doc/revision/manager.rs | 22 +- .../src/services/doc/revision/mod.rs | 2 - .../src/services/doc/revision/sync.rs | 200 ---------- .../src/services/doc/ws_manager.rs | 67 ---- frontend/rust-lib/flowy-net/Cargo.toml | 2 +- .../flowy-net/src/services/mock/ws_mock.rs | 16 +- .../rust-lib/flowy-net/src/services/mod.rs | 2 +- .../flowy-net/src/services/ws/conn.rs | 4 +- .../flowy-net/src/services/ws/manager.rs | 10 +- .../rust-lib/flowy-net/src/services/ws/mod.rs | 4 +- .../flowy-net/src/services/ws/ws_local.rs | 4 +- .../src/deps_resolve/document_deps.rs | 26 +- frontend/rust-lib/flowy-test/Cargo.toml | 2 +- .../rust-lib/flowy-test/src/doc_script.rs | 9 +- .../src/core/sync/synchronizer.rs | 8 +- .../flowy-collaboration/src/entities/ws/ws.rs | 44 ++- .../src/protobuf/model/ws.rs | 350 +++++++++++++++--- .../src/protobuf/proto/ws.proto | 11 +- .../src/derive_cache/derive_cache.rs | 3 +- shared-lib/lib-ot/src/revision/model.rs | 9 +- shared-lib/lib-ws/src/ws.rs | 6 +- 34 files changed, 676 insertions(+), 838 deletions(-) delete mode 100644 frontend/rust-lib/flowy-document/src/services/doc/edit/queue.rs delete mode 100644 frontend/rust-lib/flowy-document/src/services/doc/revision/sync.rs delete mode 100644 frontend/rust-lib/flowy-document/src/services/doc/ws_manager.rs diff --git a/backend/src/services/doc/ws_actor.rs b/backend/src/services/doc/ws_actor.rs index df93edaba5..10d5c3984f 100644 --- a/backend/src/services/doc/ws_actor.rs +++ b/backend/src/services/doc/ws_actor.rs @@ -11,7 +11,7 @@ use async_stream::stream; use backend_service::errors::{internal_error, Result, ServerError}; use flowy_collaboration::{ core::sync::ServerDocManager, - protobuf::{WsDataType, WsDocumentData}, + protobuf::{WsDocumentData, WsDocumentDataType}, }; use futures::stream::StreamExt; use lib_ot::protobuf::Revision; @@ -78,10 +78,10 @@ impl DocWsActor { let data = document_data.data; match document_data.ty { - WsDataType::Acked => Ok(()), - WsDataType::PushRev => self.apply_pushed_rev(user, socket, data, pool).await, - WsDataType::PullRev => Ok(()), - WsDataType::Conflict => Ok(()), + WsDocumentDataType::Acked => Ok(()), + WsDocumentDataType::PushRev => self.apply_pushed_rev(user, socket, data, pool).await, + WsDocumentDataType::PullRev => Ok(()), + WsDocumentDataType::UserConnect => Ok(()), } } diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pb.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pb.dart index e3192816ff..df1ef65a13 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pb.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pb.dart @@ -7,6 +7,7 @@ import 'dart:core' as $core; +import 'package:fixnum/fixnum.dart' as $fixnum; import 'package:protobuf/protobuf.dart' as $pb; import 'ws.pbenum.dart'; @@ -16,7 +17,7 @@ export 'ws.pbenum.dart'; class WsDocumentData extends $pb.GeneratedMessage { static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'WsDocumentData', createEmptyInstance: create) ..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'docId') - ..e(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'ty', $pb.PbFieldType.OE, defaultOrMaker: WsDataType.Acked, valueOf: WsDataType.valueOf, enumValues: WsDataType.values) + ..e(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'ty', $pb.PbFieldType.OE, defaultOrMaker: WsDocumentDataType.Acked, valueOf: WsDocumentDataType.valueOf, enumValues: WsDocumentDataType.values) ..a<$core.List<$core.int>>(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'data', $pb.PbFieldType.OY) ..hasRequiredFields = false ; @@ -24,7 +25,7 @@ class WsDocumentData extends $pb.GeneratedMessage { WsDocumentData._() : super(); factory WsDocumentData({ $core.String? docId, - WsDataType? ty, + WsDocumentDataType? ty, $core.List<$core.int>? data, }) { final _result = create(); @@ -70,9 +71,9 @@ class WsDocumentData extends $pb.GeneratedMessage { void clearDocId() => clearField(1); @$pb.TagNumber(2) - WsDataType get ty => $_getN(1); + WsDocumentDataType get ty => $_getN(1); @$pb.TagNumber(2) - set ty(WsDataType v) { setField(2, v); } + set ty(WsDocumentDataType v) { setField(2, v); } @$pb.TagNumber(2) $core.bool hasTy() => $_has(1); @$pb.TagNumber(2) @@ -88,3 +89,78 @@ class WsDocumentData extends $pb.GeneratedMessage { void clearData() => clearField(3); } +class DocumentConnected extends $pb.GeneratedMessage { + static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'DocumentConnected', createEmptyInstance: create) + ..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'userId') + ..aOS(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'docId') + ..aInt64(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'revId') + ..hasRequiredFields = false + ; + + DocumentConnected._() : super(); + factory DocumentConnected({ + $core.String? userId, + $core.String? docId, + $fixnum.Int64? revId, + }) { + final _result = create(); + if (userId != null) { + _result.userId = userId; + } + if (docId != null) { + _result.docId = docId; + } + if (revId != null) { + _result.revId = revId; + } + return _result; + } + factory DocumentConnected.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r); + factory DocumentConnected.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r); + @$core.Deprecated( + 'Using this can add significant overhead to your binary. ' + 'Use [GeneratedMessageGenericExtensions.deepCopy] instead. ' + 'Will be removed in next major version') + DocumentConnected clone() => DocumentConnected()..mergeFromMessage(this); + @$core.Deprecated( + 'Using this can add significant overhead to your binary. ' + 'Use [GeneratedMessageGenericExtensions.rebuild] instead. ' + 'Will be removed in next major version') + DocumentConnected copyWith(void Function(DocumentConnected) updates) => super.copyWith((message) => updates(message as DocumentConnected)) as DocumentConnected; // ignore: deprecated_member_use + $pb.BuilderInfo get info_ => _i; + @$core.pragma('dart2js:noInline') + static DocumentConnected create() => DocumentConnected._(); + DocumentConnected createEmptyInstance() => create(); + static $pb.PbList createRepeated() => $pb.PbList(); + @$core.pragma('dart2js:noInline') + static DocumentConnected getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor(create); + static DocumentConnected? _defaultInstance; + + @$pb.TagNumber(1) + $core.String get userId => $_getSZ(0); + @$pb.TagNumber(1) + set userId($core.String v) { $_setString(0, v); } + @$pb.TagNumber(1) + $core.bool hasUserId() => $_has(0); + @$pb.TagNumber(1) + void clearUserId() => clearField(1); + + @$pb.TagNumber(2) + $core.String get docId => $_getSZ(1); + @$pb.TagNumber(2) + set docId($core.String v) { $_setString(1, v); } + @$pb.TagNumber(2) + $core.bool hasDocId() => $_has(1); + @$pb.TagNumber(2) + void clearDocId() => clearField(2); + + @$pb.TagNumber(3) + $fixnum.Int64 get revId => $_getI64(2); + @$pb.TagNumber(3) + set revId($fixnum.Int64 v) { $_setInt64(2, v); } + @$pb.TagNumber(3) + $core.bool hasRevId() => $_has(2); + @$pb.TagNumber(3) + void clearRevId() => clearField(3); +} + diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbenum.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbenum.dart index 31f54d22ea..b11c8b7fa1 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbenum.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbenum.dart @@ -9,22 +9,22 @@ import 'dart:core' as $core; import 'package:protobuf/protobuf.dart' as $pb; -class WsDataType extends $pb.ProtobufEnum { - static const WsDataType Acked = WsDataType._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Acked'); - static const WsDataType PushRev = WsDataType._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'PushRev'); - static const WsDataType PullRev = WsDataType._(2, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'PullRev'); - static const WsDataType Conflict = WsDataType._(3, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Conflict'); +class WsDocumentDataType extends $pb.ProtobufEnum { + static const WsDocumentDataType Acked = WsDocumentDataType._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Acked'); + static const WsDocumentDataType PushRev = WsDocumentDataType._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'PushRev'); + static const WsDocumentDataType PullRev = WsDocumentDataType._(2, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'PullRev'); + static const WsDocumentDataType UserConnect = WsDocumentDataType._(3, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'UserConnect'); - static const $core.List values = [ + static const $core.List values = [ Acked, PushRev, PullRev, - Conflict, + UserConnect, ]; - static final $core.Map<$core.int, WsDataType> _byValue = $pb.ProtobufEnum.initByValue(values); - static WsDataType? valueOf($core.int value) => _byValue[value]; + static final $core.Map<$core.int, WsDocumentDataType> _byValue = $pb.ProtobufEnum.initByValue(values); + static WsDocumentDataType? valueOf($core.int value) => _byValue[value]; - const WsDataType._($core.int v, $core.String n) : super(v, n); + const WsDocumentDataType._($core.int v, $core.String n) : super(v, n); } diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbjson.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbjson.dart index 50bc4f4d5b..9917157067 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbjson.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbjson.dart @@ -8,28 +8,40 @@ import 'dart:core' as $core; import 'dart:convert' as $convert; import 'dart:typed_data' as $typed_data; -@$core.Deprecated('Use wsDataTypeDescriptor instead') -const WsDataType$json = const { - '1': 'WsDataType', +@$core.Deprecated('Use wsDocumentDataTypeDescriptor instead') +const WsDocumentDataType$json = const { + '1': 'WsDocumentDataType', '2': const [ const {'1': 'Acked', '2': 0}, const {'1': 'PushRev', '2': 1}, const {'1': 'PullRev', '2': 2}, - const {'1': 'Conflict', '2': 3}, + const {'1': 'UserConnect', '2': 3}, ], }; -/// Descriptor for `WsDataType`. Decode as a `google.protobuf.EnumDescriptorProto`. -final $typed_data.Uint8List wsDataTypeDescriptor = $convert.base64Decode('CgpXc0RhdGFUeXBlEgkKBUFja2VkEAASCwoHUHVzaFJldhABEgsKB1B1bGxSZXYQAhIMCghDb25mbGljdBAD'); +/// Descriptor for `WsDocumentDataType`. Decode as a `google.protobuf.EnumDescriptorProto`. +final $typed_data.Uint8List wsDocumentDataTypeDescriptor = $convert.base64Decode('ChJXc0RvY3VtZW50RGF0YVR5cGUSCQoFQWNrZWQQABILCgdQdXNoUmV2EAESCwoHUHVsbFJldhACEg8KC1VzZXJDb25uZWN0EAM='); @$core.Deprecated('Use wsDocumentDataDescriptor instead') const WsDocumentData$json = const { '1': 'WsDocumentData', '2': const [ const {'1': 'doc_id', '3': 1, '4': 1, '5': 9, '10': 'docId'}, - const {'1': 'ty', '3': 2, '4': 1, '5': 14, '6': '.WsDataType', '10': 'ty'}, + const {'1': 'ty', '3': 2, '4': 1, '5': 14, '6': '.WsDocumentDataType', '10': 'ty'}, const {'1': 'data', '3': 3, '4': 1, '5': 12, '10': 'data'}, ], }; /// Descriptor for `WsDocumentData`. Decode as a `google.protobuf.DescriptorProto`. -final $typed_data.Uint8List wsDocumentDataDescriptor = $convert.base64Decode('Cg5Xc0RvY3VtZW50RGF0YRIVCgZkb2NfaWQYASABKAlSBWRvY0lkEhsKAnR5GAIgASgOMgsuV3NEYXRhVHlwZVICdHkSEgoEZGF0YRgDIAEoDFIEZGF0YQ=='); +final $typed_data.Uint8List wsDocumentDataDescriptor = $convert.base64Decode('Cg5Xc0RvY3VtZW50RGF0YRIVCgZkb2NfaWQYASABKAlSBWRvY0lkEiMKAnR5GAIgASgOMhMuV3NEb2N1bWVudERhdGFUeXBlUgJ0eRISCgRkYXRhGAMgASgMUgRkYXRh'); +@$core.Deprecated('Use documentConnectedDescriptor instead') +const DocumentConnected$json = const { + '1': 'DocumentConnected', + '2': const [ + const {'1': 'user_id', '3': 1, '4': 1, '5': 9, '10': 'userId'}, + const {'1': 'doc_id', '3': 2, '4': 1, '5': 9, '10': 'docId'}, + const {'1': 'rev_id', '3': 3, '4': 1, '5': 3, '10': 'revId'}, + ], +}; + +/// Descriptor for `DocumentConnected`. Decode as a `google.protobuf.DescriptorProto`. +final $typed_data.Uint8List documentConnectedDescriptor = $convert.base64Decode('ChFEb2N1bWVudENvbm5lY3RlZBIXCgd1c2VyX2lkGAEgASgJUgZ1c2VySWQSFQoGZG9jX2lkGAIgASgJUgVkb2NJZBIVCgZyZXZfaWQYAyABKANSBXJldklk'); diff --git a/frontend/rust-lib/flowy-document/Cargo.toml b/frontend/rust-lib/flowy-document/Cargo.toml index bd5c8763d4..4760e93343 100644 --- a/frontend/rust-lib/flowy-document/Cargo.toml +++ b/frontend/rust-lib/flowy-document/Cargo.toml @@ -48,7 +48,7 @@ pin-project = "1.0.0" [dev-dependencies] flowy-test = { path = "../flowy-test" } flowy-document = { path = "../flowy-document", features = ["flowy_unit_test"]} -flowy-net = { path = "../flowy-net", features = ["ws_mock"] } +flowy-net = { path = "../flowy-net", features = ["flowy_unit_test"] } color-eyre = { version = "0.5", default-features = false } criterion = "0.3" rand = "0.7.3" diff --git a/frontend/rust-lib/flowy-document/src/module.rs b/frontend/rust-lib/flowy-document/src/module.rs index 4144f3dcb0..865f09111a 100644 --- a/frontend/rust-lib/flowy-document/src/module.rs +++ b/frontend/rust-lib/flowy-document/src/module.rs @@ -1,7 +1,7 @@ use crate::{ errors::FlowyError, services::{ - doc::{controller::DocController, edit::ClientDocEditor, WsDocumentManager}, + doc::{controller::DocController, edit::ClientDocEditor, DocumentWsHandlers}, server::construct_doc_server, }, }; @@ -25,7 +25,7 @@ pub struct FlowyDocument { impl FlowyDocument { pub fn new( user: Arc, - ws_manager: Arc, + ws_manager: Arc, server_config: &ClientServerConfiguration, ) -> FlowyDocument { let server = construct_doc_server(server_config); diff --git a/frontend/rust-lib/flowy-document/src/services/doc/controller.rs b/frontend/rust-lib/flowy-document/src/services/doc/controller.rs index 5f2886778c..12813a1daf 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/controller.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/controller.rs @@ -3,9 +3,9 @@ use crate::{ module::DocumentUser, services::{ doc::{ - edit::{ClientDocEditor, EditDocWsHandler}, + edit::ClientDocEditor, revision::{RevisionCache, RevisionManager, RevisionServer}, - WsDocumentManager, + DocumentWsHandlers, }, server::Server, }, @@ -20,24 +20,19 @@ use std::sync::Arc; pub(crate) struct DocController { server: Server, - ws_manager: Arc, + ws_handlers: Arc, open_cache: Arc, user: Arc, } impl DocController { - pub(crate) fn new(server: Server, user: Arc, ws: Arc) -> Self { + pub(crate) fn new(server: Server, user: Arc, ws_handlers: Arc) -> Self { let open_cache = Arc::new(OpenDocCache::new()); - Self { - server, - user, - ws_manager: ws, - open_cache, - } + Self { server, ws_handlers, open_cache, user } } pub(crate) fn init(&self) -> FlowyResult<()> { - self.ws_manager.init(); + self.ws_handlers.init(); Ok(()) } @@ -58,7 +53,7 @@ impl DocController { pub(crate) fn close(&self, doc_id: &str) -> Result<(), FlowyError> { tracing::debug!("Close doc {}", doc_id); self.open_cache.remove(doc_id); - self.ws_manager.remove_handler(doc_id); + self.ws_handlers.remove_handler(doc_id); Ok(()) } @@ -66,7 +61,7 @@ impl DocController { pub(crate) fn delete(&self, params: DocIdentifier) -> Result<(), FlowyError> { let doc_id = ¶ms.doc_id; self.open_cache.remove(doc_id); - self.ws_manager.remove_handler(doc_id); + self.ws_handlers.remove_handler(doc_id); Ok(()) } @@ -99,18 +94,17 @@ impl DocController { ) -> Result, FlowyError> { let user = self.user.clone(); let rev_manager = self.make_rev_manager(doc_id, pool.clone())?; - let edit_ctx = ClientDocEditor::new(doc_id, user, pool, rev_manager, self.ws_manager.ws()).await?; - let ws_handler = Arc::new(EditDocWsHandler(edit_ctx.clone())); - self.ws_manager.register_handler(doc_id, ws_handler); - self.open_cache.set(edit_ctx.clone()); - Ok(edit_ctx) + let doc_editor = ClientDocEditor::new(doc_id, user, pool, rev_manager, self.ws_handlers.ws()).await?; + let ws_handler = doc_editor.ws_handler(); + self.ws_handlers.register_handler(doc_id, ws_handler); + self.open_cache.insert(&doc_id, &doc_editor); + Ok(doc_editor) } fn make_rev_manager(&self, doc_id: &str, pool: Arc) -> Result { // Opti: require upgradable_read lock and then upgrade to write lock using // RwLockUpgradableReadGuard::upgrade(xx) of ws // let doc = self.read_doc(doc_id, pool.clone()).await?; - let ws_sender = self.ws_manager.ws(); let token = self.user.token()?; let user_id = self.user.user_id()?; let server = Arc::new(RevisionServerImpl { @@ -118,7 +112,7 @@ impl DocController { server: self.server.clone(), }); let cache = Arc::new(RevisionCache::new(&user_id, doc_id, pool, server)); - Ok(RevisionManager::new(&user_id, doc_id, cache, ws_sender)) + Ok(RevisionManager::new(&user_id, doc_id, cache)) } } @@ -152,12 +146,11 @@ pub struct OpenDocCache { impl OpenDocCache { fn new() -> Self { Self { inner: DashMap::new() } } - pub(crate) fn set(&self, doc: Arc) { - let doc_id = doc.doc_id.clone(); - if self.inner.contains_key(&doc_id) { - log::warn!("Doc:{} already exists in cache", &doc_id); + pub(crate) fn insert(&self, doc_id: &str, doc: &Arc) { + if self.inner.contains_key(doc_id) { + log::warn!("Doc:{} already exists in cache", doc_id); } - self.inner.insert(doc_id, doc); + self.inner.insert(doc_id.to_string(), doc.clone()); } pub(crate) fn contains(&self, doc_id: &str) -> bool { self.inner.get(doc_id).is_some() } diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs b/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs index 00162f1d0a..7b23582b99 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs @@ -1,13 +1,4 @@ -use crate::{ - errors::FlowyError, - module::DocumentUser, - services::doc::{ - edit::{DocumentMD5, EditCommand, EditCommandQueue, NewDelta, OpenDocAction, TransformDeltas}, - revision::{RevisionDownStream, RevisionManager, SteamStopTx}, - DocumentWebSocket, - WsDocumentHandler, - }, -}; +use crate::{errors::FlowyError, module::DocumentUser, services::doc::*}; use bytes::Bytes; use flowy_collaboration::{ core::document::history::UndoResult, @@ -16,13 +7,12 @@ use flowy_collaboration::{ }; use flowy_database::ConnectionPool; use flowy_error::{internal_error, FlowyResult}; -use lib_infra::retry::{ExponentialBackoff, Retry}; +use lib_infra::future::FutureResult; use lib_ot::{ core::Interval, - revision::{RevId, RevType, Revision}, + revision::{RevId, RevType, Revision, RevisionRange}, rich_text::{RichTextAttribute, RichTextDelta}, }; -use lib_ws::WsConnectState; use std::sync::Arc; use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot}; @@ -31,11 +21,9 @@ pub type DocId = String; pub struct ClientDocEditor { pub doc_id: DocId, rev_manager: Arc, + ws_manager: Arc, edit_cmd_tx: UnboundedSender, - ws_sender: Arc, user: Arc, - ws_msg_tx: UnboundedSender, - stop_sync_tx: tokio::sync::broadcast::Sender<()>, } impl ClientDocEditor { @@ -44,29 +32,31 @@ impl ClientDocEditor { user: Arc, pool: Arc, mut rev_manager: RevisionManager, - ws_sender: Arc, + ws: Arc, ) -> FlowyResult> { let delta = rev_manager.load_document().await?; let edit_cmd_tx = spawn_edit_queue(doc_id, delta, pool.clone()); let doc_id = doc_id.to_string(); let rev_manager = Arc::new(rev_manager); - let (ws_msg_tx, ws_msg_rx) = mpsc::unbounded_channel(); - let (stop_sync_tx, _) = tokio::sync::broadcast::channel(2); - let cloned_stop_sync_tx = stop_sync_tx.clone(); - let edit_doc = Arc::new(Self { + + let data_provider = Arc::new(DocumentSinkDataProviderAdapter { + rev_manager: rev_manager.clone(), + }); + let stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter { + doc_id: doc_id.clone(), + edit_cmd_tx: edit_cmd_tx.clone(), + rev_manager: rev_manager.clone(), + user: user.clone(), + }); + let ws_manager = Arc::new(WebSocketManager::new(&doc_id, ws, data_provider, stream_consumer)); + let editor = Arc::new(Self { doc_id, rev_manager, + ws_manager, edit_cmd_tx, - ws_sender, user, - ws_msg_tx, - stop_sync_tx, }); - - edit_doc.connect_to_doc(); - - start_sync(edit_doc.clone(), ws_msg_rx, cloned_stop_sync_tx); - Ok(edit_doc) + Ok(editor) } pub async fn insert(&self, index: usize, data: T) -> Result<(), FlowyError> { @@ -192,122 +182,9 @@ impl ClientDocEditor { } #[tracing::instrument(level = "debug", skip(self))] - pub fn stop_sync(&self) { - tracing::debug!("{} stop sync", self.doc_id); - let _ = self.stop_sync_tx.send(()); - } + pub fn stop_sync(&self) { self.ws_manager.stop(); } - #[tracing::instrument(level = "debug", skip(self))] - fn connect_to_doc(&self) { - let rev_id: RevId = self.rev_manager.rev_id().into(); - if let Ok(user_id) = self.user.user_id() { - let action = OpenDocAction::new(&user_id, &self.doc_id, &rev_id, &self.ws_sender); - let strategy = ExponentialBackoff::from_millis(50).take(3); - let retry = Retry::spawn(strategy, action); - tokio::spawn(async move { - match retry.await { - Ok(_) => log::debug!("Notify open doc success"), - Err(e) => log::error!("Notify open doc failed: {}", e), - } - }); - } - } - - #[tracing::instrument(level = "debug", skip(self))] - pub(crate) async fn handle_push_rev(&self, bytes: Bytes) -> FlowyResult<()> { - // Transform the revision - let (ret, rx) = oneshot::channel::>(); - let _ = self.edit_cmd_tx.send(EditCommand::ProcessRemoteRevision { bytes, ret }); - let TransformDeltas { - client_prime, - server_prime, - server_rev_id, - } = rx.await.map_err(internal_error)??; - - if self.rev_manager.rev_id() >= server_rev_id.value { - // Ignore this push revision if local_rev_id >= server_rev_id - return Ok(()); - } - - // compose delta - let (ret, rx) = oneshot::channel::>(); - let msg = EditCommand::ComposeDelta { - delta: client_prime.clone(), - ret, - }; - let _ = self.edit_cmd_tx.send(msg); - let md5 = rx.await.map_err(internal_error)??; - - // update rev id - self.rev_manager - .update_rev_id_counter_value(server_rev_id.clone().into()); - let (local_base_rev_id, local_rev_id) = self.rev_manager.next_rev_id(); - let delta_data = client_prime.to_bytes(); - // save the revision - let user_id = self.user.user_id()?; - let revision = Revision::new( - &self.doc_id, - local_base_rev_id, - local_rev_id, - delta_data, - RevType::Remote, - &user_id, - md5.clone(), - ); - - let _ = self.rev_manager.add_remote_revision(&revision).await?; - - // send the server_prime delta - let user_id = self.user.user_id()?; - let delta_data = server_prime.to_bytes(); - let revision = Revision::new( - &self.doc_id, - local_base_rev_id, - local_rev_id, - delta_data, - RevType::Remote, - &user_id, - md5, - ); - let _ = self.ws_sender.send(revision.into()); - Ok(()) - } - - pub async fn handle_ws_message(&self, doc_data: WsDocumentData) -> FlowyResult<()> { - match self.ws_msg_tx.send(doc_data) { - Ok(_) => {}, - Err(e) => tracing::error!("❌Propagate ws message failed. {}", e), - } - Ok(()) - } -} - -pub struct EditDocWsHandler(pub Arc); - -impl std::ops::Deref for EditDocWsHandler { - type Target = Arc; - - fn deref(&self) -> &Self::Target { &self.0 } -} - -impl WsDocumentHandler for EditDocWsHandler { - fn receive(&self, doc_data: WsDocumentData) { - let edit_doc = self.0.clone(); - tokio::spawn(async move { - if let Err(e) = edit_doc.handle_ws_message(doc_data).await { - tracing::error!("❌{:?}", e); - } - }); - } - - fn state_changed(&self, state: &WsConnectState) { - match state { - WsConnectState::Init => {}, - WsConnectState::Connecting => {}, - WsConnectState::Connected => self.connect_to_doc(), - WsConnectState::Disconnected => {}, - } - } + pub(crate) fn ws_handler(&self) -> Arc { self.ws_manager.clone() } } fn spawn_edit_queue(doc_id: &str, delta: RichTextDelta, _pool: Arc) -> UnboundedSender { @@ -317,19 +194,121 @@ fn spawn_edit_queue(doc_id: &str, delta: RichTextDelta, _pool: Arc, - ws_msg_rx: mpsc::UnboundedReceiver, - stop_sync_tx: SteamStopTx, -) { - let rev_manager = editor.rev_manager.clone(); - let ws_sender = editor.ws_sender.clone(); +struct DocumentWebSocketSteamConsumerAdapter { + doc_id: String, + edit_cmd_tx: UnboundedSender, + rev_manager: Arc, + user: Arc, +} - let up_stream = editor.rev_manager.make_up_stream(stop_sync_tx.subscribe()); - let down_stream = RevisionDownStream::new(editor, rev_manager, ws_msg_rx, ws_sender, stop_sync_tx.subscribe()); +impl DocumentWebSocketSteamConsumer for DocumentWebSocketSteamConsumerAdapter { + fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError> { + let user = self.user.clone(); + let rev_manager = self.rev_manager.clone(); + let edit_cmd_tx = self.edit_cmd_tx.clone(); + let doc_id = self.doc_id.clone(); + FutureResult::new(async move { + let user_id = user.user_id()?; + let _revision = handle_push_rev(&doc_id, &user_id, edit_cmd_tx, rev_manager, bytes).await?; + Ok(()) + }) + } - tokio::spawn(up_stream.run()); - tokio::spawn(down_stream.run()); + fn make_revision_from_range(&self, range: RevisionRange) -> FutureResult { + let rev_manager = self.rev_manager.clone(); + FutureResult::new(async move { + let revision = rev_manager.mk_revisions(range).await?; + Ok(revision) + }) + } + + fn ack_revision(&self, rev_id: i64) -> FutureResult<(), FlowyError> { + let rev_manager = self.rev_manager.clone(); + FutureResult::new(async move { + let _ = rev_manager.ack_revision(rev_id).await?; + Ok(()) + }) + } +} + +struct DocumentSinkDataProviderAdapter { + rev_manager: Arc, +} + +impl DocumentSinkDataProvider for DocumentSinkDataProviderAdapter { + fn next(&self) -> FutureResult, FlowyError> { + let rev_manager = self.rev_manager.clone(); + FutureResult::new(async move { + match rev_manager.next_sync_revision().await? { + Some(rev) => { + tracing::debug!("[DocumentSinkDataProvider]: revision: {}:{:?}", rev.doc_id, rev.rev_id); + Ok(Some(rev.into())) + }, + None => Ok(None), + } + }) + } +} + +#[tracing::instrument(level = "debug", skip(edit_cmd_tx, rev_manager, bytes))] +pub(crate) async fn handle_push_rev( + doc_id: &str, + user_id: &str, + edit_cmd_tx: UnboundedSender, + rev_manager: Arc, + bytes: Bytes, +) -> FlowyResult> { + // Transform the revision + let (ret, rx) = oneshot::channel::>(); + let _ = edit_cmd_tx.send(EditCommand::ProcessRemoteRevision { bytes, ret }); + let TransformDeltas { + client_prime, + server_prime, + server_rev_id, + } = rx.await.map_err(internal_error)??; + + if rev_manager.rev_id() >= server_rev_id.value { + // Ignore this push revision if local_rev_id >= server_rev_id + return Ok(None); + } + + // compose delta + let (ret, rx) = oneshot::channel::>(); + let msg = EditCommand::ComposeDelta { + delta: client_prime.clone(), + ret, + }; + let _ = edit_cmd_tx.send(msg); + let md5 = rx.await.map_err(internal_error)??; + + // update rev id + rev_manager.update_rev_id_counter_value(server_rev_id.clone().into()); + let (local_base_rev_id, local_rev_id) = rev_manager.next_rev_id(); + let delta_data = client_prime.to_bytes(); + // save the revision + let revision = Revision::new( + &doc_id, + local_base_rev_id, + local_rev_id, + delta_data, + RevType::Remote, + &user_id, + md5.clone(), + ); + + let _ = rev_manager.add_remote_revision(&revision).await?; + + // send the server_prime delta + let delta_data = server_prime.to_bytes(); + Ok(Some(Revision::new( + &doc_id, + local_base_rev_id, + local_rev_id, + delta_data, + RevType::Remote, + &user_id, + md5, + ))) } #[cfg(feature = "flowy_unit_test")] diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/mod.rs b/frontend/rust-lib/flowy-document/src/services/doc/edit/mod.rs index 6d4fb5fa62..794da84e62 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/edit/mod.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/edit/mod.rs @@ -1,7 +1,8 @@ +mod edit_queue; +mod edit_ws; mod editor; mod model; -mod queue; +pub(crate) use edit_queue::*; +pub use edit_ws::*; pub use editor::*; -pub(crate) use model::*; -pub(crate) use queue::*; diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/model.rs b/frontend/rust-lib/flowy-document/src/services/doc/edit/model.rs index 12e1d2fc28..bd78e32c15 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/edit/model.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/edit/model.rs @@ -15,6 +15,7 @@ pub(crate) struct OpenDocAction { ws: Arc, } +#[allow(dead_code)] impl OpenDocAction { pub(crate) fn new(user_id: &str, doc_id: &str, rev_id: &RevId, ws: &Arc) -> Self { Self { diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/queue.rs b/frontend/rust-lib/flowy-document/src/services/doc/edit/queue.rs deleted file mode 100644 index faa239dd3f..0000000000 --- a/frontend/rust-lib/flowy-document/src/services/doc/edit/queue.rs +++ /dev/null @@ -1,207 +0,0 @@ -use async_stream::stream; -use bytes::Bytes; -use flowy_collaboration::{ - core::document::{history::UndoResult, Document}, - errors::CollaborateError, -}; -use flowy_error::FlowyError; -use futures::stream::StreamExt; -use lib_ot::{ - core::{Interval, OperationTransformable}, - revision::{RevId, Revision}, - rich_text::{RichTextAttribute, RichTextDelta}, -}; -use std::{convert::TryFrom, sync::Arc}; -use tokio::sync::{mpsc, oneshot, RwLock}; - -pub(crate) struct EditCommandQueue { - doc_id: String, - document: Arc>, - receiver: Option>, -} - -impl EditCommandQueue { - pub(crate) fn new(doc_id: &str, delta: RichTextDelta, receiver: mpsc::UnboundedReceiver) -> Self { - let document = Arc::new(RwLock::new(Document::from_delta(delta))); - Self { - doc_id: doc_id.to_owned(), - document, - receiver: Some(receiver), - } - } - - pub(crate) async fn run(mut self) { - let mut receiver = self.receiver.take().expect("Should only call once"); - let stream = stream! { - loop { - match receiver.recv().await { - Some(msg) => yield msg, - None => break, - } - } - }; - stream - .for_each(|msg| async { - match self.handle_message(msg).await { - Ok(_) => {}, - Err(e) => tracing::debug!("[EditCommandQueue]: {}", e), - } - }) - .await; - } - - async fn handle_message(&self, msg: EditCommand) -> Result<(), FlowyError> { - match msg { - EditCommand::ComposeDelta { delta, ret } => { - let result = self.composed_delta(delta).await; - let _ = ret.send(result); - }, - EditCommand::ProcessRemoteRevision { bytes, ret } => { - let f = || async { - let revision = Revision::try_from(bytes)?; - let delta = RichTextDelta::from_bytes(&revision.delta_data)?; - let server_rev_id: RevId = revision.rev_id.into(); - let read_guard = self.document.read().await; - let (server_prime, client_prime) = read_guard.delta().transform(&delta)?; - drop(read_guard); - - let transform_delta = TransformDeltas { - client_prime, - server_prime, - server_rev_id, - }; - - Ok::(transform_delta) - }; - let _ = ret.send(f().await); - }, - EditCommand::Insert { index, data, ret } => { - let mut write_guard = self.document.write().await; - let delta = write_guard.insert(index, data)?; - let md5 = write_guard.md5(); - let _ = ret.send(Ok((delta, md5))); - }, - EditCommand::Delete { interval, ret } => { - let mut write_guard = self.document.write().await; - let delta = write_guard.delete(interval)?; - let md5 = write_guard.md5(); - let _ = ret.send(Ok((delta, md5))); - }, - EditCommand::Format { - interval, - attribute, - ret, - } => { - let mut write_guard = self.document.write().await; - let delta = write_guard.format(interval, attribute)?; - let md5 = write_guard.md5(); - let _ = ret.send(Ok((delta, md5))); - }, - EditCommand::Replace { interval, data, ret } => { - let mut write_guard = self.document.write().await; - let delta = write_guard.replace(interval, data)?; - let md5 = write_guard.md5(); - let _ = ret.send(Ok((delta, md5))); - }, - EditCommand::CanUndo { ret } => { - let _ = ret.send(self.document.read().await.can_undo()); - }, - EditCommand::CanRedo { ret } => { - let _ = ret.send(self.document.read().await.can_redo()); - }, - EditCommand::Undo { ret } => { - let result = self.document.write().await.undo(); - let _ = ret.send(result); - }, - EditCommand::Redo { ret } => { - let result = self.document.write().await.redo(); - let _ = ret.send(result); - }, - EditCommand::ReadDoc { ret } => { - let data = self.document.read().await.to_json(); - let _ = ret.send(Ok(data)); - }, - EditCommand::ReadDocDelta { ret } => { - let delta = self.document.read().await.delta().clone(); - let _ = ret.send(Ok(delta)); - }, - } - Ok(()) - } - - #[tracing::instrument(level = "debug", skip(self, delta), fields(compose_result), err)] - async fn composed_delta(&self, delta: RichTextDelta) -> Result { - // tracing::debug!("{:?} thread handle_message", thread::current(),); - let mut document = self.document.write().await; - tracing::Span::current().record( - "composed_delta", - &format!("doc_id:{} - {}", &self.doc_id, delta.to_json()).as_str(), - ); - - let _ = document.compose_delta(delta)?; - let md5 = document.md5(); - drop(document); - - Ok(md5) - } -} - -pub(crate) type Ret = oneshot::Sender>; -pub(crate) type NewDelta = (RichTextDelta, String); -pub(crate) type DocumentMD5 = String; - -#[allow(dead_code)] -pub(crate) enum EditCommand { - ComposeDelta { - delta: RichTextDelta, - ret: Ret, - }, - ProcessRemoteRevision { - bytes: Bytes, - ret: Ret, - }, - Insert { - index: usize, - data: String, - ret: Ret, - }, - Delete { - interval: Interval, - ret: Ret, - }, - Format { - interval: Interval, - attribute: RichTextAttribute, - ret: Ret, - }, - - Replace { - interval: Interval, - data: String, - ret: Ret, - }, - CanUndo { - ret: oneshot::Sender, - }, - CanRedo { - ret: oneshot::Sender, - }, - Undo { - ret: Ret, - }, - Redo { - ret: Ret, - }, - ReadDoc { - ret: Ret, - }, - ReadDocDelta { - ret: Ret, - }, -} - -pub(crate) struct TransformDeltas { - pub client_prime: RichTextDelta, - pub server_prime: RichTextDelta, - pub server_rev_id: RevId, -} diff --git a/frontend/rust-lib/flowy-document/src/services/doc/mod.rs b/frontend/rust-lib/flowy-document/src/services/doc/mod.rs index 9188a3f339..c276f674d3 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/mod.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/mod.rs @@ -3,7 +3,9 @@ pub mod revision; pub(crate) mod controller; -mod ws_manager; -pub use ws_manager::*; +mod ws_handlers; +pub use edit::*; +pub use revision::*; +pub use ws_handlers::*; pub const SYNC_INTERVAL_IN_MILLIS: u64 = 500; diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/cache.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/cache.rs index 7e1f787577..6858108afa 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/cache.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/cache.rs @@ -23,10 +23,6 @@ use tokio::{ task::{spawn_blocking, JoinHandle}, }; -pub trait RevisionIterator: Send + Sync { - fn next(&self) -> FutureResult, FlowyError>; -} - type DocRevisionDeskCache = dyn RevisionDiskCache; pub struct RevisionCache { @@ -171,10 +167,8 @@ impl RevisionCache { self.add_remote_revision(revision).await?; Ok(doc) } -} -impl RevisionIterator for RevisionCache { - fn next(&self) -> FutureResult, FlowyError> { + pub(crate) fn next_revision(&self) -> FutureResult, FlowyError> { let memory_cache = self.memory_cache.clone(); let disk_cache = self.dish_cache.clone(); let doc_id = self.doc_id.clone(); @@ -184,10 +178,10 @@ impl RevisionIterator for RevisionCache { None => Ok(None), Some(rev_id) => match disk_cache.read_revision(&doc_id, rev_id)? { None => Ok(None), - Some(record) => Ok(Some(record)), + Some(record) => Ok(Some(record.revision)), }, }, - Some((_, record)) => Ok(Some(record)), + Some((_, record)) => Ok(Some(record.revision)), } }) } diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs index ebb61687d9..9cc4c6d4ed 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs @@ -1,10 +1,4 @@ -use crate::{ - errors::FlowyError, - services::doc::{ - revision::{RevisionCache, RevisionUpStream, SteamStopRx}, - DocumentWebSocket, - }, -}; +use crate::{errors::FlowyError, services::doc::revision::RevisionCache}; use flowy_collaboration::{ entities::doc::Doc, util::{md5, RevIdCounter}, @@ -13,7 +7,7 @@ use flowy_error::FlowyResult; use lib_infra::future::FutureResult; use lib_ot::{ core::OperationTransformable, - revision::{RevId, RevType, Revision, RevisionRange}, + revision::{RevType, Revision, RevisionRange}, rich_text::RichTextDelta, }; use std::sync::Arc; @@ -27,18 +21,16 @@ pub struct RevisionManager { user_id: String, rev_id_counter: RevIdCounter, cache: Arc, - ws_sender: Arc, } impl RevisionManager { - pub fn new(user_id: &str, doc_id: &str, cache: Arc, ws_sender: Arc) -> Self { + pub fn new(user_id: &str, doc_id: &str, cache: Arc) -> Self { let rev_id_counter = RevIdCounter::new(0); Self { doc_id: doc_id.to_string(), user_id: user_id.to_owned(), rev_id_counter, cache, - ws_sender, } } @@ -58,8 +50,8 @@ impl RevisionManager { Ok(()) } - pub async fn ack_revision(&self, rev_id: RevId) -> Result<(), FlowyError> { - self.cache.ack_revision(rev_id.into()).await; + pub async fn ack_revision(&self, rev_id: i64) -> Result<(), FlowyError> { + self.cache.ack_revision(rev_id).await; Ok(()) } @@ -101,9 +93,7 @@ impl RevisionManager { Ok(revision) } - pub(crate) fn make_up_stream(&self, stop_rx: SteamStopRx) -> RevisionUpStream { - RevisionUpStream::new(&self.doc_id, self.cache.clone(), self.ws_sender.clone(), stop_rx) - } + pub fn next_sync_revision(&self) -> FutureResult, FlowyError> { self.cache.next_revision() } } #[cfg(feature = "flowy_unit_test")] diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/mod.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/mod.rs index 867c30173c..e101635339 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/mod.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/mod.rs @@ -1,7 +1,5 @@ mod cache; mod manager; -mod sync; pub use cache::*; pub use manager::*; -pub(crate) use sync::*; diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/sync.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/sync.rs deleted file mode 100644 index fd839ff98a..0000000000 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/sync.rs +++ /dev/null @@ -1,200 +0,0 @@ -use crate::services::doc::{ - edit::ClientDocEditor, - revision::{RevisionIterator, RevisionManager}, - DocumentWebSocket, - SYNC_INTERVAL_IN_MILLIS, -}; -use async_stream::stream; -use bytes::Bytes; -use flowy_collaboration::entities::ws::{WsDataType, WsDocumentData}; -use flowy_error::{internal_error, FlowyResult}; -use futures::stream::StreamExt; -use lib_ot::revision::{RevId, RevisionRange}; -use std::{convert::TryFrom, sync::Arc}; -use tokio::{ - sync::{broadcast, mpsc}, - task::spawn_blocking, - time::{interval, Duration}, -}; - -pub(crate) struct RevisionDownStream { - editor: Arc, - rev_manager: Arc, - ws_msg_rx: Option>, - ws_sender: Arc, - stop_rx: Option, -} - -impl RevisionDownStream { - pub(crate) fn new( - editor: Arc, - rev_manager: Arc, - ws_msg_rx: mpsc::UnboundedReceiver, - ws_sender: Arc, - stop_rx: SteamStopRx, - ) -> Self { - RevisionDownStream { - editor, - rev_manager, - ws_msg_rx: Some(ws_msg_rx), - ws_sender, - stop_rx: Some(stop_rx), - } - } - - pub async fn run(mut self) { - let mut receiver = self.ws_msg_rx.take().expect("Only take once"); - let mut stop_rx = self.stop_rx.take().expect("Only take once"); - let doc_id = self.editor.doc_id.clone(); - let stream = stream! { - loop { - tokio::select! { - result = receiver.recv() => { - match result { - Some(msg) => { - yield msg - }, - None => { - tracing::debug!("[RevisionDownStream:{}] loop exit", doc_id); - break; - }, - } - }, - _ = stop_rx.recv() => { - tracing::debug!("[RevisionDownStream:{}] loop exit", doc_id); - break - }, - }; - } - }; - - stream - .for_each(|msg| async { - match self.handle_message(msg).await { - Ok(_) => {}, - Err(e) => log::error!("[RevisionDownStream:{}] error: {}", self.editor.doc_id, e), - } - }) - .await; - } - - async fn handle_message(&self, msg: WsDocumentData) -> FlowyResult<()> { - let WsDocumentData { doc_id: _, ty, data } = msg; - let bytes = spawn_blocking(move || Bytes::from(data)) - .await - .map_err(internal_error)?; - - tracing::debug!("[RevisionDownStream]: receives new message: {:?}", ty); - match ty { - WsDataType::PushRev => { - let _ = self.editor.handle_push_rev(bytes).await?; - }, - WsDataType::PullRev => { - let range = RevisionRange::try_from(bytes)?; - let revision = self.rev_manager.mk_revisions(range).await?; - let _ = self.ws_sender.send(revision.into()); - }, - WsDataType::Acked => { - let rev_id = RevId::try_from(bytes)?; - let _ = self.rev_manager.ack_revision(rev_id).await?; - }, - WsDataType::Conflict => {}, - } - - Ok(()) - } -} - -// RevisionUpStream -pub(crate) enum UpStreamMsg { - Tick, -} - -pub type SteamStopRx = broadcast::Receiver<()>; -pub type SteamStopTx = broadcast::Sender<()>; - -pub(crate) struct RevisionUpStream { - revisions: Arc, - ws_sender: Arc, - stop_rx: Option, - doc_id: String, -} - -impl RevisionUpStream { - pub(crate) fn new( - doc_id: &str, - revisions: Arc, - ws_sender: Arc, - stop_rx: SteamStopRx, - ) -> Self { - Self { - revisions, - ws_sender, - stop_rx: Some(stop_rx), - doc_id: doc_id.to_owned(), - } - } - - pub async fn run(mut self) { - let (tx, mut rx) = mpsc::unbounded_channel(); - let mut stop_rx = self.stop_rx.take().expect("Only take once"); - let doc_id = self.doc_id.clone(); - tokio::spawn(tick(tx)); - let stream = stream! { - loop { - tokio::select! { - result = rx.recv() => { - match result { - Some(msg) => yield msg, - None => break, - } - }, - _ = stop_rx.recv() => { - tracing::debug!("[RevisionUpStream:{}] loop exit", doc_id); - break - }, - }; - } - }; - stream - .for_each(|msg| async { - match self.handle_msg(msg).await { - Ok(_) => {}, - Err(e) => log::error!("[RevisionUpStream]: send msg failed, {:?}", e), - } - }) - .await; - } - - async fn handle_msg(&self, msg: UpStreamMsg) -> FlowyResult<()> { - match msg { - UpStreamMsg::Tick => self.send_next_revision().await, - } - } - - async fn send_next_revision(&self) -> FlowyResult<()> { - match self.revisions.next().await? { - None => { - tracing::debug!("Finish synchronizing revisions"); - Ok(()) - }, - Some(record) => { - tracing::debug!( - "[RevisionUpStream]: processes revision: {}:{:?}", - record.revision.doc_id, - record.revision.rev_id - ); - self.ws_sender.send(record.revision.into()).map_err(internal_error) - // let _ = tokio::time::timeout(Duration::from_millis(2000), - // ret.recv()).await; - }, - } - } -} - -async fn tick(sender: mpsc::UnboundedSender) { - let mut i = interval(Duration::from_millis(SYNC_INTERVAL_IN_MILLIS)); - while sender.send(UpStreamMsg::Tick).is_ok() { - i.tick().await; - } -} diff --git a/frontend/rust-lib/flowy-document/src/services/doc/ws_manager.rs b/frontend/rust-lib/flowy-document/src/services/doc/ws_manager.rs deleted file mode 100644 index efccc18ece..0000000000 --- a/frontend/rust-lib/flowy-document/src/services/doc/ws_manager.rs +++ /dev/null @@ -1,67 +0,0 @@ -use crate::errors::FlowyError; -use bytes::Bytes; -use dashmap::DashMap; -use flowy_collaboration::entities::ws::WsDocumentData; -use lib_ws::WsConnectState; -use std::{convert::TryInto, sync::Arc}; - -pub(crate) trait WsDocumentHandler: Send + Sync { - fn receive(&self, data: WsDocumentData); - fn state_changed(&self, state: &WsConnectState); -} - -pub type WsStateReceiver = tokio::sync::broadcast::Receiver; -pub trait DocumentWebSocket: Send + Sync { - fn send(&self, data: WsDocumentData) -> Result<(), FlowyError>; - fn subscribe_state_changed(&self) -> WsStateReceiver; -} - -pub struct WsDocumentManager { - ws: Arc, - // key: the document id - handlers: Arc>>, -} - -impl WsDocumentManager { - pub fn new(ws: Arc) -> Self { - let handlers: Arc>> = Arc::new(DashMap::new()); - Self { ws, handlers } - } - - pub(crate) fn init(&self) { listen_ws_state_changed(self.ws.clone(), self.handlers.clone()); } - - pub(crate) fn register_handler(&self, id: &str, handler: Arc) { - if self.handlers.contains_key(id) { - log::error!("Duplicate handler registered for {:?}", id); - } - self.handlers.insert(id.to_string(), handler); - } - - pub(crate) fn remove_handler(&self, id: &str) { self.handlers.remove(id); } - - pub fn did_receive_ws_data(&self, data: Bytes) { - let data: WsDocumentData = data.try_into().unwrap(); - match self.handlers.get(&data.doc_id) { - None => { - log::error!("Can't find any source handler for {:?}", data.doc_id); - }, - Some(handler) => { - handler.receive(data); - }, - } - } - - pub fn ws(&self) -> Arc { self.ws.clone() } -} - -#[tracing::instrument(level = "debug", skip(ws, handlers))] -fn listen_ws_state_changed(ws: Arc, handlers: Arc>>) { - let mut notify = ws.subscribe_state_changed(); - tokio::spawn(async move { - while let Ok(state) = notify.recv().await { - handlers.iter().for_each(|handle| { - handle.value().state_changed(&state); - }); - } - }); -} diff --git a/frontend/rust-lib/flowy-net/Cargo.toml b/frontend/rust-lib/flowy-net/Cargo.toml index 24268edc08..500fd74d90 100644 --- a/frontend/rust-lib/flowy-net/Cargo.toml +++ b/frontend/rust-lib/flowy-net/Cargo.toml @@ -25,5 +25,5 @@ lazy_static = {version = "1.4.0", optional = true} dashmap = {version = "4.0", optional = true} [features] -ws_mock = ["flowy-collaboration", "lazy_static", "dashmap"] +flowy_unit_test = ["flowy-collaboration", "lazy_static", "dashmap"] http_server = [] \ No newline at end of file diff --git a/frontend/rust-lib/flowy-net/src/services/mock/ws_mock.rs b/frontend/rust-lib/flowy-net/src/services/mock/ws_mock.rs index 419aaf3454..b22e4fac48 100644 --- a/frontend/rust-lib/flowy-net/src/services/mock/ws_mock.rs +++ b/frontend/rust-lib/flowy-net/src/services/mock/ws_mock.rs @@ -1,11 +1,11 @@ -use crate::services::ws::{FlowyError, FlowyWebSocket, FlowyWsSender, WsConnectState, WsMessage, WsMessageHandler}; +use crate::services::ws::{FlowyError, FlowyWebSocket, FlowyWsSender, WsConnectState, WsMessage, WsMessageReceiver}; use bytes::Bytes; use dashmap::DashMap; use flowy_collaboration::{ core::sync::{RevisionUser, ServerDocManager, ServerDocPersistence, SyncResponse}, entities::{ doc::Doc, - ws::{WsDataType, WsDocumentData}, + ws::{WsDocumentData, WsDocumentDataType}, }, errors::CollaborateError, Revision, @@ -22,7 +22,7 @@ use std::{ use tokio::sync::{broadcast, broadcast::Receiver, mpsc}; pub struct MockWebSocket { - handlers: DashMap>, + handlers: DashMap>, state_sender: broadcast::Sender, ws_sender: broadcast::Sender, is_stop: RwLock, @@ -79,7 +79,7 @@ impl FlowyWebSocket for Arc { fn reconnect(&self, _count: usize) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) } - fn add_ws_message_handler(&self, handler: Arc) -> Result<(), FlowyError> { + fn add_message_receiver(&self, handler: Arc) -> Result<(), FlowyError> { let source = handler.source(); if self.handlers.contains_key(&source) { tracing::error!("WsSource's {:?} is already registered", source); @@ -111,10 +111,10 @@ impl MockDocServer { async fn handle_ws_data(&self, ws_data: WsDocumentData) -> mpsc::Receiver { let bytes = Bytes::from(ws_data.data); match ws_data.ty { - WsDataType::Acked => { + WsDocumentDataType::Acked => { unimplemented!() }, - WsDataType::PushRev => { + WsDocumentDataType::PushRev => { let revision = Revision::try_from(bytes).unwrap(); let handler = match self.manager.get(&revision.doc_id).await { None => self.manager.create_doc(revision.clone()).await.unwrap(), @@ -129,10 +129,10 @@ impl MockDocServer { handler.apply_revision(Arc::new(user), revision).await.unwrap(); rx }, - WsDataType::PullRev => { + WsDocumentDataType::PullRev => { unimplemented!() }, - WsDataType::Conflict => { + WsDocumentDataType::UserConnect => { unimplemented!() }, } diff --git a/frontend/rust-lib/flowy-net/src/services/mod.rs b/frontend/rust-lib/flowy-net/src/services/mod.rs index bd8f542936..1d5e1d784f 100644 --- a/frontend/rust-lib/flowy-net/src/services/mod.rs +++ b/frontend/rust-lib/flowy-net/src/services/mod.rs @@ -1,4 +1,4 @@ pub mod ws; -#[cfg(feature = "ws_mock")] +#[cfg(feature = "flowy_unit_test")] mod mock; diff --git a/frontend/rust-lib/flowy-net/src/services/ws/conn.rs b/frontend/rust-lib/flowy-net/src/services/ws/conn.rs index 0013ea8cbd..fa3d5b9185 100644 --- a/frontend/rust-lib/flowy-net/src/services/ws/conn.rs +++ b/frontend/rust-lib/flowy-net/src/services/ws/conn.rs @@ -3,14 +3,14 @@ use std::sync::Arc; use tokio::sync::broadcast; pub use flowy_error::FlowyError; -pub use lib_ws::{WsConnectState, WsMessage, WsMessageHandler}; +pub use lib_ws::{WsConnectState, WsMessage, WsMessageReceiver}; pub trait FlowyWebSocket: Send + Sync { fn start_connect(&self, addr: String) -> FutureResult<(), FlowyError>; fn stop_connect(&self) -> FutureResult<(), FlowyError>; fn subscribe_connect_state(&self) -> broadcast::Receiver; fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError>; - fn add_ws_message_handler(&self, handler: Arc) -> Result<(), FlowyError>; + fn add_message_receiver(&self, handler: Arc) -> Result<(), FlowyError>; fn ws_sender(&self) -> Result, FlowyError>; } diff --git a/frontend/rust-lib/flowy-net/src/services/ws/manager.rs b/frontend/rust-lib/flowy-net/src/services/ws/manager.rs index 138e281a95..fac7835b6f 100644 --- a/frontend/rust-lib/flowy-net/src/services/ws/manager.rs +++ b/frontend/rust-lib/flowy-net/src/services/ws/manager.rs @@ -4,7 +4,7 @@ use crate::{ }; use flowy_error::{internal_error, FlowyError}; use lib_infra::future::FutureResult; -use lib_ws::{WsConnectState, WsController, WsMessage, WsMessageHandler, WsSender}; +use lib_ws::{WsConnectState, WsController, WsMessage, WsMessageReceiver, WsSender}; use parking_lot::RwLock; use std::sync::Arc; use tokio::sync::{broadcast, broadcast::Receiver}; @@ -71,8 +71,8 @@ impl WsManager { pub fn subscribe_network_ty(&self) -> broadcast::Receiver { self.status_notifier.subscribe() } - pub fn add_handler(&self, handler: Arc) -> Result<(), FlowyError> { - let _ = self.inner.add_ws_message_handler(handler)?; + pub fn add_receiver(&self, handler: Arc) -> Result<(), FlowyError> { + let _ = self.inner.add_message_receiver(handler)?; Ok(()) } @@ -139,8 +139,8 @@ impl FlowyWebSocket for Arc { }) } - fn add_ws_message_handler(&self, handler: Arc) -> Result<(), FlowyError> { - let _ = self.add_handler(handler).map_err(internal_error)?; + fn add_message_receiver(&self, handler: Arc) -> Result<(), FlowyError> { + let _ = self.add_receiver(handler).map_err(internal_error)?; Ok(()) } diff --git a/frontend/rust-lib/flowy-net/src/services/ws/mod.rs b/frontend/rust-lib/flowy-net/src/services/ws/mod.rs index aae5ecffdc..56738af71b 100644 --- a/frontend/rust-lib/flowy-net/src/services/ws/mod.rs +++ b/frontend/rust-lib/flowy-net/src/services/ws/mod.rs @@ -6,10 +6,10 @@ mod conn; mod manager; mod ws_local; -#[cfg(not(feature = "ws_mock"))] +#[cfg(not(feature = "flowy_unit_test"))] pub(crate) fn local_web_socket() -> Arc { Arc::new(Arc::new(ws_local::LocalWebSocket::default())) } -#[cfg(feature = "ws_mock")] +#[cfg(feature = "flowy_unit_test")] pub(crate) fn local_web_socket() -> Arc { Arc::new(Arc::new(crate::services::mock::MockWebSocket::default())) } diff --git a/frontend/rust-lib/flowy-net/src/services/ws/ws_local.rs b/frontend/rust-lib/flowy-net/src/services/ws/ws_local.rs index 20d73eb0ef..692b9244fe 100644 --- a/frontend/rust-lib/flowy-net/src/services/ws/ws_local.rs +++ b/frontend/rust-lib/flowy-net/src/services/ws/ws_local.rs @@ -1,4 +1,4 @@ -use crate::services::ws::{FlowyError, FlowyWebSocket, FlowyWsSender, WsConnectState, WsMessage, WsMessageHandler}; +use crate::services::ws::{FlowyError, FlowyWebSocket, FlowyWsSender, WsConnectState, WsMessage, WsMessageReceiver}; use lib_infra::future::FutureResult; use std::sync::Arc; use tokio::sync::{broadcast, broadcast::Receiver}; @@ -28,7 +28,7 @@ impl FlowyWebSocket for Arc { fn reconnect(&self, _count: usize) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) } - fn add_ws_message_handler(&self, _handler: Arc) -> Result<(), FlowyError> { Ok(()) } + fn add_message_receiver(&self, _handler: Arc) -> Result<(), FlowyError> { Ok(()) } fn ws_sender(&self) -> Result, FlowyError> { Ok(Arc::new(self.ws_sender.clone())) } } diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs index 4cf1606b40..6ad1d7c0f4 100644 --- a/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs +++ b/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs @@ -4,11 +4,11 @@ use flowy_database::ConnectionPool; use flowy_document::{ errors::{internal_error, FlowyError}, module::DocumentUser, - services::doc::{DocumentWebSocket, WsDocumentManager, WsStateReceiver}, + services::doc::{DocumentWebSocket, DocumentWsHandlers, WsStateReceiver}, }; use flowy_net::services::ws::WsManager; use flowy_user::services::user::UserSession; -use lib_ws::{WsMessage, WsMessageHandler, WsModule}; +use lib_ws::{WsMessage, WsMessageReceiver, WsModule}; use std::{convert::TryInto, path::Path, sync::Arc}; pub struct DocumentDepsResolver(); @@ -16,16 +16,16 @@ impl DocumentDepsResolver { pub fn resolve( ws_manager: Arc, user_session: Arc, - ) -> (Arc, Arc) { + ) -> (Arc, Arc) { let user = Arc::new(DocumentUserImpl { user: user_session }); let sender = Arc::new(WsSenderImpl { ws_manager: ws_manager.clone(), }); - let ws_doc = Arc::new(WsDocumentManager::new(sender)); - let ws_handler = Arc::new(DocumentWsMessageReceiver { inner: ws_doc.clone() }); - ws_manager.add_handler(ws_handler).unwrap(); - (user, ws_doc) + let document_ws_handlers = Arc::new(DocumentWsHandlers::new(sender)); + let receiver = Arc::new(WsMessageReceiverAdaptor(document_ws_handlers.clone())); + ws_manager.add_receiver(receiver).unwrap(); + (user, document_ws_handlers) } } @@ -76,15 +76,9 @@ impl DocumentWebSocket for WsSenderImpl { fn subscribe_state_changed(&self) -> WsStateReceiver { self.ws_manager.subscribe_websocket_state() } } -struct DocumentWsMessageReceiver { - inner: Arc, -} +struct WsMessageReceiverAdaptor(Arc); -impl WsMessageHandler for DocumentWsMessageReceiver { +impl WsMessageReceiver for WsMessageReceiverAdaptor { fn source(&self) -> WsModule { WsModule::Doc } - - fn receive_message(&self, msg: WsMessage) { - let data = Bytes::from(msg.data); - self.inner.did_receive_ws_data(data); - } + fn receive_message(&self, msg: WsMessage) { self.0.did_receive_data(Bytes::from(msg.data)); } } diff --git a/frontend/rust-lib/flowy-test/Cargo.toml b/frontend/rust-lib/flowy-test/Cargo.toml index de27fc3f66..56fd34ef46 100644 --- a/frontend/rust-lib/flowy-test/Cargo.toml +++ b/frontend/rust-lib/flowy-test/Cargo.toml @@ -36,4 +36,4 @@ fake = "~2.3.0" claim = "0.4.0" futures = "0.3.15" serial_test = "0.5.1" -flowy-net = { path = "../flowy-net", features = ["ws_mock"] } \ No newline at end of file +flowy-net = { path = "../flowy-net", features = ["flowy_unit_test"] } \ No newline at end of file diff --git a/frontend/rust-lib/flowy-test/src/doc_script.rs b/frontend/rust-lib/flowy-test/src/doc_script.rs index f7828f9836..6c699033e2 100644 --- a/frontend/rust-lib/flowy-test/src/doc_script.rs +++ b/frontend/rust-lib/flowy-test/src/doc_script.rs @@ -1,6 +1,6 @@ use crate::{helper::ViewTest, FlowySDKTest}; -use flowy_collaboration::entities::{doc::DocIdentifier, ws::WsDocumentData}; -use flowy_document::services::doc::{edit::ClientDocEditor, revision::RevisionIterator, SYNC_INTERVAL_IN_MILLIS}; +use flowy_collaboration::entities::doc::DocIdentifier; +use flowy_document::services::doc::{edit::ClientDocEditor, SYNC_INTERVAL_IN_MILLIS}; use lib_ot::{core::Interval, revision::RevState, rich_text::RichTextDelta}; use std::sync::Arc; use tokio::time::{sleep, Duration}; @@ -78,14 +78,13 @@ impl EditorTest { assert_eq!(self.editor.rev_manager().rev_id(), rev_id); }, EditorScript::AssertNextRevId(rev_id) => { - let next_revision = cache.next().await.unwrap(); + let next_revision = rev_manager.next_sync_revision().await.unwrap(); if rev_id.is_none() { assert_eq!(next_revision.is_none(), true); return; } - let next_revision = next_revision.unwrap(); - assert_eq!(next_revision.revision.rev_id, rev_id.unwrap()); + assert_eq!(next_revision.rev_id, rev_id.unwrap()); }, EditorScript::AssertJson(expected) => { let expected_delta: RichTextDelta = serde_json::from_str(expected).unwrap(); diff --git a/shared-lib/flowy-collaboration/src/core/sync/synchronizer.rs b/shared-lib/flowy-collaboration/src/core/sync/synchronizer.rs index 2d62071407..6d799c1dde 100644 --- a/shared-lib/flowy-collaboration/src/core/sync/synchronizer.rs +++ b/shared-lib/flowy-collaboration/src/core/sync/synchronizer.rs @@ -1,6 +1,6 @@ use crate::{ core::document::Document, - entities::ws::{WsDataType, WsDocumentData}, + entities::ws::{WsDocumentData, WsDocumentDataType}, }; use bytes::Bytes; use lib_ot::{ @@ -147,7 +147,7 @@ fn mk_push_message(doc_id: &str, revision: Revision) -> WsDocumentData { let bytes: Bytes = revision.try_into().unwrap(); WsDocumentData { doc_id: doc_id.to_string(), - ty: WsDataType::PushRev, + ty: WsDocumentDataType::PushRev, data: bytes.to_vec(), } } @@ -162,7 +162,7 @@ fn mk_pull_message(doc_id: &str, from_rev_id: i64, to_rev_id: i64) -> WsDocument let bytes: Bytes = range.try_into().unwrap(); WsDocumentData { doc_id: doc_id.to_string(), - ty: WsDataType::PullRev, + ty: WsDocumentDataType::PullRev, data: bytes.to_vec(), } } @@ -176,7 +176,7 @@ fn mk_acked_message(revision: &Revision) -> WsDocumentData { WsDocumentData { doc_id: revision.doc_id.clone(), - ty: WsDataType::Acked, + ty: WsDocumentDataType::Acked, data, } } diff --git a/shared-lib/flowy-collaboration/src/entities/ws/ws.rs b/shared-lib/flowy-collaboration/src/entities/ws/ws.rs index 3e41268aef..a2e1e31d2f 100644 --- a/shared-lib/flowy-collaboration/src/entities/ws/ws.rs +++ b/shared-lib/flowy-collaboration/src/entities/ws/ws.rs @@ -5,17 +5,17 @@ use lib_ot::revision::{RevId, Revision, RevisionRange}; use std::convert::{TryFrom, TryInto}; #[derive(Debug, Clone, ProtoBuf_Enum, Eq, PartialEq, Hash)] -pub enum WsDataType { +pub enum WsDocumentDataType { // The frontend receives the Acked means the backend has accepted the revision - Acked = 0, + Acked = 0, // The frontend receives the PushRev event means the backend is pushing the new revision to frontend - PushRev = 1, + PushRev = 1, // The fronted receives the PullRev event means the backend try to pull the revision from frontend - PullRev = 2, - Conflict = 3, + PullRev = 2, + UserConnect = 3, } -impl WsDataType { +impl WsDocumentDataType { pub fn data(&self, bytes: Bytes) -> Result where T: TryFrom, @@ -24,8 +24,8 @@ impl WsDataType { } } -impl std::default::Default for WsDataType { - fn default() -> Self { WsDataType::Acked } +impl std::default::Default for WsDocumentDataType { + fn default() -> Self { WsDocumentDataType::Acked } } #[derive(ProtoBuf, Default, Debug, Clone)] @@ -34,7 +34,7 @@ pub struct WsDocumentData { pub doc_id: String, #[pb(index = 2)] - pub ty: WsDataType, + pub ty: WsDocumentDataType, #[pb(index = 3)] pub data: Vec, @@ -46,7 +46,7 @@ impl std::convert::From for WsDocumentData { let bytes: Bytes = revision.try_into().unwrap(); Self { doc_id, - ty: WsDataType::PushRev, + ty: WsDocumentDataType::PushRev, data: bytes.to_vec(), } } @@ -54,34 +54,46 @@ impl std::convert::From for WsDocumentData { pub struct WsDocumentDataBuilder(); impl WsDocumentDataBuilder { - // WsDataType::PushRev -> Revision + // WsDocumentDataType::PushRev -> Revision pub fn build_push_rev_message(doc_id: &str, revision: Revision) -> WsDocumentData { let bytes: Bytes = revision.try_into().unwrap(); WsDocumentData { doc_id: doc_id.to_string(), - ty: WsDataType::PushRev, + ty: WsDocumentDataType::PushRev, data: bytes.to_vec(), } } - // WsDataType::PullRev -> RevisionRange + // WsDocumentDataType::PullRev -> RevisionRange pub fn build_push_pull_message(doc_id: &str, range: RevisionRange) -> WsDocumentData { let bytes: Bytes = range.try_into().unwrap(); WsDocumentData { doc_id: doc_id.to_string(), - ty: WsDataType::PullRev, + ty: WsDocumentDataType::PullRev, data: bytes.to_vec(), } } - // WsDataType::Acked -> RevId + // WsDocumentDataType::Acked -> RevId pub fn build_acked_message(doc_id: &str, rev_id: i64) -> WsDocumentData { let rev_id: RevId = rev_id.into(); let bytes: Bytes = rev_id.try_into().unwrap(); WsDocumentData { doc_id: doc_id.to_string(), - ty: WsDataType::Acked, + ty: WsDocumentDataType::Acked, data: bytes.to_vec(), } } } + +#[derive(ProtoBuf, Default, Debug, Clone)] +pub struct DocumentConnected { + #[pb(index = 1)] + pub user_id: String, + + #[pb(index = 2)] + pub doc_id: String, + + #[pb(index = 3)] + pub rev_id: i64, +} diff --git a/shared-lib/flowy-collaboration/src/protobuf/model/ws.rs b/shared-lib/flowy-collaboration/src/protobuf/model/ws.rs index dbdcbe6569..394459aaff 100644 --- a/shared-lib/flowy-collaboration/src/protobuf/model/ws.rs +++ b/shared-lib/flowy-collaboration/src/protobuf/model/ws.rs @@ -27,7 +27,7 @@ pub struct WsDocumentData { // message fields pub doc_id: ::std::string::String, - pub ty: WsDataType, + pub ty: WsDocumentDataType, pub data: ::std::vec::Vec, // special fields pub unknown_fields: ::protobuf::UnknownFields, @@ -71,18 +71,18 @@ impl WsDocumentData { ::std::mem::replace(&mut self.doc_id, ::std::string::String::new()) } - // .WsDataType ty = 2; + // .WsDocumentDataType ty = 2; - pub fn get_ty(&self) -> WsDataType { + pub fn get_ty(&self) -> WsDocumentDataType { self.ty } pub fn clear_ty(&mut self) { - self.ty = WsDataType::Acked; + self.ty = WsDocumentDataType::Acked; } // Param is passed by value, moved - pub fn set_ty(&mut self, v: WsDataType) { + pub fn set_ty(&mut self, v: WsDocumentDataType) { self.ty = v; } @@ -146,7 +146,7 @@ impl ::protobuf::Message for WsDocumentData { if !self.doc_id.is_empty() { my_size += ::protobuf::rt::string_size(1, &self.doc_id); } - if self.ty != WsDataType::Acked { + if self.ty != WsDocumentDataType::Acked { my_size += ::protobuf::rt::enum_size(2, self.ty); } if !self.data.is_empty() { @@ -161,7 +161,7 @@ impl ::protobuf::Message for WsDocumentData { if !self.doc_id.is_empty() { os.write_string(1, &self.doc_id)?; } - if self.ty != WsDataType::Acked { + if self.ty != WsDocumentDataType::Acked { os.write_enum(2, ::protobuf::ProtobufEnum::value(&self.ty))?; } if !self.data.is_empty() { @@ -210,7 +210,7 @@ impl ::protobuf::Message for WsDocumentData { |m: &WsDocumentData| { &m.doc_id }, |m: &mut WsDocumentData| { &mut m.doc_id }, )); - fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeEnum>( + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeEnum>( "ty", |m: &WsDocumentData| { &m.ty }, |m: &mut WsDocumentData| { &mut m.ty }, @@ -237,7 +237,7 @@ impl ::protobuf::Message for WsDocumentData { impl ::protobuf::Clear for WsDocumentData { fn clear(&mut self) { self.doc_id.clear(); - self.ty = WsDataType::Acked; + self.ty = WsDocumentDataType::Acked; self.data.clear(); self.unknown_fields.clear(); } @@ -255,35 +255,271 @@ impl ::protobuf::reflect::ProtobufValue for WsDocumentData { } } +#[derive(PartialEq,Clone,Default)] +pub struct DocumentConnected { + // message fields + pub user_id: ::std::string::String, + pub doc_id: ::std::string::String, + pub rev_id: i64, + // special fields + pub unknown_fields: ::protobuf::UnknownFields, + pub cached_size: ::protobuf::CachedSize, +} + +impl<'a> ::std::default::Default for &'a DocumentConnected { + fn default() -> &'a DocumentConnected { + ::default_instance() + } +} + +impl DocumentConnected { + pub fn new() -> DocumentConnected { + ::std::default::Default::default() + } + + // string user_id = 1; + + + pub fn get_user_id(&self) -> &str { + &self.user_id + } + pub fn clear_user_id(&mut self) { + self.user_id.clear(); + } + + // Param is passed by value, moved + pub fn set_user_id(&mut self, v: ::std::string::String) { + self.user_id = v; + } + + // Mutable pointer to the field. + // If field is not initialized, it is initialized with default value first. + pub fn mut_user_id(&mut self) -> &mut ::std::string::String { + &mut self.user_id + } + + // Take field + pub fn take_user_id(&mut self) -> ::std::string::String { + ::std::mem::replace(&mut self.user_id, ::std::string::String::new()) + } + + // string doc_id = 2; + + + pub fn get_doc_id(&self) -> &str { + &self.doc_id + } + pub fn clear_doc_id(&mut self) { + self.doc_id.clear(); + } + + // Param is passed by value, moved + pub fn set_doc_id(&mut self, v: ::std::string::String) { + self.doc_id = v; + } + + // Mutable pointer to the field. + // If field is not initialized, it is initialized with default value first. + pub fn mut_doc_id(&mut self) -> &mut ::std::string::String { + &mut self.doc_id + } + + // Take field + pub fn take_doc_id(&mut self) -> ::std::string::String { + ::std::mem::replace(&mut self.doc_id, ::std::string::String::new()) + } + + // int64 rev_id = 3; + + + pub fn get_rev_id(&self) -> i64 { + self.rev_id + } + pub fn clear_rev_id(&mut self) { + self.rev_id = 0; + } + + // Param is passed by value, moved + pub fn set_rev_id(&mut self, v: i64) { + self.rev_id = v; + } +} + +impl ::protobuf::Message for DocumentConnected { + fn is_initialized(&self) -> bool { + true + } + + fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> { + while !is.eof()? { + let (field_number, wire_type) = is.read_tag_unpack()?; + match field_number { + 1 => { + ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.user_id)?; + }, + 2 => { + ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.doc_id)?; + }, + 3 => { + if wire_type != ::protobuf::wire_format::WireTypeVarint { + return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type)); + } + let tmp = is.read_int64()?; + self.rev_id = tmp; + }, + _ => { + ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; + }, + }; + } + ::std::result::Result::Ok(()) + } + + // Compute sizes of nested messages + #[allow(unused_variables)] + fn compute_size(&self) -> u32 { + let mut my_size = 0; + if !self.user_id.is_empty() { + my_size += ::protobuf::rt::string_size(1, &self.user_id); + } + if !self.doc_id.is_empty() { + my_size += ::protobuf::rt::string_size(2, &self.doc_id); + } + if self.rev_id != 0 { + my_size += ::protobuf::rt::value_size(3, self.rev_id, ::protobuf::wire_format::WireTypeVarint); + } + my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); + self.cached_size.set(my_size); + my_size + } + + fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> { + if !self.user_id.is_empty() { + os.write_string(1, &self.user_id)?; + } + if !self.doc_id.is_empty() { + os.write_string(2, &self.doc_id)?; + } + if self.rev_id != 0 { + os.write_int64(3, self.rev_id)?; + } + os.write_unknown_fields(self.get_unknown_fields())?; + ::std::result::Result::Ok(()) + } + + fn get_cached_size(&self) -> u32 { + self.cached_size.get() + } + + fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { + &self.unknown_fields + } + + fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { + &mut self.unknown_fields + } + + fn as_any(&self) -> &dyn (::std::any::Any) { + self as &dyn (::std::any::Any) + } + fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) { + self as &mut dyn (::std::any::Any) + } + fn into_any(self: ::std::boxed::Box) -> ::std::boxed::Box { + self + } + + fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { + Self::descriptor_static() + } + + fn new() -> DocumentConnected { + DocumentConnected::new() + } + + fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor { + static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT; + descriptor.get(|| { + let mut fields = ::std::vec::Vec::new(); + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( + "user_id", + |m: &DocumentConnected| { &m.user_id }, + |m: &mut DocumentConnected| { &mut m.user_id }, + )); + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( + "doc_id", + |m: &DocumentConnected| { &m.doc_id }, + |m: &mut DocumentConnected| { &mut m.doc_id }, + )); + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeInt64>( + "rev_id", + |m: &DocumentConnected| { &m.rev_id }, + |m: &mut DocumentConnected| { &mut m.rev_id }, + )); + ::protobuf::reflect::MessageDescriptor::new_pb_name::( + "DocumentConnected", + fields, + file_descriptor_proto() + ) + }) + } + + fn default_instance() -> &'static DocumentConnected { + static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; + instance.get(DocumentConnected::new) + } +} + +impl ::protobuf::Clear for DocumentConnected { + fn clear(&mut self) { + self.user_id.clear(); + self.doc_id.clear(); + self.rev_id = 0; + self.unknown_fields.clear(); + } +} + +impl ::std::fmt::Debug for DocumentConnected { + fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { + ::protobuf::text_format::fmt(self, f) + } +} + +impl ::protobuf::reflect::ProtobufValue for DocumentConnected { + fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { + ::protobuf::reflect::ReflectValueRef::Message(self) + } +} + #[derive(Clone,PartialEq,Eq,Debug,Hash)] -pub enum WsDataType { +pub enum WsDocumentDataType { Acked = 0, PushRev = 1, PullRev = 2, - Conflict = 3, + UserConnect = 3, } -impl ::protobuf::ProtobufEnum for WsDataType { +impl ::protobuf::ProtobufEnum for WsDocumentDataType { fn value(&self) -> i32 { *self as i32 } - fn from_i32(value: i32) -> ::std::option::Option { + fn from_i32(value: i32) -> ::std::option::Option { match value { - 0 => ::std::option::Option::Some(WsDataType::Acked), - 1 => ::std::option::Option::Some(WsDataType::PushRev), - 2 => ::std::option::Option::Some(WsDataType::PullRev), - 3 => ::std::option::Option::Some(WsDataType::Conflict), + 0 => ::std::option::Option::Some(WsDocumentDataType::Acked), + 1 => ::std::option::Option::Some(WsDocumentDataType::PushRev), + 2 => ::std::option::Option::Some(WsDocumentDataType::PullRev), + 3 => ::std::option::Option::Some(WsDocumentDataType::UserConnect), _ => ::std::option::Option::None } } fn values() -> &'static [Self] { - static values: &'static [WsDataType] = &[ - WsDataType::Acked, - WsDataType::PushRev, - WsDataType::PullRev, - WsDataType::Conflict, + static values: &'static [WsDocumentDataType] = &[ + WsDocumentDataType::Acked, + WsDocumentDataType::PushRev, + WsDocumentDataType::PullRev, + WsDocumentDataType::UserConnect, ]; values } @@ -291,51 +527,63 @@ impl ::protobuf::ProtobufEnum for WsDataType { fn enum_descriptor_static() -> &'static ::protobuf::reflect::EnumDescriptor { static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::EnumDescriptor> = ::protobuf::rt::LazyV2::INIT; descriptor.get(|| { - ::protobuf::reflect::EnumDescriptor::new_pb_name::("WsDataType", file_descriptor_proto()) + ::protobuf::reflect::EnumDescriptor::new_pb_name::("WsDocumentDataType", file_descriptor_proto()) }) } } -impl ::std::marker::Copy for WsDataType { +impl ::std::marker::Copy for WsDocumentDataType { } -impl ::std::default::Default for WsDataType { +impl ::std::default::Default for WsDocumentDataType { fn default() -> Self { - WsDataType::Acked + WsDocumentDataType::Acked } } -impl ::protobuf::reflect::ProtobufValue for WsDataType { +impl ::protobuf::reflect::ProtobufValue for WsDocumentDataType { fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { ::protobuf::reflect::ReflectValueRef::Enum(::protobuf::ProtobufEnum::descriptor(self)) } } static file_descriptor_proto_data: &'static [u8] = b"\ - \n\x08ws.proto\"X\n\x0eWsDocumentData\x12\x15\n\x06doc_id\x18\x01\x20\ - \x01(\tR\x05docId\x12\x1b\n\x02ty\x18\x02\x20\x01(\x0e2\x0b.WsDataTypeR\ - \x02ty\x12\x12\n\x04data\x18\x03\x20\x01(\x0cR\x04data*?\n\nWsDataType\ - \x12\t\n\x05Acked\x10\0\x12\x0b\n\x07PushRev\x10\x01\x12\x0b\n\x07PullRe\ - v\x10\x02\x12\x0c\n\x08Conflict\x10\x03J\x8b\x03\n\x06\x12\x04\0\0\x0c\ - \x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x06\ - \x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x16\n\x0b\n\x04\x04\0\x02\0\x12\ - \x03\x03\x04\x16\n\x0c\n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\n\n\x0c\n\ - \x05\x04\0\x02\0\x01\x12\x03\x03\x0b\x11\n\x0c\n\x05\x04\0\x02\0\x03\x12\ - \x03\x03\x14\x15\n\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04\x16\n\x0c\n\ - \x05\x04\0\x02\x01\x06\x12\x03\x04\x04\x0e\n\x0c\n\x05\x04\0\x02\x01\x01\ - \x12\x03\x04\x0f\x11\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04\x14\x15\n\ - \x0b\n\x04\x04\0\x02\x02\x12\x03\x05\x04\x13\n\x0c\n\x05\x04\0\x02\x02\ - \x05\x12\x03\x05\x04\t\n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03\x05\n\x0e\n\ - \x0c\n\x05\x04\0\x02\x02\x03\x12\x03\x05\x11\x12\n\n\n\x02\x05\0\x12\x04\ - \x07\0\x0c\x01\n\n\n\x03\x05\0\x01\x12\x03\x07\x05\x0f\n\x0b\n\x04\x05\0\ - \x02\0\x12\x03\x08\x04\x0e\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x08\x04\t\ - \n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x08\x0c\r\n\x0b\n\x04\x05\0\x02\x01\ - \x12\x03\t\x04\x10\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\t\x04\x0b\n\x0c\ - \n\x05\x05\0\x02\x01\x02\x12\x03\t\x0e\x0f\n\x0b\n\x04\x05\0\x02\x02\x12\ - \x03\n\x04\x10\n\x0c\n\x05\x05\0\x02\x02\x01\x12\x03\n\x04\x0b\n\x0c\n\ - \x05\x05\0\x02\x02\x02\x12\x03\n\x0e\x0f\n\x0b\n\x04\x05\0\x02\x03\x12\ - \x03\x0b\x04\x11\n\x0c\n\x05\x05\0\x02\x03\x01\x12\x03\x0b\x04\x0c\n\x0c\ - \n\x05\x05\0\x02\x03\x02\x12\x03\x0b\x0f\x10b\x06proto3\ + \n\x08ws.proto\"`\n\x0eWsDocumentData\x12\x15\n\x06doc_id\x18\x01\x20\ + \x01(\tR\x05docId\x12#\n\x02ty\x18\x02\x20\x01(\x0e2\x13.WsDocumentDataT\ + ypeR\x02ty\x12\x12\n\x04data\x18\x03\x20\x01(\x0cR\x04data\"Z\n\x11Docum\ + entConnected\x12\x17\n\x07user_id\x18\x01\x20\x01(\tR\x06userId\x12\x15\ + \n\x06doc_id\x18\x02\x20\x01(\tR\x05docId\x12\x15\n\x06rev_id\x18\x03\ + \x20\x01(\x03R\x05revId*J\n\x12WsDocumentDataType\x12\t\n\x05Acked\x10\0\ + \x12\x0b\n\x07PushRev\x10\x01\x12\x0b\n\x07PullRev\x10\x02\x12\x0f\n\x0b\ + UserConnect\x10\x03J\xc8\x04\n\x06\x12\x04\0\0\x11\x01\n\x08\n\x01\x0c\ + \x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x06\x01\n\n\n\x03\x04\0\ + \x01\x12\x03\x02\x08\x16\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x16\n\ + \x0c\n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\n\n\x0c\n\x05\x04\0\x02\0\x01\ + \x12\x03\x03\x0b\x11\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x14\x15\n\ + \x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04\x1e\n\x0c\n\x05\x04\0\x02\x01\ + \x06\x12\x03\x04\x04\x16\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\x17\ + \x19\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04\x1c\x1d\n\x0b\n\x04\x04\0\ + \x02\x02\x12\x03\x05\x04\x13\n\x0c\n\x05\x04\0\x02\x02\x05\x12\x03\x05\ + \x04\t\n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03\x05\n\x0e\n\x0c\n\x05\x04\0\ + \x02\x02\x03\x12\x03\x05\x11\x12\n\n\n\x02\x04\x01\x12\x04\x07\0\x0b\x01\ + \n\n\n\x03\x04\x01\x01\x12\x03\x07\x08\x19\n\x0b\n\x04\x04\x01\x02\0\x12\ + \x03\x08\x04\x17\n\x0c\n\x05\x04\x01\x02\0\x05\x12\x03\x08\x04\n\n\x0c\n\ + \x05\x04\x01\x02\0\x01\x12\x03\x08\x0b\x12\n\x0c\n\x05\x04\x01\x02\0\x03\ + \x12\x03\x08\x15\x16\n\x0b\n\x04\x04\x01\x02\x01\x12\x03\t\x04\x16\n\x0c\ + \n\x05\x04\x01\x02\x01\x05\x12\x03\t\x04\n\n\x0c\n\x05\x04\x01\x02\x01\ + \x01\x12\x03\t\x0b\x11\n\x0c\n\x05\x04\x01\x02\x01\x03\x12\x03\t\x14\x15\ + \n\x0b\n\x04\x04\x01\x02\x02\x12\x03\n\x04\x15\n\x0c\n\x05\x04\x01\x02\ + \x02\x05\x12\x03\n\x04\t\n\x0c\n\x05\x04\x01\x02\x02\x01\x12\x03\n\n\x10\ + \n\x0c\n\x05\x04\x01\x02\x02\x03\x12\x03\n\x13\x14\n\n\n\x02\x05\0\x12\ + \x04\x0c\0\x11\x01\n\n\n\x03\x05\0\x01\x12\x03\x0c\x05\x17\n\x0b\n\x04\ + \x05\0\x02\0\x12\x03\r\x04\x0e\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\r\x04\ + \t\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\r\x0c\r\n\x0b\n\x04\x05\0\x02\x01\ + \x12\x03\x0e\x04\x10\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\x0e\x04\x0b\n\ + \x0c\n\x05\x05\0\x02\x01\x02\x12\x03\x0e\x0e\x0f\n\x0b\n\x04\x05\0\x02\ + \x02\x12\x03\x0f\x04\x10\n\x0c\n\x05\x05\0\x02\x02\x01\x12\x03\x0f\x04\ + \x0b\n\x0c\n\x05\x05\0\x02\x02\x02\x12\x03\x0f\x0e\x0f\n\x0b\n\x04\x05\0\ + \x02\x03\x12\x03\x10\x04\x14\n\x0c\n\x05\x05\0\x02\x03\x01\x12\x03\x10\ + \x04\x0f\n\x0c\n\x05\x05\0\x02\x03\x02\x12\x03\x10\x12\x13b\x06proto3\ "; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; diff --git a/shared-lib/flowy-collaboration/src/protobuf/proto/ws.proto b/shared-lib/flowy-collaboration/src/protobuf/proto/ws.proto index 423a3e3869..35a10bbbcf 100644 --- a/shared-lib/flowy-collaboration/src/protobuf/proto/ws.proto +++ b/shared-lib/flowy-collaboration/src/protobuf/proto/ws.proto @@ -2,12 +2,17 @@ syntax = "proto3"; message WsDocumentData { string doc_id = 1; - WsDataType ty = 2; + WsDocumentDataType ty = 2; bytes data = 3; } -enum WsDataType { +message DocumentConnected { + string user_id = 1; + string doc_id = 2; + int64 rev_id = 3; +} +enum WsDocumentDataType { Acked = 0; PushRev = 1; PullRev = 2; - Conflict = 3; + UserConnect = 3; } diff --git a/shared-lib/flowy-derive/src/derive_cache/derive_cache.rs b/shared-lib/flowy-derive/src/derive_cache/derive_cache.rs index 3e314e6a7f..4c178596a8 100644 --- a/shared-lib/flowy-derive/src/derive_cache/derive_cache.rs +++ b/shared-lib/flowy-derive/src/derive_cache/derive_cache.rs @@ -61,6 +61,7 @@ pub fn category_from_str(type_str: &str) -> TypeCategory { | "NewDocUser" | "DocIdentifier" | "WsDocumentData" + | "DocumentConnected" | "WsError" | "WsMessage" | "Revision" @@ -88,7 +89,7 @@ pub fn category_from_str(type_str: &str) -> TypeCategory { | "TrashType" | "ViewType" | "ExportType" - | "WsDataType" + | "WsDocumentDataType" | "ErrorCode" | "WsModule" | "RevType" diff --git a/shared-lib/lib-ot/src/revision/model.rs b/shared-lib/lib-ot/src/revision/model.rs index 2b7b55cc58..f9ba9aa837 100644 --- a/shared-lib/lib-ot/src/revision/model.rs +++ b/shared-lib/lib-ot/src/revision/model.rs @@ -1,7 +1,7 @@ use crate::rich_text::RichTextDelta; use bytes::Bytes; use flowy_derive::{ProtoBuf, ProtoBuf_Enum}; -use std::{fmt::Formatter, ops::RangeInclusive}; +use std::{convert::TryFrom, fmt::Formatter, ops::RangeInclusive}; #[derive(PartialEq, Eq, Clone, Default, ProtoBuf)] pub struct Revision { @@ -27,6 +27,13 @@ pub struct Revision { pub user_id: String, } +impl std::convert::From> for Revision { + fn from(data: Vec) -> Self { + let bytes = Bytes::from(data); + Revision::try_from(bytes).unwrap() + } +} + impl Revision { pub fn is_empty(&self) -> bool { self.base_rev_id == self.rev_id } diff --git a/shared-lib/lib-ws/src/ws.rs b/shared-lib/lib-ws/src/ws.rs index b33870dca9..ba62f4a454 100644 --- a/shared-lib/lib-ws/src/ws.rs +++ b/shared-lib/lib-ws/src/ws.rs @@ -30,9 +30,9 @@ use tokio_tungstenite::tungstenite::{ pub type MsgReceiver = UnboundedReceiver; pub type MsgSender = UnboundedSender; -type Handlers = DashMap>; +type Handlers = DashMap>; -pub trait WsMessageHandler: Sync + Send + 'static { +pub trait WsMessageReceiver: Sync + Send + 'static { fn source(&self) -> WsModule; fn receive_message(&self, msg: WsMessage); } @@ -59,7 +59,7 @@ impl std::default::Default for WsController { impl WsController { pub fn new() -> Self { WsController::default() } - pub fn add_handler(&self, handler: Arc) -> Result<(), WsError> { + pub fn add_receiver(&self, handler: Arc) -> Result<(), WsError> { let source = handler.source(); if self.handlers.contains_key(&source) { log::error!("WsSource's {:?} is already registered", source);