diff --git a/backend/src/context.rs b/backend/src/context.rs index cfe56c68dd..082e2e5240 100644 --- a/backend/src/context.rs +++ b/backend/src/context.rs @@ -4,7 +4,7 @@ use crate::{ }; use actix::Addr; use actix_web::web::Data; -use lib_ws::WsModule; +use lib_ws::WSModule; use sqlx::PgPool; use std::sync::Arc; @@ -23,7 +23,7 @@ impl AppContext { let mut ws_bizs = WsBizHandlers::new(); let document_core = Arc::new(DocumentCore::new(pg_pool.clone())); - ws_bizs.register(WsModule::Doc, document_core.clone()); + ws_bizs.register(WSModule::Doc, document_core.clone()); AppContext { ws_server, diff --git a/backend/src/services/doc/ws_actor.rs b/backend/src/services/doc/ws_actor.rs index 10d5c3984f..edf24ac920 100644 --- a/backend/src/services/doc/ws_actor.rs +++ b/backend/src/services/doc/ws_actor.rs @@ -11,7 +11,7 @@ use async_stream::stream; use backend_service::errors::{internal_error, Result, ServerError}; use flowy_collaboration::{ core::sync::ServerDocManager, - protobuf::{WsDocumentData, WsDocumentDataType}, + protobuf::{DocumentWSData, DocumentWSDataType}, }; use futures::stream::StreamExt; use lib_ot::protobuf::Revision; @@ -69,7 +69,7 @@ impl DocWsActor { async fn handle_client_data(&self, client_data: WsClientData, pool: Data) -> Result<()> { let WsClientData { user, socket, data } = client_data; let document_data = spawn_blocking(move || { - let document_data: WsDocumentData = parse_from_bytes(&data)?; + let document_data: DocumentWSData = parse_from_bytes(&data)?; Result::Ok(document_data) }) .await @@ -78,10 +78,10 @@ impl DocWsActor { let data = document_data.data; match document_data.ty { - WsDocumentDataType::Acked => Ok(()), - WsDocumentDataType::PushRev => self.apply_pushed_rev(user, socket, data, pool).await, - WsDocumentDataType::PullRev => Ok(()), - WsDocumentDataType::UserConnect => Ok(()), + DocumentWSDataType::Acked => Ok(()), + DocumentWSDataType::PushRev => self.apply_pushed_rev(user, socket, data, pool).await, + DocumentWSDataType::PullRev => Ok(()), + DocumentWSDataType::UserConnect => Ok(()), } } diff --git a/backend/src/web_socket/biz_handler.rs b/backend/src/web_socket/biz_handler.rs index 144ee0ac79..e4c62b8981 100644 --- a/backend/src/web_socket/biz_handler.rs +++ b/backend/src/web_socket/biz_handler.rs @@ -1,5 +1,5 @@ use crate::web_socket::WsClientData; -use lib_ws::WsModule; +use lib_ws::WSModule; use std::{collections::HashMap, sync::Arc}; pub trait WsBizHandler: Send + Sync { @@ -8,7 +8,7 @@ pub trait WsBizHandler: Send + Sync { pub type BizHandler = Arc; pub struct WsBizHandlers { - inner: HashMap, + inner: HashMap, } impl std::default::Default for WsBizHandlers { @@ -18,7 +18,7 @@ impl std::default::Default for WsBizHandlers { impl WsBizHandlers { pub fn new() -> Self { WsBizHandlers::default() } - pub fn register(&mut self, source: WsModule, handler: BizHandler) { self.inner.insert(source, handler); } + pub fn register(&mut self, source: WSModule, handler: BizHandler) { self.inner.insert(source, handler); } - pub fn get(&self, source: &WsModule) -> Option { self.inner.get(source).cloned() } + pub fn get(&self, source: &WSModule) -> Option { self.inner.get(source).cloned() } } diff --git a/backend/src/web_socket/entities/message.rs b/backend/src/web_socket/entities/message.rs index 9438d9587e..c65ca69ea4 100644 --- a/backend/src/web_socket/entities/message.rs +++ b/backend/src/web_socket/entities/message.rs @@ -1,7 +1,7 @@ use actix::Message; use bytes::Bytes; -use flowy_collaboration::entities::ws::WsDocumentData; -use lib_ws::{WsMessage, WsModule}; +use flowy_collaboration::entities::ws::DocumentWSData; +use lib_ws::{WSMessage, WSModule}; use std::convert::TryInto; #[derive(Debug, Message, Clone)] @@ -14,11 +14,11 @@ impl std::ops::Deref for WsMessageAdaptor { fn deref(&self) -> &Self::Target { &self.0 } } -impl std::convert::From for WsMessageAdaptor { - fn from(data: WsDocumentData) -> Self { +impl std::convert::From for WsMessageAdaptor { + fn from(data: DocumentWSData) -> Self { let bytes: Bytes = data.try_into().unwrap(); - let msg = WsMessage { - module: WsModule::Doc, + let msg = WSMessage { + module: WSModule::Doc, data: bytes.to_vec(), }; diff --git a/backend/src/web_socket/ws_client.rs b/backend/src/web_socket/ws_client.rs index f2365958fd..c048f7e0b9 100644 --- a/backend/src/web_socket/ws_client.rs +++ b/backend/src/web_socket/ws_client.rs @@ -12,7 +12,7 @@ use actix::*; use actix_web::web::Data; use actix_web_actors::{ws, ws::Message::Text}; use bytes::Bytes; -use lib_ws::WsMessage; +use lib_ws::WSMessage; use std::{convert::TryFrom, sync::Arc, time::Instant}; #[derive(Debug)] @@ -64,7 +64,7 @@ impl WsClient { fn handle_binary_message(&self, bytes: Bytes, socket: Socket) { // TODO: ok to unwrap? - let message: WsMessage = WsMessage::try_from(bytes).unwrap(); + let message: WSMessage = WSMessage::try_from(bytes).unwrap(); match self.biz_handlers.get(&message.module) { None => { log::error!("Can't find the handler for {:?}", message.module); diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pb.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pb.dart index df1ef65a13..53eff22b3b 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pb.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pb.dart @@ -14,19 +14,31 @@ import 'ws.pbenum.dart'; export 'ws.pbenum.dart'; -class WsDocumentData extends $pb.GeneratedMessage { - static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'WsDocumentData', createEmptyInstance: create) +enum DocumentWSData_OneOfId { + id, + notSet +} + +class DocumentWSData extends $pb.GeneratedMessage { + static const $core.Map<$core.int, DocumentWSData_OneOfId> _DocumentWSData_OneOfIdByTag = { + 4 : DocumentWSData_OneOfId.id, + 0 : DocumentWSData_OneOfId.notSet + }; + static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'DocumentWSData', createEmptyInstance: create) + ..oo(0, [4]) ..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'docId') - ..e(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'ty', $pb.PbFieldType.OE, defaultOrMaker: WsDocumentDataType.Acked, valueOf: WsDocumentDataType.valueOf, enumValues: WsDocumentDataType.values) + ..e(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'ty', $pb.PbFieldType.OE, defaultOrMaker: DocumentWSDataType.Acked, valueOf: DocumentWSDataType.valueOf, enumValues: DocumentWSDataType.values) ..a<$core.List<$core.int>>(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'data', $pb.PbFieldType.OY) + ..aInt64(4, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'id') ..hasRequiredFields = false ; - WsDocumentData._() : super(); - factory WsDocumentData({ + DocumentWSData._() : super(); + factory DocumentWSData({ $core.String? docId, - WsDocumentDataType? ty, + DocumentWSDataType? ty, $core.List<$core.int>? data, + $fixnum.Int64? id, }) { final _result = create(); if (docId != null) { @@ -38,28 +50,34 @@ class WsDocumentData extends $pb.GeneratedMessage { if (data != null) { _result.data = data; } + if (id != null) { + _result.id = id; + } return _result; } - factory WsDocumentData.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r); - factory WsDocumentData.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r); + factory DocumentWSData.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r); + factory DocumentWSData.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r); @$core.Deprecated( 'Using this can add significant overhead to your binary. ' 'Use [GeneratedMessageGenericExtensions.deepCopy] instead. ' 'Will be removed in next major version') - WsDocumentData clone() => WsDocumentData()..mergeFromMessage(this); + DocumentWSData clone() => DocumentWSData()..mergeFromMessage(this); @$core.Deprecated( 'Using this can add significant overhead to your binary. ' 'Use [GeneratedMessageGenericExtensions.rebuild] instead. ' 'Will be removed in next major version') - WsDocumentData copyWith(void Function(WsDocumentData) updates) => super.copyWith((message) => updates(message as WsDocumentData)) as WsDocumentData; // ignore: deprecated_member_use + DocumentWSData copyWith(void Function(DocumentWSData) updates) => super.copyWith((message) => updates(message as DocumentWSData)) as DocumentWSData; // ignore: deprecated_member_use $pb.BuilderInfo get info_ => _i; @$core.pragma('dart2js:noInline') - static WsDocumentData create() => WsDocumentData._(); - WsDocumentData createEmptyInstance() => create(); - static $pb.PbList createRepeated() => $pb.PbList(); + static DocumentWSData create() => DocumentWSData._(); + DocumentWSData createEmptyInstance() => create(); + static $pb.PbList createRepeated() => $pb.PbList(); @$core.pragma('dart2js:noInline') - static WsDocumentData getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor(create); - static WsDocumentData? _defaultInstance; + static DocumentWSData getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor(create); + static DocumentWSData? _defaultInstance; + + DocumentWSData_OneOfId whichOneOfId() => _DocumentWSData_OneOfIdByTag[$_whichOneof(0)]!; + void clearOneOfId() => clearField($_whichOneof(0)); @$pb.TagNumber(1) $core.String get docId => $_getSZ(0); @@ -71,9 +89,9 @@ class WsDocumentData extends $pb.GeneratedMessage { void clearDocId() => clearField(1); @$pb.TagNumber(2) - WsDocumentDataType get ty => $_getN(1); + DocumentWSDataType get ty => $_getN(1); @$pb.TagNumber(2) - set ty(WsDocumentDataType v) { setField(2, v); } + set ty(DocumentWSDataType v) { setField(2, v); } @$pb.TagNumber(2) $core.bool hasTy() => $_has(1); @$pb.TagNumber(2) @@ -87,6 +105,15 @@ class WsDocumentData extends $pb.GeneratedMessage { $core.bool hasData() => $_has(2); @$pb.TagNumber(3) void clearData() => clearField(3); + + @$pb.TagNumber(4) + $fixnum.Int64 get id => $_getI64(3); + @$pb.TagNumber(4) + set id($fixnum.Int64 v) { $_setInt64(3, v); } + @$pb.TagNumber(4) + $core.bool hasId() => $_has(3); + @$pb.TagNumber(4) + void clearId() => clearField(4); } class DocumentConnected extends $pb.GeneratedMessage { diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbenum.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbenum.dart index b11c8b7fa1..83be5c1784 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbenum.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbenum.dart @@ -9,22 +9,22 @@ import 'dart:core' as $core; import 'package:protobuf/protobuf.dart' as $pb; -class WsDocumentDataType extends $pb.ProtobufEnum { - static const WsDocumentDataType Acked = WsDocumentDataType._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Acked'); - static const WsDocumentDataType PushRev = WsDocumentDataType._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'PushRev'); - static const WsDocumentDataType PullRev = WsDocumentDataType._(2, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'PullRev'); - static const WsDocumentDataType UserConnect = WsDocumentDataType._(3, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'UserConnect'); +class DocumentWSDataType extends $pb.ProtobufEnum { + static const DocumentWSDataType Acked = DocumentWSDataType._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Acked'); + static const DocumentWSDataType PushRev = DocumentWSDataType._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'PushRev'); + static const DocumentWSDataType PullRev = DocumentWSDataType._(2, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'PullRev'); + static const DocumentWSDataType UserConnect = DocumentWSDataType._(3, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'UserConnect'); - static const $core.List values = [ + static const $core.List values = [ Acked, PushRev, PullRev, UserConnect, ]; - static final $core.Map<$core.int, WsDocumentDataType> _byValue = $pb.ProtobufEnum.initByValue(values); - static WsDocumentDataType? valueOf($core.int value) => _byValue[value]; + static final $core.Map<$core.int, DocumentWSDataType> _byValue = $pb.ProtobufEnum.initByValue(values); + static DocumentWSDataType? valueOf($core.int value) => _byValue[value]; - const WsDocumentDataType._($core.int v, $core.String n) : super(v, n); + const DocumentWSDataType._($core.int v, $core.String n) : super(v, n); } diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbjson.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbjson.dart index 9917157067..2bcc352adf 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbjson.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/ws.pbjson.dart @@ -8,9 +8,9 @@ import 'dart:core' as $core; import 'dart:convert' as $convert; import 'dart:typed_data' as $typed_data; -@$core.Deprecated('Use wsDocumentDataTypeDescriptor instead') -const WsDocumentDataType$json = const { - '1': 'WsDocumentDataType', +@$core.Deprecated('Use documentWSDataTypeDescriptor instead') +const DocumentWSDataType$json = const { + '1': 'DocumentWSDataType', '2': const [ const {'1': 'Acked', '2': 0}, const {'1': 'PushRev', '2': 1}, @@ -19,20 +19,24 @@ const WsDocumentDataType$json = const { ], }; -/// Descriptor for `WsDocumentDataType`. Decode as a `google.protobuf.EnumDescriptorProto`. -final $typed_data.Uint8List wsDocumentDataTypeDescriptor = $convert.base64Decode('ChJXc0RvY3VtZW50RGF0YVR5cGUSCQoFQWNrZWQQABILCgdQdXNoUmV2EAESCwoHUHVsbFJldhACEg8KC1VzZXJDb25uZWN0EAM='); -@$core.Deprecated('Use wsDocumentDataDescriptor instead') -const WsDocumentData$json = const { - '1': 'WsDocumentData', +/// Descriptor for `DocumentWSDataType`. Decode as a `google.protobuf.EnumDescriptorProto`. +final $typed_data.Uint8List documentWSDataTypeDescriptor = $convert.base64Decode('ChJEb2N1bWVudFdTRGF0YVR5cGUSCQoFQWNrZWQQABILCgdQdXNoUmV2EAESCwoHUHVsbFJldhACEg8KC1VzZXJDb25uZWN0EAM='); +@$core.Deprecated('Use documentWSDataDescriptor instead') +const DocumentWSData$json = const { + '1': 'DocumentWSData', '2': const [ const {'1': 'doc_id', '3': 1, '4': 1, '5': 9, '10': 'docId'}, - const {'1': 'ty', '3': 2, '4': 1, '5': 14, '6': '.WsDocumentDataType', '10': 'ty'}, + const {'1': 'ty', '3': 2, '4': 1, '5': 14, '6': '.DocumentWSDataType', '10': 'ty'}, const {'1': 'data', '3': 3, '4': 1, '5': 12, '10': 'data'}, + const {'1': 'id', '3': 4, '4': 1, '5': 3, '9': 0, '10': 'id'}, + ], + '8': const [ + const {'1': 'one_of_id'}, ], }; -/// Descriptor for `WsDocumentData`. Decode as a `google.protobuf.DescriptorProto`. -final $typed_data.Uint8List wsDocumentDataDescriptor = $convert.base64Decode('Cg5Xc0RvY3VtZW50RGF0YRIVCgZkb2NfaWQYASABKAlSBWRvY0lkEiMKAnR5GAIgASgOMhMuV3NEb2N1bWVudERhdGFUeXBlUgJ0eRISCgRkYXRhGAMgASgMUgRkYXRh'); +/// Descriptor for `DocumentWSData`. Decode as a `google.protobuf.DescriptorProto`. +final $typed_data.Uint8List documentWSDataDescriptor = $convert.base64Decode('Cg5Eb2N1bWVudFdTRGF0YRIVCgZkb2NfaWQYASABKAlSBWRvY0lkEiMKAnR5GAIgASgOMhMuRG9jdW1lbnRXU0RhdGFUeXBlUgJ0eRISCgRkYXRhGAMgASgMUgRkYXRhEhAKAmlkGAQgASgDSABSAmlkQgsKCW9uZV9vZl9pZA=='); @$core.Deprecated('Use documentConnectedDescriptor instead') const DocumentConnected$json = const { '1': 'DocumentConnected', diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/errors.pb.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/errors.pb.dart index 63c73c28f3..f85e4e8280 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/errors.pb.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/errors.pb.dart @@ -13,15 +13,15 @@ import 'errors.pbenum.dart'; export 'errors.pbenum.dart'; -class WsError extends $pb.GeneratedMessage { - static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'WsError', createEmptyInstance: create) +class WSError extends $pb.GeneratedMessage { + static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'WSError', createEmptyInstance: create) ..e(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'code', $pb.PbFieldType.OE, defaultOrMaker: ErrorCode.InternalError, valueOf: ErrorCode.valueOf, enumValues: ErrorCode.values) ..aOS(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'msg') ..hasRequiredFields = false ; - WsError._() : super(); - factory WsError({ + WSError._() : super(); + factory WSError({ ErrorCode? code, $core.String? msg, }) { @@ -34,26 +34,26 @@ class WsError extends $pb.GeneratedMessage { } return _result; } - factory WsError.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r); - factory WsError.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r); + factory WSError.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r); + factory WSError.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r); @$core.Deprecated( 'Using this can add significant overhead to your binary. ' 'Use [GeneratedMessageGenericExtensions.deepCopy] instead. ' 'Will be removed in next major version') - WsError clone() => WsError()..mergeFromMessage(this); + WSError clone() => WSError()..mergeFromMessage(this); @$core.Deprecated( 'Using this can add significant overhead to your binary. ' 'Use [GeneratedMessageGenericExtensions.rebuild] instead. ' 'Will be removed in next major version') - WsError copyWith(void Function(WsError) updates) => super.copyWith((message) => updates(message as WsError)) as WsError; // ignore: deprecated_member_use + WSError copyWith(void Function(WSError) updates) => super.copyWith((message) => updates(message as WSError)) as WSError; // ignore: deprecated_member_use $pb.BuilderInfo get info_ => _i; @$core.pragma('dart2js:noInline') - static WsError create() => WsError._(); - WsError createEmptyInstance() => create(); - static $pb.PbList createRepeated() => $pb.PbList(); + static WSError create() => WSError._(); + WSError createEmptyInstance() => create(); + static $pb.PbList createRepeated() => $pb.PbList(); @$core.pragma('dart2js:noInline') - static WsError getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor(create); - static WsError? _defaultInstance; + static WSError getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor(create); + static WSError? _defaultInstance; @$pb.TagNumber(1) ErrorCode get code => $_getN(0); diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/errors.pbjson.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/errors.pbjson.dart index c1039debcf..69eff54ee8 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/errors.pbjson.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/errors.pbjson.dart @@ -20,14 +20,14 @@ const ErrorCode$json = const { /// Descriptor for `ErrorCode`. Decode as a `google.protobuf.EnumDescriptorProto`. final $typed_data.Uint8List errorCodeDescriptor = $convert.base64Decode('CglFcnJvckNvZGUSEQoNSW50ZXJuYWxFcnJvchAAEhYKElVuc3VwcG9ydGVkTWVzc2FnZRABEhAKDFVuYXV0aG9yaXplZBAC'); -@$core.Deprecated('Use wsErrorDescriptor instead') -const WsError$json = const { - '1': 'WsError', +@$core.Deprecated('Use wSErrorDescriptor instead') +const WSError$json = const { + '1': 'WSError', '2': const [ const {'1': 'code', '3': 1, '4': 1, '5': 14, '6': '.ErrorCode', '10': 'code'}, const {'1': 'msg', '3': 2, '4': 1, '5': 9, '10': 'msg'}, ], }; -/// Descriptor for `WsError`. Decode as a `google.protobuf.DescriptorProto`. -final $typed_data.Uint8List wsErrorDescriptor = $convert.base64Decode('CgdXc0Vycm9yEh4KBGNvZGUYASABKA4yCi5FcnJvckNvZGVSBGNvZGUSEAoDbXNnGAIgASgJUgNtc2c='); +/// Descriptor for `WSError`. Decode as a `google.protobuf.DescriptorProto`. +final $typed_data.Uint8List wSErrorDescriptor = $convert.base64Decode('CgdXU0Vycm9yEh4KBGNvZGUYASABKA4yCi5FcnJvckNvZGVSBGNvZGUSEAoDbXNnGAIgASgJUgNtc2c='); diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pb.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pb.dart index 0c07e01026..d71cbad363 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pb.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pb.dart @@ -13,16 +13,16 @@ import 'msg.pbenum.dart'; export 'msg.pbenum.dart'; -class WsMessage extends $pb.GeneratedMessage { - static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'WsMessage', createEmptyInstance: create) - ..e(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'module', $pb.PbFieldType.OE, defaultOrMaker: WsModule.Doc, valueOf: WsModule.valueOf, enumValues: WsModule.values) +class WSMessage extends $pb.GeneratedMessage { + static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'WSMessage', createEmptyInstance: create) + ..e(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'module', $pb.PbFieldType.OE, defaultOrMaker: WSModule.Doc, valueOf: WSModule.valueOf, enumValues: WSModule.values) ..a<$core.List<$core.int>>(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'data', $pb.PbFieldType.OY) ..hasRequiredFields = false ; - WsMessage._() : super(); - factory WsMessage({ - WsModule? module, + WSMessage._() : super(); + factory WSMessage({ + WSModule? module, $core.List<$core.int>? data, }) { final _result = create(); @@ -34,31 +34,31 @@ class WsMessage extends $pb.GeneratedMessage { } return _result; } - factory WsMessage.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r); - factory WsMessage.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r); + factory WSMessage.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r); + factory WSMessage.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r); @$core.Deprecated( 'Using this can add significant overhead to your binary. ' 'Use [GeneratedMessageGenericExtensions.deepCopy] instead. ' 'Will be removed in next major version') - WsMessage clone() => WsMessage()..mergeFromMessage(this); + WSMessage clone() => WSMessage()..mergeFromMessage(this); @$core.Deprecated( 'Using this can add significant overhead to your binary. ' 'Use [GeneratedMessageGenericExtensions.rebuild] instead. ' 'Will be removed in next major version') - WsMessage copyWith(void Function(WsMessage) updates) => super.copyWith((message) => updates(message as WsMessage)) as WsMessage; // ignore: deprecated_member_use + WSMessage copyWith(void Function(WSMessage) updates) => super.copyWith((message) => updates(message as WSMessage)) as WSMessage; // ignore: deprecated_member_use $pb.BuilderInfo get info_ => _i; @$core.pragma('dart2js:noInline') - static WsMessage create() => WsMessage._(); - WsMessage createEmptyInstance() => create(); - static $pb.PbList createRepeated() => $pb.PbList(); + static WSMessage create() => WSMessage._(); + WSMessage createEmptyInstance() => create(); + static $pb.PbList createRepeated() => $pb.PbList(); @$core.pragma('dart2js:noInline') - static WsMessage getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor(create); - static WsMessage? _defaultInstance; + static WSMessage getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor(create); + static WSMessage? _defaultInstance; @$pb.TagNumber(1) - WsModule get module => $_getN(0); + WSModule get module => $_getN(0); @$pb.TagNumber(1) - set module(WsModule v) { setField(1, v); } + set module(WSModule v) { setField(1, v); } @$pb.TagNumber(1) $core.bool hasModule() => $_has(0); @$pb.TagNumber(1) diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pbenum.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pbenum.dart index dddbfe6eaa..9b4fbd51b0 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pbenum.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pbenum.dart @@ -9,16 +9,16 @@ import 'dart:core' as $core; import 'package:protobuf/protobuf.dart' as $pb; -class WsModule extends $pb.ProtobufEnum { - static const WsModule Doc = WsModule._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Doc'); +class WSModule extends $pb.ProtobufEnum { + static const WSModule Doc = WSModule._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Doc'); - static const $core.List values = [ + static const $core.List values = [ Doc, ]; - static final $core.Map<$core.int, WsModule> _byValue = $pb.ProtobufEnum.initByValue(values); - static WsModule? valueOf($core.int value) => _byValue[value]; + static final $core.Map<$core.int, WSModule> _byValue = $pb.ProtobufEnum.initByValue(values); + static WSModule? valueOf($core.int value) => _byValue[value]; - const WsModule._($core.int v, $core.String n) : super(v, n); + const WSModule._($core.int v, $core.String n) : super(v, n); } diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pbjson.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pbjson.dart index 13899cc203..ca42294573 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pbjson.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pbjson.dart @@ -8,24 +8,24 @@ import 'dart:core' as $core; import 'dart:convert' as $convert; import 'dart:typed_data' as $typed_data; -@$core.Deprecated('Use wsModuleDescriptor instead') -const WsModule$json = const { - '1': 'WsModule', +@$core.Deprecated('Use wSModuleDescriptor instead') +const WSModule$json = const { + '1': 'WSModule', '2': const [ const {'1': 'Doc', '2': 0}, ], }; -/// Descriptor for `WsModule`. Decode as a `google.protobuf.EnumDescriptorProto`. -final $typed_data.Uint8List wsModuleDescriptor = $convert.base64Decode('CghXc01vZHVsZRIHCgNEb2MQAA=='); -@$core.Deprecated('Use wsMessageDescriptor instead') -const WsMessage$json = const { - '1': 'WsMessage', +/// Descriptor for `WSModule`. Decode as a `google.protobuf.EnumDescriptorProto`. +final $typed_data.Uint8List wSModuleDescriptor = $convert.base64Decode('CghXU01vZHVsZRIHCgNEb2MQAA=='); +@$core.Deprecated('Use wSMessageDescriptor instead') +const WSMessage$json = const { + '1': 'WSMessage', '2': const [ - const {'1': 'module', '3': 1, '4': 1, '5': 14, '6': '.WsModule', '10': 'module'}, + const {'1': 'module', '3': 1, '4': 1, '5': 14, '6': '.WSModule', '10': 'module'}, const {'1': 'data', '3': 2, '4': 1, '5': 12, '10': 'data'}, ], }; -/// Descriptor for `WsMessage`. Decode as a `google.protobuf.DescriptorProto`. -final $typed_data.Uint8List wsMessageDescriptor = $convert.base64Decode('CglXc01lc3NhZ2USIQoGbW9kdWxlGAEgASgOMgkuV3NNb2R1bGVSBm1vZHVsZRISCgRkYXRhGAIgASgMUgRkYXRh'); +/// Descriptor for `WSMessage`. Decode as a `google.protobuf.DescriptorProto`. +final $typed_data.Uint8List wSMessageDescriptor = $convert.base64Decode('CglXU01lc3NhZ2USIQoGbW9kdWxlGAEgASgOMgkuV1NNb2R1bGVSBm1vZHVsZRISCgRkYXRhGAIgASgMUgRkYXRh'); diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/edit_queue.rs b/frontend/rust-lib/flowy-document/src/services/doc/edit/edit_queue.rs new file mode 100644 index 0000000000..faa239dd3f --- /dev/null +++ b/frontend/rust-lib/flowy-document/src/services/doc/edit/edit_queue.rs @@ -0,0 +1,207 @@ +use async_stream::stream; +use bytes::Bytes; +use flowy_collaboration::{ + core::document::{history::UndoResult, Document}, + errors::CollaborateError, +}; +use flowy_error::FlowyError; +use futures::stream::StreamExt; +use lib_ot::{ + core::{Interval, OperationTransformable}, + revision::{RevId, Revision}, + rich_text::{RichTextAttribute, RichTextDelta}, +}; +use std::{convert::TryFrom, sync::Arc}; +use tokio::sync::{mpsc, oneshot, RwLock}; + +pub(crate) struct EditCommandQueue { + doc_id: String, + document: Arc>, + receiver: Option>, +} + +impl EditCommandQueue { + pub(crate) fn new(doc_id: &str, delta: RichTextDelta, receiver: mpsc::UnboundedReceiver) -> Self { + let document = Arc::new(RwLock::new(Document::from_delta(delta))); + Self { + doc_id: doc_id.to_owned(), + document, + receiver: Some(receiver), + } + } + + pub(crate) async fn run(mut self) { + let mut receiver = self.receiver.take().expect("Should only call once"); + let stream = stream! { + loop { + match receiver.recv().await { + Some(msg) => yield msg, + None => break, + } + } + }; + stream + .for_each(|msg| async { + match self.handle_message(msg).await { + Ok(_) => {}, + Err(e) => tracing::debug!("[EditCommandQueue]: {}", e), + } + }) + .await; + } + + async fn handle_message(&self, msg: EditCommand) -> Result<(), FlowyError> { + match msg { + EditCommand::ComposeDelta { delta, ret } => { + let result = self.composed_delta(delta).await; + let _ = ret.send(result); + }, + EditCommand::ProcessRemoteRevision { bytes, ret } => { + let f = || async { + let revision = Revision::try_from(bytes)?; + let delta = RichTextDelta::from_bytes(&revision.delta_data)?; + let server_rev_id: RevId = revision.rev_id.into(); + let read_guard = self.document.read().await; + let (server_prime, client_prime) = read_guard.delta().transform(&delta)?; + drop(read_guard); + + let transform_delta = TransformDeltas { + client_prime, + server_prime, + server_rev_id, + }; + + Ok::(transform_delta) + }; + let _ = ret.send(f().await); + }, + EditCommand::Insert { index, data, ret } => { + let mut write_guard = self.document.write().await; + let delta = write_guard.insert(index, data)?; + let md5 = write_guard.md5(); + let _ = ret.send(Ok((delta, md5))); + }, + EditCommand::Delete { interval, ret } => { + let mut write_guard = self.document.write().await; + let delta = write_guard.delete(interval)?; + let md5 = write_guard.md5(); + let _ = ret.send(Ok((delta, md5))); + }, + EditCommand::Format { + interval, + attribute, + ret, + } => { + let mut write_guard = self.document.write().await; + let delta = write_guard.format(interval, attribute)?; + let md5 = write_guard.md5(); + let _ = ret.send(Ok((delta, md5))); + }, + EditCommand::Replace { interval, data, ret } => { + let mut write_guard = self.document.write().await; + let delta = write_guard.replace(interval, data)?; + let md5 = write_guard.md5(); + let _ = ret.send(Ok((delta, md5))); + }, + EditCommand::CanUndo { ret } => { + let _ = ret.send(self.document.read().await.can_undo()); + }, + EditCommand::CanRedo { ret } => { + let _ = ret.send(self.document.read().await.can_redo()); + }, + EditCommand::Undo { ret } => { + let result = self.document.write().await.undo(); + let _ = ret.send(result); + }, + EditCommand::Redo { ret } => { + let result = self.document.write().await.redo(); + let _ = ret.send(result); + }, + EditCommand::ReadDoc { ret } => { + let data = self.document.read().await.to_json(); + let _ = ret.send(Ok(data)); + }, + EditCommand::ReadDocDelta { ret } => { + let delta = self.document.read().await.delta().clone(); + let _ = ret.send(Ok(delta)); + }, + } + Ok(()) + } + + #[tracing::instrument(level = "debug", skip(self, delta), fields(compose_result), err)] + async fn composed_delta(&self, delta: RichTextDelta) -> Result { + // tracing::debug!("{:?} thread handle_message", thread::current(),); + let mut document = self.document.write().await; + tracing::Span::current().record( + "composed_delta", + &format!("doc_id:{} - {}", &self.doc_id, delta.to_json()).as_str(), + ); + + let _ = document.compose_delta(delta)?; + let md5 = document.md5(); + drop(document); + + Ok(md5) + } +} + +pub(crate) type Ret = oneshot::Sender>; +pub(crate) type NewDelta = (RichTextDelta, String); +pub(crate) type DocumentMD5 = String; + +#[allow(dead_code)] +pub(crate) enum EditCommand { + ComposeDelta { + delta: RichTextDelta, + ret: Ret, + }, + ProcessRemoteRevision { + bytes: Bytes, + ret: Ret, + }, + Insert { + index: usize, + data: String, + ret: Ret, + }, + Delete { + interval: Interval, + ret: Ret, + }, + Format { + interval: Interval, + attribute: RichTextAttribute, + ret: Ret, + }, + + Replace { + interval: Interval, + data: String, + ret: Ret, + }, + CanUndo { + ret: oneshot::Sender, + }, + CanRedo { + ret: oneshot::Sender, + }, + Undo { + ret: Ret, + }, + Redo { + ret: Ret, + }, + ReadDoc { + ret: Ret, + }, + ReadDocDelta { + ret: Ret, + }, +} + +pub(crate) struct TransformDeltas { + pub client_prime: RichTextDelta, + pub server_prime: RichTextDelta, + pub server_rev_id: RevId, +} diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/edit_ws.rs b/frontend/rust-lib/flowy-document/src/services/doc/edit/edit_ws.rs new file mode 100644 index 0000000000..59d548eaa8 --- /dev/null +++ b/frontend/rust-lib/flowy-document/src/services/doc/edit/edit_ws.rs @@ -0,0 +1,297 @@ +use crate::services::doc::{DocumentWebSocket, DocumentWsHandler, SYNC_INTERVAL_IN_MILLIS}; +use async_stream::stream; +use bytes::Bytes; +use flowy_collaboration::{ + entities::ws::{DocumentWSData, DocumentWSDataType}, + Revision, +}; +use flowy_error::{internal_error, FlowyError, FlowyResult}; +use futures::stream::StreamExt; +use lib_infra::future::FutureResult; +use lib_ot::revision::{RevId, RevisionRange}; +use lib_ws::WSConnectState; +use std::{convert::TryFrom, sync::Arc}; +use tokio::{ + sync::{ + broadcast, + mpsc, + mpsc::{UnboundedReceiver, UnboundedSender}, + }, + task::spawn_blocking, + time::{interval, Duration}, +}; + +pub(crate) struct WebSocketManager { + doc_id: String, + data_provider: Arc, + stream_consumer: Arc, + ws: Arc, + ws_msg_tx: UnboundedSender, + ws_msg_rx: Option>, + stop_sync_tx: SinkStopTx, +} + +impl WebSocketManager { + pub(crate) fn new( + doc_id: &str, + ws: Arc, + data_provider: Arc, + stream_consumer: Arc, + ) -> Self { + let (ws_msg_tx, ws_msg_rx) = mpsc::unbounded_channel(); + let (stop_sync_tx, _) = tokio::sync::broadcast::channel(2); + let doc_id = doc_id.to_string(); + let mut manager = WebSocketManager { + doc_id, + data_provider, + stream_consumer, + ws, + ws_msg_tx, + ws_msg_rx: Some(ws_msg_rx), + stop_sync_tx, + }; + manager.start_sync(); + manager + } + + fn start_sync(&mut self) { + let ws_msg_rx = self.ws_msg_rx.take().expect("Only take once"); + let sink = DocumentWebSocketSink::new( + &self.doc_id, + self.data_provider.clone(), + self.ws.clone(), + self.stop_sync_tx.subscribe(), + ); + let stream = DocumentWebSocketStream::new( + &self.doc_id, + self.stream_consumer.clone(), + ws_msg_rx, + self.ws.clone(), + self.stop_sync_tx.subscribe(), + ); + tokio::spawn(sink.run()); + tokio::spawn(stream.run()); + self.notify_user_conn(); + } + + pub(crate) fn stop(&self) { + if self.stop_sync_tx.send(()).is_ok() { + tracing::debug!("{} stop sync", self.doc_id) + } + } + + #[tracing::instrument(level = "debug", skip(self))] + fn notify_user_conn(&self) { + // let rev_id: RevId = self.rev_manager.rev_id().into(); + // if let Ok(user_id) = self.user.user_id() { + // let action = OpenDocAction::new(&user_id, &self.doc_id, &rev_id, + // &self.ws_sender); let strategy = + // ExponentialBackoff::from_millis(50).take(3); let retry = + // Retry::spawn(strategy, action); tokio::spawn(async move { + // match retry.await { + // Ok(_) => log::debug!("Notify open doc success"), + // Err(e) => log::error!("Notify open doc failed: {}", e), + // } + // }); + // } + } +} + +impl DocumentWsHandler for WebSocketManager { + fn receive(&self, doc_data: DocumentWSData) { + match self.ws_msg_tx.send(doc_data) { + Ok(_) => {}, + Err(e) => tracing::error!("❌Propagate ws message failed. {}", e), + } + } + + fn connect_state_changed(&self, state: &WSConnectState) { + match state { + WSConnectState::Init => {}, + WSConnectState::Connecting => {}, + WSConnectState::Connected => self.notify_user_conn(), + WSConnectState::Disconnected => {}, + } + } +} + +pub trait DocumentWebSocketSteamConsumer: Send + Sync { + fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError>; + fn make_revision_from_range(&self, range: RevisionRange) -> FutureResult; + fn ack_revision(&self, rev_id: i64) -> FutureResult<(), FlowyError>; +} + +pub(crate) struct DocumentWebSocketStream { + doc_id: String, + consumer: Arc, + ws_msg_rx: Option>, + ws_sender: Arc, + stop_rx: Option, +} + +impl DocumentWebSocketStream { + pub(crate) fn new( + doc_id: &str, + consumer: Arc, + ws_msg_rx: mpsc::UnboundedReceiver, + ws_sender: Arc, + stop_rx: SinkStopRx, + ) -> Self { + DocumentWebSocketStream { + doc_id: doc_id.to_owned(), + consumer, + ws_msg_rx: Some(ws_msg_rx), + ws_sender, + stop_rx: Some(stop_rx), + } + } + + pub async fn run(mut self) { + let mut receiver = self.ws_msg_rx.take().expect("Only take once"); + let mut stop_rx = self.stop_rx.take().expect("Only take once"); + let doc_id = self.doc_id.clone(); + let stream = stream! { + loop { + tokio::select! { + result = receiver.recv() => { + match result { + Some(msg) => { + yield msg + }, + None => { + tracing::debug!("[DocumentStream:{}] loop exit", doc_id); + break; + }, + } + }, + _ = stop_rx.recv() => { + tracing::debug!("[DocumentStream:{}] loop exit", doc_id); + break + }, + }; + } + }; + + stream + .for_each(|msg| async { + match self.handle_message(msg).await { + Ok(_) => {}, + Err(e) => log::error!("[DocumentStream:{}] error: {}", self.doc_id, e), + } + }) + .await; + } + + async fn handle_message(&self, msg: DocumentWSData) -> FlowyResult<()> { + let DocumentWSData { + doc_id: _, + ty, + data, + id: _, + } = msg; + let bytes = spawn_blocking(move || Bytes::from(data)) + .await + .map_err(internal_error)?; + + tracing::debug!("[DocumentStream]: receives new message: {:?}", ty); + match ty { + DocumentWSDataType::PushRev => { + let _ = self.consumer.receive_push_revision(bytes).await?; + }, + DocumentWSDataType::PullRev => { + let range = RevisionRange::try_from(bytes)?; + let revision = self.consumer.make_revision_from_range(range).await?; + let _ = self.ws_sender.send(revision.into()); + }, + DocumentWSDataType::Acked => { + let rev_id = RevId::try_from(bytes)?; + let _ = self.consumer.ack_revision(rev_id.into()).await; + }, + DocumentWSDataType::UserConnect => {}, + } + + Ok(()) + } +} + +pub(crate) type Tick = (); +pub(crate) type SinkStopRx = broadcast::Receiver<()>; +pub(crate) type SinkStopTx = broadcast::Sender<()>; + +pub trait DocumentSinkDataProvider: Send + Sync { + fn next(&self) -> FutureResult, FlowyError>; +} + +pub(crate) struct DocumentWebSocketSink { + provider: Arc, + ws_sender: Arc, + stop_rx: Option, + doc_id: String, +} + +impl DocumentWebSocketSink { + pub(crate) fn new( + doc_id: &str, + provider: Arc, + ws_sender: Arc, + stop_rx: SinkStopRx, + ) -> Self { + Self { + provider, + ws_sender, + stop_rx: Some(stop_rx), + doc_id: doc_id.to_owned(), + } + } + + pub async fn run(mut self) { + let (tx, mut rx) = mpsc::unbounded_channel(); + let mut stop_rx = self.stop_rx.take().expect("Only take once"); + let doc_id = self.doc_id.clone(); + tokio::spawn(tick(tx)); + let stream = stream! { + loop { + tokio::select! { + result = rx.recv() => { + match result { + Some(msg) => yield msg, + None => break, + } + }, + _ = stop_rx.recv() => { + tracing::debug!("[DocumentSink:{}] loop exit", doc_id); + break + }, + }; + } + }; + stream + .for_each(|_| async { + match self.send_next_revision().await { + Ok(_) => {}, + Err(e) => log::error!("[DocumentSink]: send msg failed, {:?}", e), + } + }) + .await; + } + + async fn send_next_revision(&self) -> FlowyResult<()> { + match self.provider.next().await? { + None => { + tracing::debug!("Finish synchronizing revisions"); + Ok(()) + }, + Some(data) => { + self.ws_sender.send(data).map_err(internal_error) + // let _ = tokio::time::timeout(Duration::from_millis(2000), + }, + } + } +} + +async fn tick(sender: mpsc::UnboundedSender) { + let mut interval = interval(Duration::from_millis(SYNC_INTERVAL_IN_MILLIS)); + while sender.send(()).is_ok() { + interval.tick().await; + } +} diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs b/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs index 7b23582b99..c5fc1ae140 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs @@ -2,7 +2,7 @@ use crate::{errors::FlowyError, module::DocumentUser, services::doc::*}; use bytes::Bytes; use flowy_collaboration::{ core::document::history::UndoResult, - entities::{doc::DocDelta, ws::WsDocumentData}, + entities::{doc::DocDelta, ws::DocumentWSData}, errors::CollaborateResult, }; use flowy_database::ConnectionPool; @@ -13,16 +13,16 @@ use lib_ot::{ revision::{RevId, RevType, Revision, RevisionRange}, rich_text::{RichTextAttribute, RichTextDelta}, }; -use std::sync::Arc; +use parking_lot::RwLock; +use std::{collections::VecDeque, sync::Arc}; use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot}; - -pub type DocId = String; - +type SinkVec = Arc>>; pub struct ClientDocEditor { - pub doc_id: DocId, + pub doc_id: String, rev_manager: Arc, ws_manager: Arc, edit_cmd_tx: UnboundedSender, + sink_vec: SinkVec, user: Arc, } @@ -38,15 +38,17 @@ impl ClientDocEditor { let edit_cmd_tx = spawn_edit_queue(doc_id, delta, pool.clone()); let doc_id = doc_id.to_string(); let rev_manager = Arc::new(rev_manager); - + let sink_vec = Arc::new(RwLock::new(VecDeque::new())); let data_provider = Arc::new(DocumentSinkDataProviderAdapter { rev_manager: rev_manager.clone(), + sink_vec: sink_vec.clone(), }); let stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter { doc_id: doc_id.clone(), edit_cmd_tx: edit_cmd_tx.clone(), rev_manager: rev_manager.clone(), user: user.clone(), + sink_vec: sink_vec.clone(), }); let ws_manager = Arc::new(WebSocketManager::new(&doc_id, ws, data_provider, stream_consumer)); let editor = Arc::new(Self { @@ -54,6 +56,7 @@ impl ClientDocEditor { rev_manager, ws_manager, edit_cmd_tx, + sink_vec, user, }); Ok(editor) @@ -199,6 +202,7 @@ struct DocumentWebSocketSteamConsumerAdapter { edit_cmd_tx: UnboundedSender, rev_manager: Arc, user: Arc, + sink_vec: SinkVec, } impl DocumentWebSocketSteamConsumer for DocumentWebSocketSteamConsumerAdapter { @@ -233,18 +237,28 @@ impl DocumentWebSocketSteamConsumer for DocumentWebSocketSteamConsumerAdapter { struct DocumentSinkDataProviderAdapter { rev_manager: Arc, + sink_vec: SinkVec, } impl DocumentSinkDataProvider for DocumentSinkDataProviderAdapter { - fn next(&self) -> FutureResult, FlowyError> { + fn next(&self) -> FutureResult, FlowyError> { let rev_manager = self.rev_manager.clone(); + let sink_vec = self.sink_vec.clone(); + FutureResult::new(async move { - match rev_manager.next_sync_revision().await? { - Some(rev) => { - tracing::debug!("[DocumentSinkDataProvider]: revision: {}:{:?}", rev.doc_id, rev.rev_id); - Ok(Some(rev.into())) - }, - None => Ok(None), + if sink_vec.read().is_empty() { + match rev_manager.next_sync_revision().await? { + Some(rev) => { + tracing::debug!("[DocumentSinkDataProvider]: revision: {}:{:?}", rev.doc_id, rev.rev_id); + Ok(Some(rev.into())) + }, + None => Ok(None), + } + } else { + match sink_vec.read().front() { + None => Ok(None), + Some(data) => Ok(Some(data.clone())), + } } }) } diff --git a/frontend/rust-lib/flowy-document/src/services/doc/ws/mod.rs b/frontend/rust-lib/flowy-document/src/services/doc/ws/mod.rs new file mode 100644 index 0000000000..e69de29bb2 diff --git a/frontend/rust-lib/flowy-document/src/services/doc/ws_handlers.rs b/frontend/rust-lib/flowy-document/src/services/doc/ws_handlers.rs new file mode 100644 index 0000000000..1c36ffdd51 --- /dev/null +++ b/frontend/rust-lib/flowy-document/src/services/doc/ws_handlers.rs @@ -0,0 +1,67 @@ +use crate::errors::FlowyError; +use bytes::Bytes; +use dashmap::DashMap; +use flowy_collaboration::entities::ws::DocumentWSData; +use lib_ws::WSConnectState; +use std::{convert::TryInto, sync::Arc}; + +pub(crate) trait DocumentWsHandler: Send + Sync { + fn receive(&self, data: DocumentWSData); + fn connect_state_changed(&self, state: &WSConnectState); +} + +pub type WsStateReceiver = tokio::sync::broadcast::Receiver; +pub trait DocumentWebSocket: Send + Sync { + fn send(&self, data: DocumentWSData) -> Result<(), FlowyError>; + fn subscribe_state_changed(&self) -> WsStateReceiver; +} + +pub struct DocumentWsHandlers { + ws: Arc, + // key: the document id + handlers: Arc>>, +} + +impl DocumentWsHandlers { + pub fn new(ws: Arc) -> Self { + let handlers: Arc>> = Arc::new(DashMap::new()); + Self { ws, handlers } + } + + pub(crate) fn init(&self) { listen_ws_state_changed(self.ws.clone(), self.handlers.clone()); } + + pub(crate) fn register_handler(&self, id: &str, handler: Arc) { + if self.handlers.contains_key(id) { + log::error!("Duplicate handler registered for {:?}", id); + } + self.handlers.insert(id.to_string(), handler); + } + + pub(crate) fn remove_handler(&self, id: &str) { self.handlers.remove(id); } + + pub fn did_receive_data(&self, data: Bytes) { + let data: DocumentWSData = data.try_into().unwrap(); + match self.handlers.get(&data.doc_id) { + None => { + log::error!("Can't find any source handler for {:?}", data.doc_id); + }, + Some(handler) => { + handler.receive(data); + }, + } + } + + pub fn ws(&self) -> Arc { self.ws.clone() } +} + +#[tracing::instrument(level = "debug", skip(ws, handlers))] +fn listen_ws_state_changed(ws: Arc, handlers: Arc>>) { + let mut notify = ws.subscribe_state_changed(); + tokio::spawn(async move { + while let Ok(state) = notify.recv().await { + handlers.iter().for_each(|handle| { + handle.value().connect_state_changed(&state); + }); + } + }); +} diff --git a/frontend/rust-lib/flowy-net/src/services/mock/ws_mock.rs b/frontend/rust-lib/flowy-net/src/services/mock/ws_mock.rs index b22e4fac48..ea06008889 100644 --- a/frontend/rust-lib/flowy-net/src/services/mock/ws_mock.rs +++ b/frontend/rust-lib/flowy-net/src/services/mock/ws_mock.rs @@ -1,11 +1,11 @@ -use crate::services::ws::{FlowyError, FlowyWebSocket, FlowyWsSender, WsConnectState, WsMessage, WsMessageReceiver}; +use crate::services::ws::{FlowyError, FlowyWebSocket, FlowyWsSender, WSConnectState, WSMessage, WSMessageReceiver}; use bytes::Bytes; use dashmap::DashMap; use flowy_collaboration::{ core::sync::{RevisionUser, ServerDocManager, ServerDocPersistence, SyncResponse}, entities::{ doc::Doc, - ws::{WsDocumentData, WsDocumentDataType}, + ws::{DocumentWSData, DocumentWSDataType}, }, errors::CollaborateError, Revision, @@ -13,7 +13,7 @@ use flowy_collaboration::{ }; use lazy_static::lazy_static; use lib_infra::future::{FutureResult, FutureResultSend}; -use lib_ws::WsModule; +use lib_ws::WSModule; use parking_lot::RwLock; use std::{ convert::{TryFrom, TryInto}, @@ -22,9 +22,9 @@ use std::{ use tokio::sync::{broadcast, broadcast::Receiver, mpsc}; pub struct MockWebSocket { - handlers: DashMap>, - state_sender: broadcast::Sender, - ws_sender: broadcast::Sender, + handlers: DashMap>, + state_sender: broadcast::Sender, + ws_sender: broadcast::Sender, is_stop: RwLock, } @@ -56,7 +56,7 @@ impl FlowyWebSocket for Arc { if *cloned_ws.is_stop.read() { // do nothing } else { - let ws_data = WsDocumentData::try_from(Bytes::from(message.data.clone())).unwrap(); + let ws_data = DocumentWSData::try_from(Bytes::from(message.data.clone())).unwrap(); let mut rx = DOC_SERVER.handle_ws_data(ws_data).await; let new_ws_message = rx.recv().await.unwrap(); match cloned_ws.handlers.get(&new_ws_message.module) { @@ -75,11 +75,11 @@ impl FlowyWebSocket for Arc { FutureResult::new(async { Ok(()) }) } - fn subscribe_connect_state(&self) -> Receiver { self.state_sender.subscribe() } + fn subscribe_connect_state(&self) -> Receiver { self.state_sender.subscribe() } fn reconnect(&self, _count: usize) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) } - fn add_message_receiver(&self, handler: Arc) -> Result<(), FlowyError> { + fn add_message_receiver(&self, handler: Arc) -> Result<(), FlowyError> { let source = handler.source(); if self.handlers.contains_key(&source) { tracing::error!("WsSource's {:?} is already registered", source); @@ -108,13 +108,13 @@ impl std::default::Default for MockDocServer { } impl MockDocServer { - async fn handle_ws_data(&self, ws_data: WsDocumentData) -> mpsc::Receiver { + async fn handle_ws_data(&self, ws_data: DocumentWSData) -> mpsc::Receiver { let bytes = Bytes::from(ws_data.data); match ws_data.ty { - WsDocumentDataType::Acked => { + DocumentWSDataType::Acked => { unimplemented!() }, - WsDocumentDataType::PushRev => { + DocumentWSDataType::PushRev => { let revision = Revision::try_from(bytes).unwrap(); let handler = match self.manager.get(&revision.doc_id).await { None => self.manager.create_doc(revision.clone()).await.unwrap(), @@ -129,10 +129,10 @@ impl MockDocServer { handler.apply_revision(Arc::new(user), revision).await.unwrap(); rx }, - WsDocumentDataType::PullRev => { + DocumentWSDataType::PullRev => { unimplemented!() }, - WsDocumentDataType::UserConnect => { + DocumentWSDataType::UserConnect => { unimplemented!() }, } @@ -184,7 +184,7 @@ impl ServerDocPersistence for MockDocServerPersistence { #[derive(Debug)] struct MockDocUser { user_id: String, - tx: mpsc::Sender, + tx: mpsc::Sender, } impl RevisionUser for MockDocUser { @@ -196,24 +196,24 @@ impl RevisionUser for MockDocUser { match resp { SyncResponse::Pull(data) => { let bytes: Bytes = data.try_into().unwrap(); - let msg = WsMessage { - module: WsModule::Doc, + let msg = WSMessage { + module: WSModule::Doc, data: bytes.to_vec(), }; sender.send(msg).await.unwrap(); }, SyncResponse::Push(data) => { let bytes: Bytes = data.try_into().unwrap(); - let msg = WsMessage { - module: WsModule::Doc, + let msg = WSMessage { + module: WSModule::Doc, data: bytes.to_vec(), }; sender.send(msg).await.unwrap(); }, SyncResponse::Ack(data) => { let bytes: Bytes = data.try_into().unwrap(); - let msg = WsMessage { - module: WsModule::Doc, + let msg = WSMessage { + module: WSModule::Doc, data: bytes.to_vec(), }; sender.send(msg).await.unwrap(); diff --git a/frontend/rust-lib/flowy-net/src/services/ws/conn.rs b/frontend/rust-lib/flowy-net/src/services/ws/conn.rs index fa3d5b9185..4f2f6e2f4c 100644 --- a/frontend/rust-lib/flowy-net/src/services/ws/conn.rs +++ b/frontend/rust-lib/flowy-net/src/services/ws/conn.rs @@ -3,17 +3,17 @@ use std::sync::Arc; use tokio::sync::broadcast; pub use flowy_error::FlowyError; -pub use lib_ws::{WsConnectState, WsMessage, WsMessageReceiver}; +pub use lib_ws::{WSConnectState, WSMessage, WSMessageReceiver}; pub trait FlowyWebSocket: Send + Sync { fn start_connect(&self, addr: String) -> FutureResult<(), FlowyError>; fn stop_connect(&self) -> FutureResult<(), FlowyError>; - fn subscribe_connect_state(&self) -> broadcast::Receiver; + fn subscribe_connect_state(&self) -> broadcast::Receiver; fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError>; - fn add_message_receiver(&self, handler: Arc) -> Result<(), FlowyError>; + fn add_message_receiver(&self, handler: Arc) -> Result<(), FlowyError>; fn ws_sender(&self) -> Result, FlowyError>; } pub trait FlowyWsSender: Send + Sync { - fn send(&self, msg: WsMessage) -> Result<(), FlowyError>; + fn send(&self, msg: WSMessage) -> Result<(), FlowyError>; } diff --git a/frontend/rust-lib/flowy-net/src/services/ws/manager.rs b/frontend/rust-lib/flowy-net/src/services/ws/manager.rs index fac7835b6f..53a94f6b1c 100644 --- a/frontend/rust-lib/flowy-net/src/services/ws/manager.rs +++ b/frontend/rust-lib/flowy-net/src/services/ws/manager.rs @@ -4,7 +4,7 @@ use crate::{ }; use flowy_error::{internal_error, FlowyError}; use lib_infra::future::FutureResult; -use lib_ws::{WsConnectState, WsController, WsMessage, WsMessageReceiver, WsSender}; +use lib_ws::{WSConnectState, WSController, WSMessage, WSMessageReceiver, WSSender}; use parking_lot::RwLock; use std::sync::Arc; use tokio::sync::{broadcast, broadcast::Receiver}; @@ -19,7 +19,7 @@ pub struct WsManager { impl WsManager { pub fn new(addr: String) -> Self { let ws: Arc = if cfg!(feature = "http_server") { - Arc::new(Arc::new(WsController::new())) + Arc::new(Arc::new(WSController::new())) } else { local_web_socket() }; @@ -65,13 +65,13 @@ impl WsManager { } } - pub fn subscribe_websocket_state(&self) -> broadcast::Receiver { + pub fn subscribe_websocket_state(&self) -> broadcast::Receiver { self.inner.subscribe_connect_state() } pub fn subscribe_network_ty(&self) -> broadcast::Receiver { self.status_notifier.subscribe() } - pub fn add_receiver(&self, handler: Arc) -> Result<(), FlowyError> { + pub fn add_receiver(&self, handler: Arc) -> Result<(), FlowyError> { let _ = self.inner.add_message_receiver(handler)?; Ok(()) } @@ -88,10 +88,10 @@ fn listen_on_websocket(ws: Arc) { Ok(state) => { tracing::info!("Websocket state changed: {}", state); match state { - WsConnectState::Init => {}, - WsConnectState::Connected => {}, - WsConnectState::Connecting => {}, - WsConnectState::Disconnected => retry_connect(ws.clone(), 100).await, + WSConnectState::Init => {}, + WSConnectState::Connected => {}, + WSConnectState::Connecting => {}, + WSConnectState::Disconnected => retry_connect(ws.clone(), 100).await, } }, Err(e) => { @@ -112,7 +112,7 @@ async fn retry_connect(ws: Arc, count: usize) { } } -impl FlowyWebSocket for Arc { +impl FlowyWebSocket for Arc { fn start_connect(&self, addr: String) -> FutureResult<(), FlowyError> { let cloned_ws = self.clone(); FutureResult::new(async move { @@ -129,7 +129,7 @@ impl FlowyWebSocket for Arc { }) } - fn subscribe_connect_state(&self) -> Receiver { self.subscribe_state() } + fn subscribe_connect_state(&self) -> Receiver { self.subscribe_state() } fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError> { let cloned_ws = self.clone(); @@ -139,7 +139,7 @@ impl FlowyWebSocket for Arc { }) } - fn add_message_receiver(&self, handler: Arc) -> Result<(), FlowyError> { + fn add_message_receiver(&self, handler: Arc) -> Result<(), FlowyError> { let _ = self.add_receiver(handler).map_err(internal_error)?; Ok(()) } @@ -150,8 +150,8 @@ impl FlowyWebSocket for Arc { } } -impl FlowyWsSender for WsSender { - fn send(&self, msg: WsMessage) -> Result<(), FlowyError> { +impl FlowyWsSender for WSSender { + fn send(&self, msg: WSMessage) -> Result<(), FlowyError> { let _ = self.send_msg(msg).map_err(internal_error)?; Ok(()) } diff --git a/frontend/rust-lib/flowy-net/src/services/ws/ws_local.rs b/frontend/rust-lib/flowy-net/src/services/ws/ws_local.rs index 692b9244fe..2450505d9d 100644 --- a/frontend/rust-lib/flowy-net/src/services/ws/ws_local.rs +++ b/frontend/rust-lib/flowy-net/src/services/ws/ws_local.rs @@ -1,11 +1,11 @@ -use crate::services::ws::{FlowyError, FlowyWebSocket, FlowyWsSender, WsConnectState, WsMessage, WsMessageReceiver}; +use crate::services::ws::{FlowyError, FlowyWebSocket, FlowyWsSender, WSConnectState, WSMessage, WSMessageReceiver}; use lib_infra::future::FutureResult; use std::sync::Arc; use tokio::sync::{broadcast, broadcast::Receiver}; pub(crate) struct LocalWebSocket { - state_sender: broadcast::Sender, - ws_sender: broadcast::Sender, + state_sender: broadcast::Sender, + ws_sender: broadcast::Sender, } impl std::default::Default for LocalWebSocket { @@ -24,17 +24,17 @@ impl FlowyWebSocket for Arc { fn stop_connect(&self) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) } - fn subscribe_connect_state(&self) -> Receiver { self.state_sender.subscribe() } + fn subscribe_connect_state(&self) -> Receiver { self.state_sender.subscribe() } fn reconnect(&self, _count: usize) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) } - fn add_message_receiver(&self, _handler: Arc) -> Result<(), FlowyError> { Ok(()) } + fn add_message_receiver(&self, _handler: Arc) -> Result<(), FlowyError> { Ok(()) } fn ws_sender(&self) -> Result, FlowyError> { Ok(Arc::new(self.ws_sender.clone())) } } -impl FlowyWsSender for broadcast::Sender { - fn send(&self, msg: WsMessage) -> Result<(), FlowyError> { +impl FlowyWsSender for broadcast::Sender { + fn send(&self, msg: WSMessage) -> Result<(), FlowyError> { let _ = self.send(msg); Ok(()) } diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs index 6ad1d7c0f4..036b082f7d 100644 --- a/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs +++ b/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs @@ -1,5 +1,5 @@ use bytes::Bytes; -use flowy_collaboration::entities::ws::WsDocumentData; +use flowy_collaboration::entities::ws::DocumentWSData; use flowy_database::ConnectionPool; use flowy_document::{ errors::{internal_error, FlowyError}, @@ -8,7 +8,7 @@ use flowy_document::{ }; use flowy_net::services::ws::WsManager; use flowy_user::services::user::UserSession; -use lib_ws::{WsMessage, WsMessageReceiver, WsModule}; +use lib_ws::{WSMessage, WSMessageReceiver, WSModule}; use std::{convert::TryInto, path::Path, sync::Arc}; pub struct DocumentDepsResolver(); @@ -61,10 +61,10 @@ struct WsSenderImpl { } impl DocumentWebSocket for WsSenderImpl { - fn send(&self, data: WsDocumentData) -> Result<(), FlowyError> { + fn send(&self, data: DocumentWSData) -> Result<(), FlowyError> { let bytes: Bytes = data.try_into().unwrap(); - let msg = WsMessage { - module: WsModule::Doc, + let msg = WSMessage { + module: WSModule::Doc, data: bytes.to_vec(), }; let sender = self.ws_manager.ws_sender().map_err(internal_error)?; @@ -78,7 +78,7 @@ impl DocumentWebSocket for WsSenderImpl { struct WsMessageReceiverAdaptor(Arc); -impl WsMessageReceiver for WsMessageReceiverAdaptor { - fn source(&self) -> WsModule { WsModule::Doc } - fn receive_message(&self, msg: WsMessage) { self.0.did_receive_data(Bytes::from(msg.data)); } +impl WSMessageReceiver for WsMessageReceiverAdaptor { + fn source(&self) -> WSModule { WSModule::Doc } + fn receive_message(&self, msg: WSMessage) { self.0.did_receive_data(Bytes::from(msg.data)); } } diff --git a/frontend/scripts/flowy-tool/src/proto/ast.rs b/frontend/scripts/flowy-tool/src/proto/ast.rs index 59e7619312..92d94d6f43 100644 --- a/frontend/scripts/flowy-tool/src/proto/ast.rs +++ b/frontend/scripts/flowy-tool/src/proto/ast.rs @@ -46,7 +46,7 @@ fn parse_files_protobuf(proto_crate_path: &str, proto_output_dir: &str) -> Vec

{ // Do nothing log::warn!("Applied revision rev_id is the same as cur_rev_id"); - user.recv(SyncResponse::Ack(mk_acked_message(&revision))); + let data = WsDocumentDataBuilder::build_acked_message(&revision.doc_id, revision.rev_id); + user.recv(SyncResponse::Ack(data)); }, Ordering::Greater => { // The client document is outdated. Transform the client revision delta and then // send the prime delta to the client. Client should compose the this prime // delta. let cli_revision = self.transform_revision(&revision)?; - let data = mk_push_message(&self.doc_id, cli_revision); + let data = WsDocumentDataBuilder::build_push_rev_message(&self.doc_id, cli_revision); user.recv(SyncResponse::Push(data)); }, } @@ -143,44 +152,6 @@ impl RevisionSynchronizer { } } -fn mk_push_message(doc_id: &str, revision: Revision) -> WsDocumentData { - let bytes: Bytes = revision.try_into().unwrap(); - WsDocumentData { - doc_id: doc_id.to_string(), - ty: WsDocumentDataType::PushRev, - data: bytes.to_vec(), - } -} - -fn mk_pull_message(doc_id: &str, from_rev_id: i64, to_rev_id: i64) -> WsDocumentData { - let range = RevisionRange { - doc_id: doc_id.to_string(), - start: from_rev_id, - end: to_rev_id, - }; - - let bytes: Bytes = range.try_into().unwrap(); - WsDocumentData { - doc_id: doc_id.to_string(), - ty: WsDocumentDataType::PullRev, - data: bytes.to_vec(), - } -} - -fn mk_acked_message(revision: &Revision) -> WsDocumentData { - // let mut wtr = vec![]; - // let _ = wtr.write_i64::(revision.rev_id); - let mut rev_id = RevId::new(); - rev_id.set_value(revision.rev_id); - let data = rev_id.write_to_bytes().unwrap(); - - WsDocumentData { - doc_id: revision.doc_id.clone(), - ty: WsDocumentDataType::Acked, - data, - } -} - #[inline] fn next(rev_id: i64) -> i64 { rev_id + 1 } diff --git a/shared-lib/flowy-collaboration/src/entities/ws/ws.rs b/shared-lib/flowy-collaboration/src/entities/ws/ws.rs index a2e1e31d2f..a13ac2fa79 100644 --- a/shared-lib/flowy-collaboration/src/entities/ws/ws.rs +++ b/shared-lib/flowy-collaboration/src/entities/ws/ws.rs @@ -5,7 +5,7 @@ use lib_ot::revision::{RevId, Revision, RevisionRange}; use std::convert::{TryFrom, TryInto}; #[derive(Debug, Clone, ProtoBuf_Enum, Eq, PartialEq, Hash)] -pub enum WsDocumentDataType { +pub enum DocumentWSDataType { // The frontend receives the Acked means the backend has accepted the revision Acked = 0, // The frontend receives the PushRev event means the backend is pushing the new revision to frontend @@ -15,7 +15,7 @@ pub enum WsDocumentDataType { UserConnect = 3, } -impl WsDocumentDataType { +impl DocumentWSDataType { pub fn data(&self, bytes: Bytes) -> Result where T: TryFrom, @@ -24,64 +24,75 @@ impl WsDocumentDataType { } } -impl std::default::Default for WsDocumentDataType { - fn default() -> Self { WsDocumentDataType::Acked } +impl std::default::Default for DocumentWSDataType { + fn default() -> Self { DocumentWSDataType::Acked } } #[derive(ProtoBuf, Default, Debug, Clone)] -pub struct WsDocumentData { +pub struct DocumentWSData { #[pb(index = 1)] pub doc_id: String, #[pb(index = 2)] - pub ty: WsDocumentDataType, + pub ty: DocumentWSDataType, #[pb(index = 3)] pub data: Vec, + + #[pb(index = 4, one_of)] + pub id: Option, } -impl std::convert::From for WsDocumentData { +impl std::convert::From for DocumentWSData { fn from(revision: Revision) -> Self { let doc_id = revision.doc_id.clone(); + let rev_id = revision.rev_id; let bytes: Bytes = revision.try_into().unwrap(); Self { doc_id, - ty: WsDocumentDataType::PushRev, + ty: DocumentWSDataType::PushRev, data: bytes.to_vec(), + id: Some(rev_id), } } } pub struct WsDocumentDataBuilder(); impl WsDocumentDataBuilder { - // WsDocumentDataType::PushRev -> Revision - pub fn build_push_rev_message(doc_id: &str, revision: Revision) -> WsDocumentData { + // DocumentWSDataType::PushRev -> Revision + pub fn build_push_rev_message(doc_id: &str, revision: Revision) -> DocumentWSData { + let rev_id = revision.rev_id; let bytes: Bytes = revision.try_into().unwrap(); - WsDocumentData { + DocumentWSData { doc_id: doc_id.to_string(), - ty: WsDocumentDataType::PushRev, + ty: DocumentWSDataType::PushRev, data: bytes.to_vec(), + id: Some(rev_id), } } - // WsDocumentDataType::PullRev -> RevisionRange - pub fn build_push_pull_message(doc_id: &str, range: RevisionRange) -> WsDocumentData { + // DocumentWSDataType::PullRev -> RevisionRange + pub fn build_push_pull_message(doc_id: &str, range: RevisionRange) -> DocumentWSData { let bytes: Bytes = range.try_into().unwrap(); - WsDocumentData { + DocumentWSData { doc_id: doc_id.to_string(), - ty: WsDocumentDataType::PullRev, + ty: DocumentWSDataType::PullRev, data: bytes.to_vec(), + id: None, } } - // WsDocumentDataType::Acked -> RevId - pub fn build_acked_message(doc_id: &str, rev_id: i64) -> WsDocumentData { + // DocumentWSDataType::Acked -> RevId + pub fn build_acked_message(doc_id: &str, rev_id: i64) -> DocumentWSData { + let cloned_rev_id = rev_id.clone(); let rev_id: RevId = rev_id.into(); let bytes: Bytes = rev_id.try_into().unwrap(); - WsDocumentData { + + DocumentWSData { doc_id: doc_id.to_string(), - ty: WsDocumentDataType::Acked, + ty: DocumentWSDataType::Acked, data: bytes.to_vec(), + id: Some(cloned_rev_id), } } } diff --git a/shared-lib/flowy-collaboration/src/protobuf/model/ws.rs b/shared-lib/flowy-collaboration/src/protobuf/model/ws.rs index 394459aaff..703c6b305f 100644 --- a/shared-lib/flowy-collaboration/src/protobuf/model/ws.rs +++ b/shared-lib/flowy-collaboration/src/protobuf/model/ws.rs @@ -24,24 +24,31 @@ // const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_22_1; #[derive(PartialEq,Clone,Default)] -pub struct WsDocumentData { +pub struct DocumentWSData { // message fields pub doc_id: ::std::string::String, - pub ty: WsDocumentDataType, + pub ty: DocumentWSDataType, pub data: ::std::vec::Vec, + // message oneof groups + pub one_of_id: ::std::option::Option, // special fields pub unknown_fields: ::protobuf::UnknownFields, pub cached_size: ::protobuf::CachedSize, } -impl<'a> ::std::default::Default for &'a WsDocumentData { - fn default() -> &'a WsDocumentData { - ::default_instance() +impl<'a> ::std::default::Default for &'a DocumentWSData { + fn default() -> &'a DocumentWSData { + ::default_instance() } } -impl WsDocumentData { - pub fn new() -> WsDocumentData { +#[derive(Clone,PartialEq,Debug)] +pub enum DocumentWSData_oneof_one_of_id { + id(i64), +} + +impl DocumentWSData { + pub fn new() -> DocumentWSData { ::std::default::Default::default() } @@ -71,18 +78,18 @@ impl WsDocumentData { ::std::mem::replace(&mut self.doc_id, ::std::string::String::new()) } - // .WsDocumentDataType ty = 2; + // .DocumentWSDataType ty = 2; - pub fn get_ty(&self) -> WsDocumentDataType { + pub fn get_ty(&self) -> DocumentWSDataType { self.ty } pub fn clear_ty(&mut self) { - self.ty = WsDocumentDataType::Acked; + self.ty = DocumentWSDataType::Acked; } // Param is passed by value, moved - pub fn set_ty(&mut self, v: WsDocumentDataType) { + pub fn set_ty(&mut self, v: DocumentWSDataType) { self.ty = v; } @@ -111,9 +118,34 @@ impl WsDocumentData { pub fn take_data(&mut self) -> ::std::vec::Vec { ::std::mem::replace(&mut self.data, ::std::vec::Vec::new()) } + + // int64 id = 4; + + + pub fn get_id(&self) -> i64 { + match self.one_of_id { + ::std::option::Option::Some(DocumentWSData_oneof_one_of_id::id(v)) => v, + _ => 0, + } + } + pub fn clear_id(&mut self) { + self.one_of_id = ::std::option::Option::None; + } + + pub fn has_id(&self) -> bool { + match self.one_of_id { + ::std::option::Option::Some(DocumentWSData_oneof_one_of_id::id(..)) => true, + _ => false, + } + } + + // Param is passed by value, moved + pub fn set_id(&mut self, v: i64) { + self.one_of_id = ::std::option::Option::Some(DocumentWSData_oneof_one_of_id::id(v)) + } } -impl ::protobuf::Message for WsDocumentData { +impl ::protobuf::Message for DocumentWSData { fn is_initialized(&self) -> bool { true } @@ -131,6 +163,12 @@ impl ::protobuf::Message for WsDocumentData { 3 => { ::protobuf::rt::read_singular_proto3_bytes_into(wire_type, is, &mut self.data)?; }, + 4 => { + if wire_type != ::protobuf::wire_format::WireTypeVarint { + return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type)); + } + self.one_of_id = ::std::option::Option::Some(DocumentWSData_oneof_one_of_id::id(is.read_int64()?)); + }, _ => { ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; }, @@ -146,12 +184,19 @@ impl ::protobuf::Message for WsDocumentData { if !self.doc_id.is_empty() { my_size += ::protobuf::rt::string_size(1, &self.doc_id); } - if self.ty != WsDocumentDataType::Acked { + if self.ty != DocumentWSDataType::Acked { my_size += ::protobuf::rt::enum_size(2, self.ty); } if !self.data.is_empty() { my_size += ::protobuf::rt::bytes_size(3, &self.data); } + if let ::std::option::Option::Some(ref v) = self.one_of_id { + match v { + &DocumentWSData_oneof_one_of_id::id(v) => { + my_size += ::protobuf::rt::value_size(4, v, ::protobuf::wire_format::WireTypeVarint); + }, + }; + } my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); self.cached_size.set(my_size); my_size @@ -161,12 +206,19 @@ impl ::protobuf::Message for WsDocumentData { if !self.doc_id.is_empty() { os.write_string(1, &self.doc_id)?; } - if self.ty != WsDocumentDataType::Acked { + if self.ty != DocumentWSDataType::Acked { os.write_enum(2, ::protobuf::ProtobufEnum::value(&self.ty))?; } if !self.data.is_empty() { os.write_bytes(3, &self.data)?; } + if let ::std::option::Option::Some(ref v) = self.one_of_id { + match v { + &DocumentWSData_oneof_one_of_id::id(v) => { + os.write_int64(4, v)?; + }, + }; + } os.write_unknown_fields(self.get_unknown_fields())?; ::std::result::Result::Ok(()) } @@ -197,8 +249,8 @@ impl ::protobuf::Message for WsDocumentData { Self::descriptor_static() } - fn new() -> WsDocumentData { - WsDocumentData::new() + fn new() -> DocumentWSData { + DocumentWSData::new() } fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor { @@ -207,49 +259,55 @@ impl ::protobuf::Message for WsDocumentData { let mut fields = ::std::vec::Vec::new(); fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( "doc_id", - |m: &WsDocumentData| { &m.doc_id }, - |m: &mut WsDocumentData| { &mut m.doc_id }, + |m: &DocumentWSData| { &m.doc_id }, + |m: &mut DocumentWSData| { &mut m.doc_id }, )); - fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeEnum>( + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeEnum>( "ty", - |m: &WsDocumentData| { &m.ty }, - |m: &mut WsDocumentData| { &mut m.ty }, + |m: &DocumentWSData| { &m.ty }, + |m: &mut DocumentWSData| { &mut m.ty }, )); fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>( "data", - |m: &WsDocumentData| { &m.data }, - |m: &mut WsDocumentData| { &mut m.data }, + |m: &DocumentWSData| { &m.data }, + |m: &mut DocumentWSData| { &mut m.data }, )); - ::protobuf::reflect::MessageDescriptor::new_pb_name::( - "WsDocumentData", + fields.push(::protobuf::reflect::accessor::make_singular_i64_accessor::<_>( + "id", + DocumentWSData::has_id, + DocumentWSData::get_id, + )); + ::protobuf::reflect::MessageDescriptor::new_pb_name::( + "DocumentWSData", fields, file_descriptor_proto() ) }) } - fn default_instance() -> &'static WsDocumentData { - static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; - instance.get(WsDocumentData::new) + fn default_instance() -> &'static DocumentWSData { + static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; + instance.get(DocumentWSData::new) } } -impl ::protobuf::Clear for WsDocumentData { +impl ::protobuf::Clear for DocumentWSData { fn clear(&mut self) { self.doc_id.clear(); - self.ty = WsDocumentDataType::Acked; + self.ty = DocumentWSDataType::Acked; self.data.clear(); + self.one_of_id = ::std::option::Option::None; self.unknown_fields.clear(); } } -impl ::std::fmt::Debug for WsDocumentData { +impl ::std::fmt::Debug for DocumentWSData { fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { ::protobuf::text_format::fmt(self, f) } } -impl ::protobuf::reflect::ProtobufValue for WsDocumentData { +impl ::protobuf::reflect::ProtobufValue for DocumentWSData { fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { ::protobuf::reflect::ReflectValueRef::Message(self) } @@ -492,34 +550,34 @@ impl ::protobuf::reflect::ProtobufValue for DocumentConnected { } #[derive(Clone,PartialEq,Eq,Debug,Hash)] -pub enum WsDocumentDataType { +pub enum DocumentWSDataType { Acked = 0, PushRev = 1, PullRev = 2, UserConnect = 3, } -impl ::protobuf::ProtobufEnum for WsDocumentDataType { +impl ::protobuf::ProtobufEnum for DocumentWSDataType { fn value(&self) -> i32 { *self as i32 } - fn from_i32(value: i32) -> ::std::option::Option { + fn from_i32(value: i32) -> ::std::option::Option { match value { - 0 => ::std::option::Option::Some(WsDocumentDataType::Acked), - 1 => ::std::option::Option::Some(WsDocumentDataType::PushRev), - 2 => ::std::option::Option::Some(WsDocumentDataType::PullRev), - 3 => ::std::option::Option::Some(WsDocumentDataType::UserConnect), + 0 => ::std::option::Option::Some(DocumentWSDataType::Acked), + 1 => ::std::option::Option::Some(DocumentWSDataType::PushRev), + 2 => ::std::option::Option::Some(DocumentWSDataType::PullRev), + 3 => ::std::option::Option::Some(DocumentWSDataType::UserConnect), _ => ::std::option::Option::None } } fn values() -> &'static [Self] { - static values: &'static [WsDocumentDataType] = &[ - WsDocumentDataType::Acked, - WsDocumentDataType::PushRev, - WsDocumentDataType::PullRev, - WsDocumentDataType::UserConnect, + static values: &'static [DocumentWSDataType] = &[ + DocumentWSDataType::Acked, + DocumentWSDataType::PushRev, + DocumentWSDataType::PullRev, + DocumentWSDataType::UserConnect, ]; values } @@ -527,63 +585,68 @@ impl ::protobuf::ProtobufEnum for WsDocumentDataType { fn enum_descriptor_static() -> &'static ::protobuf::reflect::EnumDescriptor { static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::EnumDescriptor> = ::protobuf::rt::LazyV2::INIT; descriptor.get(|| { - ::protobuf::reflect::EnumDescriptor::new_pb_name::("WsDocumentDataType", file_descriptor_proto()) + ::protobuf::reflect::EnumDescriptor::new_pb_name::("DocumentWSDataType", file_descriptor_proto()) }) } } -impl ::std::marker::Copy for WsDocumentDataType { +impl ::std::marker::Copy for DocumentWSDataType { } -impl ::std::default::Default for WsDocumentDataType { +impl ::std::default::Default for DocumentWSDataType { fn default() -> Self { - WsDocumentDataType::Acked + DocumentWSDataType::Acked } } -impl ::protobuf::reflect::ProtobufValue for WsDocumentDataType { +impl ::protobuf::reflect::ProtobufValue for DocumentWSDataType { fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { ::protobuf::reflect::ReflectValueRef::Enum(::protobuf::ProtobufEnum::descriptor(self)) } } static file_descriptor_proto_data: &'static [u8] = b"\ - \n\x08ws.proto\"`\n\x0eWsDocumentData\x12\x15\n\x06doc_id\x18\x01\x20\ - \x01(\tR\x05docId\x12#\n\x02ty\x18\x02\x20\x01(\x0e2\x13.WsDocumentDataT\ - ypeR\x02ty\x12\x12\n\x04data\x18\x03\x20\x01(\x0cR\x04data\"Z\n\x11Docum\ - entConnected\x12\x17\n\x07user_id\x18\x01\x20\x01(\tR\x06userId\x12\x15\ - \n\x06doc_id\x18\x02\x20\x01(\tR\x05docId\x12\x15\n\x06rev_id\x18\x03\ - \x20\x01(\x03R\x05revId*J\n\x12WsDocumentDataType\x12\t\n\x05Acked\x10\0\ - \x12\x0b\n\x07PushRev\x10\x01\x12\x0b\n\x07PullRev\x10\x02\x12\x0f\n\x0b\ - UserConnect\x10\x03J\xc8\x04\n\x06\x12\x04\0\0\x11\x01\n\x08\n\x01\x0c\ - \x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x06\x01\n\n\n\x03\x04\0\ - \x01\x12\x03\x02\x08\x16\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x16\n\ - \x0c\n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\n\n\x0c\n\x05\x04\0\x02\0\x01\ - \x12\x03\x03\x0b\x11\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x14\x15\n\ - \x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04\x1e\n\x0c\n\x05\x04\0\x02\x01\ - \x06\x12\x03\x04\x04\x16\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\x17\ - \x19\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04\x1c\x1d\n\x0b\n\x04\x04\0\ - \x02\x02\x12\x03\x05\x04\x13\n\x0c\n\x05\x04\0\x02\x02\x05\x12\x03\x05\ - \x04\t\n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03\x05\n\x0e\n\x0c\n\x05\x04\0\ - \x02\x02\x03\x12\x03\x05\x11\x12\n\n\n\x02\x04\x01\x12\x04\x07\0\x0b\x01\ - \n\n\n\x03\x04\x01\x01\x12\x03\x07\x08\x19\n\x0b\n\x04\x04\x01\x02\0\x12\ - \x03\x08\x04\x17\n\x0c\n\x05\x04\x01\x02\0\x05\x12\x03\x08\x04\n\n\x0c\n\ - \x05\x04\x01\x02\0\x01\x12\x03\x08\x0b\x12\n\x0c\n\x05\x04\x01\x02\0\x03\ - \x12\x03\x08\x15\x16\n\x0b\n\x04\x04\x01\x02\x01\x12\x03\t\x04\x16\n\x0c\ - \n\x05\x04\x01\x02\x01\x05\x12\x03\t\x04\n\n\x0c\n\x05\x04\x01\x02\x01\ - \x01\x12\x03\t\x0b\x11\n\x0c\n\x05\x04\x01\x02\x01\x03\x12\x03\t\x14\x15\ - \n\x0b\n\x04\x04\x01\x02\x02\x12\x03\n\x04\x15\n\x0c\n\x05\x04\x01\x02\ - \x02\x05\x12\x03\n\x04\t\n\x0c\n\x05\x04\x01\x02\x02\x01\x12\x03\n\n\x10\ - \n\x0c\n\x05\x04\x01\x02\x02\x03\x12\x03\n\x13\x14\n\n\n\x02\x05\0\x12\ - \x04\x0c\0\x11\x01\n\n\n\x03\x05\0\x01\x12\x03\x0c\x05\x17\n\x0b\n\x04\ - \x05\0\x02\0\x12\x03\r\x04\x0e\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\r\x04\ - \t\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\r\x0c\r\n\x0b\n\x04\x05\0\x02\x01\ - \x12\x03\x0e\x04\x10\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\x0e\x04\x0b\n\ - \x0c\n\x05\x05\0\x02\x01\x02\x12\x03\x0e\x0e\x0f\n\x0b\n\x04\x05\0\x02\ - \x02\x12\x03\x0f\x04\x10\n\x0c\n\x05\x05\0\x02\x02\x01\x12\x03\x0f\x04\ - \x0b\n\x0c\n\x05\x05\0\x02\x02\x02\x12\x03\x0f\x0e\x0f\n\x0b\n\x04\x05\0\ - \x02\x03\x12\x03\x10\x04\x14\n\x0c\n\x05\x05\0\x02\x03\x01\x12\x03\x10\ - \x04\x0f\n\x0c\n\x05\x05\0\x02\x03\x02\x12\x03\x10\x12\x13b\x06proto3\ + \n\x08ws.proto\"\x7f\n\x0eDocumentWSData\x12\x15\n\x06doc_id\x18\x01\x20\ + \x01(\tR\x05docId\x12#\n\x02ty\x18\x02\x20\x01(\x0e2\x13.DocumentWSDataT\ + ypeR\x02ty\x12\x12\n\x04data\x18\x03\x20\x01(\x0cR\x04data\x12\x10\n\x02\ + id\x18\x04\x20\x01(\x03H\0R\x02idB\x0b\n\tone_of_id\"Z\n\x11DocumentConn\ + ected\x12\x17\n\x07user_id\x18\x01\x20\x01(\tR\x06userId\x12\x15\n\x06do\ + c_id\x18\x02\x20\x01(\tR\x05docId\x12\x15\n\x06rev_id\x18\x03\x20\x01(\ + \x03R\x05revId*J\n\x12DocumentWSDataType\x12\t\n\x05Acked\x10\0\x12\x0b\ + \n\x07PushRev\x10\x01\x12\x0b\n\x07PullRev\x10\x02\x12\x0f\n\x0bUserConn\ + ect\x10\x03J\x9a\x05\n\x06\x12\x04\0\0\x12\x01\n\x08\n\x01\x0c\x12\x03\0\ + \0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x07\x01\n\n\n\x03\x04\0\x01\x12\x03\ + \x02\x08\x16\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x16\n\x0c\n\x05\x04\ + \0\x02\0\x05\x12\x03\x03\x04\n\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\ + \x0b\x11\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x14\x15\n\x0b\n\x04\x04\ + \0\x02\x01\x12\x03\x04\x04\x1e\n\x0c\n\x05\x04\0\x02\x01\x06\x12\x03\x04\ + \x04\x16\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\x17\x19\n\x0c\n\x05\ + \x04\0\x02\x01\x03\x12\x03\x04\x1c\x1d\n\x0b\n\x04\x04\0\x02\x02\x12\x03\ + \x05\x04\x13\n\x0c\n\x05\x04\0\x02\x02\x05\x12\x03\x05\x04\t\n\x0c\n\x05\ + \x04\0\x02\x02\x01\x12\x03\x05\n\x0e\n\x0c\n\x05\x04\0\x02\x02\x03\x12\ + \x03\x05\x11\x12\n\x0b\n\x04\x04\0\x08\0\x12\x03\x06\x04%\n\x0c\n\x05\ + \x04\0\x08\0\x01\x12\x03\x06\n\x13\n\x0b\n\x04\x04\0\x02\x03\x12\x03\x06\ + \x16#\n\x0c\n\x05\x04\0\x02\x03\x05\x12\x03\x06\x16\x1b\n\x0c\n\x05\x04\ + \0\x02\x03\x01\x12\x03\x06\x1c\x1e\n\x0c\n\x05\x04\0\x02\x03\x03\x12\x03\ + \x06!\"\n\n\n\x02\x04\x01\x12\x04\x08\0\x0c\x01\n\n\n\x03\x04\x01\x01\ + \x12\x03\x08\x08\x19\n\x0b\n\x04\x04\x01\x02\0\x12\x03\t\x04\x17\n\x0c\n\ + \x05\x04\x01\x02\0\x05\x12\x03\t\x04\n\n\x0c\n\x05\x04\x01\x02\0\x01\x12\ + \x03\t\x0b\x12\n\x0c\n\x05\x04\x01\x02\0\x03\x12\x03\t\x15\x16\n\x0b\n\ + \x04\x04\x01\x02\x01\x12\x03\n\x04\x16\n\x0c\n\x05\x04\x01\x02\x01\x05\ + \x12\x03\n\x04\n\n\x0c\n\x05\x04\x01\x02\x01\x01\x12\x03\n\x0b\x11\n\x0c\ + \n\x05\x04\x01\x02\x01\x03\x12\x03\n\x14\x15\n\x0b\n\x04\x04\x01\x02\x02\ + \x12\x03\x0b\x04\x15\n\x0c\n\x05\x04\x01\x02\x02\x05\x12\x03\x0b\x04\t\n\ + \x0c\n\x05\x04\x01\x02\x02\x01\x12\x03\x0b\n\x10\n\x0c\n\x05\x04\x01\x02\ + \x02\x03\x12\x03\x0b\x13\x14\n\n\n\x02\x05\0\x12\x04\r\0\x12\x01\n\n\n\ + \x03\x05\0\x01\x12\x03\r\x05\x17\n\x0b\n\x04\x05\0\x02\0\x12\x03\x0e\x04\ + \x0e\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x0e\x04\t\n\x0c\n\x05\x05\0\x02\ + \0\x02\x12\x03\x0e\x0c\r\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x0f\x04\x10\n\ + \x0c\n\x05\x05\0\x02\x01\x01\x12\x03\x0f\x04\x0b\n\x0c\n\x05\x05\0\x02\ + \x01\x02\x12\x03\x0f\x0e\x0f\n\x0b\n\x04\x05\0\x02\x02\x12\x03\x10\x04\ + \x10\n\x0c\n\x05\x05\0\x02\x02\x01\x12\x03\x10\x04\x0b\n\x0c\n\x05\x05\0\ + \x02\x02\x02\x12\x03\x10\x0e\x0f\n\x0b\n\x04\x05\0\x02\x03\x12\x03\x11\ + \x04\x14\n\x0c\n\x05\x05\0\x02\x03\x01\x12\x03\x11\x04\x0f\n\x0c\n\x05\ + \x05\0\x02\x03\x02\x12\x03\x11\x12\x13b\x06proto3\ "; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; diff --git a/shared-lib/flowy-collaboration/src/protobuf/proto/ws.proto b/shared-lib/flowy-collaboration/src/protobuf/proto/ws.proto index 35a10bbbcf..9d0517d576 100644 --- a/shared-lib/flowy-collaboration/src/protobuf/proto/ws.proto +++ b/shared-lib/flowy-collaboration/src/protobuf/proto/ws.proto @@ -1,16 +1,17 @@ syntax = "proto3"; -message WsDocumentData { +message DocumentWSData { string doc_id = 1; - WsDocumentDataType ty = 2; + DocumentWSDataType ty = 2; bytes data = 3; + oneof one_of_id { int64 id = 4; }; } message DocumentConnected { string user_id = 1; string doc_id = 2; int64 rev_id = 3; } -enum WsDocumentDataType { +enum DocumentWSDataType { Acked = 0; PushRev = 1; PullRev = 2; diff --git a/shared-lib/flowy-derive/src/derive_cache/derive_cache.rs b/shared-lib/flowy-derive/src/derive_cache/derive_cache.rs index 4c178596a8..21677dfc6d 100644 --- a/shared-lib/flowy-derive/src/derive_cache/derive_cache.rs +++ b/shared-lib/flowy-derive/src/derive_cache/derive_cache.rs @@ -60,10 +60,10 @@ pub fn category_from_str(type_str: &str) -> TypeCategory { | "DocDelta" | "NewDocUser" | "DocIdentifier" - | "WsDocumentData" + | "DocumentWSData" | "DocumentConnected" - | "WsError" - | "WsMessage" + | "WSError" + | "WSMessage" | "Revision" | "RevId" | "RevisionRange" @@ -89,9 +89,9 @@ pub fn category_from_str(type_str: &str) -> TypeCategory { | "TrashType" | "ViewType" | "ExportType" - | "WsDocumentDataType" + | "DocumentWSDataType" | "ErrorCode" - | "WsModule" + | "WSModule" | "RevType" | "RevState" => TypeCategory::Enum, diff --git a/shared-lib/lib-ws/src/connect.rs b/shared-lib/lib-ws/src/connect.rs index e1848b46ed..b3fb181d37 100644 --- a/shared-lib/lib-ws/src/connect.rs +++ b/shared-lib/lib-ws/src/connect.rs @@ -1,6 +1,6 @@ #![allow(clippy::all)] use crate::{ - errors::{internal_error, WsError}, + errors::{internal_error, WSError}, MsgReceiver, MsgSender, }; @@ -24,16 +24,16 @@ use tokio_tungstenite::{ type WsConnectResult = Result<(WebSocketStream>, Response), Error>; #[pin_project] -pub struct WsConnectionFuture { +pub struct WSConnectionFuture { msg_tx: Option, ws_rx: Option, #[pin] fut: Pin + Send + Sync>>, } -impl WsConnectionFuture { +impl WSConnectionFuture { pub fn new(msg_tx: MsgSender, ws_rx: MsgReceiver, addr: String) -> Self { - WsConnectionFuture { + WSConnectionFuture { msg_tx: Some(msg_tx), ws_rx: Some(ws_rx), fut: Box::pin(async move { connect_async(&addr).await }), @@ -41,8 +41,8 @@ impl WsConnectionFuture { } } -impl Future for WsConnectionFuture { - type Output = Result; +impl Future for WSConnectionFuture { + type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // [[pin]] // poll async function. The following methods not work. @@ -66,7 +66,7 @@ impl Future for WsConnectionFuture { self.msg_tx.take().expect("WsConnection should be call once "), self.ws_rx.take().expect("WsConnection should be call once "), ); - Poll::Ready(Ok(WsStream::new(msg_tx, ws_rx, stream))) + Poll::Ready(Ok(WSStream::new(msg_tx, ws_rx, stream))) }, Err(error) => { tracing::debug!("🐴 ws connect failed: {:?}", error); @@ -77,16 +77,16 @@ impl Future for WsConnectionFuture { } } -type Fut = BoxFuture<'static, Result<(), WsError>>; +type Fut = BoxFuture<'static, Result<(), WSError>>; #[pin_project] -pub struct WsStream { +pub struct WSStream { #[allow(dead_code)] msg_tx: MsgSender, #[pin] inner: Option<(Fut, Fut)>, } -impl WsStream { +impl WSStream { pub fn new(msg_tx: MsgSender, ws_rx: MsgReceiver, stream: WebSocketStream>) -> Self { let (ws_write, ws_read) = stream.split(); Self { @@ -110,7 +110,7 @@ impl WsStream { loop { match rx.recv().await { None => { - return Err(WsError::internal().context("WsStream rx closed unexpectedly")); + return Err(WSError::internal().context("WsStream rx closed unexpectedly")); }, Some(result) => { if result.is_err() { @@ -136,12 +136,12 @@ impl WsStream { } } -impl fmt::Debug for WsStream { +impl fmt::Debug for WSStream { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("WsStream").finish() } } -impl Future for WsStream { - type Output = Result<(), WsError>; +impl Future for WSStream { + type Output = Result<(), WSError>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let (mut ws_read, mut ws_write) = self.inner.take().unwrap(); @@ -161,11 +161,11 @@ impl Future for WsStream { } } -fn send_message(msg_tx: MsgSender, message: Result) -> Result<(), WsError> { +fn send_message(msg_tx: MsgSender, message: Result) -> Result<(), WSError> { match message { Ok(Message::Binary(bytes)) => msg_tx.unbounded_send(Message::Binary(bytes)).map_err(internal_error), Ok(_) => Ok(()), - Err(e) => Err(WsError::internal().context(e)), + Err(e) => Err(WSError::internal().context(e)), } } #[allow(dead_code)] diff --git a/shared-lib/lib-ws/src/errors.rs b/shared-lib/lib-ws/src/errors.rs index 44f59d61ae..9003f9ebf9 100644 --- a/shared-lib/lib-ws/src/errors.rs +++ b/shared-lib/lib-ws/src/errors.rs @@ -6,7 +6,7 @@ use tokio_tungstenite::tungstenite::{http::StatusCode, Message}; use url::ParseError; #[derive(Debug, Default, Clone, ProtoBuf)] -pub struct WsError { +pub struct WSError { #[pb(index = 1)] pub code: ErrorCode, @@ -14,11 +14,11 @@ pub struct WsError { pub msg: String, } -macro_rules! static_user_error { +macro_rules! static_ws_error { ($name:ident, $status:expr) => { #[allow(non_snake_case, missing_docs)] - pub fn $name() -> WsError { - WsError { + pub fn $name() -> WSError { + WSError { code: $status, msg: format!("{}", $status), } @@ -26,10 +26,10 @@ macro_rules! static_user_error { }; } -impl WsError { +impl WSError { #[allow(dead_code)] - pub(crate) fn new(code: ErrorCode) -> WsError { - WsError { + pub(crate) fn new(code: ErrorCode) -> WSError { + WSError { code, msg: "".to_string(), } @@ -40,16 +40,16 @@ impl WsError { self } - static_user_error!(internal, ErrorCode::InternalError); - static_user_error!(unsupported_message, ErrorCode::UnsupportedMessage); - static_user_error!(unauthorized, ErrorCode::Unauthorized); + static_ws_error!(internal, ErrorCode::InternalError); + static_ws_error!(unsupported_message, ErrorCode::UnsupportedMessage); + static_ws_error!(unauthorized, ErrorCode::Unauthorized); } -pub fn internal_error(e: T) -> WsError +pub fn internal_error(e: T) -> WSError where T: std::fmt::Debug, { - WsError::internal().context(e) + WSError::internal().context(e) } #[derive(Debug, Clone, ProtoBuf_Enum, Display, PartialEq, Eq)] @@ -63,29 +63,29 @@ impl std::default::Default for ErrorCode { fn default() -> Self { ErrorCode::InternalError } } -impl std::convert::From for WsError { - fn from(error: ParseError) -> Self { WsError::internal().context(error) } +impl std::convert::From for WSError { + fn from(error: ParseError) -> Self { WSError::internal().context(error) } } -impl std::convert::From for WsError { - fn from(error: protobuf::ProtobufError) -> Self { WsError::internal().context(error) } +impl std::convert::From for WSError { + fn from(error: protobuf::ProtobufError) -> Self { WSError::internal().context(error) } } -impl std::convert::From> for WsError { - fn from(error: TrySendError) -> Self { WsError::internal().context(error) } +impl std::convert::From> for WSError { + fn from(error: TrySendError) -> Self { WSError::internal().context(error) } } -impl std::convert::From for WsError { +impl std::convert::From for WSError { fn from(error: tokio_tungstenite::tungstenite::Error) -> Self { match error { tokio_tungstenite::tungstenite::Error::Http(response) => { if response.status() == StatusCode::UNAUTHORIZED { - WsError::unauthorized() + WSError::unauthorized() } else { - WsError::internal().context(response) + WSError::internal().context(response) } }, - _ => WsError::internal().context(error), + _ => WSError::internal().context(error), } } } diff --git a/shared-lib/lib-ws/src/msg.rs b/shared-lib/lib-ws/src/msg.rs index f7dcaa23b0..dc61309e26 100644 --- a/shared-lib/lib-ws/src/msg.rs +++ b/shared-lib/lib-ws/src/msg.rs @@ -5,33 +5,33 @@ use tokio_tungstenite::tungstenite::Message as TokioMessage; // Opti: using four bytes of the data to represent the source #[derive(ProtoBuf, Debug, Clone, Default)] -pub struct WsMessage { +pub struct WSMessage { #[pb(index = 1)] - pub module: WsModule, + pub module: WSModule, #[pb(index = 2)] pub data: Vec, } #[derive(ProtoBuf_Enum, Debug, Clone, Eq, PartialEq, Hash)] -pub enum WsModule { +pub enum WSModule { Doc = 0, } -impl std::default::Default for WsModule { - fn default() -> Self { WsModule::Doc } +impl std::default::Default for WSModule { + fn default() -> Self { WSModule::Doc } } -impl ToString for WsModule { +impl ToString for WSModule { fn to_string(&self) -> String { match self { - WsModule::Doc => "0".to_string(), + WSModule::Doc => "0".to_string(), } } } -impl std::convert::From for TokioMessage { - fn from(msg: WsMessage) -> Self { +impl std::convert::From for TokioMessage { + fn from(msg: WSMessage) -> Self { let result: Result = msg.try_into(); match result { Ok(bytes) => TokioMessage::Binary(bytes.to_vec()), diff --git a/shared-lib/lib-ws/src/protobuf/model/errors.rs b/shared-lib/lib-ws/src/protobuf/model/errors.rs index 7db7f53940..1f4e8a0bec 100644 --- a/shared-lib/lib-ws/src/protobuf/model/errors.rs +++ b/shared-lib/lib-ws/src/protobuf/model/errors.rs @@ -24,7 +24,7 @@ // const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_22_1; #[derive(PartialEq,Clone,Default)] -pub struct WsError { +pub struct WSError { // message fields pub code: ErrorCode, pub msg: ::std::string::String, @@ -33,14 +33,14 @@ pub struct WsError { pub cached_size: ::protobuf::CachedSize, } -impl<'a> ::std::default::Default for &'a WsError { - fn default() -> &'a WsError { - ::default_instance() +impl<'a> ::std::default::Default for &'a WSError { + fn default() -> &'a WSError { + ::default_instance() } } -impl WsError { - pub fn new() -> WsError { +impl WSError { + pub fn new() -> WSError { ::std::default::Default::default() } @@ -86,7 +86,7 @@ impl WsError { } } -impl ::protobuf::Message for WsError { +impl ::protobuf::Message for WSError { fn is_initialized(&self) -> bool { true } @@ -161,8 +161,8 @@ impl ::protobuf::Message for WsError { Self::descriptor_static() } - fn new() -> WsError { - WsError::new() + fn new() -> WSError { + WSError::new() } fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor { @@ -171,29 +171,29 @@ impl ::protobuf::Message for WsError { let mut fields = ::std::vec::Vec::new(); fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeEnum>( "code", - |m: &WsError| { &m.code }, - |m: &mut WsError| { &mut m.code }, + |m: &WSError| { &m.code }, + |m: &mut WSError| { &mut m.code }, )); fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( "msg", - |m: &WsError| { &m.msg }, - |m: &mut WsError| { &mut m.msg }, + |m: &WSError| { &m.msg }, + |m: &mut WSError| { &mut m.msg }, )); - ::protobuf::reflect::MessageDescriptor::new_pb_name::( - "WsError", + ::protobuf::reflect::MessageDescriptor::new_pb_name::( + "WSError", fields, file_descriptor_proto() ) }) } - fn default_instance() -> &'static WsError { - static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; - instance.get(WsError::new) + fn default_instance() -> &'static WSError { + static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; + instance.get(WSError::new) } } -impl ::protobuf::Clear for WsError { +impl ::protobuf::Clear for WSError { fn clear(&mut self) { self.code = ErrorCode::InternalError; self.msg.clear(); @@ -201,13 +201,13 @@ impl ::protobuf::Clear for WsError { } } -impl ::std::fmt::Debug for WsError { +impl ::std::fmt::Debug for WSError { fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { ::protobuf::text_format::fmt(self, f) } } -impl ::protobuf::reflect::ProtobufValue for WsError { +impl ::protobuf::reflect::ProtobufValue for WSError { fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { ::protobuf::reflect::ReflectValueRef::Message(self) } @@ -267,7 +267,7 @@ impl ::protobuf::reflect::ProtobufValue for ErrorCode { } static file_descriptor_proto_data: &'static [u8] = b"\ - \n\x0cerrors.proto\";\n\x07WsError\x12\x1e\n\x04code\x18\x01\x20\x01(\ + \n\x0cerrors.proto\";\n\x07WSError\x12\x1e\n\x04code\x18\x01\x20\x01(\ \x0e2\n.ErrorCodeR\x04code\x12\x10\n\x03msg\x18\x02\x20\x01(\tR\x03msg*H\ \n\tErrorCode\x12\x11\n\rInternalError\x10\0\x12\x16\n\x12UnsupportedMes\ sage\x10\x01\x12\x10\n\x0cUnauthorized\x10\x02J\xab\x02\n\x06\x12\x04\0\ diff --git a/shared-lib/lib-ws/src/protobuf/model/msg.rs b/shared-lib/lib-ws/src/protobuf/model/msg.rs index 35c80e3a65..9484576264 100644 --- a/shared-lib/lib-ws/src/protobuf/model/msg.rs +++ b/shared-lib/lib-ws/src/protobuf/model/msg.rs @@ -24,38 +24,38 @@ // const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_22_1; #[derive(PartialEq,Clone,Default)] -pub struct WsMessage { +pub struct WSMessage { // message fields - pub module: WsModule, + pub module: WSModule, pub data: ::std::vec::Vec, // special fields pub unknown_fields: ::protobuf::UnknownFields, pub cached_size: ::protobuf::CachedSize, } -impl<'a> ::std::default::Default for &'a WsMessage { - fn default() -> &'a WsMessage { - ::default_instance() +impl<'a> ::std::default::Default for &'a WSMessage { + fn default() -> &'a WSMessage { + ::default_instance() } } -impl WsMessage { - pub fn new() -> WsMessage { +impl WSMessage { + pub fn new() -> WSMessage { ::std::default::Default::default() } - // .WsModule module = 1; + // .WSModule module = 1; - pub fn get_module(&self) -> WsModule { + pub fn get_module(&self) -> WSModule { self.module } pub fn clear_module(&mut self) { - self.module = WsModule::Doc; + self.module = WSModule::Doc; } // Param is passed by value, moved - pub fn set_module(&mut self, v: WsModule) { + pub fn set_module(&mut self, v: WSModule) { self.module = v; } @@ -86,7 +86,7 @@ impl WsMessage { } } -impl ::protobuf::Message for WsMessage { +impl ::protobuf::Message for WSMessage { fn is_initialized(&self) -> bool { true } @@ -113,7 +113,7 @@ impl ::protobuf::Message for WsMessage { #[allow(unused_variables)] fn compute_size(&self) -> u32 { let mut my_size = 0; - if self.module != WsModule::Doc { + if self.module != WSModule::Doc { my_size += ::protobuf::rt::enum_size(1, self.module); } if !self.data.is_empty() { @@ -125,7 +125,7 @@ impl ::protobuf::Message for WsMessage { } fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> { - if self.module != WsModule::Doc { + if self.module != WSModule::Doc { os.write_enum(1, ::protobuf::ProtobufEnum::value(&self.module))?; } if !self.data.is_empty() { @@ -161,78 +161,78 @@ impl ::protobuf::Message for WsMessage { Self::descriptor_static() } - fn new() -> WsMessage { - WsMessage::new() + fn new() -> WSMessage { + WSMessage::new() } fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor { static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT; descriptor.get(|| { let mut fields = ::std::vec::Vec::new(); - fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeEnum>( + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeEnum>( "module", - |m: &WsMessage| { &m.module }, - |m: &mut WsMessage| { &mut m.module }, + |m: &WSMessage| { &m.module }, + |m: &mut WSMessage| { &mut m.module }, )); fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>( "data", - |m: &WsMessage| { &m.data }, - |m: &mut WsMessage| { &mut m.data }, + |m: &WSMessage| { &m.data }, + |m: &mut WSMessage| { &mut m.data }, )); - ::protobuf::reflect::MessageDescriptor::new_pb_name::( - "WsMessage", + ::protobuf::reflect::MessageDescriptor::new_pb_name::( + "WSMessage", fields, file_descriptor_proto() ) }) } - fn default_instance() -> &'static WsMessage { - static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; - instance.get(WsMessage::new) + fn default_instance() -> &'static WSMessage { + static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; + instance.get(WSMessage::new) } } -impl ::protobuf::Clear for WsMessage { +impl ::protobuf::Clear for WSMessage { fn clear(&mut self) { - self.module = WsModule::Doc; + self.module = WSModule::Doc; self.data.clear(); self.unknown_fields.clear(); } } -impl ::std::fmt::Debug for WsMessage { +impl ::std::fmt::Debug for WSMessage { fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { ::protobuf::text_format::fmt(self, f) } } -impl ::protobuf::reflect::ProtobufValue for WsMessage { +impl ::protobuf::reflect::ProtobufValue for WSMessage { fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { ::protobuf::reflect::ReflectValueRef::Message(self) } } #[derive(Clone,PartialEq,Eq,Debug,Hash)] -pub enum WsModule { +pub enum WSModule { Doc = 0, } -impl ::protobuf::ProtobufEnum for WsModule { +impl ::protobuf::ProtobufEnum for WSModule { fn value(&self) -> i32 { *self as i32 } - fn from_i32(value: i32) -> ::std::option::Option { + fn from_i32(value: i32) -> ::std::option::Option { match value { - 0 => ::std::option::Option::Some(WsModule::Doc), + 0 => ::std::option::Option::Some(WSModule::Doc), _ => ::std::option::Option::None } } fn values() -> &'static [Self] { - static values: &'static [WsModule] = &[ - WsModule::Doc, + static values: &'static [WSModule] = &[ + WSModule::Doc, ]; values } @@ -240,30 +240,30 @@ impl ::protobuf::ProtobufEnum for WsModule { fn enum_descriptor_static() -> &'static ::protobuf::reflect::EnumDescriptor { static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::EnumDescriptor> = ::protobuf::rt::LazyV2::INIT; descriptor.get(|| { - ::protobuf::reflect::EnumDescriptor::new_pb_name::("WsModule", file_descriptor_proto()) + ::protobuf::reflect::EnumDescriptor::new_pb_name::("WSModule", file_descriptor_proto()) }) } } -impl ::std::marker::Copy for WsModule { +impl ::std::marker::Copy for WSModule { } -impl ::std::default::Default for WsModule { +impl ::std::default::Default for WSModule { fn default() -> Self { - WsModule::Doc + WSModule::Doc } } -impl ::protobuf::reflect::ProtobufValue for WsModule { +impl ::protobuf::reflect::ProtobufValue for WSModule { fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { ::protobuf::reflect::ReflectValueRef::Enum(::protobuf::ProtobufEnum::descriptor(self)) } } static file_descriptor_proto_data: &'static [u8] = b"\ - \n\tmsg.proto\"B\n\tWsMessage\x12!\n\x06module\x18\x01\x20\x01(\x0e2\t.W\ - sModuleR\x06module\x12\x12\n\x04data\x18\x02\x20\x01(\x0cR\x04data*\x13\ - \n\x08WsModule\x12\x07\n\x03Doc\x10\0J\xd9\x01\n\x06\x12\x04\0\0\x08\x01\ + \n\tmsg.proto\"B\n\tWSMessage\x12!\n\x06module\x18\x01\x20\x01(\x0e2\t.W\ + SModuleR\x06module\x12\x12\n\x04data\x18\x02\x20\x01(\x0cR\x04data*\x13\ + \n\x08WSModule\x12\x07\n\x03Doc\x10\0J\xd9\x01\n\x06\x12\x04\0\0\x08\x01\ \n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x05\x01\n\ \n\n\x03\x04\0\x01\x12\x03\x02\x08\x11\n\x0b\n\x04\x04\0\x02\0\x12\x03\ \x03\x04\x18\n\x0c\n\x05\x04\0\x02\0\x06\x12\x03\x03\x04\x0c\n\x0c\n\x05\ diff --git a/shared-lib/lib-ws/src/protobuf/proto/errors.proto b/shared-lib/lib-ws/src/protobuf/proto/errors.proto index cbb330a6e8..a2e3ccc45a 100644 --- a/shared-lib/lib-ws/src/protobuf/proto/errors.proto +++ b/shared-lib/lib-ws/src/protobuf/proto/errors.proto @@ -1,6 +1,6 @@ syntax = "proto3"; -message WsError { +message WSError { ErrorCode code = 1; string msg = 2; } diff --git a/shared-lib/lib-ws/src/protobuf/proto/msg.proto b/shared-lib/lib-ws/src/protobuf/proto/msg.proto index 4b71578340..551a813596 100644 --- a/shared-lib/lib-ws/src/protobuf/proto/msg.proto +++ b/shared-lib/lib-ws/src/protobuf/proto/msg.proto @@ -1,9 +1,9 @@ syntax = "proto3"; -message WsMessage { - WsModule module = 1; +message WSMessage { + WSModule module = 1; bytes data = 2; } -enum WsModule { +enum WSModule { Doc = 0; } diff --git a/shared-lib/lib-ws/src/ws.rs b/shared-lib/lib-ws/src/ws.rs index ba62f4a454..02bc58279f 100644 --- a/shared-lib/lib-ws/src/ws.rs +++ b/shared-lib/lib-ws/src/ws.rs @@ -1,9 +1,9 @@ #![allow(clippy::type_complexity)] use crate::{ - connect::{WsConnectionFuture, WsStream}, - errors::WsError, - WsMessage, - WsModule, + connect::{WSConnectionFuture, WSStream}, + errors::WSError, + WSMessage, + WSModule, }; use backend_service::errors::ServerError; use bytes::Bytes; @@ -30,36 +30,36 @@ use tokio_tungstenite::tungstenite::{ pub type MsgReceiver = UnboundedReceiver; pub type MsgSender = UnboundedSender; -type Handlers = DashMap>; +type Handlers = DashMap>; -pub trait WsMessageReceiver: Sync + Send + 'static { - fn source(&self) -> WsModule; - fn receive_message(&self, msg: WsMessage); +pub trait WSMessageReceiver: Sync + Send + 'static { + fn source(&self) -> WSModule; + fn receive_message(&self, msg: WSMessage); } -pub struct WsController { +pub struct WSController { handlers: Handlers, - state_notify: Arc>, - sender_ctrl: Arc>, + state_notify: Arc>, + sender_ctrl: Arc>, addr: Arc>>, } -impl std::default::Default for WsController { +impl std::default::Default for WSController { fn default() -> Self { let (state_notify, _) = broadcast::channel(16); Self { handlers: DashMap::new(), - sender_ctrl: Arc::new(RwLock::new(WsSenderController::default())), + sender_ctrl: Arc::new(RwLock::new(WSSenderController::default())), state_notify: Arc::new(state_notify), addr: Arc::new(RwLock::new(None)), } } } -impl WsController { - pub fn new() -> Self { WsController::default() } +impl WSController { + pub fn new() -> Self { WSController::default() } - pub fn add_receiver(&self, handler: Arc) -> Result<(), WsError> { + pub fn add_receiver(&self, handler: Arc) -> Result<(), WSError> { let source = handler.source(); if self.handlers.contains_key(&source) { log::error!("WsSource's {:?} is already registered", source); @@ -74,7 +74,7 @@ impl WsController { self.connect(addr, strategy).await } - pub async fn stop(&self) { self.sender_ctrl.write().set_state(WsConnectState::Disconnected); } + pub async fn stop(&self) { self.sender_ctrl.write().set_state(WSConnectState::Disconnected); } async fn connect(&self, addr: String, strategy: T) -> Result<(), ServerError> where @@ -83,25 +83,25 @@ impl WsController { { let (ret, rx) = oneshot::channel::>(); *self.addr.write() = Some(addr.clone()); - let action = WsConnectAction { + let action = WSConnectAction { addr, handlers: self.handlers.clone(), }; let retry = Retry::spawn(strategy, action); let sender_ctrl = self.sender_ctrl.clone(); - sender_ctrl.write().set_state(WsConnectState::Connecting); + sender_ctrl.write().set_state(WSConnectState::Connecting); tokio::spawn(async move { match retry.await { Ok(result) => { - let WsConnectResult { + let WSConnectResult { stream, handlers_fut, sender, } = result; sender_ctrl.write().set_sender(sender); - sender_ctrl.write().set_state(WsConnectState::Connected); + sender_ctrl.write().set_state(WSConnectState::Connected); let _ = ret.send(Ok(())); spawn_stream_and_handlers(stream, handlers_fut, sender_ctrl.clone()).await; }, @@ -131,20 +131,20 @@ impl WsController { self.connect(addr, strategy).await } - pub fn subscribe_state(&self) -> broadcast::Receiver { self.state_notify.subscribe() } + pub fn subscribe_state(&self) -> broadcast::Receiver { self.state_notify.subscribe() } - pub fn sender(&self) -> Result, WsError> { + pub fn sender(&self) -> Result, WSError> { match self.sender_ctrl.read().sender() { - None => Err(WsError::internal().context("WsSender is not initialized, should call connect first")), + None => Err(WSError::internal().context("WsSender is not initialized, should call connect first")), Some(sender) => Ok(sender), } } } async fn spawn_stream_and_handlers( - stream: WsStream, - handlers: WsHandlerFuture, - sender_ctrl: Arc>, + stream: WSStream, + handlers: WSHandlerFuture, + sender_ctrl: Arc>, ) { tokio::select! { result = stream => { @@ -157,14 +157,14 @@ async fn spawn_stream_and_handlers( } #[pin_project] -pub struct WsHandlerFuture { +pub struct WSHandlerFuture { #[pin] msg_rx: MsgReceiver, // Opti: Hashmap would be better handlers: Handlers, } -impl WsHandlerFuture { +impl WSHandlerFuture { fn new(handlers: Handlers, msg_rx: MsgReceiver) -> Self { Self { msg_rx, handlers } } fn handler_ws_message(&self, message: Message) { @@ -175,7 +175,7 @@ impl WsHandlerFuture { fn handle_binary_message(&self, bytes: Vec) { let bytes = Bytes::from(bytes); - match WsMessage::try_from(bytes) { + match WSMessage::try_from(bytes) { Ok(message) => match self.handlers.get(&message.module) { None => log::error!("Can't find any handler for message: {:?}", message), Some(handler) => handler.receive_message(message.clone()), @@ -187,7 +187,7 @@ impl WsHandlerFuture { } } -impl Future for WsHandlerFuture { +impl Future for WSHandlerFuture { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { @@ -202,37 +202,37 @@ impl Future for WsHandlerFuture { } #[derive(Debug, Clone)] -pub struct WsSender { +pub struct WSSender { ws_tx: MsgSender, } -impl WsSender { - pub fn send_msg>(&self, msg: T) -> Result<(), WsError> { +impl WSSender { + pub fn send_msg>(&self, msg: T) -> Result<(), WSError> { let msg = msg.into(); let _ = self .ws_tx .unbounded_send(msg.into()) - .map_err(|e| WsError::internal().context(e))?; + .map_err(|e| WSError::internal().context(e))?; Ok(()) } - pub fn send_text(&self, source: &WsModule, text: &str) -> Result<(), WsError> { - let msg = WsMessage { + pub fn send_text(&self, source: &WSModule, text: &str) -> Result<(), WSError> { + let msg = WSMessage { module: source.clone(), data: text.as_bytes().to_vec(), }; self.send_msg(msg) } - pub fn send_binary(&self, source: &WsModule, bytes: Vec) -> Result<(), WsError> { - let msg = WsMessage { + pub fn send_binary(&self, source: &WSModule, bytes: Vec) -> Result<(), WSError> { + let msg = WSMessage { module: source.clone(), data: bytes, }; self.send_msg(msg) } - pub fn send_disconnect(&self, reason: &str) -> Result<(), WsError> { + pub fn send_disconnect(&self, reason: &str) -> Result<(), WSError> { let frame = CloseFrame { code: CloseCode::Normal, reason: reason.to_owned().into(), @@ -241,44 +241,44 @@ impl WsSender { let _ = self .ws_tx .unbounded_send(msg) - .map_err(|e| WsError::internal().context(e))?; + .map_err(|e| WSError::internal().context(e))?; Ok(()) } } -struct WsConnectAction { +struct WSConnectAction { addr: String, handlers: Handlers, } -impl Action for WsConnectAction { +impl Action for WSConnectAction { type Future = Pin> + Send + Sync>>; - type Item = WsConnectResult; - type Error = WsError; + type Item = WSConnectResult; + type Error = WSError; fn run(&mut self) -> Self::Future { let addr = self.addr.clone(); let handlers = self.handlers.clone(); - Box::pin(WsConnectActionFut::new(addr, handlers)) + Box::pin(WSConnectActionFut::new(addr, handlers)) } } -struct WsConnectResult { - stream: WsStream, - handlers_fut: WsHandlerFuture, - sender: WsSender, +struct WSConnectResult { + stream: WSStream, + handlers_fut: WSHandlerFuture, + sender: WSSender, } #[pin_project] -struct WsConnectActionFut { +struct WSConnectActionFut { addr: String, #[pin] - conn: WsConnectionFuture, - handlers_fut: Option, - sender: Option, + conn: WSConnectionFuture, + handlers_fut: Option, + sender: Option, } -impl WsConnectActionFut { +impl WSConnectActionFut { fn new(addr: String, handlers: Handlers) -> Self { // Stream User // β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” @@ -292,9 +292,9 @@ impl WsConnectActionFut { // β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ let (msg_tx, msg_rx) = futures_channel::mpsc::unbounded(); let (ws_tx, ws_rx) = futures_channel::mpsc::unbounded(); - let sender = WsSender { ws_tx }; - let handlers_fut = WsHandlerFuture::new(handlers, msg_rx); - let conn = WsConnectionFuture::new(msg_tx, ws_rx, addr.clone()); + let sender = WSSender { ws_tx }; + let handlers_fut = WSHandlerFuture::new(handlers, msg_rx); + let conn = WSConnectionFuture::new(msg_tx, ws_rx, addr.clone()); Self { addr, conn, @@ -304,15 +304,15 @@ impl WsConnectActionFut { } } -impl Future for WsConnectActionFut { - type Output = Result; +impl Future for WSConnectActionFut { + type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.project(); match ready!(this.conn.as_mut().poll(cx)) { Ok(stream) => { let handlers_fut = this.handlers_fut.take().expect("Only take once"); let sender = this.sender.take().expect("Only take once"); - Poll::Ready(Ok(WsConnectResult { + Poll::Ready(Ok(WSConnectResult { stream, handlers_fut, sender, @@ -324,39 +324,39 @@ impl Future for WsConnectActionFut { } #[derive(Clone, Eq, PartialEq)] -pub enum WsConnectState { +pub enum WSConnectState { Init, Connecting, Connected, Disconnected, } -impl std::fmt::Display for WsConnectState { +impl std::fmt::Display for WSConnectState { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { - WsConnectState::Init => f.write_str("Init"), - WsConnectState::Connected => f.write_str("Connecting"), - WsConnectState::Connecting => f.write_str("Connected"), - WsConnectState::Disconnected => f.write_str("Disconnected"), + WSConnectState::Init => f.write_str("Init"), + WSConnectState::Connected => f.write_str("Connecting"), + WSConnectState::Connecting => f.write_str("Connected"), + WSConnectState::Disconnected => f.write_str("Disconnected"), } } } -impl std::fmt::Debug for WsConnectState { +impl std::fmt::Debug for WSConnectState { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str(&format!("{}", self)) } } -struct WsSenderController { - state: WsConnectState, - state_notify: Arc>, - sender: Option>, +struct WSSenderController { + state: WSConnectState, + state_notify: Arc>, + sender: Option>, } -impl WsSenderController { - fn set_sender(&mut self, sender: WsSender) { self.sender = Some(Arc::new(sender)); } +impl WSSenderController { + fn set_sender(&mut self, sender: WSSender) { self.sender = Some(Arc::new(sender)); } - fn set_state(&mut self, state: WsConnectState) { - if state != WsConnectState::Connected { + fn set_state(&mut self, state: WSConnectState) { + if state != WSConnectState::Connected { self.sender = None; } @@ -364,24 +364,24 @@ impl WsSenderController { let _ = self.state_notify.send(self.state.clone()); } - fn set_error(&mut self, error: WsError) { + fn set_error(&mut self, error: WSError) { log::error!("{:?}", error); - self.set_state(WsConnectState::Disconnected); + self.set_state(WSConnectState::Disconnected); } - fn sender(&self) -> Option> { self.sender.clone() } + fn sender(&self) -> Option> { self.sender.clone() } - fn is_connecting(&self) -> bool { self.state == WsConnectState::Connecting } + fn is_connecting(&self) -> bool { self.state == WSConnectState::Connecting } #[allow(dead_code)] - fn is_connected(&self) -> bool { self.state == WsConnectState::Connected } + fn is_connected(&self) -> bool { self.state == WSConnectState::Connected } } -impl std::default::Default for WsSenderController { +impl std::default::Default for WSSenderController { fn default() -> Self { let (state_notify, _) = broadcast::channel(16); - WsSenderController { - state: WsConnectState::Init, + WSSenderController { + state: WSConnectState::Init, state_notify: Arc::new(state_notify), sender: None, }