diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/errors.pbenum.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/errors.pbenum.dart index d4764daa8c..55e51b76c7 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/errors.pbenum.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/errors.pbenum.dart @@ -12,12 +12,14 @@ import 'package:protobuf/protobuf.dart' as $pb; class ErrorCode extends $pb.ProtobufEnum { static const ErrorCode DocIdInvalid = ErrorCode._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'DocIdInvalid'); static const ErrorCode DocNotfound = ErrorCode._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'DocNotfound'); + static const ErrorCode WsConnectError = ErrorCode._(10, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'WsConnectError'); static const ErrorCode UserUnauthorized = ErrorCode._(999, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'UserUnauthorized'); static const ErrorCode InternalError = ErrorCode._(1000, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'InternalError'); static const $core.List values = [ DocIdInvalid, DocNotfound, + WsConnectError, UserUnauthorized, InternalError, ]; diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/errors.pbjson.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/errors.pbjson.dart index d2df076a1f..67e75c3fcd 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/errors.pbjson.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/errors.pbjson.dart @@ -14,13 +14,14 @@ const ErrorCode$json = const { '2': const [ const {'1': 'DocIdInvalid', '2': 0}, const {'1': 'DocNotfound', '2': 1}, + const {'1': 'WsConnectError', '2': 10}, const {'1': 'UserUnauthorized', '2': 999}, const {'1': 'InternalError', '2': 1000}, ], }; /// Descriptor for `ErrorCode`. Decode as a `google.protobuf.EnumDescriptorProto`. -final $typed_data.Uint8List errorCodeDescriptor = $convert.base64Decode('CglFcnJvckNvZGUSEAoMRG9jSWRJbnZhbGlkEAASDwoLRG9jTm90Zm91bmQQARIVChBVc2VyVW5hdXRob3JpemVkEOcHEhIKDUludGVybmFsRXJyb3IQ6Ac='); +final $typed_data.Uint8List errorCodeDescriptor = $convert.base64Decode('CglFcnJvckNvZGUSEAoMRG9jSWRJbnZhbGlkEAASDwoLRG9jTm90Zm91bmQQARISCg5Xc0Nvbm5lY3RFcnJvchAKEhUKEFVzZXJVbmF1dGhvcml6ZWQQ5wcSEgoNSW50ZXJuYWxFcnJvchDoBw=='); @$core.Deprecated('Use docErrorDescriptor instead') const DocError$json = const { '1': 'DocError', diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-workspace/errors.pbenum.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-workspace/errors.pbenum.dart index e504fe2a04..2728c16cfe 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-workspace/errors.pbenum.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-workspace/errors.pbenum.dart @@ -22,6 +22,7 @@ class ErrorCode extends $pb.ProtobufEnum { static const ErrorCode ViewDescInvalid = ErrorCode._(23, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'ViewDescInvalid'); static const ErrorCode ViewDataInvalid = ErrorCode._(24, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'ViewDataInvalid'); static const ErrorCode UserUnauthorized = ErrorCode._(100, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'UserUnauthorized'); + static const ErrorCode WsConnectError = ErrorCode._(200, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'WsConnectError'); static const ErrorCode InternalError = ErrorCode._(1000, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'InternalError'); static const ErrorCode RecordNotFound = ErrorCode._(1001, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'RecordNotFound'); @@ -38,6 +39,7 @@ class ErrorCode extends $pb.ProtobufEnum { ViewDescInvalid, ViewDataInvalid, UserUnauthorized, + WsConnectError, InternalError, RecordNotFound, ]; diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-workspace/errors.pbjson.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-workspace/errors.pbjson.dart index e5cfebcdd2..5a13aa5857 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-workspace/errors.pbjson.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-workspace/errors.pbjson.dart @@ -24,13 +24,14 @@ const ErrorCode$json = const { const {'1': 'ViewDescInvalid', '2': 23}, const {'1': 'ViewDataInvalid', '2': 24}, const {'1': 'UserUnauthorized', '2': 100}, + const {'1': 'WsConnectError', '2': 200}, const {'1': 'InternalError', '2': 1000}, const {'1': 'RecordNotFound', '2': 1001}, ], }; /// Descriptor for `ErrorCode`. Decode as a `google.protobuf.EnumDescriptorProto`. -final $typed_data.Uint8List errorCodeDescriptor = $convert.base64Decode('CglFcnJvckNvZGUSGAoUV29ya3NwYWNlTmFtZUludmFsaWQQABIWChJXb3Jrc3BhY2VJZEludmFsaWQQARIYChRBcHBDb2xvclN0eWxlSW52YWxpZBACEhgKFFdvcmtzcGFjZURlc2NJbnZhbGlkEAMSEAoMQXBwSWRJbnZhbGlkEAoSEgoOQXBwTmFtZUludmFsaWQQCxITCg9WaWV3TmFtZUludmFsaWQQFBIYChRWaWV3VGh1bWJuYWlsSW52YWxpZBAVEhEKDVZpZXdJZEludmFsaWQQFhITCg9WaWV3RGVzY0ludmFsaWQQFxITCg9WaWV3RGF0YUludmFsaWQQGBIUChBVc2VyVW5hdXRob3JpemVkEGQSEgoNSW50ZXJuYWxFcnJvchDoBxITCg5SZWNvcmROb3RGb3VuZBDpBw=='); +final $typed_data.Uint8List errorCodeDescriptor = $convert.base64Decode('CglFcnJvckNvZGUSGAoUV29ya3NwYWNlTmFtZUludmFsaWQQABIWChJXb3Jrc3BhY2VJZEludmFsaWQQARIYChRBcHBDb2xvclN0eWxlSW52YWxpZBACEhgKFFdvcmtzcGFjZURlc2NJbnZhbGlkEAMSEAoMQXBwSWRJbnZhbGlkEAoSEgoOQXBwTmFtZUludmFsaWQQCxITCg9WaWV3TmFtZUludmFsaWQQFBIYChRWaWV3VGh1bWJuYWlsSW52YWxpZBAVEhEKDVZpZXdJZEludmFsaWQQFhITCg9WaWV3RGVzY0ludmFsaWQQFxITCg9WaWV3RGF0YUludmFsaWQQGBIUChBVc2VyVW5hdXRob3JpemVkEGQSEwoOV3NDb25uZWN0RXJyb3IQyAESEgoNSW50ZXJuYWxFcnJvchDoBxITCg5SZWNvcmROb3RGb3VuZBDpBw=='); @$core.Deprecated('Use workspaceErrorDescriptor instead') const WorkspaceError$json = const { '1': 'WorkspaceError', diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/errors.pbenum.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/errors.pbenum.dart index ef8d4c2243..215409e69c 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/errors.pbenum.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/errors.pbenum.dart @@ -11,13 +11,11 @@ import 'package:protobuf/protobuf.dart' as $pb; class ErrorCode extends $pb.ProtobufEnum { static const ErrorCode InternalError = ErrorCode._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'InternalError'); - static const ErrorCode DuplicateSource = ErrorCode._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'DuplicateSource'); - static const ErrorCode UnsupportedMessage = ErrorCode._(2, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'UnsupportedMessage'); - static const ErrorCode Unauthorized = ErrorCode._(3, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Unauthorized'); + static const ErrorCode UnsupportedMessage = ErrorCode._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'UnsupportedMessage'); + static const ErrorCode Unauthorized = ErrorCode._(2, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Unauthorized'); static const $core.List values = [ InternalError, - DuplicateSource, UnsupportedMessage, Unauthorized, ]; diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/errors.pbjson.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/errors.pbjson.dart index 502a5002f2..c1039debcf 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/errors.pbjson.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-ws/errors.pbjson.dart @@ -13,14 +13,13 @@ const ErrorCode$json = const { '1': 'ErrorCode', '2': const [ const {'1': 'InternalError', '2': 0}, - const {'1': 'DuplicateSource', '2': 1}, - const {'1': 'UnsupportedMessage', '2': 2}, - const {'1': 'Unauthorized', '2': 3}, + const {'1': 'UnsupportedMessage', '2': 1}, + const {'1': 'Unauthorized', '2': 2}, ], }; /// Descriptor for `ErrorCode`. Decode as a `google.protobuf.EnumDescriptorProto`. -final $typed_data.Uint8List errorCodeDescriptor = $convert.base64Decode('CglFcnJvckNvZGUSEQoNSW50ZXJuYWxFcnJvchAAEhMKD0R1cGxpY2F0ZVNvdXJjZRABEhYKElVuc3VwcG9ydGVkTWVzc2FnZRACEhAKDFVuYXV0aG9yaXplZBAD'); +final $typed_data.Uint8List errorCodeDescriptor = $convert.base64Decode('CglFcnJvckNvZGUSEQoNSW50ZXJuYWxFcnJvchAAEhYKElVuc3VwcG9ydGVkTWVzc2FnZRABEhAKDFVuYXV0aG9yaXplZBAC'); @$core.Deprecated('Use wsErrorDescriptor instead') const WsError$json = const { '1': 'WsError', diff --git a/backend/src/service/ws_service/router.rs b/backend/src/service/ws_service/router.rs index f97d059c33..027643cc7c 100644 --- a/backend/src/service/ws_service/router.rs +++ b/backend/src/service/ws_service/router.rs @@ -1,4 +1,4 @@ -use crate::service::ws_service::{entities::SessionId, WSClient, WSServer}; +use crate::service::ws_service::{WSClient, WSServer}; use actix::Addr; use crate::service::user_service::LoggedUser; diff --git a/backend/src/service/ws_service/ws_client.rs b/backend/src/service/ws_service/ws_client.rs index 0d6279d883..a391e5ad0c 100644 --- a/backend/src/service/ws_service/ws_client.rs +++ b/backend/src/service/ws_service/ws_client.rs @@ -75,7 +75,7 @@ impl Actor for WSClient { self.server .send(connect) .into_actor(self) - .then(|res, client, _ctx| { + .then(|res, _client, _ctx| { match res { Ok(Ok(_)) => log::trace!("Send connect message to server success"), Ok(Err(e)) => log::error!("Send connect message to server failed: {:?}", e), diff --git a/rust-lib/flowy-document/Cargo.toml b/rust-lib/flowy-document/Cargo.toml index a600ef7d5c..374035f7a8 100644 --- a/rust-lib/flowy-document/Cargo.toml +++ b/rust-lib/flowy-document/Cargo.toml @@ -29,6 +29,7 @@ bytes = { version = "1.0" } strum = "0.21" strum_macros = "0.21" dashmap = "4.0" +parking_lot = "0.11" [dev-dependencies] flowy-test = { path = "../flowy-test" } diff --git a/rust-lib/flowy-document/src/errors.rs b/rust-lib/flowy-document/src/errors.rs index 4388e9eabb..599d73ac5e 100644 --- a/rust-lib/flowy-document/src/errors.rs +++ b/rust-lib/flowy-document/src/errors.rs @@ -40,6 +40,7 @@ impl DocError { static_doc_error!(internal, ErrorCode::InternalError); static_doc_error!(not_found, ErrorCode::DocNotfound); static_doc_error!(unauthorized, ErrorCode::UserUnauthorized); + static_doc_error!(ws, ErrorCode::WsConnectError); } pub fn internal_error(e: T) -> DocError @@ -57,6 +58,9 @@ pub enum ErrorCode { #[display(fmt = "DocNotfound")] DocNotfound = 1, + #[display(fmt = "Document websocket error")] + WsConnectError = 10, + #[display(fmt = "UserUnauthorized")] UserUnauthorized = 999, @@ -81,6 +85,10 @@ impl std::convert::From for DocError { fn from(error: flowy_ot::errors::OTError) -> Self { DocError::internal().context(error) } } +impl std::convert::From for DocError { + fn from(error: std::io::Error) -> Self { DocError::internal().context(error) } +} + // impl std::convert::From<::r2d2::Error> for DocError { // fn from(error: r2d2::Error) -> Self { // ErrorBuilder::new(ErrorCode::InternalError).error(error).build() } } diff --git a/rust-lib/flowy-document/src/lib.rs b/rust-lib/flowy-document/src/lib.rs index 58ec6b8fb1..b2555e3a98 100644 --- a/rust-lib/flowy-document/src/lib.rs +++ b/rust-lib/flowy-document/src/lib.rs @@ -10,5 +10,8 @@ mod sql_tables; extern crate flowy_database; pub mod prelude { - pub use crate::{module::*, services::server::*}; + pub use crate::{ + module::*, + services::{server::*, ws_document::*}, + }; } diff --git a/rust-lib/flowy-document/src/module.rs b/rust-lib/flowy-document/src/module.rs index 5c4a38e643..e7f71a1c5c 100644 --- a/rust-lib/flowy-document/src/module.rs +++ b/rust-lib/flowy-document/src/module.rs @@ -1,35 +1,38 @@ use crate::{ errors::DocError, - services::{doc_cache::DocCache, server::construct_doc_server}, + services::{doc_cache::OpenedDocumentCache, server::construct_doc_server}, }; use crate::{ entities::doc::{ApplyChangesetParams, CreateDocParams, Doc, QueryDocParams, SaveDocParams}, - services::doc_controller::DocController, + errors::internal_error, + services::{doc_controller::DocController, ws_document::WsDocument}, }; + + use diesel::SqliteConnection; use flowy_database::ConnectionPool; - -use crate::errors::internal_error; -use std::sync::Arc; +use parking_lot::RwLock; +use std::{sync::Arc}; pub trait DocumentUser: Send + Sync { - fn user_doc_dir(&self) -> Result; + fn user_dir(&self) -> Result; fn user_id(&self) -> Result; fn token(&self) -> Result; } pub struct FlowyDocument { controller: Arc, - cache: Arc, + ws: Arc>, + cache: Arc, } impl FlowyDocument { - pub fn new(user: Arc) -> FlowyDocument { + pub fn new(user: Arc, ws: Arc>) -> FlowyDocument { let server = construct_doc_server(); - let cache = Arc::new(DocCache::new()); + let cache = Arc::new(OpenedDocumentCache::new()); let controller = Arc::new(DocController::new(server.clone(), user.clone())); - Self { controller, cache } + Self { controller, cache, ws } } pub fn create(&self, params: CreateDocParams, conn: &SqliteConnection) -> Result<(), DocError> { diff --git a/rust-lib/flowy-document/src/protobuf/model/errors.rs b/rust-lib/flowy-document/src/protobuf/model/errors.rs index f23cfd75d3..9cc9c119ca 100644 --- a/rust-lib/flowy-document/src/protobuf/model/errors.rs +++ b/rust-lib/flowy-document/src/protobuf/model/errors.rs @@ -217,6 +217,7 @@ impl ::protobuf::reflect::ProtobufValue for DocError { pub enum ErrorCode { DocIdInvalid = 0, DocNotfound = 1, + WsConnectError = 10, UserUnauthorized = 999, InternalError = 1000, } @@ -230,6 +231,7 @@ impl ::protobuf::ProtobufEnum for ErrorCode { match value { 0 => ::std::option::Option::Some(ErrorCode::DocIdInvalid), 1 => ::std::option::Option::Some(ErrorCode::DocNotfound), + 10 => ::std::option::Option::Some(ErrorCode::WsConnectError), 999 => ::std::option::Option::Some(ErrorCode::UserUnauthorized), 1000 => ::std::option::Option::Some(ErrorCode::InternalError), _ => ::std::option::Option::None @@ -240,6 +242,7 @@ impl ::protobuf::ProtobufEnum for ErrorCode { static values: &'static [ErrorCode] = &[ ErrorCode::DocIdInvalid, ErrorCode::DocNotfound, + ErrorCode::WsConnectError, ErrorCode::UserUnauthorized, ErrorCode::InternalError, ]; @@ -271,26 +274,29 @@ impl ::protobuf::reflect::ProtobufValue for ErrorCode { static file_descriptor_proto_data: &'static [u8] = b"\ \n\x0cerrors.proto\"<\n\x08DocError\x12\x1e\n\x04code\x18\x01\x20\x01(\ - \x0e2\n.ErrorCodeR\x04code\x12\x10\n\x03msg\x18\x02\x20\x01(\tR\x03msg*Y\ + \x0e2\n.ErrorCodeR\x04code\x12\x10\n\x03msg\x18\x02\x20\x01(\tR\x03msg*m\ \n\tErrorCode\x12\x10\n\x0cDocIdInvalid\x10\0\x12\x0f\n\x0bDocNotfound\ - \x10\x01\x12\x15\n\x10UserUnauthorized\x10\xe7\x07\x12\x12\n\rInternalEr\ - ror\x10\xe8\x07J\xd4\x02\n\x06\x12\x04\0\0\x0b\x01\n\x08\n\x01\x0c\x12\ - \x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x05\x01\n\n\n\x03\x04\0\x01\ - \x12\x03\x02\x08\x10\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x17\n\x0c\n\ - \x05\x04\0\x02\0\x06\x12\x03\x03\x04\r\n\x0c\n\x05\x04\0\x02\0\x01\x12\ - \x03\x03\x0e\x12\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x15\x16\n\x0b\n\ - \x04\x04\0\x02\x01\x12\x03\x04\x04\x13\n\x0c\n\x05\x04\0\x02\x01\x05\x12\ - \x03\x04\x04\n\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\x0b\x0e\n\x0c\n\ - \x05\x04\0\x02\x01\x03\x12\x03\x04\x11\x12\n\n\n\x02\x05\0\x12\x04\x06\0\ - \x0b\x01\n\n\n\x03\x05\0\x01\x12\x03\x06\x05\x0e\n\x0b\n\x04\x05\0\x02\0\ - \x12\x03\x07\x04\x15\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x07\x04\x10\n\ - \x0c\n\x05\x05\0\x02\0\x02\x12\x03\x07\x13\x14\n\x0b\n\x04\x05\0\x02\x01\ - \x12\x03\x08\x04\x14\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\x08\x04\x0f\n\ - \x0c\n\x05\x05\0\x02\x01\x02\x12\x03\x08\x12\x13\n\x0b\n\x04\x05\0\x02\ - \x02\x12\x03\t\x04\x1b\n\x0c\n\x05\x05\0\x02\x02\x01\x12\x03\t\x04\x14\n\ - \x0c\n\x05\x05\0\x02\x02\x02\x12\x03\t\x17\x1a\n\x0b\n\x04\x05\0\x02\x03\ - \x12\x03\n\x04\x19\n\x0c\n\x05\x05\0\x02\x03\x01\x12\x03\n\x04\x11\n\x0c\ - \n\x05\x05\0\x02\x03\x02\x12\x03\n\x14\x18b\x06proto3\ + \x10\x01\x12\x12\n\x0eWsConnectError\x10\n\x12\x15\n\x10UserUnauthorized\ + \x10\xe7\x07\x12\x12\n\rInternalError\x10\xe8\x07J\xfd\x02\n\x06\x12\x04\ + \0\0\x0c\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\ + \0\x05\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x10\n\x0b\n\x04\x04\0\x02\ + \0\x12\x03\x03\x04\x17\n\x0c\n\x05\x04\0\x02\0\x06\x12\x03\x03\x04\r\n\ + \x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\x0e\x12\n\x0c\n\x05\x04\0\x02\0\ + \x03\x12\x03\x03\x15\x16\n\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04\x13\n\ + \x0c\n\x05\x04\0\x02\x01\x05\x12\x03\x04\x04\n\n\x0c\n\x05\x04\0\x02\x01\ + \x01\x12\x03\x04\x0b\x0e\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04\x11\ + \x12\n\n\n\x02\x05\0\x12\x04\x06\0\x0c\x01\n\n\n\x03\x05\0\x01\x12\x03\ + \x06\x05\x0e\n\x0b\n\x04\x05\0\x02\0\x12\x03\x07\x04\x15\n\x0c\n\x05\x05\ + \0\x02\0\x01\x12\x03\x07\x04\x10\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x07\ + \x13\x14\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x08\x04\x14\n\x0c\n\x05\x05\0\ + \x02\x01\x01\x12\x03\x08\x04\x0f\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\ + \x08\x12\x13\n\x0b\n\x04\x05\0\x02\x02\x12\x03\t\x04\x18\n\x0c\n\x05\x05\ + \0\x02\x02\x01\x12\x03\t\x04\x12\n\x0c\n\x05\x05\0\x02\x02\x02\x12\x03\t\ + \x15\x17\n\x0b\n\x04\x05\0\x02\x03\x12\x03\n\x04\x1b\n\x0c\n\x05\x05\0\ + \x02\x03\x01\x12\x03\n\x04\x14\n\x0c\n\x05\x05\0\x02\x03\x02\x12\x03\n\ + \x17\x1a\n\x0b\n\x04\x05\0\x02\x04\x12\x03\x0b\x04\x19\n\x0c\n\x05\x05\0\ + \x02\x04\x01\x12\x03\x0b\x04\x11\n\x0c\n\x05\x05\0\x02\x04\x02\x12\x03\ + \x0b\x14\x18b\x06proto3\ "; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; diff --git a/rust-lib/flowy-document/src/protobuf/proto/errors.proto b/rust-lib/flowy-document/src/protobuf/proto/errors.proto index c4673b201a..85dcd67b40 100644 --- a/rust-lib/flowy-document/src/protobuf/proto/errors.proto +++ b/rust-lib/flowy-document/src/protobuf/proto/errors.proto @@ -7,6 +7,7 @@ message DocError { enum ErrorCode { DocIdInvalid = 0; DocNotfound = 1; + WsConnectError = 10; UserUnauthorized = 999; InternalError = 1000; } diff --git a/rust-lib/flowy-document/src/services/doc_cache.rs b/rust-lib/flowy-document/src/services/doc_cache.rs index 10d0db83b9..63d53ba955 100644 --- a/rust-lib/flowy-document/src/services/doc_cache.rs +++ b/rust-lib/flowy-document/src/services/doc_cache.rs @@ -18,11 +18,11 @@ where fn from(s: T) -> Self { DocId(s.to_string()) } } -pub(crate) struct DocCache { +pub(crate) struct OpenedDocumentCache { inner: DashMap>, } -impl DocCache { +impl OpenedDocumentCache { pub(crate) fn new() -> Self { Self { inner: DashMap::new() } } pub(crate) fn open(&self, id: T, data: D) -> Result<(), DocError> diff --git a/rust-lib/flowy-document/src/services/mod.rs b/rust-lib/flowy-document/src/services/mod.rs index 32d84441fc..dddceb1832 100644 --- a/rust-lib/flowy-document/src/services/mod.rs +++ b/rust-lib/flowy-document/src/services/mod.rs @@ -1,3 +1,4 @@ pub(crate) mod doc_cache; pub(crate) mod doc_controller; pub mod server; +pub mod ws_document; diff --git a/rust-lib/flowy-document/src/services/ws_document.rs b/rust-lib/flowy-document/src/services/ws_document.rs new file mode 100644 index 0000000000..baf4ebf2f5 --- /dev/null +++ b/rust-lib/flowy-document/src/services/ws_document.rs @@ -0,0 +1,53 @@ +use crate::errors::DocError; +use bytes::Bytes; +use lazy_static::lazy_static; +use std::{convert::TryFrom, sync::Arc}; + +pub struct WsDocumentMessage(pub Bytes); + +pub trait WsSender: Send + Sync { + fn send_msg(&self, msg: WsDocumentMessage) -> Result<(), DocError>; +} + +lazy_static! { + pub static ref WS_ID: String = "Document".to_string(); +} + +pub struct WsDocument { + sender: Arc, +} +impl WsDocument { + pub fn new(sender: Arc) -> Self { Self { sender } } + pub fn receive_msg(&self, _msg: WsDocumentMessage) { unimplemented!() } + pub fn send_msg(&self, _msg: WsDocumentMessage) { unimplemented!() } +} + +pub enum WsSource { + Delta, +} + +impl AsRef for WsSource { + fn as_ref(&self) -> &str { + match self { + WsSource::Delta => "delta", + } + } +} + +impl ToString for WsSource { + fn to_string(&self) -> String { + match self { + WsSource::Delta => self.as_ref().to_string(), + } + } +} + +impl TryFrom for WsSource { + type Error = DocError; + fn try_from(value: String) -> Result { + match value.as_str() { + "delta" => Ok(WsSource::Delta), + _ => Err(DocError::internal().context(format!("Deserialize WsSource failed. Unknown type: {}", &value))), + } + } +} diff --git a/rust-lib/flowy-sdk/Cargo.toml b/rust-lib/flowy-sdk/Cargo.toml index 18faefcac2..152045fe83 100644 --- a/rust-lib/flowy-sdk/Cargo.toml +++ b/rust-lib/flowy-sdk/Cargo.toml @@ -13,12 +13,14 @@ flowy-infra = { path = "../flowy-infra" } flowy-workspace = { path = "../flowy-workspace" } flowy-database = { path = "../flowy-database" } flowy-document = { path = "../flowy-document" } +flowy-ws = { path = "../flowy-ws" } tracing = { version = "0.1" } log = "0.4.14" futures-core = { version = "0.3", default-features = false } color-eyre = { version = "0.5", default-features = false } bytes = "1.0" tokio = { version = "1", features = ["rt"] } +parking_lot = "0.11" [dev-dependencies] serde = { version = "1.0", features = ["derive"] } diff --git a/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs b/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs new file mode 100644 index 0000000000..19638c9141 --- /dev/null +++ b/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs @@ -0,0 +1,89 @@ +use bytes::Bytes; +use flowy_document::{ + errors::DocError, + module::DocumentUser, + prelude::{WsDocument, WsDocumentMessage, WsSender, WS_ID}, +}; +use flowy_user::services::user::UserSession; +use flowy_ws::{WsMessage, WsMessageHandler}; +use parking_lot::RwLock; +use std::{path::Path, sync::Arc}; + +pub struct DocumentDepsResolver { + user_session: Arc, +} + +impl DocumentDepsResolver { + pub fn new(user_session: Arc) -> Self { Self { user_session } } + + pub fn split_into(self) -> (Arc, Arc>) { + let user = Arc::new(DocumentUserImpl { + user: self.user_session.clone(), + }); + + let sender = Arc::new(WsSenderImpl { + user: self.user_session.clone(), + }); + + let ws = Arc::new(RwLock::new(WsDocument::new(sender.clone()))); + + let ws_handler = Arc::new(WsDocumentResolver { + user: self.user_session.clone(), + inner: ws.clone(), + }); + + self.user_session.add_ws_handler(ws_handler); + + (user, ws) + } +} + +struct DocumentUserImpl { + user: Arc, +} + +impl DocumentUser for DocumentUserImpl { + fn user_dir(&self) -> Result { + let dir = self.user.user_dir().map_err(|e| DocError::unauthorized().context(e))?; + + let doc_dir = format!("{}/doc", dir); + if !Path::new(&doc_dir).exists() { + let _ = std::fs::create_dir_all(&doc_dir)?; + } + Ok(doc_dir) + } + + fn user_id(&self) -> Result { self.user.user_id().map_err(|e| DocError::internal().context(e)) } + + fn token(&self) -> Result { self.user.token().map_err(|e| DocError::internal().context(e)) } +} + +struct WsSenderImpl { + user: Arc, +} + +impl WsSender for WsSenderImpl { + fn send_msg(&self, msg: WsDocumentMessage) -> Result<(), DocError> { + let msg = WsMessage { + source: WS_ID.clone(), + data: msg.0.to_vec(), + }; + let _ = self.user.send_ws_msg(msg).map_err(|e| DocError::internal().context(e))?; + Ok(()) + } +} + +struct WsDocumentResolver { + user: Arc, + inner: Arc>, +} + +impl WsMessageHandler for WsDocumentResolver { + fn source(&self) -> String { WS_ID.clone() } + + fn receive_message(&self, msg: WsMessage) { + let msg = WsDocumentMessage(Bytes::from(msg.data)); + + self.inner.read().receive_msg(msg); + } +} diff --git a/rust-lib/flowy-sdk/src/deps_resolve/editor_deps_impl.rs b/rust-lib/flowy-sdk/src/deps_resolve/editor_deps_impl.rs deleted file mode 100644 index 572905dc94..0000000000 --- a/rust-lib/flowy-sdk/src/deps_resolve/editor_deps_impl.rs +++ /dev/null @@ -1,25 +0,0 @@ -use flowy_document::{errors::DocError, module::DocumentUser}; - -use flowy_user::services::user::UserSession; -use std::{path::Path, sync::Arc}; - -pub struct EditorUserImpl { - pub(crate) user_session: Arc, -} - -impl DocumentUser for EditorUserImpl { - fn user_doc_dir(&self) -> Result { - let dir = self.user_session.user_dir().map_err(|e| DocError::unauthorized().context(e))?; - - let doc_dir = format!("{}/doc", dir); - if !Path::new(&doc_dir).exists() { - // TODO: Make sure to unwrap? 😁 - std::fs::create_dir_all(&doc_dir).unwrap(); - } - Ok(doc_dir) - } - - fn user_id(&self) -> Result { self.user_session.user_id().map_err(|e| DocError::internal().context(e)) } - - fn token(&self) -> Result { self.user_session.token().map_err(|e| DocError::internal().context(e)) } -} diff --git a/rust-lib/flowy-sdk/src/deps_resolve/mod.rs b/rust-lib/flowy-sdk/src/deps_resolve/mod.rs index 379f23f2fc..bf1e78d59b 100644 --- a/rust-lib/flowy-sdk/src/deps_resolve/mod.rs +++ b/rust-lib/flowy-sdk/src/deps_resolve/mod.rs @@ -1,5 +1,5 @@ -mod editor_deps_impl; -mod workspace_deps_impl; +mod document_deps; +mod workspace_deps; -pub use editor_deps_impl::*; -pub use workspace_deps_impl::*; +pub use document_deps::*; +pub use workspace_deps::*; diff --git a/rust-lib/flowy-sdk/src/deps_resolve/workspace_deps_impl.rs b/rust-lib/flowy-sdk/src/deps_resolve/workspace_deps.rs similarity index 54% rename from rust-lib/flowy-sdk/src/deps_resolve/workspace_deps_impl.rs rename to rust-lib/flowy-sdk/src/deps_resolve/workspace_deps.rs index f1f5f6bfb5..82e99ccfed 100644 --- a/rust-lib/flowy-sdk/src/deps_resolve/workspace_deps_impl.rs +++ b/rust-lib/flowy-sdk/src/deps_resolve/workspace_deps.rs @@ -1,27 +1,43 @@ + use flowy_database::ConnectionPool; use flowy_user::services::user::UserSession; use flowy_workspace::{ errors::WorkspaceError, module::{WorkspaceDatabase, WorkspaceUser}, }; + use std::sync::Arc; -pub struct WorkspaceUserImpl { +pub struct WorkspaceDepsResolver { + inner: Arc, +} + +struct Resolver { pub(crate) user_session: Arc, } -impl WorkspaceUser for WorkspaceUserImpl { - fn user_id(&self) -> Result { self.user_session.user_id().map_err(|e| WorkspaceError::internal().context(e)) } +impl WorkspaceDepsResolver { + pub fn new(user_session: Arc) -> Self { + Self { + inner: Arc::new(Resolver { user_session }), + } + } - fn token(&self) -> Result { self.user_session.token().map_err(|e| WorkspaceError::internal().context(e)) } + pub fn split_into(self) -> (Arc, Arc) { + let user: Arc = self.inner.clone(); + let database: Arc = self.inner.clone(); + (user, database) + } } -pub struct WorkspaceDatabaseImpl { - pub(crate) user_session: Arc, -} - -impl WorkspaceDatabase for WorkspaceDatabaseImpl { +impl WorkspaceDatabase for Resolver { fn db_pool(&self) -> Result, WorkspaceError> { self.user_session.db_pool().map_err(|e| WorkspaceError::internal().context(e)) } } + +impl WorkspaceUser for Resolver { + fn user_id(&self) -> Result { self.user_session.user_id().map_err(|e| WorkspaceError::internal().context(e)) } + + fn token(&self) -> Result { self.user_session.token().map_err(|e| WorkspaceError::internal().context(e)) } +} diff --git a/rust-lib/flowy-sdk/src/module.rs b/rust-lib/flowy-sdk/src/module.rs index 314a7a86c9..3961525349 100644 --- a/rust-lib/flowy-sdk/src/module.rs +++ b/rust-lib/flowy-sdk/src/module.rs @@ -1,8 +1,9 @@ use flowy_dispatch::prelude::Module; -use crate::deps_resolve::{EditorUserImpl, WorkspaceDatabaseImpl, WorkspaceUserImpl}; +use crate::deps_resolve::{DocumentDepsResolver, WorkspaceDepsResolver}; use flowy_document::module::FlowyDocument; -use flowy_user::services::user::UserSessionBuilder; +use flowy_user::services::user::{UserSession, UserSessionBuilder}; + use std::sync::Arc; pub struct ModuleConfig { @@ -10,28 +11,23 @@ pub struct ModuleConfig { } pub fn build_modules(config: ModuleConfig) -> Vec { - // runtime.spawn(async move { - // start_ws_connection("eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9. - // eyJpc3MiOiJsb2NhbGhvc3QiLCJzdWIiOiJhdXRoIiwiaWF0IjoxNjMxNzcwODQ2LCJleHAiOjE2MzIyMDI4NDYsInVzZXJfaWQiOiI5ZmFiN2I4MS1mZDAyLTRhN2EtYjA4Zi05NDM3NTdmZmE5MDcifQ. - // UzV01tHnWEZWBp3nJPTmFi7ypxBoCe56AjEPb9bnsFE") }); let user_session = Arc::new(UserSessionBuilder::new().root_dir(&config.root).build()); - - let workspace_user_impl = Arc::new(WorkspaceUserImpl { - user_session: user_session.clone(), - }); - - let workspace_db = Arc::new(WorkspaceDatabaseImpl { - user_session: user_session.clone(), - }); - - let editor_user = Arc::new(EditorUserImpl { - user_session: user_session.clone(), - }); - - let document = Arc::new(FlowyDocument::new(editor_user)); - - vec![ - flowy_user::module::create(user_session), - flowy_workspace::module::create(workspace_user_impl, workspace_db, document), - ] + vec![build_user_module(user_session.clone()), build_workspace_module(user_session)] +} + +fn build_user_module(user_session: Arc) -> Module { flowy_user::module::create(user_session.clone()) } + +fn build_workspace_module(user_session: Arc) -> Module { + let workspace_deps = WorkspaceDepsResolver::new(user_session.clone()); + let (user, database) = workspace_deps.split_into(); + let document = build_document_module(user_session.clone()); + + flowy_workspace::module::create(user, database, document) +} + +fn build_document_module(user_session: Arc) -> Arc { + let document_deps = DocumentDepsResolver::new(user_session.clone()); + let (user, ws) = document_deps.split_into(); + let document = Arc::new(FlowyDocument::new(user, ws)); + document } diff --git a/rust-lib/flowy-user/src/services/user/user_session.rs b/rust-lib/flowy-user/src/services/user/user_session.rs index 69f407fd33..3b563cf26a 100644 --- a/rust-lib/flowy-user/src/services/user/user_session.rs +++ b/rust-lib/flowy-user/src/services/user/user_session.rs @@ -18,10 +18,10 @@ use flowy_database::{ }; use flowy_infra::kv::KV; use flowy_sqlite::ConnectionPool; -use flowy_ws::{connect::Retry, WsController, WsMessageHandler}; +use flowy_ws::{connect::Retry, WsController, WsMessage, WsMessageHandler, WsSender}; use parking_lot::RwLock; use serde::{Deserialize, Serialize}; -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; pub struct UserSessionConfig { root_dir: String, @@ -173,21 +173,23 @@ impl UserSession { pub fn token(&self) -> Result { Ok(self.get_session()?.token) } - pub fn add_ws_msg_handler(&self, handler: Arc) -> Result<(), UserError> { - let _ = self.ws_controller.write().add_handler(handler)?; - Ok(()) + pub fn add_ws_handler(&self, handler: Arc) { let _ = self.ws_controller.write().add_handler(handler); } + + pub fn get_ws_sender(&self) -> Result, UserError> { + match self.ws_controller.try_read_for(Duration::from_millis(300)) { + None => Err(UserError::internal().context("Send ws message timeout")), + Some(guard) => { + let sender = guard.get_sender()?; + Ok(sender) + }, + } } - // pub fn send_ws_msg>(&self, msg: T) -> Result<(), - // UserError> { match self.ws_controller.try_read_for(Duration:: - // from_millis(300)) { None => - // Err(UserError::internal().context("Send ws message timeout")), - // Some(guard) => { - // let _ = guard.send_msg(msg)?; - // Ok(()) - // }, - // } - // } + pub fn send_ws_msg>(&self, msg: T) -> Result<(), UserError> { + let sender = self.get_ws_sender()?; + let _ = sender.send_msg(msg)?; + Ok(()) + } } impl UserSession { diff --git a/rust-lib/flowy-workspace/src/errors.rs b/rust-lib/flowy-workspace/src/errors.rs index b24221a81c..c527615f9a 100644 --- a/rust-lib/flowy-workspace/src/errors.rs +++ b/rust-lib/flowy-workspace/src/errors.rs @@ -44,6 +44,7 @@ impl WorkspaceError { static_workspace_error!(unauthorized, ErrorCode::UserUnauthorized); static_workspace_error!(internal, ErrorCode::InternalError); static_workspace_error!(not_found, ErrorCode::RecordNotFound); + static_workspace_error!(ws, ErrorCode::WsConnectError); pub fn context(mut self, error: T) -> Self { self.msg = format!("{:?}", error); @@ -89,6 +90,9 @@ pub enum ErrorCode { #[display(fmt = "User unauthorized")] UserUnauthorized = 100, + #[display(fmt = "Workspace websocket error")] + WsConnectError = 200, + #[display(fmt = "Server error")] InternalError = 1000, #[display(fmt = "Record not found")] diff --git a/rust-lib/flowy-workspace/src/module.rs b/rust-lib/flowy-workspace/src/module.rs index 79e07ceb15..62b2b3889f 100644 --- a/rust-lib/flowy-workspace/src/module.rs +++ b/rust-lib/flowy-workspace/src/module.rs @@ -4,6 +4,7 @@ use crate::{ handlers::*, services::{server::construct_workspace_server, AppController, ViewController, WorkspaceController}, }; + use flowy_database::DBConnection; use flowy_dispatch::prelude::*; use flowy_document::module::FlowyDocument; diff --git a/rust-lib/flowy-workspace/src/protobuf/model/errors.rs b/rust-lib/flowy-workspace/src/protobuf/model/errors.rs index d2d7299c4f..848ded9495 100644 --- a/rust-lib/flowy-workspace/src/protobuf/model/errors.rs +++ b/rust-lib/flowy-workspace/src/protobuf/model/errors.rs @@ -227,6 +227,7 @@ pub enum ErrorCode { ViewDescInvalid = 23, ViewDataInvalid = 24, UserUnauthorized = 100, + WsConnectError = 200, InternalError = 1000, RecordNotFound = 1001, } @@ -250,6 +251,7 @@ impl ::protobuf::ProtobufEnum for ErrorCode { 23 => ::std::option::Option::Some(ErrorCode::ViewDescInvalid), 24 => ::std::option::Option::Some(ErrorCode::ViewDataInvalid), 100 => ::std::option::Option::Some(ErrorCode::UserUnauthorized), + 200 => ::std::option::Option::Some(ErrorCode::WsConnectError), 1000 => ::std::option::Option::Some(ErrorCode::InternalError), 1001 => ::std::option::Option::Some(ErrorCode::RecordNotFound), _ => ::std::option::Option::None @@ -270,6 +272,7 @@ impl ::protobuf::ProtobufEnum for ErrorCode { ErrorCode::ViewDescInvalid, ErrorCode::ViewDataInvalid, ErrorCode::UserUnauthorized, + ErrorCode::WsConnectError, ErrorCode::InternalError, ErrorCode::RecordNotFound, ]; @@ -302,51 +305,54 @@ impl ::protobuf::reflect::ProtobufValue for ErrorCode { static file_descriptor_proto_data: &'static [u8] = b"\ \n\x0cerrors.proto\"B\n\x0eWorkspaceError\x12\x1e\n\x04code\x18\x01\x20\ \x01(\x0e2\n.ErrorCodeR\x04code\x12\x10\n\x03msg\x18\x02\x20\x01(\tR\x03\ - msg*\xc2\x02\n\tErrorCode\x12\x18\n\x14WorkspaceNameInvalid\x10\0\x12\ + msg*\xd7\x02\n\tErrorCode\x12\x18\n\x14WorkspaceNameInvalid\x10\0\x12\ \x16\n\x12WorkspaceIdInvalid\x10\x01\x12\x18\n\x14AppColorStyleInvalid\ \x10\x02\x12\x18\n\x14WorkspaceDescInvalid\x10\x03\x12\x10\n\x0cAppIdInv\ alid\x10\n\x12\x12\n\x0eAppNameInvalid\x10\x0b\x12\x13\n\x0fViewNameInva\ lid\x10\x14\x12\x18\n\x14ViewThumbnailInvalid\x10\x15\x12\x11\n\rViewIdI\ nvalid\x10\x16\x12\x13\n\x0fViewDescInvalid\x10\x17\x12\x13\n\x0fViewDat\ - aInvalid\x10\x18\x12\x14\n\x10UserUnauthorized\x10d\x12\x12\n\rInternalE\ - rror\x10\xe8\x07\x12\x13\n\x0eRecordNotFound\x10\xe9\x07J\xee\x05\n\x06\ - \x12\x04\0\0\x15\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\ - \x04\x02\0\x05\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x16\n\x0b\n\x04\ - \x04\0\x02\0\x12\x03\x03\x04\x17\n\x0c\n\x05\x04\0\x02\0\x06\x12\x03\x03\ - \x04\r\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\x0e\x12\n\x0c\n\x05\x04\0\ - \x02\0\x03\x12\x03\x03\x15\x16\n\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04\ - \x13\n\x0c\n\x05\x04\0\x02\x01\x05\x12\x03\x04\x04\n\n\x0c\n\x05\x04\0\ - \x02\x01\x01\x12\x03\x04\x0b\x0e\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\ - \x04\x11\x12\n\n\n\x02\x05\0\x12\x04\x06\0\x15\x01\n\n\n\x03\x05\0\x01\ - \x12\x03\x06\x05\x0e\n\x0b\n\x04\x05\0\x02\0\x12\x03\x07\x04\x1d\n\x0c\n\ - \x05\x05\0\x02\0\x01\x12\x03\x07\x04\x18\n\x0c\n\x05\x05\0\x02\0\x02\x12\ - \x03\x07\x1b\x1c\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x08\x04\x1b\n\x0c\n\ - \x05\x05\0\x02\x01\x01\x12\x03\x08\x04\x16\n\x0c\n\x05\x05\0\x02\x01\x02\ - \x12\x03\x08\x19\x1a\n\x0b\n\x04\x05\0\x02\x02\x12\x03\t\x04\x1d\n\x0c\n\ - \x05\x05\0\x02\x02\x01\x12\x03\t\x04\x18\n\x0c\n\x05\x05\0\x02\x02\x02\ - \x12\x03\t\x1b\x1c\n\x0b\n\x04\x05\0\x02\x03\x12\x03\n\x04\x1d\n\x0c\n\ - \x05\x05\0\x02\x03\x01\x12\x03\n\x04\x18\n\x0c\n\x05\x05\0\x02\x03\x02\ - \x12\x03\n\x1b\x1c\n\x0b\n\x04\x05\0\x02\x04\x12\x03\x0b\x04\x16\n\x0c\n\ - \x05\x05\0\x02\x04\x01\x12\x03\x0b\x04\x10\n\x0c\n\x05\x05\0\x02\x04\x02\ - \x12\x03\x0b\x13\x15\n\x0b\n\x04\x05\0\x02\x05\x12\x03\x0c\x04\x18\n\x0c\ - \n\x05\x05\0\x02\x05\x01\x12\x03\x0c\x04\x12\n\x0c\n\x05\x05\0\x02\x05\ - \x02\x12\x03\x0c\x15\x17\n\x0b\n\x04\x05\0\x02\x06\x12\x03\r\x04\x19\n\ - \x0c\n\x05\x05\0\x02\x06\x01\x12\x03\r\x04\x13\n\x0c\n\x05\x05\0\x02\x06\ - \x02\x12\x03\r\x16\x18\n\x0b\n\x04\x05\0\x02\x07\x12\x03\x0e\x04\x1e\n\ - \x0c\n\x05\x05\0\x02\x07\x01\x12\x03\x0e\x04\x18\n\x0c\n\x05\x05\0\x02\ - \x07\x02\x12\x03\x0e\x1b\x1d\n\x0b\n\x04\x05\0\x02\x08\x12\x03\x0f\x04\ - \x17\n\x0c\n\x05\x05\0\x02\x08\x01\x12\x03\x0f\x04\x11\n\x0c\n\x05\x05\0\ - \x02\x08\x02\x12\x03\x0f\x14\x16\n\x0b\n\x04\x05\0\x02\t\x12\x03\x10\x04\ - \x19\n\x0c\n\x05\x05\0\x02\t\x01\x12\x03\x10\x04\x13\n\x0c\n\x05\x05\0\ - \x02\t\x02\x12\x03\x10\x16\x18\n\x0b\n\x04\x05\0\x02\n\x12\x03\x11\x04\ - \x19\n\x0c\n\x05\x05\0\x02\n\x01\x12\x03\x11\x04\x13\n\x0c\n\x05\x05\0\ - \x02\n\x02\x12\x03\x11\x16\x18\n\x0b\n\x04\x05\0\x02\x0b\x12\x03\x12\x04\ - \x1b\n\x0c\n\x05\x05\0\x02\x0b\x01\x12\x03\x12\x04\x14\n\x0c\n\x05\x05\0\ - \x02\x0b\x02\x12\x03\x12\x17\x1a\n\x0b\n\x04\x05\0\x02\x0c\x12\x03\x13\ - \x04\x19\n\x0c\n\x05\x05\0\x02\x0c\x01\x12\x03\x13\x04\x11\n\x0c\n\x05\ - \x05\0\x02\x0c\x02\x12\x03\x13\x14\x18\n\x0b\n\x04\x05\0\x02\r\x12\x03\ - \x14\x04\x1a\n\x0c\n\x05\x05\0\x02\r\x01\x12\x03\x14\x04\x12\n\x0c\n\x05\ - \x05\0\x02\r\x02\x12\x03\x14\x15\x19b\x06proto3\ + aInvalid\x10\x18\x12\x14\n\x10UserUnauthorized\x10d\x12\x13\n\x0eWsConne\ + ctError\x10\xc8\x01\x12\x12\n\rInternalError\x10\xe8\x07\x12\x13\n\x0eRe\ + cordNotFound\x10\xe9\x07J\x97\x06\n\x06\x12\x04\0\0\x16\x01\n\x08\n\x01\ + \x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x05\x01\n\n\n\x03\x04\ + \0\x01\x12\x03\x02\x08\x16\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x17\n\ + \x0c\n\x05\x04\0\x02\0\x06\x12\x03\x03\x04\r\n\x0c\n\x05\x04\0\x02\0\x01\ + \x12\x03\x03\x0e\x12\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x15\x16\n\ + \x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04\x13\n\x0c\n\x05\x04\0\x02\x01\ + \x05\x12\x03\x04\x04\n\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\x0b\x0e\ + \n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04\x11\x12\n\n\n\x02\x05\0\x12\ + \x04\x06\0\x16\x01\n\n\n\x03\x05\0\x01\x12\x03\x06\x05\x0e\n\x0b\n\x04\ + \x05\0\x02\0\x12\x03\x07\x04\x1d\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x07\ + \x04\x18\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x07\x1b\x1c\n\x0b\n\x04\x05\ + \0\x02\x01\x12\x03\x08\x04\x1b\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\x08\ + \x04\x16\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\x08\x19\x1a\n\x0b\n\x04\ + \x05\0\x02\x02\x12\x03\t\x04\x1d\n\x0c\n\x05\x05\0\x02\x02\x01\x12\x03\t\ + \x04\x18\n\x0c\n\x05\x05\0\x02\x02\x02\x12\x03\t\x1b\x1c\n\x0b\n\x04\x05\ + \0\x02\x03\x12\x03\n\x04\x1d\n\x0c\n\x05\x05\0\x02\x03\x01\x12\x03\n\x04\ + \x18\n\x0c\n\x05\x05\0\x02\x03\x02\x12\x03\n\x1b\x1c\n\x0b\n\x04\x05\0\ + \x02\x04\x12\x03\x0b\x04\x16\n\x0c\n\x05\x05\0\x02\x04\x01\x12\x03\x0b\ + \x04\x10\n\x0c\n\x05\x05\0\x02\x04\x02\x12\x03\x0b\x13\x15\n\x0b\n\x04\ + \x05\0\x02\x05\x12\x03\x0c\x04\x18\n\x0c\n\x05\x05\0\x02\x05\x01\x12\x03\ + \x0c\x04\x12\n\x0c\n\x05\x05\0\x02\x05\x02\x12\x03\x0c\x15\x17\n\x0b\n\ + \x04\x05\0\x02\x06\x12\x03\r\x04\x19\n\x0c\n\x05\x05\0\x02\x06\x01\x12\ + \x03\r\x04\x13\n\x0c\n\x05\x05\0\x02\x06\x02\x12\x03\r\x16\x18\n\x0b\n\ + \x04\x05\0\x02\x07\x12\x03\x0e\x04\x1e\n\x0c\n\x05\x05\0\x02\x07\x01\x12\ + \x03\x0e\x04\x18\n\x0c\n\x05\x05\0\x02\x07\x02\x12\x03\x0e\x1b\x1d\n\x0b\ + \n\x04\x05\0\x02\x08\x12\x03\x0f\x04\x17\n\x0c\n\x05\x05\0\x02\x08\x01\ + \x12\x03\x0f\x04\x11\n\x0c\n\x05\x05\0\x02\x08\x02\x12\x03\x0f\x14\x16\n\ + \x0b\n\x04\x05\0\x02\t\x12\x03\x10\x04\x19\n\x0c\n\x05\x05\0\x02\t\x01\ + \x12\x03\x10\x04\x13\n\x0c\n\x05\x05\0\x02\t\x02\x12\x03\x10\x16\x18\n\ + \x0b\n\x04\x05\0\x02\n\x12\x03\x11\x04\x19\n\x0c\n\x05\x05\0\x02\n\x01\ + \x12\x03\x11\x04\x13\n\x0c\n\x05\x05\0\x02\n\x02\x12\x03\x11\x16\x18\n\ + \x0b\n\x04\x05\0\x02\x0b\x12\x03\x12\x04\x1b\n\x0c\n\x05\x05\0\x02\x0b\ + \x01\x12\x03\x12\x04\x14\n\x0c\n\x05\x05\0\x02\x0b\x02\x12\x03\x12\x17\ + \x1a\n\x0b\n\x04\x05\0\x02\x0c\x12\x03\x13\x04\x19\n\x0c\n\x05\x05\0\x02\ + \x0c\x01\x12\x03\x13\x04\x12\n\x0c\n\x05\x05\0\x02\x0c\x02\x12\x03\x13\ + \x15\x18\n\x0b\n\x04\x05\0\x02\r\x12\x03\x14\x04\x19\n\x0c\n\x05\x05\0\ + \x02\r\x01\x12\x03\x14\x04\x11\n\x0c\n\x05\x05\0\x02\r\x02\x12\x03\x14\ + \x14\x18\n\x0b\n\x04\x05\0\x02\x0e\x12\x03\x15\x04\x1a\n\x0c\n\x05\x05\0\ + \x02\x0e\x01\x12\x03\x15\x04\x12\n\x0c\n\x05\x05\0\x02\x0e\x02\x12\x03\ + \x15\x15\x19b\x06proto3\ "; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; diff --git a/rust-lib/flowy-workspace/src/protobuf/proto/errors.proto b/rust-lib/flowy-workspace/src/protobuf/proto/errors.proto index fce3de0b6a..81830b8638 100644 --- a/rust-lib/flowy-workspace/src/protobuf/proto/errors.proto +++ b/rust-lib/flowy-workspace/src/protobuf/proto/errors.proto @@ -17,6 +17,7 @@ enum ErrorCode { ViewDescInvalid = 23; ViewDataInvalid = 24; UserUnauthorized = 100; + WsConnectError = 200; InternalError = 1000; RecordNotFound = 1001; } diff --git a/rust-lib/flowy-ws/src/connect.rs b/rust-lib/flowy-ws/src/connect.rs index ebd644170b..a883c741bb 100644 --- a/rust-lib/flowy-ws/src/connect.rs +++ b/rust-lib/flowy-ws/src/connect.rs @@ -17,16 +17,16 @@ use tokio_tungstenite::{ }; #[pin_project] -pub struct WsConnection { +pub struct WsConnectionFuture { msg_tx: Option, ws_rx: Option, #[pin] fut: BoxFuture<'static, Result<(WebSocketStream>, Response), Error>>, } -impl WsConnection { +impl WsConnectionFuture { pub fn new(msg_tx: MsgSender, ws_rx: MsgReceiver, addr: String) -> Self { - WsConnection { + WsConnectionFuture { msg_tx: Some(msg_tx), ws_rx: Some(ws_rx), fut: Box::pin(async move { connect_async(&addr).await }), @@ -34,7 +34,7 @@ impl WsConnection { } } -impl Future for WsConnection { +impl Future for WsConnectionFuture { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // [[pin]] diff --git a/rust-lib/flowy-ws/src/errors.rs b/rust-lib/flowy-ws/src/errors.rs index f4333a04cd..77bb360cd1 100644 --- a/rust-lib/flowy-ws/src/errors.rs +++ b/rust-lib/flowy-ws/src/errors.rs @@ -36,7 +36,6 @@ impl WsError { } static_user_error!(internal, ErrorCode::InternalError); - static_user_error!(duplicate_source, ErrorCode::DuplicateSource); static_user_error!(unsupported_message, ErrorCode::UnsupportedMessage); static_user_error!(unauthorized, ErrorCode::Unauthorized); } @@ -44,9 +43,8 @@ impl WsError { #[derive(Debug, Clone, ProtoBuf_Enum, Display, PartialEq, Eq)] pub enum ErrorCode { InternalError = 0, - DuplicateSource = 1, - UnsupportedMessage = 2, - Unauthorized = 3, + UnsupportedMessage = 1, + Unauthorized = 2, } impl std::default::Default for ErrorCode { diff --git a/rust-lib/flowy-ws/src/protobuf/model/errors.rs b/rust-lib/flowy-ws/src/protobuf/model/errors.rs index 21bbb12060..7db7f53940 100644 --- a/rust-lib/flowy-ws/src/protobuf/model/errors.rs +++ b/rust-lib/flowy-ws/src/protobuf/model/errors.rs @@ -216,9 +216,8 @@ impl ::protobuf::reflect::ProtobufValue for WsError { #[derive(Clone,PartialEq,Eq,Debug,Hash)] pub enum ErrorCode { InternalError = 0, - DuplicateSource = 1, - UnsupportedMessage = 2, - Unauthorized = 3, + UnsupportedMessage = 1, + Unauthorized = 2, } impl ::protobuf::ProtobufEnum for ErrorCode { @@ -229,9 +228,8 @@ impl ::protobuf::ProtobufEnum for ErrorCode { fn from_i32(value: i32) -> ::std::option::Option { match value { 0 => ::std::option::Option::Some(ErrorCode::InternalError), - 1 => ::std::option::Option::Some(ErrorCode::DuplicateSource), - 2 => ::std::option::Option::Some(ErrorCode::UnsupportedMessage), - 3 => ::std::option::Option::Some(ErrorCode::Unauthorized), + 1 => ::std::option::Option::Some(ErrorCode::UnsupportedMessage), + 2 => ::std::option::Option::Some(ErrorCode::Unauthorized), _ => ::std::option::Option::None } } @@ -239,7 +237,6 @@ impl ::protobuf::ProtobufEnum for ErrorCode { fn values() -> &'static [Self] { static values: &'static [ErrorCode] = &[ ErrorCode::InternalError, - ErrorCode::DuplicateSource, ErrorCode::UnsupportedMessage, ErrorCode::Unauthorized, ]; @@ -271,26 +268,24 @@ impl ::protobuf::reflect::ProtobufValue for ErrorCode { static file_descriptor_proto_data: &'static [u8] = b"\ \n\x0cerrors.proto\";\n\x07WsError\x12\x1e\n\x04code\x18\x01\x20\x01(\ - \x0e2\n.ErrorCodeR\x04code\x12\x10\n\x03msg\x18\x02\x20\x01(\tR\x03msg*]\ - \n\tErrorCode\x12\x11\n\rInternalError\x10\0\x12\x13\n\x0fDuplicateSourc\ - e\x10\x01\x12\x16\n\x12UnsupportedMessage\x10\x02\x12\x10\n\x0cUnauthori\ - zed\x10\x03J\xd4\x02\n\x06\x12\x04\0\0\x0b\x01\n\x08\n\x01\x0c\x12\x03\0\ - \0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x05\x01\n\n\n\x03\x04\0\x01\x12\x03\ - \x02\x08\x0f\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x17\n\x0c\n\x05\x04\ - \0\x02\0\x06\x12\x03\x03\x04\r\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\ - \x0e\x12\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x15\x16\n\x0b\n\x04\x04\ - \0\x02\x01\x12\x03\x04\x04\x13\n\x0c\n\x05\x04\0\x02\x01\x05\x12\x03\x04\ - \x04\n\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\x0b\x0e\n\x0c\n\x05\x04\ - \0\x02\x01\x03\x12\x03\x04\x11\x12\n\n\n\x02\x05\0\x12\x04\x06\0\x0b\x01\ - \n\n\n\x03\x05\0\x01\x12\x03\x06\x05\x0e\n\x0b\n\x04\x05\0\x02\0\x12\x03\ - \x07\x04\x16\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x07\x04\x11\n\x0c\n\x05\ - \x05\0\x02\0\x02\x12\x03\x07\x14\x15\n\x0b\n\x04\x05\0\x02\x01\x12\x03\ - \x08\x04\x18\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\x08\x04\x13\n\x0c\n\ - \x05\x05\0\x02\x01\x02\x12\x03\x08\x16\x17\n\x0b\n\x04\x05\0\x02\x02\x12\ - \x03\t\x04\x1b\n\x0c\n\x05\x05\0\x02\x02\x01\x12\x03\t\x04\x16\n\x0c\n\ - \x05\x05\0\x02\x02\x02\x12\x03\t\x19\x1a\n\x0b\n\x04\x05\0\x02\x03\x12\ - \x03\n\x04\x15\n\x0c\n\x05\x05\0\x02\x03\x01\x12\x03\n\x04\x10\n\x0c\n\ - \x05\x05\0\x02\x03\x02\x12\x03\n\x13\x14b\x06proto3\ + \x0e2\n.ErrorCodeR\x04code\x12\x10\n\x03msg\x18\x02\x20\x01(\tR\x03msg*H\ + \n\tErrorCode\x12\x11\n\rInternalError\x10\0\x12\x16\n\x12UnsupportedMes\ + sage\x10\x01\x12\x10\n\x0cUnauthorized\x10\x02J\xab\x02\n\x06\x12\x04\0\ + \0\n\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\ + \x05\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x0f\n\x0b\n\x04\x04\0\x02\0\ + \x12\x03\x03\x04\x17\n\x0c\n\x05\x04\0\x02\0\x06\x12\x03\x03\x04\r\n\x0c\ + \n\x05\x04\0\x02\0\x01\x12\x03\x03\x0e\x12\n\x0c\n\x05\x04\0\x02\0\x03\ + \x12\x03\x03\x15\x16\n\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04\x13\n\x0c\ + \n\x05\x04\0\x02\x01\x05\x12\x03\x04\x04\n\n\x0c\n\x05\x04\0\x02\x01\x01\ + \x12\x03\x04\x0b\x0e\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04\x11\x12\n\ + \n\n\x02\x05\0\x12\x04\x06\0\n\x01\n\n\n\x03\x05\0\x01\x12\x03\x06\x05\ + \x0e\n\x0b\n\x04\x05\0\x02\0\x12\x03\x07\x04\x16\n\x0c\n\x05\x05\0\x02\0\ + \x01\x12\x03\x07\x04\x11\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x07\x14\x15\ + \n\x0b\n\x04\x05\0\x02\x01\x12\x03\x08\x04\x1b\n\x0c\n\x05\x05\0\x02\x01\ + \x01\x12\x03\x08\x04\x16\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\x08\x19\ + \x1a\n\x0b\n\x04\x05\0\x02\x02\x12\x03\t\x04\x15\n\x0c\n\x05\x05\0\x02\ + \x02\x01\x12\x03\t\x04\x10\n\x0c\n\x05\x05\0\x02\x02\x02\x12\x03\t\x13\ + \x14b\x06proto3\ "; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; diff --git a/rust-lib/flowy-ws/src/protobuf/proto/errors.proto b/rust-lib/flowy-ws/src/protobuf/proto/errors.proto index 8e7fff01a9..cbb330a6e8 100644 --- a/rust-lib/flowy-ws/src/protobuf/proto/errors.proto +++ b/rust-lib/flowy-ws/src/protobuf/proto/errors.proto @@ -6,7 +6,6 @@ message WsError { } enum ErrorCode { InternalError = 0; - DuplicateSource = 1; - UnsupportedMessage = 2; - Unauthorized = 3; + UnsupportedMessage = 1; + Unauthorized = 2; } diff --git a/rust-lib/flowy-ws/src/ws.rs b/rust-lib/flowy-ws/src/ws.rs index 3794cea199..1b2d2a0d11 100644 --- a/rust-lib/flowy-ws/src/ws.rs +++ b/rust-lib/flowy-ws/src/ws.rs @@ -1,5 +1,5 @@ use crate::{ - connect::{Retry, WsConnection}, + connect::{Retry, WsConnectionFuture}, errors::WsError, WsMessage, }; @@ -82,7 +82,7 @@ impl WsController { pub fn add_handler(&mut self, handler: Arc) -> Result<(), WsError> { let source = handler.source(); if self.handlers.contains_key(&source) { - return Err(WsError::duplicate_source()); + log::error!("{} source is already registered", source); } self.handlers.insert(source, handler); Ok(()) @@ -97,6 +97,13 @@ impl WsController { self._connect(addr, Some(Box::pin(async { retry.await }))) } + pub fn get_sender(&self) -> Result, WsError> { + match &self.sender { + None => Err(WsError::internal().context("WsSender is not initialized")), + Some(sender) => Ok(sender.clone()), + } + } + fn _connect(&mut self, addr: String, retry: Option>) -> Result, ServerError> { log::debug!("🐴 ws connect: {}", &addr); let (connection, handlers) = self.make_connect(addr.clone()); @@ -131,7 +138,7 @@ impl WsController { })) } - fn make_connect(&mut self, addr: String) -> (WsConnection, WsHandlers) { + fn make_connect(&mut self, addr: String) -> (WsConnectionFuture, WsHandlerFuture) { // Stream User // ┌───────────────┐ ┌──────────────┐ // ┌──────┐ │ ┌─────────┐ │ ┌────────┐ │ ┌────────┐ │ @@ -147,22 +154,22 @@ impl WsController { let handlers = self.handlers.clone(); self.sender = Some(Arc::new(WsSender { ws_tx })); self.addr = Some(addr.clone()); - (WsConnection::new(msg_tx, ws_rx, addr), WsHandlers::new(handlers, msg_rx)) + (WsConnectionFuture::new(msg_tx, ws_rx, addr), WsHandlerFuture::new(handlers, msg_rx)) } } #[pin_project] -pub struct WsHandlers { +pub struct WsHandlerFuture { #[pin] msg_rx: MsgReceiver, handlers: HashMap>, } -impl WsHandlers { +impl WsHandlerFuture { fn new(handlers: HashMap>, msg_rx: MsgReceiver) -> Self { Self { msg_rx, handlers } } } -impl Future for WsHandlers { +impl Future for WsHandlerFuture { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop {