diff --git a/frontend/app_flowy/lib/core/folder_notification.dart b/frontend/app_flowy/lib/core/folder_notification.dart new file mode 100644 index 0000000000..1f9d7751ce --- /dev/null +++ b/frontend/app_flowy/lib/core/folder_notification.dart @@ -0,0 +1,39 @@ +import 'dart:async'; +import 'dart:typed_data'; +import 'package:flowy_sdk/protobuf/dart-notify/protobuf.dart'; +import 'package:dartz/dartz.dart'; +import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart'; +import 'package:flowy_sdk/protobuf/flowy-folder/dart_notification.pb.dart'; +import 'package:flowy_sdk/rust_stream.dart'; + +import 'notification_helper.dart'; + +// Folder +typedef FolderNotificationCallback = void Function(FolderNotification, Either); + +class FolderNotificationParser extends NotificationParser { + FolderNotificationParser({String? id, required FolderNotificationCallback callback}) + : super( + id: id, + callback: callback, + tyParser: (ty) => FolderNotification.valueOf(ty), + errorParser: (bytes) => FlowyError.fromBuffer(bytes), + ); +} + +typedef FolderNotificationHandler = Function(FolderNotification ty, Either result); + +class FolderNotificationListener { + StreamSubscription? _subscription; + FolderNotificationParser? _parser; + + FolderNotificationListener({required String objectId, required FolderNotificationHandler handler}) + : _parser = FolderNotificationParser(id: objectId, callback: handler) { + _subscription = RustStreamReceiver.listen((observable) => _parser?.parse(observable)); + } + + Future stop() async { + _parser = null; + await _subscription?.cancel(); + } +} diff --git a/frontend/app_flowy/lib/core/grid_notification.dart b/frontend/app_flowy/lib/core/grid_notification.dart new file mode 100644 index 0000000000..e45bf5efe2 --- /dev/null +++ b/frontend/app_flowy/lib/core/grid_notification.dart @@ -0,0 +1,39 @@ +import 'dart:async'; +import 'dart:typed_data'; +import 'package:flowy_sdk/protobuf/dart-notify/protobuf.dart'; +import 'package:dartz/dartz.dart'; +import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart'; +import 'package:flowy_sdk/protobuf/flowy-grid/dart_notification.pb.dart'; +import 'package:flowy_sdk/rust_stream.dart'; + +import 'notification_helper.dart'; + +// Grid +typedef GridNotificationCallback = void Function(GridNotification, Either); + +class GridNotificationParser extends NotificationParser { + GridNotificationParser({String? id, required GridNotificationCallback callback}) + : super( + id: id, + callback: callback, + tyParser: (ty) => GridNotification.valueOf(ty), + errorParser: (bytes) => FlowyError.fromBuffer(bytes), + ); +} + +typedef GridNotificationHandler = Function(GridNotification ty, Either result); + +class GridNotificationListener { + StreamSubscription? _subscription; + GridNotificationParser? _parser; + + GridNotificationListener({required String objectId, required GridNotificationHandler handler}) + : _parser = GridNotificationParser(id: objectId, callback: handler) { + _subscription = RustStreamReceiver.listen((observable) => _parser?.parse(observable)); + } + + Future stop() async { + _parser = null; + await _subscription?.cancel(); + } +} diff --git a/frontend/app_flowy/lib/core/notification_helper.dart b/frontend/app_flowy/lib/core/notification_helper.dart index 650abc544c..ba99c8bab0 100644 --- a/frontend/app_flowy/lib/core/notification_helper.dart +++ b/frontend/app_flowy/lib/core/notification_helper.dart @@ -1,68 +1,6 @@ -import 'dart:async'; import 'dart:typed_data'; import 'package:flowy_sdk/protobuf/dart-notify/protobuf.dart'; -import 'package:flowy_sdk/protobuf/flowy-user/protobuf.dart'; import 'package:dartz/dartz.dart'; -import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart'; -import 'package:flowy_sdk/protobuf/flowy-folder/dart_notification.pb.dart'; -import 'package:flowy_sdk/protobuf/flowy-grid/dart_notification.pb.dart'; -import 'package:flowy_sdk/rust_stream.dart'; - -// User -typedef UserNotificationCallback = void Function(UserNotification, Either); - -class UserNotificationParser extends NotificationParser { - UserNotificationParser({required String id, required UserNotificationCallback callback}) - : super( - id: id, - callback: callback, - tyParser: (ty) => UserNotification.valueOf(ty), - errorParser: (bytes) => FlowyError.fromBuffer(bytes), - ); -} - -// Folder -typedef FolderNotificationCallback = void Function(FolderNotification, Either); - -class FolderNotificationParser extends NotificationParser { - FolderNotificationParser({String? id, required FolderNotificationCallback callback}) - : super( - id: id, - callback: callback, - tyParser: (ty) => FolderNotification.valueOf(ty), - errorParser: (bytes) => FlowyError.fromBuffer(bytes), - ); -} - -// Grid -typedef GridNotificationCallback = void Function(GridNotification, Either); - -class GridNotificationParser extends NotificationParser { - GridNotificationParser({String? id, required GridNotificationCallback callback}) - : super( - id: id, - callback: callback, - tyParser: (ty) => GridNotification.valueOf(ty), - errorParser: (bytes) => FlowyError.fromBuffer(bytes), - ); -} - -typedef GridNotificationHandler = Function(GridNotification ty, Either result); - -class GridNotificationListener { - StreamSubscription? _subscription; - GridNotificationParser? _parser; - - GridNotificationListener({required String objectId, required GridNotificationHandler handler}) - : _parser = GridNotificationParser(id: objectId, callback: handler) { - _subscription = RustStreamReceiver.listen((observable) => _parser?.parse(observable)); - } - - Future stop() async { - _parser = null; - await _subscription?.cancel(); - } -} class NotificationParser { String? id; diff --git a/frontend/app_flowy/lib/core/user_notification.dart b/frontend/app_flowy/lib/core/user_notification.dart new file mode 100644 index 0000000000..8e43d4b824 --- /dev/null +++ b/frontend/app_flowy/lib/core/user_notification.dart @@ -0,0 +1,39 @@ +import 'dart:async'; +import 'dart:typed_data'; +import 'package:flowy_sdk/protobuf/dart-notify/protobuf.dart'; +import 'package:flowy_sdk/protobuf/flowy-user/protobuf.dart'; +import 'package:dartz/dartz.dart'; +import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart'; +import 'package:flowy_sdk/rust_stream.dart'; + +import 'notification_helper.dart'; + +// User +typedef UserNotificationCallback = void Function(UserNotification, Either); + +class UserNotificationParser extends NotificationParser { + UserNotificationParser({required String id, required UserNotificationCallback callback}) + : super( + id: id, + callback: callback, + tyParser: (ty) => UserNotification.valueOf(ty), + errorParser: (bytes) => FlowyError.fromBuffer(bytes), + ); +} + +typedef UserNotificationHandler = Function(UserNotification ty, Either result); + +class UserNotificationListener { + StreamSubscription? _subscription; + UserNotificationParser? _parser; + + UserNotificationListener({required String objectId, required UserNotificationHandler handler}) + : _parser = UserNotificationParser(id: objectId, callback: handler) { + _subscription = RustStreamReceiver.listen((observable) => _parser?.parse(observable)); + } + + Future stop() async { + _parser = null; + await _subscription?.cancel(); + } +} diff --git a/frontend/app_flowy/lib/startup/deps_resolver.dart b/frontend/app_flowy/lib/startup/deps_resolver.dart index 56d28bbfe4..4de2bdbbfd 100644 --- a/frontend/app_flowy/lib/startup/deps_resolver.dart +++ b/frontend/app_flowy/lib/startup/deps_resolver.dart @@ -52,7 +52,7 @@ void _resolveHomeDeps(GetIt getIt) { getIt.registerSingleton(MenuSharedState()); getIt.registerFactoryParam( - (user, _) => UserListener(user: user), + (user, _) => UserListener(userProfile: user), ); // @@ -61,7 +61,7 @@ void _resolveHomeDeps(GetIt getIt) { getIt.registerFactoryParam( (user, _) => WelcomeBloc( userService: UserService(userId: user.id), - userListener: getIt(param1: user), + userWorkspaceListener: UserWorkspaceListener(userProfile: user), ), ); @@ -73,8 +73,8 @@ void _resolveHomeDeps(GetIt getIt) { void _resolveFolderDeps(GetIt getIt) { //workspace - getIt.registerFactoryParam((user, workspaceId) => - WorkspaceListener(service: WorkspaceListenerService(user: user, workspaceId: workspaceId))); + getIt.registerFactoryParam( + (user, workspaceId) => WorkspaceListener(user: user, workspaceId: workspaceId)); // View getIt.registerFactoryParam( @@ -98,10 +98,7 @@ void _resolveFolderDeps(GetIt getIt) { ); getIt.registerFactoryParam( - (user, _) => MenuUserBloc( - user, - getIt(param1: user), - ), + (user, _) => MenuUserBloc(user), ); // App diff --git a/frontend/app_flowy/lib/user/application/user_listener.dart b/frontend/app_flowy/lib/user/application/user_listener.dart index 291e955745..a42247c416 100644 --- a/frontend/app_flowy/lib/user/application/user_listener.dart +++ b/frontend/app_flowy/lib/user/application/user_listener.dart @@ -1,10 +1,11 @@ import 'dart:async'; +import 'package:app_flowy/core/folder_notification.dart'; +import 'package:app_flowy/core/user_notification.dart'; import 'package:dartz/dartz.dart'; import 'package:flowy_sdk/protobuf/flowy-error-code/code.pb.dart'; import 'package:flowy_sdk/protobuf/flowy-folder-data-model/workspace.pb.dart'; import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart'; import 'dart:typed_data'; -import 'package:app_flowy/core/notification_helper.dart'; import 'package:flowy_infra/notifier.dart'; import 'package:flowy_sdk/protobuf/dart-notify/protobuf.dart'; import 'package:flowy_sdk/protobuf/flowy-folder/dart_notification.pb.dart'; @@ -14,104 +15,58 @@ import 'package:flowy_sdk/rust_stream.dart'; typedef UserProfileNotifyValue = Either; typedef AuthNotifyValue = Either; -typedef WorkspaceListNotifyValue = Either, FlowyError>; -typedef WorkspaceSettingNotifyValue = Either; class UserListener { StreamSubscription? _subscription; - final _profileNotifier = PublishNotifier(); - final _authNotifier = PublishNotifier(); - final _workspaceListNotifier = PublishNotifier(); - final _workSettingNotifier = PublishNotifier(); + PublishNotifier? _authNotifier = PublishNotifier(); + PublishNotifier? _profileNotifier = PublishNotifier(); - FolderNotificationParser? _workspaceParser; UserNotificationParser? _userParser; - final UserProfile _user; + final UserProfile _userProfile; UserListener({ - required UserProfile user, - }) : _user = user; + required UserProfile userProfile, + }) : _userProfile = userProfile; void start({ void Function(AuthNotifyValue)? onAuthChanged, void Function(UserProfileNotifyValue)? onProfileUpdated, - void Function(WorkspaceListNotifyValue)? onWorkspaceListUpdated, - void Function(WorkspaceSettingNotifyValue)? onWorkspaceSettingUpdated, }) { - if (onAuthChanged != null) { - _authNotifier.addListener(() { - onAuthChanged(_authNotifier.currentValue!); - }); - } - if (onProfileUpdated != null) { - _profileNotifier.addListener(() { - onProfileUpdated(_profileNotifier.currentValue!); - }); + _profileNotifier?.addPublishListener(onProfileUpdated); } - if (onWorkspaceListUpdated != null) { - _workspaceListNotifier.addListener(() { - onWorkspaceListUpdated(_workspaceListNotifier.currentValue!); - }); + if (onAuthChanged != null) { + _authNotifier?.addPublishListener(onAuthChanged); } - if (onWorkspaceSettingUpdated != null) { - _workSettingNotifier.addListener(() { - onWorkspaceSettingUpdated(_workSettingNotifier.currentValue!); - }); - } - - _workspaceParser = FolderNotificationParser(id: _user.token, callback: _notificationCallback); - _userParser = UserNotificationParser(id: _user.token, callback: _userNotificationCallback); + _userParser = UserNotificationParser(id: _userProfile.token, callback: _userNotificationCallback); _subscription = RustStreamReceiver.listen((observable) { - _workspaceParser?.parse(observable); _userParser?.parse(observable); }); } Future stop() async { - _workspaceParser = null; _userParser = null; await _subscription?.cancel(); - _profileNotifier.dispose(); - _authNotifier.dispose(); - _workspaceListNotifier.dispose(); - } + _profileNotifier?.dispose(); + _profileNotifier = null; - void _notificationCallback(FolderNotification ty, Either result) { - switch (ty) { - case FolderNotification.UserCreateWorkspace: - case FolderNotification.UserDeleteWorkspace: - case FolderNotification.WorkspaceListUpdated: - result.fold( - (payload) => _workspaceListNotifier.value = left(RepeatedWorkspace.fromBuffer(payload).items), - (error) => _workspaceListNotifier.value = right(error), - ); - break; - case FolderNotification.WorkspaceSetting: - result.fold( - (payload) => _workSettingNotifier.value = left(CurrentWorkspaceSetting.fromBuffer(payload)), - (error) => _workSettingNotifier.value = right(error), - ); - break; - case FolderNotification.UserUnauthorized: - result.fold( - (_) {}, - (error) => _authNotifier.value = right(FlowyError.create()..code = ErrorCode.UserUnauthorized.value), - ); - break; - - default: - break; - } + _authNotifier?.dispose(); + _authNotifier = null; } void _userNotificationCallback(user.UserNotification ty, Either result) { switch (ty) { case user.UserNotification.UserUnauthorized: result.fold( - (payload) => _profileNotifier.value = left(UserProfile.fromBuffer(payload)), - (error) => _profileNotifier.value = right(error), + (_) {}, + (error) => _authNotifier?.value = right(error), + ); + break; + case user.UserNotification.UserProfileUpdated: + result.fold( + (payload) => _profileNotifier?.value = left(UserProfile.fromBuffer(payload)), + (error) => _profileNotifier?.value = right(error), ); break; default: @@ -119,3 +74,81 @@ class UserListener { } } } + +typedef WorkspaceListNotifyValue = Either, FlowyError>; +typedef WorkspaceSettingNotifyValue = Either; + +class UserWorkspaceListener { + PublishNotifier? _authNotifier = PublishNotifier(); + PublishNotifier? _workspacesChangedNotifier = PublishNotifier(); + PublishNotifier? _settingChangedNotifier = PublishNotifier(); + + FolderNotificationListener? _listener; + final UserProfile _userProfile; + + UserWorkspaceListener({ + required UserProfile userProfile, + }) : _userProfile = userProfile; + + void start({ + void Function(AuthNotifyValue)? onAuthChanged, + void Function(WorkspaceListNotifyValue)? onWorkspacesUpdated, + void Function(WorkspaceSettingNotifyValue)? onSettingUpdated, + }) { + if (onAuthChanged != null) { + _authNotifier?.addPublishListener(onAuthChanged); + } + + if (onWorkspacesUpdated != null) { + _workspacesChangedNotifier?.addPublishListener(onWorkspacesUpdated); + } + + if (onSettingUpdated != null) { + _settingChangedNotifier?.addPublishListener(onSettingUpdated); + } + + _listener = FolderNotificationListener( + objectId: _userProfile.token, + handler: _handleObservableType, + ); + } + + void _handleObservableType(FolderNotification ty, Either result) { + switch (ty) { + case FolderNotification.UserCreateWorkspace: + case FolderNotification.UserDeleteWorkspace: + case FolderNotification.WorkspaceListUpdated: + result.fold( + (payload) => _workspacesChangedNotifier?.value = left(RepeatedWorkspace.fromBuffer(payload).items), + (error) => _workspacesChangedNotifier?.value = right(error), + ); + break; + case FolderNotification.WorkspaceSetting: + result.fold( + (payload) => _settingChangedNotifier?.value = left(CurrentWorkspaceSetting.fromBuffer(payload)), + (error) => _settingChangedNotifier?.value = right(error), + ); + break; + case FolderNotification.UserUnauthorized: + result.fold( + (_) {}, + (error) => _authNotifier?.value = right(FlowyError.create()..code = ErrorCode.UserUnauthorized.value), + ); + break; + default: + break; + } + } + + Future stop() async { + await _listener?.stop(); + _workspacesChangedNotifier?.dispose(); + _workspacesChangedNotifier = null; + + _settingChangedNotifier?.dispose(); + _settingChangedNotifier = null; + + _authNotifier?.dispose(); + _authNotifier = null; + } +} diff --git a/frontend/app_flowy/lib/user/presentation/skip_log_in_screen.dart b/frontend/app_flowy/lib/user/presentation/skip_log_in_screen.dart index 4a78815beb..15e9f7186b 100644 --- a/frontend/app_flowy/lib/user/presentation/skip_log_in_screen.dart +++ b/frontend/app_flowy/lib/user/presentation/skip_log_in_screen.dart @@ -1,5 +1,4 @@ import 'package:app_flowy/user/application/auth_service.dart'; -import 'package:app_flowy/user/application/user_listener.dart'; import 'package:app_flowy/user/presentation/router.dart'; import 'package:app_flowy/user/presentation/widgets/background.dart'; import 'package:easy_localization/easy_localization.dart'; @@ -34,8 +33,6 @@ class SkipLogInScreen extends StatefulWidget { } class _SkipLogInScreenState extends State { - UserListener? userListener; - @override Widget build(BuildContext context) { return Scaffold( diff --git a/frontend/app_flowy/lib/workspace/application/app/app_listener.dart b/frontend/app_flowy/lib/workspace/application/app/app_listener.dart index c1696e55f2..8d429ee8fa 100644 --- a/frontend/app_flowy/lib/workspace/application/app/app_listener.dart +++ b/frontend/app_flowy/lib/workspace/application/app/app_listener.dart @@ -1,7 +1,7 @@ import 'dart:async'; import 'dart:typed_data'; +import 'package:app_flowy/core/folder_notification.dart'; import 'package:dartz/dartz.dart'; -import 'package:app_flowy/core/notification_helper.dart'; import 'package:flowy_sdk/log.dart'; import 'package:flowy_sdk/protobuf/dart-notify/subject.pb.dart'; import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart'; diff --git a/frontend/app_flowy/lib/workspace/application/grid/block/block_listener.dart b/frontend/app_flowy/lib/workspace/application/grid/block/block_listener.dart index bdcecb0324..63accc38d0 100644 --- a/frontend/app_flowy/lib/workspace/application/grid/block/block_listener.dart +++ b/frontend/app_flowy/lib/workspace/application/grid/block/block_listener.dart @@ -1,7 +1,6 @@ import 'dart:async'; import 'dart:typed_data'; - -import 'package:app_flowy/core/notification_helper.dart'; +import 'package:app_flowy/core/grid_notification.dart'; import 'package:dartz/dartz.dart'; import 'package:flowy_infra/notifier.dart'; import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart'; diff --git a/frontend/app_flowy/lib/workspace/application/grid/cell/cell_listener.dart b/frontend/app_flowy/lib/workspace/application/grid/cell/cell_listener.dart index 05289fee03..5da5fce86d 100644 --- a/frontend/app_flowy/lib/workspace/application/grid/cell/cell_listener.dart +++ b/frontend/app_flowy/lib/workspace/application/grid/cell/cell_listener.dart @@ -1,10 +1,10 @@ +import 'package:app_flowy/core/grid_notification.dart'; import 'package:dartz/dartz.dart'; import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart'; import 'package:flowy_sdk/protobuf/flowy-grid/dart_notification.pb.dart'; import 'package:flowy_infra/notifier.dart'; import 'dart:async'; import 'dart:typed_data'; -import 'package:app_flowy/core/notification_helper.dart'; typedef UpdateFieldNotifiedValue = Either; diff --git a/frontend/app_flowy/lib/workspace/application/grid/field/field_listener.dart b/frontend/app_flowy/lib/workspace/application/grid/field/field_listener.dart index c6dd832973..21bd5befeb 100644 --- a/frontend/app_flowy/lib/workspace/application/grid/field/field_listener.dart +++ b/frontend/app_flowy/lib/workspace/application/grid/field/field_listener.dart @@ -1,10 +1,10 @@ +import 'package:app_flowy/core/grid_notification.dart'; import 'package:dartz/dartz.dart'; import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart'; import 'package:flowy_sdk/protobuf/flowy-grid/dart_notification.pb.dart'; import 'package:flowy_infra/notifier.dart'; import 'dart:async'; import 'dart:typed_data'; -import 'package:app_flowy/core/notification_helper.dart'; import 'package:flowy_sdk/protobuf/flowy-grid/field_entities.pb.dart'; typedef UpdateFieldNotifiedValue = Either; diff --git a/frontend/app_flowy/lib/workspace/application/grid/field/grid_listenr.dart b/frontend/app_flowy/lib/workspace/application/grid/field/grid_listenr.dart index 869ade19c0..00e94bb3e1 100644 --- a/frontend/app_flowy/lib/workspace/application/grid/field/grid_listenr.dart +++ b/frontend/app_flowy/lib/workspace/application/grid/field/grid_listenr.dart @@ -1,10 +1,10 @@ +import 'package:app_flowy/core/grid_notification.dart'; import 'package:dartz/dartz.dart'; import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart'; import 'package:flowy_sdk/protobuf/flowy-grid/dart_notification.pb.dart'; import 'package:flowy_infra/notifier.dart'; import 'dart:async'; import 'dart:typed_data'; -import 'package:app_flowy/core/notification_helper.dart'; import 'package:flowy_sdk/protobuf/flowy-grid/field_entities.pb.dart'; typedef UpdateFieldNotifiedValue = Either; diff --git a/frontend/app_flowy/lib/workspace/application/grid/row/row_listener.dart b/frontend/app_flowy/lib/workspace/application/grid/row/row_listener.dart index 5cf46acd11..98fddaeccf 100644 --- a/frontend/app_flowy/lib/workspace/application/grid/row/row_listener.dart +++ b/frontend/app_flowy/lib/workspace/application/grid/row/row_listener.dart @@ -1,10 +1,10 @@ +import 'package:app_flowy/core/grid_notification.dart'; import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart'; import 'package:flowy_sdk/protobuf/flowy-grid/block_entities.pb.dart'; import 'package:flowy_sdk/protobuf/flowy-grid/dart_notification.pb.dart'; import 'package:flowy_infra/notifier.dart'; import 'dart:async'; import 'dart:typed_data'; -import 'package:app_flowy/core/notification_helper.dart'; import 'package:dartz/dartz.dart'; import 'package:flowy_sdk/protobuf/flowy-grid/field_entities.pb.dart'; diff --git a/frontend/app_flowy/lib/workspace/application/home/home_bloc.dart b/frontend/app_flowy/lib/workspace/application/home/home_bloc.dart index 19a1e6787d..b95044b399 100644 --- a/frontend/app_flowy/lib/workspace/application/home/home_bloc.dart +++ b/frontend/app_flowy/lib/workspace/application/home/home_bloc.dart @@ -11,19 +11,17 @@ import 'package:dartz/dartz.dart'; part 'home_bloc.freezed.dart'; class HomeBloc extends Bloc { - final UserListener _listener; + final UserWorkspaceListener _listener; HomeBloc(UserProfile user, CurrentWorkspaceSetting workspaceSetting) - : _listener = UserListener(user: user), + : _listener = UserWorkspaceListener(userProfile: user), super(HomeState.initial(workspaceSetting)) { on((event, emit) async { await event.map( initial: (_Initial value) { _listener.start( - onAuthChanged: (result) { - _authDidChanged(result); - }, - onWorkspaceSettingUpdated: (result) { + onAuthChanged: (result) => _authDidChanged(result), + onSettingUpdated: (result) { result.fold( (setting) => add(HomeEvent.didReceiveWorkspaceSetting(setting)), (r) => Log.error(r), diff --git a/frontend/app_flowy/lib/workspace/application/menu/menu_bloc.dart b/frontend/app_flowy/lib/workspace/application/menu/menu_bloc.dart index db8f2c534b..d8997f1b3d 100644 --- a/frontend/app_flowy/lib/workspace/application/menu/menu_bloc.dart +++ b/frontend/app_flowy/lib/workspace/application/menu/menu_bloc.dart @@ -22,7 +22,7 @@ class MenuBloc extends Bloc { on((event, emit) async { await event.map( initial: (e) async { - listener.start(addAppCallback: _handleAppsOrFail); + listener.start(appsChanged: _handleAppsOrFail); await _fetchApps(emit); }, openPage: (e) async { diff --git a/frontend/app_flowy/lib/workspace/application/menu/menu_user_bloc.dart b/frontend/app_flowy/lib/workspace/application/menu/menu_user_bloc.dart index 7016c2d74c..ee024bb1b2 100644 --- a/frontend/app_flowy/lib/workspace/application/menu/menu_user_bloc.dart +++ b/frontend/app_flowy/lib/workspace/application/menu/menu_user_bloc.dart @@ -12,29 +12,33 @@ part 'menu_user_bloc.freezed.dart'; class MenuUserBloc extends Bloc { final UserService _userService; - final UserListener userListener; + final UserListener _userListener; + final UserWorkspaceListener _userWorkspaceListener; final UserProfile userProfile; - MenuUserBloc(this.userProfile, this.userListener) - : _userService = UserService(userId: userProfile.id), + MenuUserBloc(this.userProfile) + : _userListener = UserListener(userProfile: userProfile), + _userWorkspaceListener = UserWorkspaceListener(userProfile: userProfile), + _userService = UserService(userId: userProfile.id), super(MenuUserState.initial(userProfile)) { on((event, emit) async { - await event.map( - initial: (_) async { - userListener.start( - onProfileUpdated: _profileUpdated, - onWorkspaceListUpdated: _workspaceListUpdated, - ); + await event.when( + initial: () async { + _userListener.start(onProfileUpdated: _profileUpdated); + _userWorkspaceListener.start(onWorkspacesUpdated: _workspaceListUpdated); await _initUser(); }, - fetchWorkspaces: (_FetchWorkspaces value) async {}, + fetchWorkspaces: () async { + // + }, ); }); } @override Future close() async { - await userListener.stop(); + await _userListener.stop(); + await _userWorkspaceListener.stop(); super.close(); } @@ -43,19 +47,10 @@ class MenuUserBloc extends Bloc { result.fold((l) => null, (error) => Log.error(error)); } - void _profileUpdated(Either userOrFailed) {} + void _profileUpdated(Either userProfileOrFailed) {} + void _workspaceListUpdated(Either, FlowyError> workspacesOrFailed) { - // fetch workspaces - // iUserImpl.fetchWorkspaces().then((result) { - // result.fold( - // (workspaces) async* { - // yield state.copyWith(workspaces: some(workspaces)); - // }, - // (error) async* { - // yield state.copyWith(successOrFailure: right(error.msg)); - // }, - // ); - // }); + // Do nothing by now } } @@ -68,13 +63,13 @@ class MenuUserEvent with _$MenuUserEvent { @freezed class MenuUserState with _$MenuUserState { const factory MenuUserState({ - required UserProfile user, + required UserProfile userProfile, required Option> workspaces, required Either successOrFailure, }) = _MenuUserState; - factory MenuUserState.initial(UserProfile user) => MenuUserState( - user: user, + factory MenuUserState.initial(UserProfile userProfile) => MenuUserState( + userProfile: userProfile, workspaces: none(), successOrFailure: left(unit), ); diff --git a/frontend/app_flowy/lib/workspace/application/trash/trash_listener.dart b/frontend/app_flowy/lib/workspace/application/trash/trash_listener.dart index 36dc982466..1127e1f9e1 100644 --- a/frontend/app_flowy/lib/workspace/application/trash/trash_listener.dart +++ b/frontend/app_flowy/lib/workspace/application/trash/trash_listener.dart @@ -1,7 +1,7 @@ import 'dart:async'; import 'dart:typed_data'; +import 'package:app_flowy/core/folder_notification.dart'; import 'package:dartz/dartz.dart'; -import 'package:app_flowy/core/notification_helper.dart'; import 'package:flowy_sdk/protobuf/dart-notify/subject.pb.dart'; import 'package:flowy_sdk/protobuf/flowy-folder/dart_notification.pb.dart'; import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart'; diff --git a/frontend/app_flowy/lib/workspace/application/view/view_listener.dart b/frontend/app_flowy/lib/workspace/application/view/view_listener.dart index 9a2d913941..4a50211305 100644 --- a/frontend/app_flowy/lib/workspace/application/view/view_listener.dart +++ b/frontend/app_flowy/lib/workspace/application/view/view_listener.dart @@ -1,6 +1,6 @@ import 'dart:async'; import 'dart:typed_data'; -import 'package:app_flowy/core/notification_helper.dart'; +import 'package:app_flowy/core/folder_notification.dart'; import 'package:dartz/dartz.dart'; import 'package:flowy_sdk/protobuf/dart-notify/subject.pb.dart'; import 'package:flowy_sdk/protobuf/flowy-folder-data-model/view.pb.dart'; diff --git a/frontend/app_flowy/lib/workspace/application/workspace/welcome_bloc.dart b/frontend/app_flowy/lib/workspace/application/workspace/welcome_bloc.dart index a743a3316f..8bfd6a744e 100644 --- a/frontend/app_flowy/lib/workspace/application/workspace/welcome_bloc.dart +++ b/frontend/app_flowy/lib/workspace/application/workspace/welcome_bloc.dart @@ -11,13 +11,13 @@ part 'welcome_bloc.freezed.dart'; class WelcomeBloc extends Bloc { final UserService userService; - final UserListener userListener; - WelcomeBloc({required this.userService, required this.userListener}) : super(WelcomeState.initial()) { + final UserWorkspaceListener userWorkspaceListener; + WelcomeBloc({required this.userService, required this.userWorkspaceListener}) : super(WelcomeState.initial()) { on( (event, emit) async { await event.map(initial: (e) async { - userListener.start( - onWorkspaceListUpdated: (result) => add(WelcomeEvent.workspacesReveived(result)), + userWorkspaceListener.start( + onWorkspacesUpdated: (result) => add(WelcomeEvent.workspacesReveived(result)), ); // await _fetchWorkspaces(emit); @@ -37,7 +37,7 @@ class WelcomeBloc extends Bloc { @override Future close() async { - await userListener.stop(); + await userWorkspaceListener.stop(); super.close(); } diff --git a/frontend/app_flowy/lib/workspace/application/workspace/workspace_listener.dart b/frontend/app_flowy/lib/workspace/application/workspace/workspace_listener.dart index 9ac51e36f8..2243ba211c 100644 --- a/frontend/app_flowy/lib/workspace/application/workspace/workspace_listener.dart +++ b/frontend/app_flowy/lib/workspace/application/workspace/workspace_listener.dart @@ -1,97 +1,73 @@ import 'dart:async'; import 'dart:typed_data'; - -import 'package:app_flowy/core/notification_helper.dart'; +import 'package:app_flowy/core/folder_notification.dart'; import 'package:dartz/dartz.dart'; -import 'package:flowy_sdk/log.dart'; -import 'package:flowy_sdk/protobuf/dart-notify/subject.pb.dart'; +import 'package:flowy_infra/notifier.dart'; import 'package:flowy_sdk/protobuf/flowy-user-data-model/protobuf.dart' show UserProfile; import 'package:flowy_sdk/protobuf/flowy-folder-data-model/app.pb.dart'; import 'package:flowy_sdk/protobuf/flowy-folder-data-model/workspace.pb.dart'; import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart'; import 'package:flowy_sdk/protobuf/flowy-folder/dart_notification.pb.dart'; -import 'package:flowy_sdk/rust_stream.dart'; -typedef WorkspaceAppsChangedCallback = void Function(Either, FlowyError> appsOrFail); -typedef WorkspaceUpdatedCallback = void Function(String name, String desc); +typedef AppListNotifyValue = Either, FlowyError>; +typedef WorkspaceNotifyValue = Either; class WorkspaceListener { - WorkspaceListenerService service; - WorkspaceListener({ - required this.service, - }); + PublishNotifier? _appsChangedNotifier = PublishNotifier(); + PublishNotifier? _workspaceUpdatedNotifier = PublishNotifier(); - void start({WorkspaceAppsChangedCallback? addAppCallback, WorkspaceUpdatedCallback? updatedCallback}) { - service.startListening(appsChanged: addAppCallback, update: updatedCallback); - } - - Future stop() async { - await service.close(); - } -} - -class WorkspaceListenerService { - StreamSubscription? _subscription; - WorkspaceAppsChangedCallback? _appsChanged; - WorkspaceUpdatedCallback? _update; - FolderNotificationParser? _parser; + FolderNotificationListener? _listener; final UserProfile user; final String workspaceId; - WorkspaceListenerService({ + WorkspaceListener({ required this.user, required this.workspaceId, }); - void startListening({ - WorkspaceAppsChangedCallback? appsChanged, - WorkspaceUpdatedCallback? update, + void start({ + void Function(AppListNotifyValue)? appsChanged, + void Function(WorkspaceNotifyValue)? onWorkspaceUpdated, }) { - _appsChanged = appsChanged; - _update = update; + if (appsChanged != null) { + _appsChangedNotifier?.addPublishListener(appsChanged); + } - _parser = FolderNotificationParser( - id: workspaceId, - callback: (ty, result) { - _handleObservableType(ty, result); - }, + if (onWorkspaceUpdated != null) { + _workspaceUpdatedNotifier?.addPublishListener(onWorkspaceUpdated); + } + + _listener = FolderNotificationListener( + objectId: workspaceId, + handler: _handleObservableType, ); - - _subscription = RustStreamReceiver.listen((observable) => _parser?.parse(observable)); } void _handleObservableType(FolderNotification ty, Either result) { switch (ty) { case FolderNotification.WorkspaceUpdated: - if (_update != null) { - result.fold( - (payload) { - final workspace = Workspace.fromBuffer(payload); - _update!(workspace.name, workspace.desc); - }, - (error) => Log.error(error), - ); - } + result.fold( + (payload) => _workspaceUpdatedNotifier?.value = left(Workspace.fromBuffer(payload)), + (error) => _workspaceUpdatedNotifier?.value = right(error), + ); break; case FolderNotification.WorkspaceAppsChanged: - if (_appsChanged != null) { - result.fold( - (payload) => _appsChanged!( - left(RepeatedApp.fromBuffer(payload).items), - ), - (error) => _appsChanged!(right(error)), - ); - } + result.fold( + (payload) => _appsChangedNotifier?.value = left(RepeatedApp.fromBuffer(payload).items), + (error) => _appsChangedNotifier?.value = right(error), + ); break; default: break; } } - Future close() async { - _parser = null; - await _subscription?.cancel(); - // _appsChanged = null; - // _update = null; + Future stop() async { + await _listener?.stop(); + _appsChangedNotifier?.dispose(); + _appsChangedNotifier = null; + + _workspaceUpdatedNotifier?.dispose(); + _workspaceUpdatedNotifier = null; } } diff --git a/frontend/app_flowy/lib/workspace/presentation/home/menu/menu_user.dart b/frontend/app_flowy/lib/workspace/presentation/home/menu/menu_user.dart index 3a1674e486..6b6849e43d 100644 --- a/frontend/app_flowy/lib/workspace/presentation/home/menu/menu_user.dart +++ b/frontend/app_flowy/lib/workspace/presentation/home/menu/menu_user.dart @@ -58,9 +58,9 @@ class MenuUser extends StatelessWidget { } Widget _renderUserName(BuildContext context) { - String name = context.read().state.user.name; + String name = context.read().state.userProfile.name; if (name.isEmpty) { - name = context.read().state.user.email; + name = context.read().state.userProfile.email; } return FlowyText(name, fontSize: 12); } diff --git a/frontend/rust-lib/flowy-user/src/handlers/user_handler.rs b/frontend/rust-lib/flowy-user/src/handlers/user_handler.rs index 0db37ea984..c635f326c1 100644 --- a/frontend/rust-lib/flowy-user/src/handlers/user_handler.rs +++ b/frontend/rust-lib/flowy-user/src/handlers/user_handler.rs @@ -20,7 +20,7 @@ pub async fn check_user_handler(session: AppData>) -> DataResul #[tracing::instrument(level = "debug", skip(session))] pub async fn get_user_profile_handler(session: AppData>) -> DataResult { - let user_profile = session.user_profile().await?; + let user_profile = session.get_user_profile().await?; data_result(user_profile) } diff --git a/frontend/rust-lib/flowy-user/src/services/user_session.rs b/frontend/rust-lib/flowy-user/src/services/user_session.rs index 97b3d3f5c5..cf81b6be89 100644 --- a/frontend/rust-lib/flowy-user/src/services/user_session.rs +++ b/frontend/rust-lib/flowy-user/src/services/user_session.rs @@ -82,7 +82,7 @@ impl UserSession { #[tracing::instrument(level = "debug", skip(self))] pub async fn sign_in(&self, params: SignInParams) -> Result { if self.is_user_login(¶ms.email) { - self.user_profile().await + self.get_user_profile().await } else { let resp = self.cloud_service.sign_in(params).await?; let session: Session = resp.clone().into(); @@ -97,7 +97,7 @@ impl UserSession { #[tracing::instrument(level = "debug", skip(self))] pub async fn sign_up(&self, params: SignUpParams) -> Result { if self.is_user_login(¶ms.email) { - self.user_profile().await + self.get_user_profile().await } else { let resp = self.cloud_service.sign_up(params).await?; let session: Session = resp.clone().into(); @@ -131,6 +131,10 @@ impl UserSession { let changeset = UserTableChangeset::new(params.clone()); diesel_update_table!(user_table, changeset, &*self.db_connection()?); + let user_profile = self.get_user_profile().await?; + dart_notify(&session.token, UserNotification::UserProfileUpdated) + .payload(user_profile) + .send(); let _ = self.update_user_on_server(&session.token, params).await?; Ok(()) } @@ -150,7 +154,7 @@ impl UserSession { Ok(user.into()) } - pub async fn user_profile(&self) -> Result { + pub async fn get_user_profile(&self) -> Result { let (user_id, token) = self.get_session()?.into_part(); let user = dsl::user_table .filter(user_table::id.eq(&user_id)) @@ -185,14 +189,14 @@ impl UserSession { tokio::spawn(async move { match server.get_user(&token).await { Ok(profile) => { - dart_notify(&token, UserNotification::UserProfileUpdated) - .payload(profile) - .send(); + // dart_notify(&token, UserNotification::UserProfileUpdated) + // .payload(profile) + // .send(); } - Err(e) => { - dart_notify(&token, UserNotification::UserProfileUpdated) - .error(e) - .send(); + Err(_e) => { + // dart_notify(&token, UserNotification::UserProfileUpdated) + // .error(e) + // .send(); } } });