mv websocket to flowy-net crate

This commit is contained in:
appflowy 2021-12-14 15:31:44 +08:00
parent 5b7e6690f8
commit 72a8f7a9e3
75 changed files with 1042 additions and 376 deletions

25
backend/Cargo.lock generated
View File

@ -458,6 +458,7 @@ dependencies = [
"flowy-collaboration", "flowy-collaboration",
"flowy-core-infra", "flowy-core-infra",
"flowy-document", "flowy-document",
"flowy-net",
"flowy-sdk", "flowy-sdk",
"flowy-test", "flowy-test",
"flowy-user", "flowy-user",
@ -1339,14 +1340,35 @@ dependencies = [
] ]
[[package]] [[package]]
name = "flowy-net" name = "flowy-error"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"bytes", "bytes",
"derive_more",
"flowy-derive", "flowy-derive",
"lib-dispatch",
"protobuf", "protobuf",
] ]
[[package]]
name = "flowy-net"
version = "0.1.0"
dependencies = [
"anyhow",
"bytes",
"flowy-derive",
"flowy-error",
"lib-dispatch",
"lib-infra",
"lib-ws",
"parking_lot",
"protobuf",
"strum",
"strum_macros",
"tokio",
"tracing",
]
[[package]] [[package]]
name = "flowy-sdk" name = "flowy-sdk"
version = "0.1.0" version = "0.1.0"
@ -1416,7 +1438,6 @@ dependencies = [
"lib-dispatch", "lib-dispatch",
"lib-infra", "lib-infra",
"lib-sqlite", "lib-sqlite",
"lib-ws",
"log", "log",
"once_cell", "once_cell",
"parking_lot", "parking_lot",

View File

@ -103,4 +103,5 @@ flowy-sdk = { path = "../frontend/rust-lib/flowy-sdk", features = ["http_server"
flowy-user = { path = "../frontend/rust-lib/flowy-user", features = ["http_server"] } flowy-user = { path = "../frontend/rust-lib/flowy-user", features = ["http_server"] }
flowy-document = { path = "../frontend/rust-lib/flowy-document", features = ["flowy_unit_test", "http_server"] } flowy-document = { path = "../frontend/rust-lib/flowy-document", features = ["flowy_unit_test", "http_server"] }
flowy-test = { path = "../frontend/rust-lib/flowy-test" } flowy-test = { path = "../frontend/rust-lib/flowy-test" }
flowy-net = { path = "../frontend/rust-lib/flowy-net" }

View File

@ -16,6 +16,7 @@ use lib_ot::rich_text::{RichTextAttribute, RichTextDelta};
use parking_lot::RwLock; use parking_lot::RwLock;
use lib_ot::core::Interval; use lib_ot::core::Interval;
use flowy_collaboration::core::sync::ServerDocManager; use flowy_collaboration::core::sync::ServerDocManager;
use flowy_net::services::ws::WsManager;
pub struct DocumentTest { pub struct DocumentTest {
server: TestServer, server: TestServer,
@ -53,6 +54,7 @@ struct ScriptContext {
client_edit_context: Option<Arc<ClientEditDocContext>>, client_edit_context: Option<Arc<ClientEditDocContext>>,
client_sdk: FlowySDKTest, client_sdk: FlowySDKTest,
client_user_session: Arc<UserSession>, client_user_session: Arc<UserSession>,
ws_manager: Arc<WsManager>,
server_doc_manager: Arc<ServerDocManager>, server_doc_manager: Arc<ServerDocManager>,
server_pg_pool: Data<PgPool>, server_pg_pool: Data<PgPool>,
doc_id: String, doc_id: String,
@ -61,12 +63,14 @@ struct ScriptContext {
impl ScriptContext { impl ScriptContext {
async fn new(client_sdk: FlowySDKTest, server: TestServer) -> Self { async fn new(client_sdk: FlowySDKTest, server: TestServer) -> Self {
let user_session = client_sdk.user_session.clone(); let user_session = client_sdk.user_session.clone();
let ws_manager = client_sdk.ws_manager.clone();
let doc_id = create_doc(&client_sdk).await; let doc_id = create_doc(&client_sdk).await;
Self { Self {
client_edit_context: None, client_edit_context: None,
client_sdk, client_sdk,
client_user_session: user_session, client_user_session: user_session,
ws_manager,
server_doc_manager: server.app_ctx.document_core.manager.clone(), server_doc_manager: server.app_ctx.document_core.manager.clone(),
server_pg_pool: Data::new(server.pg_pool.clone()), server_pg_pool: Data::new(server.pg_pool.clone()),
doc_id, doc_id,
@ -99,9 +103,10 @@ async fn run_scripts(context: Arc<RwLock<ScriptContext>>, scripts: Vec<DocScript
match script { match script {
DocScript::ClientConnectWs => { DocScript::ClientConnectWs => {
// sleep(Duration::from_millis(300)).await; // sleep(Duration::from_millis(300)).await;
let ws_manager = context.read().ws_manager.clone();
let user_session = context.read().client_user_session.clone(); let user_session = context.read().client_user_session.clone();
let token = user_session.token().unwrap(); let token = user_session.token().unwrap();
let _ = user_session.start_ws_connection(&token).await.unwrap(); let _ = ws_manager.start(token).await.unwrap();
}, },
DocScript::ClientOpenDoc => { DocScript::ClientOpenDoc => {
context.write().open_doc().await; context.write().open_doc().await;

View File

@ -3,7 +3,7 @@ import 'dart:async';
import 'package:connectivity_plus/connectivity_plus.dart'; import 'package:connectivity_plus/connectivity_plus.dart';
import 'package:flowy_log/flowy_log.dart'; import 'package:flowy_log/flowy_log.dart';
import 'package:flowy_sdk/dispatch/dispatch.dart'; import 'package:flowy_sdk/dispatch/dispatch.dart';
import 'package:flowy_sdk/protobuf/lib-infra/protobuf.dart'; import 'package:flowy_sdk/protobuf/flowy-net/network_state.pb.dart';
import 'package:flutter/services.dart'; import 'package:flutter/services.dart';
class NetworkMonitor { class NetworkMonitor {
@ -45,7 +45,7 @@ class NetworkMonitor {
}(); }();
Log.info("Network type: $networkType"); Log.info("Network type: $networkType");
final state = NetworkState.create()..ty = networkType; final state = NetworkState.create()..ty = networkType;
UserEventUpdateNetworkType(state).send().then((result) { NetworkEventUpdateNetworkType(state).send().then((result) {
result.fold( result.fold(
(l) {}, (l) {},
(e) => Log.error(e), (e) => Log.error(e),

View File

@ -412,6 +412,23 @@ class WorkspaceEventExportDocument {
} }
} }
class NetworkEventUpdateNetworkType {
NetworkState request;
NetworkEventUpdateNetworkType(this.request);
Future<Either<Unit, FlowyError>> send() {
final request = FFIRequest.create()
..event = NetworkEvent.UpdateNetworkType.toString()
..payload = requestToBytes(this.request);
return Dispatch.asyncRequest(request)
.then((bytesResult) => bytesResult.fold(
(bytes) => left(unit),
(errBytes) => right(FlowyError.fromBuffer(errBytes)),
));
}
}
class UserEventInitUser { class UserEventInitUser {
UserEventInitUser(); UserEventInitUser();
@ -519,20 +536,3 @@ class UserEventCheckUser {
} }
} }
class UserEventUpdateNetworkType {
NetworkState request;
UserEventUpdateNetworkType(this.request);
Future<Either<Unit, UserError>> send() {
final request = FFIRequest.create()
..event = UserEvent.UpdateNetworkType.toString()
..payload = requestToBytes(this.request);
return Dispatch.asyncRequest(request)
.then((bytesResult) => bytesResult.fold(
(bytes) => left(unit),
(errBytes) => right(UserError.fromBuffer(errBytes)),
));
}
}

View File

@ -3,6 +3,8 @@ import 'package:dartz/dartz.dart';
import 'package:flowy_log/flowy_log.dart'; import 'package:flowy_log/flowy_log.dart';
// ignore: unnecessary_import // ignore: unnecessary_import
import 'package:flowy_sdk/protobuf/dart-ffi/ffi_response.pb.dart'; import 'package:flowy_sdk/protobuf/dart-ffi/ffi_response.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-net/event.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-net/network_state.pb.dart'; import 'package:flowy_sdk/protobuf/flowy-net/network_state.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-user/errors.pb.dart'; import 'package:flowy_sdk/protobuf/flowy-user/errors.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-user/event.pb.dart'; import 'package:flowy_sdk/protobuf/flowy-user/event.pb.dart';
@ -53,7 +55,7 @@ class Dispatch {
} }
} }
Future<Either<Uint8List, Uint8List>> _extractPayload(Future<Either<FFIResponse, FlowyError>> responseFuture) { Future<Either<Uint8List, Uint8List>> _extractPayload(Future<Either<FFIResponse, FlowyInternalError>> responseFuture) {
return responseFuture.then((result) { return responseFuture.then((result) {
return result.fold( return result.fold(
(response) { (response) {
@ -79,7 +81,7 @@ Future<Either<Uint8List, Uint8List>> _extractPayload(Future<Either<FFIResponse,
}); });
} }
Future<Either<FFIResponse, FlowyError>> _extractResponse(Completer<Uint8List> bytesFuture) { Future<Either<FFIResponse, FlowyInternalError>> _extractResponse(Completer<Uint8List> bytesFuture) {
return bytesFuture.future.then((bytes) { return bytesFuture.future.then((bytes) {
try { try {
final response = FFIResponse.fromBuffer(bytes); final response = FFIResponse.fromBuffer(bytes);

View File

@ -1,6 +1,6 @@
import 'package:flowy_sdk/protobuf/dart-ffi/protobuf.dart'; import 'package:flowy_sdk/protobuf/dart-ffi/protobuf.dart';
class FlowyError { class FlowyInternalError {
late FFIStatusCode _statusCode; late FFIStatusCode _statusCode;
late String _error; late String _error;
@ -20,13 +20,13 @@ class FlowyError {
return "$_statusCode: $_error"; return "$_statusCode: $_error";
} }
FlowyError({required FFIStatusCode statusCode, required String error}) { FlowyInternalError({required FFIStatusCode statusCode, required String error}) {
_statusCode = statusCode; _statusCode = statusCode;
_error = error; _error = error;
} }
factory FlowyError.from(FFIResponse resp) { factory FlowyInternalError.from(FFIResponse resp) {
return FlowyError(statusCode: resp.code, error: ""); return FlowyInternalError(statusCode: resp.code, error: "");
} }
} }
@ -38,8 +38,8 @@ class StackTraceError {
this.trace, this.trace,
); );
FlowyError asFlowyError() { FlowyInternalError asFlowyError() {
return FlowyError(statusCode: FFIStatusCode.Err, error: this.toString()); return FlowyInternalError(statusCode: FFIStatusCode.Err, error: this.toString());
} }
String toString() { String toString() {

View File

@ -0,0 +1,74 @@
///
// Generated code. Do not modify.
// source: errors.proto
//
// @dart = 2.12
// ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields
import 'dart:core' as $core;
import 'package:protobuf/protobuf.dart' as $pb;
export 'errors.pbenum.dart';
class FlowyError extends $pb.GeneratedMessage {
static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'FlowyError', createEmptyInstance: create)
..a<$core.int>(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'code', $pb.PbFieldType.O3)
..aOS(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'msg')
..hasRequiredFields = false
;
FlowyError._() : super();
factory FlowyError({
$core.int? code,
$core.String? msg,
}) {
final _result = create();
if (code != null) {
_result.code = code;
}
if (msg != null) {
_result.msg = msg;
}
return _result;
}
factory FlowyError.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r);
factory FlowyError.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r);
@$core.Deprecated(
'Using this can add significant overhead to your binary. '
'Use [GeneratedMessageGenericExtensions.deepCopy] instead. '
'Will be removed in next major version')
FlowyError clone() => FlowyError()..mergeFromMessage(this);
@$core.Deprecated(
'Using this can add significant overhead to your binary. '
'Use [GeneratedMessageGenericExtensions.rebuild] instead. '
'Will be removed in next major version')
FlowyError copyWith(void Function(FlowyError) updates) => super.copyWith((message) => updates(message as FlowyError)) as FlowyError; // ignore: deprecated_member_use
$pb.BuilderInfo get info_ => _i;
@$core.pragma('dart2js:noInline')
static FlowyError create() => FlowyError._();
FlowyError createEmptyInstance() => create();
static $pb.PbList<FlowyError> createRepeated() => $pb.PbList<FlowyError>();
@$core.pragma('dart2js:noInline')
static FlowyError getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor<FlowyError>(create);
static FlowyError? _defaultInstance;
@$pb.TagNumber(1)
$core.int get code => $_getIZ(0);
@$pb.TagNumber(1)
set code($core.int v) { $_setSignedInt32(0, v); }
@$pb.TagNumber(1)
$core.bool hasCode() => $_has(0);
@$pb.TagNumber(1)
void clearCode() => clearField(1);
@$pb.TagNumber(2)
$core.String get msg => $_getSZ(1);
@$pb.TagNumber(2)
set msg($core.String v) { $_setString(1, v); }
@$pb.TagNumber(2)
$core.bool hasMsg() => $_has(1);
@$pb.TagNumber(2)
void clearMsg() => clearField(2);
}

View File

@ -0,0 +1,24 @@
///
// Generated code. Do not modify.
// source: errors.proto
//
// @dart = 2.12
// ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields
// ignore_for_file: UNDEFINED_SHOWN_NAME
import 'dart:core' as $core;
import 'package:protobuf/protobuf.dart' as $pb;
class ErrorCode extends $pb.ProtobufEnum {
static const ErrorCode Internal = ErrorCode._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Internal');
static const $core.List<ErrorCode> values = <ErrorCode> [
Internal,
];
static final $core.Map<$core.int, ErrorCode> _byValue = $pb.ProtobufEnum.initByValue(values);
static ErrorCode? valueOf($core.int value) => _byValue[value];
const ErrorCode._($core.int v, $core.String n) : super(v, n);
}

View File

@ -0,0 +1,31 @@
///
// Generated code. Do not modify.
// source: errors.proto
//
// @dart = 2.12
// ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields,deprecated_member_use_from_same_package
import 'dart:core' as $core;
import 'dart:convert' as $convert;
import 'dart:typed_data' as $typed_data;
@$core.Deprecated('Use errorCodeDescriptor instead')
const ErrorCode$json = const {
'1': 'ErrorCode',
'2': const [
const {'1': 'Internal', '2': 0},
],
};
/// Descriptor for `ErrorCode`. Decode as a `google.protobuf.EnumDescriptorProto`.
final $typed_data.Uint8List errorCodeDescriptor = $convert.base64Decode('CglFcnJvckNvZGUSDAoISW50ZXJuYWwQAA==');
@$core.Deprecated('Use flowyErrorDescriptor instead')
const FlowyError$json = const {
'1': 'FlowyError',
'2': const [
const {'1': 'code', '3': 1, '4': 1, '5': 5, '10': 'code'},
const {'1': 'msg', '3': 2, '4': 1, '5': 9, '10': 'msg'},
],
};
/// Descriptor for `FlowyError`. Decode as a `google.protobuf.DescriptorProto`.
final $typed_data.Uint8List flowyErrorDescriptor = $convert.base64Decode('CgpGbG93eUVycm9yEhIKBGNvZGUYASABKAVSBGNvZGUSEAoDbXNnGAIgASgJUgNtc2c=');

View File

@ -0,0 +1,9 @@
///
// Generated code. Do not modify.
// source: errors.proto
//
// @dart = 2.12
// ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields,deprecated_member_use_from_same_package
export 'errors.pb.dart';

View File

@ -0,0 +1,2 @@
// Auto-generated, do not edit
export './errors.pb.dart';

View File

@ -1,11 +1,11 @@
/// ///
// Generated code. Do not modify. // Generated code. Do not modify.
// source: cache.proto // source: event.proto
// //
// @dart = 2.12 // @dart = 2.12
// ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields // ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields
import 'dart:core' as $core; import 'dart:core' as $core;
export 'cache.pbenum.dart'; export 'event.pbenum.dart';

View File

@ -0,0 +1,24 @@
///
// Generated code. Do not modify.
// source: event.proto
//
// @dart = 2.12
// ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields
// ignore_for_file: UNDEFINED_SHOWN_NAME
import 'dart:core' as $core;
import 'package:protobuf/protobuf.dart' as $pb;
class NetworkEvent extends $pb.ProtobufEnum {
static const NetworkEvent UpdateNetworkType = NetworkEvent._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'UpdateNetworkType');
static const $core.List<NetworkEvent> values = <NetworkEvent> [
UpdateNetworkType,
];
static final $core.Map<$core.int, NetworkEvent> _byValue = $pb.ProtobufEnum.initByValue(values);
static NetworkEvent? valueOf($core.int value) => _byValue[value];
const NetworkEvent._($core.int v, $core.String n) : super(v, n);
}

View File

@ -1,6 +1,6 @@
/// ///
// Generated code. Do not modify. // Generated code. Do not modify.
// source: cache.proto // source: event.proto
// //
// @dart = 2.12 // @dart = 2.12
// ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields,deprecated_member_use_from_same_package // ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields,deprecated_member_use_from_same_package
@ -8,14 +8,13 @@
import 'dart:core' as $core; import 'dart:core' as $core;
import 'dart:convert' as $convert; import 'dart:convert' as $convert;
import 'dart:typed_data' as $typed_data; import 'dart:typed_data' as $typed_data;
@$core.Deprecated('Use revStateDescriptor instead') @$core.Deprecated('Use networkEventDescriptor instead')
const RevState$json = const { const NetworkEvent$json = const {
'1': 'RevState', '1': 'NetworkEvent',
'2': const [ '2': const [
const {'1': 'Local', '2': 0}, const {'1': 'UpdateNetworkType', '2': 0},
const {'1': 'Acked', '2': 1},
], ],
}; };
/// Descriptor for `RevState`. Decode as a `google.protobuf.EnumDescriptorProto`. /// Descriptor for `NetworkEvent`. Decode as a `google.protobuf.EnumDescriptorProto`.
final $typed_data.Uint8List revStateDescriptor = $convert.base64Decode('CghSZXZTdGF0ZRIJCgVMb2NhbBAAEgkKBUFja2VkEAE='); final $typed_data.Uint8List networkEventDescriptor = $convert.base64Decode('CgxOZXR3b3JrRXZlbnQSFQoRVXBkYXRlTmV0d29ya1R5cGUQAA==');

View File

@ -1,9 +1,9 @@
/// ///
// Generated code. Do not modify. // Generated code. Do not modify.
// source: cache.proto // source: event.proto
// //
// @dart = 2.12 // @dart = 2.12
// ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields,deprecated_member_use_from_same_package // ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields,deprecated_member_use_from_same_package
export 'cache.pb.dart'; export 'event.pb.dart';

View File

@ -1,2 +1,3 @@
// Auto-generated, do not edit // Auto-generated, do not edit
export './network_state.pb.dart'; export './network_state.pb.dart';
export './event.pb.dart';

View File

@ -17,7 +17,6 @@ class UserEvent extends $pb.ProtobufEnum {
static const UserEvent UpdateUser = UserEvent._(4, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'UpdateUser'); static const UserEvent UpdateUser = UserEvent._(4, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'UpdateUser');
static const UserEvent GetUserProfile = UserEvent._(5, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'GetUserProfile'); static const UserEvent GetUserProfile = UserEvent._(5, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'GetUserProfile');
static const UserEvent CheckUser = UserEvent._(6, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'CheckUser'); static const UserEvent CheckUser = UserEvent._(6, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'CheckUser');
static const UserEvent UpdateNetworkType = UserEvent._(10, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'UpdateNetworkType');
static const $core.List<UserEvent> values = <UserEvent> [ static const $core.List<UserEvent> values = <UserEvent> [
InitUser, InitUser,
@ -27,7 +26,6 @@ class UserEvent extends $pb.ProtobufEnum {
UpdateUser, UpdateUser,
GetUserProfile, GetUserProfile,
CheckUser, CheckUser,
UpdateNetworkType,
]; ];
static final $core.Map<$core.int, UserEvent> _byValue = $pb.ProtobufEnum.initByValue(values); static final $core.Map<$core.int, UserEvent> _byValue = $pb.ProtobufEnum.initByValue(values);

View File

@ -19,9 +19,8 @@ const UserEvent$json = const {
const {'1': 'UpdateUser', '2': 4}, const {'1': 'UpdateUser', '2': 4},
const {'1': 'GetUserProfile', '2': 5}, const {'1': 'GetUserProfile', '2': 5},
const {'1': 'CheckUser', '2': 6}, const {'1': 'CheckUser', '2': 6},
const {'1': 'UpdateNetworkType', '2': 10},
], ],
}; };
/// Descriptor for `UserEvent`. Decode as a `google.protobuf.EnumDescriptorProto`. /// Descriptor for `UserEvent`. Decode as a `google.protobuf.EnumDescriptorProto`.
final $typed_data.Uint8List userEventDescriptor = $convert.base64Decode('CglVc2VyRXZlbnQSDAoISW5pdFVzZXIQABIKCgZTaWduSW4QARIKCgZTaWduVXAQAhILCgdTaWduT3V0EAMSDgoKVXBkYXRlVXNlchAEEhIKDkdldFVzZXJQcm9maWxlEAUSDQoJQ2hlY2tVc2VyEAYSFQoRVXBkYXRlTmV0d29ya1R5cGUQCg=='); final $typed_data.Uint8List userEventDescriptor = $convert.base64Decode('CglVc2VyRXZlbnQSDAoISW5pdFVzZXIQABIKCgZTaWduSW4QARIKCgZTaWduVXAQAhILCgdTaWduT3V0EAMSDgoKVXBkYXRlVXNlchAEEhIKDkdldFVzZXJQcm9maWxlEAUSDQoJQ2hlY2tVc2VyEAY=');

View File

@ -1,26 +0,0 @@
///
// Generated code. Do not modify.
// source: cache.proto
//
// @dart = 2.12
// ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields
// ignore_for_file: UNDEFINED_SHOWN_NAME
import 'dart:core' as $core;
import 'package:protobuf/protobuf.dart' as $pb;
class RevState extends $pb.ProtobufEnum {
static const RevState Local = RevState._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Local');
static const RevState Acked = RevState._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Acked');
static const $core.List<RevState> values = <RevState> [
Local,
Acked,
];
static final $core.Map<$core.int, RevState> _byValue = $pb.ProtobufEnum.initByValue(values);
static RevState? valueOf($core.int value) => _byValue[value];
const RevState._($core.int v, $core.String n) : super(v, n);
}

View File

@ -24,3 +24,18 @@ class RevType extends $pb.ProtobufEnum {
const RevType._($core.int v, $core.String n) : super(v, n); const RevType._($core.int v, $core.String n) : super(v, n);
} }
class RevState extends $pb.ProtobufEnum {
static const RevState StateLocal = RevState._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'StateLocal');
static const RevState Acked = RevState._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Acked');
static const $core.List<RevState> values = <RevState> [
StateLocal,
Acked,
];
static final $core.Map<$core.int, RevState> _byValue = $pb.ProtobufEnum.initByValue(values);
static RevState? valueOf($core.int value) => _byValue[value];
const RevState._($core.int v, $core.String n) : super(v, n);
}

View File

@ -19,6 +19,17 @@ const RevType$json = const {
/// Descriptor for `RevType`. Decode as a `google.protobuf.EnumDescriptorProto`. /// Descriptor for `RevType`. Decode as a `google.protobuf.EnumDescriptorProto`.
final $typed_data.Uint8List revTypeDescriptor = $convert.base64Decode('CgdSZXZUeXBlEgkKBUxvY2FsEAASCgoGUmVtb3RlEAE='); final $typed_data.Uint8List revTypeDescriptor = $convert.base64Decode('CgdSZXZUeXBlEgkKBUxvY2FsEAASCgoGUmVtb3RlEAE=');
@$core.Deprecated('Use revStateDescriptor instead')
const RevState$json = const {
'1': 'RevState',
'2': const [
const {'1': 'StateLocal', '2': 0},
const {'1': 'Acked', '2': 1},
],
};
/// Descriptor for `RevState`. Decode as a `google.protobuf.EnumDescriptorProto`.
final $typed_data.Uint8List revStateDescriptor = $convert.base64Decode('CghSZXZTdGF0ZRIOCgpTdGF0ZUxvY2FsEAASCQoFQWNrZWQQAQ==');
@$core.Deprecated('Use revisionDescriptor instead') @$core.Deprecated('Use revisionDescriptor instead')
const Revision$json = const { const Revision$json = const {
'1': 'Revision', '1': 'Revision',

View File

@ -1,3 +1,2 @@
// Auto-generated, do not edit // Auto-generated, do not edit
export './cache.pb.dart';
export './model.pb.dart'; export './model.pb.dart';

View File

@ -11,6 +11,7 @@ members = [
"flowy-core", "flowy-core",
"dart-notify", "dart-notify",
"flowy-document", "flowy-document",
"flowy-error",
] ]
exclude = ["../backend"] exclude = ["../backend"]

View File

@ -52,7 +52,7 @@ color-eyre = { version = "0.5", default-features = false }
criterion = "0.3" criterion = "0.3"
rand = "0.7.3" rand = "0.7.3"
env_logger = "0.8.2" env_logger = "0.8.2"
flowy-user = { path = "../flowy-user", features = ["ws_mock"] } flowy-net = { path = "../flowy-net", features = ["ws_mock"] }
[features] [features]
http_server = [] http_server = []

View File

@ -63,7 +63,7 @@ impl RevisionCache {
} }
let record = RevisionRecord { let record = RevisionRecord {
revision, revision,
state: RevState::Local, state: RevState::StateLocal,
}; };
self.memory_cache.add_revision(record).await?; self.memory_cache.add_revision(record).await?;
self.save_revisions().await; self.save_revisions().await;
@ -77,7 +77,7 @@ impl RevisionCache {
} }
let record = RevisionRecord { let record = RevisionRecord {
revision, revision,
state: RevState::Local, state: RevState::StateLocal,
}; };
self.memory_cache.add_revision(record).await?; self.memory_cache.add_revision(record).await?;
self.save_revisions().await; self.save_revisions().await;

View File

@ -35,7 +35,7 @@ impl RevisionMemoryCache {
} }
match record.state { match record.state {
RevState::Local => { RevState::StateLocal => {
tracing::debug!("{}:add revision {}", record.revision.doc_id, record.revision.rev_id); tracing::debug!("{}:add revision {}", record.revision.doc_id, record.revision.rev_id);
self.local_revs.write().await.push_back(record.revision.rev_id); self.local_revs.write().await.push_back(record.revision.rev_id);
}, },

View File

@ -13,7 +13,7 @@ pub(crate) trait WsDocumentHandler: Send + Sync {
pub type WsStateReceiver = tokio::sync::broadcast::Receiver<WsConnectState>; pub type WsStateReceiver = tokio::sync::broadcast::Receiver<WsConnectState>;
pub trait DocumentWebSocket: Send + Sync { pub trait DocumentWebSocket: Send + Sync {
fn send(&self, data: WsDocumentData) -> Result<(), DocError>; fn send(&self, data: WsDocumentData) -> Result<(), DocError>;
fn state_notify(&self) -> WsStateReceiver; fn subscribe_state_changed(&self) -> WsStateReceiver;
} }
pub struct WsDocumentManager { pub struct WsDocumentManager {
@ -56,7 +56,7 @@ impl WsDocumentManager {
#[tracing::instrument(level = "debug", skip(ws, handlers))] #[tracing::instrument(level = "debug", skip(ws, handlers))]
fn listen_ws_state_changed(ws: Arc<dyn DocumentWebSocket>, handlers: Arc<DashMap<String, Arc<dyn WsDocumentHandler>>>) { fn listen_ws_state_changed(ws: Arc<dyn DocumentWebSocket>, handlers: Arc<DashMap<String, Arc<dyn WsDocumentHandler>>>) {
let mut notify = ws.state_notify(); let mut notify = ws.subscribe_state_changed();
tokio::spawn(async move { tokio::spawn(async move {
while let Ok(state) = notify.recv().await { while let Ok(state) = notify.recv().await {
handlers.iter().for_each(|handle| { handlers.iter().for_each(|handle| {

View File

@ -49,7 +49,7 @@ impl_sql_integer_expression!(RevTableState);
impl std::convert::From<RevTableState> for RevState { impl std::convert::From<RevTableState> for RevState {
fn from(s: RevTableState) -> Self { fn from(s: RevTableState) -> Self {
match s { match s {
RevTableState::Local => RevState::Local, RevTableState::Local => RevState::StateLocal,
RevTableState::Acked => RevState::Acked, RevTableState::Acked => RevState::Acked,
} }
} }
@ -58,7 +58,7 @@ impl std::convert::From<RevTableState> for RevState {
impl std::convert::From<RevState> for RevTableState { impl std::convert::From<RevState> for RevTableState {
fn from(s: RevState) -> Self { fn from(s: RevState) -> Self {
match s { match s {
RevState::Local => RevTableState::Local, RevState::StateLocal => RevTableState::Local,
RevState::Acked => RevTableState::Acked, RevState::Acked => RevTableState::Acked,
} }
} }

View File

@ -6,7 +6,7 @@ async fn doc_rev_state_test1() {
let scripts = vec![ let scripts = vec![
InsertText("123", 0), InsertText("123", 0),
AssertCurrentRevId(1), AssertCurrentRevId(1),
AssertRevisionState(1, RevState::Local), AssertRevisionState(1, RevState::StateLocal),
SimulateAckedMessage(1), SimulateAckedMessage(1),
AssertRevisionState(1, RevState::Acked), AssertRevisionState(1, RevState::Acked),
AssertNextSendingRevision(None), AssertNextSendingRevision(None),
@ -22,9 +22,9 @@ async fn doc_rev_state_test2() {
InsertText("2", 1), InsertText("2", 1),
InsertText("3", 2), InsertText("3", 2),
AssertCurrentRevId(3), AssertCurrentRevId(3),
AssertRevisionState(1, RevState::Local), AssertRevisionState(1, RevState::StateLocal),
AssertRevisionState(2, RevState::Local), AssertRevisionState(2, RevState::StateLocal),
AssertRevisionState(3, RevState::Local), AssertRevisionState(3, RevState::StateLocal),
SimulateAckedMessage(1), SimulateAckedMessage(1),
AssertRevisionState(1, RevState::Acked), AssertRevisionState(1, RevState::Acked),
AssertNextSendingRevision(Some(2)), AssertNextSendingRevision(Some(2)),
@ -32,7 +32,7 @@ async fn doc_rev_state_test2() {
AssertRevisionState(2, RevState::Acked), AssertRevisionState(2, RevState::Acked),
// //
AssertNextSendingRevision(Some(3)), AssertNextSendingRevision(Some(3)),
AssertRevisionState(3, RevState::Local), AssertRevisionState(3, RevState::StateLocal),
AssertJson(r#"[{"insert":"123\n"}]"#), AssertJson(r#"[{"insert":"123\n"}]"#),
]; ];
EditorTest::new().await.run_scripts(scripts).await; EditorTest::new().await.run_scripts(scripts).await;

View File

@ -0,0 +1,13 @@
[package]
name = "flowy-error"
version = "0.1.0"
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
flowy-derive = { path = "../../../shared-lib/flowy-derive" }
derive_more = {version = "0.99", features = ["display"]}
lib-dispatch = { path = "../lib-dispatch" }
protobuf = {version = "2.20.0"}
bytes = "1.0"

View File

@ -0,0 +1,3 @@
proto_crates = ["src/errors.rs",]
event_files = []

View File

@ -0,0 +1,84 @@
use crate::protobuf::ErrorCode as ProtoBufErrorCode;
use bytes::Bytes;
use derive_more::Display;
use flowy_derive::{ProtoBuf, ProtoBuf_Enum};
use lib_dispatch::prelude::{EventResponse, ResponseBuilder};
use protobuf::ProtobufEnum;
use std::{
convert::{TryFrom, TryInto},
fmt::Debug,
};
#[derive(Debug, Default, Clone, ProtoBuf)]
pub struct FlowyError {
#[pb(index = 1)]
pub code: i32,
#[pb(index = 2)]
pub msg: String,
}
macro_rules! static_any_error {
($name:ident, $code:expr) => {
#[allow(non_snake_case, missing_docs)]
pub fn $name() -> FlowyError { $code.into() }
};
}
impl FlowyError {
pub fn new(code: ErrorCode, msg: &str) -> Self {
Self {
code: code.value(),
msg: msg.to_owned(),
}
}
pub fn context<T: Debug>(mut self, error: T) -> Self {
self.msg = format!("{:?}", error);
self
}
static_any_error!(internal, ErrorCode::Internal);
}
impl std::convert::From<ErrorCode> for FlowyError {
fn from(code: ErrorCode) -> Self {
FlowyError {
code: code.value(),
msg: format!("{}", code),
}
}
}
#[derive(Debug, Clone, ProtoBuf_Enum, Display, PartialEq, Eq)]
pub enum ErrorCode {
#[display(fmt = "Internal error")]
Internal = 0,
}
impl ErrorCode {
pub fn value(&self) -> i32 {
let code: ProtoBufErrorCode = self.clone().try_into().unwrap();
code.value()
}
pub fn from_i32(value: i32) -> Self {
match ProtoBufErrorCode::from_i32(value) {
None => ErrorCode::Internal,
Some(code) => ErrorCode::try_from(&code).unwrap(),
}
}
}
pub fn internal_error<T>(e: T) -> FlowyError
where
T: std::fmt::Debug,
{
FlowyError::internal().context(e)
}
impl lib_dispatch::Error for FlowyError {
fn as_response(&self) -> EventResponse {
let bytes: Bytes = self.clone().try_into().unwrap();
ResponseBuilder::Err().data(bytes).build()
}
}

View File

@ -0,0 +1,4 @@
mod errors;
pub mod protobuf;
pub use errors::*;

View File

@ -0,0 +1,4 @@
#![cfg_attr(rustfmt, rustfmt::skip)]
// Auto-generated, do not edit
mod model;
pub use model::*;

View File

@ -0,0 +1,293 @@
// This file is generated by rust-protobuf 2.22.1. Do not edit
// @generated
// https://github.com/rust-lang/rust-clippy/issues/702
#![allow(unknown_lints)]
#![allow(clippy::all)]
#![allow(unused_attributes)]
#![cfg_attr(rustfmt, rustfmt::skip)]
#![allow(box_pointers)]
#![allow(dead_code)]
#![allow(missing_docs)]
#![allow(non_camel_case_types)]
#![allow(non_snake_case)]
#![allow(non_upper_case_globals)]
#![allow(trivial_casts)]
#![allow(unused_imports)]
#![allow(unused_results)]
//! Generated file from `errors.proto`
/// Generated files are compatible only with the same version
/// of protobuf runtime.
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_22_1;
#[derive(PartialEq,Clone,Default)]
pub struct FlowyError {
// message fields
pub code: i32,
pub msg: ::std::string::String,
// special fields
pub unknown_fields: ::protobuf::UnknownFields,
pub cached_size: ::protobuf::CachedSize,
}
impl<'a> ::std::default::Default for &'a FlowyError {
fn default() -> &'a FlowyError {
<FlowyError as ::protobuf::Message>::default_instance()
}
}
impl FlowyError {
pub fn new() -> FlowyError {
::std::default::Default::default()
}
// int32 code = 1;
pub fn get_code(&self) -> i32 {
self.code
}
pub fn clear_code(&mut self) {
self.code = 0;
}
// Param is passed by value, moved
pub fn set_code(&mut self, v: i32) {
self.code = v;
}
// string msg = 2;
pub fn get_msg(&self) -> &str {
&self.msg
}
pub fn clear_msg(&mut self) {
self.msg.clear();
}
// Param is passed by value, moved
pub fn set_msg(&mut self, v: ::std::string::String) {
self.msg = v;
}
// Mutable pointer to the field.
// If field is not initialized, it is initialized with default value first.
pub fn mut_msg(&mut self) -> &mut ::std::string::String {
&mut self.msg
}
// Take field
pub fn take_msg(&mut self) -> ::std::string::String {
::std::mem::replace(&mut self.msg, ::std::string::String::new())
}
}
impl ::protobuf::Message for FlowyError {
fn is_initialized(&self) -> bool {
true
}
fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> {
while !is.eof()? {
let (field_number, wire_type) = is.read_tag_unpack()?;
match field_number {
1 => {
if wire_type != ::protobuf::wire_format::WireTypeVarint {
return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type));
}
let tmp = is.read_int32()?;
self.code = tmp;
},
2 => {
::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.msg)?;
},
_ => {
::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
},
};
}
::std::result::Result::Ok(())
}
// Compute sizes of nested messages
#[allow(unused_variables)]
fn compute_size(&self) -> u32 {
let mut my_size = 0;
if self.code != 0 {
my_size += ::protobuf::rt::value_size(1, self.code, ::protobuf::wire_format::WireTypeVarint);
}
if !self.msg.is_empty() {
my_size += ::protobuf::rt::string_size(2, &self.msg);
}
my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
self.cached_size.set(my_size);
my_size
}
fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
if self.code != 0 {
os.write_int32(1, self.code)?;
}
if !self.msg.is_empty() {
os.write_string(2, &self.msg)?;
}
os.write_unknown_fields(self.get_unknown_fields())?;
::std::result::Result::Ok(())
}
fn get_cached_size(&self) -> u32 {
self.cached_size.get()
}
fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
&self.unknown_fields
}
fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
&mut self.unknown_fields
}
fn as_any(&self) -> &dyn (::std::any::Any) {
self as &dyn (::std::any::Any)
}
fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
self as &mut dyn (::std::any::Any)
}
fn into_any(self: ::std::boxed::Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
self
}
fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
Self::descriptor_static()
}
fn new() -> FlowyError {
FlowyError::new()
}
fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT;
descriptor.get(|| {
let mut fields = ::std::vec::Vec::new();
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeInt32>(
"code",
|m: &FlowyError| { &m.code },
|m: &mut FlowyError| { &mut m.code },
));
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
"msg",
|m: &FlowyError| { &m.msg },
|m: &mut FlowyError| { &mut m.msg },
));
::protobuf::reflect::MessageDescriptor::new_pb_name::<FlowyError>(
"FlowyError",
fields,
file_descriptor_proto()
)
})
}
fn default_instance() -> &'static FlowyError {
static instance: ::protobuf::rt::LazyV2<FlowyError> = ::protobuf::rt::LazyV2::INIT;
instance.get(FlowyError::new)
}
}
impl ::protobuf::Clear for FlowyError {
fn clear(&mut self) {
self.code = 0;
self.msg.clear();
self.unknown_fields.clear();
}
}
impl ::std::fmt::Debug for FlowyError {
fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
::protobuf::text_format::fmt(self, f)
}
}
impl ::protobuf::reflect::ProtobufValue for FlowyError {
fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
::protobuf::reflect::ReflectValueRef::Message(self)
}
}
#[derive(Clone,PartialEq,Eq,Debug,Hash)]
pub enum ErrorCode {
Internal = 0,
}
impl ::protobuf::ProtobufEnum for ErrorCode {
fn value(&self) -> i32 {
*self as i32
}
fn from_i32(value: i32) -> ::std::option::Option<ErrorCode> {
match value {
0 => ::std::option::Option::Some(ErrorCode::Internal),
_ => ::std::option::Option::None
}
}
fn values() -> &'static [Self] {
static values: &'static [ErrorCode] = &[
ErrorCode::Internal,
];
values
}
fn enum_descriptor_static() -> &'static ::protobuf::reflect::EnumDescriptor {
static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::EnumDescriptor> = ::protobuf::rt::LazyV2::INIT;
descriptor.get(|| {
::protobuf::reflect::EnumDescriptor::new_pb_name::<ErrorCode>("ErrorCode", file_descriptor_proto())
})
}
}
impl ::std::marker::Copy for ErrorCode {
}
impl ::std::default::Default for ErrorCode {
fn default() -> Self {
ErrorCode::Internal
}
}
impl ::protobuf::reflect::ProtobufValue for ErrorCode {
fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
::protobuf::reflect::ReflectValueRef::Enum(::protobuf::ProtobufEnum::descriptor(self))
}
}
static file_descriptor_proto_data: &'static [u8] = b"\
\n\x0cerrors.proto\"2\n\nFlowyError\x12\x12\n\x04code\x18\x01\x20\x01(\
\x05R\x04code\x12\x10\n\x03msg\x18\x02\x20\x01(\tR\x03msg*\x19\n\tErrorC\
ode\x12\x0c\n\x08Internal\x10\0J\xd9\x01\n\x06\x12\x04\0\0\x08\x01\n\x08\
\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x05\x01\n\n\n\
\x03\x04\0\x01\x12\x03\x02\x08\x12\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\
\x04\x13\n\x0c\n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\t\n\x0c\n\x05\x04\0\
\x02\0\x01\x12\x03\x03\n\x0e\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x11\
\x12\n\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04\x13\n\x0c\n\x05\x04\0\x02\
\x01\x05\x12\x03\x04\x04\n\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\x0b\
\x0e\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04\x11\x12\n\n\n\x02\x05\0\
\x12\x04\x06\0\x08\x01\n\n\n\x03\x05\0\x01\x12\x03\x06\x05\x0e\n\x0b\n\
\x04\x05\0\x02\0\x12\x03\x07\x04\x11\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\
\x07\x04\x0c\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x07\x0f\x10b\x06proto3\
";
static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;
fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto {
::protobuf::Message::parse_from_bytes(file_descriptor_proto_data).unwrap()
}
pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto {
file_descriptor_proto_lazy.get(|| {
parse_descriptor_proto()
})
}

View File

@ -0,0 +1,5 @@
#![cfg_attr(rustfmt, rustfmt::skip)]
// Auto-generated, do not edit
mod errors;
pub use errors::*;

View File

@ -0,0 +1,9 @@
syntax = "proto3";
message FlowyError {
int32 code = 1;
string msg = 2;
}
enum ErrorCode {
Internal = 0;
}

View File

@ -6,6 +6,23 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
lib-dispatch = { path = "../lib-dispatch" }
flowy-error = { path = "../flowy-error" }
flowy-derive = { path = "../../../shared-lib/flowy-derive" } flowy-derive = { path = "../../../shared-lib/flowy-derive" }
lib-infra = { path = "../../../shared-lib/lib-infra" }
lib-ws = { path = "../../../shared-lib/lib-ws" }
protobuf = {version = "2.18.0"} protobuf = {version = "2.18.0"}
bytes = { version = "1.0" } bytes = { version = "1.0" }
anyhow = "1.0"
tokio = {version = "1", features = ["sync"]}
parking_lot = "0.11"
strum = "0.21"
strum_macros = "0.21"
tracing = { version = "0.1", features = ["log"] }
flowy-collaboration = { path = "../../../shared-lib/flowy-collaboration", optional = true}
lazy_static = {version = "1.4.0", optional = true}
dashmap = {version = "4.0", optional = true}
[features]
ws_mock = ["flowy-collaboration", "lazy_static", "dashmap"]

View File

@ -1,2 +1,2 @@
proto_crates = ["src/entities"] proto_crates = ["src/event.rs", "src/entities"]
event_files = [] event_files = ["src/event.rs"]

View File

@ -0,0 +1,9 @@
use flowy_derive::{Flowy_Event, ProtoBuf_Enum};
use strum_macros::Display;
#[derive(Clone, Copy, PartialEq, Eq, Debug, Display, Hash, ProtoBuf_Enum, Flowy_Event)]
#[event_err = "FlowyError"]
pub enum NetworkEvent {
#[event(input = "NetworkState")]
UpdateNetworkType = 0,
}

View File

@ -0,0 +1,12 @@
use crate::{entities::NetworkState, services::ws::WsManager};
use bytes::Bytes;
use flowy_error::FlowyError;
use lib_dispatch::prelude::{Data, Unit};
use std::sync::Arc;
#[tracing::instrument(skip(data, ws_manager))]
pub async fn update_network_ty(data: Data<NetworkState>, ws_manager: Unit<Arc<WsManager>>) -> Result<(), FlowyError> {
let network_state = data.into_inner();
ws_manager.update_network_type(&network_state.ty);
Ok(())
}

View File

@ -1,2 +1,6 @@
pub mod entities; pub mod entities;
mod event;
mod handlers;
pub mod module;
pub mod protobuf; pub mod protobuf;
pub mod services;

View File

@ -0,0 +1,10 @@
use crate::{event::NetworkEvent, handlers::*, services::ws::WsManager};
use lib_dispatch::prelude::*;
use std::sync::Arc;
pub fn create(ws_manager: Arc<WsManager>) -> Module {
Module::new()
.name("Flowy-Network")
.data(ws_manager.clone())
.event(NetworkEvent::UpdateNetworkType, update_network_ty)
}

View File

@ -17,35 +17,32 @@
#![allow(trivial_casts)] #![allow(trivial_casts)]
#![allow(unused_imports)] #![allow(unused_imports)]
#![allow(unused_results)] #![allow(unused_results)]
//! Generated file from `cache.proto` //! Generated file from `event.proto`
/// Generated files are compatible only with the same version /// Generated files are compatible only with the same version
/// of protobuf runtime. /// of protobuf runtime.
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_22_1; // const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_22_1;
#[derive(Clone,PartialEq,Eq,Debug,Hash)] #[derive(Clone,PartialEq,Eq,Debug,Hash)]
pub enum RevState { pub enum NetworkEvent {
Local = 0, UpdateNetworkType = 0,
Acked = 1,
} }
impl ::protobuf::ProtobufEnum for RevState { impl ::protobuf::ProtobufEnum for NetworkEvent {
fn value(&self) -> i32 { fn value(&self) -> i32 {
*self as i32 *self as i32
} }
fn from_i32(value: i32) -> ::std::option::Option<RevState> { fn from_i32(value: i32) -> ::std::option::Option<NetworkEvent> {
match value { match value {
0 => ::std::option::Option::Some(RevState::Local), 0 => ::std::option::Option::Some(NetworkEvent::UpdateNetworkType),
1 => ::std::option::Option::Some(RevState::Acked),
_ => ::std::option::Option::None _ => ::std::option::Option::None
} }
} }
fn values() -> &'static [Self] { fn values() -> &'static [Self] {
static values: &'static [RevState] = &[ static values: &'static [NetworkEvent] = &[
RevState::Local, NetworkEvent::UpdateNetworkType,
RevState::Acked,
]; ];
values values
} }
@ -53,34 +50,32 @@ impl ::protobuf::ProtobufEnum for RevState {
fn enum_descriptor_static() -> &'static ::protobuf::reflect::EnumDescriptor { fn enum_descriptor_static() -> &'static ::protobuf::reflect::EnumDescriptor {
static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::EnumDescriptor> = ::protobuf::rt::LazyV2::INIT; static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::EnumDescriptor> = ::protobuf::rt::LazyV2::INIT;
descriptor.get(|| { descriptor.get(|| {
::protobuf::reflect::EnumDescriptor::new_pb_name::<RevState>("RevState", file_descriptor_proto()) ::protobuf::reflect::EnumDescriptor::new_pb_name::<NetworkEvent>("NetworkEvent", file_descriptor_proto())
}) })
} }
} }
impl ::std::marker::Copy for RevState { impl ::std::marker::Copy for NetworkEvent {
} }
impl ::std::default::Default for RevState { impl ::std::default::Default for NetworkEvent {
fn default() -> Self { fn default() -> Self {
RevState::Local NetworkEvent::UpdateNetworkType
} }
} }
impl ::protobuf::reflect::ProtobufValue for RevState { impl ::protobuf::reflect::ProtobufValue for NetworkEvent {
fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
::protobuf::reflect::ReflectValueRef::Enum(::protobuf::ProtobufEnum::descriptor(self)) ::protobuf::reflect::ReflectValueRef::Enum(::protobuf::ProtobufEnum::descriptor(self))
} }
} }
static file_descriptor_proto_data: &'static [u8] = b"\ static file_descriptor_proto_data: &'static [u8] = b"\
\n\x0bcache.proto*\x20\n\x08RevState\x12\t\n\x05Local\x10\0\x12\t\n\x05A\ \n\x0bevent.proto*%\n\x0cNetworkEvent\x12\x15\n\x11UpdateNetworkType\x10\
cked\x10\x01J|\n\x06\x12\x04\0\0\x05\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\ \0JS\n\x06\x12\x04\0\0\x04\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\
\n\n\n\x02\x05\0\x12\x04\x02\0\x05\x01\n\n\n\x03\x05\0\x01\x12\x03\x02\ \x05\0\x12\x04\x02\0\x04\x01\n\n\n\x03\x05\0\x01\x12\x03\x02\x05\x11\n\
\x05\r\n\x0b\n\x04\x05\0\x02\0\x12\x03\x03\x04\x0e\n\x0c\n\x05\x05\0\x02\ \x0b\n\x04\x05\0\x02\0\x12\x03\x03\x04\x1a\n\x0c\n\x05\x05\0\x02\0\x01\
\0\x01\x12\x03\x03\x04\t\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x03\x0c\r\n\ \x12\x03\x03\x04\x15\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x03\x18\x19b\
\x0b\n\x04\x05\0\x02\x01\x12\x03\x04\x04\x0e\n\x0c\n\x05\x05\0\x02\x01\
\x01\x12\x03\x04\x04\t\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\x04\x0c\rb\
\x06proto3\ \x06proto3\
"; ";

View File

@ -3,3 +3,6 @@
mod network_state; mod network_state;
pub use network_state::*; pub use network_state::*;
mod event;
pub use event::*;

View File

@ -0,0 +1,5 @@
syntax = "proto3";
enum NetworkEvent {
UpdateNetworkType = 0;
}

View File

@ -0,0 +1,3 @@
mod ws_mock;
pub use ws_mock::*;

View File

@ -1,23 +1,19 @@
use crate::{ use crate::services::ws::{FlowyError, FlowyWebSocket, FlowyWsSender, WsConnectState, WsMessage, WsMessageHandler};
errors::UserError,
services::user::ws_manager::{FlowyWebSocket, FlowyWsSender},
};
use bytes::Bytes; use bytes::Bytes;
use dashmap::DashMap; use dashmap::DashMap;
use flowy_collaboration::{ use flowy_collaboration::{
core::sync::{ServerDocManager, ServerDocPersistence}, core::sync::{RevisionUser, ServerDocManager, ServerDocPersistence, SyncResponse},
entities::{ entities::{
doc::Doc, doc::Doc,
ws::{WsDataType, WsDocumentData}, ws::{WsDataType, WsDocumentData},
}, },
errors::CollaborateError, errors::CollaborateError,
Revision,
RichTextDelta,
}; };
use lazy_static::lazy_static; use lazy_static::lazy_static;
use lib_infra::future::{FutureResult, FutureResultSend}; use lib_infra::future::{FutureResult, FutureResultSend};
use lib_ot::{revision::Revision, rich_text::RichTextDelta}; use lib_ws::WsModule;
use lib_ws::{WsConnectState, WsMessage, WsMessageHandler, WsModule};
use flowy_collaboration::core::sync::{RevisionUser, SyncResponse};
use std::{ use std::{
convert::{TryFrom, TryInto}, convert::{TryFrom, TryInto},
sync::Arc, sync::Arc,
@ -47,7 +43,7 @@ impl MockWebSocket {
} }
impl FlowyWebSocket for Arc<MockWebSocket> { impl FlowyWebSocket for Arc<MockWebSocket> {
fn start_connect(&self, _addr: String) -> FutureResult<(), UserError> { fn start_connect(&self, _addr: String) -> FutureResult<(), FlowyError> {
let mut ws_receiver = self.ws_sender.subscribe(); let mut ws_receiver = self.ws_sender.subscribe();
let cloned_ws = self.clone(); let cloned_ws = self.clone();
tokio::spawn(async move { tokio::spawn(async move {
@ -56,7 +52,7 @@ impl FlowyWebSocket for Arc<MockWebSocket> {
let mut rx = DOC_SERVER.handle_ws_data(ws_data).await; let mut rx = DOC_SERVER.handle_ws_data(ws_data).await;
let new_ws_message = rx.recv().await.unwrap(); let new_ws_message = rx.recv().await.unwrap();
match cloned_ws.handlers.get(&new_ws_message.module) { match cloned_ws.handlers.get(&new_ws_message.module) {
None => log::error!("Can't find any handler for message: {:?}", new_ws_message), None => tracing::error!("Can't find any handler for message: {:?}", new_ws_message),
Some(handler) => handler.receive_message(new_ws_message.clone()), Some(handler) => handler.receive_message(new_ws_message.clone()),
} }
} }
@ -67,18 +63,18 @@ impl FlowyWebSocket for Arc<MockWebSocket> {
fn conn_state_subscribe(&self) -> Receiver<WsConnectState> { self.state_sender.subscribe() } fn conn_state_subscribe(&self) -> Receiver<WsConnectState> { self.state_sender.subscribe() }
fn reconnect(&self, _count: usize) -> FutureResult<(), UserError> { FutureResult::new(async { Ok(()) }) } fn reconnect(&self, _count: usize) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
fn add_handler(&self, handler: Arc<dyn WsMessageHandler>) -> Result<(), UserError> { fn add_handler(&self, handler: Arc<dyn WsMessageHandler>) -> Result<(), FlowyError> {
let source = handler.source(); let source = handler.source();
if self.handlers.contains_key(&source) { if self.handlers.contains_key(&source) {
log::error!("WsSource's {:?} is already registered", source); tracing::error!("WsSource's {:?} is already registered", source);
} }
self.handlers.insert(source, handler); self.handlers.insert(source, handler);
Ok(()) Ok(())
} }
fn ws_sender(&self) -> Result<Arc<dyn FlowyWsSender>, UserError> { Ok(Arc::new(self.ws_sender.clone())) } fn ws_sender(&self) -> Result<Arc<dyn FlowyWsSender>, FlowyError> { Ok(Arc::new(self.ws_sender.clone())) }
} }
lazy_static! { lazy_static! {

View File

@ -0,0 +1,4 @@
pub mod ws;
#[cfg(feature = "ws_mock")]
mod mock;

View File

@ -0,0 +1,18 @@
use lib_infra::future::FutureResult;
use std::sync::Arc;
use tokio::sync::broadcast;
pub use flowy_error::FlowyError;
pub use lib_ws::{WsConnectState, WsMessage, WsMessageHandler};
pub trait FlowyWebSocket: Send + Sync {
fn start_connect(&self, addr: String) -> FutureResult<(), FlowyError>;
fn conn_state_subscribe(&self) -> broadcast::Receiver<WsConnectState>;
fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError>;
fn add_handler(&self, handler: Arc<dyn WsMessageHandler>) -> Result<(), FlowyError>;
fn ws_sender(&self) -> Result<Arc<dyn FlowyWsSender>, FlowyError>;
}
pub trait FlowyWsSender: Send + Sync {
fn send(&self, msg: WsMessage) -> Result<(), FlowyError>;
}

View File

@ -1,42 +1,52 @@
use crate::errors::UserError; use crate::{
entities::NetworkType,
use flowy_net::entities::NetworkType; services::ws::{local_web_socket, FlowyWebSocket, FlowyWsSender},
};
use flowy_error::{internal_error, FlowyError};
use lib_infra::future::FutureResult; use lib_infra::future::FutureResult;
use lib_ws::{WsConnectState, WsController, WsMessage, WsMessageHandler, WsSender}; use lib_ws::{WsConnectState, WsController, WsMessage, WsMessageHandler, WsSender};
use parking_lot::RwLock; use parking_lot::RwLock;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{broadcast, broadcast::Receiver}; use tokio::sync::{broadcast, broadcast::Receiver};
pub trait FlowyWebSocket: Send + Sync {
fn start_connect(&self, addr: String) -> FutureResult<(), UserError>;
fn conn_state_subscribe(&self) -> broadcast::Receiver<WsConnectState>;
fn reconnect(&self, count: usize) -> FutureResult<(), UserError>;
fn add_handler(&self, handler: Arc<dyn WsMessageHandler>) -> Result<(), UserError>;
fn ws_sender(&self) -> Result<Arc<dyn FlowyWsSender>, UserError>;
}
pub trait FlowyWsSender: Send + Sync {
fn send(&self, msg: WsMessage) -> Result<(), UserError>;
}
pub struct WsManager { pub struct WsManager {
inner: Arc<dyn FlowyWebSocket>, inner: Arc<dyn FlowyWebSocket>,
connect_type: RwLock<NetworkType>, connect_type: RwLock<NetworkType>,
status_notifier: broadcast::Sender<NetworkType>,
addr: String,
} }
impl WsManager { impl WsManager {
pub fn new() -> Self { WsManager::default() } pub fn new(addr: String) -> Self {
let ws: Arc<dyn FlowyWebSocket> = if cfg!(feature = "http_server") {
Arc::new(Arc::new(WsController::new()))
} else {
local_web_socket()
};
pub async fn start(&self, addr: String) -> Result<(), UserError> { let (status_notifier, _) = broadcast::channel(10);
WsManager {
inner: ws,
connect_type: RwLock::new(NetworkType::default()),
status_notifier,
addr,
}
}
pub async fn start(&self, token: String) -> Result<(), FlowyError> {
let addr = format!("{}/{}", self.addr, token);
self.listen_on_websocket(); self.listen_on_websocket();
let _ = self.inner.start_connect(addr).await?; let _ = self.inner.start_connect(addr).await?;
Ok(()) Ok(())
} }
pub fn update_network_type(&self, new_type: &NetworkType) { pub fn update_network_type(&self, new_type: &NetworkType) {
tracing::debug!("Network new state: {:?}", new_type);
let old_type = self.connect_type.read().clone(); let old_type = self.connect_type.read().clone();
let _ = self.status_notifier.send(new_type.clone());
if &old_type != new_type { if &old_type != new_type {
log::debug!("Connect type switch from {:?} to {:?}", old_type, new_type); tracing::debug!("Connect type switch from {:?} to {:?}", old_type, new_type);
match (old_type.is_connect(), new_type.is_connect()) { match (old_type.is_connect(), new_type.is_connect()) {
(false, true) => { (false, true) => {
let ws_controller = self.inner.clone(); let ws_controller = self.inner.clone();
@ -69,7 +79,7 @@ impl WsManager {
} }
}, },
Err(e) => { Err(e) => {
log::error!("Websocket state notify error: {:?}", e); tracing::error!("Websocket state notify error: {:?}", e);
break; break;
}, },
} }
@ -77,76 +87,60 @@ impl WsManager {
}); });
} }
pub fn state_subscribe(&self) -> broadcast::Receiver<WsConnectState> { self.inner.conn_state_subscribe() } pub fn subscribe_websocket_state(&self) -> broadcast::Receiver<WsConnectState> { self.inner.conn_state_subscribe() }
pub fn add_handler(&self, handler: Arc<dyn WsMessageHandler>) -> Result<(), UserError> { pub fn subscribe_network_ty(&self) -> broadcast::Receiver<NetworkType> { self.status_notifier.subscribe() }
pub fn add_handler(&self, handler: Arc<dyn WsMessageHandler>) -> Result<(), FlowyError> {
let _ = self.inner.add_handler(handler)?; let _ = self.inner.add_handler(handler)?;
Ok(()) Ok(())
} }
pub fn ws_sender(&self) -> Result<Arc<dyn FlowyWsSender>, UserError> { pub fn ws_sender(&self) -> Result<Arc<dyn FlowyWsSender>, FlowyError> { self.inner.ws_sender() }
//
self.inner.ws_sender()
}
} }
async fn retry_connect(ws: Arc<dyn FlowyWebSocket>, count: usize) { async fn retry_connect(ws: Arc<dyn FlowyWebSocket>, count: usize) {
match ws.reconnect(count).await { match ws.reconnect(count).await {
Ok(_) => {}, Ok(_) => {},
Err(e) => { Err(e) => {
log::error!("websocket connect failed: {:?}", e); tracing::error!("websocket connect failed: {:?}", e);
}, },
} }
} }
impl std::default::Default for WsManager {
fn default() -> Self {
let ws: Arc<dyn FlowyWebSocket> = if cfg!(feature = "http_server") {
Arc::new(Arc::new(WsController::new()))
} else {
crate::services::server::local_web_socket()
};
WsManager {
inner: ws,
connect_type: RwLock::new(NetworkType::default()),
}
}
}
impl FlowyWebSocket for Arc<WsController> { impl FlowyWebSocket for Arc<WsController> {
fn start_connect(&self, addr: String) -> FutureResult<(), UserError> { fn start_connect(&self, addr: String) -> FutureResult<(), FlowyError> {
let cloned_ws = self.clone(); let cloned_ws = self.clone();
FutureResult::new(async move { FutureResult::new(async move {
let _ = cloned_ws.start(addr).await?; let _ = cloned_ws.start(addr).await.map_err(internal_error)?;
Ok(()) Ok(())
}) })
} }
fn conn_state_subscribe(&self) -> Receiver<WsConnectState> { self.state_subscribe() } fn conn_state_subscribe(&self) -> Receiver<WsConnectState> { self.state_subscribe() }
fn reconnect(&self, count: usize) -> FutureResult<(), UserError> { fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError> {
let cloned_ws = self.clone(); let cloned_ws = self.clone();
FutureResult::new(async move { FutureResult::new(async move {
let _ = cloned_ws.retry(count).await?; let _ = cloned_ws.retry(count).await.map_err(internal_error)?;
Ok(()) Ok(())
}) })
} }
fn add_handler(&self, handler: Arc<dyn WsMessageHandler>) -> Result<(), UserError> { fn add_handler(&self, handler: Arc<dyn WsMessageHandler>) -> Result<(), FlowyError> {
let _ = self.add_handler(handler)?; let _ = self.add_handler(handler)?;
Ok(()) Ok(())
} }
fn ws_sender(&self) -> Result<Arc<dyn FlowyWsSender>, UserError> { fn ws_sender(&self) -> Result<Arc<dyn FlowyWsSender>, FlowyError> {
let sender = self.sender()?; let sender = self.sender().map_err(internal_error)?;
Ok(sender) Ok(sender)
} }
} }
impl FlowyWsSender for WsSender { impl FlowyWsSender for WsSender {
fn send(&self, msg: WsMessage) -> Result<(), UserError> { fn send(&self, msg: WsMessage) -> Result<(), FlowyError> {
let _ = self.send_msg(msg)?; let _ = self.send_msg(msg).map_err(internal_error)?;
Ok(()) Ok(())
} }
} }

View File

@ -0,0 +1,15 @@
pub use conn::*;
pub use manager::*;
use std::sync::Arc;
mod conn;
mod manager;
mod ws_local;
#[cfg(not(feature = "ws_mock"))]
pub(crate) fn local_web_socket() -> Arc<dyn FlowyWebSocket> { Arc::new(Arc::new(ws_local::LocalWebSocket::default())) }
#[cfg(feature = "ws_mock")]
pub(crate) fn local_web_socket() -> Arc<dyn FlowyWebSocket> {
Arc::new(Arc::new(crate::services::mock::MockWebSocket::default()))
}

View File

@ -1,9 +1,5 @@
use crate::{ use crate::services::ws::{FlowyError, FlowyWebSocket, FlowyWsSender, WsConnectState, WsMessage, WsMessageHandler};
errors::UserError,
services::user::ws_manager::{FlowyWebSocket, FlowyWsSender},
};
use lib_infra::future::FutureResult; use lib_infra::future::FutureResult;
use lib_ws::{WsConnectState, WsMessage, WsMessageHandler};
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{broadcast, broadcast::Receiver}; use tokio::sync::{broadcast, broadcast::Receiver};
@ -24,19 +20,19 @@ impl std::default::Default for LocalWebSocket {
} }
impl FlowyWebSocket for Arc<LocalWebSocket> { impl FlowyWebSocket for Arc<LocalWebSocket> {
fn start_connect(&self, _addr: String) -> FutureResult<(), UserError> { FutureResult::new(async { Ok(()) }) } fn start_connect(&self, _addr: String) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
fn conn_state_subscribe(&self) -> Receiver<WsConnectState> { self.state_sender.subscribe() } fn conn_state_subscribe(&self) -> Receiver<WsConnectState> { self.state_sender.subscribe() }
fn reconnect(&self, _count: usize) -> FutureResult<(), UserError> { FutureResult::new(async { Ok(()) }) } fn reconnect(&self, _count: usize) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
fn add_handler(&self, _handler: Arc<dyn WsMessageHandler>) -> Result<(), UserError> { Ok(()) } fn add_handler(&self, _handler: Arc<dyn WsMessageHandler>) -> Result<(), FlowyError> { Ok(()) }
fn ws_sender(&self) -> Result<Arc<dyn FlowyWsSender>, UserError> { Ok(Arc::new(self.ws_sender.clone())) } fn ws_sender(&self) -> Result<Arc<dyn FlowyWsSender>, FlowyError> { Ok(Arc::new(self.ws_sender.clone())) }
} }
impl FlowyWsSender for broadcast::Sender<WsMessage> { impl FlowyWsSender for broadcast::Sender<WsMessage> {
fn send(&self, msg: WsMessage) -> Result<(), UserError> { fn send(&self, msg: WsMessage) -> Result<(), FlowyError> {
let _ = self.send(msg); let _ = self.send(msg);
Ok(()) Ok(())
} }

View File

@ -6,6 +6,7 @@ use flowy_document::{
module::DocumentUser, module::DocumentUser,
services::ws::{DocumentWebSocket, WsDocumentManager, WsStateReceiver}, services::ws::{DocumentWebSocket, WsDocumentManager, WsStateReceiver},
}; };
use flowy_net::services::ws::WsManager;
use flowy_user::{ use flowy_user::{
errors::{ErrorCode, UserError}, errors::{ErrorCode, UserError},
services::user::UserSession, services::user::UserSession,
@ -13,27 +14,23 @@ use flowy_user::{
use lib_ws::{WsMessage, WsMessageHandler, WsModule}; use lib_ws::{WsMessage, WsMessageHandler, WsModule};
use std::{convert::TryInto, path::Path, sync::Arc}; use std::{convert::TryInto, path::Path, sync::Arc};
pub struct DocumentDepsResolver { pub struct DocumentDepsResolver();
user_session: Arc<UserSession>,
}
impl DocumentDepsResolver { impl DocumentDepsResolver {
pub fn new(user_session: Arc<UserSession>) -> Self { Self { user_session } } pub fn resolve(
ws_manager: Arc<WsManager>,
pub fn split_into(self) -> (Arc<dyn DocumentUser>, Arc<WsDocumentManager>) { user_session: Arc<UserSession>,
) -> (Arc<dyn DocumentUser>, Arc<WsDocumentManager>) {
let user = Arc::new(DocumentUserImpl { let user = Arc::new(DocumentUserImpl {
user: self.user_session.clone(), user: user_session.clone(),
}); });
let sender = Arc::new(WsSenderImpl { let sender = Arc::new(WsSenderImpl {
user: self.user_session.clone(), ws_manager: ws_manager.clone(),
}); });
let ws_manager = Arc::new(WsDocumentManager::new(sender)); let ws_doc = Arc::new(WsDocumentManager::new(sender));
let ws_handler = Arc::new(DocumentWsMessageReceiver { let ws_handler = Arc::new(DocumentWsMessageReceiver { inner: ws_doc.clone() });
inner: ws_manager.clone(), ws_manager.add_handler(ws_handler);
}); (user, ws_doc)
self.user_session.add_ws_handler(ws_handler);
(user, ws_manager)
} }
} }
@ -69,7 +66,7 @@ impl DocumentUser for DocumentUserImpl {
} }
struct WsSenderImpl { struct WsSenderImpl {
user: Arc<UserSession>, ws_manager: Arc<WsManager>,
} }
impl DocumentWebSocket for WsSenderImpl { impl DocumentWebSocket for WsSenderImpl {
@ -79,13 +76,13 @@ impl DocumentWebSocket for WsSenderImpl {
module: WsModule::Doc, module: WsModule::Doc,
data: bytes.to_vec(), data: bytes.to_vec(),
}; };
let sender = self.user.ws_sender().map_err(internal_error)?; let sender = self.ws_manager.ws_sender().map_err(internal_error)?;
sender.send(msg).map_err(internal_error)?; sender.send(msg).map_err(internal_error)?;
Ok(()) Ok(())
} }
fn state_notify(&self) -> WsStateReceiver { self.user.ws_state_notifier() } fn subscribe_state_changed(&self) -> WsStateReceiver { self.ws_manager.subscribe_websocket_state() }
} }
struct DocumentWsMessageReceiver { struct DocumentWsMessageReceiver {

View File

@ -1,11 +1,11 @@
mod deps_resolve; mod deps_resolve;
// mod flowy_server; // mod flowy_server;
pub mod module; pub mod module;
use crate::deps_resolve::WorkspaceDepsResolver; use crate::deps_resolve::{DocumentDepsResolver, WorkspaceDepsResolver};
use backend_service::configuration::ClientServerConfiguration; use backend_service::configuration::ClientServerConfiguration;
use flowy_core::{errors::WorkspaceError, module::init_core, prelude::CoreContext}; use flowy_core::{errors::WorkspaceError, module::init_core, prelude::CoreContext};
use flowy_document::module::FlowyDocument; use flowy_document::module::FlowyDocument;
use flowy_net::entities::NetworkType; use flowy_net::{entities::NetworkType, services::ws::WsManager};
use flowy_user::{ use flowy_user::{
prelude::UserStatus, prelude::UserStatus,
services::user::{UserSession, UserSessionConfig}, services::user::{UserSession, UserSessionConfig},
@ -53,6 +53,7 @@ fn crate_log_filter(level: Option<String>) -> String {
filters.push(format!("flowy_user={}", level)); filters.push(format!("flowy_user={}", level));
filters.push(format!("flowy_document={}", level)); filters.push(format!("flowy_document={}", level));
filters.push(format!("flowy_document_infra={}", level)); filters.push(format!("flowy_document_infra={}", level));
filters.push(format!("flowy_net={}", level));
filters.push(format!("dart_notify={}", level)); filters.push(format!("dart_notify={}", level));
filters.push(format!("lib_ot={}", level)); filters.push(format!("lib_ot={}", level));
filters.push(format!("lib_ws={}", level)); filters.push(format!("lib_ws={}", level));
@ -68,6 +69,7 @@ pub struct FlowySDK {
pub flowy_document: Arc<FlowyDocument>, pub flowy_document: Arc<FlowyDocument>,
pub core: Arc<CoreContext>, pub core: Arc<CoreContext>,
pub dispatcher: Arc<EventDispatcher>, pub dispatcher: Arc<EventDispatcher>,
pub ws_manager: Arc<WsManager>,
} }
impl FlowySDK { impl FlowySDK {
@ -76,49 +78,59 @@ impl FlowySDK {
init_kv(&config.root); init_kv(&config.root);
tracing::debug!("🔥 {:?}", config); tracing::debug!("🔥 {:?}", config);
let session_cache_key = format!("{}_session_cache", &config.name); let ws_manager = Arc::new(WsManager::new(config.server_config.ws_addr()));
let user_session = mk_user_session(&config);
let flowy_document = mk_document(ws_manager.clone(), user_session.clone(), &config.server_config);
let core_ctx = mk_core_context(user_session.clone(), flowy_document.clone(), &config.server_config);
let user_config = UserSessionConfig::new(&config.root, &config.server_config, &session_cache_key); //
let user_session = Arc::new(UserSession::new(user_config)); let modules = mk_modules(ws_manager.clone(), core_ctx.clone(), user_session.clone());
let flowy_document = mk_document_module(user_session.clone(), &config.server_config);
let core = mk_core(user_session.clone(), flowy_document.clone(), &config.server_config);
let modules = mk_modules(core.clone(), user_session.clone());
let dispatcher = Arc::new(EventDispatcher::construct(|| modules)); let dispatcher = Arc::new(EventDispatcher::construct(|| modules));
_init(&dispatcher, user_session.clone(), core.clone()); _init(&dispatcher, ws_manager.clone(), user_session.clone(), core_ctx.clone());
Self { Self {
config, config,
user_session, user_session,
flowy_document, flowy_document,
core, core: core_ctx,
dispatcher, dispatcher,
ws_manager,
} }
} }
pub fn dispatcher(&self) -> Arc<EventDispatcher> { self.dispatcher.clone() } pub fn dispatcher(&self) -> Arc<EventDispatcher> { self.dispatcher.clone() }
} }
fn _init(dispatch: &EventDispatcher, user_session: Arc<UserSession>, core: Arc<CoreContext>) { fn _init(
let user_status_subscribe = user_session.notifier.user_status_subscribe(); dispatch: &EventDispatcher,
let network_status_subscribe = user_session.notifier.network_type_subscribe(); ws_manager: Arc<WsManager>,
user_session: Arc<UserSession>,
core: Arc<CoreContext>,
) {
let subscribe_user_status = user_session.notifier.subscribe_user_status();
let subscribe_network_type = ws_manager.subscribe_network_ty();
let cloned_core = core.clone(); let cloned_core = core.clone();
dispatch.spawn(async move { dispatch.spawn(async move {
user_session.init(); user_session.init();
_listen_user_status(user_status_subscribe, core.clone()).await; _listen_user_status(ws_manager, subscribe_user_status, core.clone()).await;
}); });
dispatch.spawn(async move { dispatch.spawn(async move {
_listen_network_status(network_status_subscribe, cloned_core).await; _listen_network_status(subscribe_network_type, cloned_core).await;
}); });
} }
async fn _listen_user_status(mut subscribe: broadcast::Receiver<UserStatus>, core: Arc<CoreContext>) { async fn _listen_user_status(
ws_manager: Arc<WsManager>,
mut subscribe: broadcast::Receiver<UserStatus>,
core: Arc<CoreContext>,
) {
while let Ok(status) = subscribe.recv().await { while let Ok(status) = subscribe.recv().await {
let result = || async { let result = || async {
match status { match status {
UserStatus::Login { token } => { UserStatus::Login { token } => {
let _ = core.user_did_sign_in(&token).await?; let _ = core.user_did_sign_in(&token).await?;
let _ = ws_manager.start(token).await.unwrap();
}, },
UserStatus::Logout { .. } => { UserStatus::Logout { .. } => {
core.user_did_logout().await; core.user_did_logout().await;
@ -164,7 +176,13 @@ fn init_log(config: &FlowySDKConfig) {
} }
} }
fn mk_core( fn mk_user_session(config: &FlowySDKConfig) -> Arc<UserSession> {
let session_cache_key = format!("{}_session_cache", &config.name);
let user_config = UserSessionConfig::new(&config.root, &config.server_config, &session_cache_key);
Arc::new(UserSession::new(user_config))
}
fn mk_core_context(
user_session: Arc<UserSession>, user_session: Arc<UserSession>,
flowy_document: Arc<FlowyDocument>, flowy_document: Arc<FlowyDocument>,
server_config: &ClientServerConfiguration, server_config: &ClientServerConfiguration,
@ -173,3 +191,12 @@ fn mk_core(
let (user, database) = workspace_deps.split_into(); let (user, database) = workspace_deps.split_into();
init_core(user, database, flowy_document, server_config) init_core(user, database, flowy_document, server_config)
} }
pub fn mk_document(
ws_manager: Arc<WsManager>,
user_session: Arc<UserSession>,
server_config: &ClientServerConfiguration,
) -> Arc<FlowyDocument> {
let (user, ws_doc) = DocumentDepsResolver::resolve(ws_manager, user_session);
Arc::new(FlowyDocument::new(user, ws_doc, server_config))
}

View File

@ -2,24 +2,20 @@ use crate::deps_resolve::DocumentDepsResolver;
use backend_service::configuration::ClientServerConfiguration; use backend_service::configuration::ClientServerConfiguration;
use flowy_core::prelude::CoreContext; use flowy_core::prelude::CoreContext;
use flowy_document::module::FlowyDocument; use flowy_document::module::FlowyDocument;
use flowy_net::services::ws::WsManager;
use flowy_user::services::user::UserSession; use flowy_user::services::user::UserSession;
use lib_dispatch::prelude::Module; use lib_dispatch::prelude::Module;
use std::sync::Arc; use std::sync::Arc;
pub fn mk_modules(core: Arc<CoreContext>, user_session: Arc<UserSession>) -> Vec<Module> { pub fn mk_modules(ws_manager: Arc<WsManager>, core: Arc<CoreContext>, user_session: Arc<UserSession>) -> Vec<Module> {
let user_module = mk_user_module(user_session); let user_module = mk_user_module(user_session);
let workspace_module = mk_core_module(core); let core_module = mk_core_module(core);
vec![user_module, workspace_module] let network_module = mk_network_module(ws_manager);
vec![user_module, core_module, network_module]
} }
fn mk_user_module(user_session: Arc<UserSession>) -> Module { flowy_user::module::create(user_session) } fn mk_user_module(user_session: Arc<UserSession>) -> Module { flowy_user::module::create(user_session) }
fn mk_core_module(core: Arc<CoreContext>) -> Module { flowy_core::module::create(core) } fn mk_core_module(core: Arc<CoreContext>) -> Module { flowy_core::module::create(core) }
pub fn mk_document_module( fn mk_network_module(ws_manager: Arc<WsManager>) -> Module { flowy_net::module::create(ws_manager) }
user_session: Arc<UserSession>,
server_config: &ClientServerConfiguration,
) -> Arc<FlowyDocument> {
let document_deps = DocumentDepsResolver::new(user_session);
let (user, ws_manager) = document_deps.split_into();
Arc::new(FlowyDocument::new(user, ws_manager, server_config))
}

View File

@ -9,13 +9,10 @@ edition = "2018"
flowy-user-infra = { path = "../../../shared-lib/flowy-user-infra" } flowy-user-infra = { path = "../../../shared-lib/flowy-user-infra" }
backend-service = { path = "../../../shared-lib/backend-service" } backend-service = { path = "../../../shared-lib/backend-service" }
flowy-derive = { path = "../../../shared-lib/flowy-derive" } flowy-derive = { path = "../../../shared-lib/flowy-derive" }
lib-ws = { path = "../../../shared-lib/lib-ws" }
lib-sqlite = { path = "../../../shared-lib/lib-sqlite" } lib-sqlite = { path = "../../../shared-lib/lib-sqlite" }
lib-infra = { path = "../../../shared-lib/lib-infra" } lib-infra = { path = "../../../shared-lib/lib-infra" }
flowy-collaboration = { path = "../../../shared-lib/flowy-collaboration", optional = true} flowy-collaboration = { path = "../../../shared-lib/flowy-collaboration", optional = true}
lib-ot = { path = "../../../shared-lib/lib-ot", optional = true }
derive_more = {version = "0.99", features = ["display"]} derive_more = {version = "0.99", features = ["display"]}
flowy-database = { path = "../flowy-database" } flowy-database = { path = "../flowy-database" }
@ -51,5 +48,4 @@ futures = "0.3.15"
serial_test = "0.5.1" serial_test = "0.5.1"
[features] [features]
http_server = [] http_server = []
ws_mock = ["flowy-collaboration", "lib-ot"]

View File

@ -1,5 +1,4 @@
use bytes::Bytes; use bytes::Bytes;
use flowy_derive::ProtoBuf; use flowy_derive::ProtoBuf;
pub use flowy_user_infra::errors::ErrorCode; pub use flowy_user_infra::errors::ErrorCode;
use lib_dispatch::prelude::{EventResponse, ResponseBuilder}; use lib_dispatch::prelude::{EventResponse, ResponseBuilder};
@ -77,15 +76,6 @@ impl std::convert::From<::r2d2::Error> for UserError {
fn from(error: r2d2::Error) -> Self { UserError::internal().context(error) } fn from(error: r2d2::Error) -> Self { UserError::internal().context(error) }
} }
impl std::convert::From<lib_ws::errors::WsError> for UserError {
fn from(error: lib_ws::errors::WsError) -> Self {
match error.code {
lib_ws::errors::ErrorCode::InternalError => UserError::internal().context(error.msg),
_ => UserError::internal().context(error),
}
}
}
// use diesel::result::{Error, DatabaseErrorKind}; // use diesel::result::{Error, DatabaseErrorKind};
// use lib_sqlite::ErrorKind; // use lib_sqlite::ErrorKind;
impl std::convert::From<lib_sqlite::Error> for UserError { impl std::convert::From<lib_sqlite::Error> for UserError {

View File

@ -5,26 +5,23 @@ use strum_macros::Display;
#[event_err = "UserError"] #[event_err = "UserError"]
pub enum UserEvent { pub enum UserEvent {
#[event()] #[event()]
InitUser = 0, InitUser = 0,
#[event(input = "SignInRequest", output = "UserProfile")] #[event(input = "SignInRequest", output = "UserProfile")]
SignIn = 1, SignIn = 1,
#[event(input = "SignUpRequest", output = "UserProfile")] #[event(input = "SignUpRequest", output = "UserProfile")]
SignUp = 2, SignUp = 2,
#[event(passthrough)] #[event(passthrough)]
SignOut = 3, SignOut = 3,
#[event(input = "UpdateUserRequest")] #[event(input = "UpdateUserRequest")]
UpdateUser = 4, UpdateUser = 4,
#[event(output = "UserProfile")] #[event(output = "UserProfile")]
GetUserProfile = 5, GetUserProfile = 5,
#[event(output = "UserProfile")] #[event(output = "UserProfile")]
CheckUser = 6, CheckUser = 6,
#[event(input = "NetworkState")]
UpdateNetworkType = 10,
} }

View File

@ -36,10 +36,3 @@ pub async fn update_user_handler(
session.update_user(params).await?; session.update_user(params).await?;
Ok(()) Ok(())
} }
#[tracing::instrument(skip(data, session))]
pub async fn update_network_ty(data: Data<NetworkState>, session: Unit<Arc<UserSession>>) -> Result<(), UserError> {
let network_state = data.into_inner();
session.set_network_state(network_state);
Ok(())
}

View File

@ -14,5 +14,4 @@ pub fn create(user_session: Arc<UserSession>) -> Module {
.event(UserEvent::SignOut, sign_out) .event(UserEvent::SignOut, sign_out)
.event(UserEvent::UpdateUser, update_user_handler) .event(UserEvent::UpdateUser, update_user_handler)
.event(UserEvent::CheckUser, check_user_handler) .event(UserEvent::CheckUser, check_user_handler)
.event(UserEvent::UpdateNetworkType, update_network_ty)
} }

View File

@ -32,7 +32,6 @@ pub enum UserEvent {
UpdateUser = 4, UpdateUser = 4,
GetUserProfile = 5, GetUserProfile = 5,
CheckUser = 6, CheckUser = 6,
UpdateNetworkType = 10,
} }
impl ::protobuf::ProtobufEnum for UserEvent { impl ::protobuf::ProtobufEnum for UserEvent {
@ -49,7 +48,6 @@ impl ::protobuf::ProtobufEnum for UserEvent {
4 => ::std::option::Option::Some(UserEvent::UpdateUser), 4 => ::std::option::Option::Some(UserEvent::UpdateUser),
5 => ::std::option::Option::Some(UserEvent::GetUserProfile), 5 => ::std::option::Option::Some(UserEvent::GetUserProfile),
6 => ::std::option::Option::Some(UserEvent::CheckUser), 6 => ::std::option::Option::Some(UserEvent::CheckUser),
10 => ::std::option::Option::Some(UserEvent::UpdateNetworkType),
_ => ::std::option::Option::None _ => ::std::option::Option::None
} }
} }
@ -63,7 +61,6 @@ impl ::protobuf::ProtobufEnum for UserEvent {
UserEvent::UpdateUser, UserEvent::UpdateUser,
UserEvent::GetUserProfile, UserEvent::GetUserProfile,
UserEvent::CheckUser, UserEvent::CheckUser,
UserEvent::UpdateNetworkType,
]; ];
values values
} }
@ -92,29 +89,26 @@ impl ::protobuf::reflect::ProtobufValue for UserEvent {
} }
static file_descriptor_proto_data: &'static [u8] = b"\ static file_descriptor_proto_data: &'static [u8] = b"\
\n\x0bevent.proto*\x88\x01\n\tUserEvent\x12\x0c\n\x08InitUser\x10\0\x12\ \n\x0bevent.proto*q\n\tUserEvent\x12\x0c\n\x08InitUser\x10\0\x12\n\n\x06\
\n\n\x06SignIn\x10\x01\x12\n\n\x06SignUp\x10\x02\x12\x0b\n\x07SignOut\ SignIn\x10\x01\x12\n\n\x06SignUp\x10\x02\x12\x0b\n\x07SignOut\x10\x03\
\x10\x03\x12\x0e\n\nUpdateUser\x10\x04\x12\x12\n\x0eGetUserProfile\x10\ \x12\x0e\n\nUpdateUser\x10\x04\x12\x12\n\x0eGetUserProfile\x10\x05\x12\r\
\x05\x12\r\n\tCheckUser\x10\x06\x12\x15\n\x11UpdateNetworkType\x10\nJ\ \n\tCheckUser\x10\x06J\xc9\x02\n\x06\x12\x04\0\0\n\x01\n\x08\n\x01\x0c\
\xf2\x02\n\x06\x12\x04\0\0\x0b\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\ \x12\x03\0\0\x12\n\n\n\x02\x05\0\x12\x04\x02\0\n\x01\n\n\n\x03\x05\0\x01\
\x02\x05\0\x12\x04\x02\0\x0b\x01\n\n\n\x03\x05\0\x01\x12\x03\x02\x05\x0e\ \x12\x03\x02\x05\x0e\n\x0b\n\x04\x05\0\x02\0\x12\x03\x03\x04\x11\n\x0c\n\
\n\x0b\n\x04\x05\0\x02\0\x12\x03\x03\x04\x11\n\x0c\n\x05\x05\0\x02\0\x01\ \x05\x05\0\x02\0\x01\x12\x03\x03\x04\x0c\n\x0c\n\x05\x05\0\x02\0\x02\x12\
\x12\x03\x03\x04\x0c\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x03\x0f\x10\n\ \x03\x03\x0f\x10\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x04\x04\x0f\n\x0c\n\
\x0b\n\x04\x05\0\x02\x01\x12\x03\x04\x04\x0f\n\x0c\n\x05\x05\0\x02\x01\ \x05\x05\0\x02\x01\x01\x12\x03\x04\x04\n\n\x0c\n\x05\x05\0\x02\x01\x02\
\x01\x12\x03\x04\x04\n\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\x04\r\x0e\n\ \x12\x03\x04\r\x0e\n\x0b\n\x04\x05\0\x02\x02\x12\x03\x05\x04\x0f\n\x0c\n\
\x0b\n\x04\x05\0\x02\x02\x12\x03\x05\x04\x0f\n\x0c\n\x05\x05\0\x02\x02\ \x05\x05\0\x02\x02\x01\x12\x03\x05\x04\n\n\x0c\n\x05\x05\0\x02\x02\x02\
\x01\x12\x03\x05\x04\n\n\x0c\n\x05\x05\0\x02\x02\x02\x12\x03\x05\r\x0e\n\ \x12\x03\x05\r\x0e\n\x0b\n\x04\x05\0\x02\x03\x12\x03\x06\x04\x10\n\x0c\n\
\x0b\n\x04\x05\0\x02\x03\x12\x03\x06\x04\x10\n\x0c\n\x05\x05\0\x02\x03\ \x05\x05\0\x02\x03\x01\x12\x03\x06\x04\x0b\n\x0c\n\x05\x05\0\x02\x03\x02\
\x01\x12\x03\x06\x04\x0b\n\x0c\n\x05\x05\0\x02\x03\x02\x12\x03\x06\x0e\ \x12\x03\x06\x0e\x0f\n\x0b\n\x04\x05\0\x02\x04\x12\x03\x07\x04\x13\n\x0c\
\x0f\n\x0b\n\x04\x05\0\x02\x04\x12\x03\x07\x04\x13\n\x0c\n\x05\x05\0\x02\ \n\x05\x05\0\x02\x04\x01\x12\x03\x07\x04\x0e\n\x0c\n\x05\x05\0\x02\x04\
\x04\x01\x12\x03\x07\x04\x0e\n\x0c\n\x05\x05\0\x02\x04\x02\x12\x03\x07\ \x02\x12\x03\x07\x11\x12\n\x0b\n\x04\x05\0\x02\x05\x12\x03\x08\x04\x17\n\
\x11\x12\n\x0b\n\x04\x05\0\x02\x05\x12\x03\x08\x04\x17\n\x0c\n\x05\x05\0\ \x0c\n\x05\x05\0\x02\x05\x01\x12\x03\x08\x04\x12\n\x0c\n\x05\x05\0\x02\
\x02\x05\x01\x12\x03\x08\x04\x12\n\x0c\n\x05\x05\0\x02\x05\x02\x12\x03\ \x05\x02\x12\x03\x08\x15\x16\n\x0b\n\x04\x05\0\x02\x06\x12\x03\t\x04\x12\
\x08\x15\x16\n\x0b\n\x04\x05\0\x02\x06\x12\x03\t\x04\x12\n\x0c\n\x05\x05\ \n\x0c\n\x05\x05\0\x02\x06\x01\x12\x03\t\x04\r\n\x0c\n\x05\x05\0\x02\x06\
\0\x02\x06\x01\x12\x03\t\x04\r\n\x0c\n\x05\x05\0\x02\x06\x02\x12\x03\t\ \x02\x12\x03\t\x10\x11b\x06proto3\
\x10\x11\n\x0b\n\x04\x05\0\x02\x07\x12\x03\n\x04\x1b\n\x0c\n\x05\x05\0\
\x02\x07\x01\x12\x03\n\x04\x15\n\x0c\n\x05\x05\0\x02\x07\x02\x12\x03\n\
\x18\x1ab\x06proto3\
"; ";
static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;

View File

@ -8,5 +8,4 @@ enum UserEvent {
UpdateUser = 4; UpdateUser = 4;
GetUserProfile = 5; GetUserProfile = 5;
CheckUser = 6; CheckUser = 6;
UpdateNetworkType = 10;
} }

View File

@ -1,6 +1,5 @@
mod server_api; mod server_api;
mod server_api_mock; mod server_api_mock;
mod ws_local;
pub use server_api::*; pub use server_api::*;
pub use server_api_mock::*; pub use server_api_mock::*;
@ -10,7 +9,6 @@ pub(crate) type Server = Arc<dyn UserServerAPI + Send + Sync>;
use crate::{ use crate::{
entities::{SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserParams, UserProfile}, entities::{SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserParams, UserProfile},
errors::UserError, errors::UserError,
services::user::ws_manager::FlowyWebSocket,
}; };
use backend_service::configuration::ClientServerConfiguration; use backend_service::configuration::ClientServerConfiguration;
use lib_infra::future::FutureResult; use lib_infra::future::FutureResult;
@ -31,12 +29,3 @@ pub(crate) fn construct_user_server(config: &ClientServerConfiguration) -> Arc<d
Arc::new(UserServerMock {}) Arc::new(UserServerMock {})
} }
} }
#[cfg(feature = "ws_mock")]
mod ws_mock;
#[cfg(not(feature = "ws_mock"))]
pub(crate) fn local_web_socket() -> Arc<dyn FlowyWebSocket> { Arc::new(Arc::new(ws_local::LocalWebSocket::default())) }
#[cfg(feature = "ws_mock")]
pub(crate) fn local_web_socket() -> Arc<dyn FlowyWebSocket> { Arc::new(Arc::new(ws_mock::MockWebSocket::default())) }

View File

@ -3,4 +3,3 @@ pub use user_session::*;
pub mod database; pub mod database;
mod notifier; mod notifier;
mod user_session; mod user_session;
pub mod ws_manager;

View File

@ -4,17 +4,12 @@ use tokio::sync::{broadcast, mpsc};
pub struct UserNotifier { pub struct UserNotifier {
user_status_notifier: broadcast::Sender<UserStatus>, user_status_notifier: broadcast::Sender<UserStatus>,
network_status_notifier: broadcast::Sender<NetworkType>,
} }
impl std::default::Default for UserNotifier { impl std::default::Default for UserNotifier {
fn default() -> Self { fn default() -> Self {
let (user_status_notifier, _) = broadcast::channel(10); let (user_status_notifier, _) = broadcast::channel(10);
let (network_status_notifier, _) = broadcast::channel(10); UserNotifier { user_status_notifier }
UserNotifier {
user_status_notifier,
network_status_notifier,
}
} }
} }
@ -40,11 +35,5 @@ impl UserNotifier {
}); });
} }
pub fn update_network_type(&self, ty: &NetworkType) { let _ = self.network_status_notifier.send(ty.clone()); } pub fn subscribe_user_status(&self) -> broadcast::Receiver<UserStatus> { self.user_status_notifier.subscribe() }
pub fn user_status_subscribe(&self) -> broadcast::Receiver<UserStatus> { self.user_status_notifier.subscribe() }
pub fn network_type_subscribe(&self) -> broadcast::Receiver<NetworkType> {
self.network_status_notifier.subscribe()
}
} }

View File

@ -15,7 +15,6 @@ use flowy_database::{
}; };
use flowy_user_infra::entities::{SignInResponse, SignUpResponse}; use flowy_user_infra::entities::{SignInResponse, SignUpResponse};
use lib_sqlite::ConnectionPool; use lib_sqlite::ConnectionPool;
use lib_ws::{WsConnectState, WsMessageHandler};
use crate::{ use crate::{
entities::{SignInParams, SignUpParams, UpdateUserParams, UserProfile}, entities::{SignInParams, SignUpParams, UpdateUserParams, UserProfile},
@ -23,11 +22,7 @@ use crate::{
notify::*, notify::*,
services::{ services::{
server::{construct_user_server, Server}, server::{construct_user_server, Server},
user::{ user::{database::UserDB, notifier::UserNotifier},
database::UserDB,
notifier::UserNotifier,
ws_manager::{FlowyWsSender, WsManager},
},
}, },
sql_tables::{UserTable, UserTableChangeset}, sql_tables::{UserTable, UserTableChangeset},
}; };
@ -55,7 +50,6 @@ pub struct UserSession {
#[allow(dead_code)] #[allow(dead_code)]
server: Server, server: Server,
session: RwLock<Option<Session>>, session: RwLock<Option<Session>>,
ws_manager: Arc<WsManager>,
pub notifier: UserNotifier, pub notifier: UserNotifier,
} }
@ -63,14 +57,12 @@ impl UserSession {
pub fn new(config: UserSessionConfig) -> Self { pub fn new(config: UserSessionConfig) -> Self {
let db = UserDB::new(&config.root_dir); let db = UserDB::new(&config.root_dir);
let server = construct_user_server(&config.server_config); let server = construct_user_server(&config.server_config);
let ws_manager = Arc::new(WsManager::new());
let notifier = UserNotifier::new(); let notifier = UserNotifier::new();
Self { Self {
database: db, database: db,
config, config,
server, server,
session: RwLock::new(None), session: RwLock::new(None),
ws_manager,
notifier, notifier,
} }
} }
@ -153,12 +145,7 @@ impl UserSession {
Ok(()) Ok(())
} }
pub async fn init_user(&self) -> Result<(), UserError> { pub async fn init_user(&self) -> Result<(), UserError> { Ok(()) }
let (_, token) = self.get_session()?.into_part();
let _ = self.start_ws_connection(&token).await?;
Ok(())
}
pub async fn check_user(&self) -> Result<UserProfile, UserError> { pub async fn check_user(&self) -> Result<UserProfile, UserError> {
let (user_id, token) = self.get_session()?.into_part(); let (user_id, token) = self.get_session()?.into_part();
@ -191,21 +178,6 @@ impl UserSession {
pub fn user_name(&self) -> Result<String, UserError> { Ok(self.get_session()?.name) } pub fn user_name(&self) -> Result<String, UserError> { Ok(self.get_session()?.name) }
pub fn token(&self) -> Result<String, UserError> { Ok(self.get_session()?.token) } pub fn token(&self) -> Result<String, UserError> { Ok(self.get_session()?.token) }
pub fn add_ws_handler(&self, handler: Arc<dyn WsMessageHandler>) { let _ = self.ws_manager.add_handler(handler); }
pub fn set_network_state(&self, new_state: NetworkState) {
log::debug!("Network new state: {:?}", new_state);
self.ws_manager.update_network_type(&new_state.ty);
self.notifier.update_network_type(&new_state.ty);
}
pub fn ws_sender(&self) -> Result<Arc<dyn FlowyWsSender>, UserError> {
let sender = self.ws_manager.ws_sender()?;
Ok(sender)
}
pub fn ws_state_notifier(&self) -> broadcast::Receiver<WsConnectState> { self.ws_manager.state_subscribe() }
} }
impl UserSession { impl UserSession {
@ -302,13 +274,6 @@ impl UserSession {
Err(_) => false, Err(_) => false,
} }
} }
#[tracing::instrument(level = "debug", skip(self, token))]
pub async fn start_ws_connection(&self, token: &str) -> Result<(), UserError> {
let addr = format!("{}/{}", self.server.ws_addr(), token);
let _ = self.ws_manager.start(addr).await?;
Ok(())
}
} }
pub async fn update_user( pub async fn update_user(

View File

@ -3,3 +3,5 @@ pub mod entities;
pub mod errors; pub mod errors;
pub mod protobuf; pub mod protobuf;
pub mod util; pub mod util;
pub use lib_ot::{revision::Revision, rich_text::RichTextDelta};

View File

@ -20,6 +20,7 @@ pub fn category_from_str(type_str: &str) -> TypeCategory {
| "DocError" | "DocError"
| "FFIRequest" | "FFIRequest"
| "FFIResponse" | "FFIResponse"
| "FlowyError"
| "SubscribeObject" | "SubscribeObject"
| "NetworkState" | "NetworkState"
| "UserError" | "UserError"
@ -84,6 +85,7 @@ pub fn category_from_str(type_str: &str) -> TypeCategory {
| "ErrorCode" | "ErrorCode"
| "DocObservable" | "DocObservable"
| "FFIStatusCode" | "FFIStatusCode"
| "NetworkEvent"
| "NetworkType" | "NetworkType"
| "UserEvent" | "UserEvent"
| "UserNotification" | "UserNotification"
@ -93,6 +95,7 @@ pub fn category_from_str(type_str: &str) -> TypeCategory {
| "WsDataType" | "WsDataType"
| "WsModule" | "WsModule"
| "RevType" | "RevType"
| "RevState"
=> TypeCategory::Enum, => TypeCategory::Enum,
"Option" => TypeCategory::Opt, "Option" => TypeCategory::Opt,

View File

@ -1,8 +1,5 @@
#![cfg_attr(rustfmt, rustfmt::skip)] #![cfg_attr(rustfmt, rustfmt::skip)]
// Auto-generated, do not edit // Auto-generated, do not edit
mod cache;
pub use cache::*;
mod model; mod model;
pub use model::*; pub use model::*;

View File

@ -840,6 +840,56 @@ impl ::protobuf::reflect::ProtobufValue for RevType {
} }
} }
#[derive(Clone,PartialEq,Eq,Debug,Hash)]
pub enum RevState {
StateLocal = 0,
Acked = 1,
}
impl ::protobuf::ProtobufEnum for RevState {
fn value(&self) -> i32 {
*self as i32
}
fn from_i32(value: i32) -> ::std::option::Option<RevState> {
match value {
0 => ::std::option::Option::Some(RevState::StateLocal),
1 => ::std::option::Option::Some(RevState::Acked),
_ => ::std::option::Option::None
}
}
fn values() -> &'static [Self] {
static values: &'static [RevState] = &[
RevState::StateLocal,
RevState::Acked,
];
values
}
fn enum_descriptor_static() -> &'static ::protobuf::reflect::EnumDescriptor {
static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::EnumDescriptor> = ::protobuf::rt::LazyV2::INIT;
descriptor.get(|| {
::protobuf::reflect::EnumDescriptor::new_pb_name::<RevState>("RevState", file_descriptor_proto())
})
}
}
impl ::std::marker::Copy for RevState {
}
impl ::std::default::Default for RevState {
fn default() -> Self {
RevState::StateLocal
}
}
impl ::protobuf::reflect::ProtobufValue for RevState {
fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
::protobuf::reflect::ReflectValueRef::Enum(::protobuf::ProtobufEnum::descriptor(self))
}
}
static file_descriptor_proto_data: &'static [u8] = b"\ static file_descriptor_proto_data: &'static [u8] = b"\
\n\x0bmodel.proto\"\xbc\x01\n\x08Revision\x12\x1e\n\x0bbase_rev_id\x18\ \n\x0bmodel.proto\"\xbc\x01\n\x08Revision\x12\x1e\n\x0bbase_rev_id\x18\
\x01\x20\x01(\x03R\tbaseRevId\x12\x15\n\x06rev_id\x18\x02\x20\x01(\x03R\ \x01\x20\x01(\x03R\tbaseRevId\x12\x15\n\x06rev_id\x18\x02\x20\x01(\x03R\
@ -850,8 +900,9 @@ static file_descriptor_proto_data: &'static [u8] = b"\
\x14\n\x05value\x18\x01\x20\x01(\x03R\x05value\"N\n\rRevisionRange\x12\ \x14\n\x05value\x18\x01\x20\x01(\x03R\x05value\"N\n\rRevisionRange\x12\
\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\x12\x14\n\x05start\x18\x02\ \x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\x12\x14\n\x05start\x18\x02\
\x20\x01(\x03R\x05start\x12\x10\n\x03end\x18\x03\x20\x01(\x03R\x03end*\ \x20\x01(\x03R\x05start\x12\x10\n\x03end\x18\x03\x20\x01(\x03R\x03end*\
\x20\n\x07RevType\x12\t\n\x05Local\x10\0\x12\n\n\x06Remote\x10\x01J\xa1\ \x20\n\x07RevType\x12\t\n\x05Local\x10\0\x12\n\n\x06Remote\x10\x01*%\n\
\x06\n\x06\x12\x04\0\0\x16\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\ \x08RevState\x12\x0e\n\nStateLocal\x10\0\x12\t\n\x05Acked\x10\x01J\x8b\
\x07\n\x06\x12\x04\0\0\x1a\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\
\x04\0\x12\x04\x02\0\n\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x10\n\x0b\ \x04\0\x12\x04\x02\0\n\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x10\n\x0b\
\n\x04\x04\0\x02\0\x12\x03\x03\x04\x1a\n\x0c\n\x05\x04\0\x02\0\x05\x12\ \n\x04\x04\0\x02\0\x12\x03\x03\x04\x1a\n\x0c\n\x05\x04\0\x02\0\x05\x12\
\x03\x03\x04\t\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\n\x15\n\x0c\n\x05\ \x03\x03\x04\t\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\n\x15\n\x0c\n\x05\
@ -890,7 +941,12 @@ static file_descriptor_proto_data: &'static [u8] = b"\
\0\x02\0\x01\x12\x03\x14\x04\t\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x14\ \0\x02\0\x01\x12\x03\x14\x04\t\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x14\
\x0c\r\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x15\x04\x0f\n\x0c\n\x05\x05\0\ \x0c\r\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x15\x04\x0f\n\x0c\n\x05\x05\0\
\x02\x01\x01\x12\x03\x15\x04\n\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\x15\ \x02\x01\x01\x12\x03\x15\x04\n\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\x15\
\r\x0eb\x06proto3\ \r\x0e\n\n\n\x02\x05\x01\x12\x04\x17\0\x1a\x01\n\n\n\x03\x05\x01\x01\x12\
\x03\x17\x05\r\n\x0b\n\x04\x05\x01\x02\0\x12\x03\x18\x04\x13\n\x0c\n\x05\
\x05\x01\x02\0\x01\x12\x03\x18\x04\x0e\n\x0c\n\x05\x05\x01\x02\0\x02\x12\
\x03\x18\x11\x12\n\x0b\n\x04\x05\x01\x02\x01\x12\x03\x19\x04\x0e\n\x0c\n\
\x05\x05\x01\x02\x01\x01\x12\x03\x19\x04\t\n\x0c\n\x05\x05\x01\x02\x01\
\x02\x12\x03\x19\x0c\rb\x06proto3\
"; ";
static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;

View File

@ -1,6 +0,0 @@
syntax = "proto3";
enum RevState {
Local = 0;
Acked = 1;
}

View File

@ -21,3 +21,7 @@ enum RevType {
Local = 0; Local = 0;
Remote = 1; Remote = 1;
} }
enum RevState {
StateLocal = 0;
Acked = 1;
}

View File

@ -32,12 +32,6 @@ impl Revision {
pub fn pair_rev_id(&self) -> (i64, i64) { (self.base_rev_id, self.rev_id) } pub fn pair_rev_id(&self) -> (i64, i64) { (self.base_rev_id, self.rev_id) }
pub fn is_initial(&self) -> bool { self.rev_id == 0 } pub fn is_initial(&self) -> bool { self.rev_id == 0 }
// pub fn from_pb(pb: &mut crate::protobuf::Revision) -> Self {
// pb.try_into().unwrap() }
// pub fn from_pb(mut pb: crate::protobuf::Revision) -> Self {
// Revision::try_from(&mut pb).unwrap() }
} }
impl std::fmt::Debug for Revision { impl std::fmt::Debug for Revision {
@ -160,6 +154,6 @@ pub fn md5<T: AsRef<[u8]>>(data: T) -> String {
#[derive(Debug, Clone, Eq, PartialEq)] #[derive(Debug, Clone, Eq, PartialEq)]
pub enum RevState { pub enum RevState {
Local = 0, StateLocal = 0,
Acked = 1, Acked = 1,
} }