Merge branch 'AppFlowy-IO:main' into main

This commit is contained in:
Simon 2024-08-14 08:27:05 +02:00 committed by GitHub
commit dbf1531cf9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
43 changed files with 998 additions and 1069 deletions

View File

@ -48,8 +48,6 @@ PODS:
- fluttertoast (0.0.2): - fluttertoast (0.0.2):
- Flutter - Flutter
- Toast - Toast
- image_gallery_saver (2.0.2):
- Flutter
- image_picker_ios (0.0.1): - image_picker_ios (0.0.1):
- Flutter - Flutter
- integration_test (0.0.1): - integration_test (0.0.1):
@ -95,7 +93,6 @@ DEPENDENCIES:
- flowy_infra_ui (from `.symlinks/plugins/flowy_infra_ui/ios`) - flowy_infra_ui (from `.symlinks/plugins/flowy_infra_ui/ios`)
- Flutter (from `Flutter`) - Flutter (from `Flutter`)
- fluttertoast (from `.symlinks/plugins/fluttertoast/ios`) - fluttertoast (from `.symlinks/plugins/fluttertoast/ios`)
- image_gallery_saver (from `.symlinks/plugins/image_gallery_saver/ios`)
- image_picker_ios (from `.symlinks/plugins/image_picker_ios/ios`) - image_picker_ios (from `.symlinks/plugins/image_picker_ios/ios`)
- integration_test (from `.symlinks/plugins/integration_test/ios`) - integration_test (from `.symlinks/plugins/integration_test/ios`)
- irondash_engine_context (from `.symlinks/plugins/irondash_engine_context/ios`) - irondash_engine_context (from `.symlinks/plugins/irondash_engine_context/ios`)
@ -136,8 +133,6 @@ EXTERNAL SOURCES:
:path: Flutter :path: Flutter
fluttertoast: fluttertoast:
:path: ".symlinks/plugins/fluttertoast/ios" :path: ".symlinks/plugins/fluttertoast/ios"
image_gallery_saver:
:path: ".symlinks/plugins/image_gallery_saver/ios"
image_picker_ios: image_picker_ios:
:path: ".symlinks/plugins/image_picker_ios/ios" :path: ".symlinks/plugins/image_picker_ios/ios"
integration_test: integration_test:
@ -176,7 +171,6 @@ SPEC CHECKSUMS:
flowy_infra_ui: 0455e1fa8c51885aa1437848e361e99419f34ebc flowy_infra_ui: 0455e1fa8c51885aa1437848e361e99419f34ebc
Flutter: e0871f40cf51350855a761d2e70bf5af5b9b5de7 Flutter: e0871f40cf51350855a761d2e70bf5af5b9b5de7
fluttertoast: e9a18c7be5413da53898f660530c56f35edfba9c fluttertoast: e9a18c7be5413da53898f660530c56f35edfba9c
image_gallery_saver: cb43cc43141711190510e92c460eb1655cd343cb
image_picker_ios: 99dfe1854b4fa34d0364e74a78448a0151025425 image_picker_ios: 99dfe1854b4fa34d0364e74a78448a0151025425
integration_test: ce0a3ffa1de96d1a89ca0ac26fca7ea18a749ef4 integration_test: ce0a3ffa1de96d1a89ca0ac26fca7ea18a749ef4
irondash_engine_context: 3458bf979b90d616ffb8ae03a150bafe2e860cc9 irondash_engine_context: 3458bf979b90d616ffb8ae03a150bafe2e860cc9

View File

@ -64,7 +64,7 @@ class _MobileBottomSheetRenameWidgetState
padding: const EdgeInsets.symmetric( padding: const EdgeInsets.symmetric(
horizontal: 16.0, horizontal: 16.0,
), ),
fontColor: Colors.white, textColor: Colors.white,
fillColor: Theme.of(context).primaryColor, fillColor: Theme.of(context).primaryColor,
onPressed: () { onPressed: () {
widget.onRename(controller.text); widget.onRename(controller.text);

View File

@ -87,6 +87,7 @@ class _PropertyCellState extends State<_PropertyCell> {
fieldInfo.name, fieldInfo.name,
overflow: TextOverflow.ellipsis, overflow: TextOverflow.ellipsis,
fontSize: 14, fontSize: 14,
figmaLineHeight: 16.0,
color: Theme.of(context).hintColor, color: Theme.of(context).hintColor,
), ),
), ),

View File

@ -12,6 +12,7 @@ import 'package:extended_text_field/extended_text_field.dart';
import 'package:flowy_infra/file_picker/file_picker_service.dart'; import 'package:flowy_infra/file_picker/file_picker_service.dart';
import 'package:flowy_infra/platform_extension.dart'; import 'package:flowy_infra/platform_extension.dart';
import 'package:flowy_infra/theme_extension.dart'; import 'package:flowy_infra/theme_extension.dart';
import 'package:flowy_infra_ui/flowy_infra_ui.dart';
import 'package:flutter/foundation.dart'; import 'package:flutter/foundation.dart';
import 'package:flutter/material.dart'; import 'package:flutter/material.dart';
import 'package:flutter/services.dart'; import 'package:flutter/services.dart';
@ -21,8 +22,8 @@ import 'package:flutter_chat_ui/flutter_chat_ui.dart';
import 'chat_at_button.dart'; import 'chat_at_button.dart';
import 'chat_input_attachment.dart'; import 'chat_input_attachment.dart';
import 'chat_send_button.dart';
import 'chat_input_span.dart'; import 'chat_input_span.dart';
import 'chat_send_button.dart';
import 'layout_define.dart'; import 'layout_define.dart';
class ChatInput extends StatefulWidget { class ChatInput extends StatefulWidget {
@ -114,7 +115,7 @@ class _ChatInputState extends State<ChatInput> {
child: Container( child: Container(
decoration: BoxDecoration( decoration: BoxDecoration(
border: Border.all( border: Border.all(
color: _inputFocusNode.hasFocus && !isMobile color: _inputFocusNode.hasFocus
? Theme.of(context).colorScheme.primary.withOpacity(0.6) ? Theme.of(context).colorScheme.primary.withOpacity(0.6)
: Theme.of(context).colorScheme.secondary, : Theme.of(context).colorScheme.secondary,
), ),
@ -161,9 +162,9 @@ class _ChatInputState extends State<ChatInput> {
Expanded(child: _inputTextField(context, textPadding)), Expanded(child: _inputTextField(context, textPadding)),
// mention button // mention button
// TODO(lucas): support mobile _mentionButton(buttonPadding),
if (PlatformExtension.isDesktop)
_mentionButton(buttonPadding), if (PlatformExtension.isMobile) const HSpace(6.0),
// send button // send button
_sendButton(buttonPadding), _sendButton(buttonPadding),
@ -245,6 +246,7 @@ class _ChatInputState extends State<ChatInput> {
InputDecoration _buildInputDecoration(BuildContext context) { InputDecoration _buildInputDecoration(BuildContext context) {
return InputDecoration( return InputDecoration(
border: InputBorder.none, border: InputBorder.none,
enabledBorder: InputBorder.none,
hintText: widget.hintText, hintText: widget.hintText,
focusedBorder: InputBorder.none, focusedBorder: InputBorder.none,
hintStyle: TextStyle( hintStyle: TextStyle(

View File

@ -18,7 +18,8 @@ import 'package:appflowy_editor/appflowy_editor.dart'
Node, Node,
Path, Path,
Delta, Delta,
composeAttributes; composeAttributes,
blockComponentDelta;
import 'package:collection/collection.dart'; import 'package:collection/collection.dart';
import 'package:nanoid/nanoid.dart'; import 'package:nanoid/nanoid.dart';
@ -81,6 +82,15 @@ class TransactionAdapter {
} }
final blockActions = final blockActions =
actions.map((e) => e.blockActionPB).toList(growable: false); actions.map((e) => e.blockActionPB).toList(growable: false);
for (final action in blockActions) {
if (enableDocumentInternalLog) {
Log.debug(
'[editor_transaction_adapter] action => ${action.toProto3Json()}',
);
}
}
await documentService.applyAction( await documentService.applyAction(
documentId: documentId, documentId: documentId,
actions: blockActions, actions: blockActions,
@ -164,6 +174,7 @@ extension on InsertOperation {
childrenId: nanoid(6), childrenId: nanoid(6),
externalId: textId, externalId: textId,
externalType: textId != null ? _kExternalTextType : null, externalType: textId != null ? _kExternalTextType : null,
attributes: {...node.attributes}..remove(blockComponentDelta),
) )
..parentId = parentId ..parentId = parentId
..prevId = prevId; ..prevId = prevId;
@ -234,10 +245,13 @@ extension on UpdateOperation {
) )
: null; : null;
final composedAttributes = composeAttributes(oldAttributes, attributes);
composedAttributes?.remove(blockComponentDelta);
final payload = BlockActionPayloadPB() final payload = BlockActionPayloadPB()
..block = node.toBlock( ..block = node.toBlock(
parentId: parentId, parentId: parentId,
attributes: composeAttributes(oldAttributes, attributes), attributes: composedAttributes,
) )
..parentId = parentId; ..parentId = parentId;
final blockActionPB = BlockActionPB() final blockActionPB = BlockActionPB()

View File

@ -73,7 +73,11 @@ class MentionBlock extends StatelessWidget {
switch (type) { switch (type) {
case MentionType.page: case MentionType.page:
final String pageId = mention[MentionBlockKeys.pageId]; final String? pageId = mention[MentionBlockKeys.pageId] as String?;
if (pageId == null) {
return const SizedBox.shrink();
}
return MentionPageBlock( return MentionPageBlock(
key: ValueKey(pageId), key: ValueKey(pageId),
editorState: editorState, editorState: editorState,

View File

@ -26,6 +26,7 @@ import 'package:easy_localization/easy_localization.dart';
import 'package:fixnum/fixnum.dart'; import 'package:fixnum/fixnum.dart';
import 'package:flowy_infra_ui/flowy_infra_ui.dart'; import 'package:flowy_infra_ui/flowy_infra_ui.dart';
import 'package:flutter/material.dart'; import 'package:flutter/material.dart';
import 'package:flutter/services.dart';
import 'package:flutter_bloc/flutter_bloc.dart'; import 'package:flutter_bloc/flutter_bloc.dart';
import 'package:nanoid/non_secure.dart'; import 'package:nanoid/non_secure.dart';
@ -182,68 +183,13 @@ class _MentionDateBlockState extends State<MentionDateBlock> {
return GestureDetector( return GestureDetector(
onTapDown: (details) { onTapDown: (details) {
if (widget.editorState.editable) { _showDatePicker(
if (PlatformExtension.isMobile) { context: context,
showMobileBottomSheet( offset: details.globalPosition,
context, reminder: reminder,
builder: (_) => DraggableScrollableSheet( timeStr: timeStr,
expand: false, options: options,
snap: true, );
initialChildSize: 0.7,
minChildSize: 0.4,
snapSizes: const [0.4, 0.7, 1.0],
builder: (_, controller) => Material(
color:
Theme.of(context).colorScheme.secondaryContainer,
child: ListView(
controller: controller,
children: [
ColoredBox(
color: Theme.of(context).colorScheme.surface,
child: const Center(child: DragHandle()),
),
const MobileDateHeader(),
MobileAppFlowyDatePicker(
selectedDay: parsedDate,
timeStr: timeStr,
dateStr: parsedDate != null
? options.dateFormat
.formatDate(parsedDate!, _includeTime)
: null,
includeTime: options.includeTime,
use24hFormat: options.timeFormat ==
UserTimeFormatPB.TwentyFourHour,
rebuildOnDaySelected: true,
rebuildOnTimeChanged: true,
timeFormat: options.timeFormat.simplified,
selectedReminderOption: widget.reminderOption,
onDaySelected: options.onDaySelected,
onStartTimeChanged: (time) => options
.onStartTimeChanged
?.call(time ?? ""),
onIncludeTimeChanged:
options.onIncludeTimeChanged,
liveDateFormatter: (selected) =>
appearance.dateFormat.formatDate(
selected,
false,
appearance.timeFormat,
),
onReminderSelected: (option) =>
_updateReminder(option, reminder),
),
],
),
),
),
);
} else {
DatePickerMenu(
context: context,
editorState: widget.editorState,
).show(details.globalPosition, options: options);
}
}
}, },
child: MouseRegion( child: MouseRegion(
cursor: SystemMouseCursors.click, cursor: SystemMouseCursors.click,
@ -251,15 +197,10 @@ class _MentionDateBlockState extends State<MentionDateBlock> {
mainAxisSize: MainAxisSize.min, mainAxisSize: MainAxisSize.min,
children: [ children: [
Text( Text(
widget.reminderId != null '@$formattedDate',
? '@$formattedDate' style: textStyle,
: formattedDate, strutStyle: textStyle != null
style: widget.textStyle?.copyWith( ? StrutStyle.fromTextStyle(textStyle)
color: color,
leadingDistribution: TextLeadingDistribution.even,
),
strutStyle: widget.textStyle != null
? StrutStyle.fromTextStyle(widget.textStyle!)
: null, : null,
), ),
const HSpace(4), const HSpace(4),
@ -402,4 +343,109 @@ class _MentionDateBlockState extends State<MentionDateBlock> {
), ),
); );
} }
void _showDatePicker({
required BuildContext context,
required DatePickerOptions options,
required Offset offset,
String? timeStr,
ReminderPB? reminder,
}) {
if (!widget.editorState.editable) {
return;
}
if (PlatformExtension.isMobile) {
SystemChannels.textInput.invokeMethod('TextInput.hide');
showMobileBottomSheet(
context,
builder: (_) => DraggableScrollableSheet(
expand: false,
snap: true,
initialChildSize: 0.7,
minChildSize: 0.4,
snapSizes: const [0.4, 0.7, 1.0],
builder: (_, controller) => _DatePickerBottomSheet(
controller: controller,
parsedDate: parsedDate,
timeStr: timeStr,
options: options,
includeTime: _includeTime,
reminderOption: widget.reminderOption,
onReminderSelected: (option) => _updateReminder(
option,
reminder,
),
),
),
);
} else {
DatePickerMenu(
context: context,
editorState: widget.editorState,
).show(offset, options: options);
}
}
}
class _DatePickerBottomSheet extends StatelessWidget {
const _DatePickerBottomSheet({
required this.controller,
required this.parsedDate,
required this.timeStr,
required this.options,
required this.includeTime,
this.reminderOption,
required this.onReminderSelected,
});
final ScrollController controller;
final DateTime? parsedDate;
final String? timeStr;
final DatePickerOptions options;
final bool includeTime;
final ReminderOption? reminderOption;
final void Function(ReminderOption) onReminderSelected;
@override
Widget build(BuildContext context) {
final appearance = context.read<AppearanceSettingsCubit>().state;
return Material(
color: Theme.of(context).colorScheme.secondaryContainer,
child: ListView(
controller: controller,
children: [
ColoredBox(
color: Theme.of(context).colorScheme.surface,
child: const Center(child: DragHandle()),
),
const MobileDateHeader(),
MobileAppFlowyDatePicker(
selectedDay: parsedDate,
timeStr: timeStr,
dateStr: parsedDate != null
? options.dateFormat.formatDate(parsedDate!, includeTime)
: null,
includeTime: options.includeTime,
use24hFormat: options.timeFormat == UserTimeFormatPB.TwentyFourHour,
rebuildOnDaySelected: true,
rebuildOnTimeChanged: true,
timeFormat: options.timeFormat.simplified,
selectedReminderOption: reminderOption,
onDaySelected: options.onDaySelected,
onStartTimeChanged: (time) =>
options.onStartTimeChanged?.call(time ?? ""),
onIncludeTimeChanged: options.onIncludeTimeChanged,
liveDateFormatter: (selected) => appearance.dateFormat.formatDate(
selected,
false,
appearance.timeFormat,
),
onReminderSelected: onReminderSelected,
),
],
),
);
}
} }

View File

@ -31,6 +31,7 @@ import 'package:appflowy/workspace/presentation/settings/widgets/feature_flags/m
import 'package:appflowy_backend/protobuf/flowy-database2/protobuf.dart'; import 'package:appflowy_backend/protobuf/flowy-database2/protobuf.dart';
import 'package:appflowy_editor/appflowy_editor.dart'; import 'package:appflowy_editor/appflowy_editor.dart';
import 'package:flowy_infra/time/duration.dart'; import 'package:flowy_infra/time/duration.dart';
import 'package:flutter/foundation.dart';
import 'package:flutter/material.dart'; import 'package:flutter/material.dart';
import 'package:go_router/go_router.dart'; import 'package:go_router/go_router.dart';
import 'package:sheet/route.dart'; import 'package:sheet/route.dart';
@ -558,10 +559,25 @@ GoRoute _mobileCardDetailScreenRoute() {
parentNavigatorKey: AppGlobals.rootNavKey, parentNavigatorKey: AppGlobals.rootNavKey,
path: MobileRowDetailPage.routeName, path: MobileRowDetailPage.routeName,
pageBuilder: (context, state) { pageBuilder: (context, state) {
final args = state.extra as Map<String, dynamic>; var extra = state.extra as Map<String, dynamic>?;
if (kDebugMode && extra == null) {
extra = _dynamicValues;
}
if (extra == null) {
return const MaterialExtendedPage(
child: SizedBox.shrink(),
);
}
final databaseController = final databaseController =
args[MobileRowDetailPage.argDatabaseController]; extra[MobileRowDetailPage.argDatabaseController];
final rowId = args[MobileRowDetailPage.argRowId]!; final rowId = extra[MobileRowDetailPage.argRowId]!;
if (kDebugMode) {
_dynamicValues = extra;
}
return MaterialExtendedPage( return MaterialExtendedPage(
child: MobileRowDetailPage( child: MobileRowDetailPage(
@ -629,3 +645,8 @@ Widget _buildFadeTransition(
Duration _slowDuration = Duration( Duration _slowDuration = Duration(
milliseconds: RouteDurations.slow.inMilliseconds.round(), milliseconds: RouteDurations.slow.inMilliseconds.round(),
); );
// ONLY USE IN DEBUG MODE
// this is a workaround for the issue of GoRouter not supporting extra with complex types
// https://github.com/flutter/flutter/issues/137248
Map<String, dynamic> _dynamicValues = {};

View File

@ -1,6 +1,3 @@
import 'package:flutter/cupertino.dart';
import 'package:flutter/material.dart';
import 'package:appflowy/generated/flowy_svgs.g.dart'; import 'package:appflowy/generated/flowy_svgs.g.dart';
import 'package:appflowy/generated/locale_keys.g.dart'; import 'package:appflowy/generated/locale_keys.g.dart';
import 'package:appflowy/mobile/presentation/base/app_bar/app_bar_actions.dart'; import 'package:appflowy/mobile/presentation/base/app_bar/app_bar_actions.dart';
@ -13,8 +10,9 @@ import 'package:appflowy/workspace/presentation/widgets/date_picker/widgets/mobi
import 'package:appflowy/workspace/presentation/widgets/date_picker/widgets/reminder_selector.dart'; import 'package:appflowy/workspace/presentation/widgets/date_picker/widgets/reminder_selector.dart';
import 'package:appflowy_backend/protobuf/flowy-database2/date_entities.pbenum.dart'; import 'package:appflowy_backend/protobuf/flowy-database2/date_entities.pbenum.dart';
import 'package:easy_localization/easy_localization.dart'; import 'package:easy_localization/easy_localization.dart';
import 'package:flowy_infra_ui/style_widget/text.dart'; import 'package:flowy_infra_ui/flowy_infra_ui.dart';
import 'package:flowy_infra_ui/widget/spacing.dart'; import 'package:flutter/cupertino.dart';
import 'package:flutter/material.dart';
import 'package:go_router/go_router.dart'; import 'package:go_router/go_router.dart';
class MobileAppFlowyDatePicker extends StatefulWidget { class MobileAppFlowyDatePicker extends StatefulWidget {
@ -389,57 +387,107 @@ class _IncludeTimePickerState extends State<_IncludeTimePicker> {
children.addAll([ children.addAll([
Expanded(child: FlowyText(dateStr, textAlign: TextAlign.center)), Expanded(child: FlowyText(dateStr, textAlign: TextAlign.center)),
Container(width: 1, height: 16, color: Colors.grey), Container(width: 1, height: 16, color: Colors.grey),
Expanded(child: FlowyText(timeStr ?? '', textAlign: TextAlign.center)), Expanded(
child: GestureDetector(
onTap: () => _showTimePicker(
context,
use24hFormat: use24hFormat,
isStartDay: isStartDay,
),
child: FlowyText(timeStr ?? '', textAlign: TextAlign.center),
),
),
]); ]);
} }
return GestureDetector( return Container(
onTap: !isIncludeTime constraints: const BoxConstraints(minHeight: 36),
? null decoration: BoxDecoration(
: () async { borderRadius: BorderRadius.circular(6),
await showMobileBottomSheet( color: Theme.of(context).colorScheme.secondaryContainer,
context, border: Border.all(
builder: (context) => ConstrainedBox( color: Theme.of(context).colorScheme.outline,
constraints: const BoxConstraints(maxHeight: 300),
child: CupertinoDatePicker(
mode: CupertinoDatePickerMode.time,
use24hFormat: use24hFormat,
onDateTimeChanged: (dateTime) {
final selectedTime = use24hFormat
? DateFormat('HH:mm').format(dateTime)
: DateFormat('hh:mm a').format(dateTime);
if (isStartDay) {
widget.onStartTimeChanged(selectedTime);
if (widget.rebuildOnTimeChanged && mounted) {
setState(() => _timeStr = selectedTime);
}
} else {
widget.onEndTimeChanged?.call(selectedTime);
if (widget.rebuildOnTimeChanged && mounted) {
setState(() => _endTimeStr = selectedTime);
}
}
},
),
),
);
},
child: Container(
constraints: const BoxConstraints(minHeight: 36),
decoration: BoxDecoration(
borderRadius: BorderRadius.circular(6),
color: Theme.of(context).colorScheme.secondaryContainer,
border: Border.all(
color: Theme.of(context).colorScheme.outline,
),
), ),
child: Row(children: children), ),
child: Row(children: children),
);
}
Future<void> _showTimePicker(
BuildContext context, {
required bool use24hFormat,
required bool isStartDay,
}) async {
String? selectedTime = isStartDay ? _timeStr : _endTimeStr;
final initialDateTime = selectedTime != null
? _convertTimeStringToDateTime(selectedTime)
: null;
return showMobileBottomSheet(
context,
builder: (context) => Column(
mainAxisSize: MainAxisSize.min,
children: [
ConstrainedBox(
constraints: const BoxConstraints(maxHeight: 300),
child: CupertinoDatePicker(
mode: CupertinoDatePickerMode.time,
initialDateTime: initialDateTime,
use24hFormat: use24hFormat,
onDateTimeChanged: (dateTime) {
selectedTime = use24hFormat
? DateFormat('HH:mm').format(dateTime)
: DateFormat('hh:mm a').format(dateTime);
},
),
),
Padding(
padding: const EdgeInsets.symmetric(horizontal: 36),
child: FlowyTextButton(
LocaleKeys.button_confirm.tr(),
constraints: const BoxConstraints.tightFor(height: 42),
mainAxisAlignment: MainAxisAlignment.center,
textColor: Theme.of(context).colorScheme.onPrimary,
fillColor: Theme.of(context).primaryColor,
onPressed: () {
if (isStartDay) {
widget.onStartTimeChanged(selectedTime);
if (widget.rebuildOnTimeChanged && mounted) {
setState(() => _timeStr = selectedTime);
}
} else {
widget.onEndTimeChanged?.call(selectedTime);
if (widget.rebuildOnTimeChanged && mounted) {
setState(() => _endTimeStr = selectedTime);
}
}
Navigator.of(context).pop();
},
),
),
const VSpace(18.0),
],
), ),
); );
} }
DateTime _convertTimeStringToDateTime(String timeString) {
final DateTime now = DateTime.now();
final List<String> timeParts = timeString.split(':');
if (timeParts.length != 2) {
return now;
}
final int hour = int.parse(timeParts[0]);
final int minute = int.parse(timeParts[1]);
return DateTime(now.year, now.month, now.day, hour, minute);
}
} }
class _EndDateSwitch extends StatelessWidget { class _EndDateSwitch extends StatelessWidget {

View File

@ -890,10 +890,10 @@ packages:
dependency: "direct main" dependency: "direct main"
description: description:
name: go_router name: go_router
sha256: "170c46e237d6eb0e6e9f0e8b3f56101e14fb64f787016e42edd74c39cf8b176a" sha256: ddc16d34b0d74cb313986918c0f0885a7ba2fc24d8fb8419de75f0015144ccfe
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "13.2.0" version: "14.2.3"
google_fonts: google_fonts:
dependency: "direct main" dependency: "direct main"
description: description:

View File

@ -99,7 +99,7 @@ dependencies:
url_protocol: url_protocol:
hive_flutter: ^1.1.0 hive_flutter: ^1.1.0
super_clipboard: ^0.8.4 super_clipboard: ^0.8.4
go_router: ^13.1.0 go_router: ^14.2.0
string_validator: ^1.0.0 string_validator: ^1.0.0
unsplash_client: ^2.1.1 unsplash_client: ^2.1.1
flutter_emoji_mart: flutter_emoji_mart:

View File

@ -1,7 +1,7 @@
use flowy_core::config::AppFlowyCoreConfig; use flowy_core::config::AppFlowyCoreConfig;
use flowy_core::{AppFlowyCore, DEFAULT_NAME}; use flowy_core::{AppFlowyCore, MutexAppFlowyCore, DEFAULT_NAME};
use lib_dispatch::runtime::AFPluginRuntime; use lib_dispatch::runtime::AFPluginRuntime;
use std::sync::Arc; use std::rc::Rc;
use dotenv::dotenv; use dotenv::dotenv;
@ -25,7 +25,7 @@ pub fn read_env() {
} }
} }
pub fn init_flowy_core() -> AppFlowyCore { pub fn init_flowy_core() -> MutexAppFlowyCore {
let config_json = include_str!("../tauri.conf.json"); let config_json = include_str!("../tauri.conf.json");
let config: tauri_utils::config::Config = serde_json::from_str(config_json).unwrap(); let config: tauri_utils::config::Config = serde_json::from_str(config_json).unwrap();
@ -35,7 +35,8 @@ pub fn init_flowy_core() -> AppFlowyCore {
.clone() .clone()
.map(|v| v.to_string()) .map(|v| v.to_string())
.unwrap_or_else(|| "0.5.8".to_string()); .unwrap_or_else(|| "0.5.8".to_string());
let app_version = semver::Version::parse(&app_version).unwrap_or_else(|_| semver::Version::new(0, 5, 8)); let app_version =
semver::Version::parse(&app_version).unwrap_or_else(|_| semver::Version::new(0, 5, 8));
let mut data_path = tauri::api::path::app_local_data_dir(&config).unwrap(); let mut data_path = tauri::api::path::app_local_data_dir(&config).unwrap();
if cfg!(debug_assertions) { if cfg!(debug_assertions) {
data_path.push("data_dev"); data_path.push("data_dev");
@ -60,7 +61,9 @@ pub fn init_flowy_core() -> AppFlowyCore {
) )
.log_filter("trace", vec!["appflowy_tauri".to_string()]); .log_filter("trace", vec!["appflowy_tauri".to_string()]);
let runtime = Arc::new(AFPluginRuntime::new().unwrap()); let runtime = Rc::new(AFPluginRuntime::new().unwrap());
let cloned_runtime = runtime.clone(); let cloned_runtime = runtime.clone();
runtime.block_on(async move { AppFlowyCore::new(config, cloned_runtime, None).await }) runtime.block_on(async move {
MutexAppFlowyCore::new(AppFlowyCore::new(config, cloned_runtime, None).await)
})
} }

View File

@ -1,4 +1,4 @@
use flowy_core::AppFlowyCore; use flowy_core::;
use lib_dispatch::prelude::{ use lib_dispatch::prelude::{
AFPluginDispatcher, AFPluginEventResponse, AFPluginRequest, StatusCode, AFPluginDispatcher, AFPluginEventResponse, AFPluginRequest, StatusCode,
}; };
@ -38,8 +38,8 @@ pub async fn invoke_request(
app_handler: AppHandle<Wry>, app_handler: AppHandle<Wry>,
) -> AFTauriResponse { ) -> AFTauriResponse {
let request: AFPluginRequest = request.into(); let request: AFPluginRequest = request.into();
let state: State<AppFlowyCore> = app_handler.state(); let state: State<MutexAppFlowyCore> = app_handler.state();
let dispatcher = state.inner().dispatcher(); let dispatcher = state.0.lock().dispatcher();
let response = AFPluginDispatcher::async_send(dispatcher.as_ref(), request).await; let response = AFPluginDispatcher::sync_send(dispatcher, request);
response.into() response.into()
} }

View File

@ -1,7 +1,7 @@
use flowy_core::config::AppFlowyCoreConfig; use flowy_core::config::AppFlowyCoreConfig;
use flowy_core::{AppFlowyCore, DEFAULT_NAME}; use flowy_core::{AppFlowyCore, MutexAppFlowyCore, DEFAULT_NAME};
use lib_dispatch::runtime::AFPluginRuntime; use lib_dispatch::runtime::AFPluginRuntime;
use std::sync::Arc; use std::rc::Rc;
use dotenv::dotenv; use dotenv::dotenv;
@ -9,28 +9,34 @@ pub fn read_env() {
dotenv().ok(); dotenv().ok();
let env = if cfg!(debug_assertions) { let env = if cfg!(debug_assertions) {
include_str!("../env.development") include_str!("../env.development")
} else { } else {
include_str!("../env.production") include_str!("../env.production")
}; };
for line in env.lines() { for line in env.lines() {
if let Some((key, value)) = line.split_once('=') { if let Some((key, value)) = line.split_once('=') {
// Check if the environment variable is not already set in the system // Check if the environment variable is not already set in the system
let current_value = std::env::var(key).unwrap_or_default(); let current_value = std::env::var(key).unwrap_or_default();
if current_value.is_empty() { if current_value.is_empty() {
std::env::set_var(key, value); std::env::set_var(key, value);
}
} }
}
} }
} }
pub fn init_flowy_core() -> AppFlowyCore { pub fn init_flowy_core() -> MutexAppFlowyCore {
let config_json = include_str!("../tauri.conf.json"); let config_json = include_str!("../tauri.conf.json");
let config: tauri_utils::config::Config = serde_json::from_str(config_json).unwrap(); let config: tauri_utils::config::Config = serde_json::from_str(config_json).unwrap();
let app_version = config.package.version.clone().map(|v| v.to_string()).unwrap_or_else(|| "0.0.0".to_string()); let app_version = config
let app_version = semver::Version::parse(&app_version).unwrap_or_else(|_| semver::Version::new(0, 5, 8)); .package
.version
.clone()
.map(|v| v.to_string())
.unwrap_or_else(|| "0.5.8".to_string());
let app_version =
semver::Version::parse(&app_version).unwrap_or_else(|_| semver::Version::new(0, 5, 8));
let mut data_path = tauri::api::path::app_local_data_dir(&config).unwrap(); let mut data_path = tauri::api::path::app_local_data_dir(&config).unwrap();
if cfg!(debug_assertions) { if cfg!(debug_assertions) {
data_path.push("data_dev"); data_path.push("data_dev");
@ -50,12 +56,14 @@ pub fn init_flowy_core() -> AppFlowyCore {
custom_application_path, custom_application_path,
application_path, application_path,
device_id, device_id,
"web".to_string(), "tauri".to_string(),
DEFAULT_NAME.to_string(), DEFAULT_NAME.to_string(),
) )
.log_filter("trace", vec!["appflowy_tauri".to_string()]); .log_filter("trace", vec!["appflowy_tauri".to_string()]);
let runtime = Arc::new(AFPluginRuntime::new().unwrap()); let runtime = Rc::new(AFPluginRuntime::new().unwrap());
let cloned_runtime = runtime.clone(); let cloned_runtime = runtime.clone();
runtime.block_on(async move { AppFlowyCore::new(config, cloned_runtime, None).await }) runtime.block_on(async move {
MutexAppFlowyCore::new(AppFlowyCore::new(config, cloned_runtime, None).await)
})
} }

View File

@ -1,4 +1,4 @@
use flowy_core::AppFlowyCore; use flowy_core::MutexAppFlowyCore;
use lib_dispatch::prelude::{ use lib_dispatch::prelude::{
AFPluginDispatcher, AFPluginEventResponse, AFPluginRequest, StatusCode, AFPluginDispatcher, AFPluginEventResponse, AFPluginRequest, StatusCode,
}; };
@ -38,8 +38,8 @@ pub async fn invoke_request(
app_handler: AppHandle<Wry>, app_handler: AppHandle<Wry>,
) -> AFTauriResponse { ) -> AFTauriResponse {
let request: AFPluginRequest = request.into(); let request: AFPluginRequest = request.into();
let state: State<AppFlowyCore> = app_handler.state(); let state: State<MutexAppFlowyCore> = app_handler.state();
let dispatcher = state.inner().dispatcher(); let dispatcher = state.0.lock().dispatcher();
let response = AFPluginDispatcher::async_send(dispatcher.as_ref(), request).await; let response = AFPluginDispatcher::sync_send(dispatcher, request);
response.into() response.into()
} }

View File

@ -28,7 +28,7 @@ lib-log.workspace = true
semver = "1.0.22" semver = "1.0.22"
# workspace # workspace
lib-dispatch = { workspace = true } lib-dispatch = { workspace = true, features = ["local_set"] }
# Core # Core
#flowy-core = { workspace = true, features = ["profiling"] } #flowy-core = { workspace = true, features = ["profiling"] }

View File

@ -4,6 +4,7 @@ use allo_isolate::Isolate;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use parking_lot::Mutex; use parking_lot::Mutex;
use semver::Version; use semver::Version;
use std::rc::Rc;
use std::sync::Arc; use std::sync::Arc;
use std::{ffi::CStr, os::raw::c_char}; use std::{ffi::CStr, os::raw::c_char};
use tracing::{debug, error, info, trace, warn}; use tracing::{debug, error, info, trace, warn};
@ -37,14 +38,14 @@ lazy_static! {
static ref LOG_STREAM_ISOLATE: Mutex<Option<Isolate>> = Mutex::new(None); static ref LOG_STREAM_ISOLATE: Mutex<Option<Isolate>> = Mutex::new(None);
} }
struct MutexAppFlowyCore(Arc<Mutex<Option<AppFlowyCore>>>); struct MutexAppFlowyCore(Rc<Mutex<Option<AppFlowyCore>>>);
impl MutexAppFlowyCore { impl MutexAppFlowyCore {
fn new() -> Self { fn new() -> Self {
Self(Arc::new(Mutex::new(None))) Self(Rc::new(Mutex::new(None)))
} }
fn dispatcher(&self) -> Option<Arc<AFPluginDispatcher>> { fn dispatcher(&self) -> Option<Rc<AFPluginDispatcher>> {
let binding = self.0.lock(); let binding = self.0.lock();
let core = binding.as_ref(); let core = binding.as_ref();
core.map(|core| core.event_dispatcher.clone()) core.map(|core| core.event_dispatcher.clone())
@ -90,7 +91,7 @@ pub extern "C" fn init_sdk(_port: i64, data: *mut c_char) -> i64 {
core.close_db(); core.close_db();
} }
let runtime = Arc::new(AFPluginRuntime::new().unwrap()); let runtime = Rc::new(AFPluginRuntime::new().unwrap());
let cloned_runtime = runtime.clone(); let cloned_runtime = runtime.clone();
let log_stream = LOG_STREAM_ISOLATE let log_stream = LOG_STREAM_ISOLATE

View File

@ -1,13 +1,12 @@
use flowy_user::errors::{internal_error, FlowyError};
use lib_dispatch::prelude::{
AFPluginDispatcher, AFPluginEventResponse, AFPluginFromBytes, AFPluginRequest, ToBytes, *,
};
use std::rc::Rc;
use std::{ use std::{
convert::TryFrom, convert::TryFrom,
fmt::{Debug, Display}, fmt::{Debug, Display},
hash::Hash, hash::Hash,
sync::Arc,
};
use flowy_user::errors::{internal_error, FlowyError};
use lib_dispatch::prelude::{
AFPluginDispatcher, AFPluginEventResponse, AFPluginFromBytes, AFPluginRequest, ToBytes, *,
}; };
use crate::EventIntegrationTest; use crate::EventIntegrationTest;
@ -86,7 +85,7 @@ impl EventBuilder {
.map(|data| data.into_inner()) .map(|data| data.into_inner())
} }
fn dispatch(&self) -> Arc<AFPluginDispatcher> { fn dispatch(&self) -> Rc<AFPluginDispatcher> {
self.context.sdk.dispatcher() self.context.sdk.dispatcher()
} }

View File

@ -5,6 +5,7 @@ use collab_document::document::Document;
use collab_entity::CollabType; use collab_entity::CollabType;
use std::env::temp_dir; use std::env::temp_dir;
use std::path::PathBuf; use std::path::PathBuf;
use std::rc::Rc;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@ -163,13 +164,9 @@ pub fn document_from_document_doc_state(doc_id: &str, doc_state: Vec<u8>) -> Doc
} }
async fn init_core(config: AppFlowyCoreConfig) -> AppFlowyCore { async fn init_core(config: AppFlowyCoreConfig) -> AppFlowyCore {
std::thread::spawn(|| { let runtime = Rc::new(AFPluginRuntime::new().unwrap());
let runtime = Arc::new(AFPluginRuntime::new().unwrap()); let cloned_runtime = runtime.clone();
let cloned_runtime = runtime.clone(); AppFlowyCore::new(config, cloned_runtime, None).await
runtime.block_on(async move { AppFlowyCore::new(config, cloned_runtime, None).await })
})
.join()
.unwrap()
} }
impl std::ops::Deref for EventIntegrationTest { impl std::ops::Deref for EventIntegrationTest {

View File

@ -94,30 +94,24 @@ async fn af_cloud_upload_6_files_test() {
// Wait for all uploads to finish // Wait for all uploads to finish
let uploads = Arc::new(Mutex::new(created_uploads)); let uploads = Arc::new(Mutex::new(created_uploads));
let mut handles = vec![]; let mut handles = vec![];
for mut receiver in receivers { for mut receiver in receivers {
let cloned_uploads = uploads.clone(); let cloned_uploads = uploads.clone();
let cloned_test = test.clone(); let state = test.storage_manager.get_file_state(&receiver.file_id).await;
let handle = tokio::spawn(async move { let handle = tokio::spawn(async move {
if let Some(state) = cloned_test if let Some(FileUploadState::Finished { file_id }) = state {
.storage_manager cloned_uploads
.get_file_state(&receiver.file_id) .lock()
.await .await
{ .retain(|upload| upload.file_id != file_id);
if let FileUploadState::Finished { file_id } = state { }
while let Some(value) = receiver.recv().await {
if let FileUploadState::Finished { file_id } = value {
cloned_uploads cloned_uploads
.lock() .lock()
.await .await
.retain(|upload| upload.file_id != file_id); .retain(|upload| upload.file_id != file_id);
} break;
} else {
while let Some(value) = receiver.recv().await {
if let FileUploadState::Finished { file_id } = value {
cloned_uploads
.lock()
.await
.retain(|upload| upload.file_id != file_id);
break;
}
} }
} }
}); });

View File

@ -24,11 +24,9 @@ async fn create_child_view_in_workspace_subscription_test() {
let cloned_test = test.clone(); let cloned_test = test.clone();
let cloned_workspace_id = workspace.id.clone(); let cloned_workspace_id = workspace.id.clone();
test.appflowy_core.dispatcher().spawn(async move { cloned_test
cloned_test .create_view(&cloned_workspace_id, "workspace child view".to_string())
.create_view(&cloned_workspace_id, "workspace child view".to_string()) .await;
.await;
});
let views = receive_with_timeout(rx, Duration::from_secs(30)) let views = receive_with_timeout(rx, Duration::from_secs(30))
.await .await
@ -50,14 +48,17 @@ async fn create_child_view_in_view_subscription_test() {
let cloned_test = test.clone(); let cloned_test = test.clone();
let child_view_id = workspace_child_view.id.clone(); let child_view_id = workspace_child_view.id.clone();
test.appflowy_core.dispatcher().spawn(async move { let local_set = tokio::task::LocalSet::new();
cloned_test local_set
.create_view( .run_until(async move {
&child_view_id, cloned_test
"workspace child view's child view".to_string(), .create_view(
) &child_view_id,
.await; "workspace child view's child view".to_string(),
}); )
.await;
})
.await;
let update = receive_with_timeout(rx, Duration::from_secs(30)) let update = receive_with_timeout(rx, Duration::from_secs(30))
.await .await
@ -81,22 +82,11 @@ async fn delete_view_subscription_test() {
let cloned_test = test.clone(); let cloned_test = test.clone();
let delete_view_id = workspace.views.first().unwrap().id.clone(); let delete_view_id = workspace.views.first().unwrap().id.clone();
let cloned_delete_view_id = delete_view_id.clone(); let cloned_delete_view_id = delete_view_id.clone();
test
.appflowy_core cloned_test.delete_view(&cloned_delete_view_id).await;
.dispatcher() let update = receive_with_timeout(rx, Duration::from_secs(60))
.spawn(async move {
cloned_test.delete_view(&cloned_delete_view_id).await;
})
.await .await
.unwrap(); .unwrap();
let update = test
.appflowy_core
.dispatcher()
.run_until(receive_with_timeout(rx, Duration::from_secs(60)))
.await
.unwrap();
assert_eq!(update.delete_child_views.len(), 1); assert_eq!(update.delete_child_views.len(), 1);
assert_eq!(update.delete_child_views[0], delete_view_id); assert_eq!(update.delete_child_views[0], delete_view_id);
} }
@ -114,17 +104,14 @@ async fn update_view_subscription_test() {
assert!(!view.is_favorite); assert!(!view.is_favorite);
let update_view_id = view.id.clone(); let update_view_id = view.id.clone();
test.appflowy_core.dispatcher().spawn(async move { cloned_test
cloned_test .update_view(UpdateViewPayloadPB {
.update_view(UpdateViewPayloadPB { view_id: update_view_id,
view_id: update_view_id, name: Some("hello world".to_string()),
name: Some("hello world".to_string()), is_favorite: Some(true),
is_favorite: Some(true), ..Default::default()
..Default::default() })
}) .await;
.await;
});
let update = receive_with_timeout(rx, Duration::from_secs(30)) let update = receive_with_timeout(rx, Duration::from_secs(30))
.await .await
.unwrap(); .unwrap();

View File

@ -5,6 +5,7 @@ use collab_folder::Folder;
use event_integration_test::user_event::user_localhost_af_cloud; use event_integration_test::user_event::user_localhost_af_cloud;
use event_integration_test::EventIntegrationTest; use event_integration_test::EventIntegrationTest;
use std::time::Duration; use std::time::Duration;
use tokio::task::LocalSet;
use tokio::time::sleep; use tokio::time::sleep;
use crate::user::af_cloud_test::util::get_synced_workspaces; use crate::user::af_cloud_test::util::get_synced_workspaces;
@ -158,9 +159,9 @@ async fn af_cloud_different_open_same_workspace_test() {
user_localhost_af_cloud().await; user_localhost_af_cloud().await;
// Set up the primary client and sign them up to the cloud. // Set up the primary client and sign them up to the cloud.
let client_1 = EventIntegrationTest::new().await; let test_runner = EventIntegrationTest::new().await;
let owner_profile = client_1.af_cloud_sign_up().await; let owner_profile = test_runner.af_cloud_sign_up().await;
let shared_workspace_id = client_1.get_current_workspace().await.id.clone(); let shared_workspace_id = test_runner.get_current_workspace().await.id.clone();
// Verify that the workspace ID from the profile matches the current session's workspace ID. // Verify that the workspace ID from the profile matches the current session's workspace ID.
assert_eq!(shared_workspace_id, owner_profile.workspace_id); assert_eq!(shared_workspace_id, owner_profile.workspace_id);
@ -181,7 +182,7 @@ async fn af_cloud_different_open_same_workspace_test() {
client.delete_view(&view.id).await; client.delete_view(&view.id).await;
} }
client_1 test_runner
.add_workspace_member(&owner_profile.workspace_id, &client) .add_workspace_member(&owner_profile.workspace_id, &client)
.await; .await;
clients.push((client, client_profile)); clients.push((client, client_profile));
@ -195,9 +196,10 @@ async fn af_cloud_different_open_same_workspace_test() {
// Simulate each client open different workspace 30 times // Simulate each client open different workspace 30 times
let mut handles = vec![]; let mut handles = vec![];
let local_set = LocalSet::new();
for client in clients.clone() { for client in clients.clone() {
let cloned_shared_workspace_id = shared_workspace_id.clone(); let cloned_shared_workspace_id = shared_workspace_id.clone();
let handle = tokio::spawn(async move { let handle = local_set.spawn_local(async move {
let (client, profile) = client; let (client, profile) = client;
let all_workspaces = get_synced_workspaces(&client, profile.id).await; let all_workspaces = get_synced_workspaces(&client, profile.id).await;
for i in 0..30 { for i in 0..30 {
@ -216,10 +218,16 @@ async fn af_cloud_different_open_same_workspace_test() {
}); });
handles.push(handle); handles.push(handle);
} }
futures::future::join_all(handles).await; let results = local_set
.run_until(futures::future::join_all(handles))
.await;
for result in results {
assert!(result.is_ok());
}
// Retrieve and verify the collaborative document state for Client 1's workspace. // Retrieve and verify the collaborative document state for Client 1's workspace.
let doc_state = client_1 let doc_state = test_runner
.get_collab_doc_state(&shared_workspace_id, CollabType::Folder) .get_collab_doc_state(&shared_workspace_id, CollabType::Folder)
.await .await
.unwrap(); .unwrap();

View File

@ -11,7 +11,6 @@ use client_api::error::AppResponseError;
use flowy_error::FlowyError; use flowy_error::FlowyError;
use futures::stream::BoxStream; use futures::stream::BoxStream;
use lib_infra::async_trait::async_trait; use lib_infra::async_trait::async_trait;
use lib_infra::future::FutureResult;
use serde_json::Value; use serde_json::Value;
use std::collections::HashMap; use std::collections::HashMap;
use std::path::Path; use std::path::Path;
@ -21,12 +20,12 @@ pub type StreamAnswer = BoxStream<'static, Result<QuestionStreamValue, FlowyErro
pub type StreamComplete = BoxStream<'static, Result<Bytes, FlowyError>>; pub type StreamComplete = BoxStream<'static, Result<Bytes, FlowyError>>;
#[async_trait] #[async_trait]
pub trait ChatCloudService: Send + Sync + 'static { pub trait ChatCloudService: Send + Sync + 'static {
fn create_chat( async fn create_chat(
&self, &self,
uid: &i64, uid: &i64,
workspace_id: &str, workspace_id: &str,
chat_id: &str, chat_id: &str,
) -> FutureResult<(), FlowyError>; ) -> Result<(), FlowyError>;
async fn create_question( async fn create_question(
&self, &self,

View File

@ -14,7 +14,6 @@ use flowy_error::{ErrorCode, FlowyError, FlowyResult};
use lib_dispatch::prelude::{data_result_ok, AFPluginData, AFPluginState, DataResult}; use lib_dispatch::prelude::{data_result_ok, AFPluginData, AFPluginState, DataResult};
use lib_infra::isolate_stream::IsolateSink; use lib_infra::isolate_stream::IsolateSink;
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use tokio::sync::oneshot;
use tracing::trace; use tracing::trace;
use validator::Validate; use validator::Validate;
@ -63,24 +62,18 @@ pub(crate) async fn stream_chat_message_handler(
.collect::<Vec<_>>(); .collect::<Vec<_>>();
trace!("Stream chat message with metadata: {:?}", metadata); trace!("Stream chat message with metadata: {:?}", metadata);
let (tx, rx) = oneshot::channel::<Result<ChatMessagePB, FlowyError>>();
let ai_manager = upgrade_ai_manager(ai_manager)?; let ai_manager = upgrade_ai_manager(ai_manager)?;
tokio::spawn(async move { let result = ai_manager
let result = ai_manager .stream_chat_message(
.stream_chat_message( &data.chat_id,
&data.chat_id, &data.message,
&data.message, message_type,
message_type, data.answer_stream_port,
data.answer_stream_port, data.question_stream_port,
data.question_stream_port, metadata,
metadata, )
) .await?;
.await; data_result_ok(result)
let _ = tx.send(result);
});
let question = rx.await??;
data_result_ok(question)
} }
#[tracing::instrument(level = "debug", skip_all, err)] #[tracing::instrument(level = "debug", skip_all, err)]
@ -120,15 +113,9 @@ pub(crate) async fn get_related_question_handler(
) -> DataResult<RepeatedRelatedQuestionPB, FlowyError> { ) -> DataResult<RepeatedRelatedQuestionPB, FlowyError> {
let ai_manager = upgrade_ai_manager(ai_manager)?; let ai_manager = upgrade_ai_manager(ai_manager)?;
let data = data.into_inner(); let data = data.into_inner();
let (tx, rx) = tokio::sync::oneshot::channel(); let messages = ai_manager
tokio::spawn(async move { .get_related_questions(&data.chat_id, data.message_id)
let messages = ai_manager .await?;
.get_related_questions(&data.chat_id, data.message_id)
.await?;
let _ = tx.send(messages);
Ok::<_, FlowyError>(())
});
let messages = rx.await?;
data_result_ok(messages) data_result_ok(messages)
} }
@ -139,15 +126,9 @@ pub(crate) async fn get_answer_handler(
) -> DataResult<ChatMessagePB, FlowyError> { ) -> DataResult<ChatMessagePB, FlowyError> {
let ai_manager = upgrade_ai_manager(ai_manager)?; let ai_manager = upgrade_ai_manager(ai_manager)?;
let data = data.into_inner(); let data = data.into_inner();
let (tx, rx) = tokio::sync::oneshot::channel(); let message = ai_manager
tokio::spawn(async move { .generate_answer(&data.chat_id, data.message_id)
let message = ai_manager .await?;
.generate_answer(&data.chat_id, data.message_id)
.await?;
let _ = tx.send(message);
Ok::<_, FlowyError>(())
});
let message = rx.await?;
data_result_ok(message) data_result_ok(message)
} }
@ -169,25 +150,17 @@ pub(crate) async fn refresh_local_ai_info_handler(
ai_manager: AFPluginState<Weak<AIManager>>, ai_manager: AFPluginState<Weak<AIManager>>,
) -> DataResult<LLMModelInfoPB, FlowyError> { ) -> DataResult<LLMModelInfoPB, FlowyError> {
let ai_manager = upgrade_ai_manager(ai_manager)?; let ai_manager = upgrade_ai_manager(ai_manager)?;
let (tx, rx) = oneshot::channel::<Result<LLMModelInfo, FlowyError>>(); let model_info = ai_manager.local_ai_controller.refresh().await;
tokio::spawn(async move { if model_info.is_err() {
let model_info = ai_manager.local_ai_controller.refresh().await; if let Some(llm_model) = ai_manager.local_ai_controller.get_current_model() {
if model_info.is_err() { let model_info = LLMModelInfo {
if let Some(llm_model) = ai_manager.local_ai_controller.get_current_model() { selected_model: llm_model.clone(),
let model_info = LLMModelInfo { models: vec![llm_model],
selected_model: llm_model.clone(), };
models: vec![llm_model], return data_result_ok(model_info.into());
};
let _ = tx.send(Ok(model_info));
return;
}
} }
}
let _ = tx.send(model_info); data_result_ok(model_info?.into())
});
let model_info = rx.await??;
data_result_ok(model_info.into())
} }
#[tracing::instrument(level = "debug", skip_all, err)] #[tracing::instrument(level = "debug", skip_all, err)]
@ -274,16 +247,9 @@ pub(crate) async fn chat_file_handler(
} }
tracing::debug!("File size: {} bytes", file_size); tracing::debug!("File size: {} bytes", file_size);
let ai_manager = upgrade_ai_manager(ai_manager)?;
let (tx, rx) = oneshot::channel::<Result<(), FlowyError>>(); ai_manager.chat_with_file(&data.chat_id, file_path).await?;
tokio::spawn(async move { Ok(())
let ai_manager = upgrade_ai_manager(ai_manager)?;
ai_manager.chat_with_file(&data.chat_id, file_path).await?;
let _ = tx.send(Ok(()));
Ok::<_, FlowyError>(())
});
rx.await?
} }
#[tracing::instrument(level = "debug", skip_all, err)] #[tracing::instrument(level = "debug", skip_all, err)]
@ -426,17 +392,10 @@ pub(crate) async fn get_offline_app_handler(
ai_manager: AFPluginState<Weak<AIManager>>, ai_manager: AFPluginState<Weak<AIManager>>,
) -> DataResult<OfflineAIPB, FlowyError> { ) -> DataResult<OfflineAIPB, FlowyError> {
let ai_manager = upgrade_ai_manager(ai_manager)?; let ai_manager = upgrade_ai_manager(ai_manager)?;
let (tx, rx) = oneshot::channel::<Result<String, FlowyError>>(); let link = ai_manager
tokio::spawn(async move { .local_ai_controller
let link = ai_manager .get_offline_ai_app_download_link()
.local_ai_controller .await?;
.get_offline_ai_app_download_link()
.await?;
let _ = tx.send(Ok(link));
Ok::<_, FlowyError>(())
});
let link = rx.await??;
data_result_ok(OfflineAIPB { link }) data_result_ok(OfflineAIPB { link })
} }

View File

@ -288,7 +288,7 @@ impl LocalAIResourceController {
while let Ok(value) = rx.recv().await { while let Ok(value) = rx.recv().await {
let is_finish = value == DOWNLOAD_FINISH; let is_finish = value == DOWNLOAD_FINISH;
if let Err(err) = progress_sink.send(value).await { if let Err(err) = progress_sink.send(value).await {
error!("Failed to send progress: {:?}", err); warn!("Failed to send progress: {:?}", err);
break; break;
} }

View File

@ -14,7 +14,6 @@ use flowy_ai_pub::cloud::{
use flowy_error::{FlowyError, FlowyResult}; use flowy_error::{FlowyError, FlowyResult};
use futures::{stream, Sink, StreamExt, TryStreamExt}; use futures::{stream, Sink, StreamExt, TryStreamExt};
use lib_infra::async_trait::async_trait; use lib_infra::async_trait::async_trait;
use lib_infra::future::FutureResult;
use crate::local_ai::stream_util::QuestionStream; use crate::local_ai::stream_util::QuestionStream;
use crate::stream_message::StreamMessage; use crate::stream_message::StreamMessage;
@ -108,13 +107,16 @@ impl AICloudServiceMiddleware {
#[async_trait] #[async_trait]
impl ChatCloudService for AICloudServiceMiddleware { impl ChatCloudService for AICloudServiceMiddleware {
fn create_chat( async fn create_chat(
&self, &self,
uid: &i64, uid: &i64,
workspace_id: &str, workspace_id: &str,
chat_id: &str, chat_id: &str,
) -> FutureResult<(), FlowyError> { ) -> Result<(), FlowyError> {
self.cloud_service.create_chat(uid, workspace_id, chat_id) self
.cloud_service
.create_chat(uid, workspace_id, chat_id)
.await
} }
async fn create_question( async fn create_question(

View File

@ -26,7 +26,7 @@ use flowy_sqlite::kv::KVStorePreferences;
use flowy_user::services::authenticate_user::AuthenticateUser; use flowy_user::services::authenticate_user::AuthenticateUser;
use flowy_user::services::data_import::{load_collab_by_object_id, load_collab_by_object_ids}; use flowy_user::services::data_import::{load_collab_by_object_id, load_collab_by_object_ids};
use lib_dispatch::prelude::ToBytes; use lib_dispatch::prelude::ToBytes;
use lib_infra::future::FutureResult;
use std::collections::HashMap; use std::collections::HashMap;
use std::convert::TryFrom; use std::convert::TryFrom;
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
@ -35,6 +35,7 @@ use tokio::sync::RwLock;
use crate::integrate::server::ServerProvider; use crate::integrate::server::ServerProvider;
use collab_plugins::local_storage::kv::KVTransactionDB; use collab_plugins::local_storage::kv::KVTransactionDB;
use lib_infra::async_trait::async_trait;
pub struct FolderDepsResolver(); pub struct FolderDepsResolver();
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
@ -113,363 +114,305 @@ impl FolderUser for FolderUserImpl {
} }
struct DocumentFolderOperation(Arc<DocumentManager>); struct DocumentFolderOperation(Arc<DocumentManager>);
#[async_trait]
impl FolderOperationHandler for DocumentFolderOperation { impl FolderOperationHandler for DocumentFolderOperation {
fn create_workspace_view( async fn create_workspace_view(
&self, &self,
uid: i64, uid: i64,
workspace_view_builder: Arc<RwLock<NestedViewBuilder>>, workspace_view_builder: Arc<RwLock<NestedViewBuilder>>,
) -> FutureResult<(), FlowyError> { ) -> Result<(), FlowyError> {
let manager = self.0.clone(); let manager = self.0.clone();
FutureResult::new(async move { let mut write_guard = workspace_view_builder.write().await;
let mut write_guard = workspace_view_builder.write().await; // Create a view named "Getting started" with an icon ⭐️ and the built-in README data.
// Don't modify this code unless you know what you are doing.
// Create a view named "Getting started" with an icon ⭐️ and the built-in README data. write_guard
// Don't modify this code unless you know what you are doing. .with_view_builder(|view_builder| async {
write_guard let view = view_builder
.with_view_builder(|view_builder| async { .with_name("Getting started")
let view = view_builder .with_icon("⭐️")
.with_name("Getting started") .build();
.with_icon("⭐️") // create a empty document
.build(); let json_str = include_str!("../../assets/read_me.json");
// create a empty document let document_pb = JsonToDocumentParser::json_str_to_document(json_str).unwrap();
let json_str = include_str!("../../assets/read_me.json"); manager
let document_pb = JsonToDocumentParser::json_str_to_document(json_str).unwrap(); .create_document(uid, &view.parent_view.id, Some(document_pb.into()))
manager .await
.create_document(uid, &view.parent_view.id, Some(document_pb.into())) .unwrap();
.await view
.unwrap(); })
view .await;
}) Ok(())
.await;
Ok(())
})
} }
fn open_view(&self, view_id: &str) -> FutureResult<(), FlowyError> { async fn open_view(&self, view_id: &str) -> Result<(), FlowyError> {
let manager = self.0.clone(); self.0.open_document(view_id).await?;
let view_id = view_id.to_string(); Ok(())
FutureResult::new(async move {
manager.open_document(&view_id).await?;
Ok(())
})
} }
/// Close the document view. /// Close the document view.
fn close_view(&self, view_id: &str) -> FutureResult<(), FlowyError> { async fn close_view(&self, view_id: &str) -> Result<(), FlowyError> {
let manager = self.0.clone(); self.0.close_document(view_id).await?;
let view_id = view_id.to_string(); Ok(())
FutureResult::new(async move {
manager.close_document(&view_id).await?;
Ok(())
})
} }
fn delete_view(&self, view_id: &str) -> FutureResult<(), FlowyError> { async fn delete_view(&self, view_id: &str) -> Result<(), FlowyError> {
let manager = self.0.clone(); match self.0.delete_document(view_id).await {
let view_id = view_id.to_string(); Ok(_) => tracing::trace!("Delete document: {}", view_id),
FutureResult::new(async move { Err(e) => tracing::error!("🔴delete document failed: {}", e),
match manager.delete_document(&view_id).await { }
Ok(_) => tracing::trace!("Delete document: {}", view_id), Ok(())
Err(e) => tracing::error!("🔴delete document failed: {}", e),
}
Ok(())
})
} }
fn duplicate_view(&self, view_id: &str) -> FutureResult<Bytes, FlowyError> { async fn duplicate_view(&self, view_id: &str) -> Result<Bytes, FlowyError> {
let manager = self.0.clone(); let data: DocumentDataPB = self.0.get_document_data(view_id).await?.into();
let view_id = view_id.to_string(); let data_bytes = data.into_bytes().map_err(|_| FlowyError::invalid_data())?;
FutureResult::new(async move { Ok(data_bytes)
let data: DocumentDataPB = manager.get_document_data(&view_id).await?.into();
let data_bytes = data.into_bytes().map_err(|_| FlowyError::invalid_data())?;
Ok(data_bytes)
})
} }
fn create_view_with_view_data( async fn create_view_with_view_data(
&self, &self,
user_id: i64, user_id: i64,
params: CreateViewParams, params: CreateViewParams,
) -> FutureResult<Option<EncodedCollab>, FlowyError> { ) -> Result<Option<EncodedCollab>, FlowyError> {
debug_assert_eq!(params.layout, ViewLayoutPB::Document); debug_assert_eq!(params.layout, ViewLayoutPB::Document);
let view_id = params.view_id.to_string(); let data = DocumentDataPB::try_from(Bytes::from(params.initial_data))?;
let manager = self.0.clone(); let encoded_collab = self
FutureResult::new(async move { .0
let data = DocumentDataPB::try_from(Bytes::from(params.initial_data))?; .create_document(user_id, &params.view_id, Some(data.into()))
let encoded_collab = manager .await?;
.create_document(user_id, &view_id, Some(data.into())) Ok(Some(encoded_collab))
.await?;
Ok(Some(encoded_collab))
})
} }
fn get_encoded_collab_v1_from_disk( async fn get_encoded_collab_v1_from_disk(
&self, &self,
user: Arc<dyn FolderUser>, user: Arc<dyn FolderUser>,
view_id: &str, view_id: &str,
) -> FutureResult<EncodedCollabWrapper, FlowyError> { ) -> Result<EncodedCollabWrapper, FlowyError> {
// get the collab_object_id for the document. // get the collab_object_id for the document.
// the collab_object_id for the document is the view_id. // the collab_object_id for the document is the view_id.
let oid = view_id.to_string();
FutureResult::new(async move { let uid = user
let uid = user .user_id()
.user_id() .map_err(|e| e.with_context("unable to get the uid: {}"))?;
.map_err(|e| e.with_context("unable to get the uid: {}"))?;
// get the collab db // get the collab db
let collab_db = user let collab_db = user
.collab_db(uid) .collab_db(uid)
.map_err(|e| e.with_context("unable to get the collab"))?; .map_err(|e| e.with_context("unable to get the collab"))?;
let collab_db = collab_db.upgrade().ok_or_else(|| { let collab_db = collab_db.upgrade().ok_or_else(|| {
FlowyError::internal().with_context( FlowyError::internal().with_context(
"The collab db has been dropped, indicating that the user has switched to a new account", "The collab db has been dropped, indicating that the user has switched to a new account",
) )
})?; })?;
let collab_read_txn = collab_db.read_txn(); let collab_read_txn = collab_db.read_txn();
// read the collab from the db // read the collab from the db
let collab = load_collab_by_object_id(uid, &collab_read_txn, &oid).map_err(|e| { let collab = load_collab_by_object_id(uid, &collab_read_txn, view_id).map_err(|e| {
FlowyError::internal().with_context(format!("load document collab failed: {}", e)) FlowyError::internal().with_context(format!("load document collab failed: {}", e))
})?; })?;
let encoded_collab = collab let encoded_collab = collab
// encode the collab and check the integrity of the collab // encode the collab and check the integrity of the collab
.encode_collab_v1(|collab| CollabType::Document.validate_require_data(collab)) .encode_collab_v1(|collab| CollabType::Document.validate_require_data(collab))
.map_err(|e| { .map_err(|e| {
FlowyError::internal().with_context(format!("encode document collab failed: {}", e)) FlowyError::internal().with_context(format!("encode document collab failed: {}", e))
})?; })?;
Ok(EncodedCollabWrapper::Document(DocumentEncodedCollab { Ok(EncodedCollabWrapper::Document(DocumentEncodedCollab {
document_encoded_collab: encoded_collab, document_encoded_collab: encoded_collab,
})) }))
})
} }
/// Create a view with built-in data. /// Create a view with built-in data.
fn create_built_in_view( async fn create_built_in_view(
&self, &self,
user_id: i64, user_id: i64,
view_id: &str, view_id: &str,
_name: &str, _name: &str,
layout: ViewLayout, layout: ViewLayout,
) -> FutureResult<(), FlowyError> { ) -> Result<(), FlowyError> {
debug_assert_eq!(layout, ViewLayout::Document); debug_assert_eq!(layout, ViewLayout::Document);
let view_id = view_id.to_string(); match self.0.create_document(user_id, view_id, None).await {
let manager = self.0.clone(); Ok(_) => Ok(()),
FutureResult::new(async move { Err(err) => {
match manager.create_document(user_id, &view_id, None).await { if err.is_already_exists() {
Ok(_) => Ok(()), Ok(())
Err(err) => { } else {
if err.is_already_exists() { Err(err)
Ok(()) }
} else { },
Err(err) }
}
},
}
})
} }
fn import_from_bytes( async fn import_from_bytes(
&self, &self,
uid: i64, uid: i64,
view_id: &str, view_id: &str,
_name: &str, _name: &str,
_import_type: ImportType, _import_type: ImportType,
bytes: Vec<u8>, bytes: Vec<u8>,
) -> FutureResult<EncodedCollab, FlowyError> { ) -> Result<EncodedCollab, FlowyError> {
let view_id = view_id.to_string(); let data = DocumentDataPB::try_from(Bytes::from(bytes))?;
let manager = self.0.clone(); let encoded_collab = self
FutureResult::new(async move { .0
let data = DocumentDataPB::try_from(Bytes::from(bytes))?; .create_document(uid, view_id, Some(data.into()))
let encoded_collab = manager .await?;
.create_document(uid, &view_id, Some(data.into())) Ok(encoded_collab)
.await?;
Ok(encoded_collab)
})
} }
// will implement soon // will implement soon
fn import_from_file_path( async fn import_from_file_path(
&self, &self,
_view_id: &str, _view_id: &str,
_name: &str, _name: &str,
_path: String, _path: String,
) -> FutureResult<(), FlowyError> { ) -> Result<(), FlowyError> {
FutureResult::new(async move { Ok(()) }) Ok(())
} }
} }
struct DatabaseFolderOperation(Arc<DatabaseManager>); struct DatabaseFolderOperation(Arc<DatabaseManager>);
#[async_trait]
impl FolderOperationHandler for DatabaseFolderOperation { impl FolderOperationHandler for DatabaseFolderOperation {
fn open_view(&self, view_id: &str) -> FutureResult<(), FlowyError> { async fn open_view(&self, view_id: &str) -> Result<(), FlowyError> {
let database_manager = self.0.clone(); self.0.open_database_view(view_id).await?;
let view_id = view_id.to_string(); Ok(())
FutureResult::new(async move {
database_manager.open_database_view(view_id).await?;
Ok(())
})
} }
fn close_view(&self, view_id: &str) -> FutureResult<(), FlowyError> { async fn close_view(&self, view_id: &str) -> Result<(), FlowyError> {
let database_manager = self.0.clone(); self.0.close_database_view(view_id).await?;
let view_id = view_id.to_string(); Ok(())
FutureResult::new(async move {
database_manager.close_database_view(view_id).await?;
Ok(())
})
} }
fn delete_view(&self, view_id: &str) -> FutureResult<(), FlowyError> { async fn delete_view(&self, view_id: &str) -> Result<(), FlowyError> {
let database_manager = self.0.clone(); match self.0.delete_database_view(view_id).await {
let view_id = view_id.to_string(); Ok(_) => tracing::trace!("Delete database view: {}", view_id),
FutureResult::new(async move { Err(e) => tracing::error!("🔴delete database failed: {}", e),
match database_manager.delete_database_view(&view_id).await { }
Ok(_) => tracing::trace!("Delete database view: {}", view_id), Ok(())
Err(e) => tracing::error!("🔴delete database failed: {}", e),
}
Ok(())
})
} }
fn get_encoded_collab_v1_from_disk( async fn get_encoded_collab_v1_from_disk(
&self, &self,
user: Arc<dyn FolderUser>, user: Arc<dyn FolderUser>,
view_id: &str, view_id: &str,
) -> FutureResult<EncodedCollabWrapper, FlowyError> { ) -> Result<EncodedCollabWrapper, FlowyError> {
let manager = self.0.clone(); // get the collab_object_id for the database.
let view_id = view_id.to_string(); //
// the collab object_id for the database is not the view_id,
// we should use the view_id to get the database_id
let oid = self.0.get_database_id_with_view_id(view_id).await?;
let row_oids = self.0.get_database_row_ids_with_view_id(view_id).await?;
let row_oids = row_oids
.into_iter()
.map(|oid| oid.into_inner())
.collect::<Vec<_>>();
let database_metas = self.0.get_all_databases_meta().await;
FutureResult::new(async move { let uid = user
// get the collab_object_id for the database. .user_id()
// .map_err(|e| e.with_context("unable to get the uid: {}"))?;
// the collab object_id for the database is not the view_id,
// we should use the view_id to get the database_id
let oid = manager.get_database_id_with_view_id(&view_id).await?;
let row_oids = manager.get_database_row_ids_with_view_id(&view_id).await?;
let row_oids = row_oids
.into_iter()
.map(|oid| oid.into_inner())
.collect::<Vec<_>>();
let database_metas = manager.get_all_databases_meta().await;
let uid = user // get the collab db
.user_id() let collab_db = user
.map_err(|e| e.with_context("unable to get the uid: {}"))?; .collab_db(uid)
.map_err(|e| e.with_context("unable to get the collab"))?;
let collab_db = collab_db.upgrade().ok_or_else(|| {
FlowyError::internal().with_context(
"The collab db has been dropped, indicating that the user has switched to a new account",
)
})?;
// get the collab db let collab_read_txn = collab_db.read_txn();
let collab_db = user
.collab_db(uid)
.map_err(|e| e.with_context("unable to get the collab"))?;
let collab_db = collab_db.upgrade().ok_or_else(|| {
FlowyError::internal().with_context(
"The collab db has been dropped, indicating that the user has switched to a new account",
)
})?;
let collab_read_txn = collab_db.read_txn(); // read the database collab from the db
let database_collab = load_collab_by_object_id(uid, &collab_read_txn, &oid).map_err(|e| {
FlowyError::internal().with_context(format!("load database collab failed: {}", e))
})?;
// read the database collab from the db let database_encoded_collab = database_collab
let database_collab = load_collab_by_object_id(uid, &collab_read_txn, &oid).map_err(|e| {
FlowyError::internal().with_context(format!("load database collab failed: {}", e))
})?;
let database_encoded_collab = database_collab
// encode the collab and check the integrity of the collab // encode the collab and check the integrity of the collab
.encode_collab_v1(|collab| CollabType::Database.validate_require_data(collab)) .encode_collab_v1(|collab| CollabType::Database.validate_require_data(collab))
.map_err(|e| { .map_err(|e| {
FlowyError::internal().with_context(format!("encode database collab failed: {}", e)) FlowyError::internal().with_context(format!("encode database collab failed: {}", e))
})?; })?;
// read the database rows collab from the db // read the database rows collab from the db
let database_row_collabs = load_collab_by_object_ids(uid, &collab_read_txn, &row_oids); let database_row_collabs = load_collab_by_object_ids(uid, &collab_read_txn, &row_oids);
let database_row_encoded_collabs = database_row_collabs let database_row_encoded_collabs = database_row_collabs
.into_iter() .into_iter()
.map(|(oid, collab)| { .map(|(oid, collab)| {
// encode the collab and check the integrity of the collab // encode the collab and check the integrity of the collab
let encoded_collab = collab let encoded_collab = collab
.encode_collab_v1(|collab| CollabType::DatabaseRow.validate_require_data(collab)) .encode_collab_v1(|collab| CollabType::DatabaseRow.validate_require_data(collab))
.map_err(|e| { .map_err(|e| {
FlowyError::internal() FlowyError::internal().with_context(format!("encode database row collab failed: {}", e))
.with_context(format!("encode database row collab failed: {}", e)) })?;
})?; Ok((oid, encoded_collab))
Ok((oid, encoded_collab)) })
}) .collect::<Result<HashMap<_, _>, FlowyError>>()?;
.collect::<Result<HashMap<_, _>, FlowyError>>()?;
// get the relation info from the database meta // get the relation info from the database meta
let database_relations = database_metas let database_relations = database_metas
.into_iter() .into_iter()
.filter_map(|meta| { .filter_map(|meta| {
let linked_views = meta.linked_views.into_iter().next()?; let linked_views = meta.linked_views.into_iter().next()?;
Some((meta.database_id, linked_views)) Some((meta.database_id, linked_views))
}) })
.collect::<HashMap<_, _>>(); .collect::<HashMap<_, _>>();
Ok(EncodedCollabWrapper::Database(DatabaseEncodedCollab { Ok(EncodedCollabWrapper::Database(DatabaseEncodedCollab {
database_encoded_collab, database_encoded_collab,
database_row_encoded_collabs, database_row_encoded_collabs,
database_relations, database_relations,
})) }))
})
} }
fn duplicate_view(&self, view_id: &str) -> FutureResult<Bytes, FlowyError> { async fn duplicate_view(&self, view_id: &str) -> Result<Bytes, FlowyError> {
let database_manager = self.0.clone(); let delta_bytes = self.0.duplicate_database(view_id).await?;
let view_id = view_id.to_owned(); Ok(Bytes::from(delta_bytes))
FutureResult::new(async move {
let delta_bytes = database_manager.duplicate_database(&view_id).await?;
Ok(Bytes::from(delta_bytes))
})
} }
/// Create a database view with duplicated data. /// Create a database view with duplicated data.
/// If the ext contains the {"database_id": "xx"}, then it will link /// If the ext contains the {"database_id": "xx"}, then it will link
/// to the existing database. /// to the existing database.
fn create_view_with_view_data( async fn create_view_with_view_data(
&self, &self,
_user_id: i64, _user_id: i64,
params: CreateViewParams, params: CreateViewParams,
) -> FutureResult<Option<EncodedCollab>, FlowyError> { ) -> Result<Option<EncodedCollab>, FlowyError> {
match CreateDatabaseExtParams::from_map(params.meta.clone()) { match CreateDatabaseExtParams::from_map(params.meta.clone()) {
None => { None => {
let database_manager = self.0.clone(); let encoded_collab = self
let view_id = params.view_id.to_string(); .0
FutureResult::new(async move { .create_database_with_database_data(&params.view_id, params.initial_data)
let encoded_collab = database_manager .await?;
.create_database_with_database_data(&view_id, params.initial_data) Ok(Some(encoded_collab))
.await?;
Ok(Some(encoded_collab))
})
}, },
Some(database_params) => { Some(database_params) => {
let database_manager = self.0.clone();
let layout = match params.layout { let layout = match params.layout {
ViewLayoutPB::Board => DatabaseLayoutPB::Board, ViewLayoutPB::Board => DatabaseLayoutPB::Board,
ViewLayoutPB::Calendar => DatabaseLayoutPB::Calendar, ViewLayoutPB::Calendar => DatabaseLayoutPB::Calendar,
ViewLayoutPB::Grid => DatabaseLayoutPB::Grid, ViewLayoutPB::Grid => DatabaseLayoutPB::Grid,
ViewLayoutPB::Document | ViewLayoutPB::Chat => { ViewLayoutPB::Document | ViewLayoutPB::Chat => {
return FutureResult::new(async move { Err(FlowyError::not_support()) }); return Err(FlowyError::not_support());
}, },
}; };
let name = params.name.to_string(); let name = params.name.to_string();
let database_view_id = params.view_id.to_string(); let database_view_id = params.view_id.to_string();
let database_parent_view_id = params.parent_view_id.to_string(); let database_parent_view_id = params.parent_view_id.to_string();
self
FutureResult::new(async move { .0
database_manager .create_linked_view(
.create_linked_view( name,
name, layout.into(),
layout.into(), database_params.database_id,
database_params.database_id, database_view_id,
database_view_id, database_parent_view_id,
database_parent_view_id, )
) .await?;
.await?; Ok(None)
Ok(None)
})
}, },
} }
} }
@ -478,110 +421,90 @@ impl FolderOperationHandler for DatabaseFolderOperation {
/// If the ext contains the {"database_id": "xx"}, then it will link to /// If the ext contains the {"database_id": "xx"}, then it will link to
/// the existing database. The data of the database will be shared within /// the existing database. The data of the database will be shared within
/// these references views. /// these references views.
fn create_built_in_view( async fn create_built_in_view(
&self, &self,
_user_id: i64, _user_id: i64,
view_id: &str, view_id: &str,
name: &str, name: &str,
layout: ViewLayout, layout: ViewLayout,
) -> FutureResult<(), FlowyError> { ) -> Result<(), FlowyError> {
let name = name.to_string(); let name = name.to_string();
let database_manager = self.0.clone();
let data = match layout { let data = match layout {
ViewLayout::Grid => make_default_grid(view_id, &name), ViewLayout::Grid => make_default_grid(view_id, &name),
ViewLayout::Board => make_default_board(view_id, &name), ViewLayout::Board => make_default_board(view_id, &name),
ViewLayout::Calendar => make_default_calendar(view_id, &name), ViewLayout::Calendar => make_default_calendar(view_id, &name),
ViewLayout::Document => { ViewLayout::Document | ViewLayout::Chat => {
return FutureResult::new(async move { return Err(
Err(FlowyError::internal().with_context(format!("Can't handle {:?} layout type", layout))) FlowyError::internal().with_context(format!("Can't handle {:?} layout type", layout)),
}); );
},
ViewLayout::Chat => {
// TODO(nathan): AI
todo!("AI")
}, },
}; };
FutureResult::new(async move { let result = self.0.create_database_with_params(data).await;
let result = database_manager.create_database_with_params(data).await; match result {
match result { Ok(_) => Ok(()),
Ok(_) => Ok(()), Err(err) => {
Err(err) => { if err.is_already_exists() {
if err.is_already_exists() { Ok(())
Ok(()) } else {
} else { Err(err)
Err(err) }
} },
}, }
}
})
} }
fn import_from_bytes( async fn import_from_bytes(
&self, &self,
_uid: i64, _uid: i64,
view_id: &str, view_id: &str,
_name: &str, _name: &str,
import_type: ImportType, import_type: ImportType,
bytes: Vec<u8>, bytes: Vec<u8>,
) -> FutureResult<EncodedCollab, FlowyError> { ) -> Result<EncodedCollab, FlowyError> {
let database_manager = self.0.clone();
let view_id = view_id.to_string();
let format = match import_type { let format = match import_type {
ImportType::CSV => CSVFormat::Original, ImportType::CSV => CSVFormat::Original,
ImportType::HistoryDatabase => CSVFormat::META, ImportType::HistoryDatabase => CSVFormat::META,
ImportType::RawDatabase => CSVFormat::META, ImportType::RawDatabase => CSVFormat::META,
_ => CSVFormat::Original, _ => CSVFormat::Original,
}; };
FutureResult::new(async move { let content = tokio::task::spawn_blocking(move || {
let content = tokio::task::spawn_blocking(move || { String::from_utf8(bytes).map_err(|err| FlowyError::internal().with_context(err))
String::from_utf8(bytes).map_err(|err| FlowyError::internal().with_context(err))
})
.await??;
let result = database_manager
.import_csv(view_id, content, format)
.await?;
Ok(result.encoded_collab)
}) })
.await??;
let result = self
.0
.import_csv(view_id.to_string(), content, format)
.await?;
Ok(result.encoded_collab)
} }
fn import_from_file_path( async fn import_from_file_path(
&self, &self,
_view_id: &str, _view_id: &str,
_name: &str, _name: &str,
path: String, path: String,
) -> FutureResult<(), FlowyError> { ) -> Result<(), FlowyError> {
let database_manager = self.0.clone(); self.0.import_csv_from_file(path, CSVFormat::META).await?;
FutureResult::new(async move { Ok(())
database_manager
.import_csv_from_file(path, CSVFormat::META)
.await?;
Ok(())
})
} }
fn did_update_view(&self, old: &View, new: &View) -> FutureResult<(), FlowyError> { async fn did_update_view(&self, old: &View, new: &View) -> Result<(), FlowyError> {
let database_layout = match new.layout { let database_layout = match new.layout {
ViewLayout::Document | ViewLayout::Chat => { ViewLayout::Document | ViewLayout::Chat => {
return FutureResult::new(async { return Err(FlowyError::internal().with_context("Can't handle document layout type"));
Err(FlowyError::internal().with_context("Can't handle document layout type"))
});
}, },
ViewLayout::Grid => DatabaseLayoutPB::Grid, ViewLayout::Grid => DatabaseLayoutPB::Grid,
ViewLayout::Board => DatabaseLayoutPB::Board, ViewLayout::Board => DatabaseLayoutPB::Board,
ViewLayout::Calendar => DatabaseLayoutPB::Calendar, ViewLayout::Calendar => DatabaseLayoutPB::Calendar,
}; };
let database_manager = self.0.clone();
let view_id = new.id.clone();
if old.layout != new.layout { if old.layout != new.layout {
FutureResult::new(async move { self
database_manager .0
.update_database_layout(&view_id, database_layout) .update_database_layout(&new.id, database_layout)
.await?; .await?;
Ok(()) Ok(())
})
} else { } else {
FutureResult::new(async move { Ok(()) }) Ok(())
} }
} }
} }
@ -599,78 +522,61 @@ impl CreateDatabaseExtParams {
} }
struct ChatFolderOperation(Arc<AIManager>); struct ChatFolderOperation(Arc<AIManager>);
#[async_trait]
impl FolderOperationHandler for ChatFolderOperation { impl FolderOperationHandler for ChatFolderOperation {
fn open_view(&self, view_id: &str) -> FutureResult<(), FlowyError> { async fn open_view(&self, view_id: &str) -> Result<(), FlowyError> {
let manager = self.0.clone(); self.0.open_chat(view_id).await
let view_id = view_id.to_string();
FutureResult::new(async move {
manager.open_chat(&view_id).await?;
Ok(())
})
} }
fn close_view(&self, view_id: &str) -> FutureResult<(), FlowyError> { async fn close_view(&self, view_id: &str) -> Result<(), FlowyError> {
let manager = self.0.clone(); self.0.close_chat(view_id).await
let view_id = view_id.to_string();
FutureResult::new(async move {
manager.close_chat(&view_id).await?;
Ok(())
})
} }
fn delete_view(&self, view_id: &str) -> FutureResult<(), FlowyError> { async fn delete_view(&self, view_id: &str) -> Result<(), FlowyError> {
let manager = self.0.clone(); self.0.delete_chat(view_id).await
let view_id = view_id.to_string();
FutureResult::new(async move {
manager.delete_chat(&view_id).await?;
Ok(())
})
} }
fn duplicate_view(&self, _view_id: &str) -> FutureResult<ViewData, FlowyError> { async fn duplicate_view(&self, _view_id: &str) -> Result<ViewData, FlowyError> {
FutureResult::new(async move { Err(FlowyError::not_support()) }) Err(FlowyError::not_support())
} }
fn create_view_with_view_data( async fn create_view_with_view_data(
&self, &self,
_user_id: i64, _user_id: i64,
_params: CreateViewParams, _params: CreateViewParams,
) -> FutureResult<Option<EncodedCollab>, FlowyError> { ) -> Result<Option<EncodedCollab>, FlowyError> {
FutureResult::new(async move { Err(FlowyError::not_support()) }) Err(FlowyError::not_support())
} }
fn create_built_in_view( async fn create_built_in_view(
&self, &self,
user_id: i64, user_id: i64,
view_id: &str, view_id: &str,
_name: &str, _name: &str,
_layout: ViewLayout, _layout: ViewLayout,
) -> FutureResult<(), FlowyError> { ) -> Result<(), FlowyError> {
let manager = self.0.clone(); self.0.create_chat(&user_id, view_id).await?;
let view_id = view_id.to_string(); Ok(())
FutureResult::new(async move {
manager.create_chat(&user_id, &view_id).await?;
Ok(())
})
} }
fn import_from_bytes( async fn import_from_bytes(
&self, &self,
_uid: i64, _uid: i64,
_view_id: &str, _view_id: &str,
_name: &str, _name: &str,
_import_type: ImportType, _import_type: ImportType,
_bytes: Vec<u8>, _bytes: Vec<u8>,
) -> FutureResult<EncodedCollab, FlowyError> { ) -> Result<EncodedCollab, FlowyError> {
FutureResult::new(async move { Err(FlowyError::not_support()) }) Err(FlowyError::not_support())
} }
fn import_from_file_path( async fn import_from_file_path(
&self, &self,
_view_id: &str, _view_id: &str,
_name: &str, _name: &str,
_path: String, _path: String,
) -> FutureResult<(), FlowyError> { ) -> Result<(), FlowyError> {
FutureResult::new(async move { Err(FlowyError::not_support()) }) Err(FlowyError::not_support())
} }
} }

View File

@ -595,22 +595,17 @@ impl CollabCloudPluginProvider for ServerProvider {
#[async_trait] #[async_trait]
impl ChatCloudService for ServerProvider { impl ChatCloudService for ServerProvider {
fn create_chat( async fn create_chat(
&self, &self,
uid: &i64, uid: &i64,
workspace_id: &str, workspace_id: &str,
chat_id: &str, chat_id: &str,
) -> FutureResult<(), FlowyError> { ) -> Result<(), FlowyError> {
let workspace_id = workspace_id.to_string();
let server = self.get_server(); let server = self.get_server();
let chat_id = chat_id.to_string(); server?
let uid = *uid; .chat_service()
FutureResult::new(async move { .create_chat(uid, workspace_id, chat_id)
server? .await
.chat_service()
.create_chat(&uid, &workspace_id, &chat_id)
.await
})
} }
async fn create_question( async fn create_question(

View File

@ -15,7 +15,7 @@ use flowy_storage::manager::StorageManager;
use flowy_user::event_map::UserStatusCallback; use flowy_user::event_map::UserStatusCallback;
use flowy_user_pub::cloud::{UserCloudConfig, UserCloudServiceProvider}; use flowy_user_pub::cloud::{UserCloudConfig, UserCloudServiceProvider};
use flowy_user_pub::entities::{Authenticator, UserProfile, UserWorkspace}; use flowy_user_pub::entities::{Authenticator, UserProfile, UserWorkspace};
use lib_infra::future::{to_fut, Fut}; use lib_infra::async_trait::async_trait;
use crate::integrate::server::{Server, ServerProvider}; use crate::integrate::server::{Server, ServerProvider};
@ -29,21 +29,16 @@ pub(crate) struct UserStatusCallbackImpl {
pub(crate) ai_manager: Arc<AIManager>, pub(crate) ai_manager: Arc<AIManager>,
} }
#[async_trait]
impl UserStatusCallback for UserStatusCallbackImpl { impl UserStatusCallback for UserStatusCallbackImpl {
fn did_init( async fn did_init(
&self, &self,
user_id: i64, user_id: i64,
user_authenticator: &Authenticator, user_authenticator: &Authenticator,
cloud_config: &Option<UserCloudConfig>, cloud_config: &Option<UserCloudConfig>,
user_workspace: &UserWorkspace, user_workspace: &UserWorkspace,
_device_id: &str, _device_id: &str,
) -> Fut<FlowyResult<()>> { ) -> FlowyResult<()> {
let user_id = user_id.to_owned();
let user_workspace = user_workspace.clone();
let folder_manager = self.folder_manager.clone();
let database_manager = self.database_manager.clone();
let document_manager = self.document_manager.clone();
self self
.server_provider .server_provider
.set_user_authenticator(user_authenticator); .set_user_authenticator(user_authenticator);
@ -59,159 +54,142 @@ impl UserStatusCallback for UserStatusCallbackImpl {
} }
} }
to_fut(async move { self
folder_manager .folder_manager
.initialize( .initialize(
user_id, user_id,
&user_workspace.id, &user_workspace.id,
FolderInitDataSource::LocalDisk { FolderInitDataSource::LocalDisk {
create_if_not_exist: false, create_if_not_exist: false,
}, },
) )
.await?; .await?;
database_manager.initialize(user_id).await?; self.database_manager.initialize(user_id).await?;
document_manager.initialize(user_id).await?; self.document_manager.initialize(user_id).await?;
Ok(()) Ok(())
})
} }
fn did_sign_in( async fn did_sign_in(
&self, &self,
user_id: i64, user_id: i64,
user_workspace: &UserWorkspace, user_workspace: &UserWorkspace,
device_id: &str, device_id: &str,
) -> Fut<FlowyResult<()>> { ) -> FlowyResult<()> {
let device_id = device_id.to_owned(); event!(
let user_id = user_id.to_owned(); tracing::Level::TRACE,
let user_workspace = user_workspace.clone(); "Notify did sign in: latest_workspace: {:?}, device_id: {}",
let folder_manager = self.folder_manager.clone(); user_workspace,
let database_manager = self.database_manager.clone(); device_id
let document_manager = self.document_manager.clone(); );
to_fut(async move { self
event!( .folder_manager
tracing::Level::TRACE, .initialize_with_workspace_id(user_id)
"Notify did sign in: latest_workspace: {:?}, device_id: {}", .await?;
user_workspace, self.database_manager.initialize(user_id).await?;
device_id self.document_manager.initialize(user_id).await?;
); Ok(())
folder_manager.initialize_with_workspace_id(user_id).await?;
database_manager.initialize(user_id).await?;
document_manager.initialize(user_id).await?;
Ok(())
})
} }
fn did_sign_up( async fn did_sign_up(
&self, &self,
is_new_user: bool, is_new_user: bool,
user_profile: &UserProfile, user_profile: &UserProfile,
user_workspace: &UserWorkspace, user_workspace: &UserWorkspace,
device_id: &str, device_id: &str,
) -> Fut<FlowyResult<()>> { ) -> FlowyResult<()> {
let device_id = device_id.to_owned();
let user_profile = user_profile.clone();
let folder_manager = self.folder_manager.clone();
let database_manager = self.database_manager.clone();
let user_workspace = user_workspace.clone();
let document_manager = self.document_manager.clone();
self self
.server_provider .server_provider
.set_user_authenticator(&user_profile.authenticator); .set_user_authenticator(&user_profile.authenticator);
let server_type = self.server_provider.get_server_type(); let server_type = self.server_provider.get_server_type();
to_fut(async move { event!(
event!( tracing::Level::TRACE,
tracing::Level::TRACE, "Notify did sign up: is new: {} user_workspace: {:?}, device_id: {}",
"Notify did sign up: is new: {} user_workspace: {:?}, device_id: {}", is_new_user,
is_new_user, user_workspace,
user_workspace, device_id
device_id );
);
// In the current implementation, when a user signs up for AppFlowy Cloud, a default workspace // In the current implementation, when a user signs up for AppFlowy Cloud, a default workspace
// is automatically created for them. However, for users who sign up through Supabase, the creation // is automatically created for them. However, for users who sign up through Supabase, the creation
// of the default workspace relies on the client-side operation. This means that the process // of the default workspace relies on the client-side operation. This means that the process
// for initializing a default workspace differs depending on the sign-up method used. // for initializing a default workspace differs depending on the sign-up method used.
let data_source = match folder_manager let data_source = match self
.cloud_service .folder_manager
.get_folder_doc_state( .cloud_service
&user_workspace.id, .get_folder_doc_state(
user_profile.uid, &user_workspace.id,
CollabType::Folder, user_profile.uid,
&user_workspace.id, CollabType::Folder,
) &user_workspace.id,
.await )
{ .await
Ok(doc_state) => match server_type { {
Server::Local => FolderInitDataSource::LocalDisk { Ok(doc_state) => match server_type {
create_if_not_exist: true, Server::Local => FolderInitDataSource::LocalDisk {
}, create_if_not_exist: true,
Server::AppFlowyCloud => FolderInitDataSource::Cloud(doc_state), },
Server::Supabase => { Server::AppFlowyCloud => FolderInitDataSource::Cloud(doc_state),
if is_new_user { Server::Supabase => {
FolderInitDataSource::LocalDisk { if is_new_user {
create_if_not_exist: true, FolderInitDataSource::LocalDisk {
} create_if_not_exist: true,
} else {
FolderInitDataSource::Cloud(doc_state)
} }
}, } else {
FolderInitDataSource::Cloud(doc_state)
}
}, },
Err(err) => match server_type { },
Server::Local => FolderInitDataSource::LocalDisk { Err(err) => match server_type {
create_if_not_exist: true, Server::Local => FolderInitDataSource::LocalDisk {
}, create_if_not_exist: true,
Server::AppFlowyCloud | Server::Supabase => {
return Err(FlowyError::from(err));
},
}, },
}; Server::AppFlowyCloud | Server::Supabase => {
return Err(FlowyError::from(err));
},
},
};
folder_manager self
.initialize_with_new_user( .folder_manager
user_profile.uid, .initialize_with_new_user(
&user_profile.token, user_profile.uid,
is_new_user, &user_profile.token,
data_source, is_new_user,
&user_workspace.id, data_source,
) &user_workspace.id,
.await )
.context("FolderManager error")?; .await
.context("FolderManager error")?;
database_manager self
.initialize_with_new_user(user_profile.uid) .database_manager
.await .initialize_with_new_user(user_profile.uid)
.context("DatabaseManager error")?; .await
.context("DatabaseManager error")?;
document_manager self
.initialize_with_new_user(user_profile.uid) .document_manager
.await .initialize_with_new_user(user_profile.uid)
.context("DocumentManager error")?; .await
Ok(()) .context("DocumentManager error")?;
}) Ok(())
} }
fn did_expired(&self, _token: &str, user_id: i64) -> Fut<FlowyResult<()>> { async fn did_expired(&self, _token: &str, user_id: i64) -> FlowyResult<()> {
let folder_manager = self.folder_manager.clone(); self.folder_manager.clear(user_id).await;
to_fut(async move { Ok(())
folder_manager.clear(user_id).await;
Ok(())
})
} }
fn open_workspace(&self, user_id: i64, _user_workspace: &UserWorkspace) -> Fut<FlowyResult<()>> { async fn open_workspace(&self, user_id: i64, _user_workspace: &UserWorkspace) -> FlowyResult<()> {
let folder_manager = self.folder_manager.clone(); self
let database_manager = self.database_manager.clone(); .folder_manager
let document_manager = self.document_manager.clone(); .initialize_with_workspace_id(user_id)
.await?;
to_fut(async move { self.database_manager.initialize(user_id).await?;
folder_manager.initialize_with_workspace_id(user_id).await?; self.document_manager.initialize(user_id).await?;
database_manager.initialize(user_id).await?; Ok(())
document_manager.initialize(user_id).await?;
Ok(())
})
} }
fn did_update_network(&self, reachable: bool) { fn did_update_network(&self, reachable: bool) {

View File

@ -2,6 +2,8 @@
use flowy_search::folder::indexer::FolderIndexManagerImpl; use flowy_search::folder::indexer::FolderIndexManagerImpl;
use flowy_search::services::manager::SearchManager; use flowy_search::services::manager::SearchManager;
use parking_lot::Mutex;
use std::rc::Rc;
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use std::time::Duration; use std::time::Duration;
use sysinfo::System; use sysinfo::System;
@ -54,7 +56,7 @@ pub struct AppFlowyCore {
pub document_manager: Arc<DocumentManager>, pub document_manager: Arc<DocumentManager>,
pub folder_manager: Arc<FolderManager>, pub folder_manager: Arc<FolderManager>,
pub database_manager: Arc<DatabaseManager>, pub database_manager: Arc<DatabaseManager>,
pub event_dispatcher: Arc<AFPluginDispatcher>, pub event_dispatcher: Rc<AFPluginDispatcher>,
pub server_provider: Arc<ServerProvider>, pub server_provider: Arc<ServerProvider>,
pub task_dispatcher: Arc<RwLock<TaskDispatcher>>, pub task_dispatcher: Arc<RwLock<TaskDispatcher>>,
pub store_preference: Arc<KVStorePreferences>, pub store_preference: Arc<KVStorePreferences>,
@ -66,7 +68,7 @@ pub struct AppFlowyCore {
impl AppFlowyCore { impl AppFlowyCore {
pub async fn new( pub async fn new(
config: AppFlowyCoreConfig, config: AppFlowyCoreConfig,
runtime: Arc<AFPluginRuntime>, runtime: Rc<AFPluginRuntime>,
stream_log_sender: Option<Arc<dyn StreamLogSender>>, stream_log_sender: Option<Arc<dyn StreamLogSender>>,
) -> Self { ) -> Self {
let platform = OperatingSystem::from(&config.platform); let platform = OperatingSystem::from(&config.platform);
@ -102,7 +104,7 @@ impl AppFlowyCore {
} }
#[instrument(skip(config, runtime))] #[instrument(skip(config, runtime))]
async fn init(config: AppFlowyCoreConfig, runtime: Arc<AFPluginRuntime>) -> Self { async fn init(config: AppFlowyCoreConfig, runtime: Rc<AFPluginRuntime>) -> Self {
// Init the key value database // Init the key value database
let store_preference = Arc::new(KVStorePreferences::new(&config.storage_path).unwrap()); let store_preference = Arc::new(KVStorePreferences::new(&config.storage_path).unwrap());
info!("🔥{:?}", &config); info!("🔥{:?}", &config);
@ -261,7 +263,7 @@ impl AppFlowyCore {
error!("Init user failed: {}", err) error!("Init user failed: {}", err)
} }
} }
let event_dispatcher = Arc::new(AFPluginDispatcher::new( let event_dispatcher = Rc::new(AFPluginDispatcher::new(
runtime, runtime,
make_plugins( make_plugins(
Arc::downgrade(&folder_manager), Arc::downgrade(&folder_manager),
@ -290,7 +292,7 @@ impl AppFlowyCore {
} }
/// Only expose the dispatcher in test /// Only expose the dispatcher in test
pub fn dispatcher(&self) -> Arc<AFPluginDispatcher> { pub fn dispatcher(&self) -> Rc<AFPluginDispatcher> {
self.event_dispatcher.clone() self.event_dispatcher.clone()
} }
} }
@ -321,3 +323,13 @@ impl ServerUser for ServerUserImpl {
self.upgrade_user()?.workspace_id() self.upgrade_user()?.workspace_id()
} }
} }
pub struct MutexAppFlowyCore(pub Rc<Mutex<AppFlowyCore>>);
impl MutexAppFlowyCore {
pub fn new(appflowy_core: AppFlowyCore) -> Self {
Self(Rc::new(Mutex::new(appflowy_core)))
}
}
unsafe impl Sync for MutexAppFlowyCore {}
unsafe impl Send for MutexAppFlowyCore {}

View File

@ -1,16 +1,15 @@
use std::collections::HashMap; use async_trait::async_trait;
use std::sync::Arc;
use bytes::Bytes; use bytes::Bytes;
use collab::entity::EncodedCollab; use collab::entity::EncodedCollab;
pub use collab_folder::View; pub use collab_folder::View;
use collab_folder::ViewLayout; use collab_folder::ViewLayout;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use flowy_error::FlowyError; use flowy_error::FlowyError;
use flowy_folder_pub::folder_builder::NestedViewBuilder; use flowy_folder_pub::folder_builder::NestedViewBuilder;
use lib_infra::future::FutureResult;
use lib_infra::util::timestamp; use lib_infra::util::timestamp;
use crate::entities::{CreateViewParams, ViewLayoutPB}; use crate::entities::{CreateViewParams, ViewLayoutPB};
@ -42,36 +41,37 @@ pub struct DatabaseEncodedCollab {
/// view layout. Each [ViewLayout] will have a handler. So when creating a new /// view layout. Each [ViewLayout] will have a handler. So when creating a new
/// view, the [ViewLayout] will be used to get the handler. /// view, the [ViewLayout] will be used to get the handler.
/// ///
#[async_trait]
pub trait FolderOperationHandler { pub trait FolderOperationHandler {
/// Create the view for the workspace of new user. /// Create the view for the workspace of new user.
/// Only called once when the user is created. /// Only called once when the user is created.
fn create_workspace_view( async fn create_workspace_view(
&self, &self,
_uid: i64, _uid: i64,
_workspace_view_builder: Arc<RwLock<NestedViewBuilder>>, _workspace_view_builder: Arc<RwLock<NestedViewBuilder>>,
) -> FutureResult<(), FlowyError> { ) -> Result<(), FlowyError> {
FutureResult::new(async { Ok(()) }) Ok(())
} }
fn open_view(&self, view_id: &str) -> FutureResult<(), FlowyError>; async fn open_view(&self, view_id: &str) -> Result<(), FlowyError>;
/// Closes the view and releases the resources that this view has in /// Closes the view and releases the resources that this view has in
/// the backend /// the backend
fn close_view(&self, view_id: &str) -> FutureResult<(), FlowyError>; async fn close_view(&self, view_id: &str) -> Result<(), FlowyError>;
/// Called when the view is deleted. /// Called when the view is deleted.
/// This will called after the view is deleted from the trash. /// This will called after the view is deleted from the trash.
fn delete_view(&self, view_id: &str) -> FutureResult<(), FlowyError>; async fn delete_view(&self, view_id: &str) -> Result<(), FlowyError>;
/// Returns the [ViewData] that can be used to create the same view. /// Returns the [ViewData] that can be used to create the same view.
fn duplicate_view(&self, view_id: &str) -> FutureResult<ViewData, FlowyError>; async fn duplicate_view(&self, view_id: &str) -> Result<ViewData, FlowyError>;
/// get the encoded collab data from the disk. /// get the encoded collab data from the disk.
fn get_encoded_collab_v1_from_disk( async fn get_encoded_collab_v1_from_disk(
&self, &self,
_user: Arc<dyn FolderUser>, _user: Arc<dyn FolderUser>,
_view_id: &str, _view_id: &str,
) -> FutureResult<EncodedCollabWrapper, FlowyError> { ) -> Result<EncodedCollabWrapper, FlowyError> {
FutureResult::new(async move { Err(FlowyError::not_support()) }) Err(FlowyError::not_support())
} }
/// Create a view with the data. /// Create a view with the data.
@ -92,46 +92,46 @@ pub trait FolderOperationHandler {
/// ///
/// The return value is the [Option<EncodedCollab>] that can be used to create the view. /// The return value is the [Option<EncodedCollab>] that can be used to create the view.
/// It can be used in syncing the view data to cloud. /// It can be used in syncing the view data to cloud.
fn create_view_with_view_data( async fn create_view_with_view_data(
&self, &self,
user_id: i64, user_id: i64,
params: CreateViewParams, params: CreateViewParams,
) -> FutureResult<Option<EncodedCollab>, FlowyError>; ) -> Result<Option<EncodedCollab>, FlowyError>;
/// Create a view with the pre-defined data. /// Create a view with the pre-defined data.
/// For example, the initial data of the grid/calendar/kanban board when /// For example, the initial data of the grid/calendar/kanban board when
/// you create a new view. /// you create a new view.
fn create_built_in_view( async fn create_built_in_view(
&self, &self,
user_id: i64, user_id: i64,
view_id: &str, view_id: &str,
name: &str, name: &str,
layout: ViewLayout, layout: ViewLayout,
) -> FutureResult<(), FlowyError>; ) -> Result<(), FlowyError>;
/// Create a view by importing data /// Create a view by importing data
/// ///
/// The return value /// The return value
fn import_from_bytes( async fn import_from_bytes(
&self, &self,
uid: i64, uid: i64,
view_id: &str, view_id: &str,
name: &str, name: &str,
import_type: ImportType, import_type: ImportType,
bytes: Vec<u8>, bytes: Vec<u8>,
) -> FutureResult<EncodedCollab, FlowyError>; ) -> Result<EncodedCollab, FlowyError>;
/// Create a view by importing data from a file /// Create a view by importing data from a file
fn import_from_file_path( async fn import_from_file_path(
&self, &self,
view_id: &str, view_id: &str,
name: &str, name: &str,
path: String, path: String,
) -> FutureResult<(), FlowyError>; ) -> Result<(), FlowyError>;
/// Called when the view is updated. The handler is the `old` registered handler. /// Called when the view is updated. The handler is the `old` registered handler.
fn did_update_view(&self, _old: &View, _new: &View) -> FutureResult<(), FlowyError> { async fn did_update_view(&self, _old: &View, _new: &View) -> Result<(), FlowyError> {
FutureResult::new(async move { Ok(()) }) Ok(())
} }
} }

View File

@ -13,7 +13,6 @@ use flowy_ai_pub::cloud::{
use flowy_error::FlowyError; use flowy_error::FlowyError;
use futures_util::{StreamExt, TryStreamExt}; use futures_util::{StreamExt, TryStreamExt};
use lib_infra::async_trait::async_trait; use lib_infra::async_trait::async_trait;
use lib_infra::future::FutureResult;
use lib_infra::util::{get_operating_system, OperatingSystem}; use lib_infra::util::{get_operating_system, OperatingSystem};
use serde_json::{json, Value}; use serde_json::{json, Value};
use std::collections::HashMap; use std::collections::HashMap;
@ -28,29 +27,25 @@ impl<T> ChatCloudService for AFCloudChatCloudServiceImpl<T>
where where
T: AFServer, T: AFServer,
{ {
fn create_chat( async fn create_chat(
&self, &self,
_uid: &i64, _uid: &i64,
workspace_id: &str, workspace_id: &str,
chat_id: &str, chat_id: &str,
) -> FutureResult<(), FlowyError> { ) -> Result<(), FlowyError> {
let workspace_id = workspace_id.to_string();
let chat_id = chat_id.to_string(); let chat_id = chat_id.to_string();
let try_get_client = self.inner.try_get_client(); let try_get_client = self.inner.try_get_client();
let params = CreateChatParams {
chat_id,
name: "".to_string(),
rag_ids: vec![],
};
try_get_client?
.create_chat(workspace_id, params)
.await
.map_err(FlowyError::from)?;
FutureResult::new(async move { Ok(())
let params = CreateChatParams {
chat_id,
name: "".to_string(),
rag_ids: vec![],
};
try_get_client?
.create_chat(&workspace_id, params)
.await
.map_err(FlowyError::from)?;
Ok(())
})
} }
async fn create_question( async fn create_question(

View File

@ -5,7 +5,6 @@ use flowy_ai_pub::cloud::{
}; };
use flowy_error::FlowyError; use flowy_error::FlowyError;
use lib_infra::async_trait::async_trait; use lib_infra::async_trait::async_trait;
use lib_infra::future::FutureResult;
use serde_json::Value; use serde_json::Value;
use std::collections::HashMap; use std::collections::HashMap;
use std::path::Path; use std::path::Path;
@ -14,15 +13,13 @@ pub(crate) struct DefaultChatCloudServiceImpl;
#[async_trait] #[async_trait]
impl ChatCloudService for DefaultChatCloudServiceImpl { impl ChatCloudService for DefaultChatCloudServiceImpl {
fn create_chat( async fn create_chat(
&self, &self,
_uid: &i64, _uid: &i64,
_workspace_id: &str, _workspace_id: &str,
_chat_id: &str, _chat_id: &str,
) -> FutureResult<(), FlowyError> { ) -> Result<(), FlowyError> {
FutureResult::new(async move { Err(FlowyError::not_support().with_context("Chat is not supported in local server."))
Err(FlowyError::not_support().with_context("Chat is not supported in local server."))
})
} }
async fn create_question( async fn create_question(

View File

@ -7,7 +7,7 @@ use flowy_error::FlowyResult;
use flowy_user_pub::cloud::UserCloudConfig; use flowy_user_pub::cloud::UserCloudConfig;
use flowy_user_pub::entities::*; use flowy_user_pub::entities::*;
use lib_dispatch::prelude::*; use lib_dispatch::prelude::*;
use lib_infra::future::{to_fut, Fut}; use lib_infra::async_trait::async_trait;
use crate::event_handler::*; use crate::event_handler::*;
use crate::user_manager::UserManager; use crate::user_manager::UserManager;
@ -276,38 +276,53 @@ pub enum UserEvent {
NotifyDidSwitchPlan = 63, NotifyDidSwitchPlan = 63,
} }
#[async_trait]
pub trait UserStatusCallback: Send + Sync + 'static { pub trait UserStatusCallback: Send + Sync + 'static {
/// When the [Authenticator] changed, this method will be called. Currently, the auth type /// When the [Authenticator] changed, this method will be called. Currently, the auth type
/// will be changed when the user sign in or sign up. /// will be changed when the user sign in or sign up.
fn authenticator_did_changed(&self, _authenticator: Authenticator) {} fn authenticator_did_changed(&self, _authenticator: Authenticator) {}
/// This will be called after the application launches if the user is already signed in. /// This will be called after the application launches if the user is already signed in.
/// If the user is not signed in, this method will not be called /// If the user is not signed in, this method will not be called
fn did_init( async fn did_init(
&self, &self,
user_id: i64, _user_id: i64,
user_authenticator: &Authenticator, _user_authenticator: &Authenticator,
cloud_config: &Option<UserCloudConfig>, _cloud_config: &Option<UserCloudConfig>,
user_workspace: &UserWorkspace, _user_workspace: &UserWorkspace,
device_id: &str, _device_id: &str,
) -> Fut<FlowyResult<()>>; ) -> FlowyResult<()> {
Ok(())
}
/// Will be called after the user signed in. /// Will be called after the user signed in.
fn did_sign_in( async fn did_sign_in(
&self, &self,
user_id: i64, _user_id: i64,
user_workspace: &UserWorkspace, _user_workspace: &UserWorkspace,
device_id: &str, _device_id: &str,
) -> Fut<FlowyResult<()>>; ) -> FlowyResult<()> {
Ok(())
}
/// Will be called after the user signed up. /// Will be called after the user signed up.
fn did_sign_up( async fn did_sign_up(
&self, &self,
is_new_user: bool, _is_new_user: bool,
user_profile: &UserProfile, _user_profile: &UserProfile,
user_workspace: &UserWorkspace, _user_workspace: &UserWorkspace,
device_id: &str, _device_id: &str,
) -> Fut<FlowyResult<()>>; ) -> FlowyResult<()> {
Ok(())
}
fn did_expired(&self, token: &str, user_id: i64) -> Fut<FlowyResult<()>>; async fn did_expired(&self, _token: &str, _user_id: i64) -> FlowyResult<()> {
fn open_workspace(&self, user_id: i64, user_workspace: &UserWorkspace) -> Fut<FlowyResult<()>>; Ok(())
}
async fn open_workspace(
&self,
_user_id: i64,
_user_workspace: &UserWorkspace,
) -> FlowyResult<()> {
Ok(())
}
fn did_update_network(&self, _reachable: bool) {} fn did_update_network(&self, _reachable: bool) {}
fn did_update_plans(&self, _plans: Vec<SubscriptionPlan>) {} fn did_update_plans(&self, _plans: Vec<SubscriptionPlan>) {}
fn did_update_storage_limitation(&self, _can_write: bool) {} fn did_update_storage_limitation(&self, _can_write: bool) {}
@ -315,42 +330,4 @@ pub trait UserStatusCallback: Send + Sync + 'static {
/// Acts as a placeholder [UserStatusCallback] for the user session, but does not perform any function /// Acts as a placeholder [UserStatusCallback] for the user session, but does not perform any function
pub(crate) struct DefaultUserStatusCallback; pub(crate) struct DefaultUserStatusCallback;
impl UserStatusCallback for DefaultUserStatusCallback { impl UserStatusCallback for DefaultUserStatusCallback {}
fn did_init(
&self,
_user_id: i64,
_authenticator: &Authenticator,
_cloud_config: &Option<UserCloudConfig>,
_user_workspace: &UserWorkspace,
_device_id: &str,
) -> Fut<FlowyResult<()>> {
to_fut(async { Ok(()) })
}
fn did_sign_in(
&self,
_user_id: i64,
_user_workspace: &UserWorkspace,
_device_id: &str,
) -> Fut<FlowyResult<()>> {
to_fut(async { Ok(()) })
}
fn did_sign_up(
&self,
_is_new_user: bool,
_user_profile: &UserProfile,
_user_workspace: &UserWorkspace,
_device_id: &str,
) -> Fut<FlowyResult<()>> {
to_fut(async { Ok(()) })
}
fn did_expired(&self, _token: &str, _user_id: i64) -> Fut<FlowyResult<()>> {
to_fut(async { Ok(()) })
}
fn open_workspace(&self, _user_id: i64, _user_workspace: &UserWorkspace) -> Fut<FlowyResult<()>> {
to_fut(async { Ok(()) })
}
}

View File

@ -33,7 +33,7 @@ where
pub fn load_collab_by_object_id<'a, R>( pub fn load_collab_by_object_id<'a, R>(
uid: i64, uid: i64,
collab_read_txn: &R, collab_read_txn: &R,
object_id: &String, object_id: &str,
) -> Result<Collab, PersistenceError> ) -> Result<Collab, PersistenceError>
where where
R: CollabKVAction<'a>, R: CollabKVAction<'a>,

View File

@ -581,7 +581,6 @@ impl UserManager {
Ok(UseAISettingPB::from(settings)) Ok(UseAISettingPB::from(settings))
} }
#[instrument(level = "debug", skip(self), err)]
pub async fn get_workspace_member_info(&self, uid: i64) -> FlowyResult<WorkspaceMember> { pub async fn get_workspace_member_info(&self, uid: i64) -> FlowyResult<WorkspaceMember> {
let workspace_id = self.get_session()?.user_workspace.id.clone(); let workspace_id = self.get_session()?.user_workspace.id.clone();
let db = self.authenticate_user.get_sqlite_connection(uid)?; let db = self.authenticate_user.get_sqlite_connection(uid)?;

View File

@ -42,7 +42,7 @@ tokio = { workspace = true, features = ["rt"] }
futures-util = "0.3.26" futures-util = "0.3.26"
[features] [features]
default = ["use_protobuf"] default = ["local_set", "use_protobuf"]
use_serde = ["bincode", "serde_json", "serde", "serde_repr"] use_serde = ["bincode", "serde_json", "serde", "serde_repr"]
use_protobuf = ["protobuf"] use_protobuf = ["protobuf"]
local_set = [] local_set = []

View File

@ -1,10 +1,10 @@
use std::any::Any;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{future::Future, sync::Arc};
use derivative::*; use derivative::*;
use pin_project::pin_project; use pin_project::pin_project;
use std::any::Any;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use tracing::event; use tracing::event;
use crate::module::AFPluginStateMap; use crate::module::AFPluginStateMap;
@ -16,60 +16,50 @@ use crate::{
service::{AFPluginServiceFactory, Service}, service::{AFPluginServiceFactory, Service},
}; };
#[cfg(any(target_arch = "wasm32", feature = "local_set"))] #[cfg(feature = "local_set")]
pub trait AFConcurrent {} pub trait AFConcurrent {}
#[cfg(any(target_arch = "wasm32", feature = "local_set"))] #[cfg(feature = "local_set")]
impl<T> AFConcurrent for T where T: ?Sized {} impl<T> AFConcurrent for T where T: ?Sized {}
#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] #[cfg(not(feature = "local_set"))]
pub trait AFConcurrent: Send + Sync {} pub trait AFConcurrent: Send + Sync {}
#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] #[cfg(not(feature = "local_set"))]
impl<T> AFConcurrent for T where T: Send + Sync {} impl<T> AFConcurrent for T where T: Send + Sync {}
#[cfg(any(target_arch = "wasm32", feature = "local_set"))] #[cfg(feature = "local_set")]
pub type AFBoxFuture<'a, T> = futures_core::future::LocalBoxFuture<'a, T>; pub type AFBoxFuture<'a, T> = futures_core::future::LocalBoxFuture<'a, T>;
#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] #[cfg(not(feature = "local_set"))]
pub type AFBoxFuture<'a, T> = futures_core::future::BoxFuture<'a, T>; pub type AFBoxFuture<'a, T> = futures_core::future::BoxFuture<'a, T>;
pub type AFStateMap = std::sync::Arc<AFPluginStateMap>; pub type AFStateMap = std::sync::Arc<AFPluginStateMap>;
#[cfg(any(target_arch = "wasm32", feature = "local_set"))] #[cfg(feature = "local_set")]
pub(crate) fn downcast_owned<T: 'static>(boxed: AFBox) -> Option<T> { pub(crate) fn downcast_owned<T: 'static>(boxed: AFBox) -> Option<T> {
boxed.downcast().ok().map(|boxed| *boxed) boxed.downcast().ok().map(|boxed| *boxed)
} }
#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] #[cfg(not(feature = "local_set"))]
pub(crate) fn downcast_owned<T: 'static + Send + Sync>(boxed: AFBox) -> Option<T> { pub(crate) fn downcast_owned<T: 'static + Send + Sync>(boxed: AFBox) -> Option<T> {
boxed.downcast().ok().map(|boxed| *boxed) boxed.downcast().ok().map(|boxed| *boxed)
} }
#[cfg(any(target_arch = "wasm32", feature = "local_set"))] #[cfg(feature = "local_set")]
pub(crate) type AFBox = Box<dyn Any>; pub(crate) type AFBox = Box<dyn Any>;
#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] #[cfg(not(feature = "local_set"))]
pub(crate) type AFBox = Box<dyn Any + Send + Sync>; pub(crate) type AFBox = Box<dyn Any + Send + Sync>;
#[cfg(any(target_arch = "wasm32", feature = "local_set"))] #[cfg(feature = "local_set")]
pub type BoxFutureCallback = pub type BoxFutureCallback =
Box<dyn FnOnce(AFPluginEventResponse) -> AFBoxFuture<'static, ()> + 'static>; Box<dyn FnOnce(AFPluginEventResponse) -> AFBoxFuture<'static, ()> + 'static>;
#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] #[cfg(not(feature = "local_set"))]
pub type BoxFutureCallback = pub type BoxFutureCallback =
Box<dyn FnOnce(AFPluginEventResponse) -> AFBoxFuture<'static, ()> + Send + Sync + 'static>; Box<dyn FnOnce(AFPluginEventResponse) -> AFBoxFuture<'static, ()> + Send + Sync + 'static>;
#[cfg(any(target_arch = "wasm32", feature = "local_set"))]
pub fn af_spawn<T>(future: T) -> tokio::task::JoinHandle<T::Output>
where
T: Future + 'static,
T::Output: 'static,
{
tokio::task::spawn_local(future)
}
#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))]
pub fn af_spawn<T>(future: T) -> tokio::task::JoinHandle<T::Output> pub fn af_spawn<T>(future: T) -> tokio::task::JoinHandle<T::Output>
where where
T: Future + Send + 'static, T: Future + Send + 'static,
@ -80,11 +70,11 @@ where
pub struct AFPluginDispatcher { pub struct AFPluginDispatcher {
plugins: AFPluginMap, plugins: AFPluginMap,
runtime: Arc<AFPluginRuntime>, runtime: Rc<AFPluginRuntime>,
} }
impl AFPluginDispatcher { impl AFPluginDispatcher {
pub fn new(runtime: Arc<AFPluginRuntime>, plugins: Vec<AFPlugin>) -> AFPluginDispatcher { pub fn new(runtime: Rc<AFPluginRuntime>, plugins: Vec<AFPlugin>) -> AFPluginDispatcher {
tracing::trace!("{}", plugin_info(&plugins)); tracing::trace!("{}", plugin_info(&plugins));
AFPluginDispatcher { AFPluginDispatcher {
plugins: plugin_map_or_crash(plugins), plugins: plugin_map_or_crash(plugins),
@ -126,14 +116,37 @@ impl AFPluginDispatcher {
// The provided future will start running in the background immediately // The provided future will start running in the background immediately
// when `spawn` is called, even if you don't await the returned // when `spawn` is called, even if you don't await the returned
// `JoinHandle`. // `JoinHandle`.
let handle = dispatch.runtime.spawn(async move { let result: Result<AFPluginEventResponse, DispatchError>;
service.call(service_ctx).await.unwrap_or_else(|e| { #[cfg(feature = "local_set")]
tracing::error!("Dispatch runtime error: {:?}", e); {
InternalError::Other(format!("{:?}", e)).as_response() let handle = dispatch.runtime.local.spawn_local(async move {
}) service.call(service_ctx).await.unwrap_or_else(|e| {
}); tracing::error!("Dispatch runtime error: {:?}", e);
InternalError::Other(format!("{:?}", e)).as_response()
})
});
result = dispatch
.runtime
.local
.run_until(handle)
.await
.map_err(|e| e.to_string().into())
}
#[cfg(not(feature = "local_set"))]
{
result = dispatch
.runtime
.spawn(async move {
service.call(service_ctx).await.unwrap_or_else(|e| {
tracing::error!("Dispatch runtime error: {:?}", e);
InternalError::Other(format!("{:?}", e)).as_response()
})
})
.await;
}
let result = dispatch.runtime.run_until(handle).await;
result.unwrap_or_else(|e| { result.unwrap_or_else(|e| {
let msg = format!("EVENT_DISPATCH join error: {:?}", e); let msg = format!("EVENT_DISPATCH join error: {:?}", e);
tracing::error!("{}", msg); tracing::error!("{}", msg);
@ -170,16 +183,17 @@ impl AFPluginDispatcher {
callback: Some(Box::new(callback)), callback: Some(Box::new(callback)),
}; };
let handle = dispatch.runtime.spawn(async move { #[cfg(feature = "local_set")]
service.call(service_ctx).await.unwrap_or_else(|e| {
tracing::error!("[dispatch]: runtime error: {:?}", e);
InternalError::Other(format!("{:?}", e)).as_response()
})
});
#[cfg(any(target_arch = "wasm32", feature = "local_set"))]
{ {
let result = dispatch.runtime.block_on(handle); let handle = dispatch.runtime.local.spawn_local(async move {
service.call(service_ctx).await.unwrap_or_else(|e| {
tracing::error!("Dispatch runtime error: {:?}", e);
InternalError::Other(format!("{:?}", e)).as_response()
})
});
let fut = dispatch.runtime.local.run_until(handle);
let result = dispatch.runtime.block_on(fut);
DispatchFuture { DispatchFuture {
fut: Box::pin(async move { fut: Box::pin(async move {
result.unwrap_or_else(|e| { result.unwrap_or_else(|e| {
@ -192,8 +206,18 @@ impl AFPluginDispatcher {
} }
} }
#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] #[cfg(not(feature = "local_set"))]
{ {
let handle = dispatch.runtime.spawn(async move {
service
.call(crate::service::service::Service)
.await
.unwrap_or_else(|e| {
tracing::error!("[dispatch]: runtime error: {:?}", e);
InternalError::Other(format!("{:?}", e)).as_response()
})
});
let runtime = dispatch.runtime.clone(); let runtime = dispatch.runtime.clone();
DispatchFuture { DispatchFuture {
fut: Box::pin(async move { fut: Box::pin(async move {
@ -211,7 +235,7 @@ impl AFPluginDispatcher {
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
pub fn sync_send( pub fn sync_send(
dispatch: Arc<AFPluginDispatcher>, dispatch: Rc<AFPluginDispatcher>,
request: AFPluginRequest, request: AFPluginRequest,
) -> AFPluginEventResponse { ) -> AFPluginEventResponse {
futures::executor::block_on(AFPluginDispatcher::async_send_with_callback( futures::executor::block_on(AFPluginDispatcher::async_send_with_callback(
@ -221,16 +245,6 @@ impl AFPluginDispatcher {
)) ))
} }
#[cfg(any(target_arch = "wasm32", feature = "local_set"))]
#[track_caller]
pub fn spawn<F>(&self, future: F) -> tokio::task::JoinHandle<F::Output>
where
F: Future + 'static,
{
self.runtime.spawn(future)
}
#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))]
#[track_caller] #[track_caller]
pub fn spawn<F>(&self, future: F) -> tokio::task::JoinHandle<F::Output> pub fn spawn<F>(&self, future: F) -> tokio::task::JoinHandle<F::Output>
where where
@ -239,24 +253,6 @@ impl AFPluginDispatcher {
{ {
self.runtime.spawn(future) self.runtime.spawn(future)
} }
#[cfg(any(target_arch = "wasm32", feature = "local_set"))]
pub async fn run_until<F>(&self, future: F) -> F::Output
where
F: Future + 'static,
{
let handle = self.runtime.spawn(future);
self.runtime.run_until(handle).await.unwrap()
}
#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))]
pub async fn run_until<'a, F>(&self, future: F) -> F::Output
where
F: Future + Send + 'a,
<F as Future>::Output: Send + 'a,
{
self.runtime.run_until(future).await
}
} }
#[derive(Derivative)] #[derive(Derivative)]

View File

@ -1,3 +1,7 @@
use futures_core::ready;
use nanoid::nanoid;
use pin_project::pin_project;
use std::rc::Rc;
use std::sync::Arc; use std::sync::Arc;
use std::{ use std::{
collections::HashMap, collections::HashMap,
@ -9,10 +13,6 @@ use std::{
task::{Context, Poll}, task::{Context, Poll},
}; };
use futures_core::ready;
use nanoid::nanoid;
use pin_project::pin_project;
use crate::dispatcher::AFConcurrent; use crate::dispatcher::AFConcurrent;
use crate::prelude::{AFBoxFuture, AFStateMap}; use crate::prelude::{AFBoxFuture, AFStateMap};
use crate::service::AFPluginHandler; use crate::service::AFPluginHandler;
@ -26,12 +26,12 @@ use crate::{
}, },
}; };
pub type AFPluginMap = Arc<HashMap<AFPluginEvent, Arc<AFPlugin>>>; pub type AFPluginMap = Rc<HashMap<AFPluginEvent, Rc<AFPlugin>>>;
pub(crate) fn plugin_map_or_crash(plugins: Vec<AFPlugin>) -> AFPluginMap { pub(crate) fn plugin_map_or_crash(plugins: Vec<AFPlugin>) -> AFPluginMap {
let mut plugin_map: HashMap<AFPluginEvent, Arc<AFPlugin>> = HashMap::new(); let mut plugin_map: HashMap<AFPluginEvent, Rc<AFPlugin>> = HashMap::new();
plugins.into_iter().for_each(|m| { plugins.into_iter().for_each(|m| {
let events = m.events(); let events = m.events();
let plugins = Arc::new(m); let plugins = Rc::new(m);
events.into_iter().for_each(|e| { events.into_iter().for_each(|e| {
if plugin_map.contains_key(&e) { if plugin_map.contains_key(&e) {
let plugin_name = plugin_map.get(&e).map(|p| &p.name); let plugin_name = plugin_map.get(&e).map(|p| &p.name);
@ -40,7 +40,7 @@ pub(crate) fn plugin_map_or_crash(plugins: Vec<AFPlugin>) -> AFPluginMap {
plugin_map.insert(e, plugins.clone()); plugin_map.insert(e, plugins.clone());
}); });
}); });
Arc::new(plugin_map) Rc::new(plugin_map)
} }
#[derive(PartialEq, Eq, Hash, Debug, Clone)] #[derive(PartialEq, Eq, Hash, Debug, Clone)]
@ -67,7 +67,7 @@ pub struct AFPlugin {
/// Contains a list of factories that are used to generate the services used to handle the passed-in /// Contains a list of factories that are used to generate the services used to handle the passed-in
/// `ServiceRequest`. /// `ServiceRequest`.
/// ///
event_service_factory: Arc< event_service_factory: Rc<
HashMap<AFPluginEvent, BoxServiceFactory<(), ServiceRequest, ServiceResponse, DispatchError>>, HashMap<AFPluginEvent, BoxServiceFactory<(), ServiceRequest, ServiceResponse, DispatchError>>,
>, >,
} }
@ -77,7 +77,7 @@ impl std::default::Default for AFPlugin {
Self { Self {
name: "".to_owned(), name: "".to_owned(),
states: Default::default(), states: Default::default(),
event_service_factory: Arc::new(HashMap::new()), event_service_factory: Rc::new(HashMap::new()),
} }
} }
} }
@ -113,7 +113,7 @@ impl AFPlugin {
if self.event_service_factory.contains_key(&event) { if self.event_service_factory.contains_key(&event) {
panic!("Register duplicate Event: {:?}", &event); panic!("Register duplicate Event: {:?}", &event);
} else { } else {
Arc::get_mut(&mut self.event_service_factory) Rc::get_mut(&mut self.event_service_factory)
.unwrap() .unwrap()
.insert(event, factory(AFPluginHandlerService::new(handler))); .insert(event, factory(AFPluginHandlerService::new(handler)));
} }
@ -185,7 +185,7 @@ impl AFPluginServiceFactory<AFPluginRequest> for AFPlugin {
} }
pub struct AFPluginService { pub struct AFPluginService {
services: Arc< services: Rc<
HashMap<AFPluginEvent, BoxServiceFactory<(), ServiceRequest, ServiceResponse, DispatchError>>, HashMap<AFPluginEvent, BoxServiceFactory<(), ServiceRequest, ServiceResponse, DispatchError>>,
>, >,
states: AFStateMap, states: AFStateMap,

View File

@ -8,8 +8,8 @@ use tokio::task::JoinHandle;
pub struct AFPluginRuntime { pub struct AFPluginRuntime {
inner: Runtime, inner: Runtime,
#[cfg(any(target_arch = "wasm32", feature = "local_set"))] #[cfg(feature = "local_set")]
local: tokio::task::LocalSet, pub(crate) local: tokio::task::LocalSet,
} }
impl Display for AFPluginRuntime { impl Display for AFPluginRuntime {
@ -27,21 +27,11 @@ impl AFPluginRuntime {
let inner = default_tokio_runtime()?; let inner = default_tokio_runtime()?;
Ok(Self { Ok(Self {
inner, inner,
#[cfg(any(target_arch = "wasm32", feature = "local_set"))] #[cfg(feature = "local_set")]
local: tokio::task::LocalSet::new(), local: tokio::task::LocalSet::new(),
}) })
} }
#[cfg(any(target_arch = "wasm32", feature = "local_set"))]
#[track_caller]
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + 'static,
{
self.local.spawn_local(future)
}
#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))]
#[track_caller] #[track_caller]
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where where
@ -51,23 +41,7 @@ impl AFPluginRuntime {
self.inner.spawn(future) self.inner.spawn(future)
} }
#[cfg(any(target_arch = "wasm32", feature = "local_set"))] #[cfg(feature = "local_set")]
pub async fn run_until<F>(&self, future: F) -> F::Output
where
F: Future,
{
self.local.run_until(future).await
}
#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))]
pub async fn run_until<F>(&self, future: F) -> F::Output
where
F: Future,
{
future.await
}
#[cfg(any(target_arch = "wasm32", feature = "local_set"))]
#[track_caller] #[track_caller]
pub fn block_on<F>(&self, f: F) -> F::Output pub fn block_on<F>(&self, f: F) -> F::Output
where where
@ -76,7 +50,7 @@ impl AFPluginRuntime {
self.local.block_on(&self.inner, f) self.local.block_on(&self.inner, f)
} }
#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] #[cfg(not(feature = "local_set"))]
#[track_caller] #[track_caller]
pub fn block_on<F>(&self, f: F) -> F::Output pub fn block_on<F>(&self, f: F) -> F::Output
where where
@ -86,14 +60,26 @@ impl AFPluginRuntime {
} }
} }
#[cfg(any(target_arch = "wasm32", feature = "local_set"))] #[cfg(feature = "local_set")]
pub fn default_tokio_runtime() -> io::Result<Runtime> { pub fn default_tokio_runtime() -> io::Result<Runtime> {
runtime::Builder::new_current_thread() #[cfg(not(target_arch = "wasm32"))]
.thread_name("dispatch-rt-st") {
.build() runtime::Builder::new_multi_thread()
.enable_io()
.enable_time()
.thread_name("dispatch-rt-st")
.build()
}
#[cfg(target_arch = "wasm32")]
{
runtime::Builder::new_current_thread()
.thread_name("dispatch-rt-st")
.build()
}
} }
#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] #[cfg(not(feature = "local_set"))]
pub fn default_tokio_runtime() -> io::Result<Runtime> { pub fn default_tokio_runtime() -> io::Result<Runtime> {
runtime::Builder::new_multi_thread() runtime::Builder::new_multi_thread()
.thread_name("dispatch-rt-mt") .thread_name("dispatch-rt-mt")

View File

@ -16,7 +16,7 @@ where
BoxServiceFactory(Box::new(FactoryWrapper(factory))) BoxServiceFactory(Box::new(FactoryWrapper(factory)))
} }
#[cfg(any(target_arch = "wasm32", feature = "local_set"))] #[cfg(feature = "local_set")]
type Inner<Cfg, Req, Res, Err> = Box< type Inner<Cfg, Req, Res, Err> = Box<
dyn AFPluginServiceFactory< dyn AFPluginServiceFactory<
Req, Req,
@ -27,7 +27,7 @@ type Inner<Cfg, Req, Res, Err> = Box<
Future = AFBoxFuture<'static, Result<BoxService<Req, Res, Err>, Err>>, Future = AFBoxFuture<'static, Result<BoxService<Req, Res, Err>, Err>>,
>, >,
>; >;
#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] #[cfg(not(feature = "local_set"))]
type Inner<Cfg, Req, Res, Err> = Box< type Inner<Cfg, Req, Res, Err> = Box<
dyn AFPluginServiceFactory< dyn AFPluginServiceFactory<
Req, Req,
@ -58,12 +58,12 @@ where
} }
} }
#[cfg(any(target_arch = "wasm32", feature = "local_set"))] #[cfg(feature = "local_set")]
pub type BoxService<Req, Res, Err> = Box< pub type BoxService<Req, Res, Err> = Box<
dyn Service<Req, Response = Res, Error = Err, Future = AFBoxFuture<'static, Result<Res, Err>>>, dyn Service<Req, Response = Res, Error = Err, Future = AFBoxFuture<'static, Result<Res, Err>>>,
>; >;
#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] #[cfg(not(feature = "local_set"))]
pub type BoxService<Req, Res, Err> = Box< pub type BoxService<Req, Res, Err> = Box<
dyn Service<Req, Response = Res, Error = Err, Future = AFBoxFuture<'static, Result<Res, Err>>> dyn Service<Req, Response = Res, Error = Err, Future = AFBoxFuture<'static, Result<Res, Err>>>
+ Sync + Sync

View File

@ -1,4 +1,4 @@
use std::sync::Arc; use std::rc::Rc;
use lib_dispatch::prelude::*; use lib_dispatch::prelude::*;
use lib_dispatch::runtime::AFPluginRuntime; use lib_dispatch::runtime::AFPluginRuntime;
@ -10,8 +10,8 @@ pub async fn hello() -> String {
#[tokio::test] #[tokio::test]
async fn test() { async fn test() {
let event = "1"; let event = "1";
let runtime = Arc::new(AFPluginRuntime::new().unwrap()); let runtime = Rc::new(AFPluginRuntime::new().unwrap());
let dispatch = Arc::new(AFPluginDispatcher::new( let dispatch = Rc::new(AFPluginDispatcher::new(
runtime, runtime,
vec![AFPlugin::new().event(event, hello)], vec![AFPlugin::new().event(event, hello)],
)); ));

View File

@ -71,10 +71,11 @@ if [ "$exclude_packages" = false ]; then
# Navigate back to the packages directory # Navigate back to the packages directory
cd .. cd ..
done done
cd ..
fi fi
# Navigate to the appflowy_flutter directory and generate files # Navigate to the appflowy_flutter directory and generate files
cd ..
echo "🧊 Start generating freezed files (AppFlowy)." echo "🧊 Start generating freezed files (AppFlowy)."
if [ "$skip_pub_packages_get" = false ]; then if [ "$skip_pub_packages_get" = false ]; then