rename some struct

This commit is contained in:
appflowy 2021-12-16 22:24:05 +08:00
parent 4a46bf3fa3
commit 405c618d27
37 changed files with 1199 additions and 537 deletions

View File

@ -4,7 +4,7 @@ use crate::{
};
use actix::Addr;
use actix_web::web::Data;
use lib_ws::WsModule;
use lib_ws::WSModule;
use sqlx::PgPool;
use std::sync::Arc;
@ -23,7 +23,7 @@ impl AppContext {
let mut ws_bizs = WsBizHandlers::new();
let document_core = Arc::new(DocumentCore::new(pg_pool.clone()));
ws_bizs.register(WsModule::Doc, document_core.clone());
ws_bizs.register(WSModule::Doc, document_core.clone());
AppContext {
ws_server,

View File

@ -11,7 +11,7 @@ use async_stream::stream;
use backend_service::errors::{internal_error, Result, ServerError};
use flowy_collaboration::{
core::sync::ServerDocManager,
protobuf::{WsDocumentData, WsDocumentDataType},
protobuf::{DocumentWSData, DocumentWSDataType},
};
use futures::stream::StreamExt;
use lib_ot::protobuf::Revision;
@ -69,7 +69,7 @@ impl DocWsActor {
async fn handle_client_data(&self, client_data: WsClientData, pool: Data<PgPool>) -> Result<()> {
let WsClientData { user, socket, data } = client_data;
let document_data = spawn_blocking(move || {
let document_data: WsDocumentData = parse_from_bytes(&data)?;
let document_data: DocumentWSData = parse_from_bytes(&data)?;
Result::Ok(document_data)
})
.await
@ -78,10 +78,10 @@ impl DocWsActor {
let data = document_data.data;
match document_data.ty {
WsDocumentDataType::Acked => Ok(()),
WsDocumentDataType::PushRev => self.apply_pushed_rev(user, socket, data, pool).await,
WsDocumentDataType::PullRev => Ok(()),
WsDocumentDataType::UserConnect => Ok(()),
DocumentWSDataType::Acked => Ok(()),
DocumentWSDataType::PushRev => self.apply_pushed_rev(user, socket, data, pool).await,
DocumentWSDataType::PullRev => Ok(()),
DocumentWSDataType::UserConnect => Ok(()),
}
}

View File

@ -1,5 +1,5 @@
use crate::web_socket::WsClientData;
use lib_ws::WsModule;
use lib_ws::WSModule;
use std::{collections::HashMap, sync::Arc};
pub trait WsBizHandler: Send + Sync {
@ -8,7 +8,7 @@ pub trait WsBizHandler: Send + Sync {
pub type BizHandler = Arc<dyn WsBizHandler>;
pub struct WsBizHandlers {
inner: HashMap<WsModule, BizHandler>,
inner: HashMap<WSModule, BizHandler>,
}
impl std::default::Default for WsBizHandlers {
@ -18,7 +18,7 @@ impl std::default::Default for WsBizHandlers {
impl WsBizHandlers {
pub fn new() -> Self { WsBizHandlers::default() }
pub fn register(&mut self, source: WsModule, handler: BizHandler) { self.inner.insert(source, handler); }
pub fn register(&mut self, source: WSModule, handler: BizHandler) { self.inner.insert(source, handler); }
pub fn get(&self, source: &WsModule) -> Option<BizHandler> { self.inner.get(source).cloned() }
pub fn get(&self, source: &WSModule) -> Option<BizHandler> { self.inner.get(source).cloned() }
}

View File

@ -1,7 +1,7 @@
use actix::Message;
use bytes::Bytes;
use flowy_collaboration::entities::ws::WsDocumentData;
use lib_ws::{WsMessage, WsModule};
use flowy_collaboration::entities::ws::DocumentWSData;
use lib_ws::{WSMessage, WSModule};
use std::convert::TryInto;
#[derive(Debug, Message, Clone)]
@ -14,11 +14,11 @@ impl std::ops::Deref for WsMessageAdaptor {
fn deref(&self) -> &Self::Target { &self.0 }
}
impl std::convert::From<WsDocumentData> for WsMessageAdaptor {
fn from(data: WsDocumentData) -> Self {
impl std::convert::From<DocumentWSData> for WsMessageAdaptor {
fn from(data: DocumentWSData) -> Self {
let bytes: Bytes = data.try_into().unwrap();
let msg = WsMessage {
module: WsModule::Doc,
let msg = WSMessage {
module: WSModule::Doc,
data: bytes.to_vec(),
};

View File

@ -12,7 +12,7 @@ use actix::*;
use actix_web::web::Data;
use actix_web_actors::{ws, ws::Message::Text};
use bytes::Bytes;
use lib_ws::WsMessage;
use lib_ws::WSMessage;
use std::{convert::TryFrom, sync::Arc, time::Instant};
#[derive(Debug)]
@ -64,7 +64,7 @@ impl WsClient {
fn handle_binary_message(&self, bytes: Bytes, socket: Socket) {
// TODO: ok to unwrap?
let message: WsMessage = WsMessage::try_from(bytes).unwrap();
let message: WSMessage = WSMessage::try_from(bytes).unwrap();
match self.biz_handlers.get(&message.module) {
None => {
log::error!("Can't find the handler for {:?}", message.module);

View File

@ -14,19 +14,31 @@ import 'ws.pbenum.dart';
export 'ws.pbenum.dart';
class WsDocumentData extends $pb.GeneratedMessage {
static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'WsDocumentData', createEmptyInstance: create)
enum DocumentWSData_OneOfId {
id,
notSet
}
class DocumentWSData extends $pb.GeneratedMessage {
static const $core.Map<$core.int, DocumentWSData_OneOfId> _DocumentWSData_OneOfIdByTag = {
4 : DocumentWSData_OneOfId.id,
0 : DocumentWSData_OneOfId.notSet
};
static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'DocumentWSData', createEmptyInstance: create)
..oo(0, [4])
..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'docId')
..e<WsDocumentDataType>(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'ty', $pb.PbFieldType.OE, defaultOrMaker: WsDocumentDataType.Acked, valueOf: WsDocumentDataType.valueOf, enumValues: WsDocumentDataType.values)
..e<DocumentWSDataType>(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'ty', $pb.PbFieldType.OE, defaultOrMaker: DocumentWSDataType.Acked, valueOf: DocumentWSDataType.valueOf, enumValues: DocumentWSDataType.values)
..a<$core.List<$core.int>>(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'data', $pb.PbFieldType.OY)
..aInt64(4, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'id')
..hasRequiredFields = false
;
WsDocumentData._() : super();
factory WsDocumentData({
DocumentWSData._() : super();
factory DocumentWSData({
$core.String? docId,
WsDocumentDataType? ty,
DocumentWSDataType? ty,
$core.List<$core.int>? data,
$fixnum.Int64? id,
}) {
final _result = create();
if (docId != null) {
@ -38,28 +50,34 @@ class WsDocumentData extends $pb.GeneratedMessage {
if (data != null) {
_result.data = data;
}
if (id != null) {
_result.id = id;
}
return _result;
}
factory WsDocumentData.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r);
factory WsDocumentData.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r);
factory DocumentWSData.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r);
factory DocumentWSData.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r);
@$core.Deprecated(
'Using this can add significant overhead to your binary. '
'Use [GeneratedMessageGenericExtensions.deepCopy] instead. '
'Will be removed in next major version')
WsDocumentData clone() => WsDocumentData()..mergeFromMessage(this);
DocumentWSData clone() => DocumentWSData()..mergeFromMessage(this);
@$core.Deprecated(
'Using this can add significant overhead to your binary. '
'Use [GeneratedMessageGenericExtensions.rebuild] instead. '
'Will be removed in next major version')
WsDocumentData copyWith(void Function(WsDocumentData) updates) => super.copyWith((message) => updates(message as WsDocumentData)) as WsDocumentData; // ignore: deprecated_member_use
DocumentWSData copyWith(void Function(DocumentWSData) updates) => super.copyWith((message) => updates(message as DocumentWSData)) as DocumentWSData; // ignore: deprecated_member_use
$pb.BuilderInfo get info_ => _i;
@$core.pragma('dart2js:noInline')
static WsDocumentData create() => WsDocumentData._();
WsDocumentData createEmptyInstance() => create();
static $pb.PbList<WsDocumentData> createRepeated() => $pb.PbList<WsDocumentData>();
static DocumentWSData create() => DocumentWSData._();
DocumentWSData createEmptyInstance() => create();
static $pb.PbList<DocumentWSData> createRepeated() => $pb.PbList<DocumentWSData>();
@$core.pragma('dart2js:noInline')
static WsDocumentData getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor<WsDocumentData>(create);
static WsDocumentData? _defaultInstance;
static DocumentWSData getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor<DocumentWSData>(create);
static DocumentWSData? _defaultInstance;
DocumentWSData_OneOfId whichOneOfId() => _DocumentWSData_OneOfIdByTag[$_whichOneof(0)]!;
void clearOneOfId() => clearField($_whichOneof(0));
@$pb.TagNumber(1)
$core.String get docId => $_getSZ(0);
@ -71,9 +89,9 @@ class WsDocumentData extends $pb.GeneratedMessage {
void clearDocId() => clearField(1);
@$pb.TagNumber(2)
WsDocumentDataType get ty => $_getN(1);
DocumentWSDataType get ty => $_getN(1);
@$pb.TagNumber(2)
set ty(WsDocumentDataType v) { setField(2, v); }
set ty(DocumentWSDataType v) { setField(2, v); }
@$pb.TagNumber(2)
$core.bool hasTy() => $_has(1);
@$pb.TagNumber(2)
@ -87,6 +105,15 @@ class WsDocumentData extends $pb.GeneratedMessage {
$core.bool hasData() => $_has(2);
@$pb.TagNumber(3)
void clearData() => clearField(3);
@$pb.TagNumber(4)
$fixnum.Int64 get id => $_getI64(3);
@$pb.TagNumber(4)
set id($fixnum.Int64 v) { $_setInt64(3, v); }
@$pb.TagNumber(4)
$core.bool hasId() => $_has(3);
@$pb.TagNumber(4)
void clearId() => clearField(4);
}
class DocumentConnected extends $pb.GeneratedMessage {

View File

@ -9,22 +9,22 @@
import 'dart:core' as $core;
import 'package:protobuf/protobuf.dart' as $pb;
class WsDocumentDataType extends $pb.ProtobufEnum {
static const WsDocumentDataType Acked = WsDocumentDataType._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Acked');
static const WsDocumentDataType PushRev = WsDocumentDataType._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'PushRev');
static const WsDocumentDataType PullRev = WsDocumentDataType._(2, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'PullRev');
static const WsDocumentDataType UserConnect = WsDocumentDataType._(3, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'UserConnect');
class DocumentWSDataType extends $pb.ProtobufEnum {
static const DocumentWSDataType Acked = DocumentWSDataType._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Acked');
static const DocumentWSDataType PushRev = DocumentWSDataType._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'PushRev');
static const DocumentWSDataType PullRev = DocumentWSDataType._(2, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'PullRev');
static const DocumentWSDataType UserConnect = DocumentWSDataType._(3, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'UserConnect');
static const $core.List<WsDocumentDataType> values = <WsDocumentDataType> [
static const $core.List<DocumentWSDataType> values = <DocumentWSDataType> [
Acked,
PushRev,
PullRev,
UserConnect,
];
static final $core.Map<$core.int, WsDocumentDataType> _byValue = $pb.ProtobufEnum.initByValue(values);
static WsDocumentDataType? valueOf($core.int value) => _byValue[value];
static final $core.Map<$core.int, DocumentWSDataType> _byValue = $pb.ProtobufEnum.initByValue(values);
static DocumentWSDataType? valueOf($core.int value) => _byValue[value];
const WsDocumentDataType._($core.int v, $core.String n) : super(v, n);
const DocumentWSDataType._($core.int v, $core.String n) : super(v, n);
}

View File

@ -8,9 +8,9 @@
import 'dart:core' as $core;
import 'dart:convert' as $convert;
import 'dart:typed_data' as $typed_data;
@$core.Deprecated('Use wsDocumentDataTypeDescriptor instead')
const WsDocumentDataType$json = const {
'1': 'WsDocumentDataType',
@$core.Deprecated('Use documentWSDataTypeDescriptor instead')
const DocumentWSDataType$json = const {
'1': 'DocumentWSDataType',
'2': const [
const {'1': 'Acked', '2': 0},
const {'1': 'PushRev', '2': 1},
@ -19,20 +19,24 @@ const WsDocumentDataType$json = const {
],
};
/// Descriptor for `WsDocumentDataType`. Decode as a `google.protobuf.EnumDescriptorProto`.
final $typed_data.Uint8List wsDocumentDataTypeDescriptor = $convert.base64Decode('ChJXc0RvY3VtZW50RGF0YVR5cGUSCQoFQWNrZWQQABILCgdQdXNoUmV2EAESCwoHUHVsbFJldhACEg8KC1VzZXJDb25uZWN0EAM=');
@$core.Deprecated('Use wsDocumentDataDescriptor instead')
const WsDocumentData$json = const {
'1': 'WsDocumentData',
/// Descriptor for `DocumentWSDataType`. Decode as a `google.protobuf.EnumDescriptorProto`.
final $typed_data.Uint8List documentWSDataTypeDescriptor = $convert.base64Decode('ChJEb2N1bWVudFdTRGF0YVR5cGUSCQoFQWNrZWQQABILCgdQdXNoUmV2EAESCwoHUHVsbFJldhACEg8KC1VzZXJDb25uZWN0EAM=');
@$core.Deprecated('Use documentWSDataDescriptor instead')
const DocumentWSData$json = const {
'1': 'DocumentWSData',
'2': const [
const {'1': 'doc_id', '3': 1, '4': 1, '5': 9, '10': 'docId'},
const {'1': 'ty', '3': 2, '4': 1, '5': 14, '6': '.WsDocumentDataType', '10': 'ty'},
const {'1': 'ty', '3': 2, '4': 1, '5': 14, '6': '.DocumentWSDataType', '10': 'ty'},
const {'1': 'data', '3': 3, '4': 1, '5': 12, '10': 'data'},
const {'1': 'id', '3': 4, '4': 1, '5': 3, '9': 0, '10': 'id'},
],
'8': const [
const {'1': 'one_of_id'},
],
};
/// Descriptor for `WsDocumentData`. Decode as a `google.protobuf.DescriptorProto`.
final $typed_data.Uint8List wsDocumentDataDescriptor = $convert.base64Decode('Cg5Xc0RvY3VtZW50RGF0YRIVCgZkb2NfaWQYASABKAlSBWRvY0lkEiMKAnR5GAIgASgOMhMuV3NEb2N1bWVudERhdGFUeXBlUgJ0eRISCgRkYXRhGAMgASgMUgRkYXRh');
/// Descriptor for `DocumentWSData`. Decode as a `google.protobuf.DescriptorProto`.
final $typed_data.Uint8List documentWSDataDescriptor = $convert.base64Decode('Cg5Eb2N1bWVudFdTRGF0YRIVCgZkb2NfaWQYASABKAlSBWRvY0lkEiMKAnR5GAIgASgOMhMuRG9jdW1lbnRXU0RhdGFUeXBlUgJ0eRISCgRkYXRhGAMgASgMUgRkYXRhEhAKAmlkGAQgASgDSABSAmlkQgsKCW9uZV9vZl9pZA==');
@$core.Deprecated('Use documentConnectedDescriptor instead')
const DocumentConnected$json = const {
'1': 'DocumentConnected',

View File

@ -13,15 +13,15 @@ import 'errors.pbenum.dart';
export 'errors.pbenum.dart';
class WsError extends $pb.GeneratedMessage {
static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'WsError', createEmptyInstance: create)
class WSError extends $pb.GeneratedMessage {
static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'WSError', createEmptyInstance: create)
..e<ErrorCode>(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'code', $pb.PbFieldType.OE, defaultOrMaker: ErrorCode.InternalError, valueOf: ErrorCode.valueOf, enumValues: ErrorCode.values)
..aOS(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'msg')
..hasRequiredFields = false
;
WsError._() : super();
factory WsError({
WSError._() : super();
factory WSError({
ErrorCode? code,
$core.String? msg,
}) {
@ -34,26 +34,26 @@ class WsError extends $pb.GeneratedMessage {
}
return _result;
}
factory WsError.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r);
factory WsError.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r);
factory WSError.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r);
factory WSError.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r);
@$core.Deprecated(
'Using this can add significant overhead to your binary. '
'Use [GeneratedMessageGenericExtensions.deepCopy] instead. '
'Will be removed in next major version')
WsError clone() => WsError()..mergeFromMessage(this);
WSError clone() => WSError()..mergeFromMessage(this);
@$core.Deprecated(
'Using this can add significant overhead to your binary. '
'Use [GeneratedMessageGenericExtensions.rebuild] instead. '
'Will be removed in next major version')
WsError copyWith(void Function(WsError) updates) => super.copyWith((message) => updates(message as WsError)) as WsError; // ignore: deprecated_member_use
WSError copyWith(void Function(WSError) updates) => super.copyWith((message) => updates(message as WSError)) as WSError; // ignore: deprecated_member_use
$pb.BuilderInfo get info_ => _i;
@$core.pragma('dart2js:noInline')
static WsError create() => WsError._();
WsError createEmptyInstance() => create();
static $pb.PbList<WsError> createRepeated() => $pb.PbList<WsError>();
static WSError create() => WSError._();
WSError createEmptyInstance() => create();
static $pb.PbList<WSError> createRepeated() => $pb.PbList<WSError>();
@$core.pragma('dart2js:noInline')
static WsError getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor<WsError>(create);
static WsError? _defaultInstance;
static WSError getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor<WSError>(create);
static WSError? _defaultInstance;
@$pb.TagNumber(1)
ErrorCode get code => $_getN(0);

View File

@ -20,14 +20,14 @@ const ErrorCode$json = const {
/// Descriptor for `ErrorCode`. Decode as a `google.protobuf.EnumDescriptorProto`.
final $typed_data.Uint8List errorCodeDescriptor = $convert.base64Decode('CglFcnJvckNvZGUSEQoNSW50ZXJuYWxFcnJvchAAEhYKElVuc3VwcG9ydGVkTWVzc2FnZRABEhAKDFVuYXV0aG9yaXplZBAC');
@$core.Deprecated('Use wsErrorDescriptor instead')
const WsError$json = const {
'1': 'WsError',
@$core.Deprecated('Use wSErrorDescriptor instead')
const WSError$json = const {
'1': 'WSError',
'2': const [
const {'1': 'code', '3': 1, '4': 1, '5': 14, '6': '.ErrorCode', '10': 'code'},
const {'1': 'msg', '3': 2, '4': 1, '5': 9, '10': 'msg'},
],
};
/// Descriptor for `WsError`. Decode as a `google.protobuf.DescriptorProto`.
final $typed_data.Uint8List wsErrorDescriptor = $convert.base64Decode('CgdXc0Vycm9yEh4KBGNvZGUYASABKA4yCi5FcnJvckNvZGVSBGNvZGUSEAoDbXNnGAIgASgJUgNtc2c=');
/// Descriptor for `WSError`. Decode as a `google.protobuf.DescriptorProto`.
final $typed_data.Uint8List wSErrorDescriptor = $convert.base64Decode('CgdXU0Vycm9yEh4KBGNvZGUYASABKA4yCi5FcnJvckNvZGVSBGNvZGUSEAoDbXNnGAIgASgJUgNtc2c=');

View File

@ -13,16 +13,16 @@ import 'msg.pbenum.dart';
export 'msg.pbenum.dart';
class WsMessage extends $pb.GeneratedMessage {
static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'WsMessage', createEmptyInstance: create)
..e<WsModule>(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'module', $pb.PbFieldType.OE, defaultOrMaker: WsModule.Doc, valueOf: WsModule.valueOf, enumValues: WsModule.values)
class WSMessage extends $pb.GeneratedMessage {
static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'WSMessage', createEmptyInstance: create)
..e<WSModule>(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'module', $pb.PbFieldType.OE, defaultOrMaker: WSModule.Doc, valueOf: WSModule.valueOf, enumValues: WSModule.values)
..a<$core.List<$core.int>>(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'data', $pb.PbFieldType.OY)
..hasRequiredFields = false
;
WsMessage._() : super();
factory WsMessage({
WsModule? module,
WSMessage._() : super();
factory WSMessage({
WSModule? module,
$core.List<$core.int>? data,
}) {
final _result = create();
@ -34,31 +34,31 @@ class WsMessage extends $pb.GeneratedMessage {
}
return _result;
}
factory WsMessage.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r);
factory WsMessage.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r);
factory WSMessage.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r);
factory WSMessage.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r);
@$core.Deprecated(
'Using this can add significant overhead to your binary. '
'Use [GeneratedMessageGenericExtensions.deepCopy] instead. '
'Will be removed in next major version')
WsMessage clone() => WsMessage()..mergeFromMessage(this);
WSMessage clone() => WSMessage()..mergeFromMessage(this);
@$core.Deprecated(
'Using this can add significant overhead to your binary. '
'Use [GeneratedMessageGenericExtensions.rebuild] instead. '
'Will be removed in next major version')
WsMessage copyWith(void Function(WsMessage) updates) => super.copyWith((message) => updates(message as WsMessage)) as WsMessage; // ignore: deprecated_member_use
WSMessage copyWith(void Function(WSMessage) updates) => super.copyWith((message) => updates(message as WSMessage)) as WSMessage; // ignore: deprecated_member_use
$pb.BuilderInfo get info_ => _i;
@$core.pragma('dart2js:noInline')
static WsMessage create() => WsMessage._();
WsMessage createEmptyInstance() => create();
static $pb.PbList<WsMessage> createRepeated() => $pb.PbList<WsMessage>();
static WSMessage create() => WSMessage._();
WSMessage createEmptyInstance() => create();
static $pb.PbList<WSMessage> createRepeated() => $pb.PbList<WSMessage>();
@$core.pragma('dart2js:noInline')
static WsMessage getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor<WsMessage>(create);
static WsMessage? _defaultInstance;
static WSMessage getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor<WSMessage>(create);
static WSMessage? _defaultInstance;
@$pb.TagNumber(1)
WsModule get module => $_getN(0);
WSModule get module => $_getN(0);
@$pb.TagNumber(1)
set module(WsModule v) { setField(1, v); }
set module(WSModule v) { setField(1, v); }
@$pb.TagNumber(1)
$core.bool hasModule() => $_has(0);
@$pb.TagNumber(1)

View File

@ -9,16 +9,16 @@
import 'dart:core' as $core;
import 'package:protobuf/protobuf.dart' as $pb;
class WsModule extends $pb.ProtobufEnum {
static const WsModule Doc = WsModule._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Doc');
class WSModule extends $pb.ProtobufEnum {
static const WSModule Doc = WSModule._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Doc');
static const $core.List<WsModule> values = <WsModule> [
static const $core.List<WSModule> values = <WSModule> [
Doc,
];
static final $core.Map<$core.int, WsModule> _byValue = $pb.ProtobufEnum.initByValue(values);
static WsModule? valueOf($core.int value) => _byValue[value];
static final $core.Map<$core.int, WSModule> _byValue = $pb.ProtobufEnum.initByValue(values);
static WSModule? valueOf($core.int value) => _byValue[value];
const WsModule._($core.int v, $core.String n) : super(v, n);
const WSModule._($core.int v, $core.String n) : super(v, n);
}

View File

@ -8,24 +8,24 @@
import 'dart:core' as $core;
import 'dart:convert' as $convert;
import 'dart:typed_data' as $typed_data;
@$core.Deprecated('Use wsModuleDescriptor instead')
const WsModule$json = const {
'1': 'WsModule',
@$core.Deprecated('Use wSModuleDescriptor instead')
const WSModule$json = const {
'1': 'WSModule',
'2': const [
const {'1': 'Doc', '2': 0},
],
};
/// Descriptor for `WsModule`. Decode as a `google.protobuf.EnumDescriptorProto`.
final $typed_data.Uint8List wsModuleDescriptor = $convert.base64Decode('CghXc01vZHVsZRIHCgNEb2MQAA==');
@$core.Deprecated('Use wsMessageDescriptor instead')
const WsMessage$json = const {
'1': 'WsMessage',
/// Descriptor for `WSModule`. Decode as a `google.protobuf.EnumDescriptorProto`.
final $typed_data.Uint8List wSModuleDescriptor = $convert.base64Decode('CghXU01vZHVsZRIHCgNEb2MQAA==');
@$core.Deprecated('Use wSMessageDescriptor instead')
const WSMessage$json = const {
'1': 'WSMessage',
'2': const [
const {'1': 'module', '3': 1, '4': 1, '5': 14, '6': '.WsModule', '10': 'module'},
const {'1': 'module', '3': 1, '4': 1, '5': 14, '6': '.WSModule', '10': 'module'},
const {'1': 'data', '3': 2, '4': 1, '5': 12, '10': 'data'},
],
};
/// Descriptor for `WsMessage`. Decode as a `google.protobuf.DescriptorProto`.
final $typed_data.Uint8List wsMessageDescriptor = $convert.base64Decode('CglXc01lc3NhZ2USIQoGbW9kdWxlGAEgASgOMgkuV3NNb2R1bGVSBm1vZHVsZRISCgRkYXRhGAIgASgMUgRkYXRh');
/// Descriptor for `WSMessage`. Decode as a `google.protobuf.DescriptorProto`.
final $typed_data.Uint8List wSMessageDescriptor = $convert.base64Decode('CglXU01lc3NhZ2USIQoGbW9kdWxlGAEgASgOMgkuV1NNb2R1bGVSBm1vZHVsZRISCgRkYXRhGAIgASgMUgRkYXRh');

View File

@ -0,0 +1,207 @@
use async_stream::stream;
use bytes::Bytes;
use flowy_collaboration::{
core::document::{history::UndoResult, Document},
errors::CollaborateError,
};
use flowy_error::FlowyError;
use futures::stream::StreamExt;
use lib_ot::{
core::{Interval, OperationTransformable},
revision::{RevId, Revision},
rich_text::{RichTextAttribute, RichTextDelta},
};
use std::{convert::TryFrom, sync::Arc};
use tokio::sync::{mpsc, oneshot, RwLock};
pub(crate) struct EditCommandQueue {
doc_id: String,
document: Arc<RwLock<Document>>,
receiver: Option<mpsc::UnboundedReceiver<EditCommand>>,
}
impl EditCommandQueue {
pub(crate) fn new(doc_id: &str, delta: RichTextDelta, receiver: mpsc::UnboundedReceiver<EditCommand>) -> Self {
let document = Arc::new(RwLock::new(Document::from_delta(delta)));
Self {
doc_id: doc_id.to_owned(),
document,
receiver: Some(receiver),
}
}
pub(crate) async fn run(mut self) {
let mut receiver = self.receiver.take().expect("Should only call once");
let stream = stream! {
loop {
match receiver.recv().await {
Some(msg) => yield msg,
None => break,
}
}
};
stream
.for_each(|msg| async {
match self.handle_message(msg).await {
Ok(_) => {},
Err(e) => tracing::debug!("[EditCommandQueue]: {}", e),
}
})
.await;
}
async fn handle_message(&self, msg: EditCommand) -> Result<(), FlowyError> {
match msg {
EditCommand::ComposeDelta { delta, ret } => {
let result = self.composed_delta(delta).await;
let _ = ret.send(result);
},
EditCommand::ProcessRemoteRevision { bytes, ret } => {
let f = || async {
let revision = Revision::try_from(bytes)?;
let delta = RichTextDelta::from_bytes(&revision.delta_data)?;
let server_rev_id: RevId = revision.rev_id.into();
let read_guard = self.document.read().await;
let (server_prime, client_prime) = read_guard.delta().transform(&delta)?;
drop(read_guard);
let transform_delta = TransformDeltas {
client_prime,
server_prime,
server_rev_id,
};
Ok::<TransformDeltas, CollaborateError>(transform_delta)
};
let _ = ret.send(f().await);
},
EditCommand::Insert { index, data, ret } => {
let mut write_guard = self.document.write().await;
let delta = write_guard.insert(index, data)?;
let md5 = write_guard.md5();
let _ = ret.send(Ok((delta, md5)));
},
EditCommand::Delete { interval, ret } => {
let mut write_guard = self.document.write().await;
let delta = write_guard.delete(interval)?;
let md5 = write_guard.md5();
let _ = ret.send(Ok((delta, md5)));
},
EditCommand::Format {
interval,
attribute,
ret,
} => {
let mut write_guard = self.document.write().await;
let delta = write_guard.format(interval, attribute)?;
let md5 = write_guard.md5();
let _ = ret.send(Ok((delta, md5)));
},
EditCommand::Replace { interval, data, ret } => {
let mut write_guard = self.document.write().await;
let delta = write_guard.replace(interval, data)?;
let md5 = write_guard.md5();
let _ = ret.send(Ok((delta, md5)));
},
EditCommand::CanUndo { ret } => {
let _ = ret.send(self.document.read().await.can_undo());
},
EditCommand::CanRedo { ret } => {
let _ = ret.send(self.document.read().await.can_redo());
},
EditCommand::Undo { ret } => {
let result = self.document.write().await.undo();
let _ = ret.send(result);
},
EditCommand::Redo { ret } => {
let result = self.document.write().await.redo();
let _ = ret.send(result);
},
EditCommand::ReadDoc { ret } => {
let data = self.document.read().await.to_json();
let _ = ret.send(Ok(data));
},
EditCommand::ReadDocDelta { ret } => {
let delta = self.document.read().await.delta().clone();
let _ = ret.send(Ok(delta));
},
}
Ok(())
}
#[tracing::instrument(level = "debug", skip(self, delta), fields(compose_result), err)]
async fn composed_delta(&self, delta: RichTextDelta) -> Result<String, CollaborateError> {
// tracing::debug!("{:?} thread handle_message", thread::current(),);
let mut document = self.document.write().await;
tracing::Span::current().record(
"composed_delta",
&format!("doc_id:{} - {}", &self.doc_id, delta.to_json()).as_str(),
);
let _ = document.compose_delta(delta)?;
let md5 = document.md5();
drop(document);
Ok(md5)
}
}
pub(crate) type Ret<T> = oneshot::Sender<Result<T, CollaborateError>>;
pub(crate) type NewDelta = (RichTextDelta, String);
pub(crate) type DocumentMD5 = String;
#[allow(dead_code)]
pub(crate) enum EditCommand {
ComposeDelta {
delta: RichTextDelta,
ret: Ret<DocumentMD5>,
},
ProcessRemoteRevision {
bytes: Bytes,
ret: Ret<TransformDeltas>,
},
Insert {
index: usize,
data: String,
ret: Ret<NewDelta>,
},
Delete {
interval: Interval,
ret: Ret<NewDelta>,
},
Format {
interval: Interval,
attribute: RichTextAttribute,
ret: Ret<NewDelta>,
},
Replace {
interval: Interval,
data: String,
ret: Ret<NewDelta>,
},
CanUndo {
ret: oneshot::Sender<bool>,
},
CanRedo {
ret: oneshot::Sender<bool>,
},
Undo {
ret: Ret<UndoResult>,
},
Redo {
ret: Ret<UndoResult>,
},
ReadDoc {
ret: Ret<String>,
},
ReadDocDelta {
ret: Ret<RichTextDelta>,
},
}
pub(crate) struct TransformDeltas {
pub client_prime: RichTextDelta,
pub server_prime: RichTextDelta,
pub server_rev_id: RevId,
}

View File

@ -0,0 +1,297 @@
use crate::services::doc::{DocumentWebSocket, DocumentWsHandler, SYNC_INTERVAL_IN_MILLIS};
use async_stream::stream;
use bytes::Bytes;
use flowy_collaboration::{
entities::ws::{DocumentWSData, DocumentWSDataType},
Revision,
};
use flowy_error::{internal_error, FlowyError, FlowyResult};
use futures::stream::StreamExt;
use lib_infra::future::FutureResult;
use lib_ot::revision::{RevId, RevisionRange};
use lib_ws::WSConnectState;
use std::{convert::TryFrom, sync::Arc};
use tokio::{
sync::{
broadcast,
mpsc,
mpsc::{UnboundedReceiver, UnboundedSender},
},
task::spawn_blocking,
time::{interval, Duration},
};
pub(crate) struct WebSocketManager {
doc_id: String,
data_provider: Arc<dyn DocumentSinkDataProvider>,
stream_consumer: Arc<dyn DocumentWebSocketSteamConsumer>,
ws: Arc<dyn DocumentWebSocket>,
ws_msg_tx: UnboundedSender<DocumentWSData>,
ws_msg_rx: Option<UnboundedReceiver<DocumentWSData>>,
stop_sync_tx: SinkStopTx,
}
impl WebSocketManager {
pub(crate) fn new(
doc_id: &str,
ws: Arc<dyn DocumentWebSocket>,
data_provider: Arc<dyn DocumentSinkDataProvider>,
stream_consumer: Arc<dyn DocumentWebSocketSteamConsumer>,
) -> Self {
let (ws_msg_tx, ws_msg_rx) = mpsc::unbounded_channel();
let (stop_sync_tx, _) = tokio::sync::broadcast::channel(2);
let doc_id = doc_id.to_string();
let mut manager = WebSocketManager {
doc_id,
data_provider,
stream_consumer,
ws,
ws_msg_tx,
ws_msg_rx: Some(ws_msg_rx),
stop_sync_tx,
};
manager.start_sync();
manager
}
fn start_sync(&mut self) {
let ws_msg_rx = self.ws_msg_rx.take().expect("Only take once");
let sink = DocumentWebSocketSink::new(
&self.doc_id,
self.data_provider.clone(),
self.ws.clone(),
self.stop_sync_tx.subscribe(),
);
let stream = DocumentWebSocketStream::new(
&self.doc_id,
self.stream_consumer.clone(),
ws_msg_rx,
self.ws.clone(),
self.stop_sync_tx.subscribe(),
);
tokio::spawn(sink.run());
tokio::spawn(stream.run());
self.notify_user_conn();
}
pub(crate) fn stop(&self) {
if self.stop_sync_tx.send(()).is_ok() {
tracing::debug!("{} stop sync", self.doc_id)
}
}
#[tracing::instrument(level = "debug", skip(self))]
fn notify_user_conn(&self) {
// let rev_id: RevId = self.rev_manager.rev_id().into();
// if let Ok(user_id) = self.user.user_id() {
// let action = OpenDocAction::new(&user_id, &self.doc_id, &rev_id,
// &self.ws_sender); let strategy =
// ExponentialBackoff::from_millis(50).take(3); let retry =
// Retry::spawn(strategy, action); tokio::spawn(async move {
// match retry.await {
// Ok(_) => log::debug!("Notify open doc success"),
// Err(e) => log::error!("Notify open doc failed: {}", e),
// }
// });
// }
}
}
impl DocumentWsHandler for WebSocketManager {
fn receive(&self, doc_data: DocumentWSData) {
match self.ws_msg_tx.send(doc_data) {
Ok(_) => {},
Err(e) => tracing::error!("❌Propagate ws message failed. {}", e),
}
}
fn connect_state_changed(&self, state: &WSConnectState) {
match state {
WSConnectState::Init => {},
WSConnectState::Connecting => {},
WSConnectState::Connected => self.notify_user_conn(),
WSConnectState::Disconnected => {},
}
}
}
pub trait DocumentWebSocketSteamConsumer: Send + Sync {
fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError>;
fn make_revision_from_range(&self, range: RevisionRange) -> FutureResult<Revision, FlowyError>;
fn ack_revision(&self, rev_id: i64) -> FutureResult<(), FlowyError>;
}
pub(crate) struct DocumentWebSocketStream {
doc_id: String,
consumer: Arc<dyn DocumentWebSocketSteamConsumer>,
ws_msg_rx: Option<mpsc::UnboundedReceiver<DocumentWSData>>,
ws_sender: Arc<dyn DocumentWebSocket>,
stop_rx: Option<SinkStopRx>,
}
impl DocumentWebSocketStream {
pub(crate) fn new(
doc_id: &str,
consumer: Arc<dyn DocumentWebSocketSteamConsumer>,
ws_msg_rx: mpsc::UnboundedReceiver<DocumentWSData>,
ws_sender: Arc<dyn DocumentWebSocket>,
stop_rx: SinkStopRx,
) -> Self {
DocumentWebSocketStream {
doc_id: doc_id.to_owned(),
consumer,
ws_msg_rx: Some(ws_msg_rx),
ws_sender,
stop_rx: Some(stop_rx),
}
}
pub async fn run(mut self) {
let mut receiver = self.ws_msg_rx.take().expect("Only take once");
let mut stop_rx = self.stop_rx.take().expect("Only take once");
let doc_id = self.doc_id.clone();
let stream = stream! {
loop {
tokio::select! {
result = receiver.recv() => {
match result {
Some(msg) => {
yield msg
},
None => {
tracing::debug!("[DocumentStream:{}] loop exit", doc_id);
break;
},
}
},
_ = stop_rx.recv() => {
tracing::debug!("[DocumentStream:{}] loop exit", doc_id);
break
},
};
}
};
stream
.for_each(|msg| async {
match self.handle_message(msg).await {
Ok(_) => {},
Err(e) => log::error!("[DocumentStream:{}] error: {}", self.doc_id, e),
}
})
.await;
}
async fn handle_message(&self, msg: DocumentWSData) -> FlowyResult<()> {
let DocumentWSData {
doc_id: _,
ty,
data,
id: _,
} = msg;
let bytes = spawn_blocking(move || Bytes::from(data))
.await
.map_err(internal_error)?;
tracing::debug!("[DocumentStream]: receives new message: {:?}", ty);
match ty {
DocumentWSDataType::PushRev => {
let _ = self.consumer.receive_push_revision(bytes).await?;
},
DocumentWSDataType::PullRev => {
let range = RevisionRange::try_from(bytes)?;
let revision = self.consumer.make_revision_from_range(range).await?;
let _ = self.ws_sender.send(revision.into());
},
DocumentWSDataType::Acked => {
let rev_id = RevId::try_from(bytes)?;
let _ = self.consumer.ack_revision(rev_id.into()).await;
},
DocumentWSDataType::UserConnect => {},
}
Ok(())
}
}
pub(crate) type Tick = ();
pub(crate) type SinkStopRx = broadcast::Receiver<()>;
pub(crate) type SinkStopTx = broadcast::Sender<()>;
pub trait DocumentSinkDataProvider: Send + Sync {
fn next(&self) -> FutureResult<Option<DocumentWSData>, FlowyError>;
}
pub(crate) struct DocumentWebSocketSink {
provider: Arc<dyn DocumentSinkDataProvider>,
ws_sender: Arc<dyn DocumentWebSocket>,
stop_rx: Option<SinkStopRx>,
doc_id: String,
}
impl DocumentWebSocketSink {
pub(crate) fn new(
doc_id: &str,
provider: Arc<dyn DocumentSinkDataProvider>,
ws_sender: Arc<dyn DocumentWebSocket>,
stop_rx: SinkStopRx,
) -> Self {
Self {
provider,
ws_sender,
stop_rx: Some(stop_rx),
doc_id: doc_id.to_owned(),
}
}
pub async fn run(mut self) {
let (tx, mut rx) = mpsc::unbounded_channel();
let mut stop_rx = self.stop_rx.take().expect("Only take once");
let doc_id = self.doc_id.clone();
tokio::spawn(tick(tx));
let stream = stream! {
loop {
tokio::select! {
result = rx.recv() => {
match result {
Some(msg) => yield msg,
None => break,
}
},
_ = stop_rx.recv() => {
tracing::debug!("[DocumentSink:{}] loop exit", doc_id);
break
},
};
}
};
stream
.for_each(|_| async {
match self.send_next_revision().await {
Ok(_) => {},
Err(e) => log::error!("[DocumentSink]: send msg failed, {:?}", e),
}
})
.await;
}
async fn send_next_revision(&self) -> FlowyResult<()> {
match self.provider.next().await? {
None => {
tracing::debug!("Finish synchronizing revisions");
Ok(())
},
Some(data) => {
self.ws_sender.send(data).map_err(internal_error)
// let _ = tokio::time::timeout(Duration::from_millis(2000),
},
}
}
}
async fn tick(sender: mpsc::UnboundedSender<Tick>) {
let mut interval = interval(Duration::from_millis(SYNC_INTERVAL_IN_MILLIS));
while sender.send(()).is_ok() {
interval.tick().await;
}
}

View File

@ -2,7 +2,7 @@ use crate::{errors::FlowyError, module::DocumentUser, services::doc::*};
use bytes::Bytes;
use flowy_collaboration::{
core::document::history::UndoResult,
entities::{doc::DocDelta, ws::WsDocumentData},
entities::{doc::DocDelta, ws::DocumentWSData},
errors::CollaborateResult,
};
use flowy_database::ConnectionPool;
@ -13,16 +13,16 @@ use lib_ot::{
revision::{RevId, RevType, Revision, RevisionRange},
rich_text::{RichTextAttribute, RichTextDelta},
};
use std::sync::Arc;
use parking_lot::RwLock;
use std::{collections::VecDeque, sync::Arc};
use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
pub type DocId = String;
type SinkVec = Arc<RwLock<VecDeque<DocumentWSData>>>;
pub struct ClientDocEditor {
pub doc_id: DocId,
pub doc_id: String,
rev_manager: Arc<RevisionManager>,
ws_manager: Arc<WebSocketManager>,
edit_cmd_tx: UnboundedSender<EditCommand>,
sink_vec: SinkVec,
user: Arc<dyn DocumentUser>,
}
@ -38,15 +38,17 @@ impl ClientDocEditor {
let edit_cmd_tx = spawn_edit_queue(doc_id, delta, pool.clone());
let doc_id = doc_id.to_string();
let rev_manager = Arc::new(rev_manager);
let sink_vec = Arc::new(RwLock::new(VecDeque::new()));
let data_provider = Arc::new(DocumentSinkDataProviderAdapter {
rev_manager: rev_manager.clone(),
sink_vec: sink_vec.clone(),
});
let stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter {
doc_id: doc_id.clone(),
edit_cmd_tx: edit_cmd_tx.clone(),
rev_manager: rev_manager.clone(),
user: user.clone(),
sink_vec: sink_vec.clone(),
});
let ws_manager = Arc::new(WebSocketManager::new(&doc_id, ws, data_provider, stream_consumer));
let editor = Arc::new(Self {
@ -54,6 +56,7 @@ impl ClientDocEditor {
rev_manager,
ws_manager,
edit_cmd_tx,
sink_vec,
user,
});
Ok(editor)
@ -199,6 +202,7 @@ struct DocumentWebSocketSteamConsumerAdapter {
edit_cmd_tx: UnboundedSender<EditCommand>,
rev_manager: Arc<RevisionManager>,
user: Arc<dyn DocumentUser>,
sink_vec: SinkVec,
}
impl DocumentWebSocketSteamConsumer for DocumentWebSocketSteamConsumerAdapter {
@ -233,18 +237,28 @@ impl DocumentWebSocketSteamConsumer for DocumentWebSocketSteamConsumerAdapter {
struct DocumentSinkDataProviderAdapter {
rev_manager: Arc<RevisionManager>,
sink_vec: SinkVec,
}
impl DocumentSinkDataProvider for DocumentSinkDataProviderAdapter {
fn next(&self) -> FutureResult<Option<WsDocumentData>, FlowyError> {
fn next(&self) -> FutureResult<Option<DocumentWSData>, FlowyError> {
let rev_manager = self.rev_manager.clone();
let sink_vec = self.sink_vec.clone();
FutureResult::new(async move {
match rev_manager.next_sync_revision().await? {
Some(rev) => {
tracing::debug!("[DocumentSinkDataProvider]: revision: {}:{:?}", rev.doc_id, rev.rev_id);
Ok(Some(rev.into()))
},
None => Ok(None),
if sink_vec.read().is_empty() {
match rev_manager.next_sync_revision().await? {
Some(rev) => {
tracing::debug!("[DocumentSinkDataProvider]: revision: {}:{:?}", rev.doc_id, rev.rev_id);
Ok(Some(rev.into()))
},
None => Ok(None),
}
} else {
match sink_vec.read().front() {
None => Ok(None),
Some(data) => Ok(Some(data.clone())),
}
}
})
}

View File

@ -0,0 +1,67 @@
use crate::errors::FlowyError;
use bytes::Bytes;
use dashmap::DashMap;
use flowy_collaboration::entities::ws::DocumentWSData;
use lib_ws::WSConnectState;
use std::{convert::TryInto, sync::Arc};
pub(crate) trait DocumentWsHandler: Send + Sync {
fn receive(&self, data: DocumentWSData);
fn connect_state_changed(&self, state: &WSConnectState);
}
pub type WsStateReceiver = tokio::sync::broadcast::Receiver<WSConnectState>;
pub trait DocumentWebSocket: Send + Sync {
fn send(&self, data: DocumentWSData) -> Result<(), FlowyError>;
fn subscribe_state_changed(&self) -> WsStateReceiver;
}
pub struct DocumentWsHandlers {
ws: Arc<dyn DocumentWebSocket>,
// key: the document id
handlers: Arc<DashMap<String, Arc<dyn DocumentWsHandler>>>,
}
impl DocumentWsHandlers {
pub fn new(ws: Arc<dyn DocumentWebSocket>) -> Self {
let handlers: Arc<DashMap<String, Arc<dyn DocumentWsHandler>>> = Arc::new(DashMap::new());
Self { ws, handlers }
}
pub(crate) fn init(&self) { listen_ws_state_changed(self.ws.clone(), self.handlers.clone()); }
pub(crate) fn register_handler(&self, id: &str, handler: Arc<dyn DocumentWsHandler>) {
if self.handlers.contains_key(id) {
log::error!("Duplicate handler registered for {:?}", id);
}
self.handlers.insert(id.to_string(), handler);
}
pub(crate) fn remove_handler(&self, id: &str) { self.handlers.remove(id); }
pub fn did_receive_data(&self, data: Bytes) {
let data: DocumentWSData = data.try_into().unwrap();
match self.handlers.get(&data.doc_id) {
None => {
log::error!("Can't find any source handler for {:?}", data.doc_id);
},
Some(handler) => {
handler.receive(data);
},
}
}
pub fn ws(&self) -> Arc<dyn DocumentWebSocket> { self.ws.clone() }
}
#[tracing::instrument(level = "debug", skip(ws, handlers))]
fn listen_ws_state_changed(ws: Arc<dyn DocumentWebSocket>, handlers: Arc<DashMap<String, Arc<dyn DocumentWsHandler>>>) {
let mut notify = ws.subscribe_state_changed();
tokio::spawn(async move {
while let Ok(state) = notify.recv().await {
handlers.iter().for_each(|handle| {
handle.value().connect_state_changed(&state);
});
}
});
}

View File

@ -1,11 +1,11 @@
use crate::services::ws::{FlowyError, FlowyWebSocket, FlowyWsSender, WsConnectState, WsMessage, WsMessageReceiver};
use crate::services::ws::{FlowyError, FlowyWebSocket, FlowyWsSender, WSConnectState, WSMessage, WSMessageReceiver};
use bytes::Bytes;
use dashmap::DashMap;
use flowy_collaboration::{
core::sync::{RevisionUser, ServerDocManager, ServerDocPersistence, SyncResponse},
entities::{
doc::Doc,
ws::{WsDocumentData, WsDocumentDataType},
ws::{DocumentWSData, DocumentWSDataType},
},
errors::CollaborateError,
Revision,
@ -13,7 +13,7 @@ use flowy_collaboration::{
};
use lazy_static::lazy_static;
use lib_infra::future::{FutureResult, FutureResultSend};
use lib_ws::WsModule;
use lib_ws::WSModule;
use parking_lot::RwLock;
use std::{
convert::{TryFrom, TryInto},
@ -22,9 +22,9 @@ use std::{
use tokio::sync::{broadcast, broadcast::Receiver, mpsc};
pub struct MockWebSocket {
handlers: DashMap<WsModule, Arc<dyn WsMessageReceiver>>,
state_sender: broadcast::Sender<WsConnectState>,
ws_sender: broadcast::Sender<WsMessage>,
handlers: DashMap<WSModule, Arc<dyn WSMessageReceiver>>,
state_sender: broadcast::Sender<WSConnectState>,
ws_sender: broadcast::Sender<WSMessage>,
is_stop: RwLock<bool>,
}
@ -56,7 +56,7 @@ impl FlowyWebSocket for Arc<MockWebSocket> {
if *cloned_ws.is_stop.read() {
// do nothing
} else {
let ws_data = WsDocumentData::try_from(Bytes::from(message.data.clone())).unwrap();
let ws_data = DocumentWSData::try_from(Bytes::from(message.data.clone())).unwrap();
let mut rx = DOC_SERVER.handle_ws_data(ws_data).await;
let new_ws_message = rx.recv().await.unwrap();
match cloned_ws.handlers.get(&new_ws_message.module) {
@ -75,11 +75,11 @@ impl FlowyWebSocket for Arc<MockWebSocket> {
FutureResult::new(async { Ok(()) })
}
fn subscribe_connect_state(&self) -> Receiver<WsConnectState> { self.state_sender.subscribe() }
fn subscribe_connect_state(&self) -> Receiver<WSConnectState> { self.state_sender.subscribe() }
fn reconnect(&self, _count: usize) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
fn add_message_receiver(&self, handler: Arc<dyn WsMessageReceiver>) -> Result<(), FlowyError> {
fn add_message_receiver(&self, handler: Arc<dyn WSMessageReceiver>) -> Result<(), FlowyError> {
let source = handler.source();
if self.handlers.contains_key(&source) {
tracing::error!("WsSource's {:?} is already registered", source);
@ -108,13 +108,13 @@ impl std::default::Default for MockDocServer {
}
impl MockDocServer {
async fn handle_ws_data(&self, ws_data: WsDocumentData) -> mpsc::Receiver<WsMessage> {
async fn handle_ws_data(&self, ws_data: DocumentWSData) -> mpsc::Receiver<WSMessage> {
let bytes = Bytes::from(ws_data.data);
match ws_data.ty {
WsDocumentDataType::Acked => {
DocumentWSDataType::Acked => {
unimplemented!()
},
WsDocumentDataType::PushRev => {
DocumentWSDataType::PushRev => {
let revision = Revision::try_from(bytes).unwrap();
let handler = match self.manager.get(&revision.doc_id).await {
None => self.manager.create_doc(revision.clone()).await.unwrap(),
@ -129,10 +129,10 @@ impl MockDocServer {
handler.apply_revision(Arc::new(user), revision).await.unwrap();
rx
},
WsDocumentDataType::PullRev => {
DocumentWSDataType::PullRev => {
unimplemented!()
},
WsDocumentDataType::UserConnect => {
DocumentWSDataType::UserConnect => {
unimplemented!()
},
}
@ -184,7 +184,7 @@ impl ServerDocPersistence for MockDocServerPersistence {
#[derive(Debug)]
struct MockDocUser {
user_id: String,
tx: mpsc::Sender<WsMessage>,
tx: mpsc::Sender<WSMessage>,
}
impl RevisionUser for MockDocUser {
@ -196,24 +196,24 @@ impl RevisionUser for MockDocUser {
match resp {
SyncResponse::Pull(data) => {
let bytes: Bytes = data.try_into().unwrap();
let msg = WsMessage {
module: WsModule::Doc,
let msg = WSMessage {
module: WSModule::Doc,
data: bytes.to_vec(),
};
sender.send(msg).await.unwrap();
},
SyncResponse::Push(data) => {
let bytes: Bytes = data.try_into().unwrap();
let msg = WsMessage {
module: WsModule::Doc,
let msg = WSMessage {
module: WSModule::Doc,
data: bytes.to_vec(),
};
sender.send(msg).await.unwrap();
},
SyncResponse::Ack(data) => {
let bytes: Bytes = data.try_into().unwrap();
let msg = WsMessage {
module: WsModule::Doc,
let msg = WSMessage {
module: WSModule::Doc,
data: bytes.to_vec(),
};
sender.send(msg).await.unwrap();

View File

@ -3,17 +3,17 @@ use std::sync::Arc;
use tokio::sync::broadcast;
pub use flowy_error::FlowyError;
pub use lib_ws::{WsConnectState, WsMessage, WsMessageReceiver};
pub use lib_ws::{WSConnectState, WSMessage, WSMessageReceiver};
pub trait FlowyWebSocket: Send + Sync {
fn start_connect(&self, addr: String) -> FutureResult<(), FlowyError>;
fn stop_connect(&self) -> FutureResult<(), FlowyError>;
fn subscribe_connect_state(&self) -> broadcast::Receiver<WsConnectState>;
fn subscribe_connect_state(&self) -> broadcast::Receiver<WSConnectState>;
fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError>;
fn add_message_receiver(&self, handler: Arc<dyn WsMessageReceiver>) -> Result<(), FlowyError>;
fn add_message_receiver(&self, handler: Arc<dyn WSMessageReceiver>) -> Result<(), FlowyError>;
fn ws_sender(&self) -> Result<Arc<dyn FlowyWsSender>, FlowyError>;
}
pub trait FlowyWsSender: Send + Sync {
fn send(&self, msg: WsMessage) -> Result<(), FlowyError>;
fn send(&self, msg: WSMessage) -> Result<(), FlowyError>;
}

View File

@ -4,7 +4,7 @@ use crate::{
};
use flowy_error::{internal_error, FlowyError};
use lib_infra::future::FutureResult;
use lib_ws::{WsConnectState, WsController, WsMessage, WsMessageReceiver, WsSender};
use lib_ws::{WSConnectState, WSController, WSMessage, WSMessageReceiver, WSSender};
use parking_lot::RwLock;
use std::sync::Arc;
use tokio::sync::{broadcast, broadcast::Receiver};
@ -19,7 +19,7 @@ pub struct WsManager {
impl WsManager {
pub fn new(addr: String) -> Self {
let ws: Arc<dyn FlowyWebSocket> = if cfg!(feature = "http_server") {
Arc::new(Arc::new(WsController::new()))
Arc::new(Arc::new(WSController::new()))
} else {
local_web_socket()
};
@ -65,13 +65,13 @@ impl WsManager {
}
}
pub fn subscribe_websocket_state(&self) -> broadcast::Receiver<WsConnectState> {
pub fn subscribe_websocket_state(&self) -> broadcast::Receiver<WSConnectState> {
self.inner.subscribe_connect_state()
}
pub fn subscribe_network_ty(&self) -> broadcast::Receiver<NetworkType> { self.status_notifier.subscribe() }
pub fn add_receiver(&self, handler: Arc<dyn WsMessageReceiver>) -> Result<(), FlowyError> {
pub fn add_receiver(&self, handler: Arc<dyn WSMessageReceiver>) -> Result<(), FlowyError> {
let _ = self.inner.add_message_receiver(handler)?;
Ok(())
}
@ -88,10 +88,10 @@ fn listen_on_websocket(ws: Arc<dyn FlowyWebSocket>) {
Ok(state) => {
tracing::info!("Websocket state changed: {}", state);
match state {
WsConnectState::Init => {},
WsConnectState::Connected => {},
WsConnectState::Connecting => {},
WsConnectState::Disconnected => retry_connect(ws.clone(), 100).await,
WSConnectState::Init => {},
WSConnectState::Connected => {},
WSConnectState::Connecting => {},
WSConnectState::Disconnected => retry_connect(ws.clone(), 100).await,
}
},
Err(e) => {
@ -112,7 +112,7 @@ async fn retry_connect(ws: Arc<dyn FlowyWebSocket>, count: usize) {
}
}
impl FlowyWebSocket for Arc<WsController> {
impl FlowyWebSocket for Arc<WSController> {
fn start_connect(&self, addr: String) -> FutureResult<(), FlowyError> {
let cloned_ws = self.clone();
FutureResult::new(async move {
@ -129,7 +129,7 @@ impl FlowyWebSocket for Arc<WsController> {
})
}
fn subscribe_connect_state(&self) -> Receiver<WsConnectState> { self.subscribe_state() }
fn subscribe_connect_state(&self) -> Receiver<WSConnectState> { self.subscribe_state() }
fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError> {
let cloned_ws = self.clone();
@ -139,7 +139,7 @@ impl FlowyWebSocket for Arc<WsController> {
})
}
fn add_message_receiver(&self, handler: Arc<dyn WsMessageReceiver>) -> Result<(), FlowyError> {
fn add_message_receiver(&self, handler: Arc<dyn WSMessageReceiver>) -> Result<(), FlowyError> {
let _ = self.add_receiver(handler).map_err(internal_error)?;
Ok(())
}
@ -150,8 +150,8 @@ impl FlowyWebSocket for Arc<WsController> {
}
}
impl FlowyWsSender for WsSender {
fn send(&self, msg: WsMessage) -> Result<(), FlowyError> {
impl FlowyWsSender for WSSender {
fn send(&self, msg: WSMessage) -> Result<(), FlowyError> {
let _ = self.send_msg(msg).map_err(internal_error)?;
Ok(())
}

View File

@ -1,11 +1,11 @@
use crate::services::ws::{FlowyError, FlowyWebSocket, FlowyWsSender, WsConnectState, WsMessage, WsMessageReceiver};
use crate::services::ws::{FlowyError, FlowyWebSocket, FlowyWsSender, WSConnectState, WSMessage, WSMessageReceiver};
use lib_infra::future::FutureResult;
use std::sync::Arc;
use tokio::sync::{broadcast, broadcast::Receiver};
pub(crate) struct LocalWebSocket {
state_sender: broadcast::Sender<WsConnectState>,
ws_sender: broadcast::Sender<WsMessage>,
state_sender: broadcast::Sender<WSConnectState>,
ws_sender: broadcast::Sender<WSMessage>,
}
impl std::default::Default for LocalWebSocket {
@ -24,17 +24,17 @@ impl FlowyWebSocket for Arc<LocalWebSocket> {
fn stop_connect(&self) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
fn subscribe_connect_state(&self) -> Receiver<WsConnectState> { self.state_sender.subscribe() }
fn subscribe_connect_state(&self) -> Receiver<WSConnectState> { self.state_sender.subscribe() }
fn reconnect(&self, _count: usize) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
fn add_message_receiver(&self, _handler: Arc<dyn WsMessageReceiver>) -> Result<(), FlowyError> { Ok(()) }
fn add_message_receiver(&self, _handler: Arc<dyn WSMessageReceiver>) -> Result<(), FlowyError> { Ok(()) }
fn ws_sender(&self) -> Result<Arc<dyn FlowyWsSender>, FlowyError> { Ok(Arc::new(self.ws_sender.clone())) }
}
impl FlowyWsSender for broadcast::Sender<WsMessage> {
fn send(&self, msg: WsMessage) -> Result<(), FlowyError> {
impl FlowyWsSender for broadcast::Sender<WSMessage> {
fn send(&self, msg: WSMessage) -> Result<(), FlowyError> {
let _ = self.send(msg);
Ok(())
}

View File

@ -1,5 +1,5 @@
use bytes::Bytes;
use flowy_collaboration::entities::ws::WsDocumentData;
use flowy_collaboration::entities::ws::DocumentWSData;
use flowy_database::ConnectionPool;
use flowy_document::{
errors::{internal_error, FlowyError},
@ -8,7 +8,7 @@ use flowy_document::{
};
use flowy_net::services::ws::WsManager;
use flowy_user::services::user::UserSession;
use lib_ws::{WsMessage, WsMessageReceiver, WsModule};
use lib_ws::{WSMessage, WSMessageReceiver, WSModule};
use std::{convert::TryInto, path::Path, sync::Arc};
pub struct DocumentDepsResolver();
@ -61,10 +61,10 @@ struct WsSenderImpl {
}
impl DocumentWebSocket for WsSenderImpl {
fn send(&self, data: WsDocumentData) -> Result<(), FlowyError> {
fn send(&self, data: DocumentWSData) -> Result<(), FlowyError> {
let bytes: Bytes = data.try_into().unwrap();
let msg = WsMessage {
module: WsModule::Doc,
let msg = WSMessage {
module: WSModule::Doc,
data: bytes.to_vec(),
};
let sender = self.ws_manager.ws_sender().map_err(internal_error)?;
@ -78,7 +78,7 @@ impl DocumentWebSocket for WsSenderImpl {
struct WsMessageReceiverAdaptor(Arc<DocumentWsHandlers>);
impl WsMessageReceiver for WsMessageReceiverAdaptor {
fn source(&self) -> WsModule { WsModule::Doc }
fn receive_message(&self, msg: WsMessage) { self.0.did_receive_data(Bytes::from(msg.data)); }
impl WSMessageReceiver for WsMessageReceiverAdaptor {
fn source(&self) -> WSModule { WSModule::Doc }
fn receive_message(&self, msg: WSMessage) { self.0.did_receive_data(Bytes::from(msg.data)); }
}

View File

@ -46,7 +46,7 @@ fn parse_files_protobuf(proto_crate_path: &str, proto_output_dir: &str) -> Vec<P
// https://docs.rs/syn/1.0.54/syn/struct.File.html
let ast =
syn::parse_file(read_file(&path).unwrap().as_ref()).expect("Unable to parse file");
syn::parse_file(read_file(&path).unwrap().as_ref()).expect(&format!("Unable to parse file at {}", path));
let structs = get_ast_structs(&ast);
let proto_file_path = format!("{}/{}.proto", &proto_output_dir, &file_name);
let mut proto_file_content = parse_or_init_proto_file(proto_file_path.as_ref());

View File

@ -1,6 +1,6 @@
use crate::{
core::document::Document,
entities::ws::{WsDocumentData, WsDocumentDataType},
entities::ws::{DocumentWSData, DocumentWSDataType, WsDocumentDataBuilder},
};
use bytes::Bytes;
use lib_ot::{
@ -29,9 +29,9 @@ pub trait RevisionUser: Send + Sync + Debug {
}
pub enum SyncResponse {
Pull(WsDocumentData),
Push(WsDocumentData),
Ack(WsDocumentData),
Pull(DocumentWSData),
Push(DocumentWSData),
Ack(DocumentWSData),
NewRevision {
rev_id: i64,
doc_json: String,
@ -63,7 +63,10 @@ impl RevisionSynchronizer {
if server_base_rev_id == revision.base_rev_id || server_rev_id == revision.rev_id {
// The rev is in the right order, just compose it.
let _ = self.compose_revision(&revision)?;
user.recv(SyncResponse::Ack(mk_acked_message(&revision)));
user.recv(SyncResponse::Ack(WsDocumentDataBuilder::build_acked_message(
&revision.doc_id,
revision.rev_id,
)));
let rev_id = revision.rev_id;
let doc_id = self.doc_id.clone();
let doc_json = self.doc_json();
@ -74,21 +77,27 @@ impl RevisionSynchronizer {
});
} else {
// The server document is outdated, pull the missing revision from the client.
let msg = mk_pull_message(&self.doc_id, server_rev_id, revision.rev_id);
let range = RevisionRange {
doc_id: self.doc_id.clone(),
start: server_rev_id,
end: revision.rev_id,
};
let msg = WsDocumentDataBuilder::build_push_pull_message(&self.doc_id, range);
user.recv(SyncResponse::Pull(msg));
}
},
Ordering::Equal => {
// Do nothing
log::warn!("Applied revision rev_id is the same as cur_rev_id");
user.recv(SyncResponse::Ack(mk_acked_message(&revision)));
let data = WsDocumentDataBuilder::build_acked_message(&revision.doc_id, revision.rev_id);
user.recv(SyncResponse::Ack(data));
},
Ordering::Greater => {
// The client document is outdated. Transform the client revision delta and then
// send the prime delta to the client. Client should compose the this prime
// delta.
let cli_revision = self.transform_revision(&revision)?;
let data = mk_push_message(&self.doc_id, cli_revision);
let data = WsDocumentDataBuilder::build_push_rev_message(&self.doc_id, cli_revision);
user.recv(SyncResponse::Push(data));
},
}
@ -143,44 +152,6 @@ impl RevisionSynchronizer {
}
}
fn mk_push_message(doc_id: &str, revision: Revision) -> WsDocumentData {
let bytes: Bytes = revision.try_into().unwrap();
WsDocumentData {
doc_id: doc_id.to_string(),
ty: WsDocumentDataType::PushRev,
data: bytes.to_vec(),
}
}
fn mk_pull_message(doc_id: &str, from_rev_id: i64, to_rev_id: i64) -> WsDocumentData {
let range = RevisionRange {
doc_id: doc_id.to_string(),
start: from_rev_id,
end: to_rev_id,
};
let bytes: Bytes = range.try_into().unwrap();
WsDocumentData {
doc_id: doc_id.to_string(),
ty: WsDocumentDataType::PullRev,
data: bytes.to_vec(),
}
}
fn mk_acked_message(revision: &Revision) -> WsDocumentData {
// let mut wtr = vec![];
// let _ = wtr.write_i64::<BigEndian>(revision.rev_id);
let mut rev_id = RevId::new();
rev_id.set_value(revision.rev_id);
let data = rev_id.write_to_bytes().unwrap();
WsDocumentData {
doc_id: revision.doc_id.clone(),
ty: WsDocumentDataType::Acked,
data,
}
}
#[inline]
fn next(rev_id: i64) -> i64 { rev_id + 1 }

View File

@ -5,7 +5,7 @@ use lib_ot::revision::{RevId, Revision, RevisionRange};
use std::convert::{TryFrom, TryInto};
#[derive(Debug, Clone, ProtoBuf_Enum, Eq, PartialEq, Hash)]
pub enum WsDocumentDataType {
pub enum DocumentWSDataType {
// The frontend receives the Acked means the backend has accepted the revision
Acked = 0,
// The frontend receives the PushRev event means the backend is pushing the new revision to frontend
@ -15,7 +15,7 @@ pub enum WsDocumentDataType {
UserConnect = 3,
}
impl WsDocumentDataType {
impl DocumentWSDataType {
pub fn data<T>(&self, bytes: Bytes) -> Result<T, CollaborateError>
where
T: TryFrom<Bytes, Error = CollaborateError>,
@ -24,64 +24,75 @@ impl WsDocumentDataType {
}
}
impl std::default::Default for WsDocumentDataType {
fn default() -> Self { WsDocumentDataType::Acked }
impl std::default::Default for DocumentWSDataType {
fn default() -> Self { DocumentWSDataType::Acked }
}
#[derive(ProtoBuf, Default, Debug, Clone)]
pub struct WsDocumentData {
pub struct DocumentWSData {
#[pb(index = 1)]
pub doc_id: String,
#[pb(index = 2)]
pub ty: WsDocumentDataType,
pub ty: DocumentWSDataType,
#[pb(index = 3)]
pub data: Vec<u8>,
#[pb(index = 4, one_of)]
pub id: Option<i64>,
}
impl std::convert::From<Revision> for WsDocumentData {
impl std::convert::From<Revision> for DocumentWSData {
fn from(revision: Revision) -> Self {
let doc_id = revision.doc_id.clone();
let rev_id = revision.rev_id;
let bytes: Bytes = revision.try_into().unwrap();
Self {
doc_id,
ty: WsDocumentDataType::PushRev,
ty: DocumentWSDataType::PushRev,
data: bytes.to_vec(),
id: Some(rev_id),
}
}
}
pub struct WsDocumentDataBuilder();
impl WsDocumentDataBuilder {
// WsDocumentDataType::PushRev -> Revision
pub fn build_push_rev_message(doc_id: &str, revision: Revision) -> WsDocumentData {
// DocumentWSDataType::PushRev -> Revision
pub fn build_push_rev_message(doc_id: &str, revision: Revision) -> DocumentWSData {
let rev_id = revision.rev_id;
let bytes: Bytes = revision.try_into().unwrap();
WsDocumentData {
DocumentWSData {
doc_id: doc_id.to_string(),
ty: WsDocumentDataType::PushRev,
ty: DocumentWSDataType::PushRev,
data: bytes.to_vec(),
id: Some(rev_id),
}
}
// WsDocumentDataType::PullRev -> RevisionRange
pub fn build_push_pull_message(doc_id: &str, range: RevisionRange) -> WsDocumentData {
// DocumentWSDataType::PullRev -> RevisionRange
pub fn build_push_pull_message(doc_id: &str, range: RevisionRange) -> DocumentWSData {
let bytes: Bytes = range.try_into().unwrap();
WsDocumentData {
DocumentWSData {
doc_id: doc_id.to_string(),
ty: WsDocumentDataType::PullRev,
ty: DocumentWSDataType::PullRev,
data: bytes.to_vec(),
id: None,
}
}
// WsDocumentDataType::Acked -> RevId
pub fn build_acked_message(doc_id: &str, rev_id: i64) -> WsDocumentData {
// DocumentWSDataType::Acked -> RevId
pub fn build_acked_message(doc_id: &str, rev_id: i64) -> DocumentWSData {
let cloned_rev_id = rev_id.clone();
let rev_id: RevId = rev_id.into();
let bytes: Bytes = rev_id.try_into().unwrap();
WsDocumentData {
DocumentWSData {
doc_id: doc_id.to_string(),
ty: WsDocumentDataType::Acked,
ty: DocumentWSDataType::Acked,
data: bytes.to_vec(),
id: Some(cloned_rev_id),
}
}
}

View File

@ -24,24 +24,31 @@
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_22_1;
#[derive(PartialEq,Clone,Default)]
pub struct WsDocumentData {
pub struct DocumentWSData {
// message fields
pub doc_id: ::std::string::String,
pub ty: WsDocumentDataType,
pub ty: DocumentWSDataType,
pub data: ::std::vec::Vec<u8>,
// message oneof groups
pub one_of_id: ::std::option::Option<DocumentWSData_oneof_one_of_id>,
// special fields
pub unknown_fields: ::protobuf::UnknownFields,
pub cached_size: ::protobuf::CachedSize,
}
impl<'a> ::std::default::Default for &'a WsDocumentData {
fn default() -> &'a WsDocumentData {
<WsDocumentData as ::protobuf::Message>::default_instance()
impl<'a> ::std::default::Default for &'a DocumentWSData {
fn default() -> &'a DocumentWSData {
<DocumentWSData as ::protobuf::Message>::default_instance()
}
}
impl WsDocumentData {
pub fn new() -> WsDocumentData {
#[derive(Clone,PartialEq,Debug)]
pub enum DocumentWSData_oneof_one_of_id {
id(i64),
}
impl DocumentWSData {
pub fn new() -> DocumentWSData {
::std::default::Default::default()
}
@ -71,18 +78,18 @@ impl WsDocumentData {
::std::mem::replace(&mut self.doc_id, ::std::string::String::new())
}
// .WsDocumentDataType ty = 2;
// .DocumentWSDataType ty = 2;
pub fn get_ty(&self) -> WsDocumentDataType {
pub fn get_ty(&self) -> DocumentWSDataType {
self.ty
}
pub fn clear_ty(&mut self) {
self.ty = WsDocumentDataType::Acked;
self.ty = DocumentWSDataType::Acked;
}
// Param is passed by value, moved
pub fn set_ty(&mut self, v: WsDocumentDataType) {
pub fn set_ty(&mut self, v: DocumentWSDataType) {
self.ty = v;
}
@ -111,9 +118,34 @@ impl WsDocumentData {
pub fn take_data(&mut self) -> ::std::vec::Vec<u8> {
::std::mem::replace(&mut self.data, ::std::vec::Vec::new())
}
// int64 id = 4;
pub fn get_id(&self) -> i64 {
match self.one_of_id {
::std::option::Option::Some(DocumentWSData_oneof_one_of_id::id(v)) => v,
_ => 0,
}
}
pub fn clear_id(&mut self) {
self.one_of_id = ::std::option::Option::None;
}
pub fn has_id(&self) -> bool {
match self.one_of_id {
::std::option::Option::Some(DocumentWSData_oneof_one_of_id::id(..)) => true,
_ => false,
}
}
// Param is passed by value, moved
pub fn set_id(&mut self, v: i64) {
self.one_of_id = ::std::option::Option::Some(DocumentWSData_oneof_one_of_id::id(v))
}
}
impl ::protobuf::Message for WsDocumentData {
impl ::protobuf::Message for DocumentWSData {
fn is_initialized(&self) -> bool {
true
}
@ -131,6 +163,12 @@ impl ::protobuf::Message for WsDocumentData {
3 => {
::protobuf::rt::read_singular_proto3_bytes_into(wire_type, is, &mut self.data)?;
},
4 => {
if wire_type != ::protobuf::wire_format::WireTypeVarint {
return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type));
}
self.one_of_id = ::std::option::Option::Some(DocumentWSData_oneof_one_of_id::id(is.read_int64()?));
},
_ => {
::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
},
@ -146,12 +184,19 @@ impl ::protobuf::Message for WsDocumentData {
if !self.doc_id.is_empty() {
my_size += ::protobuf::rt::string_size(1, &self.doc_id);
}
if self.ty != WsDocumentDataType::Acked {
if self.ty != DocumentWSDataType::Acked {
my_size += ::protobuf::rt::enum_size(2, self.ty);
}
if !self.data.is_empty() {
my_size += ::protobuf::rt::bytes_size(3, &self.data);
}
if let ::std::option::Option::Some(ref v) = self.one_of_id {
match v {
&DocumentWSData_oneof_one_of_id::id(v) => {
my_size += ::protobuf::rt::value_size(4, v, ::protobuf::wire_format::WireTypeVarint);
},
};
}
my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
self.cached_size.set(my_size);
my_size
@ -161,12 +206,19 @@ impl ::protobuf::Message for WsDocumentData {
if !self.doc_id.is_empty() {
os.write_string(1, &self.doc_id)?;
}
if self.ty != WsDocumentDataType::Acked {
if self.ty != DocumentWSDataType::Acked {
os.write_enum(2, ::protobuf::ProtobufEnum::value(&self.ty))?;
}
if !self.data.is_empty() {
os.write_bytes(3, &self.data)?;
}
if let ::std::option::Option::Some(ref v) = self.one_of_id {
match v {
&DocumentWSData_oneof_one_of_id::id(v) => {
os.write_int64(4, v)?;
},
};
}
os.write_unknown_fields(self.get_unknown_fields())?;
::std::result::Result::Ok(())
}
@ -197,8 +249,8 @@ impl ::protobuf::Message for WsDocumentData {
Self::descriptor_static()
}
fn new() -> WsDocumentData {
WsDocumentData::new()
fn new() -> DocumentWSData {
DocumentWSData::new()
}
fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
@ -207,49 +259,55 @@ impl ::protobuf::Message for WsDocumentData {
let mut fields = ::std::vec::Vec::new();
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
"doc_id",
|m: &WsDocumentData| { &m.doc_id },
|m: &mut WsDocumentData| { &mut m.doc_id },
|m: &DocumentWSData| { &m.doc_id },
|m: &mut DocumentWSData| { &mut m.doc_id },
));
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeEnum<WsDocumentDataType>>(
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeEnum<DocumentWSDataType>>(
"ty",
|m: &WsDocumentData| { &m.ty },
|m: &mut WsDocumentData| { &mut m.ty },
|m: &DocumentWSData| { &m.ty },
|m: &mut DocumentWSData| { &mut m.ty },
));
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>(
"data",
|m: &WsDocumentData| { &m.data },
|m: &mut WsDocumentData| { &mut m.data },
|m: &DocumentWSData| { &m.data },
|m: &mut DocumentWSData| { &mut m.data },
));
::protobuf::reflect::MessageDescriptor::new_pb_name::<WsDocumentData>(
"WsDocumentData",
fields.push(::protobuf::reflect::accessor::make_singular_i64_accessor::<_>(
"id",
DocumentWSData::has_id,
DocumentWSData::get_id,
));
::protobuf::reflect::MessageDescriptor::new_pb_name::<DocumentWSData>(
"DocumentWSData",
fields,
file_descriptor_proto()
)
})
}
fn default_instance() -> &'static WsDocumentData {
static instance: ::protobuf::rt::LazyV2<WsDocumentData> = ::protobuf::rt::LazyV2::INIT;
instance.get(WsDocumentData::new)
fn default_instance() -> &'static DocumentWSData {
static instance: ::protobuf::rt::LazyV2<DocumentWSData> = ::protobuf::rt::LazyV2::INIT;
instance.get(DocumentWSData::new)
}
}
impl ::protobuf::Clear for WsDocumentData {
impl ::protobuf::Clear for DocumentWSData {
fn clear(&mut self) {
self.doc_id.clear();
self.ty = WsDocumentDataType::Acked;
self.ty = DocumentWSDataType::Acked;
self.data.clear();
self.one_of_id = ::std::option::Option::None;
self.unknown_fields.clear();
}
}
impl ::std::fmt::Debug for WsDocumentData {
impl ::std::fmt::Debug for DocumentWSData {
fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
::protobuf::text_format::fmt(self, f)
}
}
impl ::protobuf::reflect::ProtobufValue for WsDocumentData {
impl ::protobuf::reflect::ProtobufValue for DocumentWSData {
fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
::protobuf::reflect::ReflectValueRef::Message(self)
}
@ -492,34 +550,34 @@ impl ::protobuf::reflect::ProtobufValue for DocumentConnected {
}
#[derive(Clone,PartialEq,Eq,Debug,Hash)]
pub enum WsDocumentDataType {
pub enum DocumentWSDataType {
Acked = 0,
PushRev = 1,
PullRev = 2,
UserConnect = 3,
}
impl ::protobuf::ProtobufEnum for WsDocumentDataType {
impl ::protobuf::ProtobufEnum for DocumentWSDataType {
fn value(&self) -> i32 {
*self as i32
}
fn from_i32(value: i32) -> ::std::option::Option<WsDocumentDataType> {
fn from_i32(value: i32) -> ::std::option::Option<DocumentWSDataType> {
match value {
0 => ::std::option::Option::Some(WsDocumentDataType::Acked),
1 => ::std::option::Option::Some(WsDocumentDataType::PushRev),
2 => ::std::option::Option::Some(WsDocumentDataType::PullRev),
3 => ::std::option::Option::Some(WsDocumentDataType::UserConnect),
0 => ::std::option::Option::Some(DocumentWSDataType::Acked),
1 => ::std::option::Option::Some(DocumentWSDataType::PushRev),
2 => ::std::option::Option::Some(DocumentWSDataType::PullRev),
3 => ::std::option::Option::Some(DocumentWSDataType::UserConnect),
_ => ::std::option::Option::None
}
}
fn values() -> &'static [Self] {
static values: &'static [WsDocumentDataType] = &[
WsDocumentDataType::Acked,
WsDocumentDataType::PushRev,
WsDocumentDataType::PullRev,
WsDocumentDataType::UserConnect,
static values: &'static [DocumentWSDataType] = &[
DocumentWSDataType::Acked,
DocumentWSDataType::PushRev,
DocumentWSDataType::PullRev,
DocumentWSDataType::UserConnect,
];
values
}
@ -527,63 +585,68 @@ impl ::protobuf::ProtobufEnum for WsDocumentDataType {
fn enum_descriptor_static() -> &'static ::protobuf::reflect::EnumDescriptor {
static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::EnumDescriptor> = ::protobuf::rt::LazyV2::INIT;
descriptor.get(|| {
::protobuf::reflect::EnumDescriptor::new_pb_name::<WsDocumentDataType>("WsDocumentDataType", file_descriptor_proto())
::protobuf::reflect::EnumDescriptor::new_pb_name::<DocumentWSDataType>("DocumentWSDataType", file_descriptor_proto())
})
}
}
impl ::std::marker::Copy for WsDocumentDataType {
impl ::std::marker::Copy for DocumentWSDataType {
}
impl ::std::default::Default for WsDocumentDataType {
impl ::std::default::Default for DocumentWSDataType {
fn default() -> Self {
WsDocumentDataType::Acked
DocumentWSDataType::Acked
}
}
impl ::protobuf::reflect::ProtobufValue for WsDocumentDataType {
impl ::protobuf::reflect::ProtobufValue for DocumentWSDataType {
fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
::protobuf::reflect::ReflectValueRef::Enum(::protobuf::ProtobufEnum::descriptor(self))
}
}
static file_descriptor_proto_data: &'static [u8] = b"\
\n\x08ws.proto\"`\n\x0eWsDocumentData\x12\x15\n\x06doc_id\x18\x01\x20\
\x01(\tR\x05docId\x12#\n\x02ty\x18\x02\x20\x01(\x0e2\x13.WsDocumentDataT\
ypeR\x02ty\x12\x12\n\x04data\x18\x03\x20\x01(\x0cR\x04data\"Z\n\x11Docum\
entConnected\x12\x17\n\x07user_id\x18\x01\x20\x01(\tR\x06userId\x12\x15\
\n\x06doc_id\x18\x02\x20\x01(\tR\x05docId\x12\x15\n\x06rev_id\x18\x03\
\x20\x01(\x03R\x05revId*J\n\x12WsDocumentDataType\x12\t\n\x05Acked\x10\0\
\x12\x0b\n\x07PushRev\x10\x01\x12\x0b\n\x07PullRev\x10\x02\x12\x0f\n\x0b\
UserConnect\x10\x03J\xc8\x04\n\x06\x12\x04\0\0\x11\x01\n\x08\n\x01\x0c\
\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x06\x01\n\n\n\x03\x04\0\
\x01\x12\x03\x02\x08\x16\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x16\n\
\x0c\n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\n\n\x0c\n\x05\x04\0\x02\0\x01\
\x12\x03\x03\x0b\x11\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x14\x15\n\
\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04\x1e\n\x0c\n\x05\x04\0\x02\x01\
\x06\x12\x03\x04\x04\x16\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\x17\
\x19\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04\x1c\x1d\n\x0b\n\x04\x04\0\
\x02\x02\x12\x03\x05\x04\x13\n\x0c\n\x05\x04\0\x02\x02\x05\x12\x03\x05\
\x04\t\n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03\x05\n\x0e\n\x0c\n\x05\x04\0\
\x02\x02\x03\x12\x03\x05\x11\x12\n\n\n\x02\x04\x01\x12\x04\x07\0\x0b\x01\
\n\n\n\x03\x04\x01\x01\x12\x03\x07\x08\x19\n\x0b\n\x04\x04\x01\x02\0\x12\
\x03\x08\x04\x17\n\x0c\n\x05\x04\x01\x02\0\x05\x12\x03\x08\x04\n\n\x0c\n\
\x05\x04\x01\x02\0\x01\x12\x03\x08\x0b\x12\n\x0c\n\x05\x04\x01\x02\0\x03\
\x12\x03\x08\x15\x16\n\x0b\n\x04\x04\x01\x02\x01\x12\x03\t\x04\x16\n\x0c\
\n\x05\x04\x01\x02\x01\x05\x12\x03\t\x04\n\n\x0c\n\x05\x04\x01\x02\x01\
\x01\x12\x03\t\x0b\x11\n\x0c\n\x05\x04\x01\x02\x01\x03\x12\x03\t\x14\x15\
\n\x0b\n\x04\x04\x01\x02\x02\x12\x03\n\x04\x15\n\x0c\n\x05\x04\x01\x02\
\x02\x05\x12\x03\n\x04\t\n\x0c\n\x05\x04\x01\x02\x02\x01\x12\x03\n\n\x10\
\n\x0c\n\x05\x04\x01\x02\x02\x03\x12\x03\n\x13\x14\n\n\n\x02\x05\0\x12\
\x04\x0c\0\x11\x01\n\n\n\x03\x05\0\x01\x12\x03\x0c\x05\x17\n\x0b\n\x04\
\x05\0\x02\0\x12\x03\r\x04\x0e\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\r\x04\
\t\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\r\x0c\r\n\x0b\n\x04\x05\0\x02\x01\
\x12\x03\x0e\x04\x10\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\x0e\x04\x0b\n\
\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\x0e\x0e\x0f\n\x0b\n\x04\x05\0\x02\
\x02\x12\x03\x0f\x04\x10\n\x0c\n\x05\x05\0\x02\x02\x01\x12\x03\x0f\x04\
\x0b\n\x0c\n\x05\x05\0\x02\x02\x02\x12\x03\x0f\x0e\x0f\n\x0b\n\x04\x05\0\
\x02\x03\x12\x03\x10\x04\x14\n\x0c\n\x05\x05\0\x02\x03\x01\x12\x03\x10\
\x04\x0f\n\x0c\n\x05\x05\0\x02\x03\x02\x12\x03\x10\x12\x13b\x06proto3\
\n\x08ws.proto\"\x7f\n\x0eDocumentWSData\x12\x15\n\x06doc_id\x18\x01\x20\
\x01(\tR\x05docId\x12#\n\x02ty\x18\x02\x20\x01(\x0e2\x13.DocumentWSDataT\
ypeR\x02ty\x12\x12\n\x04data\x18\x03\x20\x01(\x0cR\x04data\x12\x10\n\x02\
id\x18\x04\x20\x01(\x03H\0R\x02idB\x0b\n\tone_of_id\"Z\n\x11DocumentConn\
ected\x12\x17\n\x07user_id\x18\x01\x20\x01(\tR\x06userId\x12\x15\n\x06do\
c_id\x18\x02\x20\x01(\tR\x05docId\x12\x15\n\x06rev_id\x18\x03\x20\x01(\
\x03R\x05revId*J\n\x12DocumentWSDataType\x12\t\n\x05Acked\x10\0\x12\x0b\
\n\x07PushRev\x10\x01\x12\x0b\n\x07PullRev\x10\x02\x12\x0f\n\x0bUserConn\
ect\x10\x03J\x9a\x05\n\x06\x12\x04\0\0\x12\x01\n\x08\n\x01\x0c\x12\x03\0\
\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x07\x01\n\n\n\x03\x04\0\x01\x12\x03\
\x02\x08\x16\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x16\n\x0c\n\x05\x04\
\0\x02\0\x05\x12\x03\x03\x04\n\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\
\x0b\x11\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x14\x15\n\x0b\n\x04\x04\
\0\x02\x01\x12\x03\x04\x04\x1e\n\x0c\n\x05\x04\0\x02\x01\x06\x12\x03\x04\
\x04\x16\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\x17\x19\n\x0c\n\x05\
\x04\0\x02\x01\x03\x12\x03\x04\x1c\x1d\n\x0b\n\x04\x04\0\x02\x02\x12\x03\
\x05\x04\x13\n\x0c\n\x05\x04\0\x02\x02\x05\x12\x03\x05\x04\t\n\x0c\n\x05\
\x04\0\x02\x02\x01\x12\x03\x05\n\x0e\n\x0c\n\x05\x04\0\x02\x02\x03\x12\
\x03\x05\x11\x12\n\x0b\n\x04\x04\0\x08\0\x12\x03\x06\x04%\n\x0c\n\x05\
\x04\0\x08\0\x01\x12\x03\x06\n\x13\n\x0b\n\x04\x04\0\x02\x03\x12\x03\x06\
\x16#\n\x0c\n\x05\x04\0\x02\x03\x05\x12\x03\x06\x16\x1b\n\x0c\n\x05\x04\
\0\x02\x03\x01\x12\x03\x06\x1c\x1e\n\x0c\n\x05\x04\0\x02\x03\x03\x12\x03\
\x06!\"\n\n\n\x02\x04\x01\x12\x04\x08\0\x0c\x01\n\n\n\x03\x04\x01\x01\
\x12\x03\x08\x08\x19\n\x0b\n\x04\x04\x01\x02\0\x12\x03\t\x04\x17\n\x0c\n\
\x05\x04\x01\x02\0\x05\x12\x03\t\x04\n\n\x0c\n\x05\x04\x01\x02\0\x01\x12\
\x03\t\x0b\x12\n\x0c\n\x05\x04\x01\x02\0\x03\x12\x03\t\x15\x16\n\x0b\n\
\x04\x04\x01\x02\x01\x12\x03\n\x04\x16\n\x0c\n\x05\x04\x01\x02\x01\x05\
\x12\x03\n\x04\n\n\x0c\n\x05\x04\x01\x02\x01\x01\x12\x03\n\x0b\x11\n\x0c\
\n\x05\x04\x01\x02\x01\x03\x12\x03\n\x14\x15\n\x0b\n\x04\x04\x01\x02\x02\
\x12\x03\x0b\x04\x15\n\x0c\n\x05\x04\x01\x02\x02\x05\x12\x03\x0b\x04\t\n\
\x0c\n\x05\x04\x01\x02\x02\x01\x12\x03\x0b\n\x10\n\x0c\n\x05\x04\x01\x02\
\x02\x03\x12\x03\x0b\x13\x14\n\n\n\x02\x05\0\x12\x04\r\0\x12\x01\n\n\n\
\x03\x05\0\x01\x12\x03\r\x05\x17\n\x0b\n\x04\x05\0\x02\0\x12\x03\x0e\x04\
\x0e\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x0e\x04\t\n\x0c\n\x05\x05\0\x02\
\0\x02\x12\x03\x0e\x0c\r\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x0f\x04\x10\n\
\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\x0f\x04\x0b\n\x0c\n\x05\x05\0\x02\
\x01\x02\x12\x03\x0f\x0e\x0f\n\x0b\n\x04\x05\0\x02\x02\x12\x03\x10\x04\
\x10\n\x0c\n\x05\x05\0\x02\x02\x01\x12\x03\x10\x04\x0b\n\x0c\n\x05\x05\0\
\x02\x02\x02\x12\x03\x10\x0e\x0f\n\x0b\n\x04\x05\0\x02\x03\x12\x03\x11\
\x04\x14\n\x0c\n\x05\x05\0\x02\x03\x01\x12\x03\x11\x04\x0f\n\x0c\n\x05\
\x05\0\x02\x03\x02\x12\x03\x11\x12\x13b\x06proto3\
";
static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;

View File

@ -1,16 +1,17 @@
syntax = "proto3";
message WsDocumentData {
message DocumentWSData {
string doc_id = 1;
WsDocumentDataType ty = 2;
DocumentWSDataType ty = 2;
bytes data = 3;
oneof one_of_id { int64 id = 4; };
}
message DocumentConnected {
string user_id = 1;
string doc_id = 2;
int64 rev_id = 3;
}
enum WsDocumentDataType {
enum DocumentWSDataType {
Acked = 0;
PushRev = 1;
PullRev = 2;

View File

@ -60,10 +60,10 @@ pub fn category_from_str(type_str: &str) -> TypeCategory {
| "DocDelta"
| "NewDocUser"
| "DocIdentifier"
| "WsDocumentData"
| "DocumentWSData"
| "DocumentConnected"
| "WsError"
| "WsMessage"
| "WSError"
| "WSMessage"
| "Revision"
| "RevId"
| "RevisionRange"
@ -89,9 +89,9 @@ pub fn category_from_str(type_str: &str) -> TypeCategory {
| "TrashType"
| "ViewType"
| "ExportType"
| "WsDocumentDataType"
| "DocumentWSDataType"
| "ErrorCode"
| "WsModule"
| "WSModule"
| "RevType"
| "RevState"
=> TypeCategory::Enum,

View File

@ -1,6 +1,6 @@
#![allow(clippy::all)]
use crate::{
errors::{internal_error, WsError},
errors::{internal_error, WSError},
MsgReceiver,
MsgSender,
};
@ -24,16 +24,16 @@ use tokio_tungstenite::{
type WsConnectResult = Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Response), Error>;
#[pin_project]
pub struct WsConnectionFuture {
pub struct WSConnectionFuture {
msg_tx: Option<MsgSender>,
ws_rx: Option<MsgReceiver>,
#[pin]
fut: Pin<Box<dyn Future<Output = WsConnectResult> + Send + Sync>>,
}
impl WsConnectionFuture {
impl WSConnectionFuture {
pub fn new(msg_tx: MsgSender, ws_rx: MsgReceiver, addr: String) -> Self {
WsConnectionFuture {
WSConnectionFuture {
msg_tx: Some(msg_tx),
ws_rx: Some(ws_rx),
fut: Box::pin(async move { connect_async(&addr).await }),
@ -41,8 +41,8 @@ impl WsConnectionFuture {
}
}
impl Future for WsConnectionFuture {
type Output = Result<WsStream, WsError>;
impl Future for WSConnectionFuture {
type Output = Result<WSStream, WSError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// [[pin]]
// poll async function. The following methods not work.
@ -66,7 +66,7 @@ impl Future for WsConnectionFuture {
self.msg_tx.take().expect("WsConnection should be call once "),
self.ws_rx.take().expect("WsConnection should be call once "),
);
Poll::Ready(Ok(WsStream::new(msg_tx, ws_rx, stream)))
Poll::Ready(Ok(WSStream::new(msg_tx, ws_rx, stream)))
},
Err(error) => {
tracing::debug!("🐴 ws connect failed: {:?}", error);
@ -77,16 +77,16 @@ impl Future for WsConnectionFuture {
}
}
type Fut = BoxFuture<'static, Result<(), WsError>>;
type Fut = BoxFuture<'static, Result<(), WSError>>;
#[pin_project]
pub struct WsStream {
pub struct WSStream {
#[allow(dead_code)]
msg_tx: MsgSender,
#[pin]
inner: Option<(Fut, Fut)>,
}
impl WsStream {
impl WSStream {
pub fn new(msg_tx: MsgSender, ws_rx: MsgReceiver, stream: WebSocketStream<MaybeTlsStream<TcpStream>>) -> Self {
let (ws_write, ws_read) = stream.split();
Self {
@ -110,7 +110,7 @@ impl WsStream {
loop {
match rx.recv().await {
None => {
return Err(WsError::internal().context("WsStream rx closed unexpectedly"));
return Err(WSError::internal().context("WsStream rx closed unexpectedly"));
},
Some(result) => {
if result.is_err() {
@ -136,12 +136,12 @@ impl WsStream {
}
}
impl fmt::Debug for WsStream {
impl fmt::Debug for WSStream {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("WsStream").finish() }
}
impl Future for WsStream {
type Output = Result<(), WsError>;
impl Future for WSStream {
type Output = Result<(), WSError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let (mut ws_read, mut ws_write) = self.inner.take().unwrap();
@ -161,11 +161,11 @@ impl Future for WsStream {
}
}
fn send_message(msg_tx: MsgSender, message: Result<Message, Error>) -> Result<(), WsError> {
fn send_message(msg_tx: MsgSender, message: Result<Message, Error>) -> Result<(), WSError> {
match message {
Ok(Message::Binary(bytes)) => msg_tx.unbounded_send(Message::Binary(bytes)).map_err(internal_error),
Ok(_) => Ok(()),
Err(e) => Err(WsError::internal().context(e)),
Err(e) => Err(WSError::internal().context(e)),
}
}
#[allow(dead_code)]

View File

@ -6,7 +6,7 @@ use tokio_tungstenite::tungstenite::{http::StatusCode, Message};
use url::ParseError;
#[derive(Debug, Default, Clone, ProtoBuf)]
pub struct WsError {
pub struct WSError {
#[pb(index = 1)]
pub code: ErrorCode,
@ -14,11 +14,11 @@ pub struct WsError {
pub msg: String,
}
macro_rules! static_user_error {
macro_rules! static_ws_error {
($name:ident, $status:expr) => {
#[allow(non_snake_case, missing_docs)]
pub fn $name() -> WsError {
WsError {
pub fn $name() -> WSError {
WSError {
code: $status,
msg: format!("{}", $status),
}
@ -26,10 +26,10 @@ macro_rules! static_user_error {
};
}
impl WsError {
impl WSError {
#[allow(dead_code)]
pub(crate) fn new(code: ErrorCode) -> WsError {
WsError {
pub(crate) fn new(code: ErrorCode) -> WSError {
WSError {
code,
msg: "".to_string(),
}
@ -40,16 +40,16 @@ impl WsError {
self
}
static_user_error!(internal, ErrorCode::InternalError);
static_user_error!(unsupported_message, ErrorCode::UnsupportedMessage);
static_user_error!(unauthorized, ErrorCode::Unauthorized);
static_ws_error!(internal, ErrorCode::InternalError);
static_ws_error!(unsupported_message, ErrorCode::UnsupportedMessage);
static_ws_error!(unauthorized, ErrorCode::Unauthorized);
}
pub fn internal_error<T>(e: T) -> WsError
pub fn internal_error<T>(e: T) -> WSError
where
T: std::fmt::Debug,
{
WsError::internal().context(e)
WSError::internal().context(e)
}
#[derive(Debug, Clone, ProtoBuf_Enum, Display, PartialEq, Eq)]
@ -63,29 +63,29 @@ impl std::default::Default for ErrorCode {
fn default() -> Self { ErrorCode::InternalError }
}
impl std::convert::From<url::ParseError> for WsError {
fn from(error: ParseError) -> Self { WsError::internal().context(error) }
impl std::convert::From<url::ParseError> for WSError {
fn from(error: ParseError) -> Self { WSError::internal().context(error) }
}
impl std::convert::From<protobuf::ProtobufError> for WsError {
fn from(error: protobuf::ProtobufError) -> Self { WsError::internal().context(error) }
impl std::convert::From<protobuf::ProtobufError> for WSError {
fn from(error: protobuf::ProtobufError) -> Self { WSError::internal().context(error) }
}
impl std::convert::From<futures_channel::mpsc::TrySendError<Message>> for WsError {
fn from(error: TrySendError<Message>) -> Self { WsError::internal().context(error) }
impl std::convert::From<futures_channel::mpsc::TrySendError<Message>> for WSError {
fn from(error: TrySendError<Message>) -> Self { WSError::internal().context(error) }
}
impl std::convert::From<tokio_tungstenite::tungstenite::Error> for WsError {
impl std::convert::From<tokio_tungstenite::tungstenite::Error> for WSError {
fn from(error: tokio_tungstenite::tungstenite::Error) -> Self {
match error {
tokio_tungstenite::tungstenite::Error::Http(response) => {
if response.status() == StatusCode::UNAUTHORIZED {
WsError::unauthorized()
WSError::unauthorized()
} else {
WsError::internal().context(response)
WSError::internal().context(response)
}
},
_ => WsError::internal().context(error),
_ => WSError::internal().context(error),
}
}
}

View File

@ -5,33 +5,33 @@ use tokio_tungstenite::tungstenite::Message as TokioMessage;
// Opti: using four bytes of the data to represent the source
#[derive(ProtoBuf, Debug, Clone, Default)]
pub struct WsMessage {
pub struct WSMessage {
#[pb(index = 1)]
pub module: WsModule,
pub module: WSModule,
#[pb(index = 2)]
pub data: Vec<u8>,
}
#[derive(ProtoBuf_Enum, Debug, Clone, Eq, PartialEq, Hash)]
pub enum WsModule {
pub enum WSModule {
Doc = 0,
}
impl std::default::Default for WsModule {
fn default() -> Self { WsModule::Doc }
impl std::default::Default for WSModule {
fn default() -> Self { WSModule::Doc }
}
impl ToString for WsModule {
impl ToString for WSModule {
fn to_string(&self) -> String {
match self {
WsModule::Doc => "0".to_string(),
WSModule::Doc => "0".to_string(),
}
}
}
impl std::convert::From<WsMessage> for TokioMessage {
fn from(msg: WsMessage) -> Self {
impl std::convert::From<WSMessage> for TokioMessage {
fn from(msg: WSMessage) -> Self {
let result: Result<Bytes, ::protobuf::ProtobufError> = msg.try_into();
match result {
Ok(bytes) => TokioMessage::Binary(bytes.to_vec()),

View File

@ -24,7 +24,7 @@
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_22_1;
#[derive(PartialEq,Clone,Default)]
pub struct WsError {
pub struct WSError {
// message fields
pub code: ErrorCode,
pub msg: ::std::string::String,
@ -33,14 +33,14 @@ pub struct WsError {
pub cached_size: ::protobuf::CachedSize,
}
impl<'a> ::std::default::Default for &'a WsError {
fn default() -> &'a WsError {
<WsError as ::protobuf::Message>::default_instance()
impl<'a> ::std::default::Default for &'a WSError {
fn default() -> &'a WSError {
<WSError as ::protobuf::Message>::default_instance()
}
}
impl WsError {
pub fn new() -> WsError {
impl WSError {
pub fn new() -> WSError {
::std::default::Default::default()
}
@ -86,7 +86,7 @@ impl WsError {
}
}
impl ::protobuf::Message for WsError {
impl ::protobuf::Message for WSError {
fn is_initialized(&self) -> bool {
true
}
@ -161,8 +161,8 @@ impl ::protobuf::Message for WsError {
Self::descriptor_static()
}
fn new() -> WsError {
WsError::new()
fn new() -> WSError {
WSError::new()
}
fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
@ -171,29 +171,29 @@ impl ::protobuf::Message for WsError {
let mut fields = ::std::vec::Vec::new();
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeEnum<ErrorCode>>(
"code",
|m: &WsError| { &m.code },
|m: &mut WsError| { &mut m.code },
|m: &WSError| { &m.code },
|m: &mut WSError| { &mut m.code },
));
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
"msg",
|m: &WsError| { &m.msg },
|m: &mut WsError| { &mut m.msg },
|m: &WSError| { &m.msg },
|m: &mut WSError| { &mut m.msg },
));
::protobuf::reflect::MessageDescriptor::new_pb_name::<WsError>(
"WsError",
::protobuf::reflect::MessageDescriptor::new_pb_name::<WSError>(
"WSError",
fields,
file_descriptor_proto()
)
})
}
fn default_instance() -> &'static WsError {
static instance: ::protobuf::rt::LazyV2<WsError> = ::protobuf::rt::LazyV2::INIT;
instance.get(WsError::new)
fn default_instance() -> &'static WSError {
static instance: ::protobuf::rt::LazyV2<WSError> = ::protobuf::rt::LazyV2::INIT;
instance.get(WSError::new)
}
}
impl ::protobuf::Clear for WsError {
impl ::protobuf::Clear for WSError {
fn clear(&mut self) {
self.code = ErrorCode::InternalError;
self.msg.clear();
@ -201,13 +201,13 @@ impl ::protobuf::Clear for WsError {
}
}
impl ::std::fmt::Debug for WsError {
impl ::std::fmt::Debug for WSError {
fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
::protobuf::text_format::fmt(self, f)
}
}
impl ::protobuf::reflect::ProtobufValue for WsError {
impl ::protobuf::reflect::ProtobufValue for WSError {
fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
::protobuf::reflect::ReflectValueRef::Message(self)
}
@ -267,7 +267,7 @@ impl ::protobuf::reflect::ProtobufValue for ErrorCode {
}
static file_descriptor_proto_data: &'static [u8] = b"\
\n\x0cerrors.proto\";\n\x07WsError\x12\x1e\n\x04code\x18\x01\x20\x01(\
\n\x0cerrors.proto\";\n\x07WSError\x12\x1e\n\x04code\x18\x01\x20\x01(\
\x0e2\n.ErrorCodeR\x04code\x12\x10\n\x03msg\x18\x02\x20\x01(\tR\x03msg*H\
\n\tErrorCode\x12\x11\n\rInternalError\x10\0\x12\x16\n\x12UnsupportedMes\
sage\x10\x01\x12\x10\n\x0cUnauthorized\x10\x02J\xab\x02\n\x06\x12\x04\0\

View File

@ -24,38 +24,38 @@
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_22_1;
#[derive(PartialEq,Clone,Default)]
pub struct WsMessage {
pub struct WSMessage {
// message fields
pub module: WsModule,
pub module: WSModule,
pub data: ::std::vec::Vec<u8>,
// special fields
pub unknown_fields: ::protobuf::UnknownFields,
pub cached_size: ::protobuf::CachedSize,
}
impl<'a> ::std::default::Default for &'a WsMessage {
fn default() -> &'a WsMessage {
<WsMessage as ::protobuf::Message>::default_instance()
impl<'a> ::std::default::Default for &'a WSMessage {
fn default() -> &'a WSMessage {
<WSMessage as ::protobuf::Message>::default_instance()
}
}
impl WsMessage {
pub fn new() -> WsMessage {
impl WSMessage {
pub fn new() -> WSMessage {
::std::default::Default::default()
}
// .WsModule module = 1;
// .WSModule module = 1;
pub fn get_module(&self) -> WsModule {
pub fn get_module(&self) -> WSModule {
self.module
}
pub fn clear_module(&mut self) {
self.module = WsModule::Doc;
self.module = WSModule::Doc;
}
// Param is passed by value, moved
pub fn set_module(&mut self, v: WsModule) {
pub fn set_module(&mut self, v: WSModule) {
self.module = v;
}
@ -86,7 +86,7 @@ impl WsMessage {
}
}
impl ::protobuf::Message for WsMessage {
impl ::protobuf::Message for WSMessage {
fn is_initialized(&self) -> bool {
true
}
@ -113,7 +113,7 @@ impl ::protobuf::Message for WsMessage {
#[allow(unused_variables)]
fn compute_size(&self) -> u32 {
let mut my_size = 0;
if self.module != WsModule::Doc {
if self.module != WSModule::Doc {
my_size += ::protobuf::rt::enum_size(1, self.module);
}
if !self.data.is_empty() {
@ -125,7 +125,7 @@ impl ::protobuf::Message for WsMessage {
}
fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
if self.module != WsModule::Doc {
if self.module != WSModule::Doc {
os.write_enum(1, ::protobuf::ProtobufEnum::value(&self.module))?;
}
if !self.data.is_empty() {
@ -161,78 +161,78 @@ impl ::protobuf::Message for WsMessage {
Self::descriptor_static()
}
fn new() -> WsMessage {
WsMessage::new()
fn new() -> WSMessage {
WSMessage::new()
}
fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT;
descriptor.get(|| {
let mut fields = ::std::vec::Vec::new();
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeEnum<WsModule>>(
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeEnum<WSModule>>(
"module",
|m: &WsMessage| { &m.module },
|m: &mut WsMessage| { &mut m.module },
|m: &WSMessage| { &m.module },
|m: &mut WSMessage| { &mut m.module },
));
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>(
"data",
|m: &WsMessage| { &m.data },
|m: &mut WsMessage| { &mut m.data },
|m: &WSMessage| { &m.data },
|m: &mut WSMessage| { &mut m.data },
));
::protobuf::reflect::MessageDescriptor::new_pb_name::<WsMessage>(
"WsMessage",
::protobuf::reflect::MessageDescriptor::new_pb_name::<WSMessage>(
"WSMessage",
fields,
file_descriptor_proto()
)
})
}
fn default_instance() -> &'static WsMessage {
static instance: ::protobuf::rt::LazyV2<WsMessage> = ::protobuf::rt::LazyV2::INIT;
instance.get(WsMessage::new)
fn default_instance() -> &'static WSMessage {
static instance: ::protobuf::rt::LazyV2<WSMessage> = ::protobuf::rt::LazyV2::INIT;
instance.get(WSMessage::new)
}
}
impl ::protobuf::Clear for WsMessage {
impl ::protobuf::Clear for WSMessage {
fn clear(&mut self) {
self.module = WsModule::Doc;
self.module = WSModule::Doc;
self.data.clear();
self.unknown_fields.clear();
}
}
impl ::std::fmt::Debug for WsMessage {
impl ::std::fmt::Debug for WSMessage {
fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
::protobuf::text_format::fmt(self, f)
}
}
impl ::protobuf::reflect::ProtobufValue for WsMessage {
impl ::protobuf::reflect::ProtobufValue for WSMessage {
fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
::protobuf::reflect::ReflectValueRef::Message(self)
}
}
#[derive(Clone,PartialEq,Eq,Debug,Hash)]
pub enum WsModule {
pub enum WSModule {
Doc = 0,
}
impl ::protobuf::ProtobufEnum for WsModule {
impl ::protobuf::ProtobufEnum for WSModule {
fn value(&self) -> i32 {
*self as i32
}
fn from_i32(value: i32) -> ::std::option::Option<WsModule> {
fn from_i32(value: i32) -> ::std::option::Option<WSModule> {
match value {
0 => ::std::option::Option::Some(WsModule::Doc),
0 => ::std::option::Option::Some(WSModule::Doc),
_ => ::std::option::Option::None
}
}
fn values() -> &'static [Self] {
static values: &'static [WsModule] = &[
WsModule::Doc,
static values: &'static [WSModule] = &[
WSModule::Doc,
];
values
}
@ -240,30 +240,30 @@ impl ::protobuf::ProtobufEnum for WsModule {
fn enum_descriptor_static() -> &'static ::protobuf::reflect::EnumDescriptor {
static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::EnumDescriptor> = ::protobuf::rt::LazyV2::INIT;
descriptor.get(|| {
::protobuf::reflect::EnumDescriptor::new_pb_name::<WsModule>("WsModule", file_descriptor_proto())
::protobuf::reflect::EnumDescriptor::new_pb_name::<WSModule>("WSModule", file_descriptor_proto())
})
}
}
impl ::std::marker::Copy for WsModule {
impl ::std::marker::Copy for WSModule {
}
impl ::std::default::Default for WsModule {
impl ::std::default::Default for WSModule {
fn default() -> Self {
WsModule::Doc
WSModule::Doc
}
}
impl ::protobuf::reflect::ProtobufValue for WsModule {
impl ::protobuf::reflect::ProtobufValue for WSModule {
fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
::protobuf::reflect::ReflectValueRef::Enum(::protobuf::ProtobufEnum::descriptor(self))
}
}
static file_descriptor_proto_data: &'static [u8] = b"\
\n\tmsg.proto\"B\n\tWsMessage\x12!\n\x06module\x18\x01\x20\x01(\x0e2\t.W\
sModuleR\x06module\x12\x12\n\x04data\x18\x02\x20\x01(\x0cR\x04data*\x13\
\n\x08WsModule\x12\x07\n\x03Doc\x10\0J\xd9\x01\n\x06\x12\x04\0\0\x08\x01\
\n\tmsg.proto\"B\n\tWSMessage\x12!\n\x06module\x18\x01\x20\x01(\x0e2\t.W\
SModuleR\x06module\x12\x12\n\x04data\x18\x02\x20\x01(\x0cR\x04data*\x13\
\n\x08WSModule\x12\x07\n\x03Doc\x10\0J\xd9\x01\n\x06\x12\x04\0\0\x08\x01\
\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x05\x01\n\
\n\n\x03\x04\0\x01\x12\x03\x02\x08\x11\n\x0b\n\x04\x04\0\x02\0\x12\x03\
\x03\x04\x18\n\x0c\n\x05\x04\0\x02\0\x06\x12\x03\x03\x04\x0c\n\x0c\n\x05\

View File

@ -1,6 +1,6 @@
syntax = "proto3";
message WsError {
message WSError {
ErrorCode code = 1;
string msg = 2;
}

View File

@ -1,9 +1,9 @@
syntax = "proto3";
message WsMessage {
WsModule module = 1;
message WSMessage {
WSModule module = 1;
bytes data = 2;
}
enum WsModule {
enum WSModule {
Doc = 0;
}

View File

@ -1,9 +1,9 @@
#![allow(clippy::type_complexity)]
use crate::{
connect::{WsConnectionFuture, WsStream},
errors::WsError,
WsMessage,
WsModule,
connect::{WSConnectionFuture, WSStream},
errors::WSError,
WSMessage,
WSModule,
};
use backend_service::errors::ServerError;
use bytes::Bytes;
@ -30,36 +30,36 @@ use tokio_tungstenite::tungstenite::{
pub type MsgReceiver = UnboundedReceiver<Message>;
pub type MsgSender = UnboundedSender<Message>;
type Handlers = DashMap<WsModule, Arc<dyn WsMessageReceiver>>;
type Handlers = DashMap<WSModule, Arc<dyn WSMessageReceiver>>;
pub trait WsMessageReceiver: Sync + Send + 'static {
fn source(&self) -> WsModule;
fn receive_message(&self, msg: WsMessage);
pub trait WSMessageReceiver: Sync + Send + 'static {
fn source(&self) -> WSModule;
fn receive_message(&self, msg: WSMessage);
}
pub struct WsController {
pub struct WSController {
handlers: Handlers,
state_notify: Arc<broadcast::Sender<WsConnectState>>,
sender_ctrl: Arc<RwLock<WsSenderController>>,
state_notify: Arc<broadcast::Sender<WSConnectState>>,
sender_ctrl: Arc<RwLock<WSSenderController>>,
addr: Arc<RwLock<Option<String>>>,
}
impl std::default::Default for WsController {
impl std::default::Default for WSController {
fn default() -> Self {
let (state_notify, _) = broadcast::channel(16);
Self {
handlers: DashMap::new(),
sender_ctrl: Arc::new(RwLock::new(WsSenderController::default())),
sender_ctrl: Arc::new(RwLock::new(WSSenderController::default())),
state_notify: Arc::new(state_notify),
addr: Arc::new(RwLock::new(None)),
}
}
}
impl WsController {
pub fn new() -> Self { WsController::default() }
impl WSController {
pub fn new() -> Self { WSController::default() }
pub fn add_receiver(&self, handler: Arc<dyn WsMessageReceiver>) -> Result<(), WsError> {
pub fn add_receiver(&self, handler: Arc<dyn WSMessageReceiver>) -> Result<(), WSError> {
let source = handler.source();
if self.handlers.contains_key(&source) {
log::error!("WsSource's {:?} is already registered", source);
@ -74,7 +74,7 @@ impl WsController {
self.connect(addr, strategy).await
}
pub async fn stop(&self) { self.sender_ctrl.write().set_state(WsConnectState::Disconnected); }
pub async fn stop(&self) { self.sender_ctrl.write().set_state(WSConnectState::Disconnected); }
async fn connect<T, I>(&self, addr: String, strategy: T) -> Result<(), ServerError>
where
@ -83,25 +83,25 @@ impl WsController {
{
let (ret, rx) = oneshot::channel::<Result<(), ServerError>>();
*self.addr.write() = Some(addr.clone());
let action = WsConnectAction {
let action = WSConnectAction {
addr,
handlers: self.handlers.clone(),
};
let retry = Retry::spawn(strategy, action);
let sender_ctrl = self.sender_ctrl.clone();
sender_ctrl.write().set_state(WsConnectState::Connecting);
sender_ctrl.write().set_state(WSConnectState::Connecting);
tokio::spawn(async move {
match retry.await {
Ok(result) => {
let WsConnectResult {
let WSConnectResult {
stream,
handlers_fut,
sender,
} = result;
sender_ctrl.write().set_sender(sender);
sender_ctrl.write().set_state(WsConnectState::Connected);
sender_ctrl.write().set_state(WSConnectState::Connected);
let _ = ret.send(Ok(()));
spawn_stream_and_handlers(stream, handlers_fut, sender_ctrl.clone()).await;
},
@ -131,20 +131,20 @@ impl WsController {
self.connect(addr, strategy).await
}
pub fn subscribe_state(&self) -> broadcast::Receiver<WsConnectState> { self.state_notify.subscribe() }
pub fn subscribe_state(&self) -> broadcast::Receiver<WSConnectState> { self.state_notify.subscribe() }
pub fn sender(&self) -> Result<Arc<WsSender>, WsError> {
pub fn sender(&self) -> Result<Arc<WSSender>, WSError> {
match self.sender_ctrl.read().sender() {
None => Err(WsError::internal().context("WsSender is not initialized, should call connect first")),
None => Err(WSError::internal().context("WsSender is not initialized, should call connect first")),
Some(sender) => Ok(sender),
}
}
}
async fn spawn_stream_and_handlers(
stream: WsStream,
handlers: WsHandlerFuture,
sender_ctrl: Arc<RwLock<WsSenderController>>,
stream: WSStream,
handlers: WSHandlerFuture,
sender_ctrl: Arc<RwLock<WSSenderController>>,
) {
tokio::select! {
result = stream => {
@ -157,14 +157,14 @@ async fn spawn_stream_and_handlers(
}
#[pin_project]
pub struct WsHandlerFuture {
pub struct WSHandlerFuture {
#[pin]
msg_rx: MsgReceiver,
// Opti: Hashmap would be better
handlers: Handlers,
}
impl WsHandlerFuture {
impl WSHandlerFuture {
fn new(handlers: Handlers, msg_rx: MsgReceiver) -> Self { Self { msg_rx, handlers } }
fn handler_ws_message(&self, message: Message) {
@ -175,7 +175,7 @@ impl WsHandlerFuture {
fn handle_binary_message(&self, bytes: Vec<u8>) {
let bytes = Bytes::from(bytes);
match WsMessage::try_from(bytes) {
match WSMessage::try_from(bytes) {
Ok(message) => match self.handlers.get(&message.module) {
None => log::error!("Can't find any handler for message: {:?}", message),
Some(handler) => handler.receive_message(message.clone()),
@ -187,7 +187,7 @@ impl WsHandlerFuture {
}
}
impl Future for WsHandlerFuture {
impl Future for WSHandlerFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
@ -202,37 +202,37 @@ impl Future for WsHandlerFuture {
}
#[derive(Debug, Clone)]
pub struct WsSender {
pub struct WSSender {
ws_tx: MsgSender,
}
impl WsSender {
pub fn send_msg<T: Into<WsMessage>>(&self, msg: T) -> Result<(), WsError> {
impl WSSender {
pub fn send_msg<T: Into<WSMessage>>(&self, msg: T) -> Result<(), WSError> {
let msg = msg.into();
let _ = self
.ws_tx
.unbounded_send(msg.into())
.map_err(|e| WsError::internal().context(e))?;
.map_err(|e| WSError::internal().context(e))?;
Ok(())
}
pub fn send_text(&self, source: &WsModule, text: &str) -> Result<(), WsError> {
let msg = WsMessage {
pub fn send_text(&self, source: &WSModule, text: &str) -> Result<(), WSError> {
let msg = WSMessage {
module: source.clone(),
data: text.as_bytes().to_vec(),
};
self.send_msg(msg)
}
pub fn send_binary(&self, source: &WsModule, bytes: Vec<u8>) -> Result<(), WsError> {
let msg = WsMessage {
pub fn send_binary(&self, source: &WSModule, bytes: Vec<u8>) -> Result<(), WSError> {
let msg = WSMessage {
module: source.clone(),
data: bytes,
};
self.send_msg(msg)
}
pub fn send_disconnect(&self, reason: &str) -> Result<(), WsError> {
pub fn send_disconnect(&self, reason: &str) -> Result<(), WSError> {
let frame = CloseFrame {
code: CloseCode::Normal,
reason: reason.to_owned().into(),
@ -241,44 +241,44 @@ impl WsSender {
let _ = self
.ws_tx
.unbounded_send(msg)
.map_err(|e| WsError::internal().context(e))?;
.map_err(|e| WSError::internal().context(e))?;
Ok(())
}
}
struct WsConnectAction {
struct WSConnectAction {
addr: String,
handlers: Handlers,
}
impl Action for WsConnectAction {
impl Action for WSConnectAction {
type Future = Pin<Box<dyn Future<Output = Result<Self::Item, Self::Error>> + Send + Sync>>;
type Item = WsConnectResult;
type Error = WsError;
type Item = WSConnectResult;
type Error = WSError;
fn run(&mut self) -> Self::Future {
let addr = self.addr.clone();
let handlers = self.handlers.clone();
Box::pin(WsConnectActionFut::new(addr, handlers))
Box::pin(WSConnectActionFut::new(addr, handlers))
}
}
struct WsConnectResult {
stream: WsStream,
handlers_fut: WsHandlerFuture,
sender: WsSender,
struct WSConnectResult {
stream: WSStream,
handlers_fut: WSHandlerFuture,
sender: WSSender,
}
#[pin_project]
struct WsConnectActionFut {
struct WSConnectActionFut {
addr: String,
#[pin]
conn: WsConnectionFuture,
handlers_fut: Option<WsHandlerFuture>,
sender: Option<WsSender>,
conn: WSConnectionFuture,
handlers_fut: Option<WSHandlerFuture>,
sender: Option<WSSender>,
}
impl WsConnectActionFut {
impl WSConnectActionFut {
fn new(addr: String, handlers: Handlers) -> Self {
// Stream User
// ┌───────────────┐ ┌──────────────┐
@ -292,9 +292,9 @@ impl WsConnectActionFut {
// └───────────────┘ └──────────────┘
let (msg_tx, msg_rx) = futures_channel::mpsc::unbounded();
let (ws_tx, ws_rx) = futures_channel::mpsc::unbounded();
let sender = WsSender { ws_tx };
let handlers_fut = WsHandlerFuture::new(handlers, msg_rx);
let conn = WsConnectionFuture::new(msg_tx, ws_rx, addr.clone());
let sender = WSSender { ws_tx };
let handlers_fut = WSHandlerFuture::new(handlers, msg_rx);
let conn = WSConnectionFuture::new(msg_tx, ws_rx, addr.clone());
Self {
addr,
conn,
@ -304,15 +304,15 @@ impl WsConnectActionFut {
}
}
impl Future for WsConnectActionFut {
type Output = Result<WsConnectResult, WsError>;
impl Future for WSConnectActionFut {
type Output = Result<WSConnectResult, WSError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
match ready!(this.conn.as_mut().poll(cx)) {
Ok(stream) => {
let handlers_fut = this.handlers_fut.take().expect("Only take once");
let sender = this.sender.take().expect("Only take once");
Poll::Ready(Ok(WsConnectResult {
Poll::Ready(Ok(WSConnectResult {
stream,
handlers_fut,
sender,
@ -324,39 +324,39 @@ impl Future for WsConnectActionFut {
}
#[derive(Clone, Eq, PartialEq)]
pub enum WsConnectState {
pub enum WSConnectState {
Init,
Connecting,
Connected,
Disconnected,
}
impl std::fmt::Display for WsConnectState {
impl std::fmt::Display for WSConnectState {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
WsConnectState::Init => f.write_str("Init"),
WsConnectState::Connected => f.write_str("Connecting"),
WsConnectState::Connecting => f.write_str("Connected"),
WsConnectState::Disconnected => f.write_str("Disconnected"),
WSConnectState::Init => f.write_str("Init"),
WSConnectState::Connected => f.write_str("Connecting"),
WSConnectState::Connecting => f.write_str("Connected"),
WSConnectState::Disconnected => f.write_str("Disconnected"),
}
}
}
impl std::fmt::Debug for WsConnectState {
impl std::fmt::Debug for WSConnectState {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str(&format!("{}", self)) }
}
struct WsSenderController {
state: WsConnectState,
state_notify: Arc<broadcast::Sender<WsConnectState>>,
sender: Option<Arc<WsSender>>,
struct WSSenderController {
state: WSConnectState,
state_notify: Arc<broadcast::Sender<WSConnectState>>,
sender: Option<Arc<WSSender>>,
}
impl WsSenderController {
fn set_sender(&mut self, sender: WsSender) { self.sender = Some(Arc::new(sender)); }
impl WSSenderController {
fn set_sender(&mut self, sender: WSSender) { self.sender = Some(Arc::new(sender)); }
fn set_state(&mut self, state: WsConnectState) {
if state != WsConnectState::Connected {
fn set_state(&mut self, state: WSConnectState) {
if state != WSConnectState::Connected {
self.sender = None;
}
@ -364,24 +364,24 @@ impl WsSenderController {
let _ = self.state_notify.send(self.state.clone());
}
fn set_error(&mut self, error: WsError) {
fn set_error(&mut self, error: WSError) {
log::error!("{:?}", error);
self.set_state(WsConnectState::Disconnected);
self.set_state(WSConnectState::Disconnected);
}
fn sender(&self) -> Option<Arc<WsSender>> { self.sender.clone() }
fn sender(&self) -> Option<Arc<WSSender>> { self.sender.clone() }
fn is_connecting(&self) -> bool { self.state == WsConnectState::Connecting }
fn is_connecting(&self) -> bool { self.state == WSConnectState::Connecting }
#[allow(dead_code)]
fn is_connected(&self) -> bool { self.state == WsConnectState::Connected }
fn is_connected(&self) -> bool { self.state == WSConnectState::Connected }
}
impl std::default::Default for WsSenderController {
impl std::default::Default for WSSenderController {
fn default() -> Self {
let (state_notify, _) = broadcast::channel(16);
WsSenderController {
state: WsConnectState::Init,
WSSenderController {
state: WSConnectState::Init,
state_notify: Arc::new(state_notify),
sender: None,
}