chore: clippy

This commit is contained in:
nathan 2024-06-27 15:17:10 +08:00
parent d50194df25
commit 9472361664
10 changed files with 146 additions and 74 deletions

View File

@ -2148,6 +2148,7 @@ dependencies = [
"anyhow",
"crossbeam-utils",
"dotenv",
"futures",
"lib-infra",
"log",
"once_cell",

View File

@ -278,11 +278,11 @@ impl ChatCloudService for ChatService {
limit: u64,
) -> FutureResult<RepeatedChatMessage, FlowyError> {
FutureResult::new(async move {
RepeatedChatMessage {
Ok(RepeatedChatMessage {
messages: vec![],
has_more: false,
total: 0,
}
})
})
}
@ -293,10 +293,10 @@ impl ChatCloudService for ChatService {
message_id: i64,
) -> FutureResult<RepeatedRelatedQuestion, FlowyError> {
FutureResult::new(async move {
RepeatedRelatedQuestion {
Ok(RepeatedRelatedQuestion {
message_id,
items: vec![],
}
})
})
}

View File

@ -17,6 +17,7 @@ crossbeam-utils = "0.8.20"
log = "0.4.21"
parking_lot.workspace = true
lib-infra.workspace = true
futures.workspace = true
[dev-dependencies]

View File

@ -1,6 +1,7 @@
use crate::core::rpc_object::RpcObject;
use crate::core::rpc_peer::ResponsePayload;
use crate::error::{ReadError, RemoteError};
use serde_json::{json, Value};
use serde_json::{json, Value as JsonValue};
use std::io::BufRead;
#[derive(Debug, Default)]
@ -30,7 +31,7 @@ impl MessageReader {
/// This should not be called directly unless you are writing tests.
#[doc(hidden)]
pub fn parse(&self, s: &str) -> Result<RpcObject, ReadError> {
match serde_json::from_str::<Value>(s) {
match serde_json::from_str::<JsonValue>(s) {
Ok(val) => {
if !val.is_object() {
Err(ReadError::NotObject(s.to_string()))
@ -44,10 +45,10 @@ impl MessageReader {
}
pub type RequestId = u64;
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone)]
/// An RPC call, which may be either a notification or a request.
pub enum Call<R> {
Message(Value),
Message(JsonValue),
/// An id and an RPC Request
Request(RequestId, R),
/// A malformed request: the request contained an id, but could
@ -57,17 +58,17 @@ pub enum Call<R> {
pub trait ResponseParser {
type ValueType;
fn parse_response(json: serde_json::Value) -> Result<Self::ValueType, RemoteError>;
fn parse_response(payload: JsonValue) -> Result<Self::ValueType, RemoteError>;
}
pub struct ChatResponseParser;
impl ResponseParser for ChatResponseParser {
type ValueType = String;
fn parse_response(json: Value) -> Result<Self::ValueType, RemoteError> {
fn parse_response(json: JsonValue) -> Result<Self::ValueType, RemoteError> {
if json.is_object() {
if let Some(message) = json.get("data") {
if let Some(message) = message.as_str() {
if let Some(data) = json.get("data") {
if let Some(message) = data.as_str() {
return Ok(message.to_string());
}
}
@ -78,12 +79,12 @@ impl ResponseParser for ChatResponseParser {
pub struct ChatRelatedQuestionsResponseParser;
impl ResponseParser for ChatRelatedQuestionsResponseParser {
type ValueType = Vec<Value>;
type ValueType = Vec<JsonValue>;
fn parse_response(json: Value) -> Result<Self::ValueType, RemoteError> {
fn parse_response(json: JsonValue) -> Result<Self::ValueType, RemoteError> {
if json.is_object() {
if let Some(message) = json.get("data") {
if let Some(values) = message.as_array() {
if let Some(data) = json.get("data") {
if let Some(values) = data.as_array() {
return Ok(values.clone());
}
}
@ -96,7 +97,7 @@ pub struct SimilarityResponseParser;
impl ResponseParser for SimilarityResponseParser {
type ValueType = f64;
fn parse_response(json: Value) -> Result<Self::ValueType, RemoteError> {
fn parse_response(json: JsonValue) -> Result<Self::ValueType, RemoteError> {
if json.is_object() {
if let Some(data) = json.get("data") {
if let Some(score) = data.get("score").and_then(|v| v.as_f64()) {
@ -104,6 +105,7 @@ impl ResponseParser for SimilarityResponseParser {
}
}
}
return Err(RemoteError::InvalidResponse(json));
}
}

View File

@ -1,16 +1,15 @@
use crate::error::Error;
use crate::manager::WeakSidecarState;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::io::BufReader;
use std::process::{Child, Stdio};
use std::sync::Arc;
use crate::core::parser::ResponseParser;
use crate::core::rpc_loop::RpcLoop;
use crate::core::rpc_peer::{Callback, ResponsePayload};
use anyhow::anyhow;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value as JsonValue};
use std::io::BufReader;
use std::process::{Child, Stdio};
use std::sync::Arc;
use std::thread;
use std::time::Instant;
use tracing::{error, info};
@ -26,16 +25,6 @@ impl From<i64> for PluginId {
}
}
pub trait Callback: Send {
fn call(self: Box<Self>, result: Result<Value, Error>);
}
impl<F: Send + FnOnce(Result<Value, Error>)> Callback for F {
fn call(self: Box<F>, result: Result<Value, Error>) {
(*self)(result)
}
}
/// The `Peer` trait defines the interface for the opposite side of the RPC channel,
/// designed to be used behind a pointer or as a trait object.
pub trait Peer: Send + Sync + 'static {
@ -43,14 +32,14 @@ pub trait Peer: Send + Sync + 'static {
fn box_clone(&self) -> Arc<dyn Peer>;
/// Sends an RPC notification to the peer with the specified method and parameters.
fn send_rpc_notification(&self, method: &str, params: &Value);
fn send_rpc_notification(&self, method: &str, params: &JsonValue);
/// Sends an asynchronous RPC request to the peer and executes the provided callback upon completion.
fn send_rpc_request_async(&self, method: &str, params: &Value, f: Box<dyn Callback>);
fn send_rpc_request_async(&self, method: &str, params: &JsonValue, f: Box<dyn Callback>);
/// Sends a synchronous RPC request to the peer and waits for the result.
/// Returns the result of the request or an error.
fn send_rpc_request(&self, method: &str, params: &Value) -> Result<Value, Error>;
fn send_rpc_request(&self, method: &str, params: &JsonValue) -> Result<JsonValue, Error>;
/// Checks if there is an incoming request pending, intended to reduce latency for bulk operations done in the background.
fn request_is_pending(&self) -> bool;
@ -77,19 +66,19 @@ pub struct Plugin {
}
impl Plugin {
pub fn initialize(&self, value: Value) -> Result<(), Error> {
pub fn initialize(&self, value: JsonValue) -> Result<(), Error> {
self.peer.send_rpc_request("initialize", &value)?;
Ok(())
}
pub fn send_request(&self, method: &str, params: &Value) -> Result<Value, Error> {
pub fn send_request(&self, method: &str, params: &JsonValue) -> Result<JsonValue, Error> {
self.peer.send_rpc_request(method, params)
}
pub async fn async_send_request<P: ResponseParser>(
&self,
method: &str,
params: &Value,
params: &JsonValue,
) -> Result<P::ValueType, Error> {
let (tx, rx) = tokio::sync::oneshot::channel();
self.peer.send_rpc_request_async(
@ -140,7 +129,7 @@ pub(crate) async fn start_plugin_process(
let mut looper = RpcLoop::new(child_stdin);
let peer: RpcPeer = Arc::new(looper.get_raw_peer());
let name = plugin_info.name.clone();
peer.send_rpc_notification("ping", &Value::Array(Vec::new()));
peer.send_rpc_notification("ping", &JsonValue::Array(Vec::new()));
let plugin = Plugin {
peer,

View File

@ -1,7 +1,7 @@
use crate::core::parser::{Call, MessageReader};
use crate::core::plugin::RpcCtx;
use crate::core::rpc_object::RpcObject;
use crate::core::rpc_peer::{RawPeer, RpcState};
use crate::core::rpc_peer::{RawPeer, ResponsePayload, RpcState};
use crate::error::{Error, ReadError, RemoteError};
use serde::de::DeserializeOwned;
use serde_json::Value;
@ -15,7 +15,11 @@ const MAX_IDLE_WAIT: Duration = Duration::from_millis(5);
pub trait Handler {
type Request: DeserializeOwned;
fn handle_request(&mut self, ctx: &RpcCtx, rpc: Self::Request) -> Result<Value, RemoteError>;
fn handle_request(
&mut self,
ctx: &RpcCtx,
rpc: Self::Request,
) -> Result<ResponsePayload, RemoteError>;
#[allow(unused_variables)]
fn idle(&mut self, ctx: &RpcCtx, token: usize) {}
}

View File

@ -1,5 +1,5 @@
use crate::core::parser::{Call, RequestId};
use crate::core::rpc_peer::Response;
use crate::core::rpc_peer::{Response, ResponsePayload};
use serde::de::{DeserializeOwned, Error};
use serde_json::Value;
@ -39,7 +39,7 @@ impl RpcObject {
}
let result = self.0.as_object_mut().and_then(|obj| obj.remove("result"));
match result {
Some(r) => Ok(Ok(r)),
Some(r) => Ok(Ok(ResponsePayload::Json(r))),
None => {
let error = self
.0

View File

@ -1,11 +1,14 @@
use crate::core::plugin::{Callback, Peer, PluginId};
use crate::core::plugin::{Peer, PluginId};
use crate::core::rpc_object::RpcObject;
use crate::error::{Error, ReadError, RemoteError};
use futures::Stream;
use parking_lot::{Condvar, Mutex};
use serde::{de, ser, Deserialize, Deserializer, Serialize, Serializer};
use serde_json::{json, Value};
use serde_json::{json, Value as JsonValue};
use std::collections::{BTreeMap, BinaryHeap, VecDeque};
use std::fmt::Display;
use std::io::Write;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{mpsc, Arc};
use std::time::{Duration, Instant};
@ -37,7 +40,7 @@ impl<'de, T: Deserialize<'de>> Deserialize<'de> for PluginCommand<T> {
struct PluginIdHelper {
plugin_id: PluginId,
}
let v = Value::deserialize(deserializer)?;
let v = JsonValue::deserialize(deserializer)?;
let plugin_id = PluginIdHelper::deserialize(&v)
.map_err(de::Error::custom)?
.plugin_id;
@ -82,7 +85,7 @@ impl<W: Write + Send + 'static> Peer for RawPeer<W> {
fn box_clone(&self) -> Arc<dyn Peer> {
Arc::new((*self).clone())
}
fn send_rpc_notification(&self, method: &str, params: &Value) {
fn send_rpc_notification(&self, method: &str, params: &JsonValue) {
if let Err(e) = self.send(&json!({
"method": method,
"params": params,
@ -94,11 +97,11 @@ impl<W: Write + Send + 'static> Peer for RawPeer<W> {
}
}
fn send_rpc_request_async(&self, method: &str, params: &Value, f: Box<dyn Callback>) {
fn send_rpc_request_async(&self, method: &str, params: &JsonValue, f: Box<dyn Callback>) {
self.send_rpc(method, params, ResponseHandler::Callback(f));
}
fn send_rpc_request(&self, method: &str, params: &Value) -> Result<Value, Error> {
fn send_rpc_request(&self, method: &str, params: &JsonValue) -> Result<JsonValue, Error> {
let (tx, rx) = mpsc::channel();
self.0.is_blocking.store(true, Ordering::Release);
self.send_rpc(method, params, ResponseHandler::Chan(tx));
@ -119,7 +122,7 @@ impl<W: Write + Send + 'static> Peer for RawPeer<W> {
}
impl<W: Write> RawPeer<W> {
fn send(&self, v: &Value) -> Result<(), io::Error> {
fn send(&self, v: &JsonValue) -> Result<(), io::Error> {
let mut s = serde_json::to_string(v).unwrap();
s.push('\n');
self.0.writer.lock().write_all(s.as_bytes())
@ -128,7 +131,12 @@ impl<W: Write> RawPeer<W> {
pub(crate) fn respond(&self, result: Response, id: u64) {
let mut response = json!({ "id": id });
match result {
Ok(result) => response["result"] = result,
Ok(result) => match result {
ResponsePayload::Json(value) => response["result"] = value,
ResponsePayload::Stream(_) => {
error!("stream response not supported")
},
},
Err(error) => response["error"] = json!(error),
};
if let Err(e) = self.send(&response) {
@ -136,7 +144,7 @@ impl<W: Write> RawPeer<W> {
}
}
fn send_rpc(&self, method: &str, params: &Value, rh: ResponseHandler) {
fn send_rpc(&self, method: &str, params: &JsonValue, rh: ResponseHandler) {
trace!("[RPC] call method: {} params: {:?}", method, params);
let id = self.0.id.fetch_add(1, Ordering::Relaxed);
{
@ -158,16 +166,22 @@ impl<W: Write> RawPeer<W> {
}
}
pub(crate) fn handle_response(&self, id: u64, resp: Result<Value, Error>) {
pub(crate) fn handle_response(&self, id: u64, resp: Result<ResponsePayload, Error>) {
let id = id as usize;
let handler = {
let mut pending = self.0.pending.lock();
pending.remove(&id)
};
let is_stream = resp.as_ref().map(|resp| resp.is_stream()).unwrap_or(false);
match handler {
Some(response_handler) => {
//
response_handler.invoke(resp)
let json = resp.map(|resp| resp.into_json());
response_handler.invoke(json);
// if is_stream {
// let mut pending = self.0.pending.lock();
// pending.insert(id, response_handler);
// }
},
None => warn!("[RPC] id {} not found in pending", id),
}
@ -250,18 +264,70 @@ impl<W: Write> Clone for RawPeer<W> {
}
}
pub struct ResponsePayload {
value: Value,
#[derive(Clone, Debug)]
pub enum ResponsePayload {
Json(JsonValue),
Stream(JsonValue),
}
pub type Response = Result<Value, RemoteError>;
impl ResponsePayload {
pub fn empty_json() -> Self {
ResponsePayload::Json(json!({}))
}
pub fn is_stream(&self) -> bool {
match self {
ResponsePayload::Json(_) => false,
ResponsePayload::Stream(_) => true,
}
}
pub fn json(&self) -> &JsonValue {
match self {
ResponsePayload::Json(v) => v,
ResponsePayload::Stream(v) => v,
}
}
pub fn into_json(self) -> JsonValue {
match self {
ResponsePayload::Json(v) => v,
ResponsePayload::Stream(v) => v,
}
}
}
impl Display for ResponsePayload {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ResponsePayload::Json(v) => write!(f, "{}", v),
ResponsePayload::Stream(_) => write!(f, "stream"),
}
}
}
pub type Response = Result<ResponsePayload, RemoteError>;
pub trait ResponseStream: Stream<Item = Result<JsonValue, Error>> + Unpin + Send {}
impl<T> ResponseStream for T where T: Stream<Item = Result<JsonValue, Error>> + Unpin + Send {}
enum ResponseHandler {
Chan(mpsc::Sender<Result<Value, Error>>),
Chan(mpsc::Sender<Result<JsonValue, Error>>),
Callback(Box<dyn Callback>),
}
pub trait Callback: Send {
fn call(self: Box<Self>, result: Result<JsonValue, Error>);
}
impl<F: Send + FnOnce(Result<JsonValue, Error>)> Callback for F {
fn call(self: Box<F>, result: Result<JsonValue, Error>) {
(*self)(result)
}
}
impl ResponseHandler {
fn invoke(self, result: Result<Value, Error>) {
fn invoke(self, result: Result<JsonValue, Error>) {
match self {
ResponseHandler::Chan(tx) => {
let _ = tx.send(result);

View File

@ -1,5 +1,6 @@
use crate::core::rpc_peer::ResponsePayload;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_json::{json, Value};
use serde_json::{json, Value as JsonValue};
use std::{fmt, io};
/// The error type of `tauri-utils`.
@ -36,30 +37,30 @@ pub enum ReadError {
Disconnect,
}
#[derive(Debug, Clone, PartialEq, thiserror::Error)]
#[derive(Debug, Clone, thiserror::Error)]
pub enum RemoteError {
/// The JSON was valid, but was not a correctly formed request.
///
/// This Error is used internally, and should not be returned by
/// clients.
#[error("Invalid request: {0:?}")]
InvalidRequest(Option<Value>),
InvalidRequest(Option<JsonValue>),
#[error("Invalid response: {0}")]
InvalidResponse(Value),
InvalidResponse(JsonValue),
/// A custom error, defined by the client.
#[error("Custom error: {message}")]
Custom {
code: i64,
message: String,
data: Option<Value>,
data: Option<JsonValue>,
},
/// An error that cannot be represented by an error object.
///
/// This error is intended to accommodate clients that return arbitrary
/// error values. It should not be used for new errors.
#[error("Unknown error: {0}")]
Unknown(Value),
Unknown(JsonValue),
}
impl ReadError {
@ -110,7 +111,7 @@ struct ErrorHelper {
code: i64,
message: String,
#[serde(skip_serializing_if = "Option::is_none")]
data: Option<Value>,
data: Option<JsonValue>,
}
impl<'de> Deserialize<'de> for RemoteError {
@ -118,7 +119,7 @@ impl<'de> Deserialize<'de> for RemoteError {
where
D: Deserializer<'de>,
{
let v = Value::deserialize(deserializer)?;
let v = JsonValue::deserialize(deserializer)?;
let resp = match ErrorHelper::deserialize(&v) {
Ok(resp) => resp,
Err(_) => return Ok(RemoteError::Unknown(v)),
@ -150,7 +151,11 @@ impl Serialize for RemoteError {
RemoteError::Unknown(_) => {
panic!("The 'Unknown' error variant is not intended for client use.")
},
RemoteError::InvalidResponse(s) => (-1, "Invalid response".to_string(), Some(s.clone())),
RemoteError::InvalidResponse(resp) => (
-1,
"Invalid response".to_string(),
Some(json!(resp.to_string())),
),
};
let err = ErrorHelper {
code,

View File

@ -1,7 +1,7 @@
use crate::core::parser::ResponseParser;
use crate::core::plugin::{start_plugin_process, Plugin, PluginId, PluginInfo, RpcCtx};
use crate::core::rpc_loop::Handler;
use crate::core::rpc_peer::PluginCommand;
use crate::core::rpc_peer::{PluginCommand, ResponsePayload};
use crate::error::{Error, ReadError, RemoteError};
use anyhow::anyhow;
use lib_infra::util::{get_operating_system, OperatingSystem};
@ -176,8 +176,12 @@ impl WeakSidecarState {
impl Handler for WeakSidecarState {
type Request = PluginCommand<String>;
fn handle_request(&mut self, _ctx: &RpcCtx, rpc: Self::Request) -> Result<Value, RemoteError> {
fn handle_request(
&mut self,
_ctx: &RpcCtx,
rpc: Self::Request,
) -> Result<ResponsePayload, RemoteError> {
trace!("handling request: {:?}", rpc.cmd);
Ok(json!({}))
Ok(ResponsePayload::empty_json())
}
}