From bb3e9d5bd8c9c91583e8c8efdf8fd75fab8573c7 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Sun, 9 Jun 2024 14:02:32 +0800 Subject: [PATCH] feat: Stream chat message (#5498) * chore: stream message * chore: stream message * chore: fix streaming * chore: fix clippy --- .../application/chat_ai_message_bloc.dart | 115 ++++- .../ai_chat/application/chat_bloc.dart | 419 ++++++++++++------ .../application/chat_message_listener.dart | 16 +- .../chat_related_question_bloc.dart | 103 ----- .../lib/plugins/ai_chat/chat_page.dart | 150 ++++--- ...ai_message.dart => ai_message_bubble.dart} | 42 +- .../ai_chat/presentation/chat_input.dart | 108 +++-- .../ai_chat/presentation/chat_loading.dart | 18 +- .../presentation/chat_related_question.dart | 36 -- .../presentation/chat_stream_text_field.dart | 10 + .../chat_streaming_error_message.dart | 157 ++++--- .../presentation/chat_welcome_page.dart | 46 +- .../presentation/message/ai_text_message.dart | 295 ++++++++++++ .../message/user_text_message.dart | 60 +++ ..._message.dart => user_message_bubble.dart} | 0 frontend/appflowy_flutter/pubspec.lock | 18 +- frontend/appflowy_flutter/pubspec.yaml | 2 + .../chat_test/chat_load_message_test.dart | 16 + .../test/bloc_test/chat_test/util.dart | 44 ++ frontend/appflowy_flutter/test/util.dart | 10 - frontend/appflowy_tauri/src-tauri/Cargo.lock | 47 +- frontend/appflowy_tauri/src-tauri/Cargo.toml | 2 +- frontend/appflowy_web/wasm-libs/Cargo.lock | 28 +- frontend/appflowy_web/wasm-libs/Cargo.toml | 2 +- .../appflowy_web_app/src-tauri/Cargo.lock | 43 +- .../appflowy_web_app/src-tauri/Cargo.toml | 2 +- .../flowy_icons/16x/ai_stream_stop.svg | 1 + frontend/rust-lib/Cargo.lock | 52 ++- frontend/rust-lib/Cargo.toml | 5 +- .../event-integration-test/src/chat_event.rs | 2 +- frontend/rust-lib/flowy-chat-pub/src/cloud.rs | 26 +- frontend/rust-lib/flowy-chat/Cargo.toml | 4 +- frontend/rust-lib/flowy-chat/src/chat.rs | 319 +++++++------ frontend/rust-lib/flowy-chat/src/entities.rs | 33 +- .../rust-lib/flowy-chat/src/event_handler.rs | 31 +- frontend/rust-lib/flowy-chat/src/event_map.rs | 14 +- frontend/rust-lib/flowy-chat/src/manager.rs | 20 +- .../rust-lib/flowy-chat/src/notification.rs | 6 +- .../src/persistence/chat_message_sql.rs | 35 -- .../src/deps_resolve/folder_deps.rs | 1 + .../flowy-core/src/integrate/trait_impls.rs | 55 +++ .../flowy-server/src/af_cloud/impls/chat.rs | 78 +++- .../rust-lib/flowy-server/src/default_impl.rs | 35 +- frontend/rust-lib/lib-infra/Cargo.toml | 9 +- .../rust-lib/lib-infra/src/isolate_stream.rs | 44 ++ frontend/rust-lib/lib-infra/src/lib.rs | 2 + 46 files changed, 1691 insertions(+), 870 deletions(-) delete mode 100644 frontend/appflowy_flutter/lib/plugins/ai_chat/application/chat_related_question_bloc.dart rename frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/{chat_ai_message.dart => ai_message_bubble.dart} (83%) create mode 100644 frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_stream_text_field.dart create mode 100644 frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/message/ai_text_message.dart create mode 100644 frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/message/user_text_message.dart rename frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/{chat_user_message.dart => user_message_bubble.dart} (100%) create mode 100644 frontend/appflowy_flutter/test/bloc_test/chat_test/chat_load_message_test.dart create mode 100644 frontend/appflowy_flutter/test/bloc_test/chat_test/util.dart create mode 100644 frontend/resources/flowy_icons/16x/ai_stream_stop.svg create mode 100644 frontend/rust-lib/lib-infra/src/isolate_stream.rs diff --git a/frontend/appflowy_flutter/lib/plugins/ai_chat/application/chat_ai_message_bloc.dart b/frontend/appflowy_flutter/lib/plugins/ai_chat/application/chat_ai_message_bloc.dart index 2c5c8f5b0e..a806a5c97d 100644 --- a/frontend/appflowy_flutter/lib/plugins/ai_chat/application/chat_ai_message_bloc.dart +++ b/frontend/appflowy_flutter/lib/plugins/ai_chat/application/chat_ai_message_bloc.dart @@ -1,42 +1,131 @@ -import 'package:appflowy_backend/protobuf/flowy-document/protobuf.dart'; -import 'package:appflowy_backend/protobuf/flowy-user/protobuf.dart'; +import 'dart:async'; + +import 'package:appflowy/plugins/ai_chat/application/chat_bloc.dart'; +import 'package:appflowy_backend/dispatch/dispatch.dart'; +import 'package:appflowy_backend/log.dart'; +import 'package:appflowy_backend/protobuf/flowy-chat/entities.pb.dart'; +import 'package:fixnum/fixnum.dart'; import 'package:flutter_bloc/flutter_bloc.dart'; -import 'package:flutter_chat_types/flutter_chat_types.dart'; import 'package:freezed_annotation/freezed_annotation.dart'; part 'chat_ai_message_bloc.freezed.dart'; class ChatAIMessageBloc extends Bloc { ChatAIMessageBloc({ - required Message message, + dynamic message, + required this.chatId, + required this.questionId, }) : super(ChatAIMessageState.initial(message)) { + if (state.stream != null) { + _subscription = state.stream!.listen((text) { + if (isClosed) { + return; + } + + if (text.startsWith("data:")) { + add(ChatAIMessageEvent.newText(text.substring(5))); + } else if (text.startsWith("error:")) { + add(ChatAIMessageEvent.receiveError(text.substring(5))); + } + }); + + if (state.stream!.error != null) { + Future.delayed(const Duration(milliseconds: 300), () { + if (!isClosed) { + add(ChatAIMessageEvent.receiveError(state.stream!.error!)); + } + }); + } + } + on( (event, emit) async { await event.when( initial: () async {}, - update: (userProfile, deviceId, states) {}, + newText: (newText) { + emit(state.copyWith(text: state.text + newText, error: null)); + }, + receiveError: (error) { + emit(state.copyWith(error: error)); + }, + retry: () { + if (questionId is! Int64) { + Log.error("Question id is not Int64: $questionId"); + return; + } + emit( + state.copyWith( + retryState: const LoadingState.loading(), + error: null, + ), + ); + + final payload = ChatMessageIdPB( + chatId: chatId, + messageId: questionId, + ); + ChatEventGetAnswerForQuestion(payload).send().then((result) { + if (!isClosed) { + result.fold( + (answer) { + add(ChatAIMessageEvent.retryResult(answer.content)); + }, + (err) { + Log.error("Failed to get answer: $err"); + add(ChatAIMessageEvent.receiveError(err.toString())); + }, + ); + } + }); + }, + retryResult: (String text) { + emit( + state.copyWith( + text: text, + error: null, + retryState: const LoadingState.finish(), + ), + ); + }, ); }, ); } + + @override + Future close() { + _subscription?.cancel(); + return super.close(); + } + + StreamSubscription? _subscription; + final String chatId; + final Int64? questionId; } @freezed class ChatAIMessageEvent with _$ChatAIMessageEvent { const factory ChatAIMessageEvent.initial() = Initial; - const factory ChatAIMessageEvent.update( - UserProfilePB userProfile, - String deviceId, - DocumentAwarenessStatesPB states, - ) = Update; + const factory ChatAIMessageEvent.newText(String text) = _NewText; + const factory ChatAIMessageEvent.receiveError(String error) = _ReceiveError; + const factory ChatAIMessageEvent.retry() = _Retry; + const factory ChatAIMessageEvent.retryResult(String text) = _RetryResult; } @freezed class ChatAIMessageState with _$ChatAIMessageState { const factory ChatAIMessageState({ - required Message message, + AnswerStream? stream, + String? error, + required String text, + required LoadingState retryState, }) = _ChatAIMessageState; - factory ChatAIMessageState.initial(Message message) => - ChatAIMessageState(message: message); + factory ChatAIMessageState.initial(dynamic text) { + return ChatAIMessageState( + text: text is String ? text : "", + stream: text is AnswerStream ? text : null, + retryState: const LoadingState.finish(), + ); + } } diff --git a/frontend/appflowy_flutter/lib/plugins/ai_chat/application/chat_bloc.dart b/frontend/appflowy_flutter/lib/plugins/ai_chat/application/chat_bloc.dart index 17eaad8c92..6adc99ad87 100644 --- a/frontend/appflowy_flutter/lib/plugins/ai_chat/application/chat_bloc.dart +++ b/frontend/appflowy_flutter/lib/plugins/ai_chat/application/chat_bloc.dart @@ -1,6 +1,12 @@ +import 'dart:async'; +import 'dart:collection'; +import 'dart:ffi'; +import 'dart:isolate'; + import 'package:appflowy_backend/dispatch/dispatch.dart'; import 'package:appflowy_backend/log.dart'; import 'package:appflowy_backend/protobuf/flowy-chat/entities.pb.dart'; +import 'package:appflowy_backend/protobuf/flowy-error/code.pb.dart'; import 'package:appflowy_backend/protobuf/flowy-folder/protobuf.dart'; import 'package:appflowy_backend/protobuf/flowy-folder/view.pb.dart'; import 'package:appflowy_backend/protobuf/flowy-user/user_profile.pb.dart'; @@ -11,10 +17,8 @@ import 'package:flutter_chat_types/flutter_chat_types.dart'; import 'package:freezed_annotation/freezed_annotation.dart'; import 'package:nanoid/nanoid.dart'; import 'chat_message_listener.dart'; - part 'chat_bloc.freezed.dart'; -const canRetryKey = "canRetry"; const sendMessageErrorKey = "sendMessageError"; class ChatBloc extends Bloc { @@ -26,78 +30,31 @@ class ChatBloc extends Bloc { super( ChatState.initial(view, userProfile), ) { + _startListening(); _dispatch(); - - listener.start( - chatMessageCallback: _handleChatMessage, - lastUserSentMessageCallback: (message) { - if (!isClosed) { - add(ChatEvent.didSentUserMessage(message)); - } - }, - chatErrorMessageCallback: (err) { - if (!isClosed) { - Log.error("chat error: ${err.errorMessage}"); - final metadata = OnetimeShotType.serverStreamError.toMap(); - if (state.lastSentMessage != null) { - metadata[canRetryKey] = "true"; - } - final error = CustomMessage( - metadata: metadata, - author: const User(id: "system"), - id: 'system', - ); - add(ChatEvent.streaming([error])); - add(const ChatEvent.didFinishStreaming()); - } - }, - latestMessageCallback: (list) { - if (!isClosed) { - final messages = list.messages.map(_createChatMessage).toList(); - add(ChatEvent.didLoadLatestMessages(messages)); - } - }, - prevMessageCallback: (list) { - if (!isClosed) { - final messages = list.messages.map(_createChatMessage).toList(); - add(ChatEvent.didLoadPreviousMessages(messages, list.hasMore)); - } - }, - finishAnswerQuestionCallback: () { - if (!isClosed) { - add(const ChatEvent.didFinishStreaming()); - if (state.lastSentMessage != null) { - final payload = ChatMessageIdPB( - chatId: chatId, - messageId: state.lastSentMessage!.messageId, - ); - // When user message was sent to the server, we start gettting related question - ChatEventGetRelatedQuestion(payload).send().then((result) { - if (!isClosed) { - result.fold( - (list) { - add( - ChatEvent.didReceiveRelatedQuestion(list.items), - ); - }, - (err) { - Log.error("Failed to get related question: $err"); - }, - ); - } - }); - } - } - }, - ); } final ChatMessageListener listener; final String chatId; + /// The last streaming message id + String lastStreamMessageId = ''; + + /// Using a temporary map to associate the real message ID with the last streaming message ID. + /// + /// When a message is streaming, it does not have a real message ID. To maintain the relationship + /// between the real message ID and the last streaming message ID, we use this map to store the associations. + /// + /// This map will be updated when receiving a message from the server and its author type + /// is 3 (AI response). + final HashMap temporaryMessageIDMap = HashMap(); + @override - Future close() { - listener.stop(); + Future close() async { + if (state.answerStream != null) { + await state.answerStream?.dispose(); + } + await listener.stop(); return super.close(); } @@ -114,8 +71,9 @@ class ChatBloc extends Bloc { }, startLoadingPrevMessage: () async { Int64? beforeMessageId; - if (state.messages.isNotEmpty) { - beforeMessageId = Int64.parseInt(state.messages.last.id); + final oldestMessage = _getOlderstMessage(); + if (oldestMessage != null) { + beforeMessageId = Int64.parseInt(oldestMessage.id); } _loadPrevMessage(beforeMessageId); emit( @@ -126,8 +84,13 @@ class ChatBloc extends Bloc { }, didLoadPreviousMessages: (List messages, bool hasMore) { Log.debug("did load previous messages: ${messages.length}"); - final uniqueMessages = {...state.messages, ...messages}.toList() + final onetimeMessages = _getOnetimeMessages(); + final allMessages = _perminentMessages(); + final uniqueMessages = {...allMessages, ...messages}.toList() ..sort((a, b) => b.id.compareTo(a.id)); + + uniqueMessages.insertAll(0, onetimeMessages); + emit( state.copyWith( messages: uniqueMessages, @@ -137,8 +100,12 @@ class ChatBloc extends Bloc { ); }, didLoadLatestMessages: (List messages) { - final uniqueMessages = {...state.messages, ...messages}.toList() + final onetimeMessages = _getOnetimeMessages(); + final allMessages = _perminentMessages(); + final uniqueMessages = {...allMessages, ...messages}.toList() ..sort((a, b) => b.id.compareTo(a.id)); + uniqueMessages.insertAll(0, onetimeMessages); + emit( state.copyWith( messages: uniqueMessages, @@ -146,55 +113,43 @@ class ChatBloc extends Bloc { ), ); }, - streaming: (List messages) { + streaming: (Message message) { final allMessages = _perminentMessages(); - allMessages.insertAll(0, messages); - emit(state.copyWith(messages: allMessages)); - }, - didFinishStreaming: () { + allMessages.insert(0, message); emit( state.copyWith( - answerQuestionStatus: const LoadingState.finish(), + messages: allMessages, + streamingStatus: const LoadingState.loading(), ), ); }, - sendMessage: (String message) async { - await _handleSentMessage(message, emit); - - // Create a loading indicator - final loadingMessage = - _loadingMessage(state.userProfile.id.toString()); - final allMessages = List.from(state.messages) - ..insert(0, loadingMessage); - + didFinishStreaming: () { + emit( + state.copyWith(streamingStatus: const LoadingState.finish()), + ); + }, + receveMessage: (Message message) { + final allMessages = _perminentMessages(); + // remove message with the same id + allMessages.removeWhere((element) => element.id == message.id); + allMessages.insert(0, message); + emit( + state.copyWith( + messages: allMessages, + ), + ); + }, + sendMessage: (String message) { + _startStreamingMessage(message, emit); + final allMessages = _perminentMessages(); emit( state.copyWith( lastSentMessage: null, messages: allMessages, - answerQuestionStatus: const LoadingState.loading(), relatedQuestions: [], ), ); }, - retryGenerate: () { - if (state.lastSentMessage == null) { - return; - } - final payload = ChatMessageIdPB( - chatId: chatId, - messageId: state.lastSentMessage!.messageId, - ); - ChatEventGetAnswerForQuestion(payload).send().then((result) { - if (!isClosed) { - result.fold( - (answer) => _handleChatMessage(answer), - (err) { - Log.error("Failed to get answer: $err"); - }, - ); - } - }); - }, didReceiveRelatedQuestion: (List questions) { final allMessages = _perminentMessages(); final message = CustomMessage( @@ -224,11 +179,104 @@ class ChatBloc extends Bloc { ), ); }, + didUpdateAnswerStream: (AnswerStream stream) { + emit(state.copyWith(answerStream: stream)); + }, + stopStream: () async { + if (state.answerStream == null) { + return; + } + + final payload = StopStreamPB(chatId: chatId); + await ChatEventStopStream(payload).send(); + final allMessages = _perminentMessages(); + if (state.streamingStatus != const LoadingState.finish()) { + // If the streaming is not started, remove the message from the list + if (!state.answerStream!.hasStarted) { + allMessages.removeWhere( + (element) => element.id == lastStreamMessageId, + ); + lastStreamMessageId = ""; + } + + // when stop stream, we will set the answer stream to null. Which means the streaming + // is finished or canceled. + emit( + state.copyWith( + messages: allMessages, + answerStream: null, + streamingStatus: const LoadingState.finish(), + ), + ); + } + }, ); }, ); } + void _startListening() { + listener.start( + chatMessageCallback: (pb) { + if (!isClosed) { + // 3 mean message response from AI + if (pb.authorType == 3 && lastStreamMessageId.isNotEmpty) { + temporaryMessageIDMap[pb.messageId.toString()] = + lastStreamMessageId; + lastStreamMessageId = ""; + } + + final message = _createTextMessage(pb); + add(ChatEvent.receveMessage(message)); + } + }, + chatErrorMessageCallback: (err) { + if (!isClosed) { + Log.error("chat error: ${err.errorMessage}"); + add(const ChatEvent.didFinishStreaming()); + } + }, + latestMessageCallback: (list) { + if (!isClosed) { + final messages = list.messages.map(_createTextMessage).toList(); + add(ChatEvent.didLoadLatestMessages(messages)); + } + }, + prevMessageCallback: (list) { + if (!isClosed) { + final messages = list.messages.map(_createTextMessage).toList(); + add(ChatEvent.didLoadPreviousMessages(messages, list.hasMore)); + } + }, + finishStreamingCallback: () { + if (!isClosed) { + add(const ChatEvent.didFinishStreaming()); + // The answer strema will bet set to null after the streaming is finished or canceled. + // so if the answer stream is null, we will not get related question. + if (state.lastSentMessage != null && state.answerStream != null) { + final payload = ChatMessageIdPB( + chatId: chatId, + messageId: state.lastSentMessage!.messageId, + ); + // When user message was sent to the server, we start gettting related question + ChatEventGetRelatedQuestion(payload).send().then((result) { + if (!isClosed) { + result.fold( + (list) { + add(ChatEvent.didReceiveRelatedQuestion(list.items)); + }, + (err) { + Log.error("Failed to get related question: $err"); + }, + ); + } + }); + } + } + }, + ); + } + // Returns the list of messages that are not include one-time messages. List _perminentMessages() { final allMessages = state.messages.where((element) { @@ -238,6 +286,22 @@ class ChatBloc extends Bloc { return allMessages; } + List _getOnetimeMessages() { + final messages = state.messages.where((element) { + return (element.metadata?.containsKey(onetimeShotType) == true); + }).toList(); + + return messages; + } + + Message? _getOlderstMessage() { + // get the last message that is not a one-time message + final message = state.messages.lastWhereOrNull((element) { + return !(element.metadata?.containsKey(onetimeShotType) == true); + }); + return message; + } + void _loadPrevMessage(Int64? beforeMessageId) { final payload = LoadPrevChatMessagePB( chatId: state.view.id, @@ -247,68 +311,91 @@ class ChatBloc extends Bloc { ChatEventLoadPrevMessage(payload).send(); } - Future _handleSentMessage( + Future _startStreamingMessage( String message, Emitter emit, ) async { - final payload = SendChatPayloadPB( + if (state.answerStream != null) { + await state.answerStream?.dispose(); + } + + final answerStream = AnswerStream(); + add(ChatEvent.didUpdateAnswerStream(answerStream)); + + final payload = StreamChatPayloadPB( chatId: state.view.id, message: message, messageType: ChatMessageTypePB.User, + textStreamPort: Int64(answerStream.nativePort), ); - final result = await ChatEventSendMessage(payload).send(); + + // Stream message to the server + final result = await ChatEventStreamMessage(payload).send(); result.fold( - (_) {}, + (ChatMessagePB question) { + if (!isClosed) { + add(ChatEvent.didSentUserMessage(question)); + + final questionMessageId = question.messageId; + final message = _createTextMessage(question); + add(ChatEvent.receveMessage(message)); + + final streamAnswer = + _createStreamMessage(answerStream, questionMessageId); + add(ChatEvent.streaming(streamAnswer)); + } + }, (err) { if (!isClosed) { Log.error("Failed to send message: ${err.msg}"); final metadata = OnetimeShotType.invalidSendMesssage.toMap(); - metadata[sendMessageErrorKey] = err.msg; + if (err.code != ErrorCode.Internal) { + metadata[sendMessageErrorKey] = err.msg; + } + final error = CustomMessage( metadata: metadata, author: const User(id: "system"), id: 'system', ); - add(ChatEvent.streaming([error])); + add(ChatEvent.receveMessage(error)); } }, ); } - void _handleChatMessage(ChatMessagePB pb) { - if (!isClosed) { - final message = _createChatMessage(pb); - final messages = pb.hasFollowing - ? [_loadingMessage(0.toString()), message] - : [message]; - add(ChatEvent.streaming(messages)); - } - } + Message _createStreamMessage(AnswerStream stream, Int64 questionMessageId) { + final streamMessageId = nanoid(); + lastStreamMessageId = streamMessageId; - Message _loadingMessage(String id) { - return CustomMessage( - author: User(id: id), - metadata: OnetimeShotType.loading.toMap(), - // fake id - id: nanoid(), + return TextMessage( + author: User(id: nanoid()), + metadata: { + "$AnswerStream": stream, + "question": questionMessageId, + "chatId": chatId, + }, + id: streamMessageId, + text: '', ); } - Message _createChatMessage(ChatMessagePB message) { - final messageId = message.messageId.toString(); + Message _createTextMessage(ChatMessagePB message) { + String messageId = message.messageId.toString(); + + /// If the message id is in the temporary map, we will use the previous fake message id + if (temporaryMessageIDMap.containsKey(messageId)) { + messageId = temporaryMessageIDMap[messageId]!; + } + return TextMessage( author: User(id: message.authorId), id: messageId, text: message.content, createdAt: message.createdAt.toInt(), - repliedMessage: _getReplyMessage(state.messages, messageId), ); } - - Message? _getReplyMessage(List messages, String messageId) { - return messages.firstWhereOrNull((element) => element?.id == messageId); - } } @freezed @@ -322,15 +409,20 @@ class ChatEvent with _$ChatEvent { ) = _DidLoadPreviousMessages; const factory ChatEvent.didLoadLatestMessages(List messages) = _DidLoadMessages; - const factory ChatEvent.streaming(List messages) = _DidStreamMessage; + const factory ChatEvent.streaming(Message message) = _StreamingMessage; + const factory ChatEvent.receveMessage(Message message) = _ReceiveMessage; + const factory ChatEvent.didFinishStreaming() = _FinishStreamingMessage; const factory ChatEvent.didReceiveRelatedQuestion( List questions, ) = _DidReceiveRelatedQueston; const factory ChatEvent.clearReleatedQuestion() = _ClearRelatedQuestion; - const factory ChatEvent.retryGenerate() = _RetryGenerate; const factory ChatEvent.didSentUserMessage(ChatMessagePB message) = _DidSendUserMessage; + const factory ChatEvent.didUpdateAnswerStream( + AnswerStream stream, + ) = _DidUpdateAnswerStream; + const factory ChatEvent.stopStream() = _StopStream; } @freezed @@ -347,13 +439,14 @@ class ChatState with _$ChatState { required LoadingState loadingPreviousStatus, // When sending a user message, the status will be set as loading. // After the message is sent, the status will be set as finished. - required LoadingState answerQuestionStatus, + required LoadingState streamingStatus, // Indicate whether there are more previous messages to load. required bool hasMorePrevMessage, // The related questions that are received after the user message is sent. required List relatedQuestions, // The last user message that is sent to the server. ChatMessagePB? lastSentMessage, + AnswerStream? answerStream, }) = _ChatState; factory ChatState.initial(ViewPB view, UserProfilePB userProfile) => @@ -363,7 +456,7 @@ class ChatState with _$ChatState { userProfile: userProfile, initialLoadingStatus: const LoadingState.finish(), loadingPreviousStatus: const LoadingState.finish(), - answerQuestionStatus: const LoadingState.finish(), + streamingStatus: const LoadingState.finish(), hasMorePrevMessage: true, relatedQuestions: [], ); @@ -377,10 +470,8 @@ class LoadingState with _$LoadingState { enum OnetimeShotType { unknown, - loading, - serverStreamError, relatedQuestion, - invalidSendMesssage + invalidSendMesssage, } const onetimeShotType = "OnetimeShotType"; @@ -388,10 +479,6 @@ const onetimeShotType = "OnetimeShotType"; extension OnetimeMessageTypeExtension on OnetimeShotType { static OnetimeShotType fromString(String value) { switch (value) { - case 'OnetimeShotType.loading': - return OnetimeShotType.loading; - case 'OnetimeShotType.serverStreamError': - return OnetimeShotType.serverStreamError; case 'OnetimeShotType.relatedQuestion': return OnetimeShotType.relatedQuestion; case 'OnetimeShotType.invalidSendMesssage': @@ -402,7 +489,7 @@ extension OnetimeMessageTypeExtension on OnetimeShotType { } } - Map toMap() { + Map toMap() { return { onetimeShotType: toString(), }; @@ -421,3 +508,43 @@ OnetimeShotType? onetimeMessageTypeFromMeta(Map? metadata) { } return null; } + +typedef AnswerStreamElement = String; + +class AnswerStream { + AnswerStream() { + _port.handler = _controller.add; + _subscription = _controller.stream.listen( + (event) { + if (event.startsWith("data:")) { + _hasStarted = true; + } else if (event.startsWith("error:")) { + _error = event.substring(5); + } + }, + ); + } + + final RawReceivePort _port = RawReceivePort(); + final StreamController _controller = + StreamController.broadcast(); + late StreamSubscription _subscription; + bool _hasStarted = false; + String? _error; + + int get nativePort => _port.sendPort.nativePort; + bool get hasStarted => _hasStarted; + String? get error => _error; + + Future dispose() async { + await _controller.close(); + await _subscription.cancel(); + _port.close(); + } + + StreamSubscription listen( + void Function(AnswerStreamElement event)? onData, + ) { + return _controller.stream.listen(onData); + } +} diff --git a/frontend/appflowy_flutter/lib/plugins/ai_chat/application/chat_message_listener.dart b/frontend/appflowy_flutter/lib/plugins/ai_chat/application/chat_message_listener.dart index 3b40c18d36..a26acd916f 100644 --- a/frontend/appflowy_flutter/lib/plugins/ai_chat/application/chat_message_listener.dart +++ b/frontend/appflowy_flutter/lib/plugins/ai_chat/application/chat_message_listener.dart @@ -28,26 +28,23 @@ class ChatMessageListener { ChatNotificationParser? _parser; ChatMessageCallback? chatMessageCallback; - ChatMessageCallback? lastUserSentMessageCallback; ChatErrorMessageCallback? chatErrorMessageCallback; LatestMessageCallback? latestMessageCallback; PrevMessageCallback? prevMessageCallback; - void Function()? finishAnswerQuestionCallback; + void Function()? finishStreamingCallback; void start({ ChatMessageCallback? chatMessageCallback, ChatErrorMessageCallback? chatErrorMessageCallback, LatestMessageCallback? latestMessageCallback, PrevMessageCallback? prevMessageCallback, - ChatMessageCallback? lastUserSentMessageCallback, - void Function()? finishAnswerQuestionCallback, + void Function()? finishStreamingCallback, }) { this.chatMessageCallback = chatMessageCallback; this.chatErrorMessageCallback = chatErrorMessageCallback; this.latestMessageCallback = latestMessageCallback; this.prevMessageCallback = prevMessageCallback; - this.lastUserSentMessageCallback = lastUserSentMessageCallback; - this.finishAnswerQuestionCallback = finishAnswerQuestionCallback; + this.finishStreamingCallback = finishStreamingCallback; } void _callback( @@ -59,9 +56,6 @@ class ChatMessageListener { case ChatNotification.DidReceiveChatMessage: chatMessageCallback?.call(ChatMessagePB.fromBuffer(r)); break; - case ChatNotification.LastUserSentMessage: - lastUserSentMessageCallback?.call(ChatMessagePB.fromBuffer(r)); - break; case ChatNotification.StreamChatMessageError: chatErrorMessageCallback?.call(ChatMessageErrorPB.fromBuffer(r)); break; @@ -71,8 +65,8 @@ class ChatMessageListener { case ChatNotification.DidLoadPrevChatMessage: prevMessageCallback?.call(ChatMessageListPB.fromBuffer(r)); break; - case ChatNotification.FinishAnswerQuestion: - finishAnswerQuestionCallback?.call(); + case ChatNotification.FinishStreaming: + finishStreamingCallback?.call(); break; default: break; diff --git a/frontend/appflowy_flutter/lib/plugins/ai_chat/application/chat_related_question_bloc.dart b/frontend/appflowy_flutter/lib/plugins/ai_chat/application/chat_related_question_bloc.dart deleted file mode 100644 index d6db6afc37..0000000000 --- a/frontend/appflowy_flutter/lib/plugins/ai_chat/application/chat_related_question_bloc.dart +++ /dev/null @@ -1,103 +0,0 @@ -import 'package:appflowy/plugins/ai_chat/application/chat_message_listener.dart'; -import 'package:appflowy_backend/dispatch/dispatch.dart'; -import 'package:appflowy_backend/log.dart'; -import 'package:appflowy_backend/protobuf/flowy-chat/entities.pb.dart'; -import 'package:flutter_bloc/flutter_bloc.dart'; -import 'package:freezed_annotation/freezed_annotation.dart'; - -part 'chat_related_question_bloc.freezed.dart'; - -class ChatRelatedMessageBloc - extends Bloc { - ChatRelatedMessageBloc({ - required String chatId, - }) : listener = ChatMessageListener(chatId: chatId), - super(ChatRelatedMessageState.initial()) { - on( - (event, emit) async { - await event.when( - initial: () async { - listener.start( - lastUserSentMessageCallback: (message) { - if (!isClosed) { - add(ChatRelatedMessageEvent.updateLastSentMessage(message)); - } - }, - ); - }, - didReceiveRelatedQuestion: (List questions) { - Log.debug("Related questions: $questions"); - emit( - state.copyWith( - relatedQuestions: questions, - ), - ); - }, - updateLastSentMessage: (ChatMessagePB message) { - final payload = - ChatMessageIdPB(chatId: chatId, messageId: message.messageId); - ChatEventGetRelatedQuestion(payload).send().then((result) { - if (!isClosed) { - result.fold( - (list) { - add( - ChatRelatedMessageEvent.didReceiveRelatedQuestion( - list.items, - ), - ); - }, - (err) { - Log.error("Failed to get related question: $err"); - }, - ); - } - }); - - emit( - state.copyWith( - lastSentMessage: message, - relatedQuestions: [], - ), - ); - }, - clear: () { - emit( - state.copyWith( - relatedQuestions: [], - ), - ); - }, - ); - }, - ); - } - - final ChatMessageListener listener; - @override - Future close() { - listener.stop(); - return super.close(); - } -} - -@freezed -class ChatRelatedMessageEvent with _$ChatRelatedMessageEvent { - const factory ChatRelatedMessageEvent.initial() = Initial; - const factory ChatRelatedMessageEvent.updateLastSentMessage( - ChatMessagePB message, - ) = _LastSentMessage; - const factory ChatRelatedMessageEvent.didReceiveRelatedQuestion( - List questions, - ) = _RelatedQuestion; - const factory ChatRelatedMessageEvent.clear() = _Clear; -} - -@freezed -class ChatRelatedMessageState with _$ChatRelatedMessageState { - const factory ChatRelatedMessageState({ - ChatMessagePB? lastSentMessage, - @Default([]) List relatedQuestions, - }) = _ChatRelatedMessageState; - - factory ChatRelatedMessageState.initial() => const ChatRelatedMessageState(); -} diff --git a/frontend/appflowy_flutter/lib/plugins/ai_chat/chat_page.dart b/frontend/appflowy_flutter/lib/plugins/ai_chat/chat_page.dart index 3e6142c3ec..1a7393a0a9 100644 --- a/frontend/appflowy_flutter/lib/plugins/ai_chat/chat_page.dart +++ b/frontend/appflowy_flutter/lib/plugins/ai_chat/chat_page.dart @@ -1,9 +1,8 @@ import 'package:appflowy/generated/locale_keys.g.dart'; import 'package:appflowy/plugins/ai_chat/application/chat_bloc.dart'; -import 'package:appflowy/plugins/ai_chat/presentation/chat_ai_message.dart'; -import 'package:appflowy/plugins/ai_chat/presentation/chat_streaming_error_message.dart'; +import 'package:appflowy/plugins/ai_chat/presentation/ai_message_bubble.dart'; import 'package:appflowy/plugins/ai_chat/presentation/chat_related_question.dart'; -import 'package:appflowy/plugins/ai_chat/presentation/chat_user_message.dart'; +import 'package:appflowy/plugins/ai_chat/presentation/user_message_bubble.dart'; import 'package:appflowy/workspace/presentation/home/toast.dart'; import 'package:appflowy_backend/protobuf/flowy-folder/view.pb.dart'; import 'package:appflowy_backend/protobuf/flowy-user/protobuf.dart'; @@ -19,11 +18,12 @@ import 'package:flutter_chat_ui/flutter_chat_ui.dart' show Chat; import 'package:flutter_chat_types/flutter_chat_types.dart' as types; import 'presentation/chat_input.dart'; -import 'presentation/chat_loading.dart'; import 'presentation/chat_popmenu.dart'; import 'presentation/chat_theme.dart'; import 'presentation/chat_user_invalid_message.dart'; import 'presentation/chat_welcome_page.dart'; +import 'presentation/message/ai_text_message.dart'; +import 'presentation/message/user_text_message.dart'; class AIChatUILayout { static EdgeInsets get chatPadding => @@ -108,7 +108,6 @@ class _AIChatPageState extends State { customBottomWidget: buildChatInput(blocContext), user: _user, theme: buildTheme(context), - customMessageBuilder: _customMessageBuilder, onEndReached: () async { if (state.hasMorePrevMessage && state.loadingPreviousStatus != @@ -138,6 +137,13 @@ class _AIChatPageState extends State { }, ), messageWidthRatio: AIChatUILayout.messageWidthRatio, + textMessageBuilder: ( + textMessage, { + required messageWidth, + required showName, + }) { + return _buildAITextMessage(blocContext, textMessage); + }, bubbleBuilder: ( child, { required message, @@ -149,46 +155,7 @@ class _AIChatPageState extends State { child: child, ); } else { - final messageType = onetimeMessageTypeFromMeta( - message.metadata, - ); - if (messageType == OnetimeShotType.serverStreamError) { - return ChatStreamingError( - message: message, - onRetryPressed: () { - blocContext - .read() - .add(const ChatEvent.retryGenerate()); - }, - ); - } - - if (messageType == OnetimeShotType.invalidSendMesssage) { - return ChatInvalidUserMessage( - message: message, - ); - } - - if (messageType == OnetimeShotType.relatedQuestion) { - return RelatedQuestionList( - onQuestionSelected: (question) { - blocContext - .read() - .add(ChatEvent.sendMessage(question)); - blocContext - .read() - .add(const ChatEvent.clearReleatedQuestion()); - }, - chatId: widget.view.id, - relatedQuestions: state.relatedQuestions, - ); - } - - return ChatAIMessageBubble( - message: message, - customMessageType: messageType, - child: child, - ); + return _buildAIBubble(message, blocContext, state, child); } }, ); @@ -199,10 +166,67 @@ class _AIChatPageState extends State { ); } + Widget _buildAITextMessage(BuildContext context, TextMessage message) { + final isAuthor = message.author.id == _user.id; + if (isAuthor) { + return ChatTextMessageWidget( + user: message.author, + messageUserId: message.id, + text: message.text, + ); + } else { + final stream = message.metadata?["$AnswerStream"]; + final questionId = message.metadata?["question"]; + return ChatAITextMessageWidget( + user: message.author, + messageUserId: message.id, + text: stream is AnswerStream ? stream : message.text, + key: ValueKey(message.id), + questionId: questionId, + chatId: widget.view.id, + ); + } + } + + Widget _buildAIBubble( + Message message, + BuildContext blocContext, + ChatState state, + Widget child, + ) { + final messageType = onetimeMessageTypeFromMeta( + message.metadata, + ); + + if (messageType == OnetimeShotType.invalidSendMesssage) { + return ChatInvalidUserMessage( + message: message, + ); + } + + if (messageType == OnetimeShotType.relatedQuestion) { + return RelatedQuestionList( + onQuestionSelected: (question) { + blocContext.read().add(ChatEvent.sendMessage(question)); + blocContext + .read() + .add(const ChatEvent.clearReleatedQuestion()); + }, + chatId: widget.view.id, + relatedQuestions: state.relatedQuestions, + ); + } + + return ChatAIMessageBubble( + message: message, + customMessageType: messageType, + child: child, + ); + } + Widget buildBubble(Message message, Widget child) { final isAuthor = message.author.id == _user.id; const borderRadius = BorderRadius.all(Radius.circular(6)); - final childWithPadding = isAuthor ? Padding( padding: const EdgeInsets.symmetric(horizontal: 20, vertical: 16), @@ -261,33 +285,25 @@ class _AIChatPageState extends State { } } - Widget _customMessageBuilder( - types.CustomMessage message, { - required int messageWidth, - }) { - // iteration custom message type - final messageType = onetimeMessageTypeFromMeta(message.metadata); - if (messageType == null) { - return const SizedBox.shrink(); - } - - switch (messageType) { - case OnetimeShotType.loading: - return const ChatAILoading(); - default: - return const SizedBox.shrink(); - } - } - Widget buildChatInput(BuildContext context) { return ClipRect( child: Padding( padding: AIChatUILayout.safeAreaInsets(context), child: Column( children: [ - ChatInput( - chatId: widget.view.id, - onSendPressed: (message) => onSendPressed(context, message.text), + BlocSelector( + selector: (state) => state.streamingStatus, + builder: (context, state) { + return ChatInput( + chatId: widget.view.id, + onSendPressed: (message) => + onSendPressed(context, message.text), + isStreaming: state != const LoadingState.finish(), + onStopStreaming: () { + context.read().add(const ChatEvent.stopStream()); + }, + ); + }, ), const VSpace(6), Opacity( diff --git a/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_ai_message.dart b/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/ai_message_bubble.dart similarity index 83% rename from frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_ai_message.dart rename to frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/ai_message_bubble.dart index 7a19360490..8246378bda 100644 --- a/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_ai_message.dart +++ b/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/ai_message_bubble.dart @@ -1,6 +1,5 @@ import 'package:appflowy/generated/flowy_svgs.g.dart'; import 'package:appflowy/generated/locale_keys.g.dart'; -import 'package:appflowy/plugins/ai_chat/application/chat_ai_message_bloc.dart'; import 'package:appflowy/plugins/ai_chat/application/chat_bloc.dart'; import 'package:appflowy/plugins/ai_chat/presentation/chat_avatar.dart'; import 'package:appflowy/plugins/ai_chat/presentation/chat_input.dart'; @@ -13,7 +12,6 @@ import 'package:flowy_infra_ui/style_widget/icon_button.dart'; import 'package:flowy_infra_ui/widget/flowy_tooltip.dart'; import 'package:flutter/material.dart'; import 'package:flutter/services.dart'; -import 'package:flutter_bloc/flutter_bloc.dart'; import 'package:flutter_chat_types/flutter_chat_types.dart'; import 'package:styled_widget/styled_widget.dart'; @@ -35,30 +33,22 @@ class ChatAIMessageBubble extends StatelessWidget { Widget build(BuildContext context) { const padding = EdgeInsets.symmetric(horizontal: _leftPadding); final childWithPadding = Padding(padding: padding, child: child); + final widget = isMobile + ? _wrapPopMenu(childWithPadding) + : _wrapHover(childWithPadding); - return BlocProvider( - create: (context) => ChatAIMessageBloc(message: message), - child: BlocBuilder( - builder: (context, state) { - final widget = isMobile - ? _wrapPopMenu(childWithPadding) - : _wrapHover(childWithPadding); - - return Row( - mainAxisSize: MainAxisSize.min, - crossAxisAlignment: CrossAxisAlignment.start, - children: [ - const ChatBorderedCircleAvatar( - child: FlowySvg( - FlowySvgs.flowy_ai_chat_logo_s, - size: Size.square(24), - ), - ), - Expanded(child: widget), - ], - ); - }, - ), + return Row( + mainAxisSize: MainAxisSize.min, + crossAxisAlignment: CrossAxisAlignment.start, + children: [ + const ChatBorderedCircleAvatar( + child: FlowySvg( + FlowySvgs.flowy_ai_chat_logo_s, + size: Size.square(24), + ), + ), + Expanded(child: widget), + ], ); } @@ -118,7 +108,7 @@ class _ChatAIMessageHoverState extends State { borderRadius: Corners.s6Border, ), child: Padding( - padding: const EdgeInsets.only(bottom: 40), + padding: const EdgeInsets.only(bottom: 30), child: widget.child, ), ), diff --git a/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_input.dart b/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_input.dart index b33c59d640..93bf80b4a5 100644 --- a/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_input.dart +++ b/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_input.dart @@ -18,13 +18,17 @@ class ChatInput extends StatefulWidget { required this.onSendPressed, required this.chatId, this.options = const InputOptions(), + required this.isStreaming, + required this.onStopStreaming, }); final bool? isAttachmentUploading; final VoidCallback? onAttachmentPressed; final void Function(types.PartialText) onSendPressed; + final void Function() onStopStreaming; final InputOptions options; final String chatId; + final bool isStreaming; @override State createState() => _ChatInputState(); @@ -68,26 +72,23 @@ class _ChatInputState extends State { void _handleSendButtonVisibilityModeChange() { _textController.removeListener(_handleTextControllerChange); - if (widget.options.sendButtonVisibilityMode == - SendButtonVisibilityMode.hidden) { - _sendButtonVisible = false; - } else if (widget.options.sendButtonVisibilityMode == - SendButtonVisibilityMode.editing) { - _sendButtonVisible = _textController.text.trim() != ''; - _textController.addListener(_handleTextControllerChange); - } else { - _sendButtonVisible = true; - } + _sendButtonVisible = + _textController.text.trim() != '' || widget.isStreaming; + _textController.addListener(_handleTextControllerChange); } void _handleSendPressed() { - final trimmedText = _textController.text.trim(); - if (trimmedText != '') { - final partialText = types.PartialText(text: trimmedText); - widget.onSendPressed(partialText); + if (widget.isStreaming) { + widget.onStopStreaming(); + } else { + final trimmedText = _textController.text.trim(); + if (trimmedText != '') { + final partialText = types.PartialText(text: trimmedText); + widget.onSendPressed(partialText); - if (widget.options.inputClearMode == InputClearMode.always) { - _textController.clear(); + if (widget.options.inputClearMode == InputClearMode.always) { + _textController.clear(); + } } } } @@ -138,6 +139,7 @@ class _ChatInputState extends State { padding: textPadding, child: TextField( controller: _textController, + readOnly: widget.isStreaming, focusNode: _inputFocusNode, decoration: InputDecoration( border: InputBorder.none, @@ -153,7 +155,6 @@ class _ChatInputState extends State { autocorrect: widget.options.autocorrect, autofocus: widget.options.autofocus, enableSuggestions: widget.options.enableSuggestions, - spellCheckConfiguration: const SpellCheckConfiguration(), keyboardType: widget.options.keyboardType, textCapitalization: TextCapitalization.sentences, maxLines: 10, @@ -173,8 +174,14 @@ class _ChatInputState extends State { visible: _sendButtonVisible, child: Padding( padding: buttonPadding, - child: SendButton( - onPressed: _handleSendPressed, + child: AccessoryButton( + onSendPressed: () { + _handleSendPressed(); + }, + onStopStreaming: () { + widget.onStopStreaming(); + }, + isStreaming: widget.isStreaming, ), ), ), @@ -184,10 +191,7 @@ class _ChatInputState extends State { @override void didUpdateWidget(covariant ChatInput oldWidget) { super.didUpdateWidget(oldWidget); - if (widget.options.sendButtonVisibilityMode != - oldWidget.options.sendButtonVisibilityMode) { - _handleSendButtonVisibilityModeChange(); - } + _handleSendButtonVisibilityModeChange(); } @override @@ -211,7 +215,6 @@ class InputOptions { this.keyboardType = TextInputType.multiline, this.onTextChanged, this.onTextFieldTap, - this.sendButtonVisibilityMode = SendButtonVisibilityMode.editing, this.textEditingController, this.autocorrect = true, this.autofocus = false, @@ -231,11 +234,6 @@ class InputOptions { /// Will be called on [TextField] tap. final VoidCallback? onTextFieldTap; - /// Controls the visibility behavior of the [SendButton] based on the - /// [TextField] state inside the [ChatInput] widget. - /// Defaults to [SendButtonVisibilityMode.editing]. - final SendButtonVisibilityMode sendButtonVisibilityMode; - /// Custom [TextEditingController]. If not provided, defaults to the /// [InputTextFieldController], which extends [TextEditingController] and has /// additional fatures like markdown support. If you want to keep additional @@ -260,24 +258,46 @@ class InputOptions { final isMobile = defaultTargetPlatform == TargetPlatform.android || defaultTargetPlatform == TargetPlatform.iOS; -class SendButton extends StatelessWidget { - const SendButton({required this.onPressed, super.key}); +class AccessoryButton extends StatelessWidget { + const AccessoryButton({ + required this.onSendPressed, + required this.onStopStreaming, + required this.isStreaming, + super.key, + }); - final void Function() onPressed; + final void Function() onSendPressed; + final void Function() onStopStreaming; + final bool isStreaming; @override Widget build(BuildContext context) { - return FlowyIconButton( - width: 36, - fillColor: AFThemeExtension.of(context).lightGreyHover, - hoverColor: AFThemeExtension.of(context).lightGreyHover, - radius: BorderRadius.circular(18), - icon: FlowySvg( - FlowySvgs.send_s, - size: const Size.square(24), - color: Theme.of(context).colorScheme.primary, - ), - onPressed: onPressed, - ); + if (isStreaming) { + return FlowyIconButton( + width: 36, + icon: FlowySvg( + FlowySvgs.ai_stream_stop_s, + size: const Size.square(28), + color: Theme.of(context).colorScheme.primary, + ), + onPressed: onStopStreaming, + radius: BorderRadius.circular(18), + fillColor: AFThemeExtension.of(context).lightGreyHover, + hoverColor: AFThemeExtension.of(context).lightGreyHover, + ); + } else { + return FlowyIconButton( + width: 36, + fillColor: AFThemeExtension.of(context).lightGreyHover, + hoverColor: AFThemeExtension.of(context).lightGreyHover, + radius: BorderRadius.circular(18), + icon: FlowySvg( + FlowySvgs.send_s, + size: const Size.square(24), + color: Theme.of(context).colorScheme.primary, + ), + onPressed: onSendPressed, + ); + } } } diff --git a/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_loading.dart b/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_loading.dart index 4b5c984382..9c4bd64cb6 100644 --- a/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_loading.dart +++ b/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_loading.dart @@ -53,15 +53,15 @@ class ContentPlaceholder extends StatelessWidget { ), ], ), - Container( - width: 140, - height: 16.0, - margin: const EdgeInsets.only(bottom: 8.0), - decoration: BoxDecoration( - color: AFThemeExtension.of(context).lightGreyHover, - borderRadius: BorderRadius.circular(4.0), - ), - ), + // Container( + // width: 140, + // height: 16.0, + // margin: const EdgeInsets.only(bottom: 8.0), + // decoration: BoxDecoration( + // color: AFThemeExtension.of(context).lightGreyHover, + // borderRadius: BorderRadius.circular(4.0), + // ), + // ), ], ), ); diff --git a/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_related_question.dart b/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_related_question.dart index 0cf2398f68..a37a0824ed 100644 --- a/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_related_question.dart +++ b/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_related_question.dart @@ -1,47 +1,11 @@ import 'package:appflowy/generated/flowy_svgs.g.dart'; import 'package:appflowy/generated/locale_keys.g.dart'; -import 'package:appflowy/plugins/ai_chat/application/chat_related_question_bloc.dart'; import 'package:appflowy_backend/protobuf/flowy-chat/entities.pb.dart'; import 'package:easy_localization/easy_localization.dart'; import 'package:flowy_infra_ui/style_widget/text.dart'; import 'package:flowy_infra_ui/widget/spacing.dart'; import 'package:flutter/material.dart'; -import 'package:flutter_bloc/flutter_bloc.dart'; -class RelatedQuestionPage extends StatefulWidget { - const RelatedQuestionPage({ - required this.chatId, - required this.onQuestionSelected, - super.key, - }); - - final String chatId; - final Function(String) onQuestionSelected; - - @override - State createState() => _RelatedQuestionPageState(); -} - -class _RelatedQuestionPageState extends State { - @override - Widget build(BuildContext context) { - return BlocProvider( - create: (context) => ChatRelatedMessageBloc(chatId: widget.chatId) - ..add( - const ChatRelatedMessageEvent.initial(), - ), - child: BlocBuilder( - builder: (blocContext, state) { - return RelatedQuestionList( - chatId: widget.chatId, - onQuestionSelected: widget.onQuestionSelected, - relatedQuestions: state.relatedQuestions, - ); - }, - ), - ); - } -} class RelatedQuestionList extends StatelessWidget { const RelatedQuestionList({ diff --git a/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_stream_text_field.dart b/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_stream_text_field.dart new file mode 100644 index 0000000000..7589a1b634 --- /dev/null +++ b/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_stream_text_field.dart @@ -0,0 +1,10 @@ +import 'package:flutter/widgets.dart'; + +class StreamTextField extends StatelessWidget { + const StreamTextField({super.key}); + + @override + Widget build(BuildContext context) { + return const Placeholder(); + } +} diff --git a/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_streaming_error_message.dart b/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_streaming_error_message.dart index c0552a7e8e..1f825d3355 100644 --- a/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_streaming_error_message.dart +++ b/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_streaming_error_message.dart @@ -1,84 +1,83 @@ -import 'package:appflowy/generated/locale_keys.g.dart'; -import 'package:appflowy/plugins/ai_chat/application/chat_bloc.dart'; -import 'package:easy_localization/easy_localization.dart'; -import 'package:flowy_infra_ui/flowy_infra_ui.dart'; -import 'package:flutter/material.dart'; -import 'package:flutter_chat_types/flutter_chat_types.dart'; +// import 'package:appflowy/generated/locale_keys.g.dart'; +// import 'package:easy_localization/easy_localization.dart'; +// import 'package:flowy_infra_ui/flowy_infra_ui.dart'; +// import 'package:flutter/material.dart'; +// import 'package:flutter_chat_types/flutter_chat_types.dart'; -class ChatStreamingError extends StatelessWidget { - const ChatStreamingError({ - required this.message, - required this.onRetryPressed, - super.key, - }); +// class ChatStreamingError extends StatelessWidget { +// const ChatStreamingError({ +// required this.message, +// required this.onRetryPressed, +// super.key, +// }); - final void Function() onRetryPressed; - final Message message; - @override - Widget build(BuildContext context) { - final canRetry = message.metadata?[canRetryKey] != null; +// final void Function() onRetryPressed; +// final Message message; +// @override +// Widget build(BuildContext context) { +// final canRetry = message.metadata?[canRetryKey] != null; - if (canRetry) { - return Column( - children: [ - const Divider(height: 4, thickness: 1), - const VSpace(16), - Center( - child: Column( - children: [ - _aiUnvaliable(), - const VSpace(10), - _retryButton(), - ], - ), - ), - ], - ); - } else { - return Center( - child: Column( - children: [ - const Divider(height: 20, thickness: 1), - Padding( - padding: const EdgeInsets.all(8.0), - child: FlowyText( - LocaleKeys.chat_serverUnavailable.tr(), - fontSize: 14, - ), - ), - ], - ), - ); - } - } +// if (canRetry) { +// return Column( +// children: [ +// const Divider(height: 4, thickness: 1), +// const VSpace(16), +// Center( +// child: Column( +// children: [ +// _aiUnvaliable(), +// const VSpace(10), +// _retryButton(), +// ], +// ), +// ), +// ], +// ); +// } else { +// return Center( +// child: Column( +// children: [ +// const Divider(height: 20, thickness: 1), +// Padding( +// padding: const EdgeInsets.all(8.0), +// child: FlowyText( +// LocaleKeys.chat_serverUnavailable.tr(), +// fontSize: 14, +// ), +// ), +// ], +// ), +// ); +// } +// } - FlowyButton _retryButton() { - return FlowyButton( - radius: BorderRadius.circular(20), - useIntrinsicWidth: true, - text: Padding( - padding: const EdgeInsets.symmetric(horizontal: 12, vertical: 8), - child: FlowyText( - LocaleKeys.chat_regenerateAnswer.tr(), - fontSize: 14, - ), - ), - onTap: onRetryPressed, - iconPadding: 0, - leftIcon: const Icon( - Icons.refresh, - size: 20, - ), - ); - } +// FlowyButton _retryButton() { +// return FlowyButton( +// radius: BorderRadius.circular(20), +// useIntrinsicWidth: true, +// text: Padding( +// padding: const EdgeInsets.symmetric(horizontal: 12, vertical: 8), +// child: FlowyText( +// LocaleKeys.chat_regenerateAnswer.tr(), +// fontSize: 14, +// ), +// ), +// onTap: onRetryPressed, +// iconPadding: 0, +// leftIcon: const Icon( +// Icons.refresh, +// size: 20, +// ), +// ); +// } - Padding _aiUnvaliable() { - return Padding( - padding: const EdgeInsets.all(8.0), - child: FlowyText( - LocaleKeys.chat_aiServerUnavailable.tr(), - fontSize: 14, - ), - ); - } -} +// Padding _aiUnvaliable() { +// return Padding( +// padding: const EdgeInsets.all(8.0), +// child: FlowyText( +// LocaleKeys.chat_aiServerUnavailable.tr(), +// fontSize: 14, +// ), +// ); +// } +// } diff --git a/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_welcome_page.dart b/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_welcome_page.dart index 1014b9ef5e..5aceca2025 100644 --- a/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_welcome_page.dart +++ b/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_welcome_page.dart @@ -20,29 +20,33 @@ class ChatWelcomePage extends StatelessWidget { ]; @override Widget build(BuildContext context) { - return Column( - mainAxisAlignment: MainAxisAlignment.center, - children: [ - const FlowySvg( - FlowySvgs.flowy_ai_chat_logo_s, - size: Size.square(44), - ), - const SizedBox(height: 40), - GridView.builder( - shrinkWrap: true, - gridDelegate: SliverGridDelegateWithFixedCrossAxisCount( - crossAxisCount: isMobile ? 2 : 4, - crossAxisSpacing: 6, - mainAxisSpacing: 6, - childAspectRatio: 16.0 / 9.0, + return AnimatedOpacity( + opacity: 1.0, + duration: const Duration(seconds: 3), + child: Column( + mainAxisAlignment: MainAxisAlignment.center, + children: [ + const FlowySvg( + FlowySvgs.flowy_ai_chat_logo_s, + size: Size.square(44), ), - itemCount: items.length, - itemBuilder: (context, index) => WelcomeQuestion( - question: items[index], - onSelected: onSelectedQuestion, + const SizedBox(height: 40), + GridView.builder( + shrinkWrap: true, + gridDelegate: SliverGridDelegateWithFixedCrossAxisCount( + crossAxisCount: isMobile ? 2 : 4, + crossAxisSpacing: 6, + mainAxisSpacing: 6, + childAspectRatio: 16.0 / 9.0, + ), + itemCount: items.length, + itemBuilder: (context, index) => WelcomeQuestion( + question: items[index], + onSelected: onSelectedQuestion, + ), ), - ), - ], + ], + ), ); } } diff --git a/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/message/ai_text_message.dart b/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/message/ai_text_message.dart new file mode 100644 index 0000000000..d2fa1b155c --- /dev/null +++ b/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/message/ai_text_message.dart @@ -0,0 +1,295 @@ +import 'package:appflowy/generated/locale_keys.g.dart'; +import 'package:appflowy/plugins/ai_chat/application/chat_ai_message_bloc.dart'; +import 'package:appflowy/plugins/ai_chat/application/chat_bloc.dart'; +import 'package:appflowy/plugins/ai_chat/presentation/chat_loading.dart'; +import 'package:easy_localization/easy_localization.dart'; +import 'package:fixnum/fixnum.dart'; +import 'package:flowy_infra/theme_extension.dart'; +import 'package:flowy_infra_ui/style_widget/button.dart'; +import 'package:flowy_infra_ui/style_widget/text.dart'; +import 'package:flowy_infra_ui/widget/spacing.dart'; +import 'package:flutter/material.dart'; +import 'package:flutter_bloc/flutter_bloc.dart'; +import 'package:flutter_chat_types/flutter_chat_types.dart'; +import 'package:markdown_widget/markdown_widget.dart'; + +class ChatAITextMessageWidget extends StatelessWidget { + const ChatAITextMessageWidget({ + super.key, + required this.user, + required this.messageUserId, + required this.text, + required this.questionId, + required this.chatId, + }); + + final User user; + final String messageUserId; + final dynamic text; + final Int64? questionId; + final String chatId; + + @override + Widget build(BuildContext context) { + return BlocProvider( + create: (context) => ChatAIMessageBloc( + message: text, + chatId: chatId, + questionId: questionId, + )..add(const ChatAIMessageEvent.initial()), + child: BlocBuilder( + builder: (context, state) { + if (state.error != null) { + return StreamingError( + onRetryPressed: () { + context.read().add( + const ChatAIMessageEvent.retry(), + ); + }, + ); + } + + if (state.retryState == const LoadingState.loading()) { + return const ChatAILoading(); + } + + if (state.text.isEmpty) { + return const ChatAILoading(); + } else { + return _textWidgetBuilder(user, context, state.text); + } + }, + ), + ); + } + + Widget _textWidgetBuilder( + User user, + BuildContext context, + String text, + ) { + return MarkdownWidget( + data: text, + shrinkWrap: true, + physics: const NeverScrollableScrollPhysics(), + config: configFromContext(context), + ); + } + + MarkdownConfig configFromContext(BuildContext context) { + return MarkdownConfig( + configs: [ + HrConfig(color: AFThemeExtension.of(context).textColor), + ChatH1Config( + style: TextStyle( + color: AFThemeExtension.of(context).textColor, + fontSize: 24, + fontWeight: FontWeight.bold, + height: 1.5, + ), + dividerColor: AFThemeExtension.of(context).lightGreyHover, + ), + ChatH2Config( + style: TextStyle( + color: AFThemeExtension.of(context).textColor, + fontSize: 20, + fontWeight: FontWeight.bold, + height: 1.5, + ), + dividerColor: AFThemeExtension.of(context).lightGreyHover, + ), + ChatH3Config( + style: TextStyle( + color: AFThemeExtension.of(context).textColor, + fontSize: 18, + fontWeight: FontWeight.bold, + height: 1.5, + ), + dividerColor: AFThemeExtension.of(context).lightGreyHover, + ), + H4Config( + style: TextStyle( + color: AFThemeExtension.of(context).textColor, + fontSize: 16, + fontWeight: FontWeight.bold, + height: 1.5, + ), + ), + H5Config( + style: TextStyle( + color: AFThemeExtension.of(context).textColor, + fontSize: 14, + fontWeight: FontWeight.bold, + height: 1.5, + ), + ), + H6Config( + style: TextStyle( + color: AFThemeExtension.of(context).textColor, + fontSize: 12, + fontWeight: FontWeight.bold, + height: 1.5, + ), + ), + PreConfig( + padding: const EdgeInsets.all(14), + decoration: BoxDecoration( + color: Theme.of(context).colorScheme.surfaceContainerHighest, + borderRadius: const BorderRadius.all( + Radius.circular(8.0), + ), + ), + ), + PConfig( + textStyle: TextStyle( + color: AFThemeExtension.of(context).textColor, + fontSize: 16, + fontWeight: FontWeight.w500, + height: 1.5, + ), + ), + CodeConfig( + style: TextStyle( + color: AFThemeExtension.of(context).textColor, + fontSize: 16, + fontWeight: FontWeight.w500, + height: 1.5, + ), + ), + BlockquoteConfig.darkConfig, + ], + ); + } +} + +class ChatH1Config extends HeadingConfig { + const ChatH1Config({ + this.style = const TextStyle( + fontSize: 32, + height: 40 / 32, + fontWeight: FontWeight.bold, + ), + required this.dividerColor, + }); + + @override + final TextStyle style; + final Color dividerColor; + + @override + String get tag => MarkdownTag.h1.name; + + @override + HeadingDivider? get divider => HeadingDivider( + space: 10, + color: dividerColor, + height: 10, + ); +} + +///config class for h2 +class ChatH2Config extends HeadingConfig { + const ChatH2Config({ + this.style = const TextStyle( + fontSize: 24, + height: 30 / 24, + fontWeight: FontWeight.bold, + ), + required this.dividerColor, + }); + @override + final TextStyle style; + final Color dividerColor; + + @override + String get tag => MarkdownTag.h2.name; + + @override + HeadingDivider? get divider => HeadingDivider( + space: 10, + color: dividerColor, + height: 10, + ); +} + +class ChatH3Config extends HeadingConfig { + const ChatH3Config({ + this.style = const TextStyle( + fontSize: 24, + height: 30 / 24, + fontWeight: FontWeight.bold, + ), + required this.dividerColor, + }); + + @override + final TextStyle style; + final Color dividerColor; + + @override + String get tag => MarkdownTag.h3.name; + + @override + HeadingDivider? get divider => HeadingDivider( + space: 10, + color: dividerColor, + height: 10, + ); +} + +class StreamingError extends StatelessWidget { + const StreamingError({ + required this.onRetryPressed, + super.key, + }); + + final void Function() onRetryPressed; + @override + Widget build(BuildContext context) { + return Column( + children: [ + const Divider(height: 4, thickness: 1), + const VSpace(16), + Center( + child: Column( + children: [ + _aiUnvaliable(), + const VSpace(10), + _retryButton(), + ], + ), + ), + ], + ); + } + + FlowyButton _retryButton() { + return FlowyButton( + radius: BorderRadius.circular(20), + useIntrinsicWidth: true, + text: Padding( + padding: const EdgeInsets.symmetric(horizontal: 12, vertical: 8), + child: FlowyText( + LocaleKeys.chat_regenerateAnswer.tr(), + fontSize: 14, + ), + ), + onTap: onRetryPressed, + iconPadding: 0, + leftIcon: const Icon( + Icons.refresh, + size: 20, + ), + ); + } + + Padding _aiUnvaliable() { + return Padding( + padding: const EdgeInsets.all(8.0), + child: FlowyText( + LocaleKeys.chat_aiServerUnavailable.tr(), + fontSize: 14, + ), + ); + } +} diff --git a/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/message/user_text_message.dart b/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/message/user_text_message.dart new file mode 100644 index 0000000000..ba5254408d --- /dev/null +++ b/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/message/user_text_message.dart @@ -0,0 +1,60 @@ +import 'package:flowy_infra/theme_extension.dart'; +import 'package:flowy_infra_ui/style_widget/text.dart'; +import 'package:flutter/material.dart'; +import 'package:flutter_chat_types/flutter_chat_types.dart'; + +class ChatTextMessageWidget extends StatelessWidget { + const ChatTextMessageWidget({ + super.key, + required this.user, + required this.messageUserId, + required this.text, + }); + + final User user; + final String messageUserId; + final String text; + + @override + Widget build(BuildContext context) { + return _textWidgetBuilder(user, context, text); + } + + Widget _textWidgetBuilder( + User user, + BuildContext context, + String text, + ) { + return Column( + crossAxisAlignment: CrossAxisAlignment.start, + children: [ + TextMessageText( + text: text, + ), + ], + ); + } +} + +/// Widget to reuse the markdown capabilities, e.g., for previews. +class TextMessageText extends StatelessWidget { + const TextMessageText({ + super.key, + required this.text, + }); + + /// Text that is shown as markdown. + final String text; + + @override + Widget build(BuildContext context) { + return FlowyText( + text, + fontSize: 16, + fontWeight: FontWeight.w500, + lineHeight: 1.5, + maxLines: 2000, + color: AFThemeExtension.of(context).textColor, + ); + } +} diff --git a/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_user_message.dart b/frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/user_message_bubble.dart similarity index 100% rename from frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/chat_user_message.dart rename to frontend/appflowy_flutter/lib/plugins/ai_chat/presentation/user_message_bubble.dart diff --git a/frontend/appflowy_flutter/pubspec.lock b/frontend/appflowy_flutter/pubspec.lock index 7938fb37c4..cb14d7a654 100644 --- a/frontend/appflowy_flutter/pubspec.lock +++ b/frontend/appflowy_flutter/pubspec.lock @@ -680,6 +680,14 @@ packages: url: "https://github.com/LucasXu0/emoji_mart.git" source: git version: "1.0.2" + flutter_highlight: + dependency: transitive + description: + name: flutter_highlight + sha256: "7b96333867aa07e122e245c033b8ad622e4e3a42a1a2372cbb098a2541d8782c" + url: "https://pub.dev" + source: hosted + version: "0.7.0" flutter_link_previewer: dependency: transitive description: @@ -1066,7 +1074,7 @@ packages: source: hosted version: "0.6.0" isolates: - dependency: transitive + dependency: "direct main" description: name: isolates sha256: ce89e4141b27b877326d3715be2dceac7a7ba89f3229785816d2d318a75ddf28 @@ -1209,6 +1217,14 @@ packages: url: "https://pub.dev" source: hosted version: "7.2.2" + markdown_widget: + dependency: "direct main" + description: + name: markdown_widget + sha256: "216dced98962d7699a265344624bc280489d739654585ee881c95563a3252fac" + url: "https://pub.dev" + source: hosted + version: "2.3.2+6" matcher: dependency: transitive description: diff --git a/frontend/appflowy_flutter/pubspec.yaml b/frontend/appflowy_flutter/pubspec.yaml index cf352947c8..29750bedb1 100644 --- a/frontend/appflowy_flutter/pubspec.yaml +++ b/frontend/appflowy_flutter/pubspec.yaml @@ -141,6 +141,8 @@ dependencies: auto_size_text_field: ^2.2.3 reorderable_tabbar: ^1.0.6 shimmer: ^3.0.0 + isolates: ^3.0.3+8 + markdown_widget: ^2.3.2+6 # Window Manager for MacOS and Linux window_manager: ^0.3.9 diff --git a/frontend/appflowy_flutter/test/bloc_test/chat_test/chat_load_message_test.dart b/frontend/appflowy_flutter/test/bloc_test/chat_test/chat_load_message_test.dart new file mode 100644 index 0000000000..134a429a6b --- /dev/null +++ b/frontend/appflowy_flutter/test/bloc_test/chat_test/chat_load_message_test.dart @@ -0,0 +1,16 @@ +import 'package:flutter_test/flutter_test.dart'; + +import 'util.dart'; + +void main() { + // ignore: unused_local_variable + late AppFlowyChatTest chatTest; + + setUpAll(() async { + chatTest = await AppFlowyChatTest.ensureInitialized(); + }); + + test('send message', () async { + // final context = await chatTest.createChat(); + }); +} diff --git a/frontend/appflowy_flutter/test/bloc_test/chat_test/util.dart b/frontend/appflowy_flutter/test/bloc_test/chat_test/util.dart new file mode 100644 index 0000000000..29d98416b5 --- /dev/null +++ b/frontend/appflowy_flutter/test/bloc_test/chat_test/util.dart @@ -0,0 +1,44 @@ +import 'package:appflowy/plugins/ai_chat/chat.dart'; +import 'package:appflowy/workspace/application/view/view_service.dart'; +import 'package:appflowy_backend/protobuf/flowy-folder/view.pb.dart'; + +import '../../util.dart'; + +class AppFlowyChatTest { + AppFlowyChatTest({required this.unitTest}); + + final AppFlowyUnitTest unitTest; + + static Future ensureInitialized() async { + final inner = await AppFlowyUnitTest.ensureInitialized(); + return AppFlowyChatTest(unitTest: inner); + } + + Future createChat() async { + final app = await unitTest.createWorkspace(); + final builder = AIChatPluginBuilder(); + return ViewBackendService.createView( + parentViewId: app.id, + name: "Test Chat", + layoutType: builder.layoutType, + openAfterCreate: true, + ).then((result) { + return result.fold( + (view) async { + return view; + }, + (error) { + throw Exception(); + }, + ); + }); + } +} + +Future boardResponseFuture() { + return Future.delayed(boardResponseDuration()); +} + +Duration boardResponseDuration({int milliseconds = 200}) { + return Duration(milliseconds: milliseconds); +} diff --git a/frontend/appflowy_flutter/test/util.dart b/frontend/appflowy_flutter/test/util.dart index 65303cb789..21c1020a71 100644 --- a/frontend/appflowy_flutter/test/util.dart +++ b/frontend/appflowy_flutter/test/util.dart @@ -58,7 +58,6 @@ class AppFlowyUnitTest { } WorkspacePB get currentWorkspace => workspace; - Future _loadWorkspace() async { final result = await userService.getCurrentWorkspace(); result.fold( @@ -83,15 +82,6 @@ class AppFlowyUnitTest { (error) => throw Exception(error), ); } - - Future> loadApps() async { - final result = await workspaceService.getPublicViews(); - - return result.fold( - (apps) => apps, - (error) => throw Exception(error), - ); - } } void _pathProviderInitialized() { diff --git a/frontend/appflowy_tauri/src-tauri/Cargo.lock b/frontend/appflowy_tauri/src-tauri/Cargo.lock index 9dae4b3b63..46320ba2b9 100644 --- a/frontend/appflowy_tauri/src-tauri/Cargo.lock +++ b/frontend/appflowy_tauri/src-tauri/Cargo.lock @@ -117,6 +117,16 @@ dependencies = [ "memchr", ] +[[package]] +name = "allo-isolate" +version = "0.1.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97b6d794345b06592d0ebeed8e477e41b71e5a0a49df4fc0e4184d5938b99509" +dependencies = [ + "atomic", + "pin-project", +] + [[package]] name = "alloc-no-stdlib" version = "2.0.4" @@ -162,7 +172,7 @@ checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" [[package]] name = "app-error" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "bincode", @@ -182,7 +192,7 @@ dependencies = [ [[package]] name = "appflowy-ai-client" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "bytes", @@ -289,6 +299,12 @@ dependencies = [ "system-deps 6.1.1", ] +[[package]] +name = "atomic" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba" + [[package]] name = "atomic_refcell" version = "0.1.10" @@ -756,7 +772,7 @@ dependencies = [ [[package]] name = "client-api" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "again", "anyhow", @@ -803,7 +819,7 @@ dependencies = [ [[package]] name = "client-websocket" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "futures-channel", "futures-util", @@ -1043,7 +1059,7 @@ dependencies = [ [[package]] name = "collab-rt-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "bincode", @@ -1068,7 +1084,7 @@ dependencies = [ [[package]] name = "collab-rt-protocol" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "async-trait", @@ -1425,7 +1441,7 @@ checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308" [[package]] name = "database-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "app-error", @@ -1817,6 +1833,7 @@ dependencies = [ name = "flowy-chat" version = "0.1.0" dependencies = [ + "allo-isolate", "bytes", "dashmap", "flowy-chat-pub", @@ -1828,6 +1845,7 @@ dependencies = [ "futures", "lib-dispatch", "lib-infra", + "log", "protobuf", "strum_macros 0.21.1", "tokio", @@ -2835,7 +2853,7 @@ dependencies = [ [[package]] name = "gotrue" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "futures-util", @@ -2852,7 +2870,7 @@ dependencies = [ [[package]] name = "gotrue-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "app-error", @@ -3284,7 +3302,7 @@ dependencies = [ [[package]] name = "infra" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "reqwest", @@ -3525,11 +3543,13 @@ dependencies = [ name = "lib-infra" version = "0.1.0" dependencies = [ + "allo-isolate", "anyhow", "async-trait", "atomic_refcell", "bytes", "chrono", + "futures", "futures-core", "md5", "pin-project", @@ -3643,9 +3663,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.20" +version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" [[package]] name = "loom" @@ -5772,7 +5792,7 @@ dependencies = [ [[package]] name = "shared-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "app-error", @@ -5783,6 +5803,7 @@ dependencies = [ "database-entity", "futures", "gotrue-entity", + "log", "pin-project", "reqwest", "serde", diff --git a/frontend/appflowy_tauri/src-tauri/Cargo.toml b/frontend/appflowy_tauri/src-tauri/Cargo.toml index 6bb9cb9db6..cac7634e62 100644 --- a/frontend/appflowy_tauri/src-tauri/Cargo.toml +++ b/frontend/appflowy_tauri/src-tauri/Cargo.toml @@ -52,7 +52,7 @@ collab-user = { version = "0.2" } # Run the script: # scripts/tool/update_client_api_rev.sh new_rev_id # ⚠️⚠️⚠️️ -client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "3f55cea9ca386875a1668ef30600c83cd6a1ffe2" } +client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" } [dependencies] serde_json.workspace = true diff --git a/frontend/appflowy_web/wasm-libs/Cargo.lock b/frontend/appflowy_web/wasm-libs/Cargo.lock index 15c6119e02..31ad62bb38 100644 --- a/frontend/appflowy_web/wasm-libs/Cargo.lock +++ b/frontend/appflowy_web/wasm-libs/Cargo.lock @@ -216,7 +216,7 @@ checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" [[package]] name = "app-error" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "bincode", @@ -236,7 +236,7 @@ dependencies = [ [[package]] name = "appflowy-ai-client" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "bytes", @@ -562,7 +562,7 @@ dependencies = [ [[package]] name = "client-api" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "again", "anyhow", @@ -609,7 +609,7 @@ dependencies = [ [[package]] name = "client-websocket" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "futures-channel", "futures-util", @@ -787,7 +787,7 @@ dependencies = [ [[package]] name = "collab-rt-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "bincode", @@ -812,7 +812,7 @@ dependencies = [ [[package]] name = "collab-rt-protocol" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "async-trait", @@ -1026,7 +1026,7 @@ checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5" [[package]] name = "database-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "app-error", @@ -1881,7 +1881,7 @@ dependencies = [ [[package]] name = "gotrue" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "futures-util", @@ -1898,7 +1898,7 @@ dependencies = [ [[package]] name = "gotrue-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "app-error", @@ -2199,7 +2199,7 @@ dependencies = [ [[package]] name = "infra" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "reqwest", @@ -2339,6 +2339,7 @@ dependencies = [ "atomic_refcell", "bytes", "chrono", + "futures", "futures-core", "md5", "pin-project", @@ -2427,9 +2428,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.20" +version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" [[package]] name = "mac" @@ -3900,7 +3901,7 @@ dependencies = [ [[package]] name = "shared-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "app-error", @@ -3911,6 +3912,7 @@ dependencies = [ "database-entity", "futures", "gotrue-entity", + "log", "pin-project", "reqwest", "serde", diff --git a/frontend/appflowy_web/wasm-libs/Cargo.toml b/frontend/appflowy_web/wasm-libs/Cargo.toml index 2f0c25023a..4029f36c85 100644 --- a/frontend/appflowy_web/wasm-libs/Cargo.toml +++ b/frontend/appflowy_web/wasm-libs/Cargo.toml @@ -55,7 +55,7 @@ yrs = "0.18.8" # Run the script: # scripts/tool/update_client_api_rev.sh new_rev_id # ⚠️⚠️⚠️️ -client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "3f55cea9ca386875a1668ef30600c83cd6a1ffe2" } +client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" } diff --git a/frontend/appflowy_web_app/src-tauri/Cargo.lock b/frontend/appflowy_web_app/src-tauri/Cargo.lock index 12d5ca3b63..4c77b63af4 100644 --- a/frontend/appflowy_web_app/src-tauri/Cargo.lock +++ b/frontend/appflowy_web_app/src-tauri/Cargo.lock @@ -108,6 +108,16 @@ dependencies = [ "memchr", ] +[[package]] +name = "allo-isolate" +version = "0.1.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97b6d794345b06592d0ebeed8e477e41b71e5a0a49df4fc0e4184d5938b99509" +dependencies = [ + "atomic", + "pin-project", +] + [[package]] name = "alloc-no-stdlib" version = "2.0.4" @@ -153,7 +163,7 @@ checksum = "0952808a6c2afd1aa8947271f3a60f1a6763c7b912d210184c5149b5cf147247" [[package]] name = "app-error" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "bincode", @@ -173,7 +183,7 @@ dependencies = [ [[package]] name = "appflowy-ai-client" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "bytes", @@ -299,6 +309,12 @@ dependencies = [ "system-deps 6.2.2", ] +[[package]] +name = "atomic" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba" + [[package]] name = "atomic_refcell" version = "0.1.13" @@ -730,7 +746,7 @@ dependencies = [ [[package]] name = "client-api" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "again", "anyhow", @@ -777,7 +793,7 @@ dependencies = [ [[package]] name = "client-websocket" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "futures-channel", "futures-util", @@ -1026,7 +1042,7 @@ dependencies = [ [[package]] name = "collab-rt-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "bincode", @@ -1051,7 +1067,7 @@ dependencies = [ [[package]] name = "collab-rt-protocol" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "async-trait", @@ -1412,7 +1428,7 @@ checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5" [[package]] name = "database-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "app-error", @@ -1854,6 +1870,7 @@ dependencies = [ name = "flowy-chat" version = "0.1.0" dependencies = [ + "allo-isolate", "bytes", "dashmap", "flowy-chat-pub", @@ -1865,6 +1882,7 @@ dependencies = [ "futures", "lib-dispatch", "lib-infra", + "log", "protobuf", "strum_macros 0.21.1", "tokio", @@ -2909,7 +2927,7 @@ dependencies = [ [[package]] name = "gotrue" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "futures-util", @@ -2926,7 +2944,7 @@ dependencies = [ [[package]] name = "gotrue-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "app-error", @@ -3363,7 +3381,7 @@ dependencies = [ [[package]] name = "infra" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "reqwest", @@ -3609,11 +3627,13 @@ dependencies = [ name = "lib-infra" version = "0.1.0" dependencies = [ + "allo-isolate", "anyhow", "async-trait", "atomic_refcell", "bytes", "chrono", + "futures", "futures-core", "md5", "pin-project", @@ -5867,7 +5887,7 @@ dependencies = [ [[package]] name = "shared-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "app-error", @@ -5878,6 +5898,7 @@ dependencies = [ "database-entity", "futures", "gotrue-entity", + "log", "pin-project", "reqwest", "serde", diff --git a/frontend/appflowy_web_app/src-tauri/Cargo.toml b/frontend/appflowy_web_app/src-tauri/Cargo.toml index 6f03972195..be1e6b69f5 100644 --- a/frontend/appflowy_web_app/src-tauri/Cargo.toml +++ b/frontend/appflowy_web_app/src-tauri/Cargo.toml @@ -52,7 +52,7 @@ collab-user = { version = "0.2" } # Run the script: # scripts/tool/update_client_api_rev.sh new_rev_id # ⚠️⚠️⚠️️ -client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "3f55cea9ca386875a1668ef30600c83cd6a1ffe2" } +client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" } [dependencies] serde_json.workspace = true diff --git a/frontend/resources/flowy_icons/16x/ai_stream_stop.svg b/frontend/resources/flowy_icons/16x/ai_stream_stop.svg new file mode 100644 index 0000000000..55c7355ab7 --- /dev/null +++ b/frontend/resources/flowy_icons/16x/ai_stream_stop.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/frontend/rust-lib/Cargo.lock b/frontend/rust-lib/Cargo.lock index abc72ec7c5..c79dab339a 100644 --- a/frontend/rust-lib/Cargo.lock +++ b/frontend/rust-lib/Cargo.lock @@ -163,7 +163,7 @@ checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" [[package]] name = "app-error" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "bincode", @@ -183,7 +183,7 @@ dependencies = [ [[package]] name = "appflowy-ai-client" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "bytes", @@ -664,7 +664,7 @@ dependencies = [ [[package]] name = "client-api" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "again", "anyhow", @@ -711,7 +711,7 @@ dependencies = [ [[package]] name = "client-websocket" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "futures-channel", "futures-util", @@ -920,7 +920,7 @@ dependencies = [ [[package]] name = "collab-rt-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "bincode", @@ -945,7 +945,7 @@ dependencies = [ [[package]] name = "collab-rt-protocol" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "async-trait", @@ -1165,7 +1165,7 @@ dependencies = [ "cssparser-macros", "dtoa-short", "itoa", - "phf 0.8.0", + "phf 0.11.2", "smallvec", ] @@ -1265,7 +1265,7 @@ checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308" [[package]] name = "database-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "app-error", @@ -1652,6 +1652,7 @@ dependencies = [ name = "flowy-chat" version = "0.1.0" dependencies = [ + "allo-isolate", "bytes", "dashmap", "flowy-chat-pub", @@ -1663,6 +1664,7 @@ dependencies = [ "futures", "lib-dispatch", "lib-infra", + "log", "protobuf", "strum_macros 0.21.1", "tokio", @@ -2521,7 +2523,7 @@ dependencies = [ [[package]] name = "gotrue" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "futures-util", @@ -2538,7 +2540,7 @@ dependencies = [ [[package]] name = "gotrue-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "app-error", @@ -2903,7 +2905,7 @@ dependencies = [ [[package]] name = "infra" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "reqwest", @@ -3045,6 +3047,7 @@ dependencies = [ name = "lib-infra" version = "0.1.0" dependencies = [ + "allo-isolate", "anyhow", "async-trait", "atomic_refcell", @@ -3156,9 +3159,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.20" +version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" [[package]] name = "loom" @@ -3778,7 +3781,7 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3dfb61232e34fcb633f43d12c58f83c1df82962dcdfa565a4e866ffc17dafe12" dependencies = [ - "phf_macros", + "phf_macros 0.8.0", "phf_shared 0.8.0", "proc-macro-hack", ] @@ -3798,6 +3801,7 @@ version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc" dependencies = [ + "phf_macros 0.11.2", "phf_shared 0.11.2", ] @@ -3865,6 +3869,19 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "phf_macros" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3444646e286606587e49f3bcf1679b8cef1dc2c5ecc29ddacaffc305180d464b" +dependencies = [ + "phf_generator 0.11.2", + "phf_shared 0.11.2", + "proc-macro2", + "quote", + "syn 2.0.47", +] + [[package]] name = "phf_shared" version = "0.8.0" @@ -4068,7 +4085,7 @@ checksum = "c55e02e35260070b6f716a2423c2ff1c3bb1642ddca6f99e1f26d06268a0e2d2" dependencies = [ "bytes", "heck 0.4.1", - "itertools 0.10.5", + "itertools 0.11.0", "log", "multimap", "once_cell", @@ -4089,7 +4106,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "efb6c9a1dd1def8e2124d17e83a20af56f1570d6c2d2bd9e266ccb768df3840e" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.11.0", "proc-macro2", "quote", "syn 2.0.47", @@ -4986,7 +5003,7 @@ dependencies = [ [[package]] name = "shared-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=3f55cea9ca386875a1668ef30600c83cd6a1ffe2#3f55cea9ca386875a1668ef30600c83cd6a1ffe2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=d0467e7e2e8ee4b925556b5510fb6ed7322dde8c#d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" dependencies = [ "anyhow", "app-error", @@ -4997,6 +5014,7 @@ dependencies = [ "database-entity", "futures", "gotrue-entity", + "log", "pin-project", "reqwest", "serde", diff --git a/frontend/rust-lib/Cargo.toml b/frontend/rust-lib/Cargo.toml index 39dccadd26..6282fab3c6 100644 --- a/frontend/rust-lib/Cargo.toml +++ b/frontend/rust-lib/Cargo.toml @@ -29,7 +29,8 @@ members = [ "build-tool/flowy-codegen", "build-tool/flowy-derive", "flowy-search-pub", - "flowy-chat", "flowy-chat-pub", + "flowy-chat", + "flowy-chat-pub", ] resolver = "2" @@ -93,7 +94,7 @@ yrs = "0.18.8" # Run the script.add_workspace_members: # scripts/tool/update_client_api_rev.sh new_rev_id # ⚠️⚠️⚠️️ -client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "3f55cea9ca386875a1668ef30600c83cd6a1ffe2" } +client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "d0467e7e2e8ee4b925556b5510fb6ed7322dde8c" } [profile.dev] opt-level = 1 diff --git a/frontend/rust-lib/event-integration-test/src/chat_event.rs b/frontend/rust-lib/event-integration-test/src/chat_event.rs index ca2c4911f3..60e8ba65b1 100644 --- a/frontend/rust-lib/event-integration-test/src/chat_event.rs +++ b/frontend/rust-lib/event-integration-test/src/chat_event.rs @@ -43,7 +43,7 @@ impl EventIntegrationTest { }; EventBuilder::new(self.clone()) - .event(ChatEvent::SendMessage) + .event(ChatEvent::StreamMessage) .payload(payload) .async_send() .await; diff --git a/frontend/rust-lib/flowy-chat-pub/src/cloud.rs b/frontend/rust-lib/flowy-chat-pub/src/cloud.rs index 51de79422a..f8b81c250a 100644 --- a/frontend/rust-lib/flowy-chat-pub/src/cloud.rs +++ b/frontend/rust-lib/flowy-chat-pub/src/cloud.rs @@ -1,4 +1,4 @@ -pub use client_api::entity::ai_dto::{RelatedQuestion, RepeatedRelatedQuestion}; +pub use client_api::entity::ai_dto::{RelatedQuestion, RepeatedRelatedQuestion, StringOrMessage}; pub use client_api::entity::{ ChatAuthorType, ChatMessage, ChatMessageType, MessageCursor, QAChatMessage, RepeatedChatMessage, }; @@ -9,6 +9,7 @@ use lib_infra::async_trait::async_trait; use lib_infra::future::FutureResult; pub type ChatMessageStream = BoxStream<'static, Result>; +pub type StreamAnswer = BoxStream<'static, Result>; #[async_trait] pub trait ChatCloudService: Send + Sync + 'static { fn create_chat( @@ -26,6 +27,29 @@ pub trait ChatCloudService: Send + Sync + 'static { message_type: ChatMessageType, ) -> Result; + fn send_question( + &self, + workspace_id: &str, + chat_id: &str, + message: &str, + message_type: ChatMessageType, + ) -> FutureResult; + + fn save_answer( + &self, + workspace_id: &str, + chat_id: &str, + message: &str, + question_id: i64, + ) -> FutureResult; + + async fn stream_answer( + &self, + workspace_id: &str, + chat_id: &str, + message_id: i64, + ) -> Result; + fn get_chat_messages( &self, workspace_id: &str, diff --git a/frontend/rust-lib/flowy-chat/Cargo.toml b/frontend/rust-lib/flowy-chat/Cargo.toml index e1a7175442..0698a593d1 100644 --- a/frontend/rust-lib/flowy-chat/Cargo.toml +++ b/frontend/rust-lib/flowy-chat/Cargo.toml @@ -19,12 +19,14 @@ strum_macros = "0.21" protobuf.workspace = true bytes.workspace = true validator = { version = "0.16.0", features = ["derive"] } -lib-infra = { workspace = true } +lib-infra = { workspace = true, features = ["isolate_flutter"] } flowy-chat-pub.workspace = true dashmap = "5.5" flowy-sqlite = { workspace = true } tokio.workspace = true futures.workspace = true +allo-isolate = { version = "^0.1", features = ["catch-unwind"] } +log = "0.4.21" [build-dependencies] flowy-codegen.workspace = true diff --git a/frontend/rust-lib/flowy-chat/src/chat.rs b/frontend/rust-lib/flowy-chat/src/chat.rs index 70e36df27d..0f81c92c28 100644 --- a/frontend/rust-lib/flowy-chat/src/chat.rs +++ b/frontend/rust-lib/flowy-chat/src/chat.rs @@ -3,18 +3,18 @@ use crate::entities::{ }; use crate::manager::ChatUserService; use crate::notification::{send_notification, ChatNotification}; -use crate::persistence::{ - insert_answer_message, insert_chat_messages, select_chat_messages, ChatMessageTable, -}; +use crate::persistence::{insert_chat_messages, select_chat_messages, ChatMessageTable}; +use allo_isolate::Isolate; use flowy_chat_pub::cloud::{ - ChatAuthorType, ChatCloudService, ChatMessage, ChatMessageType, MessageCursor, + ChatCloudService, ChatMessage, ChatMessageType, MessageCursor, StringOrMessage, }; use flowy_error::{FlowyError, FlowyResult}; use flowy_sqlite::DBConnection; -use futures::StreamExt; -use std::sync::atomic::AtomicI64; +use futures::{SinkExt, StreamExt}; +use lib_infra::isolate_stream::IsolateSink; +use std::sync::atomic::{AtomicBool, AtomicI64}; use std::sync::Arc; -use tokio::sync::RwLock; +use tokio::sync::{Mutex, RwLock}; use tracing::{error, instrument, trace}; enum PrevMessageState { @@ -30,6 +30,8 @@ pub struct Chat { cloud_service: Arc, prev_message_state: Arc>, latest_message_id: Arc, + stop_stream: Arc, + steam_buffer: Arc>, } impl Chat { @@ -46,6 +48,8 @@ impl Chat { user_service, prev_message_state: Arc::new(RwLock::new(PrevMessageState::HasMore)), latest_message_id: Default::default(), + stop_stream: Arc::new(AtomicBool::new(false)), + steam_buffer: Arc::new(Mutex::new("".to_string())), } } @@ -63,27 +67,133 @@ impl Chat { } } + pub async fn stop_stream_message(&self) { + self + .stop_stream + .store(true, std::sync::atomic::Ordering::SeqCst); + } + #[instrument(level = "info", skip_all, err)] - pub async fn send_chat_message( + pub async fn stream_chat_message( &self, message: &str, message_type: ChatMessageType, - ) -> Result<(), FlowyError> { + text_stream_port: i64, + ) -> Result { if message.len() > 2000 { return Err(FlowyError::text_too_long().with_context("Exceeds maximum message 2000 length")); } + // clear + self + .stop_stream + .store(false, std::sync::atomic::Ordering::SeqCst); + self.steam_buffer.lock().await.clear(); + let stream_buffer = self.steam_buffer.clone(); let uid = self.user_service.user_id()?; let workspace_id = self.user_service.workspace_id()?; - stream_send_chat_messages( - uid, - workspace_id, - self.chat_id.clone(), - message.to_string(), - message_type, - self.cloud_service.clone(), - self.user_service.clone(), - ); + + let question = self + .cloud_service + .send_question(&workspace_id, &self.chat_id, message, message_type) + .await + .map_err(|err| { + error!("Failed to send question: {}", err); + FlowyError::server_error() + })?; + + save_chat_message( + self.user_service.sqlite_connection(uid)?, + &self.chat_id, + vec![question.clone()], + )?; + + let stop_stream = self.stop_stream.clone(); + let chat_id = self.chat_id.clone(); + let question_id = question.message_id; + let cloud_service = self.cloud_service.clone(); + let user_service = self.user_service.clone(); + tokio::spawn(async move { + let mut text_sink = IsolateSink::new(Isolate::new(text_stream_port)); + match cloud_service + .stream_answer(&workspace_id, &chat_id, question_id) + .await + { + Ok(mut stream) => { + while let Some(message) = stream.next().await { + match message { + Ok(message) => match message { + StringOrMessage::Left(s) => { + if stop_stream.load(std::sync::atomic::Ordering::Relaxed) { + send_notification(&chat_id, ChatNotification::FinishStreaming).send(); + trace!("[Chat] stop streaming message"); + let answer = cloud_service + .save_answer( + &workspace_id, + &chat_id, + &stream_buffer.lock().await, + question_id, + ) + .await?; + Self::save_answer(uid, &chat_id, &user_service, answer)?; + break; + } + stream_buffer.lock().await.push_str(&s); + let _ = text_sink.send(format!("data:{}", s)).await; + }, + StringOrMessage::Right(answer) => { + trace!("[Chat] received final answer: {:?}", answer); + send_notification(&chat_id, ChatNotification::FinishStreaming).send(); + Self::save_answer(uid, &chat_id, &user_service, answer)?; + }, + }, + Err(err) => { + error!("[Chat] failed to stream answer: {}", err); + let _ = text_sink.send(format!("error:{}", err)).await; + let pb = ChatMessageErrorPB { + chat_id: chat_id.clone(), + error_message: err.to_string(), + }; + send_notification(&chat_id, ChatNotification::StreamChatMessageError) + .payload(pb) + .send(); + break; + }, + } + } + }, + Err(err) => { + let pb = ChatMessageErrorPB { + chat_id: chat_id.clone(), + error_message: err.to_string(), + }; + send_notification(&chat_id, ChatNotification::StreamChatMessageError) + .payload(pb) + .send(); + }, + } + Ok::<(), FlowyError>(()) + }); + + let question_pb = ChatMessagePB::from(question); + Ok(question_pb) + } + + fn save_answer( + uid: i64, + chat_id: &str, + user_service: &Arc, + answer: ChatMessage, + ) -> Result<(), FlowyError> { + save_chat_message( + user_service.sqlite_connection(uid)?, + chat_id, + vec![answer.clone()], + )?; + let pb = ChatMessagePB::from(answer); + send_notification(chat_id, ChatNotification::DidReceiveChatMessage) + .payload(pb) + .send(); Ok(()) } @@ -106,7 +216,7 @@ impl Chat { before_message_id: Option, ) -> Result { trace!( - "Loading old messages: chat_id={}, limit={}, before_message_id={:?}", + "[Chat] Loading messages from disk: chat_id={}, limit={}, before_message_id={:?}", self.chat_id, limit, before_message_id @@ -116,13 +226,16 @@ impl Chat { .await?; // If the number of messages equals the limit, then no need to load more messages from remote - let has_more = !messages.is_empty(); if messages.len() == limit as usize { - return Ok(ChatMessageListPB { + let pb = ChatMessageListPB { messages, - has_more, + has_more: true, total: 0, - }); + }; + send_notification(&self.chat_id, ChatNotification::DidLoadPrevChatMessage) + .payload(pb.clone()) + .send(); + return Ok(pb); } if matches!( @@ -140,7 +253,7 @@ impl Chat { Ok(ChatMessageListPB { messages, - has_more, + has_more: true, total: 0, }) } @@ -151,7 +264,7 @@ impl Chat { after_message_id: Option, ) -> Result { trace!( - "Loading new messages: chat_id={}, limit={}, after_message_id={:?}", + "[Chat] Loading new messages: chat_id={}, limit={}, after_message_id={:?}", self.chat_id, limit, after_message_id, @@ -161,7 +274,7 @@ impl Chat { .await?; trace!( - "Loaded local chat messages: chat_id={}, messages={}", + "[Chat] Loaded local chat messages: chat_id={}, messages={}", self.chat_id, messages.len() ); @@ -185,7 +298,7 @@ impl Chat { after_message_id: Option, ) -> FlowyResult<()> { trace!( - "Loading chat messages from remote: chat_id={}, limit={}, before_message_id={:?}, after_message_id={:?}", + "[Chat] start loading messages from remote: chat_id={}, limit={}, before_message_id={:?}, after_message_id={:?}", self.chat_id, limit, before_message_id, @@ -228,9 +341,11 @@ impl Chat { let pb = ChatMessageListPB::from(resp); trace!( - "Loaded chat messages from remote: chat_id={}, messages={}", + "[Chat] Loaded messages from remote: chat_id={}, messages={}, hasMore: {}, cursor:{:?}", chat_id, - pb.messages.len() + pb.messages.len(), + pb.has_more, + cursor, ); if matches!(cursor, MessageCursor::BeforeMessageId(_)) { if pb.has_more { @@ -265,7 +380,7 @@ impl Chat { .await?; trace!( - "Related messages: chat_id={}, message_id={}, messages:{:?}", + "[Chat] related messages: chat_id={}, message_id={}, messages:{:?}", self.chat_id, message_id, resp.items @@ -275,20 +390,19 @@ impl Chat { #[instrument(level = "debug", skip_all, err)] pub async fn generate_answer(&self, question_message_id: i64) -> FlowyResult { + trace!( + "[Chat] generate answer: chat_id={}, question_message_id={}", + self.chat_id, + question_message_id + ); let workspace_id = self.user_service.workspace_id()?; - let resp = self + let answer = self .cloud_service .generate_answer(&workspace_id, &self.chat_id, question_message_id) .await?; - save_answer( - self.user_service.sqlite_connection(self.uid)?, - &self.chat_id, - resp.clone(), - question_message_id, - )?; - - let pb = ChatMessagePB::from(resp); + Self::save_answer(self.uid, &self.chat_id, &self.user_service, answer.clone())?; + let pb = ChatMessagePB::from(answer); Ok(pb) } @@ -314,7 +428,6 @@ impl Chat { created_at: record.created_at, author_type: record.author_type, author_id: record.author_id, - has_following: false, reply_message_id: record.reply_message_id, }) .collect::>(); @@ -323,114 +436,6 @@ impl Chat { } } -fn stream_send_chat_messages( - uid: i64, - workspace_id: String, - chat_id: String, - message_content: String, - message_type: ChatMessageType, - cloud_service: Arc, - user_service: Arc, -) { - tokio::spawn(async move { - trace!( - "Sending chat message: chat_id={}, message={}, type={:?}", - chat_id, - message_content, - message_type - ); - - let mut messages = Vec::with_capacity(2); - let stream_result = cloud_service - .send_chat_message(&workspace_id, &chat_id, &message_content, message_type) - .await; - - // By default, stream only returns two messages: - // 1. user message - // 2. ai response message - match stream_result { - Ok(mut stream) => { - while let Some(result) = stream.next().await { - match result { - Ok(message) => { - let mut pb = ChatMessagePB::from(message.clone()); - if matches!(message.author.author_type, ChatAuthorType::Human) { - pb.has_following = true; - send_notification(&chat_id, ChatNotification::LastUserSentMessage) - .payload(pb.clone()) - .send(); - } - - // - send_notification(&chat_id, ChatNotification::DidReceiveChatMessage) - .payload(pb) - .send(); - messages.push(message); - }, - Err(err) => { - error!("stream chat message error: {}", err); - let pb = ChatMessageErrorPB { - chat_id: chat_id.clone(), - content: message_content.clone(), - error_message: "Service Temporarily Unavailable".to_string(), - }; - send_notification(&chat_id, ChatNotification::StreamChatMessageError) - .payload(pb) - .send(); - break; - }, - } - } - }, - Err(err) => { - error!("Failed to send chat message: {}", err); - let pb = ChatMessageErrorPB { - chat_id: chat_id.clone(), - content: message_content.clone(), - error_message: err.to_string(), - }; - send_notification(&chat_id, ChatNotification::StreamChatMessageError) - .payload(pb) - .send(); - return; - }, - } - - if messages.is_empty() { - return; - } - - trace!( - "Saving chat messages to local disk: chat_id={}, messages:{:?}", - chat_id, - messages - ); - - // Insert chat messages to local disk - if let Err(err) = user_service.sqlite_connection(uid).and_then(|conn| { - let records = messages - .into_iter() - .map(|message| ChatMessageTable { - message_id: message.message_id, - chat_id: chat_id.clone(), - content: message.content, - created_at: message.created_at.timestamp(), - author_type: message.author.author_type as i64, - author_id: message.author.author_id.to_string(), - reply_message_id: message.reply_message_id, - }) - .collect::>(); - insert_chat_messages(conn, &records)?; - - // Mark chat as finished - send_notification(&chat_id, ChatNotification::FinishAnswerQuestion).send(); - Ok(()) - }) { - error!("Failed to save chat messages: {}", err); - } - }); -} - fn save_chat_message( conn: DBConnection, chat_id: &str, @@ -451,21 +456,3 @@ fn save_chat_message( insert_chat_messages(conn, &records)?; Ok(()) } -fn save_answer( - conn: DBConnection, - chat_id: &str, - message: ChatMessage, - question_message_id: i64, -) -> FlowyResult<()> { - let record = ChatMessageTable { - message_id: message.message_id, - chat_id: chat_id.to_string(), - content: message.content, - created_at: message.created_at.timestamp(), - author_type: message.author.author_type as i64, - author_id: message.author.author_id.to_string(), - reply_message_id: message.reply_message_id, - }; - insert_answer_message(conn, question_message_id, record)?; - Ok(()) -} diff --git a/frontend/rust-lib/flowy-chat/src/entities.rs b/frontend/rust-lib/flowy-chat/src/entities.rs index e01c2aaad1..4ef687c3c4 100644 --- a/frontend/rust-lib/flowy-chat/src/entities.rs +++ b/frontend/rust-lib/flowy-chat/src/entities.rs @@ -19,6 +19,30 @@ pub struct SendChatPayloadPB { pub message_type: ChatMessageTypePB, } +#[derive(Default, ProtoBuf, Validate, Clone, Debug)] +pub struct StreamChatPayloadPB { + #[pb(index = 1)] + #[validate(custom = "required_not_empty_str")] + pub chat_id: String, + + #[pb(index = 2)] + #[validate(custom = "required_not_empty_str")] + pub message: String, + + #[pb(index = 3)] + pub message_type: ChatMessageTypePB, + + #[pb(index = 4)] + pub text_stream_port: i64, +} + +#[derive(Default, ProtoBuf, Validate, Clone, Debug)] +pub struct StopStreamPB { + #[pb(index = 1)] + #[validate(custom = "required_not_empty_str")] + pub chat_id: String, +} + #[derive(Debug, Default, Clone, ProtoBuf_Enum, PartialEq, Eq, Copy)] pub enum ChatMessageTypePB { #[default] @@ -97,10 +121,7 @@ pub struct ChatMessagePB { #[pb(index = 5)] pub author_id: String, - #[pb(index = 6)] - pub has_following: bool, - - #[pb(index = 7, one_of)] + #[pb(index = 6, one_of)] pub reply_message_id: Option, } @@ -110,9 +131,6 @@ pub struct ChatMessageErrorPB { pub chat_id: String, #[pb(index = 2)] - pub content: String, - - #[pb(index = 3)] pub error_message: String, } @@ -124,7 +142,6 @@ impl From for ChatMessagePB { created_at: chat_message.created_at.timestamp(), author_type: chat_message.author.author_type as i64, author_id: chat_message.author.author_id.to_string(), - has_following: false, reply_message_id: None, } } diff --git a/frontend/rust-lib/flowy-chat/src/event_handler.rs b/frontend/rust-lib/flowy-chat/src/event_handler.rs index 959c0398b4..1d4499c6b2 100644 --- a/frontend/rust-lib/flowy-chat/src/event_handler.rs +++ b/frontend/rust-lib/flowy-chat/src/event_handler.rs @@ -18,10 +18,10 @@ fn upgrade_chat_manager( } #[tracing::instrument(level = "debug", skip_all, err)] -pub(crate) async fn send_chat_message_handler( - data: AFPluginData, +pub(crate) async fn stream_chat_message_handler( + data: AFPluginData, chat_manager: AFPluginState>, -) -> Result<(), FlowyError> { +) -> DataResult { let chat_manager = upgrade_chat_manager(chat_manager)?; let data = data.into_inner(); data.validate()?; @@ -30,10 +30,16 @@ pub(crate) async fn send_chat_message_handler( ChatMessageTypePB::System => ChatMessageType::System, ChatMessageTypePB::User => ChatMessageType::User, }; - chat_manager - .send_chat_message(&data.chat_id, &data.message, message_type) + + let question = chat_manager + .stream_chat_message( + &data.chat_id, + &data.message, + message_type, + data.text_stream_port, + ) .await?; - Ok(()) + data_result_ok(question) } #[tracing::instrument(level = "debug", skip_all, err)] @@ -91,3 +97,16 @@ pub(crate) async fn get_answer_handler( .await?; data_result_ok(message) } + +#[tracing::instrument(level = "debug", skip_all, err)] +pub(crate) async fn stop_stream_handler( + data: AFPluginData, + chat_manager: AFPluginState>, +) -> Result<(), FlowyError> { + let data = data.into_inner(); + data.validate()?; + + let chat_manager = upgrade_chat_manager(chat_manager)?; + chat_manager.stop_stream(&data.chat_id).await?; + Ok(()) +} diff --git a/frontend/rust-lib/flowy-chat/src/event_map.rs b/frontend/rust-lib/flowy-chat/src/event_map.rs index 9fae853459..e3b7828936 100644 --- a/frontend/rust-lib/flowy-chat/src/event_map.rs +++ b/frontend/rust-lib/flowy-chat/src/event_map.rs @@ -12,11 +12,12 @@ pub fn init(chat_manager: Weak) -> AFPlugin { AFPlugin::new() .name("Flowy-Chat") .state(chat_manager) - .event(ChatEvent::SendMessage, send_chat_message_handler) + .event(ChatEvent::StreamMessage, stream_chat_message_handler) .event(ChatEvent::LoadPrevMessage, load_prev_message_handler) .event(ChatEvent::LoadNextMessage, load_next_message_handler) .event(ChatEvent::GetRelatedQuestion, get_related_question_handler) .event(ChatEvent::GetAnswerForQuestion, get_answer_handler) + .event(ChatEvent::StopStream, stop_stream_handler) } #[derive(Clone, Copy, PartialEq, Eq, Debug, Display, Hash, ProtoBuf_Enum, Flowy_Event)] @@ -29,12 +30,15 @@ pub enum ChatEvent { #[event(input = "LoadNextChatMessagePB", output = "ChatMessageListPB")] LoadNextMessage = 1, - #[event(input = "SendChatPayloadPB")] - SendMessage = 2, + #[event(input = "StreamChatPayloadPB", output = "ChatMessagePB")] + StreamMessage = 2, + + #[event(input = "StopStreamPB")] + StopStream = 3, #[event(input = "ChatMessageIdPB", output = "RepeatedRelatedQuestionPB")] - GetRelatedQuestion = 3, + GetRelatedQuestion = 4, #[event(input = "ChatMessageIdPB", output = "ChatMessagePB")] - GetAnswerForQuestion = 4, + GetAnswerForQuestion = 5, } diff --git a/frontend/rust-lib/flowy-chat/src/manager.rs b/frontend/rust-lib/flowy-chat/src/manager.rs index b2cc679eb2..b72cdfb87d 100644 --- a/frontend/rust-lib/flowy-chat/src/manager.rs +++ b/frontend/rust-lib/flowy-chat/src/manager.rs @@ -7,7 +7,7 @@ use flowy_error::{FlowyError, FlowyResult}; use flowy_sqlite::DBConnection; use lib_infra::util::timestamp; use std::sync::Arc; -use tracing::{instrument, trace}; +use tracing::trace; pub trait ChatUserService: Send + Sync + 'static { fn user_id(&self) -> Result; @@ -79,16 +79,18 @@ impl ChatManager { Ok(chat) } - #[instrument(level = "info", skip_all, err)] - pub async fn send_chat_message( + pub async fn stream_chat_message( &self, chat_id: &str, message: &str, message_type: ChatMessageType, - ) -> Result<(), FlowyError> { + text_stream_port: i64, + ) -> Result { let chat = self.get_or_create_chat_instance(chat_id).await?; - chat.send_chat_message(message, message_type).await?; - Ok(()) + let question = chat + .stream_chat_message(message, message_type, text_stream_port) + .await?; + Ok(question) } pub async fn get_or_create_chat_instance(&self, chat_id: &str) -> Result, FlowyError> { @@ -168,6 +170,12 @@ impl ChatManager { let resp = chat.generate_answer(question_message_id).await?; Ok(resp) } + + pub async fn stop_stream(&self, chat_id: &str) -> Result<(), FlowyError> { + let chat = self.get_or_create_chat_instance(chat_id).await?; + chat.stop_stream_message().await; + Ok(()) + } } fn save_chat(conn: DBConnection, chat_id: &str) -> FlowyResult<()> { diff --git a/frontend/rust-lib/flowy-chat/src/notification.rs b/frontend/rust-lib/flowy-chat/src/notification.rs index 830c464a72..12f0470784 100644 --- a/frontend/rust-lib/flowy-chat/src/notification.rs +++ b/frontend/rust-lib/flowy-chat/src/notification.rs @@ -11,8 +11,7 @@ pub enum ChatNotification { DidLoadPrevChatMessage = 2, DidReceiveChatMessage = 3, StreamChatMessageError = 4, - FinishAnswerQuestion = 5, - LastUserSentMessage = 6, + FinishStreaming = 5, } impl std::convert::From for i32 { @@ -27,8 +26,7 @@ impl std::convert::From for ChatNotification { 2 => ChatNotification::DidLoadPrevChatMessage, 3 => ChatNotification::DidReceiveChatMessage, 4 => ChatNotification::StreamChatMessageError, - 5 => ChatNotification::FinishAnswerQuestion, - 6 => ChatNotification::LastUserSentMessage, + 5 => ChatNotification::FinishStreaming, _ => ChatNotification::Unknown, } } diff --git a/frontend/rust-lib/flowy-chat/src/persistence/chat_message_sql.rs b/frontend/rust-lib/flowy-chat/src/persistence/chat_message_sql.rs index 3e65123c27..6d9202def0 100644 --- a/frontend/rust-lib/flowy-chat/src/persistence/chat_message_sql.rs +++ b/frontend/rust-lib/flowy-chat/src/persistence/chat_message_sql.rs @@ -45,41 +45,6 @@ pub fn insert_chat_messages( Ok(()) } -pub fn insert_answer_message( - mut conn: DBConnection, - question_message_id: i64, - message: ChatMessageTable, -) -> FlowyResult<()> { - conn.immediate_transaction(|conn| { - // Step 1: Get the message with the given question_message_id - let question_message = dsl::chat_message_table - .filter(chat_message_table::message_id.eq(question_message_id)) - .first::(conn)?; - - // Step 2: Use reply_message_id from the retrieved message to delete the existing message - if let Some(reply_id) = question_message.reply_message_id { - diesel::delete(dsl::chat_message_table.filter(chat_message_table::message_id.eq(reply_id))) - .execute(conn)?; - } - - // Step 3: Insert the new message - let _ = insert_into(chat_message_table::table) - .values(message) - .on_conflict(chat_message_table::message_id) - .do_update() - .set(( - chat_message_table::content.eq(excluded(chat_message_table::content)), - chat_message_table::created_at.eq(excluded(chat_message_table::created_at)), - chat_message_table::author_type.eq(excluded(chat_message_table::author_type)), - chat_message_table::author_id.eq(excluded(chat_message_table::author_id)), - chat_message_table::reply_message_id.eq(excluded(chat_message_table::reply_message_id)), - )) - .execute(conn)?; - Ok::<(), FlowyError>(()) - })?; - - Ok(()) -} pub fn select_chat_messages( mut conn: DBConnection, chat_id_val: &str, diff --git a/frontend/rust-lib/flowy-core/src/deps_resolve/folder_deps.rs b/frontend/rust-lib/flowy-core/src/deps_resolve/folder_deps.rs index 5c383d204a..e3a86d4fc7 100644 --- a/frontend/rust-lib/flowy-core/src/deps_resolve/folder_deps.rs +++ b/frontend/rust-lib/flowy-core/src/deps_resolve/folder_deps.rs @@ -30,6 +30,7 @@ use tokio::sync::RwLock; use crate::integrate::server::ServerProvider; pub struct FolderDepsResolver(); +#[allow(clippy::too_many_arguments)] impl FolderDepsResolver { pub async fn resolve( authenticate_user: Weak, diff --git a/frontend/rust-lib/flowy-core/src/integrate/trait_impls.rs b/frontend/rust-lib/flowy-core/src/integrate/trait_impls.rs index 2e672dec6c..98f08761e6 100644 --- a/frontend/rust-lib/flowy-core/src/integrate/trait_impls.rs +++ b/frontend/rust-lib/flowy-core/src/integrate/trait_impls.rs @@ -18,6 +18,7 @@ use collab_integrate::collab_builder::{ }; use flowy_chat_pub::cloud::{ ChatCloudService, ChatMessage, ChatMessageStream, MessageCursor, RepeatedChatMessage, + StreamAnswer, }; use flowy_database_pub::cloud::{ CollabDocStateByOid, DatabaseCloudService, DatabaseSnapshot, SummaryRowContent, @@ -476,6 +477,60 @@ impl ChatCloudService for ServerProvider { .await } + fn send_question( + &self, + workspace_id: &str, + chat_id: &str, + message: &str, + message_type: ChatMessageType, + ) -> FutureResult { + let workspace_id = workspace_id.to_string(); + let chat_id = chat_id.to_string(); + let message = message.to_string(); + let server = self.get_server(); + + FutureResult::new(async move { + server? + .chat_service() + .send_question(&workspace_id, &chat_id, &message, message_type) + .await + }) + } + + fn save_answer( + &self, + workspace_id: &str, + chat_id: &str, + message: &str, + question_id: i64, + ) -> FutureResult { + let workspace_id = workspace_id.to_string(); + let chat_id = chat_id.to_string(); + let message = message.to_string(); + let server = self.get_server(); + FutureResult::new(async move { + server? + .chat_service() + .save_answer(&workspace_id, &chat_id, &message, question_id) + .await + }) + } + + async fn stream_answer( + &self, + workspace_id: &str, + chat_id: &str, + message_id: i64, + ) -> Result { + let workspace_id = workspace_id.to_string(); + let chat_id = chat_id.to_string(); + let server = self.get_server()?; + server + .chat_service() + .stream_answer(&workspace_id, &chat_id, message_id) + .await + } + fn get_chat_messages( &self, workspace_id: &str, diff --git a/frontend/rust-lib/flowy-server/src/af_cloud/impls/chat.rs b/frontend/rust-lib/flowy-server/src/af_cloud/impls/chat.rs index cc484a9346..09469f5b35 100644 --- a/frontend/rust-lib/flowy-server/src/af_cloud/impls/chat.rs +++ b/frontend/rust-lib/flowy-server/src/af_cloud/impls/chat.rs @@ -1,9 +1,12 @@ use crate::af_cloud::AFServer; use client_api::entity::ai_dto::RepeatedRelatedQuestion; use client_api::entity::{ - CreateChatMessageParams, CreateChatParams, MessageCursor, RepeatedChatMessage, + CreateAnswerMessageParams, CreateChatMessageParams, CreateChatParams, MessageCursor, + RepeatedChatMessage, +}; +use flowy_chat_pub::cloud::{ + ChatCloudService, ChatMessage, ChatMessageStream, ChatMessageType, StreamAnswer, }; -use flowy_chat_pub::cloud::{ChatCloudService, ChatMessage, ChatMessageStream, ChatMessageType}; use flowy_error::FlowyError; use futures_util::StreamExt; use lib_infra::async_trait::async_trait; @@ -50,22 +53,81 @@ where message: &str, message_type: ChatMessageType, ) -> Result { - let workspace_id = workspace_id.to_string(); - let chat_id = chat_id.to_string(); - let message = message.to_string(); let try_get_client = self.inner.try_get_client(); let params = CreateChatMessageParams { - content: message, + content: message.to_string(), message_type, }; let stream = try_get_client? - .create_chat_message(&workspace_id, &chat_id, params) + .create_chat_qa_message(workspace_id, chat_id, params) .await .map_err(FlowyError::from)?; Ok(stream.boxed()) } + fn send_question( + &self, + workspace_id: &str, + chat_id: &str, + message: &str, + message_type: ChatMessageType, + ) -> FutureResult { + let workspace_id = workspace_id.to_string(); + let chat_id = chat_id.to_string(); + let try_get_client = self.inner.try_get_client(); + let params = CreateChatMessageParams { + content: message.to_string(), + message_type, + }; + + FutureResult::new(async move { + let message = try_get_client? + .create_question(&workspace_id, &chat_id, params) + .await + .map_err(FlowyError::from)?; + Ok(message) + }) + } + + fn save_answer( + &self, + workspace_id: &str, + chat_id: &str, + message: &str, + question_id: i64, + ) -> FutureResult { + let workspace_id = workspace_id.to_string(); + let chat_id = chat_id.to_string(); + let try_get_client = self.inner.try_get_client(); + let params = CreateAnswerMessageParams { + content: message.to_string(), + question_message_id: question_id, + }; + + FutureResult::new(async move { + let message = try_get_client? + .create_answer(&workspace_id, &chat_id, params) + .await + .map_err(FlowyError::from)?; + Ok(message) + }) + } + + async fn stream_answer( + &self, + workspace_id: &str, + chat_id: &str, + message_id: i64, + ) -> Result { + let try_get_client = self.inner.try_get_client(); + let stream = try_get_client? + .stream_answer(workspace_id, chat_id, message_id) + .await + .map_err(FlowyError::from)?; + Ok(stream.boxed()) + } + fn get_chat_messages( &self, workspace_id: &str, @@ -119,7 +181,7 @@ where FutureResult::new(async move { let resp = try_get_client? - .generate_question_answer(&workspace_id, &chat_id, question_message_id) + .get_answer(&workspace_id, &chat_id, question_message_id) .await .map_err(FlowyError::from)?; Ok(resp) diff --git a/frontend/rust-lib/flowy-server/src/default_impl.rs b/frontend/rust-lib/flowy-server/src/default_impl.rs index 654bffcb1c..90d9bf15a7 100644 --- a/frontend/rust-lib/flowy-server/src/default_impl.rs +++ b/frontend/rust-lib/flowy-server/src/default_impl.rs @@ -1,6 +1,6 @@ use client_api::entity::ai_dto::RepeatedRelatedQuestion; use client_api::entity::{ChatMessageType, MessageCursor, RepeatedChatMessage}; -use flowy_chat_pub::cloud::{ChatCloudService, ChatMessage, ChatMessageStream}; +use flowy_chat_pub::cloud::{ChatCloudService, ChatMessage, ChatMessageStream, StreamAnswer}; use flowy_error::FlowyError; use lib_infra::async_trait::async_trait; use lib_infra::future::FutureResult; @@ -30,6 +30,39 @@ impl ChatCloudService for DefaultChatCloudServiceImpl { Err(FlowyError::not_support().with_context("Chat is not supported in local server.")) } + fn send_question( + &self, + _workspace_id: &str, + _chat_id: &str, + _message: &str, + _message_type: ChatMessageType, + ) -> FutureResult { + FutureResult::new(async move { + Err(FlowyError::not_support().with_context("Chat is not supported in local server.")) + }) + } + + fn save_answer( + &self, + _workspace_id: &str, + _chat_id: &str, + _message: &str, + _question_id: i64, + ) -> FutureResult { + FutureResult::new(async move { + Err(FlowyError::not_support().with_context("Chat is not supported in local server.")) + }) + } + + async fn stream_answer( + &self, + _workspace_id: &str, + _chat_id: &str, + _message_id: i64, + ) -> Result { + Err(FlowyError::not_support().with_context("Chat is not supported in local server.")) + } + fn get_chat_messages( &self, _workspace_id: &str, diff --git a/frontend/rust-lib/lib-infra/Cargo.toml b/frontend/rust-lib/lib-infra/Cargo.toml index 82f39cbf1b..c19b7a50cc 100644 --- a/frontend/rust-lib/lib-infra/Cargo.toml +++ b/frontend/rust-lib/lib-infra/Cargo.toml @@ -8,7 +8,7 @@ edition = "2018" crate-type = ["cdylib", "rlib"] [dependencies] -chrono = { workspace = true, default-features = false, features = ["clock"] } +chrono = { workspace = true, default-features = false, features = ["clock"] } bytes = { version = "1.5" } pin-project = "1.1.3" futures-core = { version = "0.3" } @@ -21,6 +21,8 @@ tempfile = "3.8.1" validator = "0.16.0" tracing.workspace = true atomic_refcell = "0.1" +allo-isolate = { version = "^0.1", features = ["catch-unwind"], optional = true } +futures = "0.3.30" [dev-dependencies] rand = "0.8.5" @@ -28,7 +30,8 @@ futures = "0.3.30" [target.'cfg(not(target_arch = "wasm32"))'.dependencies] zip = { version = "0.6.6", features = ["deflate"] } -brotli = { version = "3.4.0", optional = true } +brotli = { version = "3.4.0", optional = true } [features] -compression = ["brotli"] \ No newline at end of file +compression = ["brotli"] +isolate_flutter = ["allo-isolate"] diff --git a/frontend/rust-lib/lib-infra/src/isolate_stream.rs b/frontend/rust-lib/lib-infra/src/isolate_stream.rs new file mode 100644 index 0000000000..19f692dda4 --- /dev/null +++ b/frontend/rust-lib/lib-infra/src/isolate_stream.rs @@ -0,0 +1,44 @@ +use allo_isolate::{IntoDart, Isolate}; +use futures::Sink; +use pin_project::pin_project; +use std::pin::Pin; +use std::task::{Context, Poll}; + +#[pin_project] +pub struct IsolateSink { + isolate: Isolate, +} + +impl IsolateSink { + pub fn new(isolate: Isolate) -> Self { + Self { isolate } + } +} + +impl Sink for IsolateSink +where + T: IntoDart, +{ + type Error = (); + + fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { + let this = self.project(); + if this.isolate.post(item) { + Ok(()) + } else { + Err(()) + } + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } +} diff --git a/frontend/rust-lib/lib-infra/src/lib.rs b/frontend/rust-lib/lib-infra/src/lib.rs index f6f1b4b1b0..18539e49aa 100644 --- a/frontend/rust-lib/lib-infra/src/lib.rs +++ b/frontend/rust-lib/lib-infra/src/lib.rs @@ -19,6 +19,8 @@ if_wasm! { } } +#[cfg(feature = "isolate_flutter")] +pub mod isolate_stream; pub mod priority_task; pub mod ref_map; pub mod util;