intergate ws into flowy document

This commit is contained in:
appflowy 2021-09-20 15:38:55 +08:00
parent 07b4113dc1
commit 749b043a99
33 changed files with 377 additions and 207 deletions

View File

@ -12,12 +12,14 @@ import 'package:protobuf/protobuf.dart' as $pb;
class ErrorCode extends $pb.ProtobufEnum {
static const ErrorCode DocIdInvalid = ErrorCode._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'DocIdInvalid');
static const ErrorCode DocNotfound = ErrorCode._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'DocNotfound');
static const ErrorCode WsConnectError = ErrorCode._(10, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'WsConnectError');
static const ErrorCode UserUnauthorized = ErrorCode._(999, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'UserUnauthorized');
static const ErrorCode InternalError = ErrorCode._(1000, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'InternalError');
static const $core.List<ErrorCode> values = <ErrorCode> [
DocIdInvalid,
DocNotfound,
WsConnectError,
UserUnauthorized,
InternalError,
];

View File

@ -14,13 +14,14 @@ const ErrorCode$json = const {
'2': const [
const {'1': 'DocIdInvalid', '2': 0},
const {'1': 'DocNotfound', '2': 1},
const {'1': 'WsConnectError', '2': 10},
const {'1': 'UserUnauthorized', '2': 999},
const {'1': 'InternalError', '2': 1000},
],
};
/// Descriptor for `ErrorCode`. Decode as a `google.protobuf.EnumDescriptorProto`.
final $typed_data.Uint8List errorCodeDescriptor = $convert.base64Decode('CglFcnJvckNvZGUSEAoMRG9jSWRJbnZhbGlkEAASDwoLRG9jTm90Zm91bmQQARIVChBVc2VyVW5hdXRob3JpemVkEOcHEhIKDUludGVybmFsRXJyb3IQ6Ac=');
final $typed_data.Uint8List errorCodeDescriptor = $convert.base64Decode('CglFcnJvckNvZGUSEAoMRG9jSWRJbnZhbGlkEAASDwoLRG9jTm90Zm91bmQQARISCg5Xc0Nvbm5lY3RFcnJvchAKEhUKEFVzZXJVbmF1dGhvcml6ZWQQ5wcSEgoNSW50ZXJuYWxFcnJvchDoBw==');
@$core.Deprecated('Use docErrorDescriptor instead')
const DocError$json = const {
'1': 'DocError',

View File

@ -22,6 +22,7 @@ class ErrorCode extends $pb.ProtobufEnum {
static const ErrorCode ViewDescInvalid = ErrorCode._(23, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'ViewDescInvalid');
static const ErrorCode ViewDataInvalid = ErrorCode._(24, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'ViewDataInvalid');
static const ErrorCode UserUnauthorized = ErrorCode._(100, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'UserUnauthorized');
static const ErrorCode WsConnectError = ErrorCode._(200, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'WsConnectError');
static const ErrorCode InternalError = ErrorCode._(1000, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'InternalError');
static const ErrorCode RecordNotFound = ErrorCode._(1001, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'RecordNotFound');
@ -38,6 +39,7 @@ class ErrorCode extends $pb.ProtobufEnum {
ViewDescInvalid,
ViewDataInvalid,
UserUnauthorized,
WsConnectError,
InternalError,
RecordNotFound,
];

View File

@ -24,13 +24,14 @@ const ErrorCode$json = const {
const {'1': 'ViewDescInvalid', '2': 23},
const {'1': 'ViewDataInvalid', '2': 24},
const {'1': 'UserUnauthorized', '2': 100},
const {'1': 'WsConnectError', '2': 200},
const {'1': 'InternalError', '2': 1000},
const {'1': 'RecordNotFound', '2': 1001},
],
};
/// Descriptor for `ErrorCode`. Decode as a `google.protobuf.EnumDescriptorProto`.
final $typed_data.Uint8List errorCodeDescriptor = $convert.base64Decode('CglFcnJvckNvZGUSGAoUV29ya3NwYWNlTmFtZUludmFsaWQQABIWChJXb3Jrc3BhY2VJZEludmFsaWQQARIYChRBcHBDb2xvclN0eWxlSW52YWxpZBACEhgKFFdvcmtzcGFjZURlc2NJbnZhbGlkEAMSEAoMQXBwSWRJbnZhbGlkEAoSEgoOQXBwTmFtZUludmFsaWQQCxITCg9WaWV3TmFtZUludmFsaWQQFBIYChRWaWV3VGh1bWJuYWlsSW52YWxpZBAVEhEKDVZpZXdJZEludmFsaWQQFhITCg9WaWV3RGVzY0ludmFsaWQQFxITCg9WaWV3RGF0YUludmFsaWQQGBIUChBVc2VyVW5hdXRob3JpemVkEGQSEgoNSW50ZXJuYWxFcnJvchDoBxITCg5SZWNvcmROb3RGb3VuZBDpBw==');
final $typed_data.Uint8List errorCodeDescriptor = $convert.base64Decode('CglFcnJvckNvZGUSGAoUV29ya3NwYWNlTmFtZUludmFsaWQQABIWChJXb3Jrc3BhY2VJZEludmFsaWQQARIYChRBcHBDb2xvclN0eWxlSW52YWxpZBACEhgKFFdvcmtzcGFjZURlc2NJbnZhbGlkEAMSEAoMQXBwSWRJbnZhbGlkEAoSEgoOQXBwTmFtZUludmFsaWQQCxITCg9WaWV3TmFtZUludmFsaWQQFBIYChRWaWV3VGh1bWJuYWlsSW52YWxpZBAVEhEKDVZpZXdJZEludmFsaWQQFhITCg9WaWV3RGVzY0ludmFsaWQQFxITCg9WaWV3RGF0YUludmFsaWQQGBIUChBVc2VyVW5hdXRob3JpemVkEGQSEwoOV3NDb25uZWN0RXJyb3IQyAESEgoNSW50ZXJuYWxFcnJvchDoBxITCg5SZWNvcmROb3RGb3VuZBDpBw==');
@$core.Deprecated('Use workspaceErrorDescriptor instead')
const WorkspaceError$json = const {
'1': 'WorkspaceError',

View File

@ -11,13 +11,11 @@ import 'package:protobuf/protobuf.dart' as $pb;
class ErrorCode extends $pb.ProtobufEnum {
static const ErrorCode InternalError = ErrorCode._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'InternalError');
static const ErrorCode DuplicateSource = ErrorCode._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'DuplicateSource');
static const ErrorCode UnsupportedMessage = ErrorCode._(2, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'UnsupportedMessage');
static const ErrorCode Unauthorized = ErrorCode._(3, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Unauthorized');
static const ErrorCode UnsupportedMessage = ErrorCode._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'UnsupportedMessage');
static const ErrorCode Unauthorized = ErrorCode._(2, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Unauthorized');
static const $core.List<ErrorCode> values = <ErrorCode> [
InternalError,
DuplicateSource,
UnsupportedMessage,
Unauthorized,
];

View File

@ -13,14 +13,13 @@ const ErrorCode$json = const {
'1': 'ErrorCode',
'2': const [
const {'1': 'InternalError', '2': 0},
const {'1': 'DuplicateSource', '2': 1},
const {'1': 'UnsupportedMessage', '2': 2},
const {'1': 'Unauthorized', '2': 3},
const {'1': 'UnsupportedMessage', '2': 1},
const {'1': 'Unauthorized', '2': 2},
],
};
/// Descriptor for `ErrorCode`. Decode as a `google.protobuf.EnumDescriptorProto`.
final $typed_data.Uint8List errorCodeDescriptor = $convert.base64Decode('CglFcnJvckNvZGUSEQoNSW50ZXJuYWxFcnJvchAAEhMKD0R1cGxpY2F0ZVNvdXJjZRABEhYKElVuc3VwcG9ydGVkTWVzc2FnZRACEhAKDFVuYXV0aG9yaXplZBAD');
final $typed_data.Uint8List errorCodeDescriptor = $convert.base64Decode('CglFcnJvckNvZGUSEQoNSW50ZXJuYWxFcnJvchAAEhYKElVuc3VwcG9ydGVkTWVzc2FnZRABEhAKDFVuYXV0aG9yaXplZBAC');
@$core.Deprecated('Use wsErrorDescriptor instead')
const WsError$json = const {
'1': 'WsError',

View File

@ -1,4 +1,4 @@
use crate::service::ws_service::{entities::SessionId, WSClient, WSServer};
use crate::service::ws_service::{WSClient, WSServer};
use actix::Addr;
use crate::service::user_service::LoggedUser;

View File

@ -75,7 +75,7 @@ impl Actor for WSClient {
self.server
.send(connect)
.into_actor(self)
.then(|res, client, _ctx| {
.then(|res, _client, _ctx| {
match res {
Ok(Ok(_)) => log::trace!("Send connect message to server success"),
Ok(Err(e)) => log::error!("Send connect message to server failed: {:?}", e),

View File

@ -29,6 +29,7 @@ bytes = { version = "1.0" }
strum = "0.21"
strum_macros = "0.21"
dashmap = "4.0"
parking_lot = "0.11"
[dev-dependencies]
flowy-test = { path = "../flowy-test" }

View File

@ -40,6 +40,7 @@ impl DocError {
static_doc_error!(internal, ErrorCode::InternalError);
static_doc_error!(not_found, ErrorCode::DocNotfound);
static_doc_error!(unauthorized, ErrorCode::UserUnauthorized);
static_doc_error!(ws, ErrorCode::WsConnectError);
}
pub fn internal_error<T>(e: T) -> DocError
@ -57,6 +58,9 @@ pub enum ErrorCode {
#[display(fmt = "DocNotfound")]
DocNotfound = 1,
#[display(fmt = "Document websocket error")]
WsConnectError = 10,
#[display(fmt = "UserUnauthorized")]
UserUnauthorized = 999,
@ -81,6 +85,10 @@ impl std::convert::From<flowy_ot::errors::OTError> for DocError {
fn from(error: flowy_ot::errors::OTError) -> Self { DocError::internal().context(error) }
}
impl std::convert::From<std::io::Error> for DocError {
fn from(error: std::io::Error) -> Self { DocError::internal().context(error) }
}
// impl std::convert::From<::r2d2::Error> for DocError {
// fn from(error: r2d2::Error) -> Self {
// ErrorBuilder::new(ErrorCode::InternalError).error(error).build() } }

View File

@ -10,5 +10,8 @@ mod sql_tables;
extern crate flowy_database;
pub mod prelude {
pub use crate::{module::*, services::server::*};
pub use crate::{
module::*,
services::{server::*, ws_document::*},
};
}

View File

@ -1,35 +1,38 @@
use crate::{
errors::DocError,
services::{doc_cache::DocCache, server::construct_doc_server},
services::{doc_cache::OpenedDocumentCache, server::construct_doc_server},
};
use crate::{
entities::doc::{ApplyChangesetParams, CreateDocParams, Doc, QueryDocParams, SaveDocParams},
services::doc_controller::DocController,
errors::internal_error,
services::{doc_controller::DocController, ws_document::WsDocument},
};
use diesel::SqliteConnection;
use flowy_database::ConnectionPool;
use crate::errors::internal_error;
use std::sync::Arc;
use parking_lot::RwLock;
use std::{sync::Arc};
pub trait DocumentUser: Send + Sync {
fn user_doc_dir(&self) -> Result<String, DocError>;
fn user_dir(&self) -> Result<String, DocError>;
fn user_id(&self) -> Result<String, DocError>;
fn token(&self) -> Result<String, DocError>;
}
pub struct FlowyDocument {
controller: Arc<DocController>,
cache: Arc<DocCache>,
ws: Arc<RwLock<WsDocument>>,
cache: Arc<OpenedDocumentCache>,
}
impl FlowyDocument {
pub fn new(user: Arc<dyn DocumentUser>) -> FlowyDocument {
pub fn new(user: Arc<dyn DocumentUser>, ws: Arc<RwLock<WsDocument>>) -> FlowyDocument {
let server = construct_doc_server();
let cache = Arc::new(DocCache::new());
let cache = Arc::new(OpenedDocumentCache::new());
let controller = Arc::new(DocController::new(server.clone(), user.clone()));
Self { controller, cache }
Self { controller, cache, ws }
}
pub fn create(&self, params: CreateDocParams, conn: &SqliteConnection) -> Result<(), DocError> {

View File

@ -217,6 +217,7 @@ impl ::protobuf::reflect::ProtobufValue for DocError {
pub enum ErrorCode {
DocIdInvalid = 0,
DocNotfound = 1,
WsConnectError = 10,
UserUnauthorized = 999,
InternalError = 1000,
}
@ -230,6 +231,7 @@ impl ::protobuf::ProtobufEnum for ErrorCode {
match value {
0 => ::std::option::Option::Some(ErrorCode::DocIdInvalid),
1 => ::std::option::Option::Some(ErrorCode::DocNotfound),
10 => ::std::option::Option::Some(ErrorCode::WsConnectError),
999 => ::std::option::Option::Some(ErrorCode::UserUnauthorized),
1000 => ::std::option::Option::Some(ErrorCode::InternalError),
_ => ::std::option::Option::None
@ -240,6 +242,7 @@ impl ::protobuf::ProtobufEnum for ErrorCode {
static values: &'static [ErrorCode] = &[
ErrorCode::DocIdInvalid,
ErrorCode::DocNotfound,
ErrorCode::WsConnectError,
ErrorCode::UserUnauthorized,
ErrorCode::InternalError,
];
@ -271,26 +274,29 @@ impl ::protobuf::reflect::ProtobufValue for ErrorCode {
static file_descriptor_proto_data: &'static [u8] = b"\
\n\x0cerrors.proto\"<\n\x08DocError\x12\x1e\n\x04code\x18\x01\x20\x01(\
\x0e2\n.ErrorCodeR\x04code\x12\x10\n\x03msg\x18\x02\x20\x01(\tR\x03msg*Y\
\x0e2\n.ErrorCodeR\x04code\x12\x10\n\x03msg\x18\x02\x20\x01(\tR\x03msg*m\
\n\tErrorCode\x12\x10\n\x0cDocIdInvalid\x10\0\x12\x0f\n\x0bDocNotfound\
\x10\x01\x12\x15\n\x10UserUnauthorized\x10\xe7\x07\x12\x12\n\rInternalEr\
ror\x10\xe8\x07J\xd4\x02\n\x06\x12\x04\0\0\x0b\x01\n\x08\n\x01\x0c\x12\
\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x05\x01\n\n\n\x03\x04\0\x01\
\x12\x03\x02\x08\x10\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x17\n\x0c\n\
\x05\x04\0\x02\0\x06\x12\x03\x03\x04\r\n\x0c\n\x05\x04\0\x02\0\x01\x12\
\x03\x03\x0e\x12\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x15\x16\n\x0b\n\
\x04\x04\0\x02\x01\x12\x03\x04\x04\x13\n\x0c\n\x05\x04\0\x02\x01\x05\x12\
\x03\x04\x04\n\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\x0b\x0e\n\x0c\n\
\x05\x04\0\x02\x01\x03\x12\x03\x04\x11\x12\n\n\n\x02\x05\0\x12\x04\x06\0\
\x0b\x01\n\n\n\x03\x05\0\x01\x12\x03\x06\x05\x0e\n\x0b\n\x04\x05\0\x02\0\
\x12\x03\x07\x04\x15\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x07\x04\x10\n\
\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x07\x13\x14\n\x0b\n\x04\x05\0\x02\x01\
\x12\x03\x08\x04\x14\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\x08\x04\x0f\n\
\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\x08\x12\x13\n\x0b\n\x04\x05\0\x02\
\x02\x12\x03\t\x04\x1b\n\x0c\n\x05\x05\0\x02\x02\x01\x12\x03\t\x04\x14\n\
\x0c\n\x05\x05\0\x02\x02\x02\x12\x03\t\x17\x1a\n\x0b\n\x04\x05\0\x02\x03\
\x12\x03\n\x04\x19\n\x0c\n\x05\x05\0\x02\x03\x01\x12\x03\n\x04\x11\n\x0c\
\n\x05\x05\0\x02\x03\x02\x12\x03\n\x14\x18b\x06proto3\
\x10\x01\x12\x12\n\x0eWsConnectError\x10\n\x12\x15\n\x10UserUnauthorized\
\x10\xe7\x07\x12\x12\n\rInternalError\x10\xe8\x07J\xfd\x02\n\x06\x12\x04\
\0\0\x0c\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\
\0\x05\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x10\n\x0b\n\x04\x04\0\x02\
\0\x12\x03\x03\x04\x17\n\x0c\n\x05\x04\0\x02\0\x06\x12\x03\x03\x04\r\n\
\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\x0e\x12\n\x0c\n\x05\x04\0\x02\0\
\x03\x12\x03\x03\x15\x16\n\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04\x13\n\
\x0c\n\x05\x04\0\x02\x01\x05\x12\x03\x04\x04\n\n\x0c\n\x05\x04\0\x02\x01\
\x01\x12\x03\x04\x0b\x0e\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04\x11\
\x12\n\n\n\x02\x05\0\x12\x04\x06\0\x0c\x01\n\n\n\x03\x05\0\x01\x12\x03\
\x06\x05\x0e\n\x0b\n\x04\x05\0\x02\0\x12\x03\x07\x04\x15\n\x0c\n\x05\x05\
\0\x02\0\x01\x12\x03\x07\x04\x10\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x07\
\x13\x14\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x08\x04\x14\n\x0c\n\x05\x05\0\
\x02\x01\x01\x12\x03\x08\x04\x0f\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\
\x08\x12\x13\n\x0b\n\x04\x05\0\x02\x02\x12\x03\t\x04\x18\n\x0c\n\x05\x05\
\0\x02\x02\x01\x12\x03\t\x04\x12\n\x0c\n\x05\x05\0\x02\x02\x02\x12\x03\t\
\x15\x17\n\x0b\n\x04\x05\0\x02\x03\x12\x03\n\x04\x1b\n\x0c\n\x05\x05\0\
\x02\x03\x01\x12\x03\n\x04\x14\n\x0c\n\x05\x05\0\x02\x03\x02\x12\x03\n\
\x17\x1a\n\x0b\n\x04\x05\0\x02\x04\x12\x03\x0b\x04\x19\n\x0c\n\x05\x05\0\
\x02\x04\x01\x12\x03\x0b\x04\x11\n\x0c\n\x05\x05\0\x02\x04\x02\x12\x03\
\x0b\x14\x18b\x06proto3\
";
static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;

View File

@ -7,6 +7,7 @@ message DocError {
enum ErrorCode {
DocIdInvalid = 0;
DocNotfound = 1;
WsConnectError = 10;
UserUnauthorized = 999;
InternalError = 1000;
}

View File

@ -18,11 +18,11 @@ where
fn from(s: T) -> Self { DocId(s.to_string()) }
}
pub(crate) struct DocCache {
pub(crate) struct OpenedDocumentCache {
inner: DashMap<DocId, RwLock<OpenDocument>>,
}
impl DocCache {
impl OpenedDocumentCache {
pub(crate) fn new() -> Self { Self { inner: DashMap::new() } }
pub(crate) fn open<T, D>(&self, id: T, data: D) -> Result<(), DocError>

View File

@ -1,3 +1,4 @@
pub(crate) mod doc_cache;
pub(crate) mod doc_controller;
pub mod server;
pub mod ws_document;

View File

@ -0,0 +1,53 @@
use crate::errors::DocError;
use bytes::Bytes;
use lazy_static::lazy_static;
use std::{convert::TryFrom, sync::Arc};
pub struct WsDocumentMessage(pub Bytes);
pub trait WsSender: Send + Sync {
fn send_msg(&self, msg: WsDocumentMessage) -> Result<(), DocError>;
}
lazy_static! {
pub static ref WS_ID: String = "Document".to_string();
}
pub struct WsDocument {
sender: Arc<dyn WsSender>,
}
impl WsDocument {
pub fn new(sender: Arc<dyn WsSender>) -> Self { Self { sender } }
pub fn receive_msg(&self, _msg: WsDocumentMessage) { unimplemented!() }
pub fn send_msg(&self, _msg: WsDocumentMessage) { unimplemented!() }
}
pub enum WsSource {
Delta,
}
impl AsRef<str> for WsSource {
fn as_ref(&self) -> &str {
match self {
WsSource::Delta => "delta",
}
}
}
impl ToString for WsSource {
fn to_string(&self) -> String {
match self {
WsSource::Delta => self.as_ref().to_string(),
}
}
}
impl TryFrom<String> for WsSource {
type Error = DocError;
fn try_from(value: String) -> Result<Self, Self::Error> {
match value.as_str() {
"delta" => Ok(WsSource::Delta),
_ => Err(DocError::internal().context(format!("Deserialize WsSource failed. Unknown type: {}", &value))),
}
}
}

View File

@ -13,12 +13,14 @@ flowy-infra = { path = "../flowy-infra" }
flowy-workspace = { path = "../flowy-workspace" }
flowy-database = { path = "../flowy-database" }
flowy-document = { path = "../flowy-document" }
flowy-ws = { path = "../flowy-ws" }
tracing = { version = "0.1" }
log = "0.4.14"
futures-core = { version = "0.3", default-features = false }
color-eyre = { version = "0.5", default-features = false }
bytes = "1.0"
tokio = { version = "1", features = ["rt"] }
parking_lot = "0.11"
[dev-dependencies]
serde = { version = "1.0", features = ["derive"] }

View File

@ -0,0 +1,89 @@
use bytes::Bytes;
use flowy_document::{
errors::DocError,
module::DocumentUser,
prelude::{WsDocument, WsDocumentMessage, WsSender, WS_ID},
};
use flowy_user::services::user::UserSession;
use flowy_ws::{WsMessage, WsMessageHandler};
use parking_lot::RwLock;
use std::{path::Path, sync::Arc};
pub struct DocumentDepsResolver {
user_session: Arc<UserSession>,
}
impl DocumentDepsResolver {
pub fn new(user_session: Arc<UserSession>) -> Self { Self { user_session } }
pub fn split_into(self) -> (Arc<dyn DocumentUser>, Arc<RwLock<WsDocument>>) {
let user = Arc::new(DocumentUserImpl {
user: self.user_session.clone(),
});
let sender = Arc::new(WsSenderImpl {
user: self.user_session.clone(),
});
let ws = Arc::new(RwLock::new(WsDocument::new(sender.clone())));
let ws_handler = Arc::new(WsDocumentResolver {
user: self.user_session.clone(),
inner: ws.clone(),
});
self.user_session.add_ws_handler(ws_handler);
(user, ws)
}
}
struct DocumentUserImpl {
user: Arc<UserSession>,
}
impl DocumentUser for DocumentUserImpl {
fn user_dir(&self) -> Result<String, DocError> {
let dir = self.user.user_dir().map_err(|e| DocError::unauthorized().context(e))?;
let doc_dir = format!("{}/doc", dir);
if !Path::new(&doc_dir).exists() {
let _ = std::fs::create_dir_all(&doc_dir)?;
}
Ok(doc_dir)
}
fn user_id(&self) -> Result<String, DocError> { self.user.user_id().map_err(|e| DocError::internal().context(e)) }
fn token(&self) -> Result<String, DocError> { self.user.token().map_err(|e| DocError::internal().context(e)) }
}
struct WsSenderImpl {
user: Arc<UserSession>,
}
impl WsSender for WsSenderImpl {
fn send_msg(&self, msg: WsDocumentMessage) -> Result<(), DocError> {
let msg = WsMessage {
source: WS_ID.clone(),
data: msg.0.to_vec(),
};
let _ = self.user.send_ws_msg(msg).map_err(|e| DocError::internal().context(e))?;
Ok(())
}
}
struct WsDocumentResolver {
user: Arc<UserSession>,
inner: Arc<RwLock<WsDocument>>,
}
impl WsMessageHandler for WsDocumentResolver {
fn source(&self) -> String { WS_ID.clone() }
fn receive_message(&self, msg: WsMessage) {
let msg = WsDocumentMessage(Bytes::from(msg.data));
self.inner.read().receive_msg(msg);
}
}

View File

@ -1,25 +0,0 @@
use flowy_document::{errors::DocError, module::DocumentUser};
use flowy_user::services::user::UserSession;
use std::{path::Path, sync::Arc};
pub struct EditorUserImpl {
pub(crate) user_session: Arc<UserSession>,
}
impl DocumentUser for EditorUserImpl {
fn user_doc_dir(&self) -> Result<String, DocError> {
let dir = self.user_session.user_dir().map_err(|e| DocError::unauthorized().context(e))?;
let doc_dir = format!("{}/doc", dir);
if !Path::new(&doc_dir).exists() {
// TODO: Make sure to unwrap? 😁
std::fs::create_dir_all(&doc_dir).unwrap();
}
Ok(doc_dir)
}
fn user_id(&self) -> Result<String, DocError> { self.user_session.user_id().map_err(|e| DocError::internal().context(e)) }
fn token(&self) -> Result<String, DocError> { self.user_session.token().map_err(|e| DocError::internal().context(e)) }
}

View File

@ -1,5 +1,5 @@
mod editor_deps_impl;
mod workspace_deps_impl;
mod document_deps;
mod workspace_deps;
pub use editor_deps_impl::*;
pub use workspace_deps_impl::*;
pub use document_deps::*;
pub use workspace_deps::*;

View File

@ -1,27 +1,43 @@
use flowy_database::ConnectionPool;
use flowy_user::services::user::UserSession;
use flowy_workspace::{
errors::WorkspaceError,
module::{WorkspaceDatabase, WorkspaceUser},
};
use std::sync::Arc;
pub struct WorkspaceUserImpl {
pub struct WorkspaceDepsResolver {
inner: Arc<Resolver>,
}
struct Resolver {
pub(crate) user_session: Arc<UserSession>,
}
impl WorkspaceUser for WorkspaceUserImpl {
fn user_id(&self) -> Result<String, WorkspaceError> { self.user_session.user_id().map_err(|e| WorkspaceError::internal().context(e)) }
impl WorkspaceDepsResolver {
pub fn new(user_session: Arc<UserSession>) -> Self {
Self {
inner: Arc::new(Resolver { user_session }),
}
}
fn token(&self) -> Result<String, WorkspaceError> { self.user_session.token().map_err(|e| WorkspaceError::internal().context(e)) }
pub fn split_into(self) -> (Arc<dyn WorkspaceUser>, Arc<dyn WorkspaceDatabase>) {
let user: Arc<dyn WorkspaceUser> = self.inner.clone();
let database: Arc<dyn WorkspaceDatabase> = self.inner.clone();
(user, database)
}
}
pub struct WorkspaceDatabaseImpl {
pub(crate) user_session: Arc<UserSession>,
}
impl WorkspaceDatabase for WorkspaceDatabaseImpl {
impl WorkspaceDatabase for Resolver {
fn db_pool(&self) -> Result<Arc<ConnectionPool>, WorkspaceError> {
self.user_session.db_pool().map_err(|e| WorkspaceError::internal().context(e))
}
}
impl WorkspaceUser for Resolver {
fn user_id(&self) -> Result<String, WorkspaceError> { self.user_session.user_id().map_err(|e| WorkspaceError::internal().context(e)) }
fn token(&self) -> Result<String, WorkspaceError> { self.user_session.token().map_err(|e| WorkspaceError::internal().context(e)) }
}

View File

@ -1,8 +1,9 @@
use flowy_dispatch::prelude::Module;
use crate::deps_resolve::{EditorUserImpl, WorkspaceDatabaseImpl, WorkspaceUserImpl};
use crate::deps_resolve::{DocumentDepsResolver, WorkspaceDepsResolver};
use flowy_document::module::FlowyDocument;
use flowy_user::services::user::UserSessionBuilder;
use flowy_user::services::user::{UserSession, UserSessionBuilder};
use std::sync::Arc;
pub struct ModuleConfig {
@ -10,28 +11,23 @@ pub struct ModuleConfig {
}
pub fn build_modules(config: ModuleConfig) -> Vec<Module> {
// runtime.spawn(async move {
// start_ws_connection("eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.
// eyJpc3MiOiJsb2NhbGhvc3QiLCJzdWIiOiJhdXRoIiwiaWF0IjoxNjMxNzcwODQ2LCJleHAiOjE2MzIyMDI4NDYsInVzZXJfaWQiOiI5ZmFiN2I4MS1mZDAyLTRhN2EtYjA4Zi05NDM3NTdmZmE5MDcifQ.
// UzV01tHnWEZWBp3nJPTmFi7ypxBoCe56AjEPb9bnsFE") });
let user_session = Arc::new(UserSessionBuilder::new().root_dir(&config.root).build());
let workspace_user_impl = Arc::new(WorkspaceUserImpl {
user_session: user_session.clone(),
});
let workspace_db = Arc::new(WorkspaceDatabaseImpl {
user_session: user_session.clone(),
});
let editor_user = Arc::new(EditorUserImpl {
user_session: user_session.clone(),
});
let document = Arc::new(FlowyDocument::new(editor_user));
vec![
flowy_user::module::create(user_session),
flowy_workspace::module::create(workspace_user_impl, workspace_db, document),
]
vec![build_user_module(user_session.clone()), build_workspace_module(user_session)]
}
fn build_user_module(user_session: Arc<UserSession>) -> Module { flowy_user::module::create(user_session.clone()) }
fn build_workspace_module(user_session: Arc<UserSession>) -> Module {
let workspace_deps = WorkspaceDepsResolver::new(user_session.clone());
let (user, database) = workspace_deps.split_into();
let document = build_document_module(user_session.clone());
flowy_workspace::module::create(user, database, document)
}
fn build_document_module(user_session: Arc<UserSession>) -> Arc<FlowyDocument> {
let document_deps = DocumentDepsResolver::new(user_session.clone());
let (user, ws) = document_deps.split_into();
let document = Arc::new(FlowyDocument::new(user, ws));
document
}

View File

@ -18,10 +18,10 @@ use flowy_database::{
};
use flowy_infra::kv::KV;
use flowy_sqlite::ConnectionPool;
use flowy_ws::{connect::Retry, WsController, WsMessageHandler};
use flowy_ws::{connect::Retry, WsController, WsMessage, WsMessageHandler, WsSender};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
pub struct UserSessionConfig {
root_dir: String,
@ -173,21 +173,23 @@ impl UserSession {
pub fn token(&self) -> Result<String, UserError> { Ok(self.get_session()?.token) }
pub fn add_ws_msg_handler(&self, handler: Arc<dyn WsMessageHandler>) -> Result<(), UserError> {
let _ = self.ws_controller.write().add_handler(handler)?;
Ok(())
pub fn add_ws_handler(&self, handler: Arc<dyn WsMessageHandler>) { let _ = self.ws_controller.write().add_handler(handler); }
pub fn get_ws_sender(&self) -> Result<Arc<WsSender>, UserError> {
match self.ws_controller.try_read_for(Duration::from_millis(300)) {
None => Err(UserError::internal().context("Send ws message timeout")),
Some(guard) => {
let sender = guard.get_sender()?;
Ok(sender)
},
}
}
// pub fn send_ws_msg<T: Into<WsMessage>>(&self, msg: T) -> Result<(),
// UserError> { match self.ws_controller.try_read_for(Duration::
// from_millis(300)) { None =>
// Err(UserError::internal().context("Send ws message timeout")),
// Some(guard) => {
// let _ = guard.send_msg(msg)?;
// Ok(())
// },
// }
// }
pub fn send_ws_msg<T: Into<WsMessage>>(&self, msg: T) -> Result<(), UserError> {
let sender = self.get_ws_sender()?;
let _ = sender.send_msg(msg)?;
Ok(())
}
}
impl UserSession {

View File

@ -44,6 +44,7 @@ impl WorkspaceError {
static_workspace_error!(unauthorized, ErrorCode::UserUnauthorized);
static_workspace_error!(internal, ErrorCode::InternalError);
static_workspace_error!(not_found, ErrorCode::RecordNotFound);
static_workspace_error!(ws, ErrorCode::WsConnectError);
pub fn context<T: Debug>(mut self, error: T) -> Self {
self.msg = format!("{:?}", error);
@ -89,6 +90,9 @@ pub enum ErrorCode {
#[display(fmt = "User unauthorized")]
UserUnauthorized = 100,
#[display(fmt = "Workspace websocket error")]
WsConnectError = 200,
#[display(fmt = "Server error")]
InternalError = 1000,
#[display(fmt = "Record not found")]

View File

@ -4,6 +4,7 @@ use crate::{
handlers::*,
services::{server::construct_workspace_server, AppController, ViewController, WorkspaceController},
};
use flowy_database::DBConnection;
use flowy_dispatch::prelude::*;
use flowy_document::module::FlowyDocument;

View File

@ -227,6 +227,7 @@ pub enum ErrorCode {
ViewDescInvalid = 23,
ViewDataInvalid = 24,
UserUnauthorized = 100,
WsConnectError = 200,
InternalError = 1000,
RecordNotFound = 1001,
}
@ -250,6 +251,7 @@ impl ::protobuf::ProtobufEnum for ErrorCode {
23 => ::std::option::Option::Some(ErrorCode::ViewDescInvalid),
24 => ::std::option::Option::Some(ErrorCode::ViewDataInvalid),
100 => ::std::option::Option::Some(ErrorCode::UserUnauthorized),
200 => ::std::option::Option::Some(ErrorCode::WsConnectError),
1000 => ::std::option::Option::Some(ErrorCode::InternalError),
1001 => ::std::option::Option::Some(ErrorCode::RecordNotFound),
_ => ::std::option::Option::None
@ -270,6 +272,7 @@ impl ::protobuf::ProtobufEnum for ErrorCode {
ErrorCode::ViewDescInvalid,
ErrorCode::ViewDataInvalid,
ErrorCode::UserUnauthorized,
ErrorCode::WsConnectError,
ErrorCode::InternalError,
ErrorCode::RecordNotFound,
];
@ -302,51 +305,54 @@ impl ::protobuf::reflect::ProtobufValue for ErrorCode {
static file_descriptor_proto_data: &'static [u8] = b"\
\n\x0cerrors.proto\"B\n\x0eWorkspaceError\x12\x1e\n\x04code\x18\x01\x20\
\x01(\x0e2\n.ErrorCodeR\x04code\x12\x10\n\x03msg\x18\x02\x20\x01(\tR\x03\
msg*\xc2\x02\n\tErrorCode\x12\x18\n\x14WorkspaceNameInvalid\x10\0\x12\
msg*\xd7\x02\n\tErrorCode\x12\x18\n\x14WorkspaceNameInvalid\x10\0\x12\
\x16\n\x12WorkspaceIdInvalid\x10\x01\x12\x18\n\x14AppColorStyleInvalid\
\x10\x02\x12\x18\n\x14WorkspaceDescInvalid\x10\x03\x12\x10\n\x0cAppIdInv\
alid\x10\n\x12\x12\n\x0eAppNameInvalid\x10\x0b\x12\x13\n\x0fViewNameInva\
lid\x10\x14\x12\x18\n\x14ViewThumbnailInvalid\x10\x15\x12\x11\n\rViewIdI\
nvalid\x10\x16\x12\x13\n\x0fViewDescInvalid\x10\x17\x12\x13\n\x0fViewDat\
aInvalid\x10\x18\x12\x14\n\x10UserUnauthorized\x10d\x12\x12\n\rInternalE\
rror\x10\xe8\x07\x12\x13\n\x0eRecordNotFound\x10\xe9\x07J\xee\x05\n\x06\
\x12\x04\0\0\x15\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\
\x04\x02\0\x05\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x16\n\x0b\n\x04\
\x04\0\x02\0\x12\x03\x03\x04\x17\n\x0c\n\x05\x04\0\x02\0\x06\x12\x03\x03\
\x04\r\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\x0e\x12\n\x0c\n\x05\x04\0\
\x02\0\x03\x12\x03\x03\x15\x16\n\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04\
\x13\n\x0c\n\x05\x04\0\x02\x01\x05\x12\x03\x04\x04\n\n\x0c\n\x05\x04\0\
\x02\x01\x01\x12\x03\x04\x0b\x0e\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\
\x04\x11\x12\n\n\n\x02\x05\0\x12\x04\x06\0\x15\x01\n\n\n\x03\x05\0\x01\
\x12\x03\x06\x05\x0e\n\x0b\n\x04\x05\0\x02\0\x12\x03\x07\x04\x1d\n\x0c\n\
\x05\x05\0\x02\0\x01\x12\x03\x07\x04\x18\n\x0c\n\x05\x05\0\x02\0\x02\x12\
\x03\x07\x1b\x1c\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x08\x04\x1b\n\x0c\n\
\x05\x05\0\x02\x01\x01\x12\x03\x08\x04\x16\n\x0c\n\x05\x05\0\x02\x01\x02\
\x12\x03\x08\x19\x1a\n\x0b\n\x04\x05\0\x02\x02\x12\x03\t\x04\x1d\n\x0c\n\
\x05\x05\0\x02\x02\x01\x12\x03\t\x04\x18\n\x0c\n\x05\x05\0\x02\x02\x02\
\x12\x03\t\x1b\x1c\n\x0b\n\x04\x05\0\x02\x03\x12\x03\n\x04\x1d\n\x0c\n\
\x05\x05\0\x02\x03\x01\x12\x03\n\x04\x18\n\x0c\n\x05\x05\0\x02\x03\x02\
\x12\x03\n\x1b\x1c\n\x0b\n\x04\x05\0\x02\x04\x12\x03\x0b\x04\x16\n\x0c\n\
\x05\x05\0\x02\x04\x01\x12\x03\x0b\x04\x10\n\x0c\n\x05\x05\0\x02\x04\x02\
\x12\x03\x0b\x13\x15\n\x0b\n\x04\x05\0\x02\x05\x12\x03\x0c\x04\x18\n\x0c\
\n\x05\x05\0\x02\x05\x01\x12\x03\x0c\x04\x12\n\x0c\n\x05\x05\0\x02\x05\
\x02\x12\x03\x0c\x15\x17\n\x0b\n\x04\x05\0\x02\x06\x12\x03\r\x04\x19\n\
\x0c\n\x05\x05\0\x02\x06\x01\x12\x03\r\x04\x13\n\x0c\n\x05\x05\0\x02\x06\
\x02\x12\x03\r\x16\x18\n\x0b\n\x04\x05\0\x02\x07\x12\x03\x0e\x04\x1e\n\
\x0c\n\x05\x05\0\x02\x07\x01\x12\x03\x0e\x04\x18\n\x0c\n\x05\x05\0\x02\
\x07\x02\x12\x03\x0e\x1b\x1d\n\x0b\n\x04\x05\0\x02\x08\x12\x03\x0f\x04\
\x17\n\x0c\n\x05\x05\0\x02\x08\x01\x12\x03\x0f\x04\x11\n\x0c\n\x05\x05\0\
\x02\x08\x02\x12\x03\x0f\x14\x16\n\x0b\n\x04\x05\0\x02\t\x12\x03\x10\x04\
\x19\n\x0c\n\x05\x05\0\x02\t\x01\x12\x03\x10\x04\x13\n\x0c\n\x05\x05\0\
\x02\t\x02\x12\x03\x10\x16\x18\n\x0b\n\x04\x05\0\x02\n\x12\x03\x11\x04\
\x19\n\x0c\n\x05\x05\0\x02\n\x01\x12\x03\x11\x04\x13\n\x0c\n\x05\x05\0\
\x02\n\x02\x12\x03\x11\x16\x18\n\x0b\n\x04\x05\0\x02\x0b\x12\x03\x12\x04\
\x1b\n\x0c\n\x05\x05\0\x02\x0b\x01\x12\x03\x12\x04\x14\n\x0c\n\x05\x05\0\
\x02\x0b\x02\x12\x03\x12\x17\x1a\n\x0b\n\x04\x05\0\x02\x0c\x12\x03\x13\
\x04\x19\n\x0c\n\x05\x05\0\x02\x0c\x01\x12\x03\x13\x04\x11\n\x0c\n\x05\
\x05\0\x02\x0c\x02\x12\x03\x13\x14\x18\n\x0b\n\x04\x05\0\x02\r\x12\x03\
\x14\x04\x1a\n\x0c\n\x05\x05\0\x02\r\x01\x12\x03\x14\x04\x12\n\x0c\n\x05\
\x05\0\x02\r\x02\x12\x03\x14\x15\x19b\x06proto3\
aInvalid\x10\x18\x12\x14\n\x10UserUnauthorized\x10d\x12\x13\n\x0eWsConne\
ctError\x10\xc8\x01\x12\x12\n\rInternalError\x10\xe8\x07\x12\x13\n\x0eRe\
cordNotFound\x10\xe9\x07J\x97\x06\n\x06\x12\x04\0\0\x16\x01\n\x08\n\x01\
\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x05\x01\n\n\n\x03\x04\
\0\x01\x12\x03\x02\x08\x16\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x17\n\
\x0c\n\x05\x04\0\x02\0\x06\x12\x03\x03\x04\r\n\x0c\n\x05\x04\0\x02\0\x01\
\x12\x03\x03\x0e\x12\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x15\x16\n\
\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04\x13\n\x0c\n\x05\x04\0\x02\x01\
\x05\x12\x03\x04\x04\n\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\x0b\x0e\
\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04\x11\x12\n\n\n\x02\x05\0\x12\
\x04\x06\0\x16\x01\n\n\n\x03\x05\0\x01\x12\x03\x06\x05\x0e\n\x0b\n\x04\
\x05\0\x02\0\x12\x03\x07\x04\x1d\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x07\
\x04\x18\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x07\x1b\x1c\n\x0b\n\x04\x05\
\0\x02\x01\x12\x03\x08\x04\x1b\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\x08\
\x04\x16\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\x08\x19\x1a\n\x0b\n\x04\
\x05\0\x02\x02\x12\x03\t\x04\x1d\n\x0c\n\x05\x05\0\x02\x02\x01\x12\x03\t\
\x04\x18\n\x0c\n\x05\x05\0\x02\x02\x02\x12\x03\t\x1b\x1c\n\x0b\n\x04\x05\
\0\x02\x03\x12\x03\n\x04\x1d\n\x0c\n\x05\x05\0\x02\x03\x01\x12\x03\n\x04\
\x18\n\x0c\n\x05\x05\0\x02\x03\x02\x12\x03\n\x1b\x1c\n\x0b\n\x04\x05\0\
\x02\x04\x12\x03\x0b\x04\x16\n\x0c\n\x05\x05\0\x02\x04\x01\x12\x03\x0b\
\x04\x10\n\x0c\n\x05\x05\0\x02\x04\x02\x12\x03\x0b\x13\x15\n\x0b\n\x04\
\x05\0\x02\x05\x12\x03\x0c\x04\x18\n\x0c\n\x05\x05\0\x02\x05\x01\x12\x03\
\x0c\x04\x12\n\x0c\n\x05\x05\0\x02\x05\x02\x12\x03\x0c\x15\x17\n\x0b\n\
\x04\x05\0\x02\x06\x12\x03\r\x04\x19\n\x0c\n\x05\x05\0\x02\x06\x01\x12\
\x03\r\x04\x13\n\x0c\n\x05\x05\0\x02\x06\x02\x12\x03\r\x16\x18\n\x0b\n\
\x04\x05\0\x02\x07\x12\x03\x0e\x04\x1e\n\x0c\n\x05\x05\0\x02\x07\x01\x12\
\x03\x0e\x04\x18\n\x0c\n\x05\x05\0\x02\x07\x02\x12\x03\x0e\x1b\x1d\n\x0b\
\n\x04\x05\0\x02\x08\x12\x03\x0f\x04\x17\n\x0c\n\x05\x05\0\x02\x08\x01\
\x12\x03\x0f\x04\x11\n\x0c\n\x05\x05\0\x02\x08\x02\x12\x03\x0f\x14\x16\n\
\x0b\n\x04\x05\0\x02\t\x12\x03\x10\x04\x19\n\x0c\n\x05\x05\0\x02\t\x01\
\x12\x03\x10\x04\x13\n\x0c\n\x05\x05\0\x02\t\x02\x12\x03\x10\x16\x18\n\
\x0b\n\x04\x05\0\x02\n\x12\x03\x11\x04\x19\n\x0c\n\x05\x05\0\x02\n\x01\
\x12\x03\x11\x04\x13\n\x0c\n\x05\x05\0\x02\n\x02\x12\x03\x11\x16\x18\n\
\x0b\n\x04\x05\0\x02\x0b\x12\x03\x12\x04\x1b\n\x0c\n\x05\x05\0\x02\x0b\
\x01\x12\x03\x12\x04\x14\n\x0c\n\x05\x05\0\x02\x0b\x02\x12\x03\x12\x17\
\x1a\n\x0b\n\x04\x05\0\x02\x0c\x12\x03\x13\x04\x19\n\x0c\n\x05\x05\0\x02\
\x0c\x01\x12\x03\x13\x04\x12\n\x0c\n\x05\x05\0\x02\x0c\x02\x12\x03\x13\
\x15\x18\n\x0b\n\x04\x05\0\x02\r\x12\x03\x14\x04\x19\n\x0c\n\x05\x05\0\
\x02\r\x01\x12\x03\x14\x04\x11\n\x0c\n\x05\x05\0\x02\r\x02\x12\x03\x14\
\x14\x18\n\x0b\n\x04\x05\0\x02\x0e\x12\x03\x15\x04\x1a\n\x0c\n\x05\x05\0\
\x02\x0e\x01\x12\x03\x15\x04\x12\n\x0c\n\x05\x05\0\x02\x0e\x02\x12\x03\
\x15\x15\x19b\x06proto3\
";
static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;

View File

@ -17,6 +17,7 @@ enum ErrorCode {
ViewDescInvalid = 23;
ViewDataInvalid = 24;
UserUnauthorized = 100;
WsConnectError = 200;
InternalError = 1000;
RecordNotFound = 1001;
}

View File

@ -17,16 +17,16 @@ use tokio_tungstenite::{
};
#[pin_project]
pub struct WsConnection {
pub struct WsConnectionFuture {
msg_tx: Option<MsgSender>,
ws_rx: Option<MsgReceiver>,
#[pin]
fut: BoxFuture<'static, Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Response), Error>>,
}
impl WsConnection {
impl WsConnectionFuture {
pub fn new(msg_tx: MsgSender, ws_rx: MsgReceiver, addr: String) -> Self {
WsConnection {
WsConnectionFuture {
msg_tx: Some(msg_tx),
ws_rx: Some(ws_rx),
fut: Box::pin(async move { connect_async(&addr).await }),
@ -34,7 +34,7 @@ impl WsConnection {
}
}
impl Future for WsConnection {
impl Future for WsConnectionFuture {
type Output = Result<WsStream, WsError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// [[pin]]

View File

@ -36,7 +36,6 @@ impl WsError {
}
static_user_error!(internal, ErrorCode::InternalError);
static_user_error!(duplicate_source, ErrorCode::DuplicateSource);
static_user_error!(unsupported_message, ErrorCode::UnsupportedMessage);
static_user_error!(unauthorized, ErrorCode::Unauthorized);
}
@ -44,9 +43,8 @@ impl WsError {
#[derive(Debug, Clone, ProtoBuf_Enum, Display, PartialEq, Eq)]
pub enum ErrorCode {
InternalError = 0,
DuplicateSource = 1,
UnsupportedMessage = 2,
Unauthorized = 3,
UnsupportedMessage = 1,
Unauthorized = 2,
}
impl std::default::Default for ErrorCode {

View File

@ -216,9 +216,8 @@ impl ::protobuf::reflect::ProtobufValue for WsError {
#[derive(Clone,PartialEq,Eq,Debug,Hash)]
pub enum ErrorCode {
InternalError = 0,
DuplicateSource = 1,
UnsupportedMessage = 2,
Unauthorized = 3,
UnsupportedMessage = 1,
Unauthorized = 2,
}
impl ::protobuf::ProtobufEnum for ErrorCode {
@ -229,9 +228,8 @@ impl ::protobuf::ProtobufEnum for ErrorCode {
fn from_i32(value: i32) -> ::std::option::Option<ErrorCode> {
match value {
0 => ::std::option::Option::Some(ErrorCode::InternalError),
1 => ::std::option::Option::Some(ErrorCode::DuplicateSource),
2 => ::std::option::Option::Some(ErrorCode::UnsupportedMessage),
3 => ::std::option::Option::Some(ErrorCode::Unauthorized),
1 => ::std::option::Option::Some(ErrorCode::UnsupportedMessage),
2 => ::std::option::Option::Some(ErrorCode::Unauthorized),
_ => ::std::option::Option::None
}
}
@ -239,7 +237,6 @@ impl ::protobuf::ProtobufEnum for ErrorCode {
fn values() -> &'static [Self] {
static values: &'static [ErrorCode] = &[
ErrorCode::InternalError,
ErrorCode::DuplicateSource,
ErrorCode::UnsupportedMessage,
ErrorCode::Unauthorized,
];
@ -271,26 +268,24 @@ impl ::protobuf::reflect::ProtobufValue for ErrorCode {
static file_descriptor_proto_data: &'static [u8] = b"\
\n\x0cerrors.proto\";\n\x07WsError\x12\x1e\n\x04code\x18\x01\x20\x01(\
\x0e2\n.ErrorCodeR\x04code\x12\x10\n\x03msg\x18\x02\x20\x01(\tR\x03msg*]\
\n\tErrorCode\x12\x11\n\rInternalError\x10\0\x12\x13\n\x0fDuplicateSourc\
e\x10\x01\x12\x16\n\x12UnsupportedMessage\x10\x02\x12\x10\n\x0cUnauthori\
zed\x10\x03J\xd4\x02\n\x06\x12\x04\0\0\x0b\x01\n\x08\n\x01\x0c\x12\x03\0\
\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x05\x01\n\n\n\x03\x04\0\x01\x12\x03\
\x02\x08\x0f\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x17\n\x0c\n\x05\x04\
\0\x02\0\x06\x12\x03\x03\x04\r\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\
\x0e\x12\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x15\x16\n\x0b\n\x04\x04\
\0\x02\x01\x12\x03\x04\x04\x13\n\x0c\n\x05\x04\0\x02\x01\x05\x12\x03\x04\
\x04\n\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\x0b\x0e\n\x0c\n\x05\x04\
\0\x02\x01\x03\x12\x03\x04\x11\x12\n\n\n\x02\x05\0\x12\x04\x06\0\x0b\x01\
\n\n\n\x03\x05\0\x01\x12\x03\x06\x05\x0e\n\x0b\n\x04\x05\0\x02\0\x12\x03\
\x07\x04\x16\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x07\x04\x11\n\x0c\n\x05\
\x05\0\x02\0\x02\x12\x03\x07\x14\x15\n\x0b\n\x04\x05\0\x02\x01\x12\x03\
\x08\x04\x18\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\x08\x04\x13\n\x0c\n\
\x05\x05\0\x02\x01\x02\x12\x03\x08\x16\x17\n\x0b\n\x04\x05\0\x02\x02\x12\
\x03\t\x04\x1b\n\x0c\n\x05\x05\0\x02\x02\x01\x12\x03\t\x04\x16\n\x0c\n\
\x05\x05\0\x02\x02\x02\x12\x03\t\x19\x1a\n\x0b\n\x04\x05\0\x02\x03\x12\
\x03\n\x04\x15\n\x0c\n\x05\x05\0\x02\x03\x01\x12\x03\n\x04\x10\n\x0c\n\
\x05\x05\0\x02\x03\x02\x12\x03\n\x13\x14b\x06proto3\
\x0e2\n.ErrorCodeR\x04code\x12\x10\n\x03msg\x18\x02\x20\x01(\tR\x03msg*H\
\n\tErrorCode\x12\x11\n\rInternalError\x10\0\x12\x16\n\x12UnsupportedMes\
sage\x10\x01\x12\x10\n\x0cUnauthorized\x10\x02J\xab\x02\n\x06\x12\x04\0\
\0\n\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\
\x05\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x0f\n\x0b\n\x04\x04\0\x02\0\
\x12\x03\x03\x04\x17\n\x0c\n\x05\x04\0\x02\0\x06\x12\x03\x03\x04\r\n\x0c\
\n\x05\x04\0\x02\0\x01\x12\x03\x03\x0e\x12\n\x0c\n\x05\x04\0\x02\0\x03\
\x12\x03\x03\x15\x16\n\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04\x13\n\x0c\
\n\x05\x04\0\x02\x01\x05\x12\x03\x04\x04\n\n\x0c\n\x05\x04\0\x02\x01\x01\
\x12\x03\x04\x0b\x0e\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04\x11\x12\n\
\n\n\x02\x05\0\x12\x04\x06\0\n\x01\n\n\n\x03\x05\0\x01\x12\x03\x06\x05\
\x0e\n\x0b\n\x04\x05\0\x02\0\x12\x03\x07\x04\x16\n\x0c\n\x05\x05\0\x02\0\
\x01\x12\x03\x07\x04\x11\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x07\x14\x15\
\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x08\x04\x1b\n\x0c\n\x05\x05\0\x02\x01\
\x01\x12\x03\x08\x04\x16\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\x08\x19\
\x1a\n\x0b\n\x04\x05\0\x02\x02\x12\x03\t\x04\x15\n\x0c\n\x05\x05\0\x02\
\x02\x01\x12\x03\t\x04\x10\n\x0c\n\x05\x05\0\x02\x02\x02\x12\x03\t\x13\
\x14b\x06proto3\
";
static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;

View File

@ -6,7 +6,6 @@ message WsError {
}
enum ErrorCode {
InternalError = 0;
DuplicateSource = 1;
UnsupportedMessage = 2;
Unauthorized = 3;
UnsupportedMessage = 1;
Unauthorized = 2;
}

View File

@ -1,5 +1,5 @@
use crate::{
connect::{Retry, WsConnection},
connect::{Retry, WsConnectionFuture},
errors::WsError,
WsMessage,
};
@ -82,7 +82,7 @@ impl WsController {
pub fn add_handler(&mut self, handler: Arc<dyn WsMessageHandler>) -> Result<(), WsError> {
let source = handler.source();
if self.handlers.contains_key(&source) {
return Err(WsError::duplicate_source());
log::error!("{} source is already registered", source);
}
self.handlers.insert(source, handler);
Ok(())
@ -97,6 +97,13 @@ impl WsController {
self._connect(addr, Some(Box::pin(async { retry.await })))
}
pub fn get_sender(&self) -> Result<Arc<WsSender>, WsError> {
match &self.sender {
None => Err(WsError::internal().context("WsSender is not initialized")),
Some(sender) => Ok(sender.clone()),
}
}
fn _connect(&mut self, addr: String, retry: Option<BoxFuture<'static, ()>>) -> Result<JoinHandle<()>, ServerError> {
log::debug!("🐴 ws connect: {}", &addr);
let (connection, handlers) = self.make_connect(addr.clone());
@ -131,7 +138,7 @@ impl WsController {
}))
}
fn make_connect(&mut self, addr: String) -> (WsConnection, WsHandlers) {
fn make_connect(&mut self, addr: String) -> (WsConnectionFuture, WsHandlerFuture) {
// Stream User
// ┌───────────────┐ ┌──────────────┐
// ┌──────┐ │ ┌─────────┐ │ ┌────────┐ │ ┌────────┐ │
@ -147,22 +154,22 @@ impl WsController {
let handlers = self.handlers.clone();
self.sender = Some(Arc::new(WsSender { ws_tx }));
self.addr = Some(addr.clone());
(WsConnection::new(msg_tx, ws_rx, addr), WsHandlers::new(handlers, msg_rx))
(WsConnectionFuture::new(msg_tx, ws_rx, addr), WsHandlerFuture::new(handlers, msg_rx))
}
}
#[pin_project]
pub struct WsHandlers {
pub struct WsHandlerFuture {
#[pin]
msg_rx: MsgReceiver,
handlers: HashMap<String, Arc<dyn WsMessageHandler>>,
}
impl WsHandlers {
impl WsHandlerFuture {
fn new(handlers: HashMap<String, Arc<dyn WsMessageHandler>>, msg_rx: MsgReceiver) -> Self { Self { msg_rx, handlers } }
}
impl Future for WsHandlers {
impl Future for WsHandlerFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {