[flutter]: refactor iuser listener

This commit is contained in:
appflowy 2021-11-08 23:15:29 +08:00
parent b79267d15c
commit 9dd3420ede
19 changed files with 158 additions and 124 deletions

View File

@ -66,9 +66,9 @@ class SignUpBloc extends Bloc<SignUpEvent, SignUpState> {
final result = await authManager.signUp(state.email, state.password, state.email);
yield result.fold(
(newUser) => state.copyWith(
(profile) => state.copyWith(
isSubmitting: false,
successOrFail: some(left(newUser.profile)),
successOrFail: some(left(profile)),
emailError: none(),
passwordError: none(),
repeatPasswordError: none(),

View File

@ -13,7 +13,7 @@ class NewUser {
abstract class IAuth {
Future<Either<UserProfile, UserError>> signIn(String? email, String? password);
Future<Either<NewUser, UserError>> signUp(String? name, String? password, String? email);
Future<Either<UserProfile, UserError>> signUp(String? name, String? password, String? email);
Future<Either<Unit, UserError>> signOut();
}

View File

@ -22,17 +22,8 @@ class AuthImpl extends IAuth {
}
@override
Future<Either<NewUser, UserError>> signUp(String? name, String? password, String? email) {
return repo.signUp(name: name, password: password, email: email).then((result) {
return result.fold(
(tuple) => left(
NewUser(
profile: tuple.value1,
workspaceId: tuple.value2,
),
),
(error) => right(error));
});
Future<Either<UserProfile, UserError>> signUp(String? name, String? password, String? email) {
return repo.signUp(name: name, password: password, email: email);
}
@override

View File

@ -12,24 +12,26 @@ class AuthRepository {
return UserEventSignIn(request).send();
}
Future<Either<Tuple2<UserProfile, String>, UserError>> signUp(
Future<Either<UserProfile, UserError>> signUp(
{required String? name, required String? password, required String? email}) {
final request = SignUpRequest.create()
..email = email ?? ''
..name = name ?? ''
..password = password ?? '';
return UserEventSignUp(request).send().then((result) {
return result.fold((userProfile) async {
return await WorkspaceEventCreateDefaultWorkspace().send().then((result) {
return result.fold((workspaceIdentifier) {
return left(Tuple2(userProfile, workspaceIdentifier.workspaceId));
}, (error) {
throw UnimplementedError;
});
});
}, (error) => right(error));
});
return UserEventSignUp(request).send();
// return UserEventSignUp(request).send().then((result) {
// return result.fold((userProfile) async {
// return await WorkspaceEventCreateDefaultWorkspace().send().then((result) {
// return result.fold((workspaceIdentifier) {
// return left(Tuple2(userProfile, workspaceIdentifier.workspaceId));
// }, (error) {
// throw UnimplementedError;
// });
// });
// }, (error) => right(error));
// });
}
Future<Either<Unit, UserError>> signOut() {

View File

@ -51,7 +51,6 @@ class SkipLogInScreen extends StatelessWidget {
_launchURL('https://github.com/AppFlowy-IO/appflowy');
},
),
const Spacer(),
InkWell(
child: const Text(
'Subscribe to Newsletter',

View File

@ -15,7 +15,7 @@ class HomeListenBloc extends Bloc<HomeListenEvent, HomeListenState> {
) async* {
yield* event.map(
started: (_) async* {
listener.setAuthCallback(_authStateChanged);
listener.authDidChangedNotifier.addPublishListener(_authDidChanged);
listener.start();
},
stop: (_) async* {},
@ -31,7 +31,7 @@ class HomeListenBloc extends Bloc<HomeListenEvent, HomeListenState> {
super.close();
}
void _authStateChanged(Either<Unit, UserError> errorOrNothing) {
void _authDidChanged(Either<Unit, UserError> errorOrNothing) {
errorOrNothing.fold((_) {}, (error) {
if (error.code == ErrorCode.UserUnauthorized.value) {
add(HomeListenEvent.unauthorized(error.msg));

View File

@ -19,8 +19,8 @@ class MenuUserBloc extends Bloc<MenuUserEvent, MenuUserState> {
Stream<MenuUserState> mapEventToState(MenuUserEvent event) async* {
yield* event.map(
initial: (_) async* {
listener.setProfileCallback(_profileUpdated);
listener.setWorkspacesCallback(_workspacesUpdated);
listener.profileUpdatedNotifier.addPublishListener(_profileUpdated);
listener.workspaceUpdatedNotifier.addPublishListener(_workspacesUpdated);
listener.start();
await _initUser();

View File

@ -19,7 +19,7 @@ class WelcomeBloc extends Bloc<WelcomeEvent, WelcomeState> {
WelcomeEvent event,
) async* {
yield* event.map(initial: (e) async* {
listener.setWorkspacesCallback(_workspacesUpdated);
listener.workspaceUpdatedNotifier.addPublishListener(_workspacesUpdated);
listener.start();
//
yield* _fetchWorkspaces();

View File

@ -1,4 +1,5 @@
import 'package:dartz/dartz.dart';
import 'package:flowy_infra/notifier.dart';
import 'package:flowy_sdk/protobuf/flowy-user/errors.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-user/user_profile.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-workspace-infra/workspace_create.pb.dart';
@ -15,16 +16,16 @@ abstract class IUser {
Future<Either<Unit, UserError>> initUser();
}
typedef UserProfileUpdateCallback = void Function(Either<UserProfile, UserError>);
typedef AuthChangedCallback = void Function(Either<Unit, UserError>);
typedef WorkspacesUpdatedCallback = void Function(Either<List<Workspace>, WorkspaceError> workspacesOrFailed);
typedef UserProfileUpdatedNotifierValue = Either<UserProfile, UserError>;
typedef AuthNotifierValue = Either<Unit, UserError>;
typedef WorkspaceUpdatedNotifierValue = Either<List<Workspace>, WorkspaceError>;
abstract class IUserListener {
void start();
void setProfileCallback(UserProfileUpdateCallback profileCallback);
void setAuthCallback(AuthChangedCallback authCallback);
void setWorkspacesCallback(WorkspacesUpdatedCallback workspacesCallback);
PublishNotifier<UserProfileUpdatedNotifierValue> get profileUpdatedNotifier;
PublishNotifier<AuthNotifierValue> get authDidChangedNotifier;
PublishNotifier<WorkspaceUpdatedNotifierValue> get workspaceUpdatedNotifier;
Future<void> stop();
}

View File

@ -4,6 +4,7 @@ import 'package:app_flowy/workspace/infrastructure/repos/helper.dart';
import 'package:dartz/dartz.dart';
import 'package:app_flowy/workspace/domain/i_user.dart';
import 'package:app_flowy/workspace/infrastructure/repos/user_repo.dart';
import 'package:flowy_infra/notifier.dart';
import 'package:flowy_sdk/protobuf/flowy-dart-notify/protobuf.dart';
import 'package:flowy_sdk/protobuf/flowy-user-infra/errors.pb.dart';
// import 'package:flowy_sdk/protobuf/flowy-user/errors.pb.dart' as user_error;
@ -53,9 +54,15 @@ class IUserImpl extends IUser {
class IUserListenerImpl extends IUserListener {
StreamSubscription<SubscribeObject>? _subscription;
WorkspacesUpdatedCallback? _workspacesUpdated;
AuthChangedCallback? _authChanged;
UserProfileUpdateCallback? _profileUpdated;
@override
final profileUpdatedNotifier = PublishNotifier<UserProfileUpdatedNotifierValue>();
@override
final authDidChangedNotifier = PublishNotifier<AuthNotifierValue>();
@override
final workspaceUpdatedNotifier = PublishNotifier<WorkspaceUpdatedNotifierValue>();
late WorkspaceNotificationParser _workspaceParser;
late UserNotificationParser _userParser;
@ -69,9 +76,7 @@ class IUserListenerImpl extends IUserListener {
@override
void start() {
_workspaceParser = WorkspaceNotificationParser(id: _user.token, callback: _notificationCallback);
_userParser = UserNotificationParser(id: _user.token, callback: _userNotificationCallback);
_subscription = RustStreamReceiver.listen((observable) {
_workspaceParser.parse(observable);
_userParser.parse(observable);
@ -83,43 +88,21 @@ class IUserListenerImpl extends IUserListener {
await _subscription?.cancel();
}
@override
void setAuthCallback(AuthChangedCallback authCallback) {
_authChanged = authCallback;
}
@override
void setProfileCallback(UserProfileUpdateCallback profileCallback) {
_profileUpdated = profileCallback;
}
@override
void setWorkspacesCallback(WorkspacesUpdatedCallback workspacesCallback) {
_workspacesUpdated = workspacesCallback;
}
void _notificationCallback(WorkspaceNotification ty, Either<Uint8List, WorkspaceError> result) {
switch (ty) {
case WorkspaceNotification.UserCreateWorkspace:
case WorkspaceNotification.UserDeleteWorkspace:
case WorkspaceNotification.WorkspaceListUpdated:
if (_workspacesUpdated != null) {
result.fold(
(payload) {
final workspaces = RepeatedWorkspace.fromBuffer(payload);
_workspacesUpdated!(left(workspaces.items));
},
(error) => _workspacesUpdated!(right(error)),
);
}
result.fold(
(payload) => workspaceUpdatedNotifier.value = left(RepeatedWorkspace.fromBuffer(payload).items),
(error) => workspaceUpdatedNotifier.value = right(error),
);
break;
case WorkspaceNotification.UserUnauthorized:
if (_authChanged != null) {
result.fold(
(_) {},
(error) => {_authChanged!(right(UserError.create()..code = ErrorCode.UserUnauthorized.value))},
);
}
result.fold(
(_) {},
(error) => authDidChangedNotifier.value = right(UserError.create()..code = ErrorCode.UserUnauthorized.value),
);
break;
default:
break;
@ -129,15 +112,10 @@ class IUserListenerImpl extends IUserListener {
void _userNotificationCallback(user.UserNotification ty, Either<Uint8List, UserError> result) {
switch (ty) {
case user.UserNotification.UserUnauthorized:
if (_profileUpdated != null) {
result.fold(
(payload) {
final userProfile = UserProfile.fromBuffer(payload);
_profileUpdated!(left(userProfile));
},
(error) => _profileUpdated!(right(error)),
);
}
result.fold(
(payload) => profileUpdatedNotifier.value = left(UserProfile.fromBuffer(payload)),
(error) => profileUpdatedNotifier.value = right(error),
);
break;
default:
break;

View File

@ -174,9 +174,9 @@ extension QuestionBubbleExtension on QuestionBubbleAction {
Widget get emoji {
switch (this) {
case QuestionBubbleAction.whatsNews:
return const Text('⭐️', style: TextStyle(fontSize: 16));
return const Text('⭐️', style: TextStyle(fontSize: 12));
case QuestionBubbleAction.help:
return const Text('👥', style: TextStyle(fontSize: 16));
return const Text('👥', style: TextStyle(fontSize: 12));
}
}
}

View File

@ -93,6 +93,7 @@ class ActionItem<T extends ActionItemData> extends StatelessWidget {
child: SizedBox(
height: itemHeight,
child: Row(
crossAxisAlignment: CrossAxisAlignment.center,
children: [
if (action.icon != null) action.icon!,
HSpace(ActionListSizes.itemHPadding),

View File

@ -4,7 +4,6 @@ use flowy_workspace::{
errors::WorkspaceError,
module::{WorkspaceDatabase, WorkspaceUser},
};
use std::sync::Arc;
pub struct WorkspaceDepsResolver {

View File

@ -6,7 +6,10 @@ use crate::deps_resolve::WorkspaceDepsResolver;
use flowy_dispatch::prelude::*;
use flowy_document::prelude::FlowyDocument;
use flowy_net::config::ServerConfig;
use flowy_user::services::user::{UserSession, UserSessionBuilder};
use flowy_user::{
entities::UserStatus,
services::user::{UserSession, UserSessionBuilder},
};
use flowy_workspace::prelude::WorkspaceController;
use module::mk_modules;
pub use module::*;
@ -14,6 +17,7 @@ use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use tokio::sync::broadcast;
static INIT_LOG: AtomicBool = AtomicBool::new(false);
@ -66,8 +70,8 @@ impl FlowySDK {
pub fn new(config: FlowySDKConfig) -> Self {
init_log(&config);
init_kv(&config.root);
tracing::debug!("🔥 {:?}", config);
let user_session = Arc::new(
UserSessionBuilder::new()
.root_dir(&config.root, &config.server_config)
@ -78,6 +82,9 @@ impl FlowySDK {
let modules = mk_modules(workspace.clone(), user_session.clone());
let dispatch = Arc::new(EventDispatch::construct(|| modules));
let subscribe = user_session.status_subscribe();
listen_on_user_status_changed(&dispatch, subscribe, workspace.clone());
Self {
config,
user_session,
@ -90,6 +97,32 @@ impl FlowySDK {
pub fn dispatch(&self) -> Arc<EventDispatch> { self.dispatch.clone() }
}
fn listen_on_user_status_changed(
dispatch: &EventDispatch,
mut subscribe: broadcast::Receiver<UserStatus>,
workspace_controller: Arc<WorkspaceController>,
) {
dispatch.spawn(async move {
//
loop {
match subscribe.recv().await {
Ok(status) => match status {
UserStatus::Login { .. } => {
workspace_controller.user_did_login();
},
UserStatus::Expired { .. } => {
workspace_controller.user_session_expired();
},
UserStatus::SignUp { .. } => {
workspace_controller.user_did_sign_up().await;
},
},
Err(_) => {},
}
}
});
}
fn init_kv(root: &str) {
match flowy_infra::kv::KV::init(root) {
Ok(_) => {},

View File

@ -6,6 +6,13 @@ use crate::{
parser::{UserEmail, UserId, UserName, UserPassword},
};
#[derive(Clone)]
pub enum UserStatus {
Login { token: String },
Expired { token: String },
SignUp { profile: UserProfile },
}
#[derive(Default, ProtoBuf)]
pub struct UserToken {
#[pb(index = 1)]

View File

@ -1,32 +1,21 @@
use crate::services::user::{SessionStatusCallback, UserSession, UserSessionConfig};
use crate::services::user::{UserSession, UserSessionConfig};
use flowy_net::config::ServerConfig;
use std::sync::Arc;
pub struct UserSessionBuilder {
config: Option<UserSessionConfig>,
callback: SessionStatusCallback,
}
impl UserSessionBuilder {
pub fn new() -> Self {
Self {
config: None,
callback: Arc::new(|_| {}),
}
}
pub fn new() -> Self { Self { config: None } }
pub fn root_dir(mut self, dir: &str, server_config: &ServerConfig) -> Self {
self.config = Some(UserSessionConfig::new(dir, server_config));
self
}
pub fn status_callback(mut self, callback: SessionStatusCallback) -> Self {
self.callback = callback;
self
}
pub fn build(mut self) -> UserSession {
let config = self.config.take().unwrap();
UserSession::new(config, self.callback.clone())
UserSession::new(config)
}
}

View File

@ -19,10 +19,12 @@ use flowy_database::{
use flowy_infra::kv::KV;
use flowy_net::config::ServerConfig;
use flowy_sqlite::ConnectionPool;
use flowy_user_infra::entities::UserStatus;
use flowy_ws::{WsController, WsMessageHandler, WsState};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::broadcast;
pub struct UserSessionConfig {
root_dir: String,
@ -38,12 +40,6 @@ impl UserSessionConfig {
}
}
pub enum SessionStatus {
Login { token: String },
Expired { token: String },
}
pub type SessionStatusCallback = Arc<dyn Fn(SessionStatus) + Send + Sync>;
pub struct UserSession {
database: UserDB,
config: UserSessionConfig,
@ -51,25 +47,28 @@ pub struct UserSession {
server: Server,
session: RwLock<Option<Session>>,
pub ws_controller: Arc<WsController>,
status_callback: SessionStatusCallback,
status_notifier: broadcast::Sender<UserStatus>,
}
impl UserSession {
pub fn new(config: UserSessionConfig, status_callback: SessionStatusCallback) -> Self {
pub fn new(config: UserSessionConfig) -> Self {
let db = UserDB::new(&config.root_dir);
let server = construct_user_server(&config.server_config);
let ws_controller = Arc::new(WsController::new());
let (status_notifier, _) = broadcast::channel(10);
let user_session = Self {
database: db,
config,
server,
session: RwLock::new(None),
ws_controller,
status_callback,
status_notifier,
};
user_session
}
pub fn status_subscribe(&self) -> broadcast::Receiver<UserStatus> { self.status_notifier.subscribe() }
pub fn db_connection(&self) -> Result<DBConnection, UserError> {
let user_id = self.get_session()?.user_id;
self.database.get_connection(&user_id)
@ -96,7 +95,7 @@ impl UserSession {
let _ = self.set_session(Some(session))?;
let user_table = self.save_user(resp.into()).await?;
let user_profile: UserProfile = user_table.into();
(self.status_callback)(SessionStatus::Login {
let _ = self.status_notifier.send(UserStatus::Login {
token: user_profile.token.clone(),
});
Ok(user_profile)
@ -113,8 +112,8 @@ impl UserSession {
let _ = self.set_session(Some(session))?;
let user_table = self.save_user(resp.into()).await?;
let user_profile: UserProfile = user_table.into();
(self.status_callback)(SessionStatus::Login {
token: user_profile.token.clone(),
let _ = self.status_notifier.send(UserStatus::SignUp {
profile: user_profile.clone(),
});
Ok(user_profile)
}
@ -127,7 +126,7 @@ impl UserSession {
diesel::delete(dsl::user_table.filter(dsl::id.eq(&session.user_id))).execute(&*(self.db_connection()?))?;
let _ = self.database.close_user_db(&session.user_id)?;
let _ = self.set_session(None)?;
(self.status_callback)(SessionStatus::Expired {
let _ = self.status_notifier.send(UserStatus::Expired {
token: session.token.clone(),
});
let _ = self.sign_out_on_server(&session.token).await?;

View File

@ -1,17 +1,15 @@
use std::sync::Arc;
use flowy_database::DBConnection;
use flowy_dispatch::prelude::*;
use flowy_document::module::FlowyDocument;
use flowy_net::config::ServerConfig;
use flowy_sqlite::ConnectionPool;
use crate::{
errors::WorkspaceError,
event::WorkspaceEvent,
handlers::*,
services::{server::construct_workspace_server, AppController, TrashCan, ViewController, WorkspaceController},
};
use flowy_database::DBConnection;
use flowy_dispatch::prelude::*;
use flowy_document::module::FlowyDocument;
use flowy_net::config::ServerConfig;
use flowy_sqlite::ConnectionPool;
use std::sync::Arc;
pub trait WorkspaceDeps: WorkspaceUser + WorkspaceDatabase {}

View File

@ -5,9 +5,13 @@ use crate::{
services::{helper::spawn, read_local_workspace_apps, server::Server, AppController, TrashCan, ViewController},
sql_tables::workspace::{WorkspaceTable, WorkspaceTableChangeset, WorkspaceTableSql},
};
use chrono::Utc;
use flowy_database::SqliteConnection;
use flowy_infra::kv::KV;
use flowy_workspace_infra::entities::{app::RepeatedApp, workspace::*};
use flowy_workspace_infra::{
entities::{app::RepeatedApp, workspace::*},
user_default,
};
use std::sync::Arc;
pub struct WorkspaceController {
@ -46,9 +50,42 @@ impl WorkspaceController {
let _ = self.trash_can.init()?;
let _ = self.view_controller.init()?;
let _ = self.app_controller.init()?;
Ok(())
}
pub fn user_did_login(&self) {}
pub fn user_session_expired(&self) {}
pub async fn user_did_sign_up(&self) {
log::debug!("Create user default workspace");
let time = Utc::now();
let mut workspace = user_default::create_default_workspace(time);
let apps = workspace.take_apps().into_inner();
let _ = self.create_workspace(workspace).await?;
for mut app in apps {
let views = app.take_belongings().into_inner();
let _ = self.app_controller.create_app(app).await?;
for view in views {
let _ = self.view_controller.create_view(view).await?;
}
}
match self.user.token() {
Ok(token) => {
let repeated_workspace = RepeatedWorkspace { items: vec![workspace] };
send_dart_notification(&token, WorkspaceNotification::UserCreateWorkspace)
.payload(repeated_workspace)
.send();
},
Err(e) => {
log::error!("{:?}", e);
},
}
}
pub(crate) async fn create_workspace_from_params(
&self,
params: CreateWorkspaceParams,