refactor: revamp file upload and fix partial upload bugs (#5924)

* chore: upload chat file to local ai

* chore: async func

* chore: individual file progress

* chore: fix test

* chore: fix file upload
Nathan.fooo 2024-08-11 20:39:25 +08:00 committed by GitHub
parent 3ff47b7e1e
commit 23997e977c
36 changed files with 551 additions and 436 deletions
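
At a glance, the core of this refactor replaces the single broadcast `UploadResult` channel with a per-file progress channel returned directly from `create_upload`, plus a new `upload_immediately` flag. A minimal caller-side sketch in Rust, assembled from the flowy-storage hunks and the integration test below (not a verbatim excerpt; `storage_manager`, `workspace_id`, and `file_path` are assumed to come from surrounding code):

// Create the upload; the second tuple element is a per-file progress receiver
// (None when an upload record for this file already existed).
let (created_upload, maybe_progress) = storage_manager
    .storage_service
    .create_upload(&workspace_id, "temp_test", &file_path, false)
    .await?;

// Consume progress events until the file finishes uploading.
if let Some(mut progress_rx) = maybe_progress {
    while let Some(state) = progress_rx.recv().await {
        match state {
            FileUploadState::NotStarted => {},
            FileUploadState::Uploading { progress } => {
                println!("{} uploaded: {:.0}%", created_upload.file_id, progress * 100.0);
            },
            FileUploadState::Finished { .. } => break,
        }
    }
}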

View File

@ -1,8 +1,5 @@
import 'dart:async';
import 'package:appflowy/plugins/ai_chat/application/chat_entity.dart';
import 'package:appflowy_backend/dispatch/dispatch.dart';
import 'package:appflowy_backend/protobuf/flowy-ai/entities.pb.dart';
import 'package:flutter_bloc/flutter_bloc.dart';
import 'package:freezed_annotation/freezed_annotation.dart';
@ -10,40 +7,14 @@ part 'chat_input_file_bloc.freezed.dart';
class ChatInputFileBloc extends Bloc<ChatInputFileEvent, ChatInputFileState> {
ChatInputFileBloc({
// ignore: avoid_unused_constructor_parameters
required String chatId,
required this.file,
}) : super(const ChatInputFileState()) {
on<ChatInputFileEvent>(
(event, emit) async {
await event.when(
initial: () async {
final payload = ChatFilePB(
filePath: file.filePath,
chatId: chatId,
);
unawaited(
AIEventChatWithFile(payload).send().then((result) {
if (!isClosed) {
result.fold(
(_) {
add(
const ChatInputFileEvent.updateUploadState(
UploadFileIndicator.finish(),
),
);
},
(err) {
add(
ChatInputFileEvent.updateUploadState(
UploadFileIndicator.error(err.toString()),
),
);
},
);
}
}),
);
},
initial: () async {},
updateUploadState: (UploadFileIndicator indicator) {
emit(state.copyWith(uploadFileIndicator: indicator));
},

View File

@ -154,15 +154,16 @@ class _ChatInputState extends State<ChatInput> {
children: [
// TODO(lucas): support mobile
if (PlatformExtension.isDesktop &&
widget.aiType == const AIType.localAI())
widget.aiType.isLocalAI())
_attachmentButton(buttonPadding),
// text field
Expanded(child: _inputTextField(context, textPadding)),
// at button
// mention button
// TODO(lucas): support mobile
if (PlatformExtension.isDesktop) _atButton(buttonPadding),
if (PlatformExtension.isDesktop)
_mentionButton(buttonPadding),
// send button
_sendButton(buttonPadding),
@ -352,7 +353,7 @@ class _ChatInputState extends State<ChatInput> {
);
}
Widget _atButton(EdgeInsets buttonPadding) {
Widget _mentionButton(EdgeInsets buttonPadding) {
return Padding(
padding: buttonPadding,
child: SizedBox.square(

View File

@ -17,7 +17,7 @@ class ChatInputAttachment extends StatelessWidget {
message: LocaleKeys.chat_uploadFile.tr(),
child: FlowyIconButton(
hoverColor: AFThemeExtension.of(context).lightGreyHover,
radius: BorderRadius.circular(18),
radius: BorderRadius.circular(6),
icon: FlowySvg(
FlowySvgs.ai_attachment_s,
size: const Size.square(20),

View File

@ -172,7 +172,7 @@ checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da"
[[package]]
name = "app-error"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"anyhow",
"bincode",
@ -192,7 +192,7 @@ dependencies = [
[[package]]
name = "appflowy-ai-client"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"anyhow",
"bytes",
@ -826,7 +826,7 @@ dependencies = [
[[package]]
name = "client-api"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"again",
"anyhow",
@ -876,7 +876,7 @@ dependencies = [
[[package]]
name = "client-api-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"collab-entity",
"collab-rt-entity",
@ -888,7 +888,7 @@ dependencies = [
[[package]]
name = "client-websocket"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"futures-channel",
"futures-util",
@ -1132,7 +1132,7 @@ dependencies = [
[[package]]
name = "collab-rt-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"anyhow",
"bincode",
@ -1157,7 +1157,7 @@ dependencies = [
[[package]]
name = "collab-rt-protocol"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"anyhow",
"async-trait",
@ -1421,7 +1421,7 @@ dependencies = [
"cssparser-macros",
"dtoa-short",
"itoa 1.0.6",
"phf 0.11.2",
"phf 0.8.0",
"smallvec",
]
@ -1532,7 +1532,7 @@ checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308"
[[package]]
name = "database-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"anyhow",
"app-error",
@ -3051,7 +3051,7 @@ dependencies = [
[[package]]
name = "gotrue"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"anyhow",
"futures-util",
@ -3068,7 +3068,7 @@ dependencies = [
[[package]]
name = "gotrue-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"anyhow",
"app-error",
@ -3500,7 +3500,7 @@ dependencies = [
[[package]]
name = "infra"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"anyhow",
"bytes",
@ -6098,7 +6098,7 @@ dependencies = [
[[package]]
name = "shared-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"anyhow",
"app-error",

View File

@ -53,7 +53,7 @@ collab-user = { version = "0.2" }
# Run the script:
# scripts/tool/update_client_api_rev.sh new_rev_id
# ⚠️⚠️⚠️️
client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "135a67dc79848c39e9c53b4a99b6d14f444686ef" }
client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "6a44490daccb101c8b855443d2d6ded0fb752016" }
[dependencies]
serde_json.workspace = true

View File

@ -163,7 +163,7 @@ checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da"
[[package]]
name = "app-error"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"anyhow",
"bincode",
@ -183,7 +183,7 @@ dependencies = [
[[package]]
name = "appflowy-ai-client"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"anyhow",
"bytes",
@ -800,7 +800,7 @@ dependencies = [
[[package]]
name = "client-api"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"again",
"anyhow",
@ -850,7 +850,7 @@ dependencies = [
[[package]]
name = "client-api-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"collab-entity",
"collab-rt-entity",
@ -862,7 +862,7 @@ dependencies = [
[[package]]
name = "client-websocket"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"futures-channel",
"futures-util",
@ -1115,7 +1115,7 @@ dependencies = [
[[package]]
name = "collab-rt-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"anyhow",
"bincode",
@ -1140,7 +1140,7 @@ dependencies = [
[[package]]
name = "collab-rt-protocol"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"anyhow",
"async-trait",
@ -1411,7 +1411,7 @@ dependencies = [
"cssparser-macros",
"dtoa-short",
"itoa 1.0.10",
"phf 0.11.2",
"phf 0.8.0",
"smallvec",
]
@ -1522,7 +1522,7 @@ checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5"
[[package]]
name = "database-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"anyhow",
"app-error",
@ -3118,7 +3118,7 @@ dependencies = [
[[package]]
name = "gotrue"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"anyhow",
"futures-util",
@ -3135,7 +3135,7 @@ dependencies = [
[[package]]
name = "gotrue-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"anyhow",
"app-error",
@ -3572,7 +3572,7 @@ dependencies = [
[[package]]
name = "infra"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"anyhow",
"bytes",
@ -6162,7 +6162,7 @@ dependencies = [
[[package]]
name = "shared-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"anyhow",
"app-error",

View File

@ -52,7 +52,7 @@ collab-user = { version = "0.2" }
# Run the script:
# scripts/tool/update_client_api_rev.sh new_rev_id
# ⚠️⚠️⚠️️
client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "135a67dc79848c39e9c53b4a99b6d14f444686ef" }
client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "6a44490daccb101c8b855443d2d6ded0fb752016" }
[dependencies]
serde_json.workspace = true

View File

@ -163,7 +163,7 @@ checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da"
[[package]]
name = "app-error"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"anyhow",
"bincode",
@ -183,7 +183,7 @@ dependencies = [
[[package]]
name = "appflowy-ai-client"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"anyhow",
"bytes",
@ -718,7 +718,7 @@ dependencies = [
[[package]]
name = "client-api"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"again",
"anyhow",
@ -768,7 +768,7 @@ dependencies = [
[[package]]
name = "client-api-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"collab-entity",
"collab-rt-entity",
@ -780,7 +780,7 @@ dependencies = [
[[package]]
name = "client-websocket"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"futures-channel",
"futures-util",
@ -855,7 +855,7 @@ dependencies = [
"collab",
"collab-entity",
"collab-plugins",
"dashmap",
"dashmap 5.5.3",
"getrandom 0.2.10",
"js-sys",
"lazy_static",
@ -993,7 +993,7 @@ dependencies = [
[[package]]
name = "collab-rt-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"anyhow",
"bincode",
@ -1018,7 +1018,7 @@ dependencies = [
[[package]]
name = "collab-rt-protocol"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"anyhow",
"async-trait",
@ -1347,6 +1347,20 @@ dependencies = [
"parking_lot_core 0.9.8",
]
[[package]]
name = "dashmap"
version = "6.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "804c8821570c3f8b70230c2ba75ffa5c0f9a4189b9a432b6656c536712acae28"
dependencies = [
"cfg-if",
"crossbeam-utils",
"hashbrown 0.14.3",
"lock_api",
"once_cell",
"parking_lot_core 0.9.8",
]
[[package]]
name = "data-encoding"
version = "2.4.0"
@ -1356,7 +1370,7 @@ checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308"
[[package]]
name = "database-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"anyhow",
"app-error",
@ -1783,7 +1797,7 @@ dependencies = [
"appflowy-plugin",
"base64 0.21.5",
"bytes",
"dashmap",
"dashmap 6.0.1",
"dotenv",
"flowy-ai-pub",
"flowy-codegen",
@ -1791,6 +1805,7 @@ dependencies = [
"flowy-error",
"flowy-notification",
"flowy-sqlite",
"flowy-storage-pub",
"futures",
"futures-util",
"lib-dispatch",
@ -1958,7 +1973,7 @@ dependencies = [
"collab-integrate",
"collab-plugins",
"csv",
"dashmap",
"dashmap 6.0.1",
"event-integration-test",
"fancy-regex 0.11.0",
"flowy-codegen",
@ -2009,7 +2024,7 @@ dependencies = [
name = "flowy-derive"
version = "0.1.0"
dependencies = [
"dashmap",
"dashmap 6.0.1",
"flowy-ast",
"flowy-codegen",
"lazy_static",
@ -2031,7 +2046,7 @@ dependencies = [
"collab-entity",
"collab-integrate",
"collab-plugins",
"dashmap",
"dashmap 6.0.1",
"flowy-codegen",
"flowy-derive",
"flowy-document-pub",
@ -2171,7 +2186,7 @@ name = "flowy-notification"
version = "0.1.0"
dependencies = [
"bytes",
"dashmap",
"dashmap 6.0.1",
"flowy-codegen",
"flowy-derive",
"lazy_static",
@ -2320,6 +2335,7 @@ dependencies = [
"async-trait",
"bytes",
"chrono",
"dashmap 6.0.1",
"flowy-codegen",
"flowy-derive",
"flowy-error",
@ -2354,6 +2370,7 @@ dependencies = [
"serde",
"serde_json",
"tokio",
"tracing",
]
[[package]]
@ -2730,7 +2747,7 @@ dependencies = [
[[package]]
name = "gotrue"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"anyhow",
"futures-util",
@ -2747,7 +2764,7 @@ dependencies = [
[[package]]
name = "gotrue-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"anyhow",
"app-error",
@ -3112,7 +3129,7 @@ dependencies = [
[[package]]
name = "infra"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"anyhow",
"bytes",
@ -5321,7 +5338,7 @@ dependencies = [
[[package]]
name = "shared-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=135a67dc79848c39e9c53b4a99b6d14f444686ef#135a67dc79848c39e9c53b4a99b6d14f444686ef"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=6a44490daccb101c8b855443d2d6ded0fb752016#6a44490daccb101c8b855443d2d6ded0fb752016"
dependencies = [
"anyhow",
"app-error",

View File

@ -93,14 +93,15 @@ yrs = "0.19.2"
validator = { version = "0.16.1", features = ["derive"] }
tokio-util = "0.7.11"
zip = "2.1.3"
dashmap = "6.0.1"
# Please using the following command to update the revision id
# Current directory: frontend
# Run the script.add_workspace_members:
# scripts/tool/update_client_api_rev.sh new_rev_id
# ⚠️⚠️⚠️️
client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "135a67dc79848c39e9c53b4a99b6d14f444686ef" }
client-api-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "135a67dc79848c39e9c53b4a99b6d14f444686ef" }
client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "6a44490daccb101c8b855443d2d6ded0fb752016" }
client-api-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "6a44490daccb101c8b855443d2d6ded0fb752016" }
[profile.dev]
opt-level = 0

View File

@ -14,8 +14,8 @@ syn = { version = "1.0.109", features = ["extra-traits", "visit"] }
quote = "1.0"
proc-macro2 = "1.0"
flowy-ast.workspace = true
lazy_static = {version = "1.4.0"}
dashmap = "5"
lazy_static = { version = "1.4.0" }
dashmap.workspace = true
flowy-codegen.workspace = true
serde_json.workspace = true
walkdir = "2.3.2"

View File

@ -91,7 +91,7 @@ impl EventIntegrationTest {
Self::new_with_config(config).await
}
pub fn set_no_cleanup(&mut self) {
pub fn skip_clean(&mut self) {
self.cleaner.lock().should_clean = false;
}

View File

@ -1,39 +1,14 @@
use crate::document::generate_random_bytes;
use event_integration_test::user_event::user_localhost_af_cloud;
use event_integration_test::EventIntegrationTest;
use flowy_storage_pub::storage::UploadStatus;
use flowy_storage_pub::storage::FileUploadState;
use std::env::temp_dir;
use std::sync::Arc;
use std::time::Duration;
use tokio::fs;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
#[tokio::test]
async fn af_cloud_upload_file_test() {
user_localhost_af_cloud().await;
let test = EventIntegrationTest::new().await;
test.af_cloud_sign_up().await;
let workspace_id = test.get_current_workspace().await.id;
let file_path = generate_file_with_bytes_len(1024).await.0;
let mut rx = test.storage_manager.subscribe_upload_result();
let created_upload = test
.storage_manager
.storage_service
.create_upload(&workspace_id, "temp_test", &file_path)
.await
.unwrap();
while let Ok(result) = rx.recv().await {
if result.file_id == created_upload.file_id && result.status == UploadStatus::Finish {
break;
}
}
let _ = fs::remove_file(file_path).await;
}
use tokio::sync::Mutex;
#[tokio::test]
async fn af_cloud_upload_big_file_test() {
user_localhost_af_cloud().await;
@ -42,32 +17,39 @@ async fn af_cloud_upload_big_file_test() {
tokio::time::sleep(Duration::from_secs(6)).await;
let workspace_id = test.get_current_workspace().await.id;
let (file_path, upload_data) = generate_file_with_bytes_len(30 * 1024 * 1024).await;
let created_upload = test
let (file_path, upload_data) = generate_file_with_bytes_len(15 * 1024 * 1024).await;
let (created_upload, rx) = test
.storage_manager
.storage_service
.create_upload(&workspace_id, "temp_test", &file_path)
.create_upload(&workspace_id, "temp_test", &file_path, false)
.await
.unwrap();
let mut rx = test.storage_manager.subscribe_upload_result();
while let Ok(result) = rx.recv().await {
if result.file_id == created_upload.file_id && result.status == UploadStatus::InProgress {
break;
let mut rx = rx.unwrap();
while let Some(state) = rx.recv().await {
if let FileUploadState::Uploading { progress } = state {
if progress > 0.1 {
break;
}
}
}
// Simulate a restart
let config = test.config.clone();
test.set_no_cleanup();
test.skip_clean();
drop(test);
tokio::time::sleep(Duration::from_secs(3)).await;
// Restart the test. It will load unfinished uploads
let test = EventIntegrationTest::new_with_config(config).await;
let mut rx = test.storage_manager.subscribe_upload_result();
while let Ok(result) = rx.recv().await {
if result.file_id == created_upload.file_id && result.status == UploadStatus::Finish {
let mut rx = test
.storage_manager
.subscribe_file_state(&created_upload.file_id)
.await
.unwrap();
while let Some(state) = rx.recv().await {
if let FileUploadState::Finished { .. } = state {
break;
}
}
@ -90,35 +72,61 @@ async fn af_cloud_upload_6_files_test() {
user_localhost_af_cloud().await;
let test = EventIntegrationTest::new().await;
test.af_cloud_sign_up().await;
let workspace_id = test.get_current_workspace().await.id;
let mut rx = test.storage_manager.subscribe_upload_result();
let mut created_uploads = vec![];
let mut receivers = vec![];
for file_size in [1, 2, 5, 8, 12, 20] {
let file_path = generate_file_with_bytes_len(file_size * 1024 * 1024)
.await
.0;
let created_upload = test
let (created_upload, rx) = test
.storage_manager
.storage_service
.create_upload(&workspace_id, "temp_test", &file_path)
.create_upload(&workspace_id, "temp_test", &file_path, false)
.await
.unwrap();
receivers.push(rx.unwrap());
created_uploads.push(created_upload);
let _ = fs::remove_file(file_path).await;
}
while let Ok(result) = rx.recv().await {
if result.status == UploadStatus::Finish {
created_uploads.retain(|upload| upload.file_id != result.file_id);
}
if created_uploads.is_empty() {
break;
}
// Wait for all uploads to finish
let uploads = Arc::new(Mutex::new(created_uploads));
let mut handles = vec![];
for mut receiver in receivers {
let cloned_uploads = uploads.clone();
let cloned_test = test.clone();
let handle = tokio::spawn(async move {
if let Some(state) = cloned_test
.storage_manager
.get_file_state(&receiver.file_id)
.await
{
if let FileUploadState::Finished { file_id } = state {
cloned_uploads
.lock()
.await
.retain(|upload| upload.file_id != file_id);
}
} else {
while let Some(value) = receiver.recv().await {
if let FileUploadState::Finished { file_id } = value {
cloned_uploads
.lock()
.await
.retain(|upload| upload.file_id != file_id);
break;
}
}
}
});
handles.push(handle);
}
// join all handles
futures::future::join_all(handles).await;
assert_eq!(uploads.lock().await.len(), 0);
}
async fn generate_file_with_bytes_len(len: usize) -> (String, Vec<u8>) {

View File

@ -35,14 +35,14 @@ pub trait ChatCloudService: Send + Sync + 'static {
metadata: &[ChatMessageMetadata],
) -> Result<ChatMessage, FlowyError>;
fn create_answer(
async fn create_answer(
&self,
workspace_id: &str,
chat_id: &str,
message: &str,
question_id: i64,
metadata: Option<serde_json::Value>,
) -> FutureResult<ChatMessage, FlowyError>;
) -> Result<ChatMessage, FlowyError>;
async fn stream_answer(
&self,
@ -58,13 +58,13 @@ pub trait ChatCloudService: Send + Sync + 'static {
question_message_id: i64,
) -> Result<ChatMessage, FlowyError>;
fn get_chat_messages(
async fn get_chat_messages(
&self,
workspace_id: &str,
chat_id: &str,
offset: MessageCursor,
limit: u64,
) -> FutureResult<RepeatedChatMessage, FlowyError>;
) -> Result<RepeatedChatMessage, FlowyError>;
async fn get_related_message(
&self,
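
Both methods above move from `fn ... -> FutureResult<...>` to `async fn ... -> Result<...>`, so implementers no longer have to clone the borrowed arguments into an owned `async move` block. A rough sketch of an implementation after the change (the trait is driven by `#[async_trait]` elsewhere in the crate; `MyChatServer` and its `client` field are hypothetical):

#[async_trait]
impl ChatCloudService for MyChatServer {
    async fn get_chat_messages(
        &self,
        workspace_id: &str,
        chat_id: &str,
        offset: MessageCursor,
        limit: u64,
    ) -> Result<RepeatedChatMessage, FlowyError> {
        // The &str parameters can be used directly; previously they were copied
        // into owned Strings and moved into a FutureResult future.
        self
            .client
            .get_chat_messages(workspace_id, chat_id, offset, limit)
            .await
    }
}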

View File

@ -22,7 +22,7 @@ bytes.workspace = true
validator = { workspace = true, features = ["derive"] }
lib-infra = { workspace = true, features = ["isolate_flutter"] }
flowy-ai-pub.workspace = true
dashmap = "5.5"
dashmap.workspace = true
flowy-sqlite = { workspace = true }
tokio.workspace = true
futures.workspace = true
@ -44,6 +44,7 @@ md5 = "0.7.0"
zip = { workspace = true, features = ["deflate"] }
zip-extensions = "0.8.0"
pin-project = "1.1.5"
flowy-storage-pub = { workspace = true }
[target.'cfg(any(target_os = "macos", target_os = "linux", target_os = "windows"))'.dependencies]
notify = "6.1.1"

View File

@ -13,9 +13,10 @@ use flowy_error::{FlowyError, FlowyResult};
use flowy_sqlite::kv::KVStorePreferences;
use flowy_sqlite::DBConnection;
use flowy_storage_pub::storage::StorageService;
use lib_infra::util::timestamp;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::{Arc, Weak};
use tracing::{info, trace};
pub trait AIUserService: Send + Sync + 'static {
@ -38,6 +39,7 @@ impl AIManager {
chat_cloud_service: Arc<dyn ChatCloudService>,
user_service: impl AIUserService,
store_preferences: Arc<KVStorePreferences>,
storage_service: Weak<dyn StorageService>,
) -> AIManager {
let user_service = Arc::new(user_service);
let plugin_manager = Arc::new(PluginManager::new());
@ -53,6 +55,7 @@ impl AIManager {
user_service.clone(),
chat_cloud_service,
local_ai_controller.clone(),
storage_service,
));
Self {

View File

@ -89,6 +89,15 @@ impl Chat {
if message.len() > 2000 {
return Err(FlowyError::text_too_long().with_context("Exceeds maximum message 2000 length"));
}
trace!(
"[Chat] stream chat message: chat_id={}, message={}, message_type={:?}, metadata={:?}",
self.chat_id,
message,
message_type,
metadata
);
// clear
self
.stop_stream
@ -101,12 +110,7 @@ impl Chat {
let workspace_id = self.user_service.workspace_id()?;
let _ = question_sink
.send(
StreamMessage::Text {
text: message.to_string(),
}
.to_string(),
)
.send(StreamMessage::Text(message.to_string()).to_string())
.await;
let question = self
.chat_service
@ -124,31 +128,18 @@ impl Chat {
})?;
let _ = question_sink
.send(
StreamMessage::MessageId {
message_id: question.message_id,
}
.to_string(),
)
.send(StreamMessage::MessageId(question.message_id).to_string())
.await;
if self.chat_service.is_local_ai_enabled() && !metadata.is_empty() {
let _ = question_sink
.send(StreamMessage::IndexStart.to_string())
.await;
if let Err(err) = self
.chat_service
.index_message_metadata(&self.chat_id, &metadata, &mut question_sink)
.await
{
error!("Failed to index file: {}", err);
}
let _ = question_sink
.send(StreamMessage::IndexEnd.to_string())
.await;
if let Err(err) = self
.chat_service
.index_message_metadata(&self.chat_id, &metadata, &mut question_sink)
.await
{
error!("Failed to index file: {}", err);
}
let _ = question_sink.send(StreamMessage::Done.to_string()).await;
// Save message to disk
save_chat_message(
self.user_service.sqlite_connection(uid)?,
&self.chat_id,

View File

@ -51,7 +51,6 @@ pub(crate) async fn stream_chat_message_handler(
ChatMessageMetadata {
data: ChatMetadataData {
content: metadata.data,
url: None,
content_type,
size: content_len as i64,
},

View File

@ -26,7 +26,7 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::select;
use tokio_stream::StreamExt;
use tracing::{debug, error, info, instrument};
use tracing::{debug, error, info, instrument, trace};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LLMSetting {
@ -348,45 +348,54 @@ impl LocalAIController {
index_process_sink: &mut (impl Sink<String> + Unpin),
) -> FlowyResult<()> {
for metadata in metadata_list {
if let Err(err) = metadata.data.validate() {
error!(
"[AI Plugin] invalid metadata: {:?}, error: {:?}",
metadata, err
);
continue;
}
let mut index_metadata = HashMap::new();
index_metadata.insert("name".to_string(), json!(&metadata.name));
index_metadata.insert("at_name".to_string(), json!(format!("@{}", &metadata.name)));
index_metadata.insert("source".to_string(), json!(&metadata.source));
if let Some(url) = &metadata.data.url {
let file_path = Path::new(url);
if file_path.exists() {
match &metadata.data.content_type {
ChatMetadataContentType::Unknown => {
error!(
"[AI Plugin] unsupported content type: {:?}",
metadata.data.content_type
);
},
ChatMetadataContentType::Text | ChatMetadataContentType::Markdown => {
trace!("[AI Plugin]: index text: {}", metadata.data.content);
self
.process_index_file(
chat_id,
Some(file_path.to_path_buf()),
None,
Some(metadata.data.content.clone()),
metadata,
&index_metadata,
index_process_sink,
)
.await?;
}
} else if matches!(
metadata.data.content_type,
ChatMetadataContentType::Text | ChatMetadataContentType::Markdown
) && metadata.data.validate()
{
self
.process_index_file(
chat_id,
None,
Some(metadata.data.content.clone()),
metadata,
&index_metadata,
index_process_sink,
)
.await?;
} else {
error!(
"[AI Plugin] unsupported content type: {:?}",
metadata.data.content_type
);
},
ChatMetadataContentType::PDF => {
trace!("[AI Plugin]: index pdf file: {}", metadata.data.content);
let file_path = Path::new(&metadata.data.content);
if file_path.exists() {
self
.process_index_file(
chat_id,
Some(file_path.to_path_buf()),
None,
metadata,
&index_metadata,
index_process_sink,
)
.await?;
}
},
}
}

View File

@ -16,15 +16,19 @@ use lib_infra::async_trait::async_trait;
use lib_infra::future::FutureResult;
use crate::local_ai::stream_util::LocalAIStreamAdaptor;
use crate::stream_message::StreamMessage;
use flowy_storage_pub::storage::StorageService;
use futures_util::SinkExt;
use serde_json::json;
use std::path::Path;
use std::sync::Arc;
use std::sync::{Arc, Weak};
use tracing::trace;
pub struct AICloudServiceMiddleware {
cloud_service: Arc<dyn ChatCloudService>,
user_service: Arc<dyn AIUserService>,
local_llm_controller: Arc<LocalAIController>,
storage_service: Weak<dyn StorageService>,
}
impl AICloudServiceMiddleware {
@ -32,11 +36,13 @@ impl AICloudServiceMiddleware {
user_service: Arc<dyn AIUserService>,
cloud_service: Arc<dyn ChatCloudService>,
local_llm_controller: Arc<LocalAIController>,
storage_service: Weak<dyn StorageService>,
) -> Self {
Self {
user_service,
cloud_service,
local_llm_controller,
storage_service,
}
}
@ -50,10 +56,23 @@ impl AICloudServiceMiddleware {
metadata_list: &[ChatMessageMetadata],
index_process_sink: &mut (impl Sink<String> + Unpin),
) -> Result<(), FlowyError> {
self
.local_llm_controller
.index_message_metadata(chat_id, metadata_list, index_process_sink)
.await?;
if metadata_list.is_empty() {
return Ok(());
}
if self.is_local_ai_enabled() {
let _ = index_process_sink
.send(StreamMessage::IndexStart.to_string())
.await;
self
.local_llm_controller
.index_message_metadata(chat_id, metadata_list, index_process_sink)
.await?;
let _ = index_process_sink
.send(StreamMessage::IndexEnd.to_string())
.await;
} else if let Some(_storage_service) = self.storage_service.upgrade() {
//
}
Ok(())
}
@ -110,17 +129,18 @@ impl ChatCloudService for AICloudServiceMiddleware {
.await
}
fn create_answer(
async fn create_answer(
&self,
workspace_id: &str,
chat_id: &str,
message: &str,
question_id: i64,
metadata: Option<serde_json::Value>,
) -> FutureResult<ChatMessage, FlowyError> {
) -> Result<ChatMessage, FlowyError> {
self
.cloud_service
.create_answer(workspace_id, chat_id, message, question_id, metadata)
.await
}
async fn stream_answer(
@ -184,16 +204,17 @@ impl ChatCloudService for AICloudServiceMiddleware {
}
}
fn get_chat_messages(
async fn get_chat_messages(
&self,
workspace_id: &str,
chat_id: &str,
offset: MessageCursor,
limit: u64,
) -> FutureResult<RepeatedChatMessage, FlowyError> {
) -> Result<RepeatedChatMessage, FlowyError> {
self
.cloud_service
.get_chat_messages(workspace_id, chat_id, offset, limit)
.await
}
async fn get_related_message(

View File

@ -1,22 +1,23 @@
use std::fmt::Display;
pub enum StreamMessage {
MessageId { message_id: i64 },
MessageId(i64),
IndexStart,
IndexEnd,
Text { text: String },
Text(String),
Done,
StartIndexFile { file_name: String },
EndIndexFile { file_name: String },
IndexFileError { file_name: String },
}
impl Display for StreamMessage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
StreamMessage::MessageId { message_id } => write!(f, "message_id:{}", message_id),
StreamMessage::MessageId(message_id) => write!(f, "message_id:{}", message_id),
StreamMessage::IndexStart => write!(f, "index_start:"),
StreamMessage::IndexEnd => write!(f, "index_end"),
StreamMessage::Text { text } => {
StreamMessage::Text(text) => {
write!(f, "data:{}", text)
},
StreamMessage::Done => write!(f, "done:"),
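
Collapsing the struct variants into tuple variants keeps the wire format written to the question sink unchanged, for example:

assert_eq!(StreamMessage::MessageId(42).to_string(), "message_id:42");
assert_eq!(StreamMessage::Text("hello".to_string()).to_string(), "data:hello");
assert_eq!(StreamMessage::IndexStart.to_string(), "index_start:");
assert_eq!(StreamMessage::Done.to_string(), "done:");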

View File

@ -3,6 +3,7 @@ use flowy_ai_pub::cloud::ChatCloudService;
use flowy_error::FlowyError;
use flowy_sqlite::kv::KVStorePreferences;
use flowy_sqlite::DBConnection;
use flowy_storage_pub::storage::StorageService;
use flowy_user::services::authenticate_user::AuthenticateUser;
use std::path::PathBuf;
use std::sync::{Arc, Weak};
@ -14,12 +15,14 @@ impl ChatDepsResolver {
authenticate_user: Weak<AuthenticateUser>,
cloud_service: Arc<dyn ChatCloudService>,
store_preferences: Arc<KVStorePreferences>,
storage_service: Weak<dyn StorageService>,
) -> Arc<AIManager> {
let user_service = ChatUserServiceImpl(authenticate_user);
Arc::new(AIManager::new(
cloud_service,
user_service,
store_preferences,
storage_service,
))
}
}

View File

@ -629,24 +629,19 @@ impl ChatCloudService for ServerProvider {
.await
}
fn create_answer(
async fn create_answer(
&self,
workspace_id: &str,
chat_id: &str,
message: &str,
question_id: i64,
metadata: Option<serde_json::Value>,
) -> FutureResult<ChatMessage, FlowyError> {
let workspace_id = workspace_id.to_string();
let chat_id = chat_id.to_string();
let message = message.to_string();
) -> Result<ChatMessage, FlowyError> {
let server = self.get_server();
FutureResult::new(async move {
server?
.chat_service()
.create_answer(&workspace_id, &chat_id, &message, question_id, metadata)
.await
})
server?
.chat_service()
.create_answer(workspace_id, chat_id, message, question_id, metadata)
.await
}
async fn stream_answer(
@ -664,22 +659,18 @@ impl ChatCloudService for ServerProvider {
.await
}
fn get_chat_messages(
async fn get_chat_messages(
&self,
workspace_id: &str,
chat_id: &str,
offset: MessageCursor,
limit: u64,
) -> FutureResult<RepeatedChatMessage, FlowyError> {
let workspace_id = workspace_id.to_string();
let chat_id = chat_id.to_string();
let server = self.get_server();
FutureResult::new(async move {
server?
.chat_service()
.get_chat_messages(&workspace_id, &chat_id, offset, limit)
.await
})
) -> Result<RepeatedChatMessage, FlowyError> {
self
.get_server()?
.chat_service()
.get_chat_messages(workspace_id, chat_id, offset, limit)
.await
}
async fn get_related_message(

View File

@ -165,6 +165,7 @@ impl AppFlowyCore {
Arc::downgrade(&authenticate_user),
server_provider.clone(),
store_preference.clone(),
Arc::downgrade(&storage_manager.storage_service),
);
let database_manager = DatabaseDepsResolver::resolve(

View File

@ -20,7 +20,7 @@ protobuf.workspace = true
flowy-error = { path = "../flowy-error", features = [
"impl_from_dispatch_error",
"impl_from_collab_database",
]}
] }
lib-dispatch = { workspace = true }
tokio = { workspace = true, features = ["sync"] }
@ -38,7 +38,7 @@ indexmap = { version = "2.1.0", features = ["serde"] }
url = { version = "2" }
fancy-regex = "0.11.0"
futures.workspace = true
dashmap = "5"
dashmap.workspace = true
anyhow.workspace = true
async-stream = "0.3.4"
rayon = "1.9.0"

View File

@ -35,7 +35,7 @@ indexmap = { version = "2.1.0", features = ["serde"] }
uuid.workspace = true
futures.workspace = true
tokio-stream = { workspace = true, features = ["sync"] }
dashmap = "5"
dashmap.workspace = true
scraper = "0.18.0"
[target.'cfg(target_arch = "wasm32")'.dependencies]

View File

@ -454,10 +454,17 @@ pub(crate) async fn upload_file_handler(
} = params.try_into_inner()?;
let manager = upgrade_document(manager)?;
let upload = manager
.upload_file(workspace_id, &document_id, &local_file_path)
.await?;
let (tx, rx) = tokio::sync::oneshot::channel();
let cloned_local_file_path = local_file_path.clone();
tokio::spawn(async move {
let result = manager
.upload_file(workspace_id, &document_id, &cloned_local_file_path)
.await;
let _ = tx.send(result);
Ok::<(), FlowyError>(())
});
let upload = rx.await??;
data_result_ok(UploadedFilePB {
url: upload.url,
local_file_path,

View File

@ -358,8 +358,9 @@ impl DocumentManager {
) -> FlowyResult<CreatedUpload> {
let storage_service = self.storage_service_upgrade()?;
let upload = storage_service
.create_upload(&workspace_id, document_id, local_file_path)
.await?;
.create_upload(&workspace_id, document_id, local_file_path, false)
.await?
.0;
Ok(upload)
}

View File

@ -21,7 +21,7 @@ use flowy_document::manager::{DocumentManager, DocumentSnapshotService, Document
use flowy_document_pub::cloud::*;
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
use flowy_storage_pub::chunked_byte::ChunkedBytes;
use flowy_storage_pub::storage::{CreatedUpload, StorageService};
use flowy_storage_pub::storage::{CreatedUpload, FileProgressReceiver, StorageService};
use lib_infra::async_trait::async_trait;
use lib_infra::box_any::BoxAny;
use lib_infra::future::FutureResult;
@ -178,14 +178,6 @@ pub struct DocumentTestFileStorageService;
#[async_trait]
impl StorageService for DocumentTestFileStorageService {
fn upload_object(
&self,
_workspace_id: &str,
_local_file_path: &str,
) -> FutureResult<String, FlowyError> {
todo!()
}
fn delete_object(&self, _url: String, _local_file_path: String) -> FlowyResult<()> {
todo!()
}
@ -194,12 +186,13 @@ impl StorageService for DocumentTestFileStorageService {
todo!()
}
fn create_upload(
async fn create_upload(
&self,
_workspace_id: &str,
_parent_dir: &str,
_local_file_path: &str,
) -> FutureResult<CreatedUpload, flowy_error::FlowyError> {
_upload_immediately: bool,
) -> Result<(CreatedUpload, Option<FileProgressReceiver>), flowy_error::FlowyError> {
todo!()
}
@ -215,6 +208,10 @@ impl StorageService for DocumentTestFileStorageService {
) -> Result<(), FlowyError> {
todo!()
}
async fn subscribe_file_progress(&self, _url: &str) -> Result<FileProgressReceiver, FlowyError> {
todo!()
}
}
struct DefaultCollabStorageProvider();

View File

@ -13,7 +13,7 @@ protobuf.workspace = true
tracing.workspace = true
bytes.workspace = true
serde = { workspace = true, features = ["derive"] }
dashmap = "5.5"
dashmap.workspace = true
tokio-util = "0.7"
tokio = { workspace = true, features = ["time"] }

View File

@ -76,30 +76,25 @@ where
Ok(message)
}
fn create_answer(
async fn create_answer(
&self,
workspace_id: &str,
chat_id: &str,
message: &str,
question_id: i64,
metadata: Option<serde_json::Value>,
) -> FutureResult<ChatMessage, FlowyError> {
let workspace_id = workspace_id.to_string();
let chat_id = chat_id.to_string();
) -> Result<ChatMessage, FlowyError> {
let try_get_client = self.inner.try_get_client();
let params = CreateAnswerMessageParams {
content: message.to_string(),
metadata,
question_message_id: question_id,
};
FutureResult::new(async move {
let message = try_get_client?
.save_answer(&workspace_id, &chat_id, params)
.await
.map_err(FlowyError::from)?;
Ok(message)
})
let message = try_get_client?
.save_answer(workspace_id, chat_id, params)
.await
.map_err(FlowyError::from)?;
Ok(message)
}
async fn stream_answer(
@ -131,25 +126,20 @@ where
Ok(resp)
}
fn get_chat_messages(
async fn get_chat_messages(
&self,
workspace_id: &str,
chat_id: &str,
offset: MessageCursor,
limit: u64,
) -> FutureResult<RepeatedChatMessage, FlowyError> {
let workspace_id = workspace_id.to_string();
let chat_id = chat_id.to_string();
) -> Result<RepeatedChatMessage, FlowyError> {
let try_get_client = self.inner.try_get_client();
let resp = try_get_client?
.get_chat_messages(workspace_id, chat_id, offset, limit)
.await
.map_err(FlowyError::from)?;
FutureResult::new(async move {
let resp = try_get_client?
.get_chat_messages(&workspace_id, &chat_id, offset, limit)
.await
.map_err(FlowyError::from)?;
Ok(resp)
})
Ok(resp)
}
async fn get_related_message(

View File

@ -34,17 +34,15 @@ impl ChatCloudService for DefaultChatCloudServiceImpl {
Err(FlowyError::not_support().with_context("Chat is not supported in local server."))
}
fn create_answer(
async fn create_answer(
&self,
_workspace_id: &str,
_chat_id: &str,
_message: &str,
_question_id: i64,
_metadata: Option<serde_json::Value>,
) -> FutureResult<ChatMessage, FlowyError> {
FutureResult::new(async move {
Err(FlowyError::not_support().with_context("Chat is not supported in local server."))
})
) -> Result<ChatMessage, FlowyError> {
Err(FlowyError::not_support().with_context("Chat is not supported in local server."))
}
async fn stream_answer(
@ -56,16 +54,14 @@ impl ChatCloudService for DefaultChatCloudServiceImpl {
Err(FlowyError::not_support().with_context("Chat is not supported in local server."))
}
fn get_chat_messages(
async fn get_chat_messages(
&self,
_workspace_id: &str,
_chat_id: &str,
_offset: MessageCursor,
_limit: u64,
) -> FutureResult<RepeatedChatMessage, FlowyError> {
FutureResult::new(async move {
Err(FlowyError::not_support().with_context("Chat is not supported in local server."))
})
) -> Result<RepeatedChatMessage, FlowyError> {
Err(FlowyError::not_support().with_context("Chat is not supported in local server."))
}
async fn get_related_message(

View File

@ -17,3 +17,4 @@ mime_guess = "2.0.4"
client-api-entity = { workspace = true }
tokio = { workspace = true, features = ["sync", "io-util"] }
anyhow = "1.0.86"
tracing.workspace = true

View File

@ -3,26 +3,23 @@ use async_trait::async_trait;
pub use client_api_entity::{CompletedPartRequest, CreateUploadResponse, UploadPartResponse};
use flowy_error::{FlowyError, FlowyResult};
use lib_infra::box_any::BoxAny;
use lib_infra::future::FutureResult;
use std::ops::{Deref, DerefMut};
use tokio::sync::mpsc;
use tracing::error;
#[async_trait]
pub trait StorageService: Send + Sync {
fn upload_object(
&self,
workspace_id: &str,
local_file_path: &str,
) -> FutureResult<String, FlowyError>;
fn delete_object(&self, url: String, local_file_path: String) -> FlowyResult<()>;
fn download_object(&self, url: String, local_file_path: String) -> FlowyResult<()>;
fn create_upload(
async fn create_upload(
&self,
workspace_id: &str,
parent_dir: &str,
local_file_path: &str,
) -> FutureResult<CreatedUpload, FlowyError>;
upload_immediately: bool,
) -> Result<(CreatedUpload, Option<FileProgressReceiver>), FlowyError>;
async fn start_upload(&self, chunks: &ChunkedBytes, record: &BoxAny) -> Result<(), FlowyError>;
@ -32,22 +29,70 @@ pub trait StorageService: Send + Sync {
parent_dir: &str,
file_id: &str,
) -> Result<(), FlowyError>;
async fn subscribe_file_progress(
&self,
file_id: &str,
) -> Result<FileProgressReceiver, FlowyError>;
}
pub struct FileProgressReceiver {
pub rx: mpsc::Receiver<FileUploadState>,
pub file_id: String,
}
impl Deref for FileProgressReceiver {
type Target = mpsc::Receiver<FileUploadState>;
fn deref(&self) -> &Self::Target {
&self.rx
}
}
impl DerefMut for FileProgressReceiver {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.rx
}
}
#[derive(Clone, Debug)]
pub enum FileUploadState {
NotStarted,
Uploading { progress: f64 },
Finished { file_id: String },
}
#[derive(Debug)]
pub struct ProgressNotifier {
tx: mpsc::Sender<FileUploadState>,
pub current_value: Option<FileUploadState>,
}
impl ProgressNotifier {
pub fn new() -> (Self, mpsc::Receiver<FileUploadState>) {
let (tx, rx) = mpsc::channel(5);
(
ProgressNotifier {
tx,
current_value: None,
},
rx,
)
}
pub async fn notify(&mut self, progress: FileUploadState) {
self.current_value = Some(progress.clone());
// if self.tx.reserve().await.is_err() {
// return;
// }
if let Err(err) = self.tx.send(progress).await {
error!("Failed to send progress notification: {:?}", err);
}
}
}
pub struct CreatedUpload {
pub url: String,
pub file_id: String,
}
#[derive(Debug, Clone)]
pub struct UploadResult {
pub file_id: String,
pub status: UploadStatus,
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum UploadStatus {
Finish,
Failed,
InProgress,
}
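
`ProgressNotifier` keeps the latest `FileUploadState` in `current_value` so the manager can answer `get_file_state` without a live subscription, while the paired receiver streams updates to whoever holds it. A small producer-side sketch, assuming `notifiers` is the `DashMap<String, ProgressNotifier>` held by the storage manager:

use dashmap::DashMap;

async fn report_progress(
    notifiers: &DashMap<String, ProgressNotifier>,
    file_id: &str,
    progress: f64,
) {
    // notify() caches the state and forwards it to the FileProgressReceiver,
    // mirroring how the uploader reports each completed part further down.
    if let Some(mut notifier) = notifiers.get_mut(file_id) {
        notifier
            .notify(FileUploadState::Uploading { progress })
            .await;
    }
}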

View File

@ -22,6 +22,7 @@ chrono = "0.4.33"
flowy-notification = { workspace = true }
flowy-derive.workspace = true
protobuf = { workspace = true }
dashmap.workspace = true
[dev-dependencies]
tokio = { workspace = true, features = ["full"] }

View File

@ -7,16 +7,16 @@ use crate::sqlite_sql::{
};
use crate::uploader::{FileUploader, FileUploaderRunner, Signal, UploadTask, UploadTaskQueue};
use async_trait::async_trait;
use dashmap::DashMap;
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
use flowy_sqlite::DBConnection;
use flowy_storage_pub::chunked_byte::{ChunkedBytes, MIN_CHUNK_SIZE};
use flowy_storage_pub::cloud::{ObjectIdentity, ObjectValue, StorageCloudService};
use flowy_storage_pub::storage::{
CompletedPartRequest, CreatedUpload, StorageService, UploadPartResponse, UploadResult,
UploadStatus,
CompletedPartRequest, CreatedUpload, FileProgressReceiver, FileUploadState, ProgressNotifier,
StorageService, UploadPartResponse,
};
use lib_infra::box_any::BoxAny;
use lib_infra::future::FutureResult;
use lib_infra::util::timestamp;
use std::io::ErrorKind;
use std::path::{Path, PathBuf};
@ -37,7 +37,7 @@ pub trait StorageUserService: Send + Sync + 'static {
pub struct StorageManager {
pub storage_service: Arc<dyn StorageService>,
uploader: Arc<FileUploader>,
upload_status_notifier: tokio::sync::broadcast::Sender<UploadResult>,
progress_notifiers: Arc<DashMap<String, ProgressNotifier>>,
}
impl Drop for StorageManager {
@ -58,15 +58,15 @@ impl StorageManager {
));
let temp_storage = Arc::new(FileTempStorage::new(temp_storage_path));
let (notifier, notifier_rx) = watch::channel(Signal::Proceed);
let (upload_status_notifier, _) = tokio::sync::broadcast::channel::<UploadResult>(100);
let task_queue = Arc::new(UploadTaskQueue::new(notifier));
let progress_notifiers = Arc::new(DashMap::new());
let storage_service = Arc::new(StorageServiceImpl {
cloud_service,
user_service: user_service.clone(),
temp_storage,
task_queue: task_queue.clone(),
upload_status_notifier: upload_status_notifier.clone(),
is_exceed_storage_limit: is_exceed_storage_limit.clone(),
progress_notifiers: progress_notifiers.clone(),
});
let uploader = Arc::new(FileUploader::new(
@ -81,8 +81,8 @@ impl StorageManager {
let weak_uploader = Arc::downgrade(&uploader);
tokio::spawn(async move {
// Start uploading after 30 seconds
tokio::time::sleep(Duration::from_secs(30)).await;
// Start uploading after 20 seconds
tokio::time::sleep(Duration::from_secs(20)).await;
if let Some(uploader) = weak_uploader.upgrade() {
if let Err(err) = prepare_upload_task(uploader, user_service).await {
error!("prepare upload task failed: {}", err);
@ -93,7 +93,7 @@ impl StorageManager {
Self {
storage_service,
uploader,
upload_status_notifier,
progress_notifiers,
}
}
@ -115,8 +115,18 @@ impl StorageManager {
self.uploader.enable_storage_write();
}
pub fn subscribe_upload_result(&self) -> tokio::sync::broadcast::Receiver<UploadResult> {
self.upload_status_notifier.subscribe()
pub async fn subscribe_file_state(
&self,
file_id: &str,
) -> Result<FileProgressReceiver, FlowyError> {
self.storage_service.subscribe_file_progress(file_id).await
}
pub async fn get_file_state(&self, file_id: &str) -> Option<FileUploadState> {
self
.progress_notifiers
.get(file_id)
.and_then(|notifier| notifier.value().current_value.clone())
}
}
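
`get_file_state` answers from the notifier's cached value, while `subscribe_file_state` hands out a per-file progress channel; the upload integration tests earlier in this diff use the two in roughly this way (sketch, not a verbatim excerpt):

async fn wait_until_uploaded(manager: &StorageManager, file_id: &str) -> FlowyResult<()> {
    // Fast path: the upload may already have finished before we subscribed.
    if let Some(FileUploadState::Finished { .. }) = manager.get_file_state(file_id).await {
        return Ok(());
    }
    // Otherwise wait on the per-file progress channel.
    let mut rx = manager.subscribe_file_state(file_id).await?;
    while let Some(state) = rx.recv().await {
        if let FileUploadState::Finished { .. } = state {
            break;
        }
    }
    Ok(())
}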
@ -147,37 +157,12 @@ pub struct StorageServiceImpl {
user_service: Arc<dyn StorageUserService>,
temp_storage: Arc<FileTempStorage>,
task_queue: Arc<UploadTaskQueue>,
upload_status_notifier: tokio::sync::broadcast::Sender<UploadResult>,
is_exceed_storage_limit: Arc<AtomicBool>,
progress_notifiers: Arc<DashMap<String, ProgressNotifier>>,
}
#[async_trait]
impl StorageService for StorageServiceImpl {
fn upload_object(
&self,
workspace_id: &str,
local_file_path: &str,
) -> FutureResult<String, FlowyError> {
let cloud_service = self.cloud_service.clone();
let workspace_id = workspace_id.to_string();
let local_file_path = local_file_path.to_string();
FutureResult::new(async move {
let (object_identity, object_value) =
object_from_disk(&workspace_id, &local_file_path).await?;
let url = cloud_service.get_object_url(object_identity).await?;
match cloud_service.put_object(url.clone(), object_value).await {
Ok(_) => {
debug!("[File] success uploaded file to cloud: {}", url);
},
Err(err) => {
error!("[File] upload file failed: {}", err);
return Err(err);
},
}
Ok(url)
})
}
fn delete_object(&self, url: String, local_file_path: String) -> FlowyResult<()> {
let cloud_service = self.cloud_service.clone();
tokio::spawn(async move {
@ -227,93 +212,106 @@ impl StorageService for StorageServiceImpl {
Ok(())
}
fn create_upload(
async fn create_upload(
&self,
workspace_id: &str,
parent_dir: &str,
file_path: &str,
) -> FutureResult<CreatedUpload, FlowyError> {
upload_immediately: bool,
) -> Result<(CreatedUpload, Option<FileProgressReceiver>), FlowyError> {
if workspace_id.is_empty() {
return FutureResult::new(async {
Err(FlowyError::internal().with_context("workspace id is empty"))
});
return Err(FlowyError::internal().with_context("workspace id is empty"));
}
if parent_dir.is_empty() {
return FutureResult::new(async {
Err(FlowyError::internal().with_context("parent dir is empty"))
});
return Err(FlowyError::internal().with_context("parent dir is empty"));
}
if file_path.is_empty() {
return FutureResult::new(async {
Err(FlowyError::internal().with_context("local file path is empty"))
});
return Err(FlowyError::internal().with_context("local file path is empty"));
}
let workspace_id = workspace_id.to_string();
let parent_dir = parent_dir.to_string();
let file_path = file_path.to_string();
let temp_storage = self.temp_storage.clone();
let task_queue = self.task_queue.clone();
let user_service = self.user_service.clone();
let cloud_service = self.cloud_service.clone();
let is_exceed_storage_limit = self.is_exceed_storage_limit.clone();
FutureResult::new(async move {
let is_exceed_limit = is_exceed_storage_limit.load(std::sync::atomic::Ordering::Relaxed);
if is_exceed_limit {
make_notification(StorageNotification::FileStorageLimitExceeded)
.payload(FlowyError::file_storage_limit())
.send();
let is_exceed_limit = self
.is_exceed_storage_limit
.load(std::sync::atomic::Ordering::Relaxed);
if is_exceed_limit {
make_notification(StorageNotification::FileStorageLimitExceeded)
.payload(FlowyError::file_storage_limit())
.send();
return Err(FlowyError::file_storage_limit());
}
return Err(FlowyError::file_storage_limit());
}
let local_file_path = temp_storage
.create_temp_file_from_existing(Path::new(&file_path))
.await
.map_err(|err| {
error!("[File] create temp file failed: {}", err);
FlowyError::internal()
.with_context(format!("create temp file for upload file failed: {}", err))
})?;
let local_file_path = self
.temp_storage
.create_temp_file_from_existing(Path::new(&file_path))
.await
.map_err(|err| {
error!("[File] create temp file failed: {}", err);
FlowyError::internal()
.with_context(format!("create temp file for upload file failed: {}", err))
})?;
// 1. create a file record and chunk the file
let (chunks, record) =
create_upload_record(workspace_id, parent_dir, local_file_path).await?;
// 1. create a file record and chunk the file
let (chunks, record) = create_upload_record(workspace_id, parent_dir, local_file_path).await?;
// 2. save the record to sqlite
let conn = user_service.sqlite_connection(user_service.user_id()?)?;
let url = cloud_service.get_object_url_v1(
&record.workspace_id,
&record.parent_dir,
&record.file_id,
)?;
let file_id = record.file_id.clone();
match insert_upload_file(conn, &record) {
Ok(_) => {
// 3. generate url for given file
task_queue
// 2. save the record to sqlite
let conn = self
.user_service
.sqlite_connection(self.user_service.user_id()?)?;
let url = self.cloud_service.get_object_url_v1(
&record.workspace_id,
&record.parent_dir,
&record.file_id,
)?;
let file_id = record.file_id.clone();
match insert_upload_file(conn, &record) {
Ok(_) => {
// 3. generate url for given file
if upload_immediately {
self
.task_queue
.queue_task(UploadTask::ImmediateTask {
chunks,
record,
retry_count: 3,
})
.await;
} else {
self
.task_queue
.queue_task(UploadTask::Task {
chunks,
record,
retry_count: 0,
})
.await;
}
let (notifier, receiver) = ProgressNotifier::new();
let receiver = FileProgressReceiver {
rx: receiver,
file_id: file_id.to_string(),
};
self
.progress_notifiers
.insert(file_id.to_string(), notifier);
Ok::<_, FlowyError>((CreatedUpload { url, file_id }, Some(receiver)))
},
Err(err) => {
if matches!(err.code, ErrorCode::DuplicateSqliteRecord) {
info!("upload record already exists, skip creating new upload task");
Ok::<_, FlowyError>((CreatedUpload { url, file_id }, None))
} else {
Err(err)
}
},
}
}
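
  // A minimal usage sketch, assuming the method above is exposed as
  // `create_upload` and that `FileProgressReceiver::rx` is an async channel
  // whose `recv()` yields `FileUploadState` values (both the method name and
  // the receiver API are assumptions, not verified from this diff):
  //
  //   let (upload, progress) = service.create_upload(ws, dir, path, false).await?;
  //   if let Some(mut progress) = progress {
  //     while let Some(state) = progress.rx.recv().await {
  //       match state {
  //         FileUploadState::Uploading { progress } => render_progress(progress),
  //         FileUploadState::Finished { .. } => break,
  //         _ => {}
  //       }
  //     }
  //   }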
async fn start_upload(&self, chunks: &ChunkedBytes, record: &BoxAny) -> Result<(), FlowyError> {
@ -327,7 +325,7 @@ impl StorageService for StorageServiceImpl {
&self.temp_storage,
chunks,
file_record,
self.progress_notifiers.clone(),
)
.await
{
@ -371,7 +369,7 @@ impl StorageService for StorageServiceImpl {
&self.temp_storage,
upload_file,
parts,
self.progress_notifiers.clone(),
)
.await?;
} else {
@ -379,6 +377,22 @@ impl StorageService for StorageServiceImpl {
}
Ok(())
}
async fn subscribe_file_progress(
&self,
file_id: &str,
) -> Result<FileProgressReceiver, FlowyError> {
trace!("[File]: subscribe file progress: {}", file_id);
let (notifier, receiver) = ProgressNotifier::new();
let receiver = FileProgressReceiver {
rx: receiver,
file_id: file_id.to_string(),
};
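    // DashMap::insert replaces any notifier previously registered under this
    // file_id, so the most recent subscriber is the one that receives updates.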
self
.progress_notifiers
.insert(file_id.to_string(), notifier);
Ok(receiver)
}
}
async fn create_upload_record(
@ -418,7 +432,7 @@ async fn start_upload(
temp_storage: &Arc<FileTempStorage>,
chunked_bytes: &ChunkedBytes,
upload_file: &UploadFileTable,
progress_notifiers: Arc<DashMap<String, ProgressNotifier>>,
) -> FlowyResult<()> {
let mut upload_file = upload_file.clone();
if upload_file.upload_id.is_empty() {
@ -466,22 +480,32 @@ async fn start_upload(
upload_file.upload_id = create_upload_resp.upload_id;
}
// 3. start uploading parts
trace!(
"[File] {} start uploading parts: {}",
upload_file.file_id,
chunked_bytes.iter().count()
);
let total_parts = chunked_bytes.iter().count();
let iter = chunked_bytes.iter().enumerate();
let mut conn = user_service.sqlite_connection(user_service.user_id()?)?;
// 4. gather existing completed parts
let mut completed_parts = select_upload_parts(&mut conn, &upload_file.upload_id)
.unwrap_or_default()
.into_iter()
.map(|part| CompletedPartRequest {
e_tag: part.e_tag,
part_number: part.part_num,
})
.collect::<Vec<_>>();
// when there are any existing parts, skip those parts by setting the current offset.
let offset = completed_parts.len();
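  // e.g. if two parts were persisted by an earlier attempt, `offset` is 2 and
  // the first chunk handled by the loop below is sent as part number 3.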
for (index, chunk_bytes) in iter {
let part_number = offset + index + 1;
trace!(
"[File] {} uploading part: {}, len:{}KB",
upload_file.file_id,
@ -496,18 +520,24 @@ async fn start_upload(
&upload_file.parent_dir,
&upload_file.upload_id,
&upload_file.file_id,
part_number as i32,
chunk_bytes.to_vec(),
)
.await
{
Ok(resp) => {
let progress = (part_number as f64 / total_parts as f64).clamp(0.0, 1.0);
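        // e.g. finishing part 3 of 4 reports progress 0.75; the clamp keeps the
        // ratio inside [0.0, 1.0] even when `part_number` runs past
        // `total_parts`, as it can after a resumed upload.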
trace!(
"[File] {} upload {} part success, total:{},",
"[File] {} upload progress: {}",
upload_file.file_id,
progress
);
if let Some(mut notifier) = progress_notifiers.get_mut(&upload_file.file_id) {
notifier
.notify(FileUploadState::Uploading { progress })
.await;
}
// gather completed part
completed_parts.push(CompletedPartRequest {
e_tag: resp.e_tag,
@ -534,7 +564,7 @@ async fn start_upload(
temp_storage,
&upload_file,
completed_parts,
&progress_notifiers,
)
.await;
if let Err(err) = complete_upload_result {
@ -556,7 +586,7 @@ async fn resume_upload(
temp_storage: &Arc<FileTempStorage>,
upload_file: UploadFileTable,
parts: Vec<UploadFilePartTable>,
progress_notifiers: Arc<DashMap<String, ProgressNotifier>>,
) -> FlowyResult<()> {
trace!(
"[File] resume upload for workspace: {}, parent_dir: {}, file_id: {}, local_file_path:{}",
@ -576,7 +606,7 @@ async fn resume_upload(
temp_storage,
&chunked_bytes,
&upload_file,
progress_notifiers,
)
.await?;
},
@ -643,8 +673,13 @@ async fn complete_upload(
temp_storage: &Arc<FileTempStorage>,
upload_file: &UploadFileTable,
parts: Vec<CompletedPartRequest>,
progress_notifiers: &Arc<DashMap<String, ProgressNotifier>>,
) -> Result<(), FlowyError> {
trace!(
"[File]: completing file upload: {}, part: {}",
upload_file.file_id,
parts.len()
);
match cloud_service
.complete_upload(
&upload_file.workspace_id,
@ -656,13 +691,17 @@ async fn complete_upload(
.await
{
Ok(_) => {
info!("[File] completed upload file: {}", upload_file.upload_id);
trace!("[File] delete upload record from sqlite");
let _ = notifier.send(UploadResult {
file_id: upload_file.file_id.clone(),
status: UploadStatus::Finish,
});
info!("[File] completed upload file: {}", upload_file.file_id);
if let Some(mut notifier) = progress_notifiers.get_mut(&upload_file.file_id) {
trace!("[File]: notify upload finished");
notifier
.notify(FileUploadState::Finished {
file_id: upload_file.file_id.clone(),
})
.await;
}
trace!("[File] delete upload record from sqlite");
let conn = user_service.sqlite_connection(user_service.user_id()?)?;
delete_upload_file(conn, &upload_file.upload_id)?;
if let Err(err) = temp_storage

View File

@ -153,7 +153,12 @@ impl FileUploader {
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
match task {
UploadTask::ImmediateTask {
chunks,
record,
mut retry_count,
}
| UploadTask::Task {
chunks,
record,
mut retry_count,
@ -253,6 +258,11 @@ impl FileUploaderRunner {
}
pub enum UploadTask {
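  /// Queued when the caller requests the upload to start right away; the
  /// `Ord` impl below ranks it ahead of `Task` and `BackgroundTask`.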
ImmediateTask {
chunks: ChunkedBytes,
record: UploadFileTable,
retry_count: u8,
},
Task {
chunks: ChunkedBytes,
record: UploadFileTable,
@ -270,8 +280,9 @@ pub enum UploadTask {
impl UploadTask {
pub fn retry_count(&self) -> u8 {
match self {
UploadTask::ImmediateTask { retry_count, .. } => *retry_count,
UploadTask::Task { retry_count, .. } => *retry_count,
UploadTask::BackgroundTask { retry_count, .. } => *retry_count,
}
}
}
@ -279,8 +290,9 @@ impl UploadTask {
impl Display for UploadTask {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
UploadTask::Task { record, .. } => write!(f, "Task: {}", record.file_id),
UploadTask::BackgroundTask { file_id, .. } => write!(f, "BackgroundTask: {}", file_id),
UploadTask::ImmediateTask { record, .. } => write!(f, "Immediate Task: {}", record.file_id),
}
}
}
@ -290,6 +302,9 @@ impl Eq for UploadTask {}
impl PartialEq for UploadTask {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Self::ImmediateTask { record: lhs, .. }, Self::ImmediateTask { record: rhs, .. }) => {
lhs.local_file_path == rhs.local_file_path
},
(Self::Task { record: lhs, .. }, Self::Task { record: rhs, .. }) => {
lhs.local_file_path == rhs.local_file_path
},
@ -319,6 +334,11 @@ impl PartialOrd for UploadTask {
impl Ord for UploadTask {
fn cmp(&self, other: &Self) -> Ordering {
match (self, other) {
(Self::ImmediateTask { record: lhs, .. }, Self::ImmediateTask { record: rhs, .. }) => {
lhs.created_at.cmp(&rhs.created_at)
},
(_, Self::ImmediateTask { .. }) => Ordering::Less,
(Self::ImmediateTask { .. }, _) => Ordering::Greater,
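      // Assuming the uploader pops the greatest task first (e.g. a
      // BinaryHeap-backed priority queue), these arms ensure immediate
      // uploads are scheduled before any pending Task or BackgroundTask.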
(Self::Task { record: lhs, .. }, Self::Task { record: rhs, .. }) => {
lhs.created_at.cmp(&rhs.created_at)
},