diff --git a/frontend/rust-lib/flowy-sidecar/src/core/mod.rs b/frontend/rust-lib/flowy-sidecar/src/core/mod.rs new file mode 100644 index 0000000000..cc28539a86 --- /dev/null +++ b/frontend/rust-lib/flowy-sidecar/src/core/mod.rs @@ -0,0 +1,5 @@ +pub mod parser; +pub mod plugin; +pub mod rpc_loop; +mod rpc_object; +pub mod rpc_peer; diff --git a/frontend/rust-lib/flowy-sidecar/src/parser.rs b/frontend/rust-lib/flowy-sidecar/src/core/parser.rs similarity index 83% rename from frontend/rust-lib/flowy-sidecar/src/parser.rs rename to frontend/rust-lib/flowy-sidecar/src/core/parser.rs index e65285e075..cd07efd2bd 100644 --- a/frontend/rust-lib/flowy-sidecar/src/parser.rs +++ b/frontend/rust-lib/flowy-sidecar/src/core/parser.rs @@ -1,5 +1,5 @@ +use crate::core::rpc_object::RpcObject; use crate::error::{ReadError, RemoteError}; -use crate::rpc_object::RpcObject; use serde_json::{json, Value}; use std::io::BufRead; @@ -76,6 +76,22 @@ impl ResponseParser for ChatResponseParser { } } +pub struct ChatRelatedQuestionsResponseParser; +impl ResponseParser for ChatRelatedQuestionsResponseParser { + type ValueType = Vec; + + fn parse_response(json: Value) -> Result { + if json.is_object() { + if let Some(message) = json.get("data") { + if let Some(values) = message.as_array() { + return Ok(values.clone()); + } + } + } + return Err(RemoteError::InvalidResponse(json)); + } +} + pub struct SimilarityResponseParser; impl ResponseParser for SimilarityResponseParser { type ValueType = f64; diff --git a/frontend/rust-lib/flowy-sidecar/src/plugin.rs b/frontend/rust-lib/flowy-sidecar/src/core/plugin.rs similarity index 94% rename from frontend/rust-lib/flowy-sidecar/src/plugin.rs rename to frontend/rust-lib/flowy-sidecar/src/core/plugin.rs index d842e58c4f..02e1867558 100644 --- a/frontend/rust-lib/flowy-sidecar/src/plugin.rs +++ b/frontend/rust-lib/flowy-sidecar/src/core/plugin.rs @@ -1,6 +1,5 @@ use crate::error::Error; use crate::manager::WeakSidecarState; -use crate::rpc_loop::RpcLoop; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; @@ -9,6 +8,8 @@ use std::io::BufReader; use std::process::{Child, Stdio}; use std::sync::Arc; +use crate::core::parser::ResponseParser; +use crate::core::rpc_loop::RpcLoop; use anyhow::anyhow; use std::thread; use std::time::Instant; @@ -85,7 +86,11 @@ impl Plugin { self.peer.send_rpc_request(method, params) } - pub async fn async_send_request(&self, method: &str, params: &Value) -> Result { + pub async fn async_send_request( + &self, + method: &str, + params: &Value, + ) -> Result { let (tx, rx) = tokio::sync::oneshot::channel(); self.peer.send_rpc_request_async( method, @@ -97,6 +102,8 @@ impl Plugin { let value = rx .await .map_err(|err| Error::Internal(anyhow!("error waiting for async response: {:?}", err)))??; + + let value = P::parse_response(value)?; Ok(value) } diff --git a/frontend/rust-lib/flowy-sidecar/src/rpc_loop.rs b/frontend/rust-lib/flowy-sidecar/src/core/rpc_loop.rs similarity index 97% rename from frontend/rust-lib/flowy-sidecar/src/rpc_loop.rs rename to frontend/rust-lib/flowy-sidecar/src/core/rpc_loop.rs index 7749889953..5b85da6ca8 100644 --- a/frontend/rust-lib/flowy-sidecar/src/rpc_loop.rs +++ b/frontend/rust-lib/flowy-sidecar/src/core/rpc_loop.rs @@ -1,13 +1,11 @@ +use crate::core::parser::{Call, MessageReader}; +use crate::core::plugin::RpcCtx; +use crate::core::rpc_object::RpcObject; +use crate::core::rpc_peer::{RawPeer, RpcState}; use crate::error::{Error, ReadError, RemoteError}; -use crate::parser::{Call, MessageReader}; -use crate::plugin::RpcCtx; -use crate::rpc_peer::{RawPeer, RpcState}; use serde::de::DeserializeOwned; use serde_json::Value; - use std::io::{BufRead, Write}; - -use crate::rpc_object::RpcObject; use std::sync::Arc; use std::thread; use std::time::Duration; diff --git a/frontend/rust-lib/flowy-sidecar/src/rpc_object.rs b/frontend/rust-lib/flowy-sidecar/src/core/rpc_object.rs similarity index 96% rename from frontend/rust-lib/flowy-sidecar/src/rpc_object.rs rename to frontend/rust-lib/flowy-sidecar/src/core/rpc_object.rs index 2059df69e3..b99e5404e3 100644 --- a/frontend/rust-lib/flowy-sidecar/src/rpc_object.rs +++ b/frontend/rust-lib/flowy-sidecar/src/core/rpc_object.rs @@ -1,5 +1,5 @@ -use crate::parser::{Call, RequestId}; -use crate::rpc_peer::Response; +use crate::core::parser::{Call, RequestId}; +use crate::core::rpc_peer::Response; use serde::de::{DeserializeOwned, Error}; use serde_json::Value; diff --git a/frontend/rust-lib/flowy-sidecar/src/rpc_peer.rs b/frontend/rust-lib/flowy-sidecar/src/core/rpc_peer.rs similarity index 98% rename from frontend/rust-lib/flowy-sidecar/src/rpc_peer.rs rename to frontend/rust-lib/flowy-sidecar/src/core/rpc_peer.rs index 122244dac1..ccef70b1f6 100644 --- a/frontend/rust-lib/flowy-sidecar/src/rpc_peer.rs +++ b/frontend/rust-lib/flowy-sidecar/src/core/rpc_peer.rs @@ -1,6 +1,6 @@ +use crate::core::plugin::{Callback, Peer, PluginId}; +use crate::core::rpc_object::RpcObject; use crate::error::{Error, ReadError, RemoteError}; -use crate::plugin::{Callback, Peer, PluginId}; -use crate::rpc_object::RpcObject; use parking_lot::{Condvar, Mutex}; use serde::{de, ser, Deserialize, Deserializer, Serialize, Serializer}; use serde_json::{json, Value}; diff --git a/frontend/rust-lib/flowy-sidecar/src/lib.rs b/frontend/rust-lib/flowy-sidecar/src/lib.rs index e2bc67f89b..2e508ba35f 100644 --- a/frontend/rust-lib/flowy-sidecar/src/lib.rs +++ b/frontend/rust-lib/flowy-sidecar/src/lib.rs @@ -1,7 +1,4 @@ +pub mod core; mod error; pub mod manager; -pub mod parser; -pub mod plugin; -mod rpc_loop; -mod rpc_object; -mod rpc_peer; +pub mod plugins; diff --git a/frontend/rust-lib/flowy-sidecar/src/manager.rs b/frontend/rust-lib/flowy-sidecar/src/manager.rs index 19ff59a261..737381229e 100644 --- a/frontend/rust-lib/flowy-sidecar/src/manager.rs +++ b/frontend/rust-lib/flowy-sidecar/src/manager.rs @@ -1,8 +1,8 @@ +use crate::core::parser::ResponseParser; +use crate::core::plugin::{start_plugin_process, Plugin, PluginId, PluginInfo, RpcCtx}; +use crate::core::rpc_loop::Handler; +use crate::core::rpc_peer::PluginCommand; use crate::error::{Error, ReadError, RemoteError}; -use crate::parser::ResponseParser; -use crate::plugin::{start_plugin_process, Plugin, PluginId, PluginInfo, RpcCtx}; -use crate::rpc_loop::Handler; -use crate::rpc_peer::PluginCommand; use anyhow::anyhow; use lib_infra::util::{get_operating_system, OperatingSystem}; use parking_lot::Mutex; @@ -41,14 +41,14 @@ impl SidecarManager { Ok(plugin_id) } - pub async fn get_plugin(&self, plugin_id: PluginId) -> Result { + pub async fn get_plugin(&self, plugin_id: PluginId) -> Result, Error> { let state = self.state.lock(); let plugin = state .plugins .iter() .find(|p| p.id == plugin_id) .ok_or(anyhow!("plugin not found"))?; - Ok(plugin.clone()) + Ok(Arc::downgrade(plugin)) } pub async fn remove_plugin(&self, id: PluginId) -> Result<(), Error> { @@ -118,14 +118,13 @@ impl SidecarManager { .iter() .find(|p| p.id == id) .ok_or(anyhow!("plugin not found"))?; - let resp = plugin.async_send_request(method, &request).await?; - let value = P::parse_response(resp)?; + let value = plugin.async_send_request::

(method, &request).await?; Ok(value) } } pub struct SidecarState { - plugins: Vec, + plugins: Vec>, } impl SidecarState { @@ -133,7 +132,7 @@ impl SidecarState { match plugin { Ok(plugin) => { trace!("plugin connected: {:?}", plugin.id); - self.plugins.push(plugin); + self.plugins.push(Arc::new(plugin)); }, Err(err) => { warn!("plugin failed to connect: {:?}", err); diff --git a/frontend/rust-lib/flowy-sidecar/src/plugins/chat_plugin.rs b/frontend/rust-lib/flowy-sidecar/src/plugins/chat_plugin.rs new file mode 100644 index 0000000000..59e196fdaf --- /dev/null +++ b/frontend/rust-lib/flowy-sidecar/src/plugins/chat_plugin.rs @@ -0,0 +1,50 @@ +use crate::core::parser::{ChatRelatedQuestionsResponseParser, ChatResponseParser}; +use crate::core::plugin::{Plugin, PluginId}; +use crate::error::Error; +use anyhow::anyhow; +use serde_json::json; +use std::sync::Weak; + +pub struct ChatPluginOperation { + plugin: Weak, +} + +impl ChatPluginOperation { + pub fn new(plugin: Weak) -> Self { + ChatPluginOperation { plugin } + } + + pub async fn send_message( + &self, + chat_id: &str, + plugin_id: PluginId, + message: &str, + ) -> Result { + let plugin = self + .plugin + .upgrade() + .ok_or(Error::Internal(anyhow!("Plugin is dropped")))?; + + let params = json!({"chat_id": chat_id, "method": "answer", "params": {"content": message}}); + let resp = plugin + .async_send_request::("handle", ¶ms) + .await?; + Ok(resp) + } + + pub async fn get_related_questions( + &self, + chat_id: &str, + ) -> Result, Error> { + let plugin = self + .plugin + .upgrade() + .ok_or(Error::Internal(anyhow!("Plugin is dropped")))?; + + let params = json!({"chat_id": chat_id, "method": "related_question"}); + let resp = plugin + .async_send_request::("handle", ¶ms) + .await?; + Ok(resp) + } +} diff --git a/frontend/rust-lib/flowy-sidecar/src/plugins/embedding_plugin.rs b/frontend/rust-lib/flowy-sidecar/src/plugins/embedding_plugin.rs new file mode 100644 index 0000000000..5f1e065f37 --- /dev/null +++ b/frontend/rust-lib/flowy-sidecar/src/plugins/embedding_plugin.rs @@ -0,0 +1,30 @@ +use crate::core::parser::{ + ChatRelatedQuestionsResponseParser, ChatResponseParser, SimilarityResponseParser, +}; +use crate::core::plugin::{Plugin, PluginId}; +use crate::error::Error; +use anyhow::anyhow; +use serde_json::json; +use std::sync::Weak; + +pub struct EmbeddingPluginOperation { + plugin: Weak, +} + +impl EmbeddingPluginOperation { + pub fn new(plugin: Weak) -> Self { + EmbeddingPluginOperation { plugin } + } + + pub async fn calculate_similarity(&self, message1: &str, message2: &str) -> Result { + let plugin = self + .plugin + .upgrade() + .ok_or(Error::Internal(anyhow!("Plugin is dropped")))?; + let params = + json!({"method": "calculate_similarity", "params": {"src": message1, "dest": message2}}); + plugin + .async_send_request::("handle", ¶ms) + .await + } +} diff --git a/frontend/rust-lib/flowy-sidecar/src/plugins/mod.rs b/frontend/rust-lib/flowy-sidecar/src/plugins/mod.rs new file mode 100644 index 0000000000..1688717c79 --- /dev/null +++ b/frontend/rust-lib/flowy-sidecar/src/plugins/mod.rs @@ -0,0 +1,2 @@ +pub mod chat_plugin; +pub mod embedding_plugin; diff --git a/frontend/rust-lib/flowy-sidecar/tests/chat_test/mod.rs b/frontend/rust-lib/flowy-sidecar/tests/chat_test/mod.rs index 030d15ab6e..b5b324cba5 100644 --- a/frontend/rust-lib/flowy-sidecar/tests/chat_test/mod.rs +++ b/frontend/rust-lib/flowy-sidecar/tests/chat_test/mod.rs @@ -11,5 +11,9 @@ async fn load_chat_model_test() { let embedding_plugin_id = test.init_embedding_plugin().await; let score = test.calculate_similarity(embedding_plugin_id, &resp, "Hello! How can I help you today? Is there something specific you would like to know or discuss").await; assert!(score > 0.8); + + // let questions = test.related_question(&chat_id, plugin_id).await; + // assert_eq!(questions.len(), 3); + // eprintln!("related questions: {:?}", questions); } } diff --git a/frontend/rust-lib/flowy-sidecar/tests/util.rs b/frontend/rust-lib/flowy-sidecar/tests/util.rs index b6886d0ad3..2d7818225a 100644 --- a/frontend/rust-lib/flowy-sidecar/tests/util.rs +++ b/frontend/rust-lib/flowy-sidecar/tests/util.rs @@ -1,10 +1,12 @@ use anyhow::Result; use flowy_sidecar::manager::SidecarManager; -use flowy_sidecar::parser::{ChatResponseParser, SimilarityResponseParser}; -use flowy_sidecar::plugin::{PluginId, PluginInfo}; use serde_json::json; -use std::sync::Once; +use std::sync::{Arc, Once}; +use flowy_sidecar::core::parser::{ChatResponseParser, SimilarityResponseParser}; +use flowy_sidecar::core::plugin::{PluginId, PluginInfo}; +use flowy_sidecar::plugins::chat_plugin::ChatPluginOperation; +use flowy_sidecar::plugins::embedding_plugin::EmbeddingPluginOperation; use tracing_subscriber::fmt::Subscriber; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::EnvFilter; @@ -60,32 +62,37 @@ impl LocalAITest { } pub async fn send_message(&self, chat_id: &str, plugin_id: PluginId, message: &str) -> String { - let resp = self - .manager - .async_send_request::( - plugin_id, - "handle", - json!({"chat_id": chat_id, "method": "answer", "params": {"content": message}}), - ) + let plugin = self.manager.get_plugin(plugin_id).await.unwrap(); + let operation = ChatPluginOperation::new(plugin); + let resp = operation + .send_message(chat_id, plugin_id, message) .await .unwrap(); resp } + pub async fn related_question( + &self, + chat_id: &str, + plugin_id: PluginId, + ) -> Vec { + let plugin = self.manager.get_plugin(plugin_id).await.unwrap(); + let operation = ChatPluginOperation::new(plugin); + let resp = operation.get_related_questions(chat_id).await.unwrap(); + resp + } + pub async fn calculate_similarity( &self, plugin_id: PluginId, message1: &str, message2: &str, ) -> f64 { - self - .manager - .async_send_request::( - plugin_id, - "handle", - json!({"method": "calculate_similarity", "params": {"src": message1, "dest": message2}}), - ) + let plugin = self.manager.get_plugin(plugin_id).await.unwrap(); + let operation = EmbeddingPluginOperation::new(plugin); + operation + .calculate_similarity(message1, message2) .await .unwrap() }