use flowy_chat_pub::cloud::ChatMessageType; use std::sync::{Arc, Weak}; use validator::Validate; use crate::tools::AITools; use flowy_error::{FlowyError, FlowyResult}; use lib_dispatch::prelude::{data_result_ok, AFPluginData, AFPluginState, DataResult}; use crate::chat_manager::ChatManager; use crate::entities::*; fn upgrade_chat_manager( chat_manager: AFPluginState>, ) -> FlowyResult> { let chat_manager = chat_manager .upgrade() .ok_or(FlowyError::internal().with_context("The chat manager is already dropped"))?; Ok(chat_manager) } #[tracing::instrument(level = "debug", skip_all, err)] pub(crate) async fn stream_chat_message_handler( data: AFPluginData, chat_manager: AFPluginState>, ) -> DataResult { let chat_manager = upgrade_chat_manager(chat_manager)?; let data = data.into_inner(); data.validate()?; let message_type = match data.message_type { ChatMessageTypePB::System => ChatMessageType::System, ChatMessageTypePB::User => ChatMessageType::User, }; let question = chat_manager .stream_chat_message( &data.chat_id, &data.message, message_type, data.text_stream_port, ) .await?; data_result_ok(question) } #[tracing::instrument(level = "debug", skip_all, err)] pub(crate) async fn load_prev_message_handler( data: AFPluginData, chat_manager: AFPluginState>, ) -> DataResult { let chat_manager = upgrade_chat_manager(chat_manager)?; let data = data.into_inner(); data.validate()?; let messages = chat_manager .load_prev_chat_messages(&data.chat_id, data.limit, data.before_message_id) .await?; data_result_ok(messages) } #[tracing::instrument(level = "debug", skip_all, err)] pub(crate) async fn load_next_message_handler( data: AFPluginData, chat_manager: AFPluginState>, ) -> DataResult { let chat_manager = upgrade_chat_manager(chat_manager)?; let data = data.into_inner(); data.validate()?; let messages = chat_manager .load_latest_chat_messages(&data.chat_id, data.limit, data.after_message_id) .await?; data_result_ok(messages) } #[tracing::instrument(level = "debug", skip_all, err)] pub(crate) async fn get_related_question_handler( data: AFPluginData, chat_manager: AFPluginState>, ) -> DataResult { let chat_manager = upgrade_chat_manager(chat_manager)?; let data = data.into_inner(); let messages = chat_manager .get_related_questions(&data.chat_id, data.message_id) .await?; data_result_ok(messages) } #[tracing::instrument(level = "debug", skip_all, err)] pub(crate) async fn get_answer_handler( data: AFPluginData, chat_manager: AFPluginState>, ) -> DataResult { let chat_manager = upgrade_chat_manager(chat_manager)?; let data = data.into_inner(); let message = chat_manager .generate_answer(&data.chat_id, data.message_id) .await?; data_result_ok(message) } #[tracing::instrument(level = "debug", skip_all, err)] pub(crate) async fn stop_stream_handler( data: AFPluginData, chat_manager: AFPluginState>, ) -> Result<(), FlowyError> { let data = data.into_inner(); data.validate()?; let chat_manager = upgrade_chat_manager(chat_manager)?; chat_manager.stop_stream(&data.chat_id).await?; Ok(()) } #[tracing::instrument(level = "debug", skip_all, err)] pub(crate) async fn start_complete_text_handler( data: AFPluginData, tools: AFPluginState>, ) -> DataResult { let task = tools.create_complete_task(data.into_inner()).await?; data_result_ok(task) } #[tracing::instrument(level = "debug", skip_all, err)] pub(crate) async fn stop_complete_text_handler( data: AFPluginData, tools: AFPluginState>, ) -> Result<(), FlowyError> { let data = data.into_inner(); tools.cancel_complete_task(&data.task_id).await; Ok(()) }