diff --git a/app_flowy/lib/welcome/infrastructure/i_splash_impl.dart b/app_flowy/lib/welcome/infrastructure/i_splash_impl.dart index bf85b7a1cc..42c4f9fc8c 100644 --- a/app_flowy/lib/welcome/infrastructure/i_splash_impl.dart +++ b/app_flowy/lib/welcome/infrastructure/i_splash_impl.dart @@ -18,7 +18,7 @@ export 'package:app_flowy/welcome/domain/i_splash.dart'; class SplashUserImpl implements ISplashUser { @override Future currentUserProfile() { - final result = UserEventGetUserProfile().send(); + final result = UserEventInitUser().send(); return result.then((result) { return result.fold( (userProfile) { @@ -32,7 +32,6 @@ class SplashUserImpl implements ISplashUser { } } - class SplashRoute implements ISplashRoute { @override Future pushWelcomeScreen(BuildContext context, UserProfile user) async { diff --git a/app_flowy/packages/flowy_sdk/lib/dispatch/code_gen.dart b/app_flowy/packages/flowy_sdk/lib/dispatch/code_gen.dart index b4029c6cdf..10d6de8476 100644 --- a/app_flowy/packages/flowy_sdk/lib/dispatch/code_gen.dart +++ b/app_flowy/packages/flowy_sdk/lib/dispatch/code_gen.dart @@ -288,12 +288,12 @@ class WorkspaceEventApplyChangeset { } } -class UserEventGetUserProfile { - UserEventGetUserProfile(); +class UserEventInitUser { + UserEventInitUser(); Future> send() { final request = FFIRequest.create() - ..event = UserEvent.GetUserProfile.toString(); + ..event = UserEvent.InitUser.toString(); return Dispatch.asyncRequest(request).then((bytesResult) => bytesResult.fold( (okBytes) => left(UserProfile.fromBuffer(okBytes)), @@ -367,3 +367,17 @@ class UserEventUpdateUser { } } +class UserEventGetUserProfile { + UserEventGetUserProfile(); + + Future> send() { + final request = FFIRequest.create() + ..event = UserEvent.GetUserProfile.toString(); + + return Dispatch.asyncRequest(request).then((bytesResult) => bytesResult.fold( + (okBytes) => left(UserProfile.fromBuffer(okBytes)), + (errBytes) => right(UserError.fromBuffer(errBytes)), + )); + } +} + diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-user/event.pbenum.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-user/event.pbenum.dart index 11f095a07c..eb404104cc 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-user/event.pbenum.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-user/event.pbenum.dart @@ -10,18 +10,20 @@ import 'dart:core' as $core; import 'package:protobuf/protobuf.dart' as $pb; class UserEvent extends $pb.ProtobufEnum { - static const UserEvent GetUserProfile = UserEvent._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'GetUserProfile'); + static const UserEvent InitUser = UserEvent._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'InitUser'); static const UserEvent SignIn = UserEvent._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'SignIn'); static const UserEvent SignUp = UserEvent._(2, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'SignUp'); static const UserEvent SignOut = UserEvent._(3, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'SignOut'); static const UserEvent UpdateUser = UserEvent._(4, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'UpdateUser'); + static const UserEvent GetUserProfile = UserEvent._(5, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'GetUserProfile'); static const $core.List values = [ - GetUserProfile, + InitUser, SignIn, SignUp, SignOut, UpdateUser, + GetUserProfile, ]; static final $core.Map<$core.int, UserEvent> _byValue = $pb.ProtobufEnum.initByValue(values); diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-user/event.pbjson.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-user/event.pbjson.dart index 596a4c7f87..20c5a37f4e 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-user/event.pbjson.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-user/event.pbjson.dart @@ -12,13 +12,14 @@ import 'dart:typed_data' as $typed_data; const UserEvent$json = const { '1': 'UserEvent', '2': const [ - const {'1': 'GetUserProfile', '2': 0}, + const {'1': 'InitUser', '2': 0}, const {'1': 'SignIn', '2': 1}, const {'1': 'SignUp', '2': 2}, const {'1': 'SignOut', '2': 3}, const {'1': 'UpdateUser', '2': 4}, + const {'1': 'GetUserProfile', '2': 5}, ], }; /// Descriptor for `UserEvent`. Decode as a `google.protobuf.EnumDescriptorProto`. -final $typed_data.Uint8List userEventDescriptor = $convert.base64Decode('CglVc2VyRXZlbnQSEgoOR2V0VXNlclByb2ZpbGUQABIKCgZTaWduSW4QARIKCgZTaWduVXAQAhILCgdTaWduT3V0EAMSDgoKVXBkYXRlVXNlchAE'); +final $typed_data.Uint8List userEventDescriptor = $convert.base64Decode('CglVc2VyRXZlbnQSDAoISW5pdFVzZXIQABIKCgZTaWduSW4QARIKCgZTaWduVXAQAhILCgdTaWduT3V0EAMSDgoKVXBkYXRlVXNlchAEEhIKDkdldFVzZXJQcm9maWxlEAU='); diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-user/user_profile.pb.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-user/user_profile.pb.dart index 66fd00987b..b1299286cb 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-user/user_profile.pb.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-user/user_profile.pb.dart @@ -9,8 +9,6 @@ import 'dart:core' as $core; import 'package:protobuf/protobuf.dart' as $pb; -export 'user_profile.pbenum.dart'; - class UserToken extends $pb.GeneratedMessage { static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'UserToken', createEmptyInstance: create) ..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'token') diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-user/user_profile.pbenum.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-user/user_profile.pbenum.dart index 5859c59f22..e73b70b723 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-user/user_profile.pbenum.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-user/user_profile.pbenum.dart @@ -5,24 +5,3 @@ // @dart = 2.12 // ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields -// ignore_for_file: UNDEFINED_SHOWN_NAME -import 'dart:core' as $core; -import 'package:protobuf/protobuf.dart' as $pb; - -class UserStatus extends $pb.ProtobufEnum { - static const UserStatus Unknown = UserStatus._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Unknown'); - static const UserStatus Login = UserStatus._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Login'); - static const UserStatus Expired = UserStatus._(2, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Expired'); - - static const $core.List values = [ - Unknown, - Login, - Expired, - ]; - - static final $core.Map<$core.int, UserStatus> _byValue = $pb.ProtobufEnum.initByValue(values); - static UserStatus? valueOf($core.int value) => _byValue[value]; - - const UserStatus._($core.int v, $core.String n) : super(v, n); -} - diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-user/user_profile.pbjson.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-user/user_profile.pbjson.dart index e9caf76e44..30b3e96da4 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-user/user_profile.pbjson.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-user/user_profile.pbjson.dart @@ -8,18 +8,6 @@ import 'dart:core' as $core; import 'dart:convert' as $convert; import 'dart:typed_data' as $typed_data; -@$core.Deprecated('Use userStatusDescriptor instead') -const UserStatus$json = const { - '1': 'UserStatus', - '2': const [ - const {'1': 'Unknown', '2': 0}, - const {'1': 'Login', '2': 1}, - const {'1': 'Expired', '2': 2}, - ], -}; - -/// Descriptor for `UserStatus`. Decode as a `google.protobuf.EnumDescriptorProto`. -final $typed_data.Uint8List userStatusDescriptor = $convert.base64Decode('CgpVc2VyU3RhdHVzEgsKB1Vua25vd24QABIJCgVMb2dpbhABEgsKB0V4cGlyZWQQAg=='); @$core.Deprecated('Use userTokenDescriptor instead') const UserToken$json = const { '1': 'UserToken', diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 75727d13eb..9535a1a85b 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -86,4 +86,5 @@ path = "src/main.rs" once_cell = "1.7.2" linkify = "0.5.0" flowy-user = { path = "../rust-lib/flowy-user" } -flowy-workspace = { path = "../rust-lib/flowy-workspace" } \ No newline at end of file +flowy-workspace = { path = "../rust-lib/flowy-workspace" } +flowy-ws = { path = "../rust-lib/flowy-ws" } \ No newline at end of file diff --git a/backend/src/config/const_define.rs b/backend/src/config/const_define.rs index bfd2aa6eac..33e81e22a0 100644 --- a/backend/src/config/const_define.rs +++ b/backend/src/config/const_define.rs @@ -4,4 +4,4 @@ pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(8); pub const PING_TIMEOUT: Duration = Duration::from_secs(60); pub const MAX_PAYLOAD_SIZE: usize = 262_144; // max payload size is 256k -pub const IGNORE_ROUTES: [&str; 2] = ["/api/register", "/api/auth"]; +pub const IGNORE_ROUTES: [&str; 3] = ["/api/register", "/api/auth", "/ws"]; diff --git a/backend/src/entities/token.rs b/backend/src/entities/token.rs index 2d586fdc7a..52086c28f7 100644 --- a/backend/src/entities/token.rs +++ b/backend/src/entities/token.rs @@ -32,7 +32,7 @@ impl Claim { } } - pub fn get_user_id(self) -> String { self.user_id } + pub fn user_id(self) -> String { self.user_id } } // impl From for User { diff --git a/backend/src/middleware/auth_middleware.rs b/backend/src/middleware/auth_middleware.rs index eb8a78e454..97e30dfb5e 100644 --- a/backend/src/middleware/auth_middleware.rs +++ b/backend/src/middleware/auth_middleware.rs @@ -56,6 +56,7 @@ where fn call(&self, req: ServiceRequest) -> Self::Future { let mut authenticate_pass: bool = false; for ignore_route in IGNORE_ROUTES.iter() { + log::info!("ignore: {}, path: {}", ignore_route, req.path()); if req.path().starts_with(ignore_route) { authenticate_pass = true; break; @@ -68,7 +69,6 @@ where match result { Ok(logged_user) => { authenticate_pass = AUTHORIZED_USERS.is_authorized(&logged_user); - // Update user timestamp AUTHORIZED_USERS.store_auth(logged_user, true); }, diff --git a/backend/src/service/user_service/auth.rs b/backend/src/service/user_service/auth.rs index 1f2749bf49..d52331c90c 100644 --- a/backend/src/service/user_service/auth.rs +++ b/backend/src/service/user_service/auth.rs @@ -49,7 +49,7 @@ pub async fn sign_in(pool: &PgPool, params: SignInParams) -> Result Result Result { - let _ = AUTHORIZED_USERS.store_auth(logged_user, false)?; + AUTHORIZED_USERS.store_auth(logged_user, false); Ok(FlowyResponse::success()) } @@ -91,7 +91,7 @@ pub async fn register_user( .context("Failed to insert user")?; let logged_user = LoggedUser::new(&response_data.user_id); - let _ = AUTHORIZED_USERS.store_auth(logged_user, true)?; + AUTHORIZED_USERS.store_auth(logged_user, true); let _ = create_default_workspace(&mut transaction, response_data.get_user_id()).await?; transaction @@ -112,7 +112,7 @@ pub(crate) async fn get_user_profile( .await .context("Failed to acquire a Postgres connection to get user detail")?; - let id = logged_user.get_user_id()?; + let id = logged_user.as_uuid()?; let user_table = sqlx::query_as::("SELECT * FROM user_table WHERE id = $1") .bind(id) @@ -126,7 +126,7 @@ pub(crate) async fn get_user_profile( .context("Failed to commit SQL transaction to get user detail.")?; // update the user active time - let _ = AUTHORIZED_USERS.store_auth(logged_user, true)?; + AUTHORIZED_USERS.store_auth(logged_user, true); let mut user_profile = UserProfile::default(); user_profile.set_id(user_table.id.to_string()); @@ -178,7 +178,7 @@ pub(crate) async fn set_user_profile( .add_some_arg("name", name) .add_some_arg("email", email) .add_some_arg("password", password) - .and_where_eq("id", &logged_user.get_user_id()?) + .and_where_eq("id", &logged_user.as_uuid()?) .build()?; sqlx::query_with(&sql, args) diff --git a/backend/src/service/user_service/logged_user.rs b/backend/src/service/user_service/logged_user.rs index d39969be7a..973cef419a 100644 --- a/backend/src/service/user_service/logged_user.rs +++ b/backend/src/service/user_service/logged_user.rs @@ -13,13 +13,13 @@ lazy_static! { #[derive(Debug, PartialEq, Eq, Hash, Clone)] pub struct LoggedUser { - user_id: String, + pub user_id: String, } impl std::convert::From for LoggedUser { fn from(c: Claim) -> Self { Self { - user_id: c.get_user_id(), + user_id: c.user_id(), } } } @@ -36,7 +36,7 @@ impl LoggedUser { Ok(user) } - pub fn get_user_id(&self) -> Result { + pub fn as_uuid(&self) -> Result { let id = uuid::Uuid::parse_str(&self.user_id)?; Ok(id) } @@ -106,13 +106,12 @@ impl AuthorizedUsers { } } - pub fn store_auth(&self, user: LoggedUser, is_auth: bool) -> Result<(), ServerError> { + pub fn store_auth(&self, user: LoggedUser, is_auth: bool) { let status = if is_auth { AuthStatus::Authorized(Utc::now()) } else { AuthStatus::NotAuthorized }; self.0.insert(user, status); - Ok(()) } } diff --git a/backend/src/service/workspace_service/app/app.rs b/backend/src/service/workspace_service/app/app.rs index 57d00575e3..f8a872ab1b 100644 --- a/backend/src/service/workspace_service/app/app.rs +++ b/backend/src/service/workspace_service/app/app.rs @@ -31,7 +31,7 @@ pub(crate) async fn create_app( ) -> Result { let name = AppName::parse(params.take_name()).map_err(invalid_params)?; let workspace_id = WorkspaceId::parse(params.take_workspace_id()).map_err(invalid_params)?; - let user_id = logged_user.get_user_id()?.to_string(); + let user_id = logged_user.as_uuid()?.to_string(); let desc = AppDesc::parse(params.take_desc()).map_err(invalid_params)?; let mut transaction = pool .begin() diff --git a/backend/src/service/workspace_service/workspace/workspace.rs b/backend/src/service/workspace_service/workspace/workspace.rs index 12914aeaf7..93617eeb34 100644 --- a/backend/src/service/workspace_service/workspace/workspace.rs +++ b/backend/src/service/workspace_service/workspace/workspace.rs @@ -28,7 +28,7 @@ pub(crate) async fn create_workspace( ) -> Result { let name = WorkspaceName::parse(params.get_name().to_owned()).map_err(invalid_params)?; let desc = WorkspaceDesc::parse(params.get_desc().to_owned()).map_err(invalid_params)?; - let user_id = logged_user.get_user_id()?.to_string(); + let user_id = logged_user.as_uuid()?.to_string(); let mut transaction = pool .begin() @@ -134,7 +134,7 @@ pub async fn read_workspaces( workspace_id: Option, logged_user: LoggedUser, ) -> Result { - let user_id = logged_user.get_user_id()?.to_string(); + let user_id = logged_user.as_uuid()?.to_string(); let mut transaction = pool .begin() .await diff --git a/backend/src/service/ws_service/router.rs b/backend/src/service/ws_service/router.rs index 810e2ac53b..e61617d87a 100644 --- a/backend/src/service/ws_service/router.rs +++ b/backend/src/service/ws_service/router.rs @@ -1,6 +1,7 @@ use crate::service::ws_service::{entities::SessionId, WSClient, WSServer}; use actix::Addr; +use crate::service::user_service::LoggedUser; use actix_web::{ get, web::{Data, Path, Payload}, @@ -14,17 +15,30 @@ use actix_web_actors::ws; pub async fn establish_ws_connection( request: HttpRequest, payload: Payload, - path: Path, + token: Path, server: Data>, ) -> Result { - let client = WSClient::new(SessionId::new(path.clone()), server.get_ref().clone()); - let result = ws::start(client, &request, payload); - - match result { - Ok(response) => Ok(response.into()), + match LoggedUser::from_token(token.clone()) { + Ok(user) => { + let client = WSClient::new( + SessionId::new(user.user_id.clone()), + server.get_ref().clone(), + ); + let result = ws::start(client, &request, payload); + match result { + Ok(response) => Ok(response.into()), + Err(e) => { + log::error!("ws connection error: {:?}", e); + Err(e) + }, + } + }, Err(e) => { - log::error!("ws connection error: {:?}", e); - Err(e) + if e.is_unauthorized() { + Ok(HttpResponse::Unauthorized().json(e)) + } else { + Ok(HttpResponse::BadRequest().json(e)) + } }, } } diff --git a/backend/tests/api/helper.rs b/backend/tests/api/helper.rs index c5b7c474dd..3c11f2f31b 100644 --- a/backend/tests/api/helper.rs +++ b/backend/tests/api/helper.rs @@ -179,6 +179,10 @@ impl TestServer { let response = user_sign_up_request(params, &url).await.unwrap(); response } + + pub(crate) fn ws_addr(&self) -> String { + format!("{}/ws/{}", self.address, self.user_token.as_ref().unwrap()) + } } pub async fn spawn_server() -> TestServer { let database_name = format!("{}", Uuid::new_v4().to_string()); diff --git a/backend/tests/api/main.rs b/backend/tests/api/main.rs index d287eb3d81..5f27c3ca92 100644 --- a/backend/tests/api/main.rs +++ b/backend/tests/api/main.rs @@ -2,3 +2,4 @@ mod auth; mod doc; mod helper; mod workspace; +mod ws; diff --git a/backend/tests/api/ws.rs b/backend/tests/api/ws.rs new file mode 100644 index 0000000000..61d09d6af0 --- /dev/null +++ b/backend/tests/api/ws.rs @@ -0,0 +1,10 @@ +use crate::helper::TestServer; +use flowy_ws::WsController; + +#[actix_rt::test] +async fn ws_connect() { + let server = TestServer::new().await; + let mut controller = WsController::new(); + let addr = server.ws_addr(); + let _ = controller.connect(addr).await.unwrap(); +} diff --git a/rust-lib/dart-ffi/Cargo.toml b/rust-lib/dart-ffi/Cargo.toml index 7467da3fa5..dbaf8eb045 100644 --- a/rust-lib/dart-ffi/Cargo.toml +++ b/rust-lib/dart-ffi/Cargo.toml @@ -20,8 +20,7 @@ byteorder = {version = "1.3.4"} ffi-support = {version = "0.4.2"} protobuf = {version = "2.20.0"} lazy_static = {version = "1.4.0"} -#tokio = { version = "1", features = ["rt", "rt-multi-thread"] } -tokio = { version = "1", features = ["full"] } +tokio = { version = "1", features = ["rt", "rt-multi-thread"] } log = "0.4.14" serde = { version = "1.0", features = ["derive"] } serde_json = {version = "1.0"} diff --git a/rust-lib/flowy-derive/src/derive_cache/derive_cache.rs b/rust-lib/flowy-derive/src/derive_cache/derive_cache.rs index 4ec763441f..a56895fb31 100644 --- a/rust-lib/flowy-derive/src/derive_cache/derive_cache.rs +++ b/rust-lib/flowy-derive/src/derive_cache/derive_cache.rs @@ -80,7 +80,6 @@ pub fn category_from_str(type_str: &str) -> TypeCategory { | "WorkspaceObservable" | "DocObservable" | "FFIStatusCode" - | "UserStatus" | "UserEvent" | "UserObservable" => TypeCategory::Enum, diff --git a/rust-lib/flowy-dispatch/src/util/mod.rs b/rust-lib/flowy-dispatch/src/util/mod.rs index a7ecb2c1e6..b17e22bae4 100644 --- a/rust-lib/flowy-dispatch/src/util/mod.rs +++ b/rust-lib/flowy-dispatch/src/util/mod.rs @@ -4,7 +4,7 @@ use tokio::runtime; pub mod ready; -pub(crate) fn tokio_default_runtime() -> io::Result { +pub fn tokio_default_runtime() -> io::Result { runtime::Builder::new_multi_thread() .thread_name("flowy-rt") .enable_io() diff --git a/rust-lib/flowy-dispatch/tests/api/helper.rs b/rust-lib/flowy-dispatch/tests/api/helper.rs deleted file mode 100644 index 59399ac8bb..0000000000 --- a/rust-lib/flowy-dispatch/tests/api/helper.rs +++ /dev/null @@ -1,16 +0,0 @@ -#[rustfmt::skip] -use flowy_dispatch::prelude::*; -use std::sync::Once; - -#[allow(dead_code)] -pub fn setup_env() { - static INIT: Once = Once::new(); - (|| env_logger::init()); -} - -pub fn init_dispatch(module_factory: F) -> EventDispatch -where - F: FnOnce() -> Vec, -{ - EventDispatch::construct(module_factory) -} diff --git a/rust-lib/flowy-dispatch/tests/api/main.rs b/rust-lib/flowy-dispatch/tests/api/main.rs index 6147b588f8..144bcec053 100644 --- a/rust-lib/flowy-dispatch/tests/api/main.rs +++ b/rust-lib/flowy-dispatch/tests/api/main.rs @@ -1,2 +1 @@ -mod helper; mod module; diff --git a/rust-lib/flowy-dispatch/tests/api/module.rs b/rust-lib/flowy-dispatch/tests/api/module.rs index db4508517c..8dfac5a6dc 100644 --- a/rust-lib/flowy-dispatch/tests/api/module.rs +++ b/rust-lib/flowy-dispatch/tests/api/module.rs @@ -6,9 +6,10 @@ pub async fn hello() -> String { "say hello".to_string() } #[tokio::test] async fn test() { - setup_env(); + env_logger::init(); + let event = "1"; - let dispatch = Arc::new(init_dispatch(|| vec![Module::new().event(event, hello)])); + let dispatch = Arc::new(EventDispatch::construct(|| vec![Module::new().event(event, hello)])); let request = ModuleRequest::new(event); let _ = EventDispatch::async_send_with_callback(dispatch.clone(), request, |resp| { Box::pin(async move { diff --git a/rust-lib/flowy-sdk/Cargo.toml b/rust-lib/flowy-sdk/Cargo.toml index fe5a4d21a8..18faefcac2 100644 --- a/rust-lib/flowy-sdk/Cargo.toml +++ b/rust-lib/flowy-sdk/Cargo.toml @@ -13,12 +13,12 @@ flowy-infra = { path = "../flowy-infra" } flowy-workspace = { path = "../flowy-workspace" } flowy-database = { path = "../flowy-database" } flowy-document = { path = "../flowy-document" } -flowy-ws = { path = "../flowy-ws" } tracing = { version = "0.1" } log = "0.4.14" futures-core = { version = "0.3", default-features = false } color-eyre = { version = "0.5", default-features = false } bytes = "1.0" +tokio = { version = "1", features = ["rt"] } [dev-dependencies] serde = { version = "1.0", features = ["derive"] } diff --git a/rust-lib/flowy-sdk/src/lib.rs b/rust-lib/flowy-sdk/src/lib.rs index 9693cd8190..f2517eaf71 100644 --- a/rust-lib/flowy-sdk/src/lib.rs +++ b/rust-lib/flowy-sdk/src/lib.rs @@ -3,7 +3,6 @@ mod deps_resolve; pub mod module; use flowy_dispatch::prelude::*; -use flowy_ws::start_ws_connection; use module::build_modules; pub use module::*; use std::sync::{ @@ -88,9 +87,5 @@ fn init_log(config: &FlowySDKConfig) { fn init_dispatch(root: &str) -> EventDispatch { let config = ModuleConfig { root: root.to_owned() }; let dispatch = EventDispatch::construct(|| build_modules(config)); - - dispatch.spawn(async { - start_ws_connection(); - }); dispatch } diff --git a/rust-lib/flowy-sdk/src/module.rs b/rust-lib/flowy-sdk/src/module.rs index c4cfabefad..314a7a86c9 100644 --- a/rust-lib/flowy-sdk/src/module.rs +++ b/rust-lib/flowy-sdk/src/module.rs @@ -10,6 +10,10 @@ pub struct ModuleConfig { } pub fn build_modules(config: ModuleConfig) -> Vec { + // runtime.spawn(async move { + // start_ws_connection("eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9. + // eyJpc3MiOiJsb2NhbGhvc3QiLCJzdWIiOiJhdXRoIiwiaWF0IjoxNjMxNzcwODQ2LCJleHAiOjE2MzIyMDI4NDYsInVzZXJfaWQiOiI5ZmFiN2I4MS1mZDAyLTRhN2EtYjA4Zi05NDM3NTdmZmE5MDcifQ. + // UzV01tHnWEZWBp3nJPTmFi7ypxBoCe56AjEPb9bnsFE") }); let user_session = Arc::new(UserSessionBuilder::new().root_dir(&config.root).build()); let workspace_user_impl = Arc::new(WorkspaceUserImpl { diff --git a/rust-lib/flowy-user/Cargo.toml b/rust-lib/flowy-user/Cargo.toml index 2bf1faec97..1c4e0f54e9 100644 --- a/rust-lib/flowy-user/Cargo.toml +++ b/rust-lib/flowy-user/Cargo.toml @@ -13,6 +13,7 @@ flowy-database = { path = "../flowy-database" } flowy-sqlite = { path = "../flowy-sqlite" } flowy-infra = { path = "../flowy-infra" } flowy-net = { path = "../flowy-net", features = ["flowy_request"] } +flowy-ws = { path = "../flowy-ws"} flowy-observable = { path = "../flowy-observable" } tracing = { version = "0.1", features = ["log"] } diff --git a/rust-lib/flowy-user/src/entities/user_profile.rs b/rust-lib/flowy-user/src/entities/user_profile.rs index e4dd3ad708..73130c080a 100644 --- a/rust-lib/flowy-user/src/entities/user_profile.rs +++ b/rust-lib/flowy-user/src/entities/user_profile.rs @@ -1,4 +1,4 @@ -use flowy_derive::{ProtoBuf, ProtoBuf_Enum}; +use flowy_derive::ProtoBuf; #[derive(Default, ProtoBuf)] pub struct UserToken { @@ -6,17 +6,6 @@ pub struct UserToken { pub token: String, } -#[derive(Debug, ProtoBuf_Enum)] -pub enum UserStatus { - Unknown = 0, - Login = 1, - Expired = 2, -} - -impl std::default::Default for UserStatus { - fn default() -> Self { UserStatus::Unknown } -} - #[derive(ProtoBuf, Default, Debug, PartialEq, Eq, Clone)] pub struct UserProfile { #[pb(index = 1)] diff --git a/rust-lib/flowy-user/src/event.rs b/rust-lib/flowy-user/src/event.rs index 76c9fe073c..b57f66045c 100644 --- a/rust-lib/flowy-user/src/event.rs +++ b/rust-lib/flowy-user/src/event.rs @@ -5,7 +5,7 @@ use strum_macros::Display; #[event_err = "UserError"] pub enum UserEvent { #[event(output = "UserProfile")] - GetUserProfile = 0, + InitUser = 0, #[event(input = "SignInRequest", output = "UserProfile")] SignIn = 1, @@ -18,4 +18,7 @@ pub enum UserEvent { #[event(input = "UpdateUserRequest")] UpdateUser = 4, + + #[event(output = "UserProfile")] + GetUserProfile = 5, } diff --git a/rust-lib/flowy-user/src/handlers/user_handler.rs b/rust-lib/flowy-user/src/handlers/user_handler.rs index 873dd8acba..403182788b 100644 --- a/rust-lib/flowy-user/src/handlers/user_handler.rs +++ b/rust-lib/flowy-user/src/handlers/user_handler.rs @@ -3,8 +3,14 @@ use flowy_dispatch::prelude::*; use std::{convert::TryInto, sync::Arc}; -#[tracing::instrument(name = "get_profile", skip(session))] -pub async fn user_profile_handler(session: Unit>) -> DataResult { +#[tracing::instrument(skip(session))] +pub async fn init_user_handler(session: Unit>) -> DataResult { + let user_profile = session.init_user().await?; + data_result(user_profile) +} + +#[tracing::instrument(skip(session))] +pub async fn get_user_profile_handler(session: Unit>) -> DataResult { let user_profile = session.user_profile().await?; data_result(user_profile) } diff --git a/rust-lib/flowy-user/src/module.rs b/rust-lib/flowy-user/src/module.rs index 8171068cc3..42350504b1 100644 --- a/rust-lib/flowy-user/src/module.rs +++ b/rust-lib/flowy-user/src/module.rs @@ -9,7 +9,8 @@ pub fn create(user_session: Arc) -> Module { .data(user_session) .event(UserEvent::SignIn, sign_in) .event(UserEvent::SignUp, sign_up) - .event(UserEvent::GetUserProfile, user_profile_handler) + .event(UserEvent::InitUser, init_user_handler) + .event(UserEvent::GetUserProfile, get_user_profile_handler) .event(UserEvent::SignOut, sign_out) .event(UserEvent::UpdateUser, update_user_handler) } diff --git a/rust-lib/flowy-user/src/protobuf/model/event.rs b/rust-lib/flowy-user/src/protobuf/model/event.rs index 32e93ad8f5..06b9799c86 100644 --- a/rust-lib/flowy-user/src/protobuf/model/event.rs +++ b/rust-lib/flowy-user/src/protobuf/model/event.rs @@ -25,11 +25,12 @@ #[derive(Clone,PartialEq,Eq,Debug,Hash)] pub enum UserEvent { - GetUserProfile = 0, + InitUser = 0, SignIn = 1, SignUp = 2, SignOut = 3, UpdateUser = 4, + GetUserProfile = 5, } impl ::protobuf::ProtobufEnum for UserEvent { @@ -39,22 +40,24 @@ impl ::protobuf::ProtobufEnum for UserEvent { fn from_i32(value: i32) -> ::std::option::Option { match value { - 0 => ::std::option::Option::Some(UserEvent::GetUserProfile), + 0 => ::std::option::Option::Some(UserEvent::InitUser), 1 => ::std::option::Option::Some(UserEvent::SignIn), 2 => ::std::option::Option::Some(UserEvent::SignUp), 3 => ::std::option::Option::Some(UserEvent::SignOut), 4 => ::std::option::Option::Some(UserEvent::UpdateUser), + 5 => ::std::option::Option::Some(UserEvent::GetUserProfile), _ => ::std::option::Option::None } } fn values() -> &'static [Self] { static values: &'static [UserEvent] = &[ - UserEvent::GetUserProfile, + UserEvent::InitUser, UserEvent::SignIn, UserEvent::SignUp, UserEvent::SignOut, UserEvent::UpdateUser, + UserEvent::GetUserProfile, ]; values } @@ -72,7 +75,7 @@ impl ::std::marker::Copy for UserEvent { impl ::std::default::Default for UserEvent { fn default() -> Self { - UserEvent::GetUserProfile + UserEvent::InitUser } } @@ -83,21 +86,24 @@ impl ::protobuf::reflect::ProtobufValue for UserEvent { } static file_descriptor_proto_data: &'static [u8] = b"\ - \n\x0bevent.proto*T\n\tUserEvent\x12\x12\n\x0eGetUserProfile\x10\0\x12\n\ - \n\x06SignIn\x10\x01\x12\n\n\x06SignUp\x10\x02\x12\x0b\n\x07SignOut\x10\ - \x03\x12\x0e\n\nUpdateUser\x10\x04J\xf7\x01\n\x06\x12\x04\0\0\x08\x01\n\ - \x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x05\0\x12\x04\x02\0\x08\x01\n\n\ - \n\x03\x05\0\x01\x12\x03\x02\x05\x0e\n\x0b\n\x04\x05\0\x02\0\x12\x03\x03\ - \x04\x17\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x03\x04\x12\n\x0c\n\x05\x05\ - \0\x02\0\x02\x12\x03\x03\x15\x16\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x04\ - \x04\x0f\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\x04\x04\n\n\x0c\n\x05\x05\ - \0\x02\x01\x02\x12\x03\x04\r\x0e\n\x0b\n\x04\x05\0\x02\x02\x12\x03\x05\ - \x04\x0f\n\x0c\n\x05\x05\0\x02\x02\x01\x12\x03\x05\x04\n\n\x0c\n\x05\x05\ - \0\x02\x02\x02\x12\x03\x05\r\x0e\n\x0b\n\x04\x05\0\x02\x03\x12\x03\x06\ - \x04\x10\n\x0c\n\x05\x05\0\x02\x03\x01\x12\x03\x06\x04\x0b\n\x0c\n\x05\ - \x05\0\x02\x03\x02\x12\x03\x06\x0e\x0f\n\x0b\n\x04\x05\0\x02\x04\x12\x03\ - \x07\x04\x13\n\x0c\n\x05\x05\0\x02\x04\x01\x12\x03\x07\x04\x0e\n\x0c\n\ - \x05\x05\0\x02\x04\x02\x12\x03\x07\x11\x12b\x06proto3\ + \n\x0bevent.proto*b\n\tUserEvent\x12\x0c\n\x08InitUser\x10\0\x12\n\n\x06\ + SignIn\x10\x01\x12\n\n\x06SignUp\x10\x02\x12\x0b\n\x07SignOut\x10\x03\ + \x12\x0e\n\nUpdateUser\x10\x04\x12\x12\n\x0eGetUserProfile\x10\x05J\xa0\ + \x02\n\x06\x12\x04\0\0\t\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\ + \x05\0\x12\x04\x02\0\t\x01\n\n\n\x03\x05\0\x01\x12\x03\x02\x05\x0e\n\x0b\ + \n\x04\x05\0\x02\0\x12\x03\x03\x04\x11\n\x0c\n\x05\x05\0\x02\0\x01\x12\ + \x03\x03\x04\x0c\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x03\x0f\x10\n\x0b\n\ + \x04\x05\0\x02\x01\x12\x03\x04\x04\x0f\n\x0c\n\x05\x05\0\x02\x01\x01\x12\ + \x03\x04\x04\n\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\x04\r\x0e\n\x0b\n\ + \x04\x05\0\x02\x02\x12\x03\x05\x04\x0f\n\x0c\n\x05\x05\0\x02\x02\x01\x12\ + \x03\x05\x04\n\n\x0c\n\x05\x05\0\x02\x02\x02\x12\x03\x05\r\x0e\n\x0b\n\ + \x04\x05\0\x02\x03\x12\x03\x06\x04\x10\n\x0c\n\x05\x05\0\x02\x03\x01\x12\ + \x03\x06\x04\x0b\n\x0c\n\x05\x05\0\x02\x03\x02\x12\x03\x06\x0e\x0f\n\x0b\ + \n\x04\x05\0\x02\x04\x12\x03\x07\x04\x13\n\x0c\n\x05\x05\0\x02\x04\x01\ + \x12\x03\x07\x04\x0e\n\x0c\n\x05\x05\0\x02\x04\x02\x12\x03\x07\x11\x12\n\ + \x0b\n\x04\x05\0\x02\x05\x12\x03\x08\x04\x17\n\x0c\n\x05\x05\0\x02\x05\ + \x01\x12\x03\x08\x04\x12\n\x0c\n\x05\x05\0\x02\x05\x02\x12\x03\x08\x15\ + \x16b\x06proto3\ "; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; diff --git a/rust-lib/flowy-user/src/protobuf/model/user_profile.rs b/rust-lib/flowy-user/src/protobuf/model/user_profile.rs index c04ea8be8b..842cbed2b6 100644 --- a/rust-lib/flowy-user/src/protobuf/model/user_profile.rs +++ b/rust-lib/flowy-user/src/protobuf/model/user_profile.rs @@ -1273,59 +1273,6 @@ impl ::protobuf::reflect::ProtobufValue for UpdateUserParams { } } -#[derive(Clone,PartialEq,Eq,Debug,Hash)] -pub enum UserStatus { - Unknown = 0, - Login = 1, - Expired = 2, -} - -impl ::protobuf::ProtobufEnum for UserStatus { - fn value(&self) -> i32 { - *self as i32 - } - - fn from_i32(value: i32) -> ::std::option::Option { - match value { - 0 => ::std::option::Option::Some(UserStatus::Unknown), - 1 => ::std::option::Option::Some(UserStatus::Login), - 2 => ::std::option::Option::Some(UserStatus::Expired), - _ => ::std::option::Option::None - } - } - - fn values() -> &'static [Self] { - static values: &'static [UserStatus] = &[ - UserStatus::Unknown, - UserStatus::Login, - UserStatus::Expired, - ]; - values - } - - fn enum_descriptor_static() -> &'static ::protobuf::reflect::EnumDescriptor { - static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::EnumDescriptor> = ::protobuf::rt::LazyV2::INIT; - descriptor.get(|| { - ::protobuf::reflect::EnumDescriptor::new_pb_name::("UserStatus", file_descriptor_proto()) - }) - } -} - -impl ::std::marker::Copy for UserStatus { -} - -impl ::std::default::Default for UserStatus { - fn default() -> Self { - UserStatus::Unknown - } -} - -impl ::protobuf::reflect::ProtobufValue for UserStatus { - fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { - ::protobuf::reflect::ReflectValueRef::Enum(::protobuf::ProtobufEnum::descriptor(self)) - } -} - static file_descriptor_proto_data: &'static [u8] = b"\ \n\x12user_profile.proto\"!\n\tUserToken\x12\x14\n\x05token\x18\x01\x20\ \x01(\tR\x05token\"]\n\x0bUserProfile\x12\x0e\n\x02id\x18\x01\x20\x01(\t\ @@ -1339,64 +1286,56 @@ static file_descriptor_proto_data: &'static [u8] = b"\ id\x18\x01\x20\x01(\tR\x02id\x12\x14\n\x04name\x18\x02\x20\x01(\tH\0R\ \x04name\x12\x16\n\x05email\x18\x03\x20\x01(\tH\x01R\x05email\x12\x1c\n\ \x08password\x18\x04\x20\x01(\tH\x02R\x08passwordB\r\n\x0bone_of_nameB\ - \x0e\n\x0cone_of_emailB\x11\n\x0fone_of_password*1\n\nUserStatus\x12\x0b\ - \n\x07Unknown\x10\0\x12\t\n\x05Login\x10\x01\x12\x0b\n\x07Expired\x10\ - \x02J\xf2\x08\n\x06\x12\x04\0\0\x1b\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\ - \n\n\n\x02\x04\0\x12\x04\x02\0\x04\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\ - \x08\x11\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x15\n\x0c\n\x05\x04\0\ - \x02\0\x05\x12\x03\x03\x04\n\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\x0b\ - \x10\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x13\x14\n\n\n\x02\x04\x01\ - \x12\x04\x05\0\n\x01\n\n\n\x03\x04\x01\x01\x12\x03\x05\x08\x13\n\x0b\n\ - \x04\x04\x01\x02\0\x12\x03\x06\x04\x12\n\x0c\n\x05\x04\x01\x02\0\x05\x12\ - \x03\x06\x04\n\n\x0c\n\x05\x04\x01\x02\0\x01\x12\x03\x06\x0b\r\n\x0c\n\ - \x05\x04\x01\x02\0\x03\x12\x03\x06\x10\x11\n\x0b\n\x04\x04\x01\x02\x01\ - \x12\x03\x07\x04\x15\n\x0c\n\x05\x04\x01\x02\x01\x05\x12\x03\x07\x04\n\n\ - \x0c\n\x05\x04\x01\x02\x01\x01\x12\x03\x07\x0b\x10\n\x0c\n\x05\x04\x01\ - \x02\x01\x03\x12\x03\x07\x13\x14\n\x0b\n\x04\x04\x01\x02\x02\x12\x03\x08\ - \x04\x14\n\x0c\n\x05\x04\x01\x02\x02\x05\x12\x03\x08\x04\n\n\x0c\n\x05\ - \x04\x01\x02\x02\x01\x12\x03\x08\x0b\x0f\n\x0c\n\x05\x04\x01\x02\x02\x03\ - \x12\x03\x08\x12\x13\n\x0b\n\x04\x04\x01\x02\x03\x12\x03\t\x04\x15\n\x0c\ - \n\x05\x04\x01\x02\x03\x05\x12\x03\t\x04\n\n\x0c\n\x05\x04\x01\x02\x03\ - \x01\x12\x03\t\x0b\x10\n\x0c\n\x05\x04\x01\x02\x03\x03\x12\x03\t\x13\x14\ - \n\n\n\x02\x04\x02\x12\x04\x0b\0\x10\x01\n\n\n\x03\x04\x02\x01\x12\x03\ - \x0b\x08\x19\n\x0b\n\x04\x04\x02\x02\0\x12\x03\x0c\x04\x12\n\x0c\n\x05\ - \x04\x02\x02\0\x05\x12\x03\x0c\x04\n\n\x0c\n\x05\x04\x02\x02\0\x01\x12\ - \x03\x0c\x0b\r\n\x0c\n\x05\x04\x02\x02\0\x03\x12\x03\x0c\x10\x11\n\x0b\n\ - \x04\x04\x02\x08\0\x12\x03\r\x04*\n\x0c\n\x05\x04\x02\x08\0\x01\x12\x03\ - \r\n\x15\n\x0b\n\x04\x04\x02\x02\x01\x12\x03\r\x18(\n\x0c\n\x05\x04\x02\ - \x02\x01\x05\x12\x03\r\x18\x1e\n\x0c\n\x05\x04\x02\x02\x01\x01\x12\x03\r\ - \x1f#\n\x0c\n\x05\x04\x02\x02\x01\x03\x12\x03\r&'\n\x0b\n\x04\x04\x02\ - \x08\x01\x12\x03\x0e\x04,\n\x0c\n\x05\x04\x02\x08\x01\x01\x12\x03\x0e\n\ - \x16\n\x0b\n\x04\x04\x02\x02\x02\x12\x03\x0e\x19*\n\x0c\n\x05\x04\x02\ - \x02\x02\x05\x12\x03\x0e\x19\x1f\n\x0c\n\x05\x04\x02\x02\x02\x01\x12\x03\ - \x0e\x20%\n\x0c\n\x05\x04\x02\x02\x02\x03\x12\x03\x0e()\n\x0b\n\x04\x04\ - \x02\x08\x02\x12\x03\x0f\x042\n\x0c\n\x05\x04\x02\x08\x02\x01\x12\x03\ - \x0f\n\x19\n\x0b\n\x04\x04\x02\x02\x03\x12\x03\x0f\x1c0\n\x0c\n\x05\x04\ - \x02\x02\x03\x05\x12\x03\x0f\x1c\"\n\x0c\n\x05\x04\x02\x02\x03\x01\x12\ - \x03\x0f#+\n\x0c\n\x05\x04\x02\x02\x03\x03\x12\x03\x0f./\n\n\n\x02\x04\ - \x03\x12\x04\x11\0\x16\x01\n\n\n\x03\x04\x03\x01\x12\x03\x11\x08\x18\n\ - \x0b\n\x04\x04\x03\x02\0\x12\x03\x12\x04\x12\n\x0c\n\x05\x04\x03\x02\0\ - \x05\x12\x03\x12\x04\n\n\x0c\n\x05\x04\x03\x02\0\x01\x12\x03\x12\x0b\r\n\ - \x0c\n\x05\x04\x03\x02\0\x03\x12\x03\x12\x10\x11\n\x0b\n\x04\x04\x03\x08\ - \0\x12\x03\x13\x04*\n\x0c\n\x05\x04\x03\x08\0\x01\x12\x03\x13\n\x15\n\ - \x0b\n\x04\x04\x03\x02\x01\x12\x03\x13\x18(\n\x0c\n\x05\x04\x03\x02\x01\ - \x05\x12\x03\x13\x18\x1e\n\x0c\n\x05\x04\x03\x02\x01\x01\x12\x03\x13\x1f\ - #\n\x0c\n\x05\x04\x03\x02\x01\x03\x12\x03\x13&'\n\x0b\n\x04\x04\x03\x08\ - \x01\x12\x03\x14\x04,\n\x0c\n\x05\x04\x03\x08\x01\x01\x12\x03\x14\n\x16\ - \n\x0b\n\x04\x04\x03\x02\x02\x12\x03\x14\x19*\n\x0c\n\x05\x04\x03\x02\ - \x02\x05\x12\x03\x14\x19\x1f\n\x0c\n\x05\x04\x03\x02\x02\x01\x12\x03\x14\ - \x20%\n\x0c\n\x05\x04\x03\x02\x02\x03\x12\x03\x14()\n\x0b\n\x04\x04\x03\ - \x08\x02\x12\x03\x15\x042\n\x0c\n\x05\x04\x03\x08\x02\x01\x12\x03\x15\n\ - \x19\n\x0b\n\x04\x04\x03\x02\x03\x12\x03\x15\x1c0\n\x0c\n\x05\x04\x03\ - \x02\x03\x05\x12\x03\x15\x1c\"\n\x0c\n\x05\x04\x03\x02\x03\x01\x12\x03\ - \x15#+\n\x0c\n\x05\x04\x03\x02\x03\x03\x12\x03\x15./\n\n\n\x02\x05\0\x12\ - \x04\x17\0\x1b\x01\n\n\n\x03\x05\0\x01\x12\x03\x17\x05\x0f\n\x0b\n\x04\ - \x05\0\x02\0\x12\x03\x18\x04\x10\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x18\ - \x04\x0b\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x18\x0e\x0f\n\x0b\n\x04\x05\ - \0\x02\x01\x12\x03\x19\x04\x0e\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\x19\ - \x04\t\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\x19\x0c\r\n\x0b\n\x04\x05\0\ - \x02\x02\x12\x03\x1a\x04\x10\n\x0c\n\x05\x05\0\x02\x02\x01\x12\x03\x1a\ - \x04\x0b\n\x0c\n\x05\x05\0\x02\x02\x02\x12\x03\x1a\x0e\x0fb\x06proto3\ + \x0e\n\x0cone_of_emailB\x11\n\x0fone_of_passwordJ\xdf\x07\n\x06\x12\x04\ + \0\0\x16\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\ + \0\x04\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x11\n\x0b\n\x04\x04\0\x02\ + \0\x12\x03\x03\x04\x15\n\x0c\n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\n\n\ + \x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\x0b\x10\n\x0c\n\x05\x04\0\x02\0\ + \x03\x12\x03\x03\x13\x14\n\n\n\x02\x04\x01\x12\x04\x05\0\n\x01\n\n\n\x03\ + \x04\x01\x01\x12\x03\x05\x08\x13\n\x0b\n\x04\x04\x01\x02\0\x12\x03\x06\ + \x04\x12\n\x0c\n\x05\x04\x01\x02\0\x05\x12\x03\x06\x04\n\n\x0c\n\x05\x04\ + \x01\x02\0\x01\x12\x03\x06\x0b\r\n\x0c\n\x05\x04\x01\x02\0\x03\x12\x03\ + \x06\x10\x11\n\x0b\n\x04\x04\x01\x02\x01\x12\x03\x07\x04\x15\n\x0c\n\x05\ + \x04\x01\x02\x01\x05\x12\x03\x07\x04\n\n\x0c\n\x05\x04\x01\x02\x01\x01\ + \x12\x03\x07\x0b\x10\n\x0c\n\x05\x04\x01\x02\x01\x03\x12\x03\x07\x13\x14\ + \n\x0b\n\x04\x04\x01\x02\x02\x12\x03\x08\x04\x14\n\x0c\n\x05\x04\x01\x02\ + \x02\x05\x12\x03\x08\x04\n\n\x0c\n\x05\x04\x01\x02\x02\x01\x12\x03\x08\ + \x0b\x0f\n\x0c\n\x05\x04\x01\x02\x02\x03\x12\x03\x08\x12\x13\n\x0b\n\x04\ + \x04\x01\x02\x03\x12\x03\t\x04\x15\n\x0c\n\x05\x04\x01\x02\x03\x05\x12\ + \x03\t\x04\n\n\x0c\n\x05\x04\x01\x02\x03\x01\x12\x03\t\x0b\x10\n\x0c\n\ + \x05\x04\x01\x02\x03\x03\x12\x03\t\x13\x14\n\n\n\x02\x04\x02\x12\x04\x0b\ + \0\x10\x01\n\n\n\x03\x04\x02\x01\x12\x03\x0b\x08\x19\n\x0b\n\x04\x04\x02\ + \x02\0\x12\x03\x0c\x04\x12\n\x0c\n\x05\x04\x02\x02\0\x05\x12\x03\x0c\x04\ + \n\n\x0c\n\x05\x04\x02\x02\0\x01\x12\x03\x0c\x0b\r\n\x0c\n\x05\x04\x02\ + \x02\0\x03\x12\x03\x0c\x10\x11\n\x0b\n\x04\x04\x02\x08\0\x12\x03\r\x04*\ + \n\x0c\n\x05\x04\x02\x08\0\x01\x12\x03\r\n\x15\n\x0b\n\x04\x04\x02\x02\ + \x01\x12\x03\r\x18(\n\x0c\n\x05\x04\x02\x02\x01\x05\x12\x03\r\x18\x1e\n\ + \x0c\n\x05\x04\x02\x02\x01\x01\x12\x03\r\x1f#\n\x0c\n\x05\x04\x02\x02\ + \x01\x03\x12\x03\r&'\n\x0b\n\x04\x04\x02\x08\x01\x12\x03\x0e\x04,\n\x0c\ + \n\x05\x04\x02\x08\x01\x01\x12\x03\x0e\n\x16\n\x0b\n\x04\x04\x02\x02\x02\ + \x12\x03\x0e\x19*\n\x0c\n\x05\x04\x02\x02\x02\x05\x12\x03\x0e\x19\x1f\n\ + \x0c\n\x05\x04\x02\x02\x02\x01\x12\x03\x0e\x20%\n\x0c\n\x05\x04\x02\x02\ + \x02\x03\x12\x03\x0e()\n\x0b\n\x04\x04\x02\x08\x02\x12\x03\x0f\x042\n\ + \x0c\n\x05\x04\x02\x08\x02\x01\x12\x03\x0f\n\x19\n\x0b\n\x04\x04\x02\x02\ + \x03\x12\x03\x0f\x1c0\n\x0c\n\x05\x04\x02\x02\x03\x05\x12\x03\x0f\x1c\"\ + \n\x0c\n\x05\x04\x02\x02\x03\x01\x12\x03\x0f#+\n\x0c\n\x05\x04\x02\x02\ + \x03\x03\x12\x03\x0f./\n\n\n\x02\x04\x03\x12\x04\x11\0\x16\x01\n\n\n\x03\ + \x04\x03\x01\x12\x03\x11\x08\x18\n\x0b\n\x04\x04\x03\x02\0\x12\x03\x12\ + \x04\x12\n\x0c\n\x05\x04\x03\x02\0\x05\x12\x03\x12\x04\n\n\x0c\n\x05\x04\ + \x03\x02\0\x01\x12\x03\x12\x0b\r\n\x0c\n\x05\x04\x03\x02\0\x03\x12\x03\ + \x12\x10\x11\n\x0b\n\x04\x04\x03\x08\0\x12\x03\x13\x04*\n\x0c\n\x05\x04\ + \x03\x08\0\x01\x12\x03\x13\n\x15\n\x0b\n\x04\x04\x03\x02\x01\x12\x03\x13\ + \x18(\n\x0c\n\x05\x04\x03\x02\x01\x05\x12\x03\x13\x18\x1e\n\x0c\n\x05\ + \x04\x03\x02\x01\x01\x12\x03\x13\x1f#\n\x0c\n\x05\x04\x03\x02\x01\x03\ + \x12\x03\x13&'\n\x0b\n\x04\x04\x03\x08\x01\x12\x03\x14\x04,\n\x0c\n\x05\ + \x04\x03\x08\x01\x01\x12\x03\x14\n\x16\n\x0b\n\x04\x04\x03\x02\x02\x12\ + \x03\x14\x19*\n\x0c\n\x05\x04\x03\x02\x02\x05\x12\x03\x14\x19\x1f\n\x0c\ + \n\x05\x04\x03\x02\x02\x01\x12\x03\x14\x20%\n\x0c\n\x05\x04\x03\x02\x02\ + \x03\x12\x03\x14()\n\x0b\n\x04\x04\x03\x08\x02\x12\x03\x15\x042\n\x0c\n\ + \x05\x04\x03\x08\x02\x01\x12\x03\x15\n\x19\n\x0b\n\x04\x04\x03\x02\x03\ + \x12\x03\x15\x1c0\n\x0c\n\x05\x04\x03\x02\x03\x05\x12\x03\x15\x1c\"\n\ + \x0c\n\x05\x04\x03\x02\x03\x01\x12\x03\x15#+\n\x0c\n\x05\x04\x03\x02\x03\ + \x03\x12\x03\x15./b\x06proto3\ "; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; diff --git a/rust-lib/flowy-user/src/protobuf/proto/event.proto b/rust-lib/flowy-user/src/protobuf/proto/event.proto index 96ec640e3e..e7a54edbd0 100644 --- a/rust-lib/flowy-user/src/protobuf/proto/event.proto +++ b/rust-lib/flowy-user/src/protobuf/proto/event.proto @@ -1,9 +1,10 @@ syntax = "proto3"; enum UserEvent { - GetUserProfile = 0; + InitUser = 0; SignIn = 1; SignUp = 2; SignOut = 3; UpdateUser = 4; + GetUserProfile = 5; } diff --git a/rust-lib/flowy-user/src/protobuf/proto/user_profile.proto b/rust-lib/flowy-user/src/protobuf/proto/user_profile.proto index bf1b5eea73..c139f053fc 100644 --- a/rust-lib/flowy-user/src/protobuf/proto/user_profile.proto +++ b/rust-lib/flowy-user/src/protobuf/proto/user_profile.proto @@ -21,8 +21,3 @@ message UpdateUserParams { oneof one_of_email { string email = 3; }; oneof one_of_password { string password = 4; }; } -enum UserStatus { - Unknown = 0; - Login = 1; - Expired = 2; -} diff --git a/rust-lib/flowy-user/src/services/user/builder.rs b/rust-lib/flowy-user/src/services/user/builder.rs index 7507032a41..3a28f68a68 100644 --- a/rust-lib/flowy-user/src/services/user/builder.rs +++ b/rust-lib/flowy-user/src/services/user/builder.rs @@ -1,20 +1,31 @@ -use crate::services::user::{UserSession, UserSessionConfig}; +use crate::services::user::{SessionStatusCallback, UserSession, UserSessionConfig}; +use std::sync::Arc; pub struct UserSessionBuilder { config: Option, + callback: SessionStatusCallback, } impl UserSessionBuilder { - pub fn new() -> Self { Self { config: None } } + pub fn new() -> Self { + Self { + config: None, + callback: Arc::new(|_| {}), + } + } pub fn root_dir(mut self, dir: &str) -> Self { self.config = Some(UserSessionConfig::new(dir)); self } + pub fn status_callback(mut self, callback: SessionStatusCallback) -> Self { + self.callback = callback; + self + } + pub fn build(mut self) -> UserSession { let config = self.config.take().unwrap(); - - UserSession::new(config) + UserSession::new(config, self.callback.clone()) } } diff --git a/rust-lib/flowy-user/src/services/user/user_session.rs b/rust-lib/flowy-user/src/services/user/user_session.rs index 261e0de6d1..1cbb783323 100644 --- a/rust-lib/flowy-user/src/services/user/user_session.rs +++ b/rust-lib/flowy-user/src/services/user/user_session.rs @@ -18,6 +18,7 @@ use flowy_database::{ }; use flowy_infra::kv::KV; use flowy_sqlite::ConnectionPool; +use flowy_ws::WsController; use parking_lot::RwLock; use serde::{Deserialize, Serialize}; use std::sync::Arc; @@ -34,24 +35,36 @@ impl UserSessionConfig { } } +pub enum SessionStatus { + Login { token: String }, + Expired { token: String }, +} +pub type SessionStatusCallback = Arc; + pub struct UserSession { database: UserDB, config: UserSessionConfig, #[allow(dead_code)] server: Server, session: RwLock>, + ws: RwLock, + status_callback: SessionStatusCallback, } impl UserSession { - pub fn new(config: UserSessionConfig) -> Self { + pub fn new(config: UserSessionConfig, status_callback: SessionStatusCallback) -> Self { let db = UserDB::new(&config.root_dir); let server = construct_user_server(); - Self { + let ws = RwLock::new(WsController::new()); + let user_session = Self { database: db, config, server, session: RwLock::new(None), - } + ws, + status_callback, + }; + user_session } pub fn db_connection(&self) -> Result { @@ -80,6 +93,9 @@ impl UserSession { let _ = self.set_session(Some(session))?; let user_table = self.save_user(resp.into()).await?; let user_profile = UserProfile::from(user_table); + (self.status_callback)(SessionStatus::Login { + token: user_profile.token.clone(), + }); Ok(user_profile) } } @@ -94,6 +110,9 @@ impl UserSession { let _ = self.set_session(Some(session))?; let user_table = self.save_user(resp.into()).await?; let user_profile = UserProfile::from(user_table); + (self.status_callback)(SessionStatus::Login { + token: user_profile.token.clone(), + }); Ok(user_profile) } } @@ -104,6 +123,9 @@ impl UserSession { let _ = diesel::delete(dsl::user_table.filter(dsl::id.eq(&session.user_id))).execute(&*(self.db_connection()?))?; let _ = self.database.close_user_db(&session.user_id)?; let _ = self.set_session(None)?; + (self.status_callback)(SessionStatus::Expired { + token: session.token.clone(), + }); let _ = self.sign_out_on_server(&session.token).await?; Ok(()) @@ -119,13 +141,26 @@ impl UserSession { Ok(()) } + pub async fn init_user(&self) -> Result { + let (user_id, token) = self.get_session()?.into_part(); + + let user = dsl::user_table + .filter(user_table::id.eq(&user_id)) + .first::(&*(self.db_connection()?))?; + + let _ = self.read_user_profile_on_server(&token)?; + let _ = self.start_ws_connection(&token)?; + + Ok(UserProfile::from(user)) + } + pub async fn user_profile(&self) -> Result { let (user_id, token) = self.get_session()?.into_part(); let user = dsl::user_table .filter(user_table::id.eq(&user_id)) .first::(&*(self.db_connection()?))?; - let _ = self.read_user_profile_on_server(&token).await?; + let _ = self.read_user_profile_on_server(&token)?; Ok(UserProfile::from(user)) } @@ -140,7 +175,7 @@ impl UserSession { } impl UserSession { - async fn read_user_profile_on_server(&self, token: &str) -> Result<(), UserError> { + fn read_user_profile_on_server(&self, token: &str) -> Result<(), UserError> { let server = self.server.clone(); let token = token.to_owned(); tokio::spawn(async move { @@ -225,6 +260,25 @@ impl UserSession { Err(_) => false, } } + + fn start_ws_connection(&self, token: &str) -> Result<(), UserError> { + let addr = format!("{}/{}", flowy_net::config::WS_ADDR.as_str(), token); + log::debug!("🐴 Try to connect: {}", &addr); + let (conn, handlers) = self.ws.write().make_connect(addr); + tokio::spawn(async { + match conn.await { + Ok(_) => { + log::debug!("🐴 ws connect success"); + let _ = handlers.await; + }, + Err(e) => { + // TODO: retry? + log::error!("ws connect failed: {}", e); + }, + } + }); + Ok(()) + } } pub async fn update_user(_server: Server, pool: Arc, params: UpdateUserParams) -> Result<(), UserError> { diff --git a/rust-lib/flowy-ws/Cargo.toml b/rust-lib/flowy-ws/Cargo.toml index d58f0f637e..557ff047d3 100644 --- a/rust-lib/flowy-ws/Cargo.toml +++ b/rust-lib/flowy-ws/Cargo.toml @@ -14,8 +14,6 @@ futures-util = "0.3.17" futures-channel = "0.3.17" tokio = {version = "1", features = ["full"]} futures = "0.3.17" -lazy_static = "1.4" -parking_lot = "0.11" bytes = "0.5" pin-project = "1.0.0" futures-core = { version = "0.3", default-features = false } @@ -27,4 +25,5 @@ strum = "0.21" strum_macros = "0.21" [dev-dependencies] -tokio = {version = "1", features = ["full"]} \ No newline at end of file +tokio = {version = "1", features = ["full"]} +env_logger = "0.8.2" \ No newline at end of file diff --git a/rust-lib/flowy-ws/src/errors.rs b/rust-lib/flowy-ws/src/errors.rs index 9e55d3417c..d0f02dda19 100644 --- a/rust-lib/flowy-ws/src/errors.rs +++ b/rust-lib/flowy-ws/src/errors.rs @@ -27,6 +27,7 @@ macro_rules! static_user_error { } impl WsError { + #[allow(dead_code)] pub(crate) fn new(code: ErrorCode) -> WsError { WsError { code, msg: "".to_string() } } pub fn context(mut self, error: T) -> Self { diff --git a/rust-lib/flowy-ws/src/ws.rs b/rust-lib/flowy-ws/src/ws.rs index 14daeea857..75b58ff99c 100644 --- a/rust-lib/flowy-ws/src/ws.rs +++ b/rust-lib/flowy-ws/src/ws.rs @@ -1,9 +1,8 @@ use crate::errors::WsError; +use flowy_net::{errors::ServerError, response::FlowyResponse}; use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender}; use futures_core::{future::BoxFuture, ready, Stream}; use futures_util::{pin_mut, FutureExt, StreamExt}; -use lazy_static::lazy_static; -use parking_lot::RwLock; use pin_project::pin_project; use std::{ future::Future, @@ -14,20 +13,17 @@ use std::{ use tokio::net::TcpStream; use tokio_tungstenite::{ connect_async, - tungstenite::{handshake::client::Response, Error, Message}, + tungstenite::{handshake::client::Response, http::StatusCode, Error, Message}, MaybeTlsStream, WebSocketStream, }; -lazy_static! { - pub static ref WS: RwLock = RwLock::new(WsController::new()); -} - -pub fn start_ws_connection() { WS.write().connect(flowy_net::config::WS_ADDR.as_ref()); } pub type MsgReceiver = UnboundedReceiver; pub type MsgSender = UnboundedSender; pub trait WsMessageHandler: Sync + Send + 'static { - fn handler_message(&self, msg: &Message); + fn can_handle(&self) -> bool; + fn receive_message(&self, msg: &Message); + fn send_message(&self, sender: Arc); } pub struct WsController { @@ -47,13 +43,15 @@ impl WsController { pub fn add_handlers(&mut self, handler: Arc) { self.handlers.push(handler); } - pub fn connect(&mut self, addr: &str) { - let (ws, handlers) = self.make_connect(&addr); - let _ = tokio::spawn(ws); + #[allow(dead_code)] + pub async fn connect(&mut self, addr: String) -> Result<(), ServerError> { + let (conn, handlers) = self.make_connect(addr); + let _ = conn.await?; let _ = tokio::spawn(handlers); + Ok(()) } - fn make_connect(&mut self, addr: &str) -> (WsRaw, WsHandlers) { + pub fn make_connect(&mut self, addr: String) -> (WsConnection, WsHandlers) { // Stream User // ┌───────────────┐ ┌──────────────┐ // ┌──────┐ │ ┌─────────┐ │ ┌────────┐ │ ┌────────┐ │ @@ -64,15 +62,12 @@ impl WsController { // └─────────┼──│ws_write │◀─┼────│ ws_rx │◀──┼──│ ws_tx │ │ // │ └─────────┘ │ └────────┘ │ └────────┘ │ // └───────────────┘ └──────────────┘ - let addr = addr.to_string(); let (msg_tx, msg_rx) = futures_channel::mpsc::unbounded(); let (ws_tx, ws_rx) = futures_channel::mpsc::unbounded(); let sender = Arc::new(WsSender::new(ws_tx)); let handlers = self.handlers.clone(); self.sender = Some(sender.clone()); - log::debug!("🐴ws prepare connection"); - - (WsRaw::new(msg_tx, ws_rx, addr), WsHandlers::new(handlers, msg_rx)) + (WsConnection::new(msg_tx, ws_rx, addr), WsHandlers::new(handlers, msg_rx)) } pub fn send_message(&self, msg: Message) -> Result<(), WsError> { @@ -84,7 +79,7 @@ impl WsController { } #[pin_project] -struct WsHandlers { +pub struct WsHandlers { #[pin] msg_rx: MsgReceiver, handlers: Vec>, @@ -101,7 +96,7 @@ impl Future for WsHandlers { match ready!(self.as_mut().project().msg_rx.poll_next(cx)) { None => return Poll::Ready(()), Some(message) => self.handlers.iter().for_each(|handler| { - handler.handler_message(&message); + handler.receive_message(&message); }), } } @@ -109,16 +104,16 @@ impl Future for WsHandlers { } #[pin_project] -pub struct WsRaw { +pub struct WsConnection { msg_tx: Option, ws_rx: Option, #[pin] fut: BoxFuture<'static, Result<(WebSocketStream>, Response), Error>>, } -impl WsRaw { +impl WsConnection { pub fn new(msg_tx: MsgSender, ws_rx: MsgReceiver, addr: String) -> Self { - WsRaw { + WsConnection { msg_tx: Some(msg_tx), ws_rx: Some(ws_rx), fut: Box::pin(async move { connect_async(&addr).await }), @@ -126,8 +121,8 @@ impl WsRaw { } } -impl Future for WsRaw { - type Output = (); +impl Future for WsConnection { + type Output = Result<(), ServerError>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // [[pin]] // poll async function. The following methods not work. @@ -147,50 +142,35 @@ impl Future for WsRaw { loop { return match ready!(self.as_mut().project().fut.poll(cx)) { Ok((stream, _)) => { - log::debug!("🐴 ws connect success"); let mut ws_stream = WsStream { msg_tx: self.msg_tx.take(), ws_rx: self.ws_rx.take(), stream: Some(stream), }; match Pin::new(&mut ws_stream).poll(cx) { - Poll::Ready(_a) => Poll::Ready(()), + Poll::Ready(_) => Poll::Ready(Ok(())), Poll::Pending => Poll::Pending, } }, - Err(e) => { - log::error!("🐴 ws connect failed: {:?}", e); - Poll::Ready(()) - }, + Err(error) => Poll::Ready(Err(error_to_flowy_response(error))), }; } } } -#[pin_project] -struct WsConn { - #[pin] - fut: BoxFuture<'static, Result<(WebSocketStream>, Response), Error>>, -} +fn error_to_flowy_response(error: tokio_tungstenite::tungstenite::Error) -> ServerError { + let error = match error { + Error::Http(response) => { + if response.status() == StatusCode::UNAUTHORIZED { + ServerError::unauthorized() + } else { + ServerError::internal().context(response) + } + }, + _ => ServerError::internal().context(error), + }; -impl WsConn { - fn new(addr: String) -> Self { - Self { - fut: Box::pin(async move { connect_async(&addr).await }), - } - } -} - -impl Future for WsConn { - type Output = Result<(WebSocketStream>, Response), Error>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - loop { - return match ready!(self.as_mut().project().fut.poll(cx)) { - Ok(o) => Poll::Ready(Ok(o)), - Err(e) => Poll::Ready(Err(e)), - }; - } - } + error } struct WsStream { @@ -218,7 +198,7 @@ impl Future for WsStream { }); pin_mut!(to_ws, from_ws); - log::debug!("🐴 ws start poll stream"); + log::trace!("🐴 ws start poll stream"); match to_ws.poll_unpin(cx) { Poll::Ready(_) => Poll::Ready(()), Poll::Pending => match from_ws.poll_unpin(cx) { @@ -245,39 +225,18 @@ impl WsSender { #[cfg(test)] mod tests { use super::WsController; - use futures_util::{pin_mut, StreamExt}; - use tokio_tungstenite::connect_async; #[tokio::test] async fn connect() { + std::env::set_var("RUST_LOG", "Debug"); + env_logger::init(); + let mut controller = WsController::new(); let addr = format!("{}/123", flowy_net::config::WS_ADDR.as_str()); - let (a, b) = controller.make_connect(&addr); + let (a, b) = controller.make_connect(addr); tokio::select! { - _ = a => println!("write completed"), + r = a => println!("write completed {:?}", r), _ = b => println!("read completed"), }; } - - #[tokio::test] - async fn connect_raw() { - let _controller = WsController::new(); - let addr = format!("{}/123", flowy_net::config::WS_ADDR.as_str()); - let (tx, rx) = futures_channel::mpsc::unbounded(); - let (ws_write, ws_read) = connect_async(&addr).await.unwrap().0.split(); - let to_ws = rx.map(Ok).forward(ws_write); - let from_ws = ws_read.for_each(|message| async { - tx.unbounded_send(message.unwrap()).unwrap(); - }); - - pin_mut!(to_ws, from_ws); - tokio::select! { - _ = to_ws => { - log::debug!("ws write completed") - } - _ = from_ws => { - log::debug!("ws read completed") - } - }; - } }