mirror of
https://github.com/AppFlowy-IO/AppFlowy.git
synced 2024-08-30 18:12:39 +00:00
chore: clippy
This commit is contained in:
parent
565c003f1b
commit
ae20547f8b
@ -3,4 +3,5 @@ pub mod manager;
|
||||
pub mod parser;
|
||||
pub mod plugin;
|
||||
mod rpc_loop;
|
||||
mod rpc_object;
|
||||
mod rpc_peer;
|
||||
|
@ -1,5 +1,5 @@
|
||||
use crate::error::{ReadError, RemoteError};
|
||||
use crate::rpc_loop::RpcObject;
|
||||
use crate::rpc_object::RpcObject;
|
||||
use serde_json::{json, Value};
|
||||
use std::io::BufRead;
|
||||
|
||||
|
@ -26,29 +26,27 @@ impl From<i64> for PluginId {
|
||||
pub trait Callback: Send {
|
||||
fn call(self: Box<Self>, result: Result<Value, Error>);
|
||||
}
|
||||
|
||||
/// The `Peer` trait represents the interface for the other side of the RPC
|
||||
/// channel. It is intended to be used behind a pointer, a trait object.
|
||||
/// The `Peer` trait defines the interface for the opposite side of the RPC channel,
|
||||
/// designed to be used behind a pointer or as a trait object.
|
||||
pub trait Peer: Send + 'static {
|
||||
/// Clones the peer into a boxed trait object.
|
||||
fn box_clone(&self) -> Box<dyn Peer>;
|
||||
|
||||
/// Sends an RPC notification to the peer with the specified method and parameters.
|
||||
fn send_rpc_notification(&self, method: &str, params: &Value);
|
||||
|
||||
/// Sends an asynchronous RPC request to the peer and executes the provided callback upon completion.
|
||||
fn send_rpc_request_async(&self, method: &str, params: &Value, f: Box<dyn Callback>);
|
||||
/// Sends a request (synchronous RPC) to the peer, and waits for the result.
|
||||
|
||||
/// Sends a synchronous RPC request to the peer and waits for the result.
|
||||
/// Returns the result of the request or an error.
|
||||
fn send_rpc_request(&self, method: &str, params: &Value) -> Result<Value, Error>;
|
||||
/// Determines whether an incoming request (or notification) is
|
||||
/// pending. This is intended to reduce latency for bulk operations
|
||||
/// done in the background.
|
||||
|
||||
/// Checks if there is an incoming request pending, intended to reduce latency for bulk operations done in the background.
|
||||
fn request_is_pending(&self) -> bool;
|
||||
|
||||
fn schedule_idle(&self, token: usize);
|
||||
/// Like `schedule_idle`, with the guarantee that the handler's `idle`
|
||||
/// fn will not be called _before_ the provided `Instant`.
|
||||
///
|
||||
/// # Note
|
||||
///
|
||||
/// This is not intended as a high-fidelity timer. Regular RPC messages
|
||||
/// will always take priority over an idle task.
|
||||
/// Schedules a timer to execute the handler's `idle` function after the specified `Instant`.
|
||||
/// Note: This is not a high-fidelity timer. Regular RPC messages will always take priority over idle tasks.
|
||||
fn schedule_timer(&self, after: Instant, token: usize);
|
||||
}
|
||||
|
||||
@ -85,7 +83,6 @@ impl Plugin {
|
||||
|
||||
pub struct PluginInfo {
|
||||
pub name: String,
|
||||
// pub absolute_chat_model_path: String,
|
||||
pub exec_path: String,
|
||||
}
|
||||
|
||||
|
@ -1,91 +1,19 @@
|
||||
use crate::error::{Error, ReadError, RemoteError};
|
||||
use crate::parser::{Call, MessageReader, RequestId};
|
||||
use crate::parser::{Call, MessageReader};
|
||||
use crate::plugin::RpcCtx;
|
||||
use crate::rpc_peer::{RawPeer, Response, RpcState};
|
||||
use serde::de::{DeserializeOwned, Error as SerdeError};
|
||||
use crate::rpc_peer::{RawPeer, RpcState};
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde_json::Value;
|
||||
|
||||
use std::io::{BufRead, Write};
|
||||
|
||||
use crate::rpc_object::RpcObject;
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use tracing::{error, trace};
|
||||
|
||||
const MAX_IDLE_WAIT: Duration = Duration::from_millis(5);
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RpcObject(pub Value);
|
||||
|
||||
impl RpcObject {
|
||||
/// Returns the 'id' of the underlying object, if present.
|
||||
pub fn get_id(&self) -> Option<RequestId> {
|
||||
self.0.get("id").and_then(Value::as_u64)
|
||||
}
|
||||
|
||||
/// Returns the 'method' field of the underlying object, if present.
|
||||
pub fn get_method(&self) -> Option<&str> {
|
||||
self.0.get("method").and_then(Value::as_str)
|
||||
}
|
||||
|
||||
/// Returns `true` if this object looks like an RPC response;
|
||||
/// that is, if it has an 'id' field and does _not_ have a 'method'
|
||||
/// field.
|
||||
pub fn is_response(&self) -> bool {
|
||||
self.0.get("id").is_some() && self.0.get("method").is_none()
|
||||
}
|
||||
|
||||
/// Converts the underlying `Value` into an RPC response object.
|
||||
/// The caller should verify that the object is a response before calling this method.
|
||||
/// # Errors
|
||||
/// If the `Value` is not a well-formed response object, this returns a `String` containing an
|
||||
/// error message. The caller should print this message and exit.
|
||||
pub fn into_response(mut self) -> Result<Response, String> {
|
||||
let _ = self
|
||||
.get_id()
|
||||
.ok_or("Response requires 'id' field.".to_string())?;
|
||||
|
||||
if self.0.get("result").is_some() == self.0.get("error").is_some() {
|
||||
return Err("RPC response must contain exactly one of 'error' or 'result' fields.".into());
|
||||
}
|
||||
let result = self.0.as_object_mut().and_then(|obj| obj.remove("result"));
|
||||
|
||||
match result {
|
||||
Some(r) => Ok(Ok(r)),
|
||||
None => {
|
||||
let error = self
|
||||
.0
|
||||
.as_object_mut()
|
||||
.and_then(|obj| obj.remove("error"))
|
||||
.unwrap();
|
||||
Err(format!("Error handling response: {:?}", error))
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts the underlying `Value` into either an RPC notification or request.
|
||||
pub fn into_rpc<R>(self) -> Result<Call<R>, serde_json::Error>
|
||||
where
|
||||
R: DeserializeOwned,
|
||||
{
|
||||
let id = self.get_id();
|
||||
match id {
|
||||
Some(id) => match serde_json::from_value::<R>(self.0) {
|
||||
Ok(resp) => Ok(Call::Request(id, resp)),
|
||||
Err(err) => Ok(Call::InvalidRequest(id, err.into())),
|
||||
},
|
||||
None => match self.0.get("message").and_then(|value| value.as_str()) {
|
||||
None => Err(serde_json::Error::missing_field("message")),
|
||||
Some(s) => Ok(Call::Message(s.to_string().into())),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Value> for RpcObject {
|
||||
fn from(v: Value) -> RpcObject {
|
||||
RpcObject(v)
|
||||
}
|
||||
}
|
||||
|
||||
pub trait Handler {
|
||||
type Request: DeserializeOwned;
|
||||
@ -198,15 +126,10 @@ impl<W: Write + Send> RpcLoop<W> {
|
||||
|
||||
loop {
|
||||
let _guard = PanicGuard(&peer);
|
||||
let read_result = next_read(&peer, handler, &ctx);
|
||||
let read_result = next_read(&peer, &ctx);
|
||||
let json = match read_result {
|
||||
Ok(json) => json,
|
||||
Err(err) => {
|
||||
// finish idle work before disconnecting;
|
||||
// this is mostly useful for integration tests.
|
||||
if let Some(idle_token) = peer.try_get_idle() {
|
||||
handler.idle(&ctx, idle_token);
|
||||
}
|
||||
peer.disconnect();
|
||||
return err;
|
||||
},
|
||||
@ -244,34 +167,25 @@ impl<W: Write + Send> RpcLoop<W> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the next read result, checking for idle work when no
|
||||
/// result is available.
|
||||
fn next_read<W, H>(peer: &RawPeer<W>, handler: &mut H, ctx: &RpcCtx) -> Result<RpcObject, ReadError>
|
||||
/// retrieves the next available read result from a peer, performing idle work if no result is
|
||||
/// immediately available.
|
||||
fn next_read<W>(peer: &RawPeer<W>, _ctx: &RpcCtx) -> Result<RpcObject, ReadError>
|
||||
where
|
||||
W: Write + Send,
|
||||
H: Handler,
|
||||
{
|
||||
loop {
|
||||
// Continuously checks if there is a result available from the peer using
|
||||
if let Some(result) = peer.try_get_rx() {
|
||||
return result;
|
||||
}
|
||||
// handle timers before general idle work
|
||||
|
||||
let time_to_next_timer = match peer.check_timers() {
|
||||
Some(Ok(token)) => {
|
||||
do_idle(handler, ctx, token);
|
||||
continue;
|
||||
},
|
||||
Some(Ok(_token)) => continue,
|
||||
Some(Err(duration)) => Some(duration),
|
||||
None => None,
|
||||
};
|
||||
|
||||
if let Some(idle_token) = peer.try_get_idle() {
|
||||
do_idle(handler, ctx, idle_token);
|
||||
continue;
|
||||
}
|
||||
|
||||
// we don't want to block indefinitely if there's no current idle work,
|
||||
// because idle work could be scheduled from another thread.
|
||||
// Ensures the function does not block indefinitely by setting a maximum wait time
|
||||
let idle_timeout = time_to_next_timer
|
||||
.unwrap_or(MAX_IDLE_WAIT)
|
||||
.min(MAX_IDLE_WAIT);
|
||||
@ -281,5 +195,3 @@ where
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn do_idle<H: Handler>(_handler: &mut H, _ctx: &RpcCtx, _token: usize) {}
|
||||
|
78
frontend/rust-lib/flowy-sidecar/src/rpc_object.rs
Normal file
78
frontend/rust-lib/flowy-sidecar/src/rpc_object.rs
Normal file
@ -0,0 +1,78 @@
|
||||
use crate::parser::{Call, RequestId};
|
||||
use crate::rpc_peer::Response;
|
||||
use serde::de::{DeserializeOwned, Error};
|
||||
use serde_json::Value;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RpcObject(pub Value);
|
||||
|
||||
impl RpcObject {
|
||||
/// Returns the 'id' of the underlying object, if present.
|
||||
pub fn get_id(&self) -> Option<RequestId> {
|
||||
self.0.get("id").and_then(Value::as_u64)
|
||||
}
|
||||
|
||||
/// Returns the 'method' field of the underlying object, if present.
|
||||
pub fn get_method(&self) -> Option<&str> {
|
||||
self.0.get("method").and_then(Value::as_str)
|
||||
}
|
||||
|
||||
/// Returns `true` if this object looks like an RPC response;
|
||||
/// that is, if it has an 'id' field and does _not_ have a 'method'
|
||||
/// field.
|
||||
pub fn is_response(&self) -> bool {
|
||||
self.0.get("id").is_some() && self.0.get("method").is_none()
|
||||
}
|
||||
|
||||
/// Converts the underlying `Value` into an RPC response object.
|
||||
/// The caller should verify that the object is a response before calling this method.
|
||||
/// # Errors
|
||||
/// If the `Value` is not a well-formed response object, this returns a `String` containing an
|
||||
/// error message. The caller should print this message and exit.
|
||||
pub fn into_response(mut self) -> Result<Response, String> {
|
||||
let _ = self
|
||||
.get_id()
|
||||
.ok_or("Response requires 'id' field.".to_string())?;
|
||||
|
||||
if self.0.get("result").is_some() == self.0.get("error").is_some() {
|
||||
return Err("RPC response must contain exactly one of 'error' or 'result' fields.".into());
|
||||
}
|
||||
let result = self.0.as_object_mut().and_then(|obj| obj.remove("result"));
|
||||
|
||||
match result {
|
||||
Some(r) => Ok(Ok(r)),
|
||||
None => {
|
||||
let error = self
|
||||
.0
|
||||
.as_object_mut()
|
||||
.and_then(|obj| obj.remove("error"))
|
||||
.unwrap();
|
||||
Err(format!("Error handling response: {:?}", error))
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts the underlying `Value` into either an RPC notification or request.
|
||||
pub fn into_rpc<R>(self) -> Result<Call<R>, serde_json::Error>
|
||||
where
|
||||
R: DeserializeOwned,
|
||||
{
|
||||
let id = self.get_id();
|
||||
match id {
|
||||
Some(id) => match serde_json::from_value::<R>(self.0) {
|
||||
Ok(resp) => Ok(Call::Request(id, resp)),
|
||||
Err(err) => Ok(Call::InvalidRequest(id, err.into())),
|
||||
},
|
||||
None => match self.0.get("message").and_then(|value| value.as_str()) {
|
||||
None => Err(serde_json::Error::missing_field("message")),
|
||||
Some(s) => Ok(Call::Message(s.to_string().into())),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Value> for RpcObject {
|
||||
fn from(v: Value) -> RpcObject {
|
||||
RpcObject(v)
|
||||
}
|
||||
}
|
@ -1,6 +1,6 @@
|
||||
use crate::error::{Error, ReadError, RemoteError};
|
||||
use crate::plugin::{Callback, Peer, PluginId};
|
||||
use crate::rpc_loop::RpcObject;
|
||||
use crate::rpc_object::RpcObject;
|
||||
use parking_lot::{Condvar, Mutex};
|
||||
use serde::{de, ser, Deserialize, Deserializer, Serialize, Serializer};
|
||||
use serde_json::{json, Value};
|
||||
@ -52,7 +52,6 @@ pub struct RpcState<W: Write> {
|
||||
writer: Mutex<W>,
|
||||
id: AtomicUsize,
|
||||
pending: Mutex<BTreeMap<usize, ResponseHandler>>,
|
||||
idle_queue: Mutex<VecDeque<usize>>,
|
||||
timers: Mutex<BinaryHeap<Timer>>,
|
||||
needs_exit: AtomicBool,
|
||||
is_blocking: AtomicBool,
|
||||
@ -66,7 +65,6 @@ impl<W: Write> RpcState<W> {
|
||||
writer: Mutex::new(writer),
|
||||
id: AtomicUsize::new(0),
|
||||
pending: Mutex::new(BTreeMap::new()),
|
||||
idle_queue: Mutex::new(VecDeque::new()),
|
||||
timers: Mutex::new(BinaryHeap::new()),
|
||||
needs_exit: AtomicBool::new(false),
|
||||
is_blocking: Default::default(),
|
||||
@ -112,10 +110,6 @@ impl<W: Write + Send + 'static> Peer for RawPeer<W> {
|
||||
!queue.is_empty()
|
||||
}
|
||||
|
||||
fn schedule_idle(&self, token: usize) {
|
||||
self.0.idle_queue.lock().push_back(token);
|
||||
}
|
||||
|
||||
fn schedule_timer(&self, after: Instant, token: usize) {
|
||||
self.0.timers.lock().push(Timer {
|
||||
fire_after: after,
|
||||
@ -143,6 +137,7 @@ impl<W: Write> RawPeer<W> {
|
||||
}
|
||||
|
||||
fn send_rpc(&self, method: &str, params: &Value, rh: ResponseHandler) {
|
||||
trace!("[RPC] call method: {} params: {:?}", method, params);
|
||||
let id = self.0.id.fetch_add(1, Ordering::Relaxed);
|
||||
{
|
||||
let mut pending = self.0.pending.lock();
|
||||
@ -197,15 +192,16 @@ impl<W: Write> RawPeer<W> {
|
||||
self.0.rx_cvar.notify_one();
|
||||
}
|
||||
|
||||
pub(crate) fn try_get_idle(&self) -> Option<usize> {
|
||||
self.0.idle_queue.lock().pop_front()
|
||||
}
|
||||
|
||||
/// Checks status of the most imminent timer. If that timer has expired,
|
||||
/// returns `Some(Ok(_))`, with the corresponding token.
|
||||
/// If a timer exists but has not expired, returns `Some(Err(_))`,
|
||||
/// with the error value being the `Duration` until the timer is ready.
|
||||
/// Checks the status of the most imminent timer.
|
||||
///
|
||||
/// If the timer has expired, returns `Some(Ok(_))` with the corresponding token.
|
||||
/// If a timer exists but has not expired, returns `Some(Err(_))` with the `Duration` until the timer is ready.
|
||||
/// Returns `None` if no timers are registered.
|
||||
///
|
||||
/// # Returns
|
||||
/// - `Some(Ok(usize))`: If the timer has expired, with the corresponding token.
|
||||
/// - `Some(Err(Duration))`: If a timer exists but hasn't expired, with the time remaining until it is ready.
|
||||
/// - `None`: If no timers are registered.
|
||||
pub(crate) fn check_timers(&self) -> Option<Result<usize, Duration>> {
|
||||
let mut timers = self.0.timers.lock();
|
||||
match timers.peek() {
|
||||
|
Loading…
Reference in New Issue
Block a user