feat: workspace service in user crate (#4373)

* refactor: user manager

* feat: implement workspace service

* refactor: migrate user data when sign up

* chore: fmt

* chore: enable beta cloud

* chore: update ci

* chore: trim slash
This commit is contained in:
Nathan.fooo 2024-01-12 14:34:59 +08:00 committed by GitHub
parent 690a3746fa
commit 9500abb363
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
66 changed files with 879 additions and 1079 deletions

View File

@ -22,7 +22,7 @@ on:
env:
CARGO_TERM_COLOR: always
FLUTTER_VERSION: "3.18.0-0.2.pre"
RUST_TOOLCHAIN: "1.70"
RUST_TOOLCHAIN: "1.75"
CARGO_MAKE_VERSION: "0.36.6"
concurrency:

View File

@ -26,7 +26,7 @@ void main() {
group('appflowy cloud', () {
testWidgets('anon user and then sign in', (tester) async {
await tester.initializeAppFlowy(
cloudType: AuthenticatorType.appflowyCloud,
cloudType: AuthenticatorType.appflowyCloudSelfHost,
);
tester.expectToSeeText(LocaleKeys.signIn_loginStartWithAnonymous.tr());

View File

@ -22,7 +22,7 @@ void main() {
group('appflowy cloud auth', () {
testWidgets('sign in', (tester) async {
await tester.initializeAppFlowy(
cloudType: AuthenticatorType.appflowyCloud,
cloudType: AuthenticatorType.appflowyCloudSelfHost,
);
await tester.tapGoogleLoginInButton();
await tester.expectToSeeHomePageWithGetStartedPage();
@ -30,7 +30,7 @@ void main() {
testWidgets('sign out', (tester) async {
await tester.initializeAppFlowy(
cloudType: AuthenticatorType.appflowyCloud,
cloudType: AuthenticatorType.appflowyCloudSelfHost,
);
await tester.tapGoogleLoginInButton();
@ -49,7 +49,7 @@ void main() {
testWidgets('sign in as annoymous', (tester) async {
await tester.initializeAppFlowy(
cloudType: AuthenticatorType.appflowyCloud,
cloudType: AuthenticatorType.appflowyCloudSelfHost,
);
await tester.tapSignInAsGuest();
@ -61,7 +61,7 @@ void main() {
testWidgets('enable sync', (tester) async {
await tester.initializeAppFlowy(
cloudType: AuthenticatorType.appflowyCloud,
cloudType: AuthenticatorType.appflowyCloudSelfHost,
);
await tester.tapGoogleLoginInButton();

View File

@ -31,7 +31,7 @@ void main() {
group('appflowy cloud document', () {
testWidgets('sync local docuemnt to server', (tester) async {
await tester.initializeAppFlowy(
cloudType: AuthenticatorType.appflowyCloud,
cloudType: AuthenticatorType.appflowyCloudSelfHost,
email: email,
);
await tester.tapGoogleLoginInButton();
@ -58,7 +58,7 @@ void main() {
testWidgets('sync doc from server', (tester) async {
await tester.initializeAppFlowy(
cloudType: AuthenticatorType.appflowyCloud,
cloudType: AuthenticatorType.appflowyCloudSelfHost,
email: email,
);
await tester.tapGoogleLoginInButton();

View File

@ -11,7 +11,7 @@ void main() {
group('Empty', () {
testWidgets('set appflowy cloud', (tester) async {
await tester.initializeAppFlowy(
cloudType: AuthenticatorType.appflowyCloud,
cloudType: AuthenticatorType.appflowyCloudSelfHost,
);
});
});

View File

@ -32,7 +32,7 @@ void main() {
group('appflowy cloud setting', () {
testWidgets('sync user name and icon to server', (tester) async {
await tester.initializeAppFlowy(
cloudType: AuthenticatorType.appflowyCloud,
cloudType: AuthenticatorType.appflowyCloudSelfHost,
email: email,
);
await tester.tapGoogleLoginInButton();
@ -70,7 +70,7 @@ void main() {
testWidgets('get user icon and name from server', (tester) async {
await tester.initializeAppFlowy(
cloudType: AuthenticatorType.appflowyCloud,
cloudType: AuthenticatorType.appflowyCloudSelfHost,
email: email,
);
await tester.tapGoogleLoginInButton();

View File

@ -63,6 +63,10 @@ extension AppFlowyTestBase on WidgetTester {
rustEnvs["GOTRUE_ADMIN_EMAIL"] = "admin@example.com";
rustEnvs["GOTRUE_ADMIN_PASSWORD"] = "password";
break;
case AuthenticatorType.appflowyCloudSelfHost:
rustEnvs["GOTRUE_ADMIN_EMAIL"] = "admin@example.com";
rustEnvs["GOTRUE_ADMIN_PASSWORD"] = "password";
break;
}
}
return rustEnvs;
@ -89,6 +93,13 @@ extension AppFlowyTestBase on WidgetTester {
() => AppFlowyCloudMockAuthService(email: email),
);
break;
case AuthenticatorType.appflowyCloudSelfHost:
await useAppFlowyCloud();
getIt.unregister<AuthService>();
getIt.registerFactory<AuthService>(
() => AppFlowyCloudMockAuthService(email: email),
);
break;
}
}
},
@ -258,7 +269,7 @@ Future<void> useSupabaseCloud() async {
}
Future<void> useAppFlowyCloud() async {
await setAuthenticatorType(AuthenticatorType.appflowyCloud);
await setAuthenticatorType(AuthenticatorType.appflowyCloudSelfHost);
await setAppFlowyCloudUrl(Some(TestEnv.afCloudUrl));
}

View File

@ -28,6 +28,9 @@ Future<void> setAuthenticatorType(AuthenticatorType ty) async {
case AuthenticatorType.appflowyCloud:
getIt<KeyValueStorage>().set(KVKeys.kCloudType, 2.toString());
break;
case AuthenticatorType.appflowyCloudSelfHost:
getIt<KeyValueStorage>().set(KVKeys.kCloudType, 3.toString());
break;
}
}
@ -52,6 +55,8 @@ Future<AuthenticatorType> getAuthenticatorType() async {
return AuthenticatorType.supabase;
case "2":
return AuthenticatorType.appflowyCloud;
case "3":
return AuthenticatorType.appflowyCloudSelfHost;
default:
return AuthenticatorType.local;
}
@ -75,7 +80,8 @@ bool get isAuthEnabled {
return env.supabaseConfig.isValid;
}
if (env.authenticatorType == AuthenticatorType.appflowyCloud) {
if (env.authenticatorType == AuthenticatorType.appflowyCloudSelfHost ||
env.authenticatorType == AuthenticatorType.appflowyCloud) {
return env.appflowyCloudConfig.isValid;
}
@ -92,20 +98,28 @@ bool get isAuthEnabled {
/// if the application is in release or develop mode and the current cloud type
/// is `CloudType.supabase`. Otherwise, it returns `false`.
bool get isSupabaseEnabled {
return currentCloudType() == AuthenticatorType.supabase;
return currentCloudType().isSupabaseEnabled;
}
/// Determines if AppFlowy Cloud is enabled.
bool get isAppFlowyCloudEnabled {
return currentCloudType() == AuthenticatorType.appflowyCloud;
return currentCloudType().isAppFlowyCloudEnabled;
}
enum AuthenticatorType {
local,
supabase,
appflowyCloud;
appflowyCloud,
appflowyCloudSelfHost;
bool get isLocal => this == AuthenticatorType.local;
bool get isAppFlowyCloudEnabled =>
this == AuthenticatorType.appflowyCloudSelfHost ||
this == AuthenticatorType.appflowyCloud;
bool get isSupabaseEnabled => this == AuthenticatorType.supabase;
int get value {
switch (this) {
case AuthenticatorType.local:
@ -114,6 +128,8 @@ enum AuthenticatorType {
return 1;
case AuthenticatorType.appflowyCloud:
return 2;
case AuthenticatorType.appflowyCloudSelfHost:
return 3;
}
}
@ -125,6 +141,8 @@ enum AuthenticatorType {
return AuthenticatorType.supabase;
case 2:
return AuthenticatorType.appflowyCloud;
case 3:
return AuthenticatorType.appflowyCloudSelfHost;
default:
return AuthenticatorType.local;
}
@ -160,7 +178,17 @@ class AppFlowyCloudSharedEnv {
// If [Env.enableCustomCloud] is true, then use the custom cloud configuration.
if (Env.enableCustomCloud) {
// Use the custom cloud configuration.
final cloudType = await getAuthenticatorType();
var cloudType = await getAuthenticatorType();
// In the backend, the value '2' represents the use of AppFlowy Cloud. However, in the frontend,
// we distinguish between [AuthenticatorType.appflowyCloudSelfHost] and [AuthenticatorType.appflowyCloud].
// When the cloud type is [AuthenticatorType.appflowyCloudSelfHost] in the frontend, it should be
// converted to [AuthenticatorType.appflowyCloud] to align with the backend representation,
// where both types are indicated by the value '2'.
if (cloudType == AuthenticatorType.appflowyCloudSelfHost) {
cloudType = AuthenticatorType.appflowyCloud;
}
final appflowyCloudConfig = cloudType.isLocal
? AppFlowyCloudConfiguration.defaultConfig()
: await getAppFlowyCloudConfig();

View File

@ -8,8 +8,10 @@ part 'env.g.dart';
abstract class Env {
// This flag is used to decide if users can dynamically configure cloud settings. It turns true when a .env file exists containing the APPFLOWY_CLOUD_URL variable. By default, this is set to false.
static bool get enableCustomCloud {
return Env.authenticatorType == AuthenticatorType.appflowyCloud.value &&
_Env.afCloudUrl.isEmpty;
return Env.authenticatorType ==
AuthenticatorType.appflowyCloudSelfHost.value ||
Env.authenticatorType == AuthenticatorType.appflowyCloud.value &&
_Env.afCloudUrl.isEmpty;
}
@EnviedField(

View File

@ -56,7 +56,7 @@ class DependencyResolver {
Future<void> _resolveCloudDeps(GetIt getIt) async {
final env = await AppFlowyCloudSharedEnv.fromEnv();
Log.info("cloud setting: \n$env");
Log.info("cloud setting: $env");
getIt.registerFactory<AppFlowyCloudSharedEnv>(() => env);
if (isAppFlowyCloudEnabled) {
@ -141,6 +141,7 @@ void _resolveUserDeps(GetIt getIt, IntegrationMode mode) {
getIt.registerFactory<AuthService>(() => SupabaseAuthService());
break;
case AuthenticatorType.appflowyCloud:
case AuthenticatorType.appflowyCloudSelfHost:
getIt.registerFactory<AuthService>(() => AppFlowyCloudAuthService());
break;
}

View File

@ -31,8 +31,8 @@ class AppFlowyCloudURLsBloc
} else {
validateUrl(state.updatedServerUrl).fold(
(url) async {
if (state.config.base_url != state.updatedServerUrl) {
await setAppFlowyCloudUrl(Some(state.updatedServerUrl));
if (state.config.base_url != url) {
await setAppFlowyCloudUrl(Some(url));
}
add(const AppFlowyCloudURLsEvent.didSaveConfig());
},
@ -83,7 +83,7 @@ class AppFlowyCloudURLsState with _$AppFlowyCloudURLsState {
Either<String, String> validateUrl(String url) {
try {
// Use Uri.parse to validate the url.
final uri = Uri.parse(url);
final uri = Uri.parse(removeTrailingSlash(url));
if (uri.isScheme('HTTP') || uri.isScheme('HTTPS')) {
return left(uri.toString());
} else {
@ -93,3 +93,10 @@ Either<String, String> validateUrl(String url) {
return right(e.toString());
}
}
String removeTrailingSlash(String input) {
if (input.endsWith('/')) {
return input.substring(0, input.length - 1);
}
return input;
}

View File

@ -11,7 +11,6 @@ class CloudSettingBloc extends Bloc<CloudSettingEvent, CloudSettingState> {
await event.when(
initial: () async {},
updateCloudType: (AuthenticatorType newCloudType) async {
await setAuthenticatorType(newCloudType);
emit(state.copyWith(cloudType: newCloudType));
},
);

View File

@ -2,7 +2,7 @@ import 'package:appflowy/plugins/database/application/defines.dart';
import 'package:appflowy_backend/dispatch/dispatch.dart';
import 'package:appflowy_backend/log.dart';
import 'package:appflowy_backend/protobuf/flowy-error/errors.pb.dart';
import 'package:appflowy_backend/protobuf/flowy-folder/import.pb.dart';
import 'package:appflowy_backend/protobuf/flowy-user/import_data.pb.dart';
import 'package:dartz/dartz.dart';
import 'package:easy_localization/easy_localization.dart';
import 'package:flutter_bloc/flutter_bloc.dart';
@ -25,7 +25,7 @@ class SettingFileImportBloc
emit(
state.copyWith(loadingState: const LoadingState.loading()),
);
FolderEventImportAppFlowyDataFolder(payload).send().then((result) {
UserEventImportAppFlowyDataFolder(payload).send().then((result) {
if (!isClosed) {
add(SettingFileImportEvent.finishImport(result));
}

View File

@ -1,3 +1,4 @@
import 'package:appflowy/env/cloud_env.dart';
import 'package:appflowy/env/env.dart';
import 'package:appflowy/generated/locale_keys.g.dart';
import 'package:appflowy/workspace/application/settings/appflowy_cloud_setting_bloc.dart';
@ -7,7 +8,7 @@ import 'package:appflowy_backend/dispatch/dispatch.dart';
import 'package:appflowy_backend/log.dart';
import 'package:appflowy_backend/protobuf/flowy-error/errors.pb.dart';
import 'package:appflowy_backend/protobuf/flowy-user/user_setting.pb.dart';
import 'package:dartz/dartz.dart' show Either;
import 'package:dartz/dartz.dart' show Either, Some;
import 'package:easy_localization/easy_localization.dart';
import 'package:flowy_infra/size.dart';
import 'package:flowy_infra_ui/flowy_infra_ui.dart';
@ -17,19 +18,65 @@ import 'package:flutter/material.dart';
import 'package:flutter_bloc/flutter_bloc.dart';
import 'package:url_launcher/url_launcher.dart';
class SettingAppFlowyCloudView extends StatelessWidget {
final VoidCallback didResetServerUrl;
const SettingAppFlowyCloudView({required this.didResetServerUrl, super.key});
class AppFlowyCloudViewSetting extends StatelessWidget {
final VoidCallback restartAppFlowy;
const AppFlowyCloudViewSetting({required this.restartAppFlowy, super.key});
@override
Widget build(BuildContext context) {
return CustomAppFlowyCloudView(didResetServerUrl: didResetServerUrl);
return FutureBuilder<Either<CloudSettingPB, FlowyError>>(
future: UserEventGetCloudConfig().send(),
builder: (context, snapshot) {
if (snapshot.data != null &&
snapshot.connectionState == ConnectionState.done) {
return snapshot.data!.fold(
(setting) => _renderContent(context, setting),
(err) => FlowyErrorPage.message(err.toString(), howToFix: ""),
);
} else {
return const Center(
child: CircularProgressIndicator(),
);
}
},
);
}
BlocProvider<AppFlowyCloudSettingBloc> _renderContent(
BuildContext context,
CloudSettingPB setting,
) {
return BlocProvider(
create: (context) => AppFlowyCloudSettingBloc(setting)
..add(const AppFlowyCloudSettingEvent.initial()),
child: Column(
children: [
const AppFlowyCloudEnableSync(),
const VSpace(40),
RestartButton(
onClick: () async {
NavigatorAlertDialog(
title: LocaleKeys.settings_menu_restartAppTip.tr(),
confirm: () async {
await setAppFlowyCloudUrl(
const Some("https://beta.appflowy.cloud"),
);
await setAuthenticatorType(AuthenticatorType.appflowyCloud);
restartAppFlowy();
},
).show(context);
},
),
],
),
);
}
}
class CustomAppFlowyCloudView extends StatelessWidget {
final VoidCallback didResetServerUrl;
const CustomAppFlowyCloudView({required this.didResetServerUrl, super.key});
final VoidCallback restartAppFlowy;
const CustomAppFlowyCloudView({required this.restartAppFlowy, super.key});
@override
Widget build(BuildContext context) {
@ -63,7 +110,7 @@ class CustomAppFlowyCloudView extends StatelessWidget {
// If the enableCustomCloud flag is true, then the user can dynamically configure cloud settings. Otherwise, the user cannot dynamically configure cloud settings.
if (Env.enableCustomCloud) {
children.add(
AppFlowyCloudURLs(didUpdateUrls: () => didResetServerUrl()),
AppFlowyCloudURLs(restartAppFlowy: () => restartAppFlowy()),
);
} else {
children.add(
@ -87,9 +134,9 @@ class CustomAppFlowyCloudView extends StatelessWidget {
}
class AppFlowyCloudURLs extends StatelessWidget {
final VoidCallback didUpdateUrls;
final VoidCallback restartAppFlowy;
const AppFlowyCloudURLs({
required this.didUpdateUrls,
required this.restartAppFlowy,
super.key,
});
@ -99,9 +146,10 @@ class AppFlowyCloudURLs extends StatelessWidget {
create: (context) =>
AppFlowyCloudURLsBloc()..add(const AppFlowyCloudURLsEvent.initial()),
child: BlocListener<AppFlowyCloudURLsBloc, AppFlowyCloudURLsState>(
listener: (context, state) {
listener: (context, state) async {
if (state.restartApp) {
didUpdateUrls();
await setAuthenticatorType(AuthenticatorType.appflowyCloudSelfHost);
restartAppFlowy();
}
},
child: BlocBuilder<AppFlowyCloudURLsBloc, AppFlowyCloudURLsState>(
@ -122,22 +170,15 @@ class AppFlowyCloudURLs extends StatelessWidget {
},
),
const VSpace(20),
FlowyButton(
isSelected: true,
useIntrinsicWidth: true,
margin: const EdgeInsets.symmetric(
horizontal: 30,
vertical: 10,
),
text: FlowyText(
LocaleKeys.settings_menu_restartApp.tr(),
),
onTap: () {
RestartButton(
onClick: () async {
NavigatorAlertDialog(
title: LocaleKeys.settings_menu_restartAppTip.tr(),
confirm: () => context.read<AppFlowyCloudURLsBloc>().add(
const AppFlowyCloudURLsEvent.confirmUpdate(),
),
confirm: () {
context.read<AppFlowyCloudURLsBloc>().add(
const AppFlowyCloudURLsEvent.confirmUpdate(),
);
},
).show(context);
},
),
@ -286,3 +327,24 @@ class AppFlowyCloudEnableSync extends StatelessWidget {
);
}
}
class RestartButton extends StatelessWidget {
final VoidCallback onClick;
const RestartButton({required this.onClick, super.key});
@override
Widget build(BuildContext context) {
return FlowyButton(
isSelected: true,
useIntrinsicWidth: true,
margin: const EdgeInsets.symmetric(
horizontal: 30,
vertical: 10,
),
text: FlowyText(
LocaleKeys.settings_menu_restartApp.tr(),
),
onTap: onClick,
);
}
}

View File

@ -79,8 +79,12 @@ class SettingCloud extends StatelessWidget {
didResetServerUrl: didResetServerUrl,
);
case AuthenticatorType.appflowyCloud:
return SettingAppFlowyCloudView(
didResetServerUrl: didResetServerUrl,
return AppFlowyCloudViewSetting(
restartAppFlowy: didResetServerUrl,
);
case AuthenticatorType.appflowyCloudSelfHost:
return CustomAppFlowyCloudView(
restartAppFlowy: didResetServerUrl,
);
}
}
@ -165,5 +169,7 @@ String titleFromCloudType(AuthenticatorType cloudType) {
return LocaleKeys.settings_menu_cloudSupabase.tr();
case AuthenticatorType.appflowyCloud:
return LocaleKeys.settings_menu_cloudAppFlowy.tr();
case AuthenticatorType.appflowyCloudSelfHost:
return LocaleKeys.settings_menu_cloudAppFlowySelfHost.tr();
}
}

View File

@ -1,3 +1,4 @@
import 'package:appflowy/env/cloud_env.dart';
import 'package:appflowy/generated/locale_keys.g.dart';
import 'package:appflowy/workspace/presentation/widgets/dialogs.dart';
import 'package:easy_localization/easy_localization.dart';
@ -29,7 +30,12 @@ class SettingLocalCloud extends StatelessWidget {
onTap: () {
NavigatorAlertDialog(
title: LocaleKeys.settings_menu_restartAppTip.tr(),
confirm: didResetServerUrl,
confirm: () async {
await setAuthenticatorType(
AuthenticatorType.local,
);
didResetServerUrl();
},
).show(context);
},
),

View File

@ -1,3 +1,4 @@
import 'package:appflowy/env/cloud_env.dart';
import 'package:appflowy/generated/locale_keys.g.dart';
import 'package:appflowy/workspace/application/settings/supabase_cloud_setting_bloc.dart';
import 'package:appflowy/workspace/application/settings/supabase_cloud_urls_bloc.dart';
@ -84,8 +85,9 @@ class SupabaseCloudURLs extends StatelessWidget {
return BlocProvider(
create: (context) => SupabaseCloudURLsBloc(),
child: BlocListener<SupabaseCloudURLsBloc, SupabaseCloudURLsState>(
listener: (context, state) {
listener: (context, state) async {
if (state.restartApp) {
await setAuthenticatorType(AuthenticatorType.supabase);
didUpdateUrls();
}
},

View File

@ -30,7 +30,7 @@ class SettingThirdPartyLogin extends StatelessWidget {
},
builder: (_, state) {
final indicator = state.isSubmitting
? const CircularProgressIndicator.adaptive()
? const LinearProgressIndicator(minHeight: 1)
: const SizedBox.shrink();
final promptMessage = state.isSubmitting
@ -50,12 +50,13 @@ class SettingThirdPartyLogin extends StatelessWidget {
fontSize: 16,
),
const HSpace(6),
indicator,
],
),
const VSpace(6),
promptMessage,
const VSpace(6),
indicator,
const VSpace(6),
if (isAuthEnabled) const ThirdPartySignInButtons(),
const VSpace(6),
],

View File

@ -282,7 +282,8 @@
"cloudSupabaseUrl": "Supabase URL",
"cloudSupabaseAnonKey": "Supabase anon key",
"cloudSupabaseAnonKeyCanNotBeEmpty": "The anon key can't be empty if the supabase url is not empty",
"cloudAppFlowy": "AppFlowy Cloud",
"cloudAppFlowy": "AppFlowy Cloud Beta",
"cloudAppFlowySelfHost": "AppFlowy Cloud Self-hosted",
"clickToCopy": "Click to copy",
"selfHostStart": "If you don't have a server, please refer to the",
"selfHostContent": "document",

View File

@ -3,8 +3,7 @@ use std::convert::TryFrom;
use std::sync::Arc;
use bytes::Bytes;
use flowy_folder::entities::ImportAppFlowyDataPB;
use flowy_folder::event_map::FolderEvent;
use nanoid::nanoid;
use protobuf::ProtobufError;
use tokio::sync::broadcast::{channel, Sender};
@ -17,10 +16,12 @@ use flowy_server::supabase::define::{USER_DEVICE_ID, USER_EMAIL, USER_SIGN_IN_UR
use flowy_server_pub::af_cloud_config::AFCloudConfiguration;
use flowy_server_pub::AuthenticatorType;
use flowy_user::entities::{
AuthenticatorPB, CloudSettingPB, OauthSignInPB, SignInUrlPB, SignInUrlPayloadPB, SignUpPayloadPB,
UpdateCloudConfigPB, UpdateUserProfilePayloadPB, UserProfilePB,
AuthenticatorPB, CloudSettingPB, ImportAppFlowyDataPB, OauthSignInPB, SignInUrlPB,
SignInUrlPayloadPB, SignUpPayloadPB, UpdateCloudConfigPB, UpdateUserProfilePayloadPB,
UserProfilePB,
};
use flowy_user::errors::{FlowyError, FlowyResult};
use flowy_user::event_map::UserEvent;
use flowy_user::event_map::UserEvent::*;
use lib_dispatch::prelude::{af_spawn, AFPluginDispatcher, AFPluginRequest, ToBytes};
@ -200,7 +201,7 @@ impl EventIntegrationTest {
import_container_name: name,
};
match EventBuilder::new(self.clone())
.event(FolderEvent::ImportAppFlowyDataFolder)
.event(UserEvent::ImportAppFlowyDataFolder)
.payload(payload)
.async_send()
.await

View File

@ -54,8 +54,8 @@ async fn af_cloud_sync_anon_user_document_test() {
// workspace:
// view: SyncDocument
let views = test.get_all_workspace_views().await;
assert_eq!(views.len(), 1);
let document_id = views[0].id.clone();
assert_eq!(views.len(), 2);
let document_id = views[1].id.clone();
test.open_document(document_id.clone()).await;
// wait all update are send to the remote

View File

@ -72,17 +72,16 @@ async fn migrate_anon_user_data_to_af_cloud_test() {
// The anon user data will be migrated to the AppFlowy cloud after sign up
let user = test.af_cloud_sign_up().await;
let user_trash = test.get_trash().await;
let workspace = test.get_current_workspace().await;
println!("user workspace: {:?}", workspace.id);
assert_eq!(user.authenticator, AuthenticatorPB::AppFlowyCloud);
let user_first_level_views = test.get_all_workspace_views().await;
// assert_eq!(user_first_level_views.len(), 2);
assert_eq!(user_first_level_views.len(), 2);
println!("user first level views: {:?}", user_first_level_views);
let user_second_level_views = test
.get_view(&user_first_level_views[0].id)
.get_view(&user_first_level_views[1].id)
.await
.child_views;
println!("user second level views: {:?}", user_second_level_views);
@ -94,11 +93,13 @@ async fn migrate_anon_user_data_to_af_cloud_test() {
// check first level
assert_eq!(anon_first_level_views.len(), 1);
assert_eq!(user_first_level_views.len(), 1);
assert_ne!(anon_first_level_views[0].id, user_first_level_views[0].id);
// the first view of user_first_level_views is the default get started view
assert_eq!(user_first_level_views.len(), 2);
assert_ne!(anon_first_level_views[0].id, user_first_level_views[1].id);
assert_eq!(
anon_first_level_views[0].name,
user_first_level_views[0].name
user_first_level_views[1].name
);
// check second level
@ -114,9 +115,5 @@ async fn migrate_anon_user_data_to_af_cloud_test() {
assert_eq!(user_third_level_views[0].name, "Grid1".to_string());
assert_eq!(user_third_level_views[1].name, "Grid2".to_string());
// check the trash
assert_eq!(user_trash.items.len(), 1);
assert_eq!(user_trash.items[0].name, anon_trash.items[0].name);
drop(cleaner);
}

View File

@ -7,12 +7,13 @@ use flowy_sqlite::{
prelude::*,
schema::{collab_snapshot, collab_snapshot::dsl},
};
use flowy_user::user_manager::UserManager;
use flowy_user::services::authenticate_user::AuthenticateUser;
use lib_infra::util::timestamp;
use std::sync::Weak;
use tracing::debug;
pub struct SnapshotDBImpl(pub Weak<UserManager>);
pub struct SnapshotDBImpl(pub Weak<AuthenticateUser>);
impl SnapshotPersistence for SnapshotDBImpl {
fn create_snapshot(
@ -24,16 +25,12 @@ impl SnapshotPersistence for SnapshotDBImpl {
) -> Result<(), PersistenceError> {
let collab_type = collab_type.clone();
let object_id = object_id.to_string();
let weak_user_session = self.0.clone();
let weak_user = self.0.clone();
tokio::task::spawn_blocking(move || {
if let Some(pool) = weak_user_session
if let Some(mut conn) = weak_user
.upgrade()
.and_then(|user_session| user_session.db_pool(uid).ok())
.and_then(|authenticate_user| authenticate_user.get_sqlite_connection(uid).ok())
{
let mut conn = pool
.get()
.map_err(|e| PersistenceError::Internal(e.into()))?;
// Save the snapshot data to disk
let result = CollabSnapshotSql::create(
CollabSnapshotRow::new(object_id.clone(), collab_type.to_string(), encoded_v1),

View File

@ -1,25 +1,22 @@
use std::sync::{Arc, Weak};
use tokio::sync::RwLock;
use collab_integrate::collab_builder::AppFlowyCollabBuilder;
use collab_integrate::CollabKVDB;
use flowy_database2::{DatabaseManager, DatabaseUser};
use flowy_database_pub::cloud::DatabaseCloudService;
use flowy_error::FlowyError;
use flowy_user::user_manager::UserManager;
use flowy_user::services::authenticate_user::AuthenticateUser;
use lib_infra::priority_task::TaskDispatcher;
use std::sync::{Arc, Weak};
use tokio::sync::RwLock;
pub struct DatabaseDepsResolver();
impl DatabaseDepsResolver {
pub async fn resolve(
user_manager: Weak<UserManager>,
authenticate_user: Weak<AuthenticateUser>,
task_scheduler: Arc<RwLock<TaskDispatcher>>,
collab_builder: Arc<AppFlowyCollabBuilder>,
cloud_service: Arc<dyn DatabaseCloudService>,
) -> Arc<DatabaseManager> {
let user = Arc::new(DatabaseUserImpl(user_manager));
let user = Arc::new(DatabaseUserImpl(authenticate_user));
Arc::new(DatabaseManager::new(
user,
task_scheduler,
@ -29,7 +26,7 @@ impl DatabaseDepsResolver {
}
}
struct DatabaseUserImpl(Weak<UserManager>);
struct DatabaseUserImpl(Weak<AuthenticateUser>);
impl DatabaseUser for DatabaseUserImpl {
fn user_id(&self) -> Result<i64, FlowyError> {
self
@ -39,14 +36,6 @@ impl DatabaseUser for DatabaseUserImpl {
.user_id()
}
fn token(&self) -> Result<Option<String>, FlowyError> {
self
.0
.upgrade()
.ok_or(FlowyError::internal().with_context("Unexpected error: UserSession is None"))?
.token()
}
fn collab_db(&self, uid: i64) -> Result<Weak<CollabKVDB>, FlowyError> {
self
.0

View File

@ -9,20 +9,20 @@ use flowy_document::manager::{DocumentManager, DocumentSnapshotService, Document
use flowy_document_pub::cloud::DocumentCloudService;
use flowy_error::{FlowyError, FlowyResult};
use flowy_storage::FileStorageService;
use flowy_user::user_manager::UserManager;
use flowy_user::services::authenticate_user::AuthenticateUser;
pub struct DocumentDepsResolver();
impl DocumentDepsResolver {
pub fn resolve(
user_manager: Weak<UserManager>,
authenticate_user: Weak<AuthenticateUser>,
_database_manager: &Arc<DatabaseManager>,
collab_builder: Arc<AppFlowyCollabBuilder>,
cloud_service: Arc<dyn DocumentCloudService>,
storage_service: Weak<dyn FileStorageService>,
) -> Arc<DocumentManager> {
let user_service: Arc<dyn DocumentUserService> =
Arc::new(DocumentUserImpl(user_manager.clone()));
let snapshot_service = Arc::new(DocumentSnapshotImpl(user_manager));
Arc::new(DocumentUserImpl(authenticate_user.clone()));
let snapshot_service = Arc::new(DocumentSnapshotImpl(authenticate_user));
Arc::new(DocumentManager::new(
user_service.clone(),
collab_builder,
@ -33,10 +33,10 @@ impl DocumentDepsResolver {
}
}
struct DocumentSnapshotImpl(Weak<UserManager>);
struct DocumentSnapshotImpl(Weak<AuthenticateUser>);
impl DocumentSnapshotImpl {
pub fn get_user_manager(&self) -> FlowyResult<Arc<UserManager>> {
pub fn get_authenticate_user(&self) -> FlowyResult<Arc<AuthenticateUser>> {
self
.0
.upgrade()
@ -49,9 +49,9 @@ impl DocumentSnapshotService for DocumentSnapshotImpl {
&self,
document_id: &str,
) -> FlowyResult<Vec<DocumentSnapshotMeta>> {
let user_manager = self.get_user_manager()?;
let uid = user_manager.user_id()?;
let mut db = user_manager.db_connection(uid)?;
let authenticate_user = self.get_authenticate_user()?;
let uid = authenticate_user.user_id()?;
let mut db = authenticate_user.get_sqlite_connection(uid)?;
CollabSnapshotSql::get_all_snapshots(document_id, &mut db).map(|rows| {
rows
.into_iter()
@ -65,9 +65,9 @@ impl DocumentSnapshotService for DocumentSnapshotImpl {
}
fn get_document_snapshot(&self, snapshot_id: &str) -> FlowyResult<DocumentSnapshotData> {
let user_manager = self.get_user_manager()?;
let uid = user_manager.user_id()?;
let mut db = user_manager.db_connection(uid)?;
let authenticate_user = self.get_authenticate_user()?;
let uid = authenticate_user.user_id()?;
let mut db = authenticate_user.get_sqlite_connection(uid)?;
CollabSnapshotSql::get_snapshot(snapshot_id, &mut db)
.map(|row| DocumentSnapshotData {
object_id: row.id,
@ -79,7 +79,7 @@ impl DocumentSnapshotService for DocumentSnapshotImpl {
}
}
struct DocumentUserImpl(Weak<UserManager>);
struct DocumentUserImpl(Weak<AuthenticateUser>);
impl DocumentUserService for DocumentUserImpl {
fn user_id(&self) -> Result<i64, FlowyError> {
self
@ -97,14 +97,6 @@ impl DocumentUserService for DocumentUserImpl {
.workspace_id()
}
fn token(&self) -> Result<Option<String>, FlowyError> {
self
.0
.upgrade()
.ok_or(FlowyError::internal().with_context("Unexpected error: UserSession is None"))?
.token()
}
fn collab_db(&self, uid: i64) -> Result<Weak<CollabKVDB>, FlowyError> {
self
.0

View File

@ -1,11 +1,4 @@
use std::collections::HashMap;
use std::convert::TryFrom;
use std::sync::{Arc, Weak};
use bytes::Bytes;
use tokio::sync::RwLock;
use collab_integrate::collab_builder::AppFlowyCollabBuilder;
use collab_integrate::CollabKVDB;
use flowy_database2::entities::DatabaseLayoutPB;
@ -21,11 +14,13 @@ use flowy_folder::manager::{FolderManager, FolderUser};
use flowy_folder::share::ImportType;
use flowy_folder::view_operation::{FolderOperationHandler, FolderOperationHandlers, View};
use flowy_folder::ViewLayout;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::sync::{Arc, Weak};
use tokio::sync::RwLock;
use flowy_folder_pub::entities::ImportData;
use flowy_folder_pub::folder_builder::{ParentChildViews, WorkspaceViewBuilder};
use flowy_user::services::data_import::ImportDataSource;
use flowy_user::user_manager::UserManager;
use flowy_folder_pub::folder_builder::WorkspaceViewBuilder;
use flowy_user::services::authenticate_user::AuthenticateUser;
use crate::integrate::server::ServerProvider;
use lib_dispatch::prelude::ToBytes;
@ -35,15 +30,14 @@ use lib_infra::future::FutureResult;
pub struct FolderDepsResolver();
impl FolderDepsResolver {
pub async fn resolve(
user_manager: Weak<UserManager>,
authenticate_user: Weak<AuthenticateUser>,
document_manager: &Arc<DocumentManager>,
database_manager: &Arc<DatabaseManager>,
collab_builder: Arc<AppFlowyCollabBuilder>,
server_provider: Arc<ServerProvider>,
) -> Arc<FolderManager> {
let user: Arc<dyn FolderUser> = Arc::new(FolderUserImpl {
user_manager: user_manager.clone(),
database_manager: Arc::downgrade(database_manager),
authenticate_user: authenticate_user.clone(),
});
let handlers = folder_operation_handlers(document_manager.clone(), database_manager.clone());
@ -77,67 +71,26 @@ fn folder_operation_handlers(
}
struct FolderUserImpl {
user_manager: Weak<UserManager>,
database_manager: Weak<DatabaseManager>,
authenticate_user: Weak<AuthenticateUser>,
}
#[async_trait]
impl FolderUser for FolderUserImpl {
fn user_id(&self) -> Result<i64, FlowyError> {
self
.user_manager
.authenticate_user
.upgrade()
.ok_or(FlowyError::internal().with_context("Unexpected error: UserSession is None"))?
.user_id()
}
fn token(&self) -> Result<Option<String>, FlowyError> {
self
.user_manager
.upgrade()
.ok_or(FlowyError::internal().with_context("Unexpected error: UserSession is None"))?
.token()
}
fn collab_db(&self, uid: i64) -> Result<Weak<CollabKVDB>, FlowyError> {
self
.user_manager
.authenticate_user
.upgrade()
.ok_or(FlowyError::internal().with_context("Unexpected error: UserSession is None"))?
.get_collab_db(uid)
}
async fn import_appflowy_data_folder(
&self,
path: &str,
container_name: Option<String>,
) -> Result<Vec<ParentChildViews>, FlowyError> {
match (self.user_manager.upgrade(), self.database_manager.upgrade()) {
(Some(user_manager), Some(data_manager)) => {
let source = ImportDataSource::AppFlowyDataFolder {
path: path.to_string(),
container_name,
};
let import_data = user_manager.import_data_from_source(source).await?;
match import_data {
ImportData::AppFlowyDataFolder {
views,
database_view_ids_by_database_id,
row_object_ids: _,
database_object_ids: _,
document_object_ids: _,
} => {
let _uid = self.user_id()?;
data_manager
.track_database(database_view_ids_by_database_id)
.await?;
Ok(views)
},
}
},
_ => Err(FlowyError::internal().with_context("Unexpected error: UserSession is None")),
}
}
}
struct DocumentFolderOperation(Arc<DocumentManager>);

View File

@ -2,6 +2,7 @@ pub use collab_deps::*;
pub use database_deps::*;
pub use document_deps::*;
pub use folder_deps::*;
pub use user_deps::*;
mod collab_deps;
mod document_deps;

View File

@ -1 +1,62 @@
use crate::integrate::server::ServerProvider;
use collab_integrate::collab_builder::AppFlowyCollabBuilder;
use flowy_database2::DatabaseManager;
use flowy_error::FlowyResult;
use flowy_folder::manager::FolderManager;
use flowy_folder_pub::folder_builder::ParentChildViews;
use flowy_sqlite::kv::StorePreferences;
use flowy_user::services::authenticate_user::AuthenticateUser;
use flowy_user::user_manager::UserManager;
use flowy_user_pub::workspace_service::UserWorkspaceService;
use lib_infra::async_trait::async_trait;
use std::collections::HashMap;
use std::sync::Arc;
pub struct UserDepsResolver();
impl UserDepsResolver {
pub async fn resolve(
authenticate_user: Arc<AuthenticateUser>,
collab_builder: Arc<AppFlowyCollabBuilder>,
server_provider: Arc<ServerProvider>,
store_preference: Arc<StorePreferences>,
database_manager: Arc<DatabaseManager>,
folder_manager: Arc<FolderManager>,
) -> Arc<UserManager> {
let workspace_service_impl = Arc::new(UserWorkspaceServiceImpl {
database_manager,
folder_manager,
});
UserManager::new(
server_provider,
store_preference,
Arc::downgrade(&collab_builder),
authenticate_user,
workspace_service_impl,
)
}
}
pub struct UserWorkspaceServiceImpl {
pub database_manager: Arc<DatabaseManager>,
pub folder_manager: Arc<FolderManager>,
}
#[async_trait]
impl UserWorkspaceService for UserWorkspaceServiceImpl {
async fn did_import_views(&self, views: Vec<ParentChildViews>) -> FlowyResult<()> {
self.folder_manager.insert_parent_child_views(views).await?;
Ok(())
}
async fn did_import_database_views(
&self,
ids_by_database_id: HashMap<String, Vec<String>>,
) -> FlowyResult<()> {
self
.database_manager
.track_database(ids_by_database_id)
.await?;
Ok(())
}
}

View File

@ -1,7 +1,7 @@
#![allow(unused_doc_comments)]
use std::sync::Arc;
use std::sync::Weak;
use std::time::Duration;
use tokio::sync::RwLock;
@ -13,9 +13,9 @@ use flowy_document::manager::DocumentManager;
use flowy_folder::manager::FolderManager;
use flowy_sqlite::kv::StorePreferences;
use flowy_storage::FileStorageService;
use flowy_user::services::authenticate_user::AuthenticateUser;
use flowy_user::services::entities::UserConfig;
use flowy_user::user_manager::UserManager;
use flowy_user_pub::cloud::UserCloudServiceProvider;
use lib_dispatch::prelude::*;
use lib_dispatch::runtime::AFPluginRuntime;
@ -118,18 +118,23 @@ impl AppFlowyCore {
config.device_id.clone(),
));
let user_manager = init_user_manager(
&config,
&store_preference,
server_provider.clone(),
Arc::downgrade(&collab_builder),
let user_config = UserConfig::new(
&config.name,
&config.storage_path,
&config.application_path,
&config.device_id,
);
let authenticate_user = Arc::new(AuthenticateUser::new(
user_config.clone(),
store_preference.clone(),
));
collab_builder
.set_snapshot_persistence(Arc::new(SnapshotDBImpl(Arc::downgrade(&user_manager))));
.set_snapshot_persistence(Arc::new(SnapshotDBImpl(Arc::downgrade(&authenticate_user))));
let database_manager = DatabaseDepsResolver::resolve(
Arc::downgrade(&user_manager),
Arc::downgrade(&authenticate_user),
task_dispatcher.clone(),
collab_builder.clone(),
server_provider.clone(),
@ -137,7 +142,7 @@ impl AppFlowyCore {
.await;
let document_manager = DocumentDepsResolver::resolve(
Arc::downgrade(&user_manager),
Arc::downgrade(&authenticate_user),
&database_manager,
collab_builder.clone(),
server_provider.clone(),
@ -145,7 +150,7 @@ impl AppFlowyCore {
);
let folder_manager = FolderDepsResolver::resolve(
Arc::downgrade(&user_manager),
Arc::downgrade(&authenticate_user),
&document_manager,
&database_manager,
collab_builder.clone(),
@ -153,6 +158,16 @@ impl AppFlowyCore {
)
.await;
let user_manager = UserDepsResolver::resolve(
authenticate_user,
collab_builder.clone(),
server_provider.clone(),
store_preference.clone(),
database_manager.clone(),
folder_manager.clone(),
)
.await;
(
user_manager,
folder_manager,
@ -216,26 +231,6 @@ impl AppFlowyCore {
}
}
fn init_user_manager(
config: &AppFlowyCoreConfig,
storage_preference: &Arc<StorePreferences>,
user_cloud_service_provider: Arc<dyn UserCloudServiceProvider>,
collab_builder: Weak<AppFlowyCollabBuilder>,
) -> Arc<UserManager> {
let user_config = UserConfig::new(
&config.name,
&config.storage_path,
&config.application_path,
&config.device_id,
);
UserManager::new(
user_config,
user_cloud_service_provider,
storage_preference.clone(),
collab_builder,
)
}
impl From<Server> for CollabPluginProviderType {
fn from(server_type: Server) -> Self {
match server_type {

View File

@ -35,7 +35,6 @@ use crate::services::share::csv::{CSVFormat, CSVImporter, ImportResult};
pub trait DatabaseUser: Send + Sync {
fn user_id(&self) -> Result<i64, FlowyError>;
fn token(&self) -> Result<Option<String>, FlowyError>;
fn collab_db(&self, uid: i64) -> Result<Weak<CollabKVDB>, FlowyError>;
}

View File

@ -29,7 +29,6 @@ use crate::reminder::DocumentReminderAction;
pub trait DocumentUserService: Send + Sync {
fn user_id(&self) -> Result<i64, FlowyError>;
fn workspace_id(&self) -> Result<String, FlowyError>;
fn token(&self) -> Result<Option<String>, FlowyError>; // unused now.
fn collab_db(&self, uid: i64) -> Result<Weak<CollabKVDB>, FlowyError>;
}

View File

@ -81,10 +81,6 @@ impl DocumentUserService for FakeUser {
Ok(Uuid::new_v4().to_string())
}
fn token(&self) -> Result<Option<String>, FlowyError> {
Ok(None)
}
fn collab_db(&self, _uid: i64) -> Result<std::sync::Weak<CollabKVDB>, FlowyError> {
Ok(Arc::downgrade(&self.collab_db))
}

View File

@ -3,6 +3,7 @@ use std::fmt::Debug;
use protobuf::ProtobufError;
use thiserror::Error;
use tokio::task::JoinError;
use validator::{ValidationError, ValidationErrors};
use flowy_derive::ProtoBuf;
@ -171,6 +172,12 @@ impl From<fancy_regex::Error> for FlowyError {
}
}
impl From<JoinError> for FlowyError {
fn from(e: JoinError) -> Self {
FlowyError::internal().with_context(e)
}
}
impl From<tokio::sync::oneshot::error::RecvError> for FlowyError {
fn from(e: tokio::sync::oneshot::error::RecvError) -> Self {
FlowyError::internal().with_context(e)

View File

@ -2,10 +2,16 @@ use crate::folder_builder::ParentChildViews;
use std::collections::HashMap;
pub enum ImportData {
AppFlowyDataFolder {
AppFlowyDataFolder { items: Vec<AppFlowyData> },
}
pub enum AppFlowyData {
Folder {
views: Vec<ParentChildViews>,
/// Used to update the [DatabaseViewTrackerList] when importing the database.
database_view_ids_by_database_id: HashMap<String, Vec<String>>,
},
CollabObject {
row_object_ids: Vec<String>,
document_object_ids: Vec<String>,
database_object_ids: Vec<String>,

View File

@ -3,7 +3,6 @@ use crate::entities::ViewLayoutPB;
use crate::share::{ImportParams, ImportType};
use flowy_derive::{ProtoBuf, ProtoBuf_Enum};
use flowy_error::FlowyError;
use validator::Validate;
#[derive(Clone, Debug, ProtoBuf_Enum)]
pub enum ImportTypePB {
@ -84,13 +83,3 @@ impl TryInto<ImportParams> for ImportPB {
})
}
}
#[derive(ProtoBuf, Validate, Default)]
pub struct ImportAppFlowyDataPB {
#[pb(index = 1)]
#[validate(custom = "lib_infra::validator_fn::required_not_empty_str")]
pub path: String,
#[pb(index = 2, one_of)]
pub import_container_name: Option<String>,
}

View File

@ -329,16 +329,3 @@ pub(crate) async fn get_folder_snapshots_handler(
let snapshots = folder.get_folder_snapshots(&data.value, 10).await?;
data_result_ok(RepeatedFolderSnapshotPB { items: snapshots })
}
#[tracing::instrument(level = "debug", skip_all, err)]
pub async fn import_appflowy_data_folder_handler(
data: AFPluginData<ImportAppFlowyDataPB>,
folder: AFPluginState<Weak<FolderManager>>,
) -> Result<(), FlowyError> {
let folder = upgrade_folder(folder)?;
let data = data.try_into_inner()?;
folder
.import_appflowy_data(data.path, data.import_container_name)
.await?;
Ok(())
}

View File

@ -37,7 +37,6 @@ pub fn init(folder: Weak<FolderManager>) -> AFPlugin {
.event(FolderEvent::ReadRecentViews, read_recent_views_handler)
.event(FolderEvent::ToggleFavorite, toggle_favorites_handler)
.event(FolderEvent::UpdateRecentViews, update_recent_views_handler)
.event(FolderEvent::ImportAppFlowyDataFolder, import_appflowy_data_folder_handler)
}
#[derive(Clone, Copy, PartialEq, Eq, Debug, Display, Hash, ProtoBuf_Enum, Flowy_Event)]
@ -153,7 +152,4 @@ pub enum FolderEvent {
// used for add or remove recent views, like history
#[event(input = "UpdateRecentViewPayloadPB")]
UpdateRecentViews = 37,
#[event(input = "ImportAppFlowyDataPB")]
ImportAppFlowyDataFolder = 38,
}

View File

@ -12,7 +12,7 @@ use tracing::{error, event, info, instrument, Level};
use collab_integrate::collab_builder::{AppFlowyCollabBuilder, CollabBuilderConfig};
use collab_integrate::{CollabKVDB, CollabPersistenceConfig};
use flowy_error::{internal_error, ErrorCode, FlowyError, FlowyResult};
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
use flowy_folder_pub::cloud::{gen_view_id, FolderCloudService};
use flowy_folder_pub::folder_builder::ParentChildViews;
use lib_infra::async_trait::async_trait;
@ -39,17 +39,7 @@ use crate::view_operation::{create_view, FolderOperationHandler, FolderOperation
#[async_trait]
pub trait FolderUser: Send + Sync {
fn user_id(&self) -> Result<i64, FlowyError>;
fn token(&self) -> Result<Option<String>, FlowyError>;
fn collab_db(&self, uid: i64) -> Result<Weak<CollabKVDB>, FlowyError>;
/// Import appflowy data from the given path.
/// If the container name is not empty, then the data will be imported to the given container.
/// Otherwise, the data will be imported to the current workspace.
async fn import_appflowy_data_folder(
&self,
path: &str,
container_name: Option<String>,
) -> Result<Vec<ParentChildViews>, FlowyError>;
}
pub struct FolderManager {
@ -289,6 +279,23 @@ impl FolderManager {
})
}
pub async fn insert_parent_child_views(
&self,
views: Vec<ParentChildViews>,
) -> Result<(), FlowyError> {
self.with_folder(
|| Err(FlowyError::internal().with_context("The folder is not initialized")),
|folder| {
for view in views {
insert_parent_child_views(folder, view);
}
Ok(())
},
)?;
Ok(())
}
pub async fn get_workspace_pb(&self) -> FlowyResult<WorkspacePB> {
let workspace_pb = {
let guard = self.mutex_folder.lock();
@ -838,35 +845,6 @@ impl FolderManager {
Ok(())
}
pub async fn import_appflowy_data(
&self,
path: String,
name: Option<String>,
) -> Result<(), FlowyError> {
let (tx, rx) = tokio::sync::oneshot::channel();
let folder = self.mutex_folder.clone();
let user = self.user.clone();
tokio::spawn(async move {
match user.import_appflowy_data_folder(&path, name).await {
Ok(views) => {
if let Some(folder) = &*folder.lock() {
for view in views {
insert_parent_child_views(folder, view);
}
}
let _ = tx.send(Ok(()));
},
Err(err) => {
let _ = tx.send(Err(err));
},
}
});
rx.await.map_err(internal_error)??;
Ok(())
}
pub(crate) async fn import(&self, import_data: ImportParams) -> FlowyResult<View> {
if import_data.data.is_none() && import_data.file_path.is_none() {
return Err(FlowyError::new(

View File

@ -1,5 +1,6 @@
use collab_folder::Folder;
use std::sync::Arc;
use tracing::{event, instrument};
use collab_integrate::CollabKVAction;
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
@ -32,7 +33,14 @@ pub(crate) fn workspace_data_not_sync_error(uid: i64, workspace_id: &str) -> Flo
})
}
#[instrument(level = "debug", skip(folder, view))]
pub(crate) fn insert_parent_child_views(folder: &Folder, view: ParentChildViews) {
event!(
tracing::Level::DEBUG,
"Inserting view: {}, view children: {}",
view.parent_view.id,
view.child_views.len()
);
folder.insert_view(view.parent_view, None);
for child_view in view.child_views {
insert_parent_child_views(folder, child_view);

View File

@ -61,8 +61,9 @@ where
}
fn sign_out(&self, _token: Option<String>) -> FutureResult<(), FlowyError> {
let try_get_client = self.server.try_get_client();
FutureResult::new(async move { Ok(try_get_client?.sign_out().await?) })
// Calling the sign_out method that will revoke all connected devices' refresh tokens.
// So do nothing here.
FutureResult::new(async move { Ok(()) })
}
fn generate_sign_in_url_with_email(&self, email: &str) -> FutureResult<String, FlowyError> {

View File

@ -13,6 +13,7 @@ const DB_NAME: &str = "cache.db";
/// [StorePreferences] uses a sqlite database to store key value pairs.
/// Most of the time, it used to storage AppFlowy configuration.
#[derive(Clone)]
pub struct StorePreferences {
database: Option<Database>,
}

View File

@ -7,6 +7,7 @@ use crate::sqlite_impl::{
pool::{ConnectionManager, ConnectionPool, PoolConfig},
};
#[derive(Clone)]
pub struct Database {
uri: String,
pool: Arc<ConnectionPool>,

View File

@ -1,4 +1,5 @@
pub mod cloud;
pub mod entities;
pub mod workspace_service;
pub const DEFAULT_USER_NAME: fn() -> String = || "Me".to_string();

View File

@ -0,0 +1,13 @@
use flowy_error::FlowyResult;
use flowy_folder_pub::folder_builder::ParentChildViews;
use lib_infra::async_trait::async_trait;
use std::collections::HashMap;
#[async_trait]
pub trait UserWorkspaceService: Send + Sync {
async fn did_import_views(&self, views: Vec<ParentChildViews>) -> FlowyResult<()>;
async fn did_import_database_views(
&self,
ids_by_database_id: HashMap<String, Vec<String>>,
) -> FlowyResult<()>;
}

View File

@ -19,27 +19,32 @@ use collab_integrate::{CollabKVAction, CollabKVDB, PersistenceError};
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
use flowy_folder_pub::cloud::gen_view_id;
use crate::migrations::MigrationUser;
use crate::migrations::AnonUser;
use crate::services::entities::Session;
/// Migration the collab objects of the old user to new user. Currently, it only happens when
/// the user is a local user and try to use AppFlowy cloud service.
pub fn migration_anon_user_on_sign_up(
old_user: &MigrationUser,
old_user: &AnonUser,
old_collab_db: &Arc<CollabKVDB>,
new_user: &MigrationUser,
new_user_session: &Session,
new_collab_db: &Arc<CollabKVDB>,
) -> FlowyResult<()> {
new_collab_db
.with_write_txn(|new_collab_w_txn| {
let old_collab_r_txn = old_collab_db.read_txn();
let old_to_new_id_map = Arc::new(Mutex::new(OldToNewIdMap::new()));
migrate_user_awareness(old_to_new_id_map.lock().deref_mut(), old_user, new_user)?;
migrate_user_awareness(
old_to_new_id_map.lock().deref_mut(),
old_user,
new_user_session,
)?;
migrate_database_with_views_object(
&mut old_to_new_id_map.lock(),
old_user,
&old_collab_r_txn,
new_user,
new_user_session,
new_collab_w_txn,
)?;
@ -58,7 +63,7 @@ pub fn migration_anon_user_on_sign_up(
let collab_by_oid = make_collab_by_oid(old_user, &old_collab_r_txn, &object_ids);
migrate_databases(
&old_to_new_id_map,
new_user,
new_user_session,
new_collab_w_txn,
&mut object_ids,
&collab_by_oid,
@ -71,7 +76,7 @@ pub fn migration_anon_user_on_sign_up(
&mut old_to_new_id_map.lock(),
old_user,
&old_collab_r_txn,
new_user,
new_user_session,
new_collab_w_txn,
)?;
@ -82,7 +87,7 @@ pub fn migration_anon_user_on_sign_up(
tracing::debug!("migrate from: {}, to: {}", object_id, new_object_id,);
migrate_collab_object(
collab,
new_user.session.user_id,
new_user_session.user_id,
&new_object_id,
new_collab_w_txn,
);
@ -128,9 +133,9 @@ impl DerefMut for OldToNewIdMap {
fn migrate_database_with_views_object<'a, 'b, W, R>(
old_to_new_id_map: &mut OldToNewIdMap,
old_user: &MigrationUser,
old_user: &AnonUser,
old_collab_r_txn: &R,
new_user: &MigrationUser,
new_user_session: &Session,
new_collab_w_txn: &W,
) -> Result<(), PersistenceError>
where
@ -154,8 +159,8 @@ where
)
})?;
let new_uid = new_user.session.user_id;
let new_object_id = &new_user.session.user_workspace.database_view_tracker_id;
let new_uid = new_user_session.user_id;
let new_object_id = &new_user_session.user_workspace.database_view_tracker_id;
let array = DatabaseViewTrackerList::from_collab(&database_with_views_collab);
for database_view_tracker in array.get_all_database_tracker() {
@ -191,9 +196,9 @@ where
fn migrate_workspace_folder<'a, 'b, W, R>(
old_to_new_id_map: &mut OldToNewIdMap,
old_user: &MigrationUser,
old_user: &AnonUser,
old_collab_r_txn: &R,
new_user: &MigrationUser,
new_user_session: &Session,
new_collab_w_txn: &W,
) -> Result<(), PersistenceError>
where
@ -205,8 +210,8 @@ where
{
let old_uid = old_user.session.user_id;
let old_workspace_id = &old_user.session.user_workspace.id;
let new_uid = new_user.session.user_id;
let new_workspace_id = &new_user.session.user_workspace.id;
let new_uid = new_user_session.user_id;
let new_workspace_id = &new_user_session.user_workspace.id;
let old_folder_collab = Collab::new(old_uid, old_workspace_id, "phantom", vec![]);
old_folder_collab.with_origin_transact_mut(|txn| {
@ -317,11 +322,11 @@ where
fn migrate_user_awareness(
old_to_new_id_map: &mut OldToNewIdMap,
old_user: &MigrationUser,
new_user: &MigrationUser,
old_user: &AnonUser,
new_user_session: &Session,
) -> Result<(), PersistenceError> {
let old_uid = old_user.session.user_id;
let new_uid = new_user.session.user_id;
let new_uid = new_user_session.user_id;
tracing::debug!("migrate user awareness from: {}, to: {}", old_uid, new_uid);
old_to_new_id_map.insert(old_uid.to_string(), new_uid.to_string());
Ok(())
@ -329,7 +334,7 @@ fn migrate_user_awareness(
fn migrate_databases<'a, W>(
old_to_new_id_map: &Arc<Mutex<OldToNewIdMap>>,
new_user: &MigrationUser,
new_user_session: &Session,
new_collab_w_txn: &'a W,
object_ids: &mut Vec<String>,
collab_by_oid: &HashMap<String, Collab>,
@ -400,7 +405,7 @@ where
);
migrate_collab_object(
collab,
new_user.session.user_id,
new_user_session.user_id,
&new_object_id,
new_collab_w_txn,
);
@ -422,7 +427,7 @@ where
});
migrate_collab_object(
collab,
new_user.session.user_id,
new_user_session.user_id,
&new_object_id,
new_collab_w_txn,
);
@ -434,7 +439,7 @@ where
}
fn make_collab_by_oid<'a, R>(
old_user: &MigrationUser,
old_user: &AnonUser,
old_collab_r_txn: &R,
object_ids: &[String],
) -> HashMap<String, Collab>

View File

@ -1,47 +0,0 @@
use crate::migrations::MigrationUser;
use crate::services::data_import::importer::import_data;
use crate::services::data_import::{
import_appflowy_data_folder, upload_imported_data, ImportContext,
};
use collab_integrate::CollabKVDB;
use flowy_error::{internal_error, ErrorCode, FlowyError, FlowyResult};
use flowy_user_deps::cloud::UserCloudService;
use std::sync::Arc;
#[allow(dead_code)]
pub async fn migration_anon_user_on_appflowy_cloud_sign_up(
old_user: &MigrationUser,
old_collab_db: &Arc<CollabKVDB>,
new_user: &MigrationUser,
new_collab_db: &Arc<CollabKVDB>,
user_cloud_service: Arc<dyn UserCloudService>,
) -> FlowyResult<()> {
let import_context = ImportContext {
imported_session: old_user.session.clone(),
imported_collab_db: old_collab_db.clone(),
container_name: None,
};
let cloned_new_collab_db = new_collab_db.clone();
let import_data = tokio::task::spawn_blocking(move || {
import_appflowy_data_folder(
&new_user.session,
&new_user.session.user_workspace.id,
&cloned_new_collab_db,
import_context,
)
})
.await
.map_err(internal_error)??;
upload_imported_data(
new_user.session.user_id,
new_collab_db.clone(),
&new_user.session.user_workspace.id,
&new_user.user_profile.authenticator,
&import_data,
user_cloud_service,
)
.await?;
Ok(())
}

View File

@ -1,8 +1,5 @@
pub use migrate_anon_user_collab::*;
pub use sync_af_user_collab::*;
pub use sync_supabase_user_collab::*;
mod migrate_anon_user_collab;
// mod migrate_anon_user_to_appflowy_cloud;
mod sync_af_user_collab;
mod sync_supabase_user_collab;

View File

@ -1,379 +0,0 @@
use std::future::Future;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::Arc;
use anyhow::{anyhow, Error};
use collab::core::collab::MutexCollab;
use collab::preclude::Collab;
use collab_database::database::get_database_row_ids;
use collab_database::rows::database_row_document_id_from_row_id;
use collab_database::user::{get_all_database_view_trackers, DatabaseViewTracker};
use collab_entity::{CollabObject, CollabType};
use collab_folder::{Folder, View, ViewLayout};
use parking_lot::Mutex;
use collab_integrate::{CollabKVAction, CollabKVDB, PersistenceError};
use flowy_error::FlowyResult;
use flowy_user_pub::cloud::UserCloudService;
use crate::migrations::MigrationUser;
#[tracing::instrument(level = "info", skip_all, err)]
pub async fn sync_af_user_data_to_cloud(
user_service: Arc<dyn UserCloudService>,
device_id: &str,
new_user: &MigrationUser,
collab_db: &Arc<CollabKVDB>,
) -> FlowyResult<()> {
let workspace_id = new_user.session.user_workspace.id.clone();
let uid = new_user.session.user_id;
let folder = Arc::new(
sync_folder(
uid,
&workspace_id,
device_id,
collab_db,
user_service.clone(),
)
.await?,
);
let database_records = sync_database_views(
uid,
&workspace_id,
device_id,
&new_user.session.user_workspace.database_view_tracker_id,
collab_db,
user_service.clone(),
)
.await;
let synced_database = Arc::new(Mutex::new(vec![]));
let views = folder.lock().get_current_workspace_views();
for view in views {
let view_id = view.id.clone();
if let Err(err) = sync_view(
uid,
folder.clone(),
database_records.clone(),
workspace_id.to_string(),
device_id.to_string(),
view,
collab_db.clone(),
user_service.clone(),
synced_database.clone(),
)
.await
{
tracing::error!("🔴sync {} failed: {:?}", view_id, err);
}
}
tokio::task::yield_now().await;
Ok(())
}
#[allow(clippy::too_many_arguments)]
fn sync_view(
uid: i64,
folder: Arc<MutexFolder>,
database_view_tracker: Vec<Arc<DatabaseViewTracker>>,
workspace_id: String,
device_id: String,
view: Arc<View>,
collab_db: Arc<CollabKVDB>,
user_service: Arc<dyn UserCloudService>,
synced_database: Arc<Mutex<Vec<String>>>,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + Sync>> {
Box::pin(async move {
let collab_type = collab_type_from_view_layout(&view.layout);
let object_id = object_id_from_view(&view, &database_view_tracker)?;
tracing::debug!(
"sync view: {:?}:{} with object_id: {}",
view.layout,
view.id,
object_id
);
let collab_object = CollabObject::new(
uid,
object_id,
collab_type,
workspace_id.to_string(),
device_id.clone(),
);
match view.layout {
ViewLayout::Document => {
let encode_v1 = get_collab_encode_v1(uid, &collab_object, &collab_db)?;
tracing::info!(
"sync object: {} with update: {}",
collab_object,
encode_v1.len()
);
user_service
.create_collab_object(&collab_object, encode_v1, false)
.await?;
},
ViewLayout::Grid | ViewLayout::Board | ViewLayout::Calendar => {
let is_synced = synced_database.lock().contains(&collab_object.object_id);
if !is_synced {
synced_database.lock().push(collab_object.object_id.clone());
let (database_encode_v1, row_ids) =
get_database_encode_v1(uid, &collab_object, &collab_db)?;
tracing::info!(
"sync object: {} with update: {}",
collab_object,
database_encode_v1.len()
);
user_service
.create_collab_object(&collab_object, database_encode_v1, false)
.await?;
// sync database's row
for row_id in row_ids {
tracing::debug!("sync row: {}", row_id);
let document_id = database_row_document_id_from_row_id(&row_id);
let database_row_collab_object = CollabObject::new(
uid,
row_id,
CollabType::DatabaseRow,
workspace_id.to_string(),
device_id.clone(),
);
let database_row_encode_v1 =
get_collab_encode_v1(uid, &database_row_collab_object, &collab_db)?;
tracing::info!(
"sync object: {} with update: {}",
database_row_collab_object,
database_row_encode_v1.len()
);
let _ = user_service
.create_collab_object(&database_row_collab_object, database_row_encode_v1, false)
.await;
let database_row_document = CollabObject::new(
uid,
document_id,
CollabType::Document,
workspace_id.to_string(),
device_id.to_string(),
);
// sync document in the row if exist
if let Ok(document_encode_v1) =
get_collab_encode_v1(uid, &database_row_document, &collab_db)
{
tracing::info!(
"sync database row document: {} with update: {}",
database_row_document,
document_encode_v1.len()
);
let _ = user_service
.create_collab_object(&database_row_document, document_encode_v1, false)
.await;
}
}
}
},
}
tokio::task::yield_now().await;
let child_views = folder.lock().views.get_views_belong_to(&view.id);
for child_view in child_views {
let cloned_child_view = child_view.clone();
if let Err(err) = Box::pin(sync_view(
uid,
folder.clone(),
database_view_tracker.clone(),
workspace_id.clone(),
device_id.to_string(),
child_view,
collab_db.clone(),
user_service.clone(),
synced_database.clone(),
))
.await
{
tracing::error!(
"🔴sync {:?}:{} failed: {:?}",
cloned_child_view.layout,
cloned_child_view.id,
err
)
}
tokio::task::yield_now().await;
}
Ok(())
})
}
fn get_collab_encode_v1(
uid: i64,
collab_object: &CollabObject,
collab_db: &Arc<CollabKVDB>,
) -> Result<Vec<u8>, PersistenceError> {
let collab = Collab::new(uid, &collab_object.object_id, "phantom", vec![]);
let _ = collab.with_origin_transact_mut(|txn| {
collab_db
.read_txn()
.load_doc_with_txn(uid, &collab_object.object_id, txn)
})?;
Ok(collab.encode_collab_v1().encode_to_bytes()?)
}
fn get_database_encode_v1(
uid: i64,
collab_object: &CollabObject,
collab_db: &Arc<CollabKVDB>,
) -> Result<(Vec<u8>, Vec<String>), PersistenceError> {
let collab = Collab::new(uid, &collab_object.object_id, "phantom", vec![]);
let _ = collab.with_origin_transact_mut(|txn| {
collab_db
.read_txn()
.load_doc_with_txn(uid, &collab_object.object_id, txn)
})?;
let row_ids = get_database_row_ids(&collab).unwrap_or_default();
Ok((collab.encode_collab_v1().encode_to_bytes()?, row_ids))
}
async fn sync_folder(
uid: i64,
workspace_id: &str,
device_id: &str,
collab_db: &Arc<CollabKVDB>,
user_service: Arc<dyn UserCloudService>,
) -> Result<MutexFolder, Error> {
let (folder, encode_v1) = {
let collab = Collab::new(uid, workspace_id, "phantom", vec![]);
// Use the temporary result to short the lifetime of the TransactionMut
collab.with_origin_transact_mut(|txn| {
collab_db
.read_txn()
.load_doc_with_txn(uid, workspace_id, txn)
})?;
let data = collab.encode_collab_v1().encode_to_bytes();
(
MutexFolder::new(Folder::open(
uid,
Arc::new(MutexCollab::from_collab(collab)),
None,
)?),
data,
)
};
let encode_v1 = encode_v1?;
let collab_object = CollabObject::new(
uid,
workspace_id.to_string(),
CollabType::Folder,
workspace_id.to_string(),
device_id.to_string(),
);
tracing::info!(
"sync object: {} with update: {}",
collab_object,
encode_v1.len()
);
if let Err(err) = user_service
.create_collab_object(&collab_object, encode_v1, true)
.await
{
tracing::error!("🔴sync folder failed: {:?}", err);
}
Ok(folder)
}
async fn sync_database_views(
uid: i64,
workspace_id: &str,
device_id: &str,
database_views_aggregate_id: &str,
collab_db: &Arc<CollabKVDB>,
user_service: Arc<dyn UserCloudService>,
) -> Vec<Arc<DatabaseViewTracker>> {
let collab_object = CollabObject::new(
uid,
database_views_aggregate_id.to_string(),
CollabType::WorkspaceDatabase,
workspace_id.to_string(),
device_id.to_string(),
);
// Use the temporary result to short the lifetime of the TransactionMut
let result = {
let collab = Collab::new(uid, database_views_aggregate_id, "phantom", vec![]);
collab
.with_origin_transact_mut(|txn| {
collab_db
.read_txn()
.load_doc_with_txn(uid, database_views_aggregate_id, txn)
})
.map(|_| {
(
get_all_database_view_trackers(&collab),
collab.encode_collab_v1().encode_to_bytes(),
)
})
};
if let Ok((records, encode_v1)) = result {
if let Ok(encode_v1) = encode_v1 {
let _ = user_service
.create_collab_object(&collab_object, encode_v1, false)
.await;
}
records.into_iter().map(Arc::new).collect()
} else {
vec![]
}
}
struct MutexFolder(Mutex<Folder>);
impl MutexFolder {
pub fn new(folder: Folder) -> Self {
Self(Mutex::new(folder))
}
}
impl Deref for MutexFolder {
type Target = Mutex<Folder>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
unsafe impl Sync for MutexFolder {}
unsafe impl Send for MutexFolder {}
fn collab_type_from_view_layout(view_layout: &ViewLayout) -> CollabType {
match view_layout {
ViewLayout::Document => CollabType::Document,
ViewLayout::Grid | ViewLayout::Board | ViewLayout::Calendar => CollabType::Database,
}
}
fn object_id_from_view(
view: &Arc<View>,
database_records: &[Arc<DatabaseViewTracker>],
) -> Result<String, Error> {
if view.layout.is_database() {
match database_records
.iter()
.find(|record| record.linked_views.contains(&view.id))
{
None => Err(anyhow!(
"🔴sync view: {} failed: no database for this view",
view.id
)),
Some(record) => Ok(record.database_id.clone()),
}
} else {
Ok(view.id.clone())
}
}

View File

@ -17,17 +17,17 @@ use collab_integrate::{CollabKVAction, CollabKVDB, PersistenceError};
use flowy_error::FlowyResult;
use flowy_user_pub::cloud::UserCloudService;
use crate::migrations::MigrationUser;
use crate::services::entities::Session;
#[tracing::instrument(level = "info", skip_all, err)]
pub async fn sync_supabase_user_data_to_cloud(
user_service: Arc<dyn UserCloudService>,
device_id: &str,
new_user: &MigrationUser,
new_user_session: &Session,
collab_db: &Arc<CollabKVDB>,
) -> FlowyResult<()> {
let workspace_id = new_user.session.user_workspace.id.clone();
let uid = new_user.session.user_id;
let workspace_id = new_user_session.user_workspace.id.clone();
let uid = new_user_session.user_id;
let folder = Arc::new(
sync_folder(
uid,
@ -43,7 +43,7 @@ pub async fn sync_supabase_user_data_to_cloud(
uid,
&workspace_id,
device_id,
&new_user.session.user_workspace.database_view_tracker_id,
&new_user_session.user_workspace.database_view_tracker_id,
collab_db,
user_service.clone(),
)

View File

@ -0,0 +1,12 @@
use flowy_derive::ProtoBuf;
use validator::Validate;
#[derive(ProtoBuf, Validate, Default)]
pub struct ImportAppFlowyDataPB {
#[pb(index = 1)]
#[validate(custom = "lib_infra::validator_fn::required_not_empty_str")]
pub path: String,
#[pb(index = 2, one_of)]
pub import_container_name: Option<String>,
}

View File

@ -1,4 +1,5 @@
pub use auth::*;
pub use import_data::*;
pub use realtime::*;
pub use reminder::*;
pub use user_profile::*;
@ -7,6 +8,7 @@ pub use workspace_member::*;
pub mod auth;
pub mod date_time;
mod import_data;
pub mod parser;
pub mod realtime;
mod reminder;

View File

@ -1,21 +1,20 @@
use std::sync::Weak;
use std::{convert::TryInto, sync::Arc};
use serde_json::Value;
use tracing::event;
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
use flowy_sqlite::kv::StorePreferences;
use flowy_user_pub::cloud::UserCloudConfig;
use flowy_user_pub::entities::*;
use lib_dispatch::prelude::*;
use lib_infra::box_any::BoxAny;
use serde_json::Value;
use std::sync::Weak;
use std::{convert::TryInto, sync::Arc};
use tracing::event;
use crate::entities::*;
use crate::notification::{send_notification, UserNotification};
use crate::services::cloud_config::{
get_cloud_config, get_or_create_cloud_config, save_cloud_config,
};
use crate::services::data_import::get_appflowy_data_folder_import_context;
use crate::user_manager::UserManager;
fn upgrade_manager(manager: AFPluginState<Weak<UserManager>>) -> FlowyResult<Arc<UserManager>> {
@ -163,16 +162,13 @@ pub async fn get_appearance_setting(
match store_preferences.get_str(APPEARANCE_SETTING_CACHE_KEY) {
None => data_result_ok(AppearanceSettingsPB::default()),
Some(s) => {
let setting = match serde_json::from_str(&s) {
Ok(setting) => setting,
Err(e) => {
tracing::error!(
"Deserialize AppearanceSettings failed: {:?}, fallback to default",
e
);
AppearanceSettingsPB::default()
},
};
let setting = serde_json::from_str(&s).unwrap_or_else(|err| {
tracing::error!(
"Deserialize AppearanceSettings failed: {:?}, fallback to default",
err
);
AppearanceSettingsPB::default()
});
data_result_ok(setting)
},
}
@ -239,21 +235,41 @@ pub async fn get_notification_settings(
match store_preferences.get_str(NOTIFICATION_SETTINGS_CACHE_KEY) {
None => data_result_ok(NotificationSettingsPB::default()),
Some(s) => {
let setting = match serde_json::from_str(&s) {
Ok(setting) => setting,
Err(e) => {
tracing::error!(
"Deserialize NotificationSettings failed: {:?}, fallback to default",
e
);
NotificationSettingsPB::default()
},
};
let setting = serde_json::from_str(&s).unwrap_or_else(|e| {
tracing::error!(
"Deserialize NotificationSettings failed: {:?}, fallback to default",
e
);
NotificationSettingsPB::default()
});
data_result_ok(setting)
},
}
}
#[tracing::instrument(level = "debug", skip_all, err)]
pub async fn import_appflowy_data_folder_handler(
data: AFPluginData<ImportAppFlowyDataPB>,
manager: AFPluginState<Weak<UserManager>>,
) -> Result<(), FlowyError> {
let data = data.try_into_inner()?;
let (tx, rx) = tokio::sync::oneshot::channel();
af_spawn(async move {
let result = async {
let manager = upgrade_manager(manager)?;
let context = get_appflowy_data_folder_import_context(&data.path)
.map_err(|err| FlowyError::new(ErrorCode::AppFlowyDataFolderImportError, err.to_string()))?
.with_container_name(data.import_container_name);
manager.import_appflowy_data_folder(context).await?;
Ok::<(), FlowyError>(())
}
.await;
let _ = tx.send(result);
});
rx.await??;
Ok(())
}
#[tracing::instrument(level = "debug", skip_all, err)]
pub async fn get_user_setting(
manager: AFPluginState<Weak<UserManager>>,

View File

@ -52,7 +52,8 @@ pub fn init(user_session: Weak<UserManager>) -> AFPlugin {
.event(UserEvent::SetDateTimeSettings, set_date_time_settings)
.event(UserEvent::GetDateTimeSettings, get_date_time_settings)
.event(UserEvent::SetNotificationSettings, set_notification_settings)
.event(UserEvent::GetNotificationSettings, get_notification_settings)
.event(UserEvent::GetNotificationSettings, get_notification_settings)
.event(UserEvent::ImportAppFlowyDataFolder, import_appflowy_data_folder_handler)
// Workspace member
.event(UserEvent::AddWorkspaceMember, add_workspace_member_handler)
.event(UserEvent::RemoveWorkspaceMember, delete_workspace_member_handler)
@ -187,6 +188,9 @@ pub enum UserEvent {
#[event(output = "QueryWorkspacePB")]
GetWorkspaceMember = 40,
#[event(input = "ImportAppFlowyDataPB")]
ImportAppFlowyDataFolder = 41,
}
pub trait UserStatusCallback: Send + Sync + 'static {

View File

@ -1,24 +0,0 @@
use crate::services::db::UserDB;
use crate::services::sqlite_sql::user_sql::vacuum_database;
use flowy_sqlite::kv::StorePreferences;
use std::sync::Arc;
use tracing::{error, info};
const SQLITE_VACUUM_04: &str = "sqlite_vacuum_04";
pub fn vacuum_database_if_need(
uid: i64,
user_db: &Arc<UserDB>,
store_preferences: &Arc<StorePreferences>,
) {
if !store_preferences.get_bool(SQLITE_VACUUM_04) {
let _ = store_preferences.set_bool(SQLITE_VACUUM_04, true);
if let Ok(conn) = user_db.get_connection(uid) {
info!("vacuum database 04");
if let Err(err) = vacuum_database(conn) {
error!("vacuum database error: {:?}", err);
}
}
}
}

View File

@ -1,8 +1,5 @@
use flowy_user_pub::entities::UserProfile;
use crate::services::entities::Session;
pub mod database_vacuum;
pub mod document_empty_content;
pub mod migration;
pub mod session_migration;
@ -11,7 +8,6 @@ pub mod workspace_and_favorite_v1;
pub mod workspace_trash_v1;
#[derive(Clone, Debug)]
pub struct MigrationUser {
pub user_profile: UserProfile,
pub struct AnonUser {
pub session: Session,
}

View File

@ -0,0 +1,121 @@
use crate::migrations::session_migration::migrate_session_with_user_uuid;
use crate::services::db::UserDB;
use crate::services::entities::{Session, UserConfig, UserPaths};
use crate::services::sqlite_sql::user_sql::vacuum_database;
use collab_integrate::CollabKVDB;
use flowy_error::{internal_error, ErrorCode, FlowyError, FlowyResult};
use flowy_sqlite::kv::StorePreferences;
use flowy_sqlite::DBConnection;
use std::sync::{Arc, Weak};
use tracing::{debug, error, info};
const SQLITE_VACUUM_042: &str = "sqlite_vacuum_042_version";
pub struct AuthenticateUser {
pub(crate) user_config: UserConfig,
pub(crate) database: Arc<UserDB>,
pub(crate) user_paths: UserPaths,
store_preferences: Arc<StorePreferences>,
session: Arc<parking_lot::RwLock<Option<Session>>>,
}
impl AuthenticateUser {
pub fn new(user_config: UserConfig, store_preferences: Arc<StorePreferences>) -> Self {
let user_paths = UserPaths::new(user_config.storage_path.clone());
let database = Arc::new(UserDB::new(user_paths.clone()));
let session = Arc::new(parking_lot::RwLock::new(None));
*session.write() =
migrate_session_with_user_uuid(&user_config.session_cache_key, &store_preferences);
Self {
user_config,
database,
user_paths,
store_preferences,
session,
}
}
pub fn vacuum_database_if_need(&self) {
if !self.store_preferences.get_bool(SQLITE_VACUUM_042) {
if let Ok(session) = self.get_session() {
let _ = self.store_preferences.set_bool(SQLITE_VACUUM_042, true);
if let Ok(conn) = self.database.get_connection(session.user_id) {
info!("vacuum database 042");
if let Err(err) = vacuum_database(conn) {
error!("vacuum database error: {:?}", err);
}
}
}
}
}
pub fn user_id(&self) -> FlowyResult<i64> {
let session = self.get_session()?;
Ok(session.user_id)
}
pub fn workspace_id(&self) -> FlowyResult<String> {
let session = self.get_session()?;
Ok(session.user_workspace.id)
}
pub fn get_collab_db(&self, uid: i64) -> FlowyResult<Weak<CollabKVDB>> {
self
.database
.get_collab_db(uid)
.map(|collab_db| Arc::downgrade(&collab_db))
}
pub fn get_sqlite_connection(&self, uid: i64) -> FlowyResult<DBConnection> {
self.database.get_connection(uid)
}
pub fn close_db(&self) -> FlowyResult<()> {
let session = self.get_session()?;
info!("Close db for user: {}", session.user_id);
self.database.close(session.user_id)?;
Ok(())
}
pub fn set_session(&self, session: Option<Session>) -> Result<(), FlowyError> {
debug!("Set current user session: {:?}", session);
match &session {
None => {
self.session.write().take();
self
.store_preferences
.remove(self.user_config.session_cache_key.as_ref());
Ok(())
},
Some(session) => {
self.session.write().replace(session.clone());
self
.store_preferences
.set_object(&self.user_config.session_cache_key, session.clone())
.map_err(internal_error)?;
Ok(())
},
}
}
pub fn get_session(&self) -> FlowyResult<Session> {
if let Some(session) = (self.session.read()).clone() {
return Ok(session);
}
match self
.store_preferences
.get_object::<Session>(&self.user_config.session_cache_key)
{
None => Err(FlowyError::new(
ErrorCode::RecordNotFound,
"User is not logged in",
)),
Some(session) => {
self.session.write().replace(session.clone());
Ok(session)
},
}
}
}

View File

@ -22,7 +22,7 @@ use collab_folder::{Folder, UserId, View, ViewIdentifier, ViewLayout};
use collab_integrate::{CollabKVAction, CollabKVDB, PersistenceError};
use flowy_error::{internal_error, FlowyError};
use flowy_folder_pub::cloud::gen_view_id;
use flowy_folder_pub::entities::ImportData;
use flowy_folder_pub::entities::{AppFlowyData, ImportData};
use flowy_folder_pub::folder_builder::{ParentChildViews, ViewBuilder};
use flowy_sqlite::kv::StorePreferences;
use flowy_user_pub::cloud::{UserCloudService, UserCollabParams};
@ -30,6 +30,7 @@ use flowy_user_pub::entities::Authenticator;
use parking_lot::{Mutex, RwLock};
use std::collections::{HashMap, HashSet};
use std::ops::{Deref, DerefMut};
use std::path::Path;
use std::sync::Arc;
use tracing::{debug, error, event, info, instrument};
@ -47,6 +48,9 @@ impl ImportContext {
}
pub(crate) fn get_appflowy_data_folder_import_context(path: &str) -> anyhow::Result<ImportContext> {
if !Path::new(path).exists() {
return Err(anyhow!("The path: {} is not exist", path));
}
let user_paths = UserPaths::new(path.to_string());
let other_store_preferences = Arc::new(StorePreferences::new(path)?);
migrate_session_with_user_uuid("appflowy_session_cache", &other_store_preferences);
@ -59,10 +63,12 @@ pub(crate) fn get_appflowy_data_folder_import_context(path: &str) -> anyhow::Res
let collab_db_path = user_paths.collab_db_path(imported_session.user_id);
let sqlite_db_path = user_paths.sqlite_db_path(imported_session.user_id);
let imported_sqlite_db = flowy_sqlite::init(sqlite_db_path).map_err(|e| {
FlowyError::internal().with_context(format!("open import sqlite db failed, {:?}", e))
})?;
let imported_collab_db = Arc::new(CollabKVDB::open(collab_db_path)?);
let imported_sqlite_db = flowy_sqlite::init(sqlite_db_path)
.map_err(|err| anyhow!("open import collab db failed: {:?}", err))?;
let imported_collab_db = Arc::new(
CollabKVDB::open(collab_db_path)
.map_err(|err| anyhow!("open import collab db failed: {:?}", err))?,
);
let imported_user = select_user_profile(
imported_session.user_id,
imported_sqlite_db.get_connection()?,
@ -234,11 +240,17 @@ pub(crate) fn import_appflowy_data_folder(
}
})?;
Ok(ImportData::AppFlowyDataFolder {
views,
database_view_ids_by_database_id,
row_object_ids: row_object_ids.into_inner().into_iter().collect(),
database_object_ids: database_object_ids.into_inner().into_iter().collect(),
document_object_ids: document_object_ids.into_inner().into_iter().collect(),
items: vec![
AppFlowyData::Folder {
views,
database_view_ids_by_database_id,
},
AppFlowyData::CollabObject {
row_object_ids: row_object_ids.into_inner().into_iter().collect(),
database_object_ids: database_object_ids.into_inner().into_iter().collect(),
document_object_ids: document_object_ids.into_inner().into_iter().collect(),
},
],
})
}
@ -604,12 +616,12 @@ impl DerefMut for OldToNewIdMap {
}
#[instrument(level = "debug", skip_all)]
pub async fn upload_imported_data(
pub async fn upload_collab_objects_data(
uid: i64,
user_collab_db: Arc<CollabKVDB>,
workspace_id: &str,
user_authenticator: &Authenticator,
import_data: &ImportData,
appflowy_data: AppFlowyData,
user_cloud_service: Arc<dyn UserCloudService>,
) -> Result<(), FlowyError> {
// Only support uploading the collab data when the current server is AppFlowy Cloud server
@ -617,73 +629,69 @@ pub async fn upload_imported_data(
return Ok(());
}
let (row_object_ids, document_object_ids, database_object_ids) = match import_data {
ImportData::AppFlowyDataFolder {
views: _,
database_view_ids_by_database_id: _,
match appflowy_data {
AppFlowyData::Folder { .. } => {},
AppFlowyData::CollabObject {
row_object_ids,
document_object_ids,
database_object_ids,
} => (
row_object_ids.clone(),
document_object_ids.clone(),
database_object_ids.clone(),
),
};
} => {
let object_by_collab_type = tokio::task::spawn_blocking(move || {
let collab_read = user_collab_db.read_txn();
let mut object_by_collab_type = HashMap::new();
let object_by_collab_type = tokio::task::spawn_blocking(move || {
let collab_read = user_collab_db.read_txn();
let mut object_by_collab_type = HashMap::new();
event!(tracing::Level::DEBUG, "upload database collab data");
object_by_collab_type.insert(
CollabType::Database,
load_and_process_collab_data(uid, &collab_read, &database_object_ids),
);
event!(tracing::Level::DEBUG, "upload database collab data");
object_by_collab_type.insert(
CollabType::Database,
load_and_process_collab_data(uid, &collab_read, &database_object_ids),
);
event!(tracing::Level::DEBUG, "upload document collab data");
object_by_collab_type.insert(
CollabType::Document,
load_and_process_collab_data(uid, &collab_read, &document_object_ids),
);
event!(tracing::Level::DEBUG, "upload document collab data");
object_by_collab_type.insert(
CollabType::Document,
load_and_process_collab_data(uid, &collab_read, &document_object_ids),
);
event!(tracing::Level::DEBUG, "upload database row collab data");
object_by_collab_type.insert(
CollabType::DatabaseRow,
load_and_process_collab_data(uid, &collab_read, &row_object_ids),
);
event!(tracing::Level::DEBUG, "upload database row collab data");
object_by_collab_type.insert(
CollabType::DatabaseRow,
load_and_process_collab_data(uid, &collab_read, &row_object_ids),
);
object_by_collab_type
})
.await
.map_err(internal_error)?;
object_by_collab_type
})
.await
.map_err(internal_error)?;
// Upload
let mut size_counter = 0;
let mut objects: Vec<UserCollabParams> = vec![];
for (collab_type, encoded_collab_by_oid) in object_by_collab_type {
for (oid, encoded_collab) in encoded_collab_by_oid {
let obj_size = encoded_collab.len();
// Add the current object to the batch.
objects.push(UserCollabParams {
object_id: oid,
encoded_collab,
collab_type: collab_type.clone(),
});
size_counter += obj_size;
}
}
// Upload
let mut size_counter = 0;
let mut objects: Vec<UserCollabParams> = vec![];
for (collab_type, encoded_collab_by_oid) in object_by_collab_type {
for (oid, encoded_collab) in encoded_collab_by_oid {
let obj_size = encoded_collab.len();
// Add the current object to the batch.
objects.push(UserCollabParams {
object_id: oid,
encoded_collab,
collab_type: collab_type.clone(),
});
size_counter += obj_size;
}
if !objects.is_empty() {
batch_create(
uid,
workspace_id,
&user_cloud_service,
&size_counter,
objects,
)
.await;
}
},
}
if !objects.is_empty() {
batch_create(
uid,
workspace_id,
&user_cloud_service,
&size_counter,
objects,
)
.await;
}
Ok(())
}

View File

@ -9,13 +9,6 @@ use flowy_folder_pub::entities::ImportData;
use std::sync::Arc;
use tracing::instrument;
pub enum ImportDataSource {
AppFlowyDataFolder {
path: String,
container_name: Option<String>,
},
}
/// Import appflowy data from the given path.
/// If the container name is not empty, then the data will be imported to the given container.
/// Otherwise, the data will be imported to the current workspace.

View File

@ -3,4 +3,3 @@ pub use appflowy_data_import::*;
pub(crate) mod importer;
pub use importer::load_collab_by_oid;
pub use importer::ImportDataSource;

View File

@ -139,6 +139,7 @@ impl From<Authenticator> for AuthenticatorPB {
}
pub const URL_SAFE_ENGINE: GeneralPurpose = GeneralPurpose::new(&URL_SAFE, PAD);
#[derive(Clone)]
pub struct UserConfig {
/// Used to store the user data
pub storage_path: String,

View File

@ -1,3 +1,4 @@
pub mod authenticate_user;
pub mod cloud_config;
pub mod collab_interact;
pub mod data_import;

View File

@ -1,15 +1,6 @@
use std::string::ToString;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::{Arc, Weak};
use collab_user::core::MutexUserAwareness;
use serde_json::Value;
use tokio::sync::{Mutex, RwLock};
use tokio_stream::StreamExt;
use tracing::{debug, error, event, info, instrument};
use collab_integrate::collab_builder::AppFlowyCollabBuilder;
use collab_integrate::CollabKVDB;
use collab_user::core::MutexUserAwareness;
use flowy_error::{internal_error, ErrorCode, FlowyResult};
use flowy_folder_pub::entities::ImportData;
use flowy_server_pub::AuthenticatorType;
@ -19,30 +10,33 @@ use flowy_sqlite::ConnectionPool;
use flowy_sqlite::{query_dsl::*, DBConnection, ExpressionMethods};
use flowy_user_pub::cloud::{UserCloudServiceProvider, UserUpdate};
use flowy_user_pub::entities::*;
use flowy_user_pub::workspace_service::UserWorkspaceService;
use serde_json::Value;
use std::string::ToString;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::{Arc, Weak};
use tokio::sync::{Mutex, RwLock};
use tokio_stream::StreamExt;
use tracing::{debug, error, event, info, instrument};
use lib_dispatch::prelude::af_spawn;
use lib_infra::box_any::BoxAny;
use crate::anon_user::{
migration_anon_user_on_sign_up, sync_af_user_data_to_cloud, sync_supabase_user_data_to_cloud,
};
use crate::anon_user::{migration_anon_user_on_sign_up, sync_supabase_user_data_to_cloud};
use crate::entities::{AuthStateChangedPB, AuthStatePB, UserProfilePB, UserSettingPB};
use crate::event_map::{DefaultUserStatusCallback, UserStatusCallback};
use crate::migrations::database_vacuum::vacuum_database_if_need;
use crate::migrations::document_empty_content::HistoricalEmptyDocumentMigration;
use crate::migrations::migration::{UserDataMigration, UserLocalDataMigration};
use crate::migrations::session_migration::migrate_session_with_user_uuid;
use crate::migrations::workspace_and_favorite_v1::FavoriteV1AndWorkspaceArrayMigration;
use crate::migrations::workspace_trash_v1::WorkspaceTrashMapToSectionMigration;
use crate::migrations::MigrationUser;
use crate::migrations::AnonUser;
use crate::services::authenticate_user::AuthenticateUser;
use crate::services::cloud_config::get_cloud_config;
use crate::services::collab_interact::{CollabInteract, DefaultCollabInteract};
use crate::services::data_import::importer::{import_data, ImportDataSource};
use crate::services::data_import::{
get_appflowy_data_folder_import_context, upload_imported_data, ImportContext,
};
use crate::services::db::UserDB;
use crate::services::entities::{Session, UserConfig, UserPaths};
use crate::services::data_import::importer::import_data;
use crate::services::data_import::ImportContext;
use crate::services::entities::Session;
use crate::services::sqlite_sql::user_sql::{select_user_profile, UserTable, UserTableChangeset};
use crate::user_manager::manager_user_awareness::UserAwarenessDataSource;
use crate::user_manager::manager_user_encryption::validate_encryption_sign;
@ -51,41 +45,31 @@ use crate::user_manager::user_login_state::UserAuthProcess;
use crate::{errors::FlowyError, notification::*};
pub struct UserManager {
database: Arc<UserDB>,
user_paths: UserPaths,
pub(crate) user_config: UserConfig,
pub(crate) cloud_services: Arc<dyn UserCloudServiceProvider>,
pub(crate) store_preferences: Arc<StorePreferences>,
pub(crate) user_awareness: Arc<Mutex<Option<MutexUserAwareness>>>,
pub(crate) user_status_callback: RwLock<Arc<dyn UserStatusCallback>>,
pub(crate) collab_builder: Weak<AppFlowyCollabBuilder>,
pub(crate) collab_interact: RwLock<Arc<dyn CollabInteract>>,
pub(crate) user_workspace_service: Arc<dyn UserWorkspaceService>,
auth_process: Mutex<Option<UserAuthProcess>>,
current_session: Arc<parking_lot::RwLock<Option<Session>>>,
pub(crate) authenticate_user: Arc<AuthenticateUser>,
refresh_user_profile_since: AtomicI64,
}
impl UserManager {
pub fn new(
user_config: UserConfig,
cloud_services: Arc<dyn UserCloudServiceProvider>,
store_preferences: Arc<StorePreferences>,
collab_builder: Weak<AppFlowyCollabBuilder>,
authenticate_user: Arc<AuthenticateUser>,
user_workspace_service: Arc<dyn UserWorkspaceService>,
) -> Arc<Self> {
let user_paths = UserPaths::new(user_config.storage_path.clone());
let database = Arc::new(UserDB::new(user_paths.clone()));
let user_status_callback: RwLock<Arc<dyn UserStatusCallback>> =
RwLock::new(Arc::new(DefaultUserStatusCallback));
let current_session = Arc::new(parking_lot::RwLock::new(None));
*current_session.write() =
migrate_session_with_user_uuid(&user_config.session_cache_key, &store_preferences);
let refresh_user_profile_since = AtomicI64::new(0);
let user_manager = Arc::new(Self {
database,
user_paths,
user_config,
cloud_services,
store_preferences,
user_awareness: Arc::new(Default::default()),
@ -93,8 +77,9 @@ impl UserManager {
collab_builder,
collab_interact: RwLock::new(Arc::new(DefaultCollabInteract)),
auth_process: Default::default(),
current_session,
authenticate_user,
refresh_user_profile_since,
user_workspace_service,
});
let weak_user_manager = Arc::downgrade(&user_manager);
@ -116,11 +101,8 @@ impl UserManager {
}
pub fn close_db(&self) {
if let Ok(session) = self.get_session() {
info!("Close db for user: {}", session.user_id);
if let Err(err) = self.database.close(session.user_id) {
error!("Close db failed: {:?}", err);
}
if let Err(err) = self.authenticate_user.close_db() {
error!("Close db failed: {:?}", err);
}
}
@ -208,17 +190,18 @@ impl UserManager {
// Do the user data migration if needed
event!(tracing::Level::INFO, "Prepare user data migration");
match (
self.database.get_collab_db(session.user_id),
self.database.get_pool(session.user_id),
self
.authenticate_user
.database
.get_collab_db(session.user_id),
self.authenticate_user.database.get_pool(session.user_id),
) {
(Ok(collab_db), Ok(sqlite_pool)) => {
run_collab_data_migration(&session, &user, collab_db, sqlite_pool);
},
_ => error!("Failed to get collab db or sqlite pool"),
}
vacuum_database_if_need(session.user_id, &self.database, &self.store_preferences);
self.authenticate_user.vacuum_database_if_need();
let cloud_config = get_cloud_config(session.user_id, &self.store_preferences);
if let Err(e) = user_status_callback
.did_init(
@ -226,7 +209,7 @@ impl UserManager {
&user.authenticator,
&cloud_config,
&session.user_workspace,
&self.user_config.device_id,
&self.authenticate_user.user_config.device_id,
)
.await
{
@ -240,16 +223,21 @@ impl UserManager {
Ok(())
}
pub fn get_session(&self) -> FlowyResult<Session> {
self.authenticate_user.get_session()
}
pub fn db_connection(&self, uid: i64) -> Result<DBConnection, FlowyError> {
self.database.get_connection(uid)
self.authenticate_user.database.get_connection(uid)
}
pub fn db_pool(&self, uid: i64) -> Result<Arc<ConnectionPool>, FlowyError> {
self.database.get_pool(uid)
self.authenticate_user.database.get_pool(uid)
}
pub fn get_collab_db(&self, uid: i64) -> Result<Weak<CollabKVDB>, FlowyError> {
self
.authenticate_user
.database
.get_collab_db(uid)
.map(|collab_db| Arc::downgrade(&collab_db))
@ -257,7 +245,7 @@ impl UserManager {
#[cfg(debug_assertions)]
pub fn get_collab_backup_list(&self, uid: i64) -> Vec<String> {
self.database.get_collab_backup_list(uid)
self.authenticate_user.database.get_collab_backup_list(uid)
}
/// Performs a user sign-in, initializing user awareness and sending relevant notifications.
@ -301,7 +289,7 @@ impl UserManager {
.did_sign_in(
user_profile.uid,
&latest_workspace,
&self.user_config.device_id,
&self.authenticate_user.user_config.device_id,
)
.await
{
@ -375,7 +363,7 @@ impl UserManager {
async fn continue_sign_up(
&self,
new_user_profile: &UserProfile,
migration_user: Option<MigrationUser>,
migration_user: Option<AnonUser>,
response: AuthResponse,
authenticator: &Authenticator,
) -> FlowyResult<()> {
@ -390,40 +378,6 @@ impl UserManager {
self
.save_auth_data(&response, authenticator, &new_session)
.await?;
if response.is_new_user {
if let Some(old_user) = migration_user {
event!(
tracing::Level::INFO,
"Migrate anon user data from {:?} to {:?}",
old_user.user_profile.uid,
new_user_profile.uid
);
self
.migrate_anon_user_data_to_cloud(
&old_user,
&MigrationUser {
user_profile: new_user_profile.clone(),
session: new_session.clone(),
},
authenticator,
)
.await?;
// let old_collab_db = self.database.get_collab_db(old_user.session.user_id)?;
// self
// .import_appflowy_data_with_context(ImportContext {
// imported_session: old_user.session.clone(),
// imported_collab_db: old_collab_db,
// container_name: None,
// })
// .await?;
self.remove_anon_user();
let _ = self.database.close(old_user.session.user_id);
}
}
self
.user_status_callback
.read()
@ -432,7 +386,7 @@ impl UserManager {
response.is_new_user,
new_user_profile,
&new_session.user_workspace,
&self.user_config.device_id,
&self.authenticate_user.user_config.device_id,
)
.await?;
@ -440,6 +394,25 @@ impl UserManager {
.initialize_user_awareness(&new_session, user_awareness_source)
.await;
if response.is_new_user {
if let Some(old_user) = migration_user {
event!(
tracing::Level::INFO,
"Migrate anon user data from {:?} to {:?}",
old_user.session.user_id,
new_user_profile.uid
);
self
.migrate_anon_user_data_to_cloud(&old_user, &new_session, authenticator)
.await?;
self.remove_anon_user();
let _ = self
.authenticate_user
.database
.close(old_user.session.user_id);
}
}
send_auth_state_notification(AuthStateChangedPB {
state: AuthStatePB::AuthStateSignIn,
message: "Sign up success".to_string(),
@ -450,8 +423,9 @@ impl UserManager {
#[tracing::instrument(level = "info", skip(self))]
pub async fn sign_out(&self) -> Result<(), FlowyError> {
if let Ok(session) = self.get_session() {
self.database.close(session.user_id)?;
self.set_session(None)?;
let _ = remove_user_token(session.user_id, self.db_pool(session.user_id)?);
self.authenticate_user.database.close(session.user_id)?;
self.authenticate_user.set_session(None)?;
let server = self.cloud_services.get_user_service()?;
if let Err(err) = server.sign_out(None).await {
@ -488,7 +462,7 @@ impl UserManager {
}
pub async fn prepare_user(&self, session: &Session) {
let _ = self.database.close(session.user_id);
let _ = self.authenticate_user.database.close(session.user_id);
self.set_collab_config(session);
}
@ -499,6 +473,7 @@ impl UserManager {
// data. This backup should be in the form of a zip file and stored locally on the user's disk
// for safety and data integrity purposes
self
.authenticate_user
.database
.backup_or_restore(session.user_id, &session.user_workspace.id);
}
@ -536,7 +511,11 @@ impl UserManager {
validate_encryption_sign(old_user_profile, &new_user_profile.encryption_type.sign());
// Save the new user profile
let changeset = UserTableChangeset::from_user_profile(new_user_profile);
let _ = upsert_user_profile_change(uid, self.database.get_pool(uid)?, changeset);
let _ = upsert_user_profile_change(
uid,
self.authenticate_user.database.get_pool(uid)?,
changeset,
);
}
Ok(())
},
@ -564,7 +543,7 @@ impl UserManager {
#[instrument(level = "info", skip_all)]
pub fn user_dir(&self, uid: i64) -> String {
self.user_paths.user_data_dir(uid)
self.authenticate_user.user_paths.user_data_dir(uid)
}
pub fn user_setting(&self) -> Result<UserSettingPB, FlowyError> {
@ -625,67 +604,6 @@ impl UserManager {
}
}
/// Returns the current user session.
pub fn get_session(&self) -> Result<Session, FlowyError> {
if let Some(session) = (self.current_session.read()).clone() {
return Ok(session);
}
match self
.store_preferences
.get_object::<Session>(&self.user_config.session_cache_key)
{
None => Err(FlowyError::new(
ErrorCode::RecordNotFound,
"User is not logged in",
)),
Some(session) => {
self.current_session.write().replace(session.clone());
Ok(session)
},
}
}
pub async fn import_data_from_source(
&self,
source: ImportDataSource,
) -> Result<ImportData, FlowyError> {
match source {
ImportDataSource::AppFlowyDataFolder {
path,
container_name,
} => {
let context = get_appflowy_data_folder_import_context(&path)
.map_err(|err| {
FlowyError::new(ErrorCode::AppFlowyDataFolderImportError, err.to_string())
})?
.with_container_name(container_name);
self.import_appflowy_data(context).await
},
}
}
pub(crate) fn set_session(&self, session: Option<Session>) -> Result<(), FlowyError> {
debug!("Set current user session: {:?}", session);
match &session {
None => {
self.current_session.write().take();
self
.store_preferences
.remove(self.user_config.session_cache_key.as_ref());
Ok(())
},
Some(session) => {
self.current_session.write().replace(session.clone());
self
.store_preferences
.set_object(&self.user_config.session_cache_key, session.clone())
.map_err(internal_error)?;
Ok(())
},
}
}
pub(crate) async fn generate_sign_in_url_with_email(
&self,
authenticator: &Authenticator,
@ -731,7 +649,7 @@ impl UserManager {
save_user_workspaces(uid, self.db_pool(uid)?, response.user_workspaces())?;
event!(tracing::Level::INFO, "Save new user profile to disk");
self.set_session(Some(session.clone()))?;
self.authenticate_user.set_session(Some(session.clone()))?;
self
.save_user(uid, (user_profile, authenticator.clone()).into())
.await?;
@ -765,20 +683,26 @@ impl UserManager {
async fn migrate_anon_user_data_to_cloud(
&self,
old_user: &MigrationUser,
new_user: &MigrationUser,
old_user: &AnonUser,
new_user_session: &Session,
authenticator: &Authenticator,
) -> Result<(), FlowyError> {
let old_collab_db = self.database.get_collab_db(old_user.session.user_id)?;
let new_collab_db = self.database.get_collab_db(new_user.session.user_id)?;
let old_collab_db = self
.authenticate_user
.database
.get_collab_db(old_user.session.user_id)?;
let new_collab_db = self
.authenticate_user
.database
.get_collab_db(new_user_session.user_id)?;
match authenticator {
Authenticator::Supabase => {
migration_anon_user_on_sign_up(old_user, &old_collab_db, new_user, &new_collab_db)?;
migration_anon_user_on_sign_up(old_user, &old_collab_db, new_user_session, &new_collab_db)?;
if let Err(err) = sync_supabase_user_data_to_cloud(
self.cloud_services.get_user_service()?,
&self.user_config.device_id,
new_user,
&self.authenticate_user.user_config.device_id,
new_user_session,
&new_collab_db,
)
.await
@ -787,17 +711,9 @@ impl UserManager {
}
},
Authenticator::AppFlowyCloud => {
migration_anon_user_on_sign_up(old_user, &old_collab_db, new_user, &new_collab_db)?;
if let Err(err) = sync_af_user_data_to_cloud(
self.cloud_services.get_user_service()?,
&self.user_config.device_id,
new_user,
&new_collab_db,
)
.await
{
error!("Sync user data to cloud failed: {:?}", err);
}
self
.migration_anon_user_on_appflowy_cloud_sign_up(old_user, &old_collab_db)
.await?;
},
_ => {},
}
@ -805,35 +721,30 @@ impl UserManager {
// Save the old user workspace setting.
save_user_workspaces(
old_user.session.user_id,
self.database.get_pool(old_user.session.user_id)?,
self
.authenticate_user
.database
.get_pool(old_user.session.user_id)?,
&[old_user.session.user_workspace.clone()],
)?;
Ok(())
}
async fn import_appflowy_data(&self, context: ImportContext) -> Result<ImportData, FlowyError> {
pub(crate) async fn import_appflowy_data(
&self,
context: ImportContext,
) -> Result<ImportData, FlowyError> {
let session = self.get_session()?;
let uid = session.user_id;
let user_collab_db = self.database.get_collab_db(session.user_id)?;
let cloned_collab_db = user_collab_db.clone();
let user_collab_db = self
.authenticate_user
.database
.get_collab_db(session.user_id)?;
let import_data = tokio::task::spawn_blocking(move || {
import_data(&session, context, cloned_collab_db)
import_data(&session, context, user_collab_db)
.map_err(|err| FlowyError::new(ErrorCode::AppFlowyDataFolderImportError, err.to_string()))
})
.await
.map_err(internal_error)??;
let user = self.get_user_profile_from_disk(uid).await?;
upload_imported_data(
uid,
user_collab_db,
&user.workspace_id,
&user.authenticator,
&import_data,
self.cloud_services.get_user_service()?,
)
.await?;
Ok(import_data)
}
}
@ -875,6 +786,15 @@ fn save_user_token(uid: i64, pool: Arc<ConnectionPool>, token: String) -> FlowyR
upsert_user_profile_change(uid, pool, changeset)
}
#[instrument(level = "info", skip_all, err)]
fn remove_user_token(uid: i64, pool: Arc<ConnectionPool>) -> FlowyResult<()> {
let mut conn = pool.get()?;
diesel::update(user_table::dsl::user_table.filter(user_table::id.eq(&uid.to_string())))
.set(user_table::token.eq(""))
.execute(&mut *conn)?;
Ok(())
}
pub(crate) fn run_collab_data_migration(
session: &Session,
user: &UserProfile,

View File

@ -5,7 +5,7 @@ use crate::user_manager::UserManager;
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
use flowy_user_pub::entities::Authenticator;
use crate::migrations::MigrationUser;
use crate::migrations::AnonUser;
use crate::services::entities::Session;
const ANON_USER: &str = "anon_user";
@ -14,7 +14,7 @@ impl UserManager {
pub async fn get_migration_user(
&self,
current_authenticator: &Authenticator,
) -> Option<MigrationUser> {
) -> Option<AnonUser> {
// No need to migrate if the user is already local
if current_authenticator.is_local() {
return None;
@ -27,10 +27,7 @@ impl UserManager {
.ok()?;
if user_profile.authenticator.is_local() {
Some(MigrationUser {
user_profile,
session,
})
Some(AnonUser { session })
} else {
None
}
@ -71,7 +68,7 @@ impl UserManager {
ErrorCode::RecordNotFound,
"Anon user not found",
))?;
self.set_session(Some(anon_session))?;
self.authenticate_user.set_session(Some(anon_session))?;
Ok(())
}
}

View File

@ -2,20 +2,103 @@ use std::convert::TryFrom;
use std::sync::Arc;
use collab_entity::{CollabObject, CollabType};
use collab_integrate::CollabKVDB;
use tracing::{error, instrument};
use flowy_error::{FlowyError, FlowyResult};
use flowy_folder_pub::entities::{AppFlowyData, ImportData};
use flowy_sqlite::schema::user_workspace_table;
use flowy_sqlite::{query_dsl::*, ConnectionPool, ExpressionMethods};
use flowy_user_pub::entities::{Role, UserWorkspace, WorkspaceMember};
use lib_dispatch::prelude::af_spawn;
use crate::entities::{RepeatedUserWorkspacePB, ResetWorkspacePB};
use crate::migrations::AnonUser;
use crate::notification::{send_notification, UserNotification};
use crate::services::data_import::{upload_collab_objects_data, ImportContext};
use crate::services::sqlite_sql::workspace_sql::UserWorkspaceTable;
use crate::user_manager::UserManager;
impl UserManager {
/// Import appflowy data from the given path.
/// If the container name is not empty, then the data will be imported to the given container.
/// Otherwise, the data will be imported to the current workspace.
#[instrument(skip_all, err)]
pub(crate) async fn import_appflowy_data_folder(
&self,
context: ImportContext,
) -> FlowyResult<()> {
let session = self.get_session()?;
let import_data = self.import_appflowy_data(context).await?;
match import_data {
ImportData::AppFlowyDataFolder { items } => {
for item in items {
match item {
AppFlowyData::Folder {
views,
database_view_ids_by_database_id,
} => {
let (tx, rx) = tokio::sync::oneshot::channel();
let cloned_workspace_service = self.user_workspace_service.clone();
tokio::spawn(async move {
let result = async {
cloned_workspace_service
.did_import_database_views(database_view_ids_by_database_id)
.await?;
cloned_workspace_service.did_import_views(views).await?;
Ok::<(), FlowyError>(())
}
.await;
let _ = tx.send(result);
})
.await?;
rx.await??;
},
AppFlowyData::CollabObject {
row_object_ids,
document_object_ids,
database_object_ids,
} => {
let user = self.get_user_profile_from_disk(session.user_id).await?;
let user_collab_db = self
.get_collab_db(session.user_id)?
.upgrade()
.ok_or_else(|| FlowyError::internal().with_context("Collab db not found"))?;
upload_collab_objects_data(
session.user_id,
user_collab_db,
&user.workspace_id,
&user.authenticator,
AppFlowyData::CollabObject {
row_object_ids,
document_object_ids,
database_object_ids,
},
self.cloud_services.get_user_service()?,
)
.await?;
},
}
}
},
}
Ok(())
}
pub async fn migration_anon_user_on_appflowy_cloud_sign_up(
&self,
old_user: &AnonUser,
old_collab_db: &Arc<CollabKVDB>,
) -> FlowyResult<()> {
let import_context = ImportContext {
imported_session: old_user.session.clone(),
imported_collab_db: old_collab_db.clone(),
container_name: None,
};
self.import_appflowy_data_folder(import_context).await?;
Ok(())
}
#[instrument(skip(self), err)]
pub async fn open_workspace(&self, workspace_id: &str) -> FlowyResult<()> {
let uid = self.user_id()?;
@ -129,7 +212,7 @@ impl UserManager {
reset.workspace_id.clone(),
CollabType::Folder,
reset.workspace_id.clone(),
self.user_config.device_id.clone(),
self.authenticate_user.user_config.device_id.clone(),
);
self
.cloud_services

View File

@ -1,4 +1,4 @@
use crate::migrations::MigrationUser;
use crate::migrations::AnonUser;
use flowy_user_pub::entities::{AuthResponse, Authenticator, UserProfile};
/// recording the intermediate state of the sign-in/sign-up process
@ -7,5 +7,5 @@ pub struct UserAuthProcess {
pub user_profile: UserProfile,
pub response: AuthResponse,
pub authenticator: Authenticator,
pub migration_user: Option<MigrationUser>,
pub migration_user: Option<AnonUser>,
}