feat: Stream chat message (#5498)

* chore: stream message

* chore: stream message

* chore: fix streaming

* chore: fix clippy
This commit is contained in:
Nathan.fooo
2024-06-09 14:02:32 +08:00
committed by GitHub
parent 94060a0a99
commit bb3e9d5bd8
46 changed files with 1691 additions and 870 deletions

View File

@ -1,9 +1,12 @@
use crate::af_cloud::AFServer;
use client_api::entity::ai_dto::RepeatedRelatedQuestion;
use client_api::entity::{
CreateChatMessageParams, CreateChatParams, MessageCursor, RepeatedChatMessage,
CreateAnswerMessageParams, CreateChatMessageParams, CreateChatParams, MessageCursor,
RepeatedChatMessage,
};
use flowy_chat_pub::cloud::{
ChatCloudService, ChatMessage, ChatMessageStream, ChatMessageType, StreamAnswer,
};
use flowy_chat_pub::cloud::{ChatCloudService, ChatMessage, ChatMessageStream, ChatMessageType};
use flowy_error::FlowyError;
use futures_util::StreamExt;
use lib_infra::async_trait::async_trait;
@ -50,22 +53,81 @@ where
message: &str,
message_type: ChatMessageType,
) -> Result<ChatMessageStream, FlowyError> {
let workspace_id = workspace_id.to_string();
let chat_id = chat_id.to_string();
let message = message.to_string();
let try_get_client = self.inner.try_get_client();
let params = CreateChatMessageParams {
content: message,
content: message.to_string(),
message_type,
};
let stream = try_get_client?
.create_chat_message(&workspace_id, &chat_id, params)
.create_chat_qa_message(workspace_id, chat_id, params)
.await
.map_err(FlowyError::from)?;
Ok(stream.boxed())
}
fn send_question(
&self,
workspace_id: &str,
chat_id: &str,
message: &str,
message_type: ChatMessageType,
) -> FutureResult<ChatMessage, FlowyError> {
let workspace_id = workspace_id.to_string();
let chat_id = chat_id.to_string();
let try_get_client = self.inner.try_get_client();
let params = CreateChatMessageParams {
content: message.to_string(),
message_type,
};
FutureResult::new(async move {
let message = try_get_client?
.create_question(&workspace_id, &chat_id, params)
.await
.map_err(FlowyError::from)?;
Ok(message)
})
}
fn save_answer(
&self,
workspace_id: &str,
chat_id: &str,
message: &str,
question_id: i64,
) -> FutureResult<ChatMessage, FlowyError> {
let workspace_id = workspace_id.to_string();
let chat_id = chat_id.to_string();
let try_get_client = self.inner.try_get_client();
let params = CreateAnswerMessageParams {
content: message.to_string(),
question_message_id: question_id,
};
FutureResult::new(async move {
let message = try_get_client?
.create_answer(&workspace_id, &chat_id, params)
.await
.map_err(FlowyError::from)?;
Ok(message)
})
}
async fn stream_answer(
&self,
workspace_id: &str,
chat_id: &str,
message_id: i64,
) -> Result<StreamAnswer, FlowyError> {
let try_get_client = self.inner.try_get_client();
let stream = try_get_client?
.stream_answer(workspace_id, chat_id, message_id)
.await
.map_err(FlowyError::from)?;
Ok(stream.boxed())
}
fn get_chat_messages(
&self,
workspace_id: &str,
@ -119,7 +181,7 @@ where
FutureResult::new(async move {
let resp = try_get_client?
.generate_question_answer(&workspace_id, &chat_id, question_message_id)
.get_answer(&workspace_id, &chat_id, question_message_id)
.await
.map_err(FlowyError::from)?;
Ok(resp)

View File

@ -1,6 +1,6 @@
use client_api::entity::ai_dto::RepeatedRelatedQuestion;
use client_api::entity::{ChatMessageType, MessageCursor, RepeatedChatMessage};
use flowy_chat_pub::cloud::{ChatCloudService, ChatMessage, ChatMessageStream};
use flowy_chat_pub::cloud::{ChatCloudService, ChatMessage, ChatMessageStream, StreamAnswer};
use flowy_error::FlowyError;
use lib_infra::async_trait::async_trait;
use lib_infra::future::FutureResult;
@ -30,6 +30,39 @@ impl ChatCloudService for DefaultChatCloudServiceImpl {
Err(FlowyError::not_support().with_context("Chat is not supported in local server."))
}
fn send_question(
&self,
_workspace_id: &str,
_chat_id: &str,
_message: &str,
_message_type: ChatMessageType,
) -> FutureResult<ChatMessage, FlowyError> {
FutureResult::new(async move {
Err(FlowyError::not_support().with_context("Chat is not supported in local server."))
})
}
fn save_answer(
&self,
_workspace_id: &str,
_chat_id: &str,
_message: &str,
_question_id: i64,
) -> FutureResult<ChatMessage, FlowyError> {
FutureResult::new(async move {
Err(FlowyError::not_support().with_context("Chat is not supported in local server."))
})
}
async fn stream_answer(
&self,
_workspace_id: &str,
_chat_id: &str,
_message_id: i64,
) -> Result<StreamAnswer, FlowyError> {
Err(FlowyError::not_support().with_context("Chat is not supported in local server."))
}
fn get_chat_messages(
&self,
_workspace_id: &str,