chore: add plugins

This commit is contained in:
nathan
2024-06-27 07:52:17 +08:00
parent 6dea12fd15
commit 06950e5543
13 changed files with 160 additions and 45 deletions

View File

@ -0,0 +1,5 @@
pub mod parser;
pub mod plugin;
pub mod rpc_loop;
mod rpc_object;
pub mod rpc_peer;

View File

@ -1,5 +1,5 @@
use crate::core::rpc_object::RpcObject;
use crate::error::{ReadError, RemoteError};
use crate::rpc_object::RpcObject;
use serde_json::{json, Value};
use std::io::BufRead;
@ -76,6 +76,22 @@ impl ResponseParser for ChatResponseParser {
}
}
pub struct ChatRelatedQuestionsResponseParser;
impl ResponseParser for ChatRelatedQuestionsResponseParser {
type ValueType = Vec<Value>;
fn parse_response(json: Value) -> Result<Self::ValueType, RemoteError> {
if json.is_object() {
if let Some(message) = json.get("data") {
if let Some(values) = message.as_array() {
return Ok(values.clone());
}
}
}
return Err(RemoteError::InvalidResponse(json));
}
}
pub struct SimilarityResponseParser;
impl ResponseParser for SimilarityResponseParser {
type ValueType = f64;

View File

@ -1,6 +1,5 @@
use crate::error::Error;
use crate::manager::WeakSidecarState;
use crate::rpc_loop::RpcLoop;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
@ -9,6 +8,8 @@ use std::io::BufReader;
use std::process::{Child, Stdio};
use std::sync::Arc;
use crate::core::parser::ResponseParser;
use crate::core::rpc_loop::RpcLoop;
use anyhow::anyhow;
use std::thread;
use std::time::Instant;
@ -85,7 +86,11 @@ impl Plugin {
self.peer.send_rpc_request(method, params)
}
pub async fn async_send_request(&self, method: &str, params: &Value) -> Result<Value, Error> {
pub async fn async_send_request<P: ResponseParser>(
&self,
method: &str,
params: &Value,
) -> Result<P::ValueType, Error> {
let (tx, rx) = tokio::sync::oneshot::channel();
self.peer.send_rpc_request_async(
method,
@ -97,6 +102,8 @@ impl Plugin {
let value = rx
.await
.map_err(|err| Error::Internal(anyhow!("error waiting for async response: {:?}", err)))??;
let value = P::parse_response(value)?;
Ok(value)
}

View File

@ -1,13 +1,11 @@
use crate::core::parser::{Call, MessageReader};
use crate::core::plugin::RpcCtx;
use crate::core::rpc_object::RpcObject;
use crate::core::rpc_peer::{RawPeer, RpcState};
use crate::error::{Error, ReadError, RemoteError};
use crate::parser::{Call, MessageReader};
use crate::plugin::RpcCtx;
use crate::rpc_peer::{RawPeer, RpcState};
use serde::de::DeserializeOwned;
use serde_json::Value;
use std::io::{BufRead, Write};
use crate::rpc_object::RpcObject;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

View File

@ -1,5 +1,5 @@
use crate::parser::{Call, RequestId};
use crate::rpc_peer::Response;
use crate::core::parser::{Call, RequestId};
use crate::core::rpc_peer::Response;
use serde::de::{DeserializeOwned, Error};
use serde_json::Value;

View File

@ -1,6 +1,6 @@
use crate::core::plugin::{Callback, Peer, PluginId};
use crate::core::rpc_object::RpcObject;
use crate::error::{Error, ReadError, RemoteError};
use crate::plugin::{Callback, Peer, PluginId};
use crate::rpc_object::RpcObject;
use parking_lot::{Condvar, Mutex};
use serde::{de, ser, Deserialize, Deserializer, Serialize, Serializer};
use serde_json::{json, Value};

View File

@ -1,7 +1,4 @@
pub mod core;
mod error;
pub mod manager;
pub mod parser;
pub mod plugin;
mod rpc_loop;
mod rpc_object;
mod rpc_peer;
pub mod plugins;

View File

@ -1,8 +1,8 @@
use crate::core::parser::ResponseParser;
use crate::core::plugin::{start_plugin_process, Plugin, PluginId, PluginInfo, RpcCtx};
use crate::core::rpc_loop::Handler;
use crate::core::rpc_peer::PluginCommand;
use crate::error::{Error, ReadError, RemoteError};
use crate::parser::ResponseParser;
use crate::plugin::{start_plugin_process, Plugin, PluginId, PluginInfo, RpcCtx};
use crate::rpc_loop::Handler;
use crate::rpc_peer::PluginCommand;
use anyhow::anyhow;
use lib_infra::util::{get_operating_system, OperatingSystem};
use parking_lot::Mutex;
@ -41,14 +41,14 @@ impl SidecarManager {
Ok(plugin_id)
}
pub async fn get_plugin(&self, plugin_id: PluginId) -> Result<Plugin, Error> {
pub async fn get_plugin(&self, plugin_id: PluginId) -> Result<Weak<Plugin>, Error> {
let state = self.state.lock();
let plugin = state
.plugins
.iter()
.find(|p| p.id == plugin_id)
.ok_or(anyhow!("plugin not found"))?;
Ok(plugin.clone())
Ok(Arc::downgrade(plugin))
}
pub async fn remove_plugin(&self, id: PluginId) -> Result<(), Error> {
@ -118,14 +118,13 @@ impl SidecarManager {
.iter()
.find(|p| p.id == id)
.ok_or(anyhow!("plugin not found"))?;
let resp = plugin.async_send_request(method, &request).await?;
let value = P::parse_response(resp)?;
let value = plugin.async_send_request::<P>(method, &request).await?;
Ok(value)
}
}
pub struct SidecarState {
plugins: Vec<Plugin>,
plugins: Vec<Arc<Plugin>>,
}
impl SidecarState {
@ -133,7 +132,7 @@ impl SidecarState {
match plugin {
Ok(plugin) => {
trace!("plugin connected: {:?}", plugin.id);
self.plugins.push(plugin);
self.plugins.push(Arc::new(plugin));
},
Err(err) => {
warn!("plugin failed to connect: {:?}", err);

View File

@ -0,0 +1,50 @@
use crate::core::parser::{ChatRelatedQuestionsResponseParser, ChatResponseParser};
use crate::core::plugin::{Plugin, PluginId};
use crate::error::Error;
use anyhow::anyhow;
use serde_json::json;
use std::sync::Weak;
pub struct ChatPluginOperation {
plugin: Weak<Plugin>,
}
impl ChatPluginOperation {
pub fn new(plugin: Weak<Plugin>) -> Self {
ChatPluginOperation { plugin }
}
pub async fn send_message(
&self,
chat_id: &str,
plugin_id: PluginId,
message: &str,
) -> Result<String, Error> {
let plugin = self
.plugin
.upgrade()
.ok_or(Error::Internal(anyhow!("Plugin is dropped")))?;
let params = json!({"chat_id": chat_id, "method": "answer", "params": {"content": message}});
let resp = plugin
.async_send_request::<ChatResponseParser>("handle", &params)
.await?;
Ok(resp)
}
pub async fn get_related_questions(
&self,
chat_id: &str,
) -> Result<Vec<serde_json::Value>, Error> {
let plugin = self
.plugin
.upgrade()
.ok_or(Error::Internal(anyhow!("Plugin is dropped")))?;
let params = json!({"chat_id": chat_id, "method": "related_question"});
let resp = plugin
.async_send_request::<ChatRelatedQuestionsResponseParser>("handle", &params)
.await?;
Ok(resp)
}
}

View File

@ -0,0 +1,30 @@
use crate::core::parser::{
ChatRelatedQuestionsResponseParser, ChatResponseParser, SimilarityResponseParser,
};
use crate::core::plugin::{Plugin, PluginId};
use crate::error::Error;
use anyhow::anyhow;
use serde_json::json;
use std::sync::Weak;
pub struct EmbeddingPluginOperation {
plugin: Weak<Plugin>,
}
impl EmbeddingPluginOperation {
pub fn new(plugin: Weak<Plugin>) -> Self {
EmbeddingPluginOperation { plugin }
}
pub async fn calculate_similarity(&self, message1: &str, message2: &str) -> Result<f64, Error> {
let plugin = self
.plugin
.upgrade()
.ok_or(Error::Internal(anyhow!("Plugin is dropped")))?;
let params =
json!({"method": "calculate_similarity", "params": {"src": message1, "dest": message2}});
plugin
.async_send_request::<SimilarityResponseParser>("handle", &params)
.await
}
}

View File

@ -0,0 +1,2 @@
pub mod chat_plugin;
pub mod embedding_plugin;

View File

@ -11,5 +11,9 @@ async fn load_chat_model_test() {
let embedding_plugin_id = test.init_embedding_plugin().await;
let score = test.calculate_similarity(embedding_plugin_id, &resp, "Hello! How can I help you today? Is there something specific you would like to know or discuss").await;
assert!(score > 0.8);
// let questions = test.related_question(&chat_id, plugin_id).await;
// assert_eq!(questions.len(), 3);
// eprintln!("related questions: {:?}", questions);
}
}

View File

@ -1,10 +1,12 @@
use anyhow::Result;
use flowy_sidecar::manager::SidecarManager;
use flowy_sidecar::parser::{ChatResponseParser, SimilarityResponseParser};
use flowy_sidecar::plugin::{PluginId, PluginInfo};
use serde_json::json;
use std::sync::Once;
use std::sync::{Arc, Once};
use flowy_sidecar::core::parser::{ChatResponseParser, SimilarityResponseParser};
use flowy_sidecar::core::plugin::{PluginId, PluginInfo};
use flowy_sidecar::plugins::chat_plugin::ChatPluginOperation;
use flowy_sidecar::plugins::embedding_plugin::EmbeddingPluginOperation;
use tracing_subscriber::fmt::Subscriber;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;
@ -60,32 +62,37 @@ impl LocalAITest {
}
pub async fn send_message(&self, chat_id: &str, plugin_id: PluginId, message: &str) -> String {
let resp = self
.manager
.async_send_request::<ChatResponseParser>(
plugin_id,
"handle",
json!({"chat_id": chat_id, "method": "answer", "params": {"content": message}}),
)
let plugin = self.manager.get_plugin(plugin_id).await.unwrap();
let operation = ChatPluginOperation::new(plugin);
let resp = operation
.send_message(chat_id, plugin_id, message)
.await
.unwrap();
resp
}
pub async fn related_question(
&self,
chat_id: &str,
plugin_id: PluginId,
) -> Vec<serde_json::Value> {
let plugin = self.manager.get_plugin(plugin_id).await.unwrap();
let operation = ChatPluginOperation::new(plugin);
let resp = operation.get_related_questions(chat_id).await.unwrap();
resp
}
pub async fn calculate_similarity(
&self,
plugin_id: PluginId,
message1: &str,
message2: &str,
) -> f64 {
self
.manager
.async_send_request::<SimilarityResponseParser>(
plugin_id,
"handle",
json!({"method": "calculate_similarity", "params": {"src": message1, "dest": message2}}),
)
let plugin = self.manager.get_plugin(plugin_id).await.unwrap();
let operation = EmbeddingPluginOperation::new(plugin);
operation
.calculate_similarity(message1, message2)
.await
.unwrap()
}