From 23997e977c36f00963b24a0ebb605da02d2f1c4c Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Sun, 11 Aug 2024 20:39:25 +0800 Subject: [PATCH] refactor: revamp file upload and fix partitial upload bugs (#5924) * chore: upload chat file to local ai * chore: async func * chore: individual file progress * chore: fix test * chore: fix file upload --- .../application/chat_input_file_bloc.dart | 33 +-- .../presentation/chat_input/chat_input.dart | 9 +- .../chat_input/chat_input_attachment.dart | 2 +- frontend/appflowy_tauri/src-tauri/Cargo.lock | 26 +- frontend/appflowy_tauri/src-tauri/Cargo.toml | 2 +- .../appflowy_web_app/src-tauri/Cargo.lock | 26 +- .../appflowy_web_app/src-tauri/Cargo.toml | 2 +- frontend/rust-lib/Cargo.lock | 53 ++-- frontend/rust-lib/Cargo.toml | 5 +- .../build-tool/flowy-derive/Cargo.toml | 4 +- .../event-integration-test/src/lib.rs | 2 +- .../af_cloud_test/file_upload_test.rs | 112 +++---- frontend/rust-lib/flowy-ai-pub/src/cloud.rs | 8 +- frontend/rust-lib/flowy-ai/Cargo.toml | 3 +- frontend/rust-lib/flowy-ai/src/ai_manager.rs | 5 +- frontend/rust-lib/flowy-ai/src/chat.rs | 45 ++- .../rust-lib/flowy-ai/src/event_handler.rs | 1 - .../flowy-ai/src/local_ai/local_llm_chat.rs | 63 ++-- .../src/middleware/chat_service_mw.rs | 39 ++- .../rust-lib/flowy-ai/src/stream_message.rs | 9 +- .../flowy-core/src/deps_resolve/chat_deps.rs | 3 + .../flowy-core/src/integrate/trait_impls.rs | 35 +-- frontend/rust-lib/flowy-core/src/lib.rs | 1 + frontend/rust-lib/flowy-database2/Cargo.toml | 4 +- frontend/rust-lib/flowy-document/Cargo.toml | 2 +- .../flowy-document/src/event_handler.rs | 13 +- .../rust-lib/flowy-document/src/manager.rs | 5 +- .../flowy-document/tests/document/util.rs | 19 +- .../rust-lib/flowy-notification/Cargo.toml | 2 +- .../flowy-server/src/af_cloud/impls/chat.rs | 38 +-- .../rust-lib/flowy-server/src/default_impl.rs | 16 +- .../rust-lib/flowy-storage-pub/Cargo.toml | 1 + .../rust-lib/flowy-storage-pub/src/storage.rs | 89 ++++-- frontend/rust-lib/flowy-storage/Cargo.toml | 1 + .../rust-lib/flowy-storage/src/manager.rs | 279 ++++++++++-------- .../rust-lib/flowy-storage/src/uploader.rs | 30 +- 36 files changed, 551 insertions(+), 436 deletions(-) diff --git a/frontend/appflowy_flutter/lib/plugins/ai_chat/application/chat_input_file_bloc.dart b/frontend/appflowy_flutter/lib/plugins/ai_chat/application/chat_input_file_bloc.dart index 6f8877497f..048c8709b3 100644 --- a/frontend/appflowy_flutter/lib/plugins/ai_chat/application/chat_input_file_bloc.dart +++ b/frontend/appflowy_flutter/lib/plugins/ai_chat/application/chat_input_file_bloc.dart @@ -1,8 +1,5 @@ -import 'dart:async'; import 'package:appflowy/plugins/ai_chat/application/chat_entity.dart'; -import 'package:appflowy_backend/dispatch/dispatch.dart'; -import 'package:appflowy_backend/protobuf/flowy-ai/entities.pb.dart'; import 'package:flutter_bloc/flutter_bloc.dart'; import 'package:freezed_annotation/freezed_annotation.dart'; @@ -10,40 +7,14 @@ part 'chat_input_file_bloc.freezed.dart'; class ChatInputFileBloc extends Bloc { ChatInputFileBloc({ + // ignore: avoid_unused_constructor_parameters required String chatId, required this.file, }) : super(const ChatInputFileState()) { on( (event, emit) async { await event.when( - initial: () async { - final payload = ChatFilePB( - filePath: file.filePath, - chatId: chatId, - ); - unawaited( - AIEventChatWithFile(payload).send().then((result) { - if (!isClosed) { - result.fold( - (_) { - add( - const ChatInputFileEvent.updateUploadState( - UploadFileIndicator.finish(), - ), - ); - }, - (err) { - add( - ChatInputFileEvent.updateUploadState( - UploadFileIndicator.error(err.toString()), - ), - ); - }, - ); - } - }), - ); - }, + initial: () async {}, updateUploadState: (UploadFileIndicator indicator) { emit(state.copyWith(uploadFileIndicator: indicator)); }, diff --git a/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_input/chat_input.dart b/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_input/chat_input.dart index 9f68586757..60de3fd528 100644 --- a/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_input/chat_input.dart +++ b/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_input/chat_input.dart @@ -154,15 +154,16 @@ class _ChatInputState extends State { children: [ // TODO(lucas): support mobile if (PlatformExtension.isDesktop && - widget.aiType == const AIType.localAI()) + widget.aiType.isLocalAI()) _attachmentButton(buttonPadding), // text field Expanded(child: _inputTextField(context, textPadding)), - // at button + // mention button // TODO(lucas): support mobile - if (PlatformExtension.isDesktop) _atButton(buttonPadding), + if (PlatformExtension.isDesktop) + _mentionButton(buttonPadding), // send button _sendButton(buttonPadding), @@ -352,7 +353,7 @@ class _ChatInputState extends State { ); } - Widget _atButton(EdgeInsets buttonPadding) { + Widget _mentionButton(EdgeInsets buttonPadding) { return Padding( padding: buttonPadding, child: SizedBox.square( diff --git a/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_input/chat_input_attachment.dart b/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_input/chat_input_attachment.dart index b4eee5d8ef..954988da7c 100644 --- a/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_input/chat_input_attachment.dart +++ b/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_input/chat_input_attachment.dart @@ -17,7 +17,7 @@ class ChatInputAttachment extends StatelessWidget { message: LocaleKeys.chat_uploadFile.tr(), child: FlowyIconButton( hoverColor: AFThemeExtension.of(context).lightGreyHover, - radius: BorderRadius.circular(18), + radius: BorderRadius.circular(6), icon: FlowySvg( FlowySvgs.ai_attachment_s, size: const Size.square(20), diff --git a/frontend/appflowy_tauri/src-tauri/Cargo.lock b/frontend/appflowy_tauri/src-tauri/Cargo.lock index 692ea6eb43..20ede978d8 100644 --- a/frontend/appflowy_tauri/src-tauri/Cargo.lock +++ b/frontend/appflowy_tauri/src-tauri/Cargo.lock @@ -172,7 +172,7 @@ checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" [[package]] name = "app-error" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "anyhow", "bincode", @@ -192,7 +192,7 @@ dependencies = [ [[package]] name = "appflowy-ai-client" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "anyhow", "bytes", @@ -826,7 +826,7 @@ dependencies = [ [[package]] name = "client-api" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "again", "anyhow", @@ -876,7 +876,7 @@ dependencies = [ [[package]] name = "client-api-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "collab-entity", "collab-rt-entity", @@ -888,7 +888,7 @@ dependencies = [ [[package]] name = "client-websocket" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "futures-channel", "futures-util", @@ -1132,7 +1132,7 @@ dependencies = [ [[package]] name = "collab-rt-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "anyhow", "bincode", @@ -1157,7 +1157,7 @@ dependencies = [ [[package]] name = "collab-rt-protocol" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "anyhow", "async-trait", @@ -1421,7 +1421,7 @@ dependencies = [ "cssparser-macros", "dtoa-short", "itoa 1.0.6", - "phf 0.11.2", + "phf 0.8.0", "smallvec", ] @@ -1532,7 +1532,7 @@ checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308" [[package]] name = "database-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "anyhow", "app-error", @@ -3051,7 +3051,7 @@ dependencies = [ [[package]] name = "gotrue" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "anyhow", "futures-util", @@ -3068,7 +3068,7 @@ dependencies = [ [[package]] name = "gotrue-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "anyhow", "app-error", @@ -3500,7 +3500,7 @@ dependencies = [ [[package]] name = "infra" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "anyhow", "bytes", @@ -6098,7 +6098,7 @@ dependencies = [ [[package]] name = "shared-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "anyhow", "app-error", diff --git a/frontend/appflowy_tauri/src-tauri/Cargo.toml b/frontend/appflowy_tauri/src-tauri/Cargo.toml index b91db1138a..026dd6daf6 100644 --- a/frontend/appflowy_tauri/src-tauri/Cargo.toml +++ b/frontend/appflowy_tauri/src-tauri/Cargo.toml @@ -53,7 +53,7 @@ collab-user = { version = "0.2" } # Run the script: # scripts/tool/update_client_api_rev.sh new_rev_id # ⚠️⚠️⚠️️ -client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "135a67dc79848c39e9c53b4a99b6d14f444686ef" } +client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "6a44490daccb101c8b855443d2d6ded0fb752016" } [dependencies] serde_json.workspace = true diff --git a/frontend/appflowy_web_app/src-tauri/Cargo.lock b/frontend/appflowy_web_app/src-tauri/Cargo.lock index ae08a44db4..e940bdd58a 100644 --- a/frontend/appflowy_web_app/src-tauri/Cargo.lock +++ b/frontend/appflowy_web_app/src-tauri/Cargo.lock @@ -163,7 +163,7 @@ checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" [[package]] name = "app-error" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "anyhow", "bincode", @@ -183,7 +183,7 @@ dependencies = [ [[package]] name = "appflowy-ai-client" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "anyhow", "bytes", @@ -800,7 +800,7 @@ dependencies = [ [[package]] name = "client-api" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "again", "anyhow", @@ -850,7 +850,7 @@ dependencies = [ [[package]] name = "client-api-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "collab-entity", "collab-rt-entity", @@ -862,7 +862,7 @@ dependencies = [ [[package]] name = "client-websocket" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "futures-channel", "futures-util", @@ -1115,7 +1115,7 @@ dependencies = [ [[package]] name = "collab-rt-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "anyhow", "bincode", @@ -1140,7 +1140,7 @@ dependencies = [ [[package]] name = "collab-rt-protocol" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "anyhow", "async-trait", @@ -1411,7 +1411,7 @@ dependencies = [ "cssparser-macros", "dtoa-short", "itoa 1.0.10", - "phf 0.11.2", + "phf 0.8.0", "smallvec", ] @@ -1522,7 +1522,7 @@ checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5" [[package]] name = "database-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "anyhow", "app-error", @@ -3118,7 +3118,7 @@ dependencies = [ [[package]] name = "gotrue" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "anyhow", "futures-util", @@ -3135,7 +3135,7 @@ dependencies = [ [[package]] name = "gotrue-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "anyhow", "app-error", @@ -3572,7 +3572,7 @@ dependencies = [ [[package]] name = "infra" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "anyhow", "bytes", @@ -6162,7 +6162,7 @@ dependencies = [ [[package]] name = "shared-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "anyhow", "app-error", diff --git a/frontend/appflowy_web_app/src-tauri/Cargo.toml b/frontend/appflowy_web_app/src-tauri/Cargo.toml index 59cc7abffb..e2b61d6d39 100644 --- a/frontend/appflowy_web_app/src-tauri/Cargo.toml +++ b/frontend/appflowy_web_app/src-tauri/Cargo.toml @@ -52,7 +52,7 @@ collab-user = { version = "0.2" } # Run the script: # scripts/tool/update_client_api_rev.sh new_rev_id # ⚠️⚠️⚠️️ -client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "135a67dc79848c39e9c53b4a99b6d14f444686ef" } +client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "6a44490daccb101c8b855443d2d6ded0fb752016" } [dependencies] serde_json.workspace = true diff --git a/frontend/rust-lib/Cargo.lock b/frontend/rust-lib/Cargo.lock index ae4b034439..9ff90f08e9 100644 --- a/frontend/rust-lib/Cargo.lock +++ b/frontend/rust-lib/Cargo.lock @@ -163,7 +163,7 @@ checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" [[package]] name = "app-error" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "anyhow", "bincode", @@ -183,7 +183,7 @@ dependencies = [ [[package]] name = "appflowy-ai-client" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "anyhow", "bytes", @@ -718,7 +718,7 @@ dependencies = [ [[package]] name = "client-api" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "again", "anyhow", @@ -768,7 +768,7 @@ dependencies = [ [[package]] name = "client-api-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "collab-entity", "collab-rt-entity", @@ -780,7 +780,7 @@ dependencies = [ [[package]] name = "client-websocket" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "futures-channel", "futures-util", @@ -855,7 +855,7 @@ dependencies = [ "collab", "collab-entity", "collab-plugins", - "dashmap", + "dashmap 5.5.3", "getrandom 0.2.10", "js-sys", "lazy_static", @@ -993,7 +993,7 @@ dependencies = [ [[package]] name = "collab-rt-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "anyhow", "bincode", @@ -1018,7 +1018,7 @@ dependencies = [ [[package]] name = "collab-rt-protocol" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "anyhow", "async-trait", @@ -1347,6 +1347,20 @@ dependencies = [ "parking_lot_core 0.9.8", ] +[[package]] +name = "dashmap" +version = "6.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "804c8821570c3f8b70230c2ba75ffa5c0f9a4189b9a432b6656c536712acae28" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.3", + "lock_api", + "once_cell", + "parking_lot_core 0.9.8", +] + [[package]] name = "data-encoding" version = "2.4.0" @@ -1356,7 +1370,7 @@ checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308" [[package]] name = "database-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "anyhow", "app-error", @@ -1783,7 +1797,7 @@ dependencies = [ "appflowy-plugin", "base64 0.21.5", "bytes", - "dashmap", + "dashmap 6.0.1", "dotenv", "flowy-ai-pub", "flowy-codegen", @@ -1791,6 +1805,7 @@ dependencies = [ "flowy-error", "flowy-notification", "flowy-sqlite", + "flowy-storage-pub", "futures", "futures-util", "lib-dispatch", @@ -1958,7 +1973,7 @@ dependencies = [ "collab-integrate", "collab-plugins", "csv", - "dashmap", + "dashmap 6.0.1", "event-integration-test", "fancy-regex 0.11.0", "flowy-codegen", @@ -2009,7 +2024,7 @@ dependencies = [ name = "flowy-derive" version = "0.1.0" dependencies = [ - "dashmap", + "dashmap 6.0.1", "flowy-ast", "flowy-codegen", "lazy_static", @@ -2031,7 +2046,7 @@ dependencies = [ "collab-entity", "collab-integrate", "collab-plugins", - "dashmap", + "dashmap 6.0.1", "flowy-codegen", "flowy-derive", "flowy-document-pub", @@ -2171,7 +2186,7 @@ name = "flowy-notification" version = "0.1.0" dependencies = [ "bytes", - "dashmap", + "dashmap 6.0.1", "flowy-codegen", "flowy-derive", "lazy_static", @@ -2320,6 +2335,7 @@ dependencies = [ "async-trait", "bytes", "chrono", + "dashmap 6.0.1", "flowy-codegen", "flowy-derive", "flowy-error", @@ -2354,6 +2370,7 @@ dependencies = [ "serde", "serde_json", "tokio", + "tracing", ] [[package]] @@ -2730,7 +2747,7 @@ dependencies = [ [[package]] name = "gotrue" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "anyhow", "futures-util", @@ -2747,7 +2764,7 @@ dependencies = [ [[package]] name = "gotrue-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "anyhow", "app-error", @@ -3112,7 +3129,7 @@ dependencies = [ [[package]] name = "infra" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "anyhow", "bytes", @@ -5321,7 +5338,7 @@ dependencies = [ [[package]] name = "shared-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016" dependencies = [ "anyhow", "app-error", diff --git a/frontend/rust-lib/Cargo.toml b/frontend/rust-lib/Cargo.toml index a6aebe740b..54b2f1a187 100644 --- a/frontend/rust-lib/Cargo.toml +++ b/frontend/rust-lib/Cargo.toml @@ -93,14 +93,15 @@ yrs = "0.19.2" validator = { version = "0.16.1", features = ["derive"] } tokio-util = "0.7.11" zip = "2.1.3" +dashmap = "6.0.1" # Please using the following command to update the revision id # Current directory: frontend # Run the script.add_workspace_members: # scripts/tool/update_client_api_rev.sh new_rev_id # ⚠️⚠️⚠️️ -client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "135a67dc79848c39e9c53b4a99b6d14f444686ef" } -client-api-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "135a67dc79848c39e9c53b4a99b6d14f444686ef" } +client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "6a44490daccb101c8b855443d2d6ded0fb752016" } +client-api-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "6a44490daccb101c8b855443d2d6ded0fb752016" } [profile.dev] opt-level = 0 diff --git a/frontend/rust-lib/build-tool/flowy-derive/Cargo.toml b/frontend/rust-lib/build-tool/flowy-derive/Cargo.toml index ce84acd0eb..763210c558 100644 --- a/frontend/rust-lib/build-tool/flowy-derive/Cargo.toml +++ b/frontend/rust-lib/build-tool/flowy-derive/Cargo.toml @@ -14,8 +14,8 @@ syn = { version = "1.0.109", features = ["extra-traits", "visit"] } quote = "1.0" proc-macro2 = "1.0" flowy-ast.workspace = true -lazy_static = {version = "1.4.0"} -dashmap = "5" +lazy_static = { version = "1.4.0" } +dashmap.workspace = true flowy-codegen.workspace = true serde_json.workspace = true walkdir = "2.3.2" diff --git a/frontend/rust-lib/event-integration-test/src/lib.rs b/frontend/rust-lib/event-integration-test/src/lib.rs index 69096bbd37..cd2a01d84f 100644 --- a/frontend/rust-lib/event-integration-test/src/lib.rs +++ b/frontend/rust-lib/event-integration-test/src/lib.rs @@ -91,7 +91,7 @@ impl EventIntegrationTest { Self::new_with_config(config).await } - pub fn set_no_cleanup(&mut self) { + pub fn skip_clean(&mut self) { self.cleaner.lock().should_clean = false; } diff --git a/frontend/rust-lib/event-integration-test/tests/document/af_cloud_test/file_upload_test.rs b/frontend/rust-lib/event-integration-test/tests/document/af_cloud_test/file_upload_test.rs index ba9e2023fe..cf0050341e 100644 --- a/frontend/rust-lib/event-integration-test/tests/document/af_cloud_test/file_upload_test.rs +++ b/frontend/rust-lib/event-integration-test/tests/document/af_cloud_test/file_upload_test.rs @@ -1,39 +1,14 @@ use crate::document::generate_random_bytes; use event_integration_test::user_event::user_localhost_af_cloud; use event_integration_test::EventIntegrationTest; -use flowy_storage_pub::storage::UploadStatus; +use flowy_storage_pub::storage::FileUploadState; use std::env::temp_dir; +use std::sync::Arc; use std::time::Duration; use tokio::fs; use tokio::fs::File; use tokio::io::AsyncWriteExt; - -#[tokio::test] -async fn af_cloud_upload_file_test() { - user_localhost_af_cloud().await; - let test = EventIntegrationTest::new().await; - test.af_cloud_sign_up().await; - - let workspace_id = test.get_current_workspace().await.id; - let file_path = generate_file_with_bytes_len(1024).await.0; - let mut rx = test.storage_manager.subscribe_upload_result(); - - let created_upload = test - .storage_manager - .storage_service - .create_upload(&workspace_id, "temp_test", &file_path) - .await - .unwrap(); - - while let Ok(result) = rx.recv().await { - if result.file_id == created_upload.file_id && result.status == UploadStatus::Finish { - break; - } - } - - let _ = fs::remove_file(file_path).await; -} - +use tokio::sync::Mutex; #[tokio::test] async fn af_cloud_upload_big_file_test() { user_localhost_af_cloud().await; @@ -42,32 +17,39 @@ async fn af_cloud_upload_big_file_test() { tokio::time::sleep(Duration::from_secs(6)).await; let workspace_id = test.get_current_workspace().await.id; - let (file_path, upload_data) = generate_file_with_bytes_len(30 * 1024 * 1024).await; - let created_upload = test + let (file_path, upload_data) = generate_file_with_bytes_len(15 * 1024 * 1024).await; + let (created_upload, rx) = test .storage_manager .storage_service - .create_upload(&workspace_id, "temp_test", &file_path) + .create_upload(&workspace_id, "temp_test", &file_path, false) .await .unwrap(); - let mut rx = test.storage_manager.subscribe_upload_result(); - while let Ok(result) = rx.recv().await { - if result.file_id == created_upload.file_id && result.status == UploadStatus::InProgress { - break; + let mut rx = rx.unwrap(); + while let Some(state) = rx.recv().await { + if let FileUploadState::Uploading { progress } = state { + if progress > 0.1 { + break; + } } } // Simulate a restart let config = test.config.clone(); - test.set_no_cleanup(); + test.skip_clean(); drop(test); tokio::time::sleep(Duration::from_secs(3)).await; // Restart the test. It will load unfinished uploads let test = EventIntegrationTest::new_with_config(config).await; - let mut rx = test.storage_manager.subscribe_upload_result(); - while let Ok(result) = rx.recv().await { - if result.file_id == created_upload.file_id && result.status == UploadStatus::Finish { + let mut rx = test + .storage_manager + .subscribe_file_state(&created_upload.file_id) + .await + .unwrap(); + + while let Some(state) = rx.recv().await { + if let FileUploadState::Finished { .. } = state { break; } } @@ -90,35 +72,61 @@ async fn af_cloud_upload_6_files_test() { user_localhost_af_cloud().await; let test = EventIntegrationTest::new().await; test.af_cloud_sign_up().await; - let workspace_id = test.get_current_workspace().await.id; - let mut rx = test.storage_manager.subscribe_upload_result(); let mut created_uploads = vec![]; + let mut receivers = vec![]; for file_size in [1, 2, 5, 8, 12, 20] { let file_path = generate_file_with_bytes_len(file_size * 1024 * 1024) .await .0; - let created_upload = test + let (created_upload, rx) = test .storage_manager .storage_service - .create_upload(&workspace_id, "temp_test", &file_path) + .create_upload(&workspace_id, "temp_test", &file_path, false) .await .unwrap(); + receivers.push(rx.unwrap()); created_uploads.push(created_upload); - let _ = fs::remove_file(file_path).await; } - while let Ok(result) = rx.recv().await { - if result.status == UploadStatus::Finish { - created_uploads.retain(|upload| upload.file_id != result.file_id); - } - - if created_uploads.is_empty() { - break; - } + // Wait for all uploads to finish + let uploads = Arc::new(Mutex::new(created_uploads)); + let mut handles = vec![]; + for mut receiver in receivers { + let cloned_uploads = uploads.clone(); + let cloned_test = test.clone(); + let handle = tokio::spawn(async move { + if let Some(state) = cloned_test + .storage_manager + .get_file_state(&receiver.file_id) + .await + { + if let FileUploadState::Finished { file_id } = state { + cloned_uploads + .lock() + .await + .retain(|upload| upload.file_id != file_id); + } + } else { + while let Some(value) = receiver.recv().await { + if let FileUploadState::Finished { file_id } = value { + cloned_uploads + .lock() + .await + .retain(|upload| upload.file_id != file_id); + break; + } + } + } + }); + handles.push(handle); } + + // join all handles + futures::future::join_all(handles).await; + assert_eq!(uploads.lock().await.len(), 0); } async fn generate_file_with_bytes_len(len: usize) -> (String, Vec) { diff --git a/frontend/rust-lib/flowy-ai-pub/src/cloud.rs b/frontend/rust-lib/flowy-ai-pub/src/cloud.rs index 5096c8b981..e29ddae174 100644 --- a/frontend/rust-lib/flowy-ai-pub/src/cloud.rs +++ b/frontend/rust-lib/flowy-ai-pub/src/cloud.rs @@ -35,14 +35,14 @@ pub trait ChatCloudService: Send + Sync + 'static { metadata: &[ChatMessageMetadata], ) -> Result; - fn create_answer( + async fn create_answer( &self, workspace_id: &str, chat_id: &str, message: &str, question_id: i64, metadata: Option, - ) -> FutureResult; + ) -> Result; async fn stream_answer( &self, @@ -58,13 +58,13 @@ pub trait ChatCloudService: Send + Sync + 'static { question_message_id: i64, ) -> Result; - fn get_chat_messages( + async fn get_chat_messages( &self, workspace_id: &str, chat_id: &str, offset: MessageCursor, limit: u64, - ) -> FutureResult; + ) -> Result; async fn get_related_message( &self, diff --git a/frontend/rust-lib/flowy-ai/Cargo.toml b/frontend/rust-lib/flowy-ai/Cargo.toml index 8d7db1609f..3e26f38a5e 100644 --- a/frontend/rust-lib/flowy-ai/Cargo.toml +++ b/frontend/rust-lib/flowy-ai/Cargo.toml @@ -22,7 +22,7 @@ bytes.workspace = true validator = { workspace = true, features = ["derive"] } lib-infra = { workspace = true, features = ["isolate_flutter"] } flowy-ai-pub.workspace = true -dashmap = "5.5" +dashmap.workspace = true flowy-sqlite = { workspace = true } tokio.workspace = true futures.workspace = true @@ -44,6 +44,7 @@ md5 = "0.7.0" zip = { workspace = true, features = ["deflate"] } zip-extensions = "0.8.0" pin-project = "1.1.5" +flowy-storage-pub = { workspace = true } [target.'cfg(any(target_os = "macos", target_os = "linux", target_os = "windows"))'.dependencies] notify = "6.1.1" diff --git a/frontend/rust-lib/flowy-ai/src/ai_manager.rs b/frontend/rust-lib/flowy-ai/src/ai_manager.rs index 1ba15f754d..a8b9f5fc8f 100644 --- a/frontend/rust-lib/flowy-ai/src/ai_manager.rs +++ b/frontend/rust-lib/flowy-ai/src/ai_manager.rs @@ -13,9 +13,10 @@ use flowy_error::{FlowyError, FlowyResult}; use flowy_sqlite::kv::KVStorePreferences; use flowy_sqlite::DBConnection; +use flowy_storage_pub::storage::StorageService; use lib_infra::util::timestamp; use std::path::PathBuf; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use tracing::{info, trace}; pub trait AIUserService: Send + Sync + 'static { @@ -38,6 +39,7 @@ impl AIManager { chat_cloud_service: Arc, user_service: impl AIUserService, store_preferences: Arc, + storage_service: Weak, ) -> AIManager { let user_service = Arc::new(user_service); let plugin_manager = Arc::new(PluginManager::new()); @@ -53,6 +55,7 @@ impl AIManager { user_service.clone(), chat_cloud_service, local_ai_controller.clone(), + storage_service, )); Self { diff --git a/frontend/rust-lib/flowy-ai/src/chat.rs b/frontend/rust-lib/flowy-ai/src/chat.rs index 6305ac44da..b0fd1c42d3 100644 --- a/frontend/rust-lib/flowy-ai/src/chat.rs +++ b/frontend/rust-lib/flowy-ai/src/chat.rs @@ -89,6 +89,15 @@ impl Chat { if message.len() > 2000 { return Err(FlowyError::text_too_long().with_context("Exceeds maximum message 2000 length")); } + + trace!( + "[Chat] stream chat message: chat_id={}, message={}, message_type={:?}, metadata={:?}", + self.chat_id, + message, + message_type, + metadata + ); + // clear self .stop_stream @@ -101,12 +110,7 @@ impl Chat { let workspace_id = self.user_service.workspace_id()?; let _ = question_sink - .send( - StreamMessage::Text { - text: message.to_string(), - } - .to_string(), - ) + .send(StreamMessage::Text(message.to_string()).to_string()) .await; let question = self .chat_service @@ -124,31 +128,18 @@ impl Chat { })?; let _ = question_sink - .send( - StreamMessage::MessageId { - message_id: question.message_id, - } - .to_string(), - ) + .send(StreamMessage::MessageId(question.message_id).to_string()) .await; - - if self.chat_service.is_local_ai_enabled() && !metadata.is_empty() { - let _ = question_sink - .send(StreamMessage::IndexStart.to_string()) - .await; - if let Err(err) = self - .chat_service - .index_message_metadata(&self.chat_id, &metadata, &mut question_sink) - .await - { - error!("Failed to index file: {}", err); - } - let _ = question_sink - .send(StreamMessage::IndexEnd.to_string()) - .await; + if let Err(err) = self + .chat_service + .index_message_metadata(&self.chat_id, &metadata, &mut question_sink) + .await + { + error!("Failed to index file: {}", err); } let _ = question_sink.send(StreamMessage::Done.to_string()).await; + // Save message to disk save_chat_message( self.user_service.sqlite_connection(uid)?, &self.chat_id, diff --git a/frontend/rust-lib/flowy-ai/src/event_handler.rs b/frontend/rust-lib/flowy-ai/src/event_handler.rs index 26251560b3..21f14070f4 100644 --- a/frontend/rust-lib/flowy-ai/src/event_handler.rs +++ b/frontend/rust-lib/flowy-ai/src/event_handler.rs @@ -51,7 +51,6 @@ pub(crate) async fn stream_chat_message_handler( ChatMessageMetadata { data: ChatMetadataData { content: metadata.data, - url: None, content_type, size: content_len as i64, }, diff --git a/frontend/rust-lib/flowy-ai/src/local_ai/local_llm_chat.rs b/frontend/rust-lib/flowy-ai/src/local_ai/local_llm_chat.rs index 2fe6998a0a..8ea17c1638 100644 --- a/frontend/rust-lib/flowy-ai/src/local_ai/local_llm_chat.rs +++ b/frontend/rust-lib/flowy-ai/src/local_ai/local_llm_chat.rs @@ -26,7 +26,7 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use tokio::select; use tokio_stream::StreamExt; -use tracing::{debug, error, info, instrument}; +use tracing::{debug, error, info, instrument, trace}; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct LLMSetting { @@ -348,45 +348,54 @@ impl LocalAIController { index_process_sink: &mut (impl Sink + Unpin), ) -> FlowyResult<()> { for metadata in metadata_list { + if let Err(err) = metadata.data.validate() { + error!( + "[AI Plugin] invalid metadata: {:?}, error: {:?}", + metadata, err + ); + continue; + } + let mut index_metadata = HashMap::new(); index_metadata.insert("name".to_string(), json!(&metadata.name)); index_metadata.insert("at_name".to_string(), json!(format!("@{}", &metadata.name))); index_metadata.insert("source".to_string(), json!(&metadata.source)); - - if let Some(url) = &metadata.data.url { - let file_path = Path::new(url); - if file_path.exists() { + match &metadata.data.content_type { + ChatMetadataContentType::Unknown => { + error!( + "[AI Plugin] unsupported content type: {:?}", + metadata.data.content_type + ); + }, + ChatMetadataContentType::Text | ChatMetadataContentType::Markdown => { + trace!("[AI Plugin]: index text: {}", metadata.data.content); self .process_index_file( chat_id, - Some(file_path.to_path_buf()), None, + Some(metadata.data.content.clone()), metadata, &index_metadata, index_process_sink, ) .await?; - } - } else if matches!( - metadata.data.content_type, - ChatMetadataContentType::Text | ChatMetadataContentType::Markdown - ) && metadata.data.validate() - { - self - .process_index_file( - chat_id, - None, - Some(metadata.data.content.clone()), - metadata, - &index_metadata, - index_process_sink, - ) - .await?; - } else { - error!( - "[AI Plugin] unsupported content type: {:?}", - metadata.data.content_type - ); + }, + ChatMetadataContentType::PDF => { + trace!("[AI Plugin]: index pdf file: {}", metadata.data.content); + let file_path = Path::new(&metadata.data.content); + if file_path.exists() { + self + .process_index_file( + chat_id, + Some(file_path.to_path_buf()), + None, + metadata, + &index_metadata, + index_process_sink, + ) + .await?; + } + }, } } diff --git a/frontend/rust-lib/flowy-ai/src/middleware/chat_service_mw.rs b/frontend/rust-lib/flowy-ai/src/middleware/chat_service_mw.rs index dbab55610d..8da2505e75 100644 --- a/frontend/rust-lib/flowy-ai/src/middleware/chat_service_mw.rs +++ b/frontend/rust-lib/flowy-ai/src/middleware/chat_service_mw.rs @@ -16,15 +16,19 @@ use lib_infra::async_trait::async_trait; use lib_infra::future::FutureResult; use crate::local_ai::stream_util::LocalAIStreamAdaptor; +use crate::stream_message::StreamMessage; +use flowy_storage_pub::storage::StorageService; +use futures_util::SinkExt; use serde_json::json; use std::path::Path; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use tracing::trace; pub struct AICloudServiceMiddleware { cloud_service: Arc, user_service: Arc, local_llm_controller: Arc, + storage_service: Weak, } impl AICloudServiceMiddleware { @@ -32,11 +36,13 @@ impl AICloudServiceMiddleware { user_service: Arc, cloud_service: Arc, local_llm_controller: Arc, + storage_service: Weak, ) -> Self { Self { user_service, cloud_service, local_llm_controller, + storage_service, } } @@ -50,10 +56,23 @@ impl AICloudServiceMiddleware { metadata_list: &[ChatMessageMetadata], index_process_sink: &mut (impl Sink + Unpin), ) -> Result<(), FlowyError> { - self - .local_llm_controller - .index_message_metadata(chat_id, metadata_list, index_process_sink) - .await?; + if metadata_list.is_empty() { + return Ok(()); + } + if self.is_local_ai_enabled() { + let _ = index_process_sink + .send(StreamMessage::IndexStart.to_string()) + .await; + self + .local_llm_controller + .index_message_metadata(chat_id, metadata_list, index_process_sink) + .await?; + let _ = index_process_sink + .send(StreamMessage::IndexEnd.to_string()) + .await; + } else if let Some(_storage_service) = self.storage_service.upgrade() { + // + } Ok(()) } @@ -110,17 +129,18 @@ impl ChatCloudService for AICloudServiceMiddleware { .await } - fn create_answer( + async fn create_answer( &self, workspace_id: &str, chat_id: &str, message: &str, question_id: i64, metadata: Option, - ) -> FutureResult { + ) -> Result { self .cloud_service .create_answer(workspace_id, chat_id, message, question_id, metadata) + .await } async fn stream_answer( @@ -184,16 +204,17 @@ impl ChatCloudService for AICloudServiceMiddleware { } } - fn get_chat_messages( + async fn get_chat_messages( &self, workspace_id: &str, chat_id: &str, offset: MessageCursor, limit: u64, - ) -> FutureResult { + ) -> Result { self .cloud_service .get_chat_messages(workspace_id, chat_id, offset, limit) + .await } async fn get_related_message( diff --git a/frontend/rust-lib/flowy-ai/src/stream_message.rs b/frontend/rust-lib/flowy-ai/src/stream_message.rs index d2b2b14100..c507262b85 100644 --- a/frontend/rust-lib/flowy-ai/src/stream_message.rs +++ b/frontend/rust-lib/flowy-ai/src/stream_message.rs @@ -1,22 +1,23 @@ use std::fmt::Display; pub enum StreamMessage { - MessageId { message_id: i64 }, + MessageId(i64), IndexStart, IndexEnd, - Text { text: String }, + Text(String), Done, StartIndexFile { file_name: String }, EndIndexFile { file_name: String }, IndexFileError { file_name: String }, } + impl Display for StreamMessage { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - StreamMessage::MessageId { message_id } => write!(f, "message_id:{}", message_id), + StreamMessage::MessageId(message_id) => write!(f, "message_id:{}", message_id), StreamMessage::IndexStart => write!(f, "index_start:"), StreamMessage::IndexEnd => write!(f, "index_end"), - StreamMessage::Text { text } => { + StreamMessage::Text(text) => { write!(f, "data:{}", text) }, StreamMessage::Done => write!(f, "done:"), diff --git a/frontend/rust-lib/flowy-core/src/deps_resolve/chat_deps.rs b/frontend/rust-lib/flowy-core/src/deps_resolve/chat_deps.rs index 1cc7c5c490..c12cc0f181 100644 --- a/frontend/rust-lib/flowy-core/src/deps_resolve/chat_deps.rs +++ b/frontend/rust-lib/flowy-core/src/deps_resolve/chat_deps.rs @@ -3,6 +3,7 @@ use flowy_ai_pub::cloud::ChatCloudService; use flowy_error::FlowyError; use flowy_sqlite::kv::KVStorePreferences; use flowy_sqlite::DBConnection; +use flowy_storage_pub::storage::StorageService; use flowy_user::services::authenticate_user::AuthenticateUser; use std::path::PathBuf; use std::sync::{Arc, Weak}; @@ -14,12 +15,14 @@ impl ChatDepsResolver { authenticate_user: Weak, cloud_service: Arc, store_preferences: Arc, + storage_service: Weak, ) -> Arc { let user_service = ChatUserServiceImpl(authenticate_user); Arc::new(AIManager::new( cloud_service, user_service, store_preferences, + storage_service, )) } } diff --git a/frontend/rust-lib/flowy-core/src/integrate/trait_impls.rs b/frontend/rust-lib/flowy-core/src/integrate/trait_impls.rs index 26e17f2a5c..afb482d64a 100644 --- a/frontend/rust-lib/flowy-core/src/integrate/trait_impls.rs +++ b/frontend/rust-lib/flowy-core/src/integrate/trait_impls.rs @@ -629,24 +629,19 @@ impl ChatCloudService for ServerProvider { .await } - fn create_answer( + async fn create_answer( &self, workspace_id: &str, chat_id: &str, message: &str, question_id: i64, metadata: Option, - ) -> FutureResult { - let workspace_id = workspace_id.to_string(); - let chat_id = chat_id.to_string(); - let message = message.to_string(); + ) -> Result { let server = self.get_server(); - FutureResult::new(async move { - server? - .chat_service() - .create_answer(&workspace_id, &chat_id, &message, question_id, metadata) - .await - }) + server? + .chat_service() + .create_answer(workspace_id, chat_id, message, question_id, metadata) + .await } async fn stream_answer( @@ -664,22 +659,18 @@ impl ChatCloudService for ServerProvider { .await } - fn get_chat_messages( + async fn get_chat_messages( &self, workspace_id: &str, chat_id: &str, offset: MessageCursor, limit: u64, - ) -> FutureResult { - let workspace_id = workspace_id.to_string(); - let chat_id = chat_id.to_string(); - let server = self.get_server(); - FutureResult::new(async move { - server? - .chat_service() - .get_chat_messages(&workspace_id, &chat_id, offset, limit) - .await - }) + ) -> Result { + self + .get_server()? + .chat_service() + .get_chat_messages(workspace_id, chat_id, offset, limit) + .await } async fn get_related_message( diff --git a/frontend/rust-lib/flowy-core/src/lib.rs b/frontend/rust-lib/flowy-core/src/lib.rs index 2e72aa8469..761708bfe5 100644 --- a/frontend/rust-lib/flowy-core/src/lib.rs +++ b/frontend/rust-lib/flowy-core/src/lib.rs @@ -165,6 +165,7 @@ impl AppFlowyCore { Arc::downgrade(&authenticate_user), server_provider.clone(), store_preference.clone(), + Arc::downgrade(&storage_manager.storage_service), ); let database_manager = DatabaseDepsResolver::resolve( diff --git a/frontend/rust-lib/flowy-database2/Cargo.toml b/frontend/rust-lib/flowy-database2/Cargo.toml index cc32f944f5..f4acee0d4d 100644 --- a/frontend/rust-lib/flowy-database2/Cargo.toml +++ b/frontend/rust-lib/flowy-database2/Cargo.toml @@ -20,7 +20,7 @@ protobuf.workspace = true flowy-error = { path = "../flowy-error", features = [ "impl_from_dispatch_error", "impl_from_collab_database", -]} +] } lib-dispatch = { workspace = true } tokio = { workspace = true, features = ["sync"] } @@ -38,7 +38,7 @@ indexmap = { version = "2.1.0", features = ["serde"] } url = { version = "2" } fancy-regex = "0.11.0" futures.workspace = true -dashmap = "5" +dashmap.workspace = true anyhow.workspace = true async-stream = "0.3.4" rayon = "1.9.0" diff --git a/frontend/rust-lib/flowy-document/Cargo.toml b/frontend/rust-lib/flowy-document/Cargo.toml index e9bc84b8fe..f64c960b12 100644 --- a/frontend/rust-lib/flowy-document/Cargo.toml +++ b/frontend/rust-lib/flowy-document/Cargo.toml @@ -35,7 +35,7 @@ indexmap = { version = "2.1.0", features = ["serde"] } uuid.workspace = true futures.workspace = true tokio-stream = { workspace = true, features = ["sync"] } -dashmap = "5" +dashmap.workspace = true scraper = "0.18.0" [target.'cfg(target_arch = "wasm32")'.dependencies] diff --git a/frontend/rust-lib/flowy-document/src/event_handler.rs b/frontend/rust-lib/flowy-document/src/event_handler.rs index b99f51c8c5..66a98e3105 100644 --- a/frontend/rust-lib/flowy-document/src/event_handler.rs +++ b/frontend/rust-lib/flowy-document/src/event_handler.rs @@ -454,10 +454,17 @@ pub(crate) async fn upload_file_handler( } = params.try_into_inner()?; let manager = upgrade_document(manager)?; - let upload = manager - .upload_file(workspace_id, &document_id, &local_file_path) - .await?; + let (tx, rx) = tokio::sync::oneshot::channel(); + let cloned_local_file_path = local_file_path.clone(); + tokio::spawn(async move { + let result = manager + .upload_file(workspace_id, &document_id, &cloned_local_file_path) + .await; + let _ = tx.send(result); + Ok::<(), FlowyError>(()) + }); + let upload = rx.await??; data_result_ok(UploadedFilePB { url: upload.url, local_file_path, diff --git a/frontend/rust-lib/flowy-document/src/manager.rs b/frontend/rust-lib/flowy-document/src/manager.rs index 7718eead4e..5ea5aeb2de 100644 --- a/frontend/rust-lib/flowy-document/src/manager.rs +++ b/frontend/rust-lib/flowy-document/src/manager.rs @@ -358,8 +358,9 @@ impl DocumentManager { ) -> FlowyResult { let storage_service = self.storage_service_upgrade()?; let upload = storage_service - .create_upload(&workspace_id, document_id, local_file_path) - .await?; + .create_upload(&workspace_id, document_id, local_file_path, false) + .await? + .0; Ok(upload) } diff --git a/frontend/rust-lib/flowy-document/tests/document/util.rs b/frontend/rust-lib/flowy-document/tests/document/util.rs index ae2351ddd9..58663abd14 100644 --- a/frontend/rust-lib/flowy-document/tests/document/util.rs +++ b/frontend/rust-lib/flowy-document/tests/document/util.rs @@ -21,7 +21,7 @@ use flowy_document::manager::{DocumentManager, DocumentSnapshotService, Document use flowy_document_pub::cloud::*; use flowy_error::{ErrorCode, FlowyError, FlowyResult}; use flowy_storage_pub::chunked_byte::ChunkedBytes; -use flowy_storage_pub::storage::{CreatedUpload, StorageService}; +use flowy_storage_pub::storage::{CreatedUpload, FileProgressReceiver, StorageService}; use lib_infra::async_trait::async_trait; use lib_infra::box_any::BoxAny; use lib_infra::future::FutureResult; @@ -178,14 +178,6 @@ pub struct DocumentTestFileStorageService; #[async_trait] impl StorageService for DocumentTestFileStorageService { - fn upload_object( - &self, - _workspace_id: &str, - _local_file_path: &str, - ) -> FutureResult { - todo!() - } - fn delete_object(&self, _url: String, _local_file_path: String) -> FlowyResult<()> { todo!() } @@ -194,12 +186,13 @@ impl StorageService for DocumentTestFileStorageService { todo!() } - fn create_upload( + async fn create_upload( &self, _workspace_id: &str, _parent_dir: &str, _local_file_path: &str, - ) -> FutureResult { + _upload_immediately: bool, + ) -> Result<(CreatedUpload, Option), flowy_error::FlowyError> { todo!() } @@ -215,6 +208,10 @@ impl StorageService for DocumentTestFileStorageService { ) -> Result<(), FlowyError> { todo!() } + + async fn subscribe_file_progress(&self, _url: &str) -> Result { + todo!() + } } struct DefaultCollabStorageProvider(); diff --git a/frontend/rust-lib/flowy-notification/Cargo.toml b/frontend/rust-lib/flowy-notification/Cargo.toml index b459c9afbf..b7a96898ff 100644 --- a/frontend/rust-lib/flowy-notification/Cargo.toml +++ b/frontend/rust-lib/flowy-notification/Cargo.toml @@ -13,7 +13,7 @@ protobuf.workspace = true tracing.workspace = true bytes.workspace = true serde = { workspace = true, features = ["derive"] } -dashmap = "5.5" +dashmap.workspace = true tokio-util = "0.7" tokio = { workspace = true, features = ["time"] } diff --git a/frontend/rust-lib/flowy-server/src/af_cloud/impls/chat.rs b/frontend/rust-lib/flowy-server/src/af_cloud/impls/chat.rs index 43fbc74cf0..3bdaf7e1a6 100644 --- a/frontend/rust-lib/flowy-server/src/af_cloud/impls/chat.rs +++ b/frontend/rust-lib/flowy-server/src/af_cloud/impls/chat.rs @@ -76,30 +76,25 @@ where Ok(message) } - fn create_answer( + async fn create_answer( &self, workspace_id: &str, chat_id: &str, message: &str, question_id: i64, metadata: Option, - ) -> FutureResult { - let workspace_id = workspace_id.to_string(); - let chat_id = chat_id.to_string(); + ) -> Result { let try_get_client = self.inner.try_get_client(); let params = CreateAnswerMessageParams { content: message.to_string(), metadata, question_message_id: question_id, }; - - FutureResult::new(async move { - let message = try_get_client? - .save_answer(&workspace_id, &chat_id, params) - .await - .map_err(FlowyError::from)?; - Ok(message) - }) + let message = try_get_client? + .save_answer(workspace_id, chat_id, params) + .await + .map_err(FlowyError::from)?; + Ok(message) } async fn stream_answer( @@ -131,25 +126,20 @@ where Ok(resp) } - fn get_chat_messages( + async fn get_chat_messages( &self, workspace_id: &str, chat_id: &str, offset: MessageCursor, limit: u64, - ) -> FutureResult { - let workspace_id = workspace_id.to_string(); - let chat_id = chat_id.to_string(); + ) -> Result { let try_get_client = self.inner.try_get_client(); + let resp = try_get_client? + .get_chat_messages(workspace_id, chat_id, offset, limit) + .await + .map_err(FlowyError::from)?; - FutureResult::new(async move { - let resp = try_get_client? - .get_chat_messages(&workspace_id, &chat_id, offset, limit) - .await - .map_err(FlowyError::from)?; - - Ok(resp) - }) + Ok(resp) } async fn get_related_message( diff --git a/frontend/rust-lib/flowy-server/src/default_impl.rs b/frontend/rust-lib/flowy-server/src/default_impl.rs index ff409faeae..be0ce1092e 100644 --- a/frontend/rust-lib/flowy-server/src/default_impl.rs +++ b/frontend/rust-lib/flowy-server/src/default_impl.rs @@ -34,17 +34,15 @@ impl ChatCloudService for DefaultChatCloudServiceImpl { Err(FlowyError::not_support().with_context("Chat is not supported in local server.")) } - fn create_answer( + async fn create_answer( &self, _workspace_id: &str, _chat_id: &str, _message: &str, _question_id: i64, _metadata: Option, - ) -> FutureResult { - FutureResult::new(async move { - Err(FlowyError::not_support().with_context("Chat is not supported in local server.")) - }) + ) -> Result { + Err(FlowyError::not_support().with_context("Chat is not supported in local server.")) } async fn stream_answer( @@ -56,16 +54,14 @@ impl ChatCloudService for DefaultChatCloudServiceImpl { Err(FlowyError::not_support().with_context("Chat is not supported in local server.")) } - fn get_chat_messages( + async fn get_chat_messages( &self, _workspace_id: &str, _chat_id: &str, _offset: MessageCursor, _limit: u64, - ) -> FutureResult { - FutureResult::new(async move { - Err(FlowyError::not_support().with_context("Chat is not supported in local server.")) - }) + ) -> Result { + Err(FlowyError::not_support().with_context("Chat is not supported in local server.")) } async fn get_related_message( diff --git a/frontend/rust-lib/flowy-storage-pub/Cargo.toml b/frontend/rust-lib/flowy-storage-pub/Cargo.toml index 3dee8f9f5e..ecab2212f8 100644 --- a/frontend/rust-lib/flowy-storage-pub/Cargo.toml +++ b/frontend/rust-lib/flowy-storage-pub/Cargo.toml @@ -17,3 +17,4 @@ mime_guess = "2.0.4" client-api-entity = { workspace = true } tokio = { workspace = true, features = ["sync", "io-util"] } anyhow = "1.0.86" +tracing.workspace = true diff --git a/frontend/rust-lib/flowy-storage-pub/src/storage.rs b/frontend/rust-lib/flowy-storage-pub/src/storage.rs index 8d40bef64c..30a9231dab 100644 --- a/frontend/rust-lib/flowy-storage-pub/src/storage.rs +++ b/frontend/rust-lib/flowy-storage-pub/src/storage.rs @@ -3,26 +3,23 @@ use async_trait::async_trait; pub use client_api_entity::{CompletedPartRequest, CreateUploadResponse, UploadPartResponse}; use flowy_error::{FlowyError, FlowyResult}; use lib_infra::box_any::BoxAny; -use lib_infra::future::FutureResult; +use std::ops::{Deref, DerefMut}; +use tokio::sync::mpsc; +use tracing::error; #[async_trait] pub trait StorageService: Send + Sync { - fn upload_object( - &self, - workspace_id: &str, - local_file_path: &str, - ) -> FutureResult; - fn delete_object(&self, url: String, local_file_path: String) -> FlowyResult<()>; fn download_object(&self, url: String, local_file_path: String) -> FlowyResult<()>; - fn create_upload( + async fn create_upload( &self, workspace_id: &str, parent_dir: &str, local_file_path: &str, - ) -> FutureResult; + upload_immediately: bool, + ) -> Result<(CreatedUpload, Option), FlowyError>; async fn start_upload(&self, chunks: &ChunkedBytes, record: &BoxAny) -> Result<(), FlowyError>; @@ -32,22 +29,70 @@ pub trait StorageService: Send + Sync { parent_dir: &str, file_id: &str, ) -> Result<(), FlowyError>; + + async fn subscribe_file_progress( + &self, + file_id: &str, + ) -> Result; +} + +pub struct FileProgressReceiver { + pub rx: mpsc::Receiver, + pub file_id: String, +} + +impl Deref for FileProgressReceiver { + type Target = mpsc::Receiver; + + fn deref(&self) -> &Self::Target { + &self.rx + } +} + +impl DerefMut for FileProgressReceiver { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.rx + } +} + +#[derive(Clone, Debug)] +pub enum FileUploadState { + NotStarted, + Uploading { progress: f64 }, + Finished { file_id: String }, +} + +#[derive(Debug)] +pub struct ProgressNotifier { + tx: mpsc::Sender, + pub current_value: Option, +} + +impl ProgressNotifier { + pub fn new() -> (Self, mpsc::Receiver) { + let (tx, rx) = mpsc::channel(5); + ( + ProgressNotifier { + tx, + current_value: None, + }, + rx, + ) + } + + pub async fn notify(&mut self, progress: FileUploadState) { + self.current_value = Some(progress.clone()); + // if self.tx.reserve().await.is_err() { + // return; + // } + + if let Err(err) = self.tx.send(progress).await { + error!("Failed to send progress notification: {:?}", err); + } + } } pub struct CreatedUpload { pub url: String, pub file_id: String, } - -#[derive(Debug, Clone)] -pub struct UploadResult { - pub file_id: String, - pub status: UploadStatus, -} - -#[derive(Debug, Clone, Eq, PartialEq)] -pub enum UploadStatus { - Finish, - Failed, - InProgress, -} diff --git a/frontend/rust-lib/flowy-storage/Cargo.toml b/frontend/rust-lib/flowy-storage/Cargo.toml index 8bbd24e4ff..d5a0ae0ff2 100644 --- a/frontend/rust-lib/flowy-storage/Cargo.toml +++ b/frontend/rust-lib/flowy-storage/Cargo.toml @@ -22,6 +22,7 @@ chrono = "0.4.33" flowy-notification = { workspace = true } flowy-derive.workspace = true protobuf = { workspace = true } +dashmap.workspace = true [dev-dependencies] tokio = { workspace = true, features = ["full"] } diff --git a/frontend/rust-lib/flowy-storage/src/manager.rs b/frontend/rust-lib/flowy-storage/src/manager.rs index 0ae2751b5a..fb96e18529 100644 --- a/frontend/rust-lib/flowy-storage/src/manager.rs +++ b/frontend/rust-lib/flowy-storage/src/manager.rs @@ -7,16 +7,16 @@ use crate::sqlite_sql::{ }; use crate::uploader::{FileUploader, FileUploaderRunner, Signal, UploadTask, UploadTaskQueue}; use async_trait::async_trait; +use dashmap::DashMap; use flowy_error::{ErrorCode, FlowyError, FlowyResult}; use flowy_sqlite::DBConnection; use flowy_storage_pub::chunked_byte::{ChunkedBytes, MIN_CHUNK_SIZE}; use flowy_storage_pub::cloud::{ObjectIdentity, ObjectValue, StorageCloudService}; use flowy_storage_pub::storage::{ - CompletedPartRequest, CreatedUpload, StorageService, UploadPartResponse, UploadResult, - UploadStatus, + CompletedPartRequest, CreatedUpload, FileProgressReceiver, FileUploadState, ProgressNotifier, + StorageService, UploadPartResponse, }; use lib_infra::box_any::BoxAny; -use lib_infra::future::FutureResult; use lib_infra::util::timestamp; use std::io::ErrorKind; use std::path::{Path, PathBuf}; @@ -37,7 +37,7 @@ pub trait StorageUserService: Send + Sync + 'static { pub struct StorageManager { pub storage_service: Arc, uploader: Arc, - upload_status_notifier: tokio::sync::broadcast::Sender, + progress_notifiers: Arc>, } impl Drop for StorageManager { @@ -58,15 +58,15 @@ impl StorageManager { )); let temp_storage = Arc::new(FileTempStorage::new(temp_storage_path)); let (notifier, notifier_rx) = watch::channel(Signal::Proceed); - let (upload_status_notifier, _) = tokio::sync::broadcast::channel::(100); let task_queue = Arc::new(UploadTaskQueue::new(notifier)); + let progress_notifiers = Arc::new(DashMap::new()); let storage_service = Arc::new(StorageServiceImpl { cloud_service, user_service: user_service.clone(), temp_storage, task_queue: task_queue.clone(), - upload_status_notifier: upload_status_notifier.clone(), is_exceed_storage_limit: is_exceed_storage_limit.clone(), + progress_notifiers: progress_notifiers.clone(), }); let uploader = Arc::new(FileUploader::new( @@ -81,8 +81,8 @@ impl StorageManager { let weak_uploader = Arc::downgrade(&uploader); tokio::spawn(async move { - // Start uploading after 30 seconds - tokio::time::sleep(Duration::from_secs(30)).await; + // Start uploading after 20 seconds + tokio::time::sleep(Duration::from_secs(20)).await; if let Some(uploader) = weak_uploader.upgrade() { if let Err(err) = prepare_upload_task(uploader, user_service).await { error!("prepare upload task failed: {}", err); @@ -93,7 +93,7 @@ impl StorageManager { Self { storage_service, uploader, - upload_status_notifier, + progress_notifiers, } } @@ -115,8 +115,18 @@ impl StorageManager { self.uploader.enable_storage_write(); } - pub fn subscribe_upload_result(&self) -> tokio::sync::broadcast::Receiver { - self.upload_status_notifier.subscribe() + pub async fn subscribe_file_state( + &self, + file_id: &str, + ) -> Result { + self.storage_service.subscribe_file_progress(file_id).await + } + + pub async fn get_file_state(&self, file_id: &str) -> Option { + self + .progress_notifiers + .get(file_id) + .and_then(|notifier| notifier.value().current_value.clone()) } } @@ -147,37 +157,12 @@ pub struct StorageServiceImpl { user_service: Arc, temp_storage: Arc, task_queue: Arc, - upload_status_notifier: tokio::sync::broadcast::Sender, is_exceed_storage_limit: Arc, + progress_notifiers: Arc>, } #[async_trait] impl StorageService for StorageServiceImpl { - fn upload_object( - &self, - workspace_id: &str, - local_file_path: &str, - ) -> FutureResult { - let cloud_service = self.cloud_service.clone(); - let workspace_id = workspace_id.to_string(); - let local_file_path = local_file_path.to_string(); - FutureResult::new(async move { - let (object_identity, object_value) = - object_from_disk(&workspace_id, &local_file_path).await?; - let url = cloud_service.get_object_url(object_identity).await?; - match cloud_service.put_object(url.clone(), object_value).await { - Ok(_) => { - debug!("[File] success uploaded file to cloud: {}", url); - }, - Err(err) => { - error!("[File] upload file failed: {}", err); - return Err(err); - }, - } - Ok(url) - }) - } - fn delete_object(&self, url: String, local_file_path: String) -> FlowyResult<()> { let cloud_service = self.cloud_service.clone(); tokio::spawn(async move { @@ -227,93 +212,106 @@ impl StorageService for StorageServiceImpl { Ok(()) } - fn create_upload( + async fn create_upload( &self, workspace_id: &str, parent_dir: &str, file_path: &str, - ) -> FutureResult { + upload_immediately: bool, + ) -> Result<(CreatedUpload, Option), FlowyError> { if workspace_id.is_empty() { - return FutureResult::new(async { - Err(FlowyError::internal().with_context("workspace id is empty")) - }); + return Err(FlowyError::internal().with_context("workspace id is empty")); } if parent_dir.is_empty() { - return FutureResult::new(async { - Err(FlowyError::internal().with_context("parent dir is empty")) - }); + return Err(FlowyError::internal().with_context("parent dir is empty")); } if file_path.is_empty() { - return FutureResult::new(async { - Err(FlowyError::internal().with_context("local file path is empty")) - }); + return Err(FlowyError::internal().with_context("local file path is empty")); } let workspace_id = workspace_id.to_string(); let parent_dir = parent_dir.to_string(); let file_path = file_path.to_string(); - let temp_storage = self.temp_storage.clone(); - let task_queue = self.task_queue.clone(); - let user_service = self.user_service.clone(); - let cloud_service = self.cloud_service.clone(); - let is_exceed_storage_limit = self.is_exceed_storage_limit.clone(); - FutureResult::new(async move { - let is_exceed_limit = is_exceed_storage_limit.load(std::sync::atomic::Ordering::Relaxed); - if is_exceed_limit { - make_notification(StorageNotification::FileStorageLimitExceeded) - .payload(FlowyError::file_storage_limit()) - .send(); + let is_exceed_limit = self + .is_exceed_storage_limit + .load(std::sync::atomic::Ordering::Relaxed); + if is_exceed_limit { + make_notification(StorageNotification::FileStorageLimitExceeded) + .payload(FlowyError::file_storage_limit()) + .send(); - return Err(FlowyError::file_storage_limit()); - } + return Err(FlowyError::file_storage_limit()); + } - let local_file_path = temp_storage - .create_temp_file_from_existing(Path::new(&file_path)) - .await - .map_err(|err| { - error!("[File] create temp file failed: {}", err); - FlowyError::internal() - .with_context(format!("create temp file for upload file failed: {}", err)) - })?; + let local_file_path = self + .temp_storage + .create_temp_file_from_existing(Path::new(&file_path)) + .await + .map_err(|err| { + error!("[File] create temp file failed: {}", err); + FlowyError::internal() + .with_context(format!("create temp file for upload file failed: {}", err)) + })?; - // 1. create a file record and chunk the file - let (chunks, record) = - create_upload_record(workspace_id, parent_dir, local_file_path).await?; + // 1. create a file record and chunk the file + let (chunks, record) = create_upload_record(workspace_id, parent_dir, local_file_path).await?; - // 2. save the record to sqlite - let conn = user_service.sqlite_connection(user_service.user_id()?)?; - let url = cloud_service.get_object_url_v1( - &record.workspace_id, - &record.parent_dir, - &record.file_id, - )?; - let file_id = record.file_id.clone(); - match insert_upload_file(conn, &record) { - Ok(_) => { - // 3. generate url for given file - task_queue + // 2. save the record to sqlite + let conn = self + .user_service + .sqlite_connection(self.user_service.user_id()?)?; + let url = self.cloud_service.get_object_url_v1( + &record.workspace_id, + &record.parent_dir, + &record.file_id, + )?; + let file_id = record.file_id.clone(); + match insert_upload_file(conn, &record) { + Ok(_) => { + // 3. generate url for given file + if upload_immediately { + self + .task_queue + .queue_task(UploadTask::ImmediateTask { + chunks, + record, + retry_count: 3, + }) + .await; + } else { + self + .task_queue .queue_task(UploadTask::Task { chunks, record, retry_count: 0, }) .await; + } - Ok::<_, FlowyError>(CreatedUpload { url, file_id }) - }, - Err(err) => { - if matches!(err.code, ErrorCode::DuplicateSqliteRecord) { - info!("upload record already exists, skip creating new upload task"); - Ok::<_, FlowyError>(CreatedUpload { url, file_id }) - } else { - Err(err) - } - }, - } - }) + let (notifier, receiver) = ProgressNotifier::new(); + let receiver = FileProgressReceiver { + rx: receiver, + file_id: file_id.to_string(), + }; + self + .progress_notifiers + .insert(file_id.to_string(), notifier); + + Ok::<_, FlowyError>((CreatedUpload { url, file_id }, Some(receiver))) + }, + Err(err) => { + if matches!(err.code, ErrorCode::DuplicateSqliteRecord) { + info!("upload record already exists, skip creating new upload task"); + Ok::<_, FlowyError>((CreatedUpload { url, file_id }, None)) + } else { + Err(err) + } + }, + } } async fn start_upload(&self, chunks: &ChunkedBytes, record: &BoxAny) -> Result<(), FlowyError> { @@ -327,7 +325,7 @@ impl StorageService for StorageServiceImpl { &self.temp_storage, chunks, file_record, - self.upload_status_notifier.clone(), + self.progress_notifiers.clone(), ) .await { @@ -371,7 +369,7 @@ impl StorageService for StorageServiceImpl { &self.temp_storage, upload_file, parts, - self.upload_status_notifier.clone(), + self.progress_notifiers.clone(), ) .await?; } else { @@ -379,6 +377,22 @@ impl StorageService for StorageServiceImpl { } Ok(()) } + + async fn subscribe_file_progress( + &self, + file_id: &str, + ) -> Result { + trace!("[File]: subscribe file progress: {}", file_id); + let (notifier, receiver) = ProgressNotifier::new(); + let receiver = FileProgressReceiver { + rx: receiver, + file_id: file_id.to_string(), + }; + self + .progress_notifiers + .insert(file_id.to_string(), notifier); + Ok(receiver) + } } async fn create_upload_record( @@ -418,7 +432,7 @@ async fn start_upload( temp_storage: &Arc, chunked_bytes: &ChunkedBytes, upload_file: &UploadFileTable, - notifier: tokio::sync::broadcast::Sender, + progress_notifiers: Arc>, ) -> FlowyResult<()> { let mut upload_file = upload_file.clone(); if upload_file.upload_id.is_empty() { @@ -466,22 +480,32 @@ async fn start_upload( upload_file.upload_id = create_upload_resp.upload_id; } - let _ = notifier.send(UploadResult { - file_id: upload_file.file_id.clone(), - status: UploadStatus::InProgress, - }); - // 3. start uploading parts trace!( "[File] {} start uploading parts: {}", upload_file.file_id, chunked_bytes.iter().count() ); + let total_parts = chunked_bytes.iter().count(); let iter = chunked_bytes.iter().enumerate(); - let mut completed_parts = Vec::new(); + + let mut conn = user_service.sqlite_connection(user_service.user_id()?)?; + + // 4. gather existing completed parts + let mut completed_parts = select_upload_parts(&mut conn, &upload_file.upload_id) + .unwrap_or_default() + .into_iter() + .map(|part| CompletedPartRequest { + e_tag: part.e_tag, + part_number: part.part_num, + }) + .collect::>(); + + // when there are any existing parts, skip those parts by setting the current offset. + let offset = completed_parts.len(); for (index, chunk_bytes) in iter { - let part_number = index as i32 + 1; + let part_number = offset + index + 1; trace!( "[File] {} uploading part: {}, len:{}KB", upload_file.file_id, @@ -496,18 +520,24 @@ async fn start_upload( &upload_file.parent_dir, &upload_file.upload_id, &upload_file.file_id, - part_number, + part_number as i32, chunk_bytes.to_vec(), ) .await { Ok(resp) => { + let progress = (part_number as f64 / total_parts as f64).clamp(0.0, 1.0); trace!( - "[File] {} upload {} part success, total:{},", + "[File] {} upload progress: {}", upload_file.file_id, - part_number, - chunked_bytes.offsets.len() + progress ); + if let Some(mut notifier) = progress_notifiers.get_mut(&upload_file.file_id) { + notifier + .notify(FileUploadState::Uploading { progress }) + .await; + } + // gather completed part completed_parts.push(CompletedPartRequest { e_tag: resp.e_tag, @@ -534,7 +564,7 @@ async fn start_upload( temp_storage, &upload_file, completed_parts, - notifier, + &progress_notifiers, ) .await; if let Err(err) = complete_upload_result { @@ -556,7 +586,7 @@ async fn resume_upload( temp_storage: &Arc, upload_file: UploadFileTable, parts: Vec, - notifier: tokio::sync::broadcast::Sender, + progress_notifiers: Arc>, ) -> FlowyResult<()> { trace!( "[File] resume upload for workspace: {}, parent_dir: {}, file_id: {}, local_file_path:{}", @@ -576,7 +606,7 @@ async fn resume_upload( temp_storage, &chunked_bytes, &upload_file, - notifier, + progress_notifiers, ) .await?; }, @@ -643,8 +673,13 @@ async fn complete_upload( temp_storage: &Arc, upload_file: &UploadFileTable, parts: Vec, - notifier: tokio::sync::broadcast::Sender, + progress_notifiers: &Arc>, ) -> Result<(), FlowyError> { + trace!( + "[File]: completing file upload: {}, part: {}", + upload_file.file_id, + parts.len() + ); match cloud_service .complete_upload( &upload_file.workspace_id, @@ -656,13 +691,17 @@ async fn complete_upload( .await { Ok(_) => { - info!("[File] completed upload file: {}", upload_file.upload_id); - trace!("[File] delete upload record from sqlite"); - let _ = notifier.send(UploadResult { - file_id: upload_file.file_id.clone(), - status: UploadStatus::Finish, - }); + info!("[File] completed upload file: {}", upload_file.file_id); + if let Some(mut notifier) = progress_notifiers.get_mut(&upload_file.file_id) { + trace!("[File]: notify upload finished"); + notifier + .notify(FileUploadState::Finished { + file_id: upload_file.file_id.clone(), + }) + .await; + } + trace!("[File] delete upload record from sqlite"); let conn = user_service.sqlite_connection(user_service.user_id()?)?; delete_upload_file(conn, &upload_file.upload_id)?; if let Err(err) = temp_storage diff --git a/frontend/rust-lib/flowy-storage/src/uploader.rs b/frontend/rust-lib/flowy-storage/src/uploader.rs index f05878c7f8..7a92f24e03 100644 --- a/frontend/rust-lib/flowy-storage/src/uploader.rs +++ b/frontend/rust-lib/flowy-storage/src/uploader.rs @@ -153,7 +153,12 @@ impl FileUploader { .fetch_add(1, std::sync::atomic::Ordering::SeqCst); match task { - UploadTask::Task { + UploadTask::ImmediateTask { + chunks, + record, + mut retry_count, + } + | UploadTask::Task { chunks, record, mut retry_count, @@ -253,6 +258,11 @@ impl FileUploaderRunner { } pub enum UploadTask { + ImmediateTask { + chunks: ChunkedBytes, + record: UploadFileTable, + retry_count: u8, + }, Task { chunks: ChunkedBytes, record: UploadFileTable, @@ -270,8 +280,9 @@ pub enum UploadTask { impl UploadTask { pub fn retry_count(&self) -> u8 { match self { - Self::Task { retry_count, .. } => *retry_count, - Self::BackgroundTask { retry_count, .. } => *retry_count, + UploadTask::ImmediateTask { retry_count, .. } => *retry_count, + UploadTask::Task { retry_count, .. } => *retry_count, + UploadTask::BackgroundTask { retry_count, .. } => *retry_count, } } } @@ -279,8 +290,9 @@ impl UploadTask { impl Display for UploadTask { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Self::Task { record, .. } => write!(f, "Task: {}", record.file_id), - Self::BackgroundTask { file_id, .. } => write!(f, "BackgroundTask: {}", file_id), + UploadTask::Task { record, .. } => write!(f, "Task: {}", record.file_id), + UploadTask::BackgroundTask { file_id, .. } => write!(f, "BackgroundTask: {}", file_id), + UploadTask::ImmediateTask { record, .. } => write!(f, "Immediate Task: {}", record.file_id), } } } @@ -290,6 +302,9 @@ impl Eq for UploadTask {} impl PartialEq for UploadTask { fn eq(&self, other: &Self) -> bool { match (self, other) { + (Self::ImmediateTask { record: lhs, .. }, Self::ImmediateTask { record: rhs, .. }) => { + lhs.local_file_path == rhs.local_file_path + }, (Self::Task { record: lhs, .. }, Self::Task { record: rhs, .. }) => { lhs.local_file_path == rhs.local_file_path }, @@ -319,6 +334,11 @@ impl PartialOrd for UploadTask { impl Ord for UploadTask { fn cmp(&self, other: &Self) -> Ordering { match (self, other) { + (Self::ImmediateTask { record: lhs, .. }, Self::ImmediateTask { record: rhs, .. }) => { + lhs.created_at.cmp(&rhs.created_at) + }, + (_, Self::ImmediateTask { .. }) => Ordering::Less, + (Self::ImmediateTask { .. }, _) => Ordering::Greater, (Self::Task { record: lhs, .. }, Self::Task { record: rhs, .. }) => { lhs.created_at.cmp(&rhs.created_at) },