diff --git a/.idea/appflowy_client.iml b/.idea/appflowy_client.iml
index 037f009510..ee323030e9 100644
--- a/.idea/appflowy_client.iml
+++ b/.idea/appflowy_client.iml
@@ -6,6 +6,7 @@
+
diff --git a/rust-lib/flowy-sys/src/error/error.rs b/rust-lib/flowy-sys/src/error/error.rs
index e545971f61..429ab8157a 100644
--- a/rust-lib/flowy-sys/src/error/error.rs
+++ b/rust-lib/flowy-sys/src/error/error.rs
@@ -1,10 +1,14 @@
-use crate::response::{FlowyResponse, StatusCode};
-use std::{cell::RefCell, fmt};
+use crate::{
+ request::EventRequest,
+ response::{EventResponse, EventResponseBuilder, StatusCode},
+};
+use std::{fmt, option::NoneError};
+use tokio::sync::mpsc::error::SendError;
pub trait Error: fmt::Debug + fmt::Display {
fn status_code(&self) -> StatusCode;
- fn as_response(&self) -> FlowyResponse { FlowyResponse::new(self.status_code()) }
+ fn as_response(&self) -> EventResponse { EventResponse::new(self.status_code()) }
}
impl From for SystemError {
@@ -33,6 +37,57 @@ impl std::error::Error for SystemError {
fn cause(&self) -> Option<&dyn std::error::Error> { None }
}
-impl From for FlowyResponse {
+impl From> for SystemError
+where
+ T: fmt::Display + fmt::Debug + 'static,
+{
+ fn from(err: SendError) -> Self { InternalError { inner: err }.into() }
+}
+
+impl From> for SystemError {
+ fn from(err: SendError) -> Self { InternalError { inner: err }.into() }
+}
+
+impl From for SystemError {
+ fn from(s: NoneError) -> Self {
+ InternalError {
+ inner: format!("Unexpected none: {:?}", s),
+ }
+ .into()
+ }
+}
+
+impl From for EventResponse {
fn from(err: SystemError) -> Self { err.inner_error().as_response() }
}
+
+pub struct InternalError {
+ inner: T,
+}
+
+impl InternalError {
+ pub fn new(inner: T) -> Self { InternalError { inner } }
+}
+
+impl fmt::Debug for InternalError
+where
+ T: fmt::Debug + 'static,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fmt::Debug::fmt(&self.inner, f) }
+}
+
+impl fmt::Display for InternalError
+where
+ T: fmt::Display + 'static,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fmt::Display::fmt(&self.inner, f) }
+}
+
+impl Error for InternalError
+where
+ T: fmt::Debug + fmt::Display + 'static,
+{
+ fn status_code(&self) -> StatusCode { StatusCode::Err }
+
+ fn as_response(&self) -> EventResponse { EventResponseBuilder::Err().data(format!("{}", self.inner)).build() }
+}
diff --git a/rust-lib/flowy-sys/src/lib.rs b/rust-lib/flowy-sys/src/lib.rs
index 176c8c18a8..d2c041e770 100644
--- a/rust-lib/flowy-sys/src/lib.rs
+++ b/rust-lib/flowy-sys/src/lib.rs
@@ -1,3 +1,5 @@
+#![feature(try_trait)]
+
mod data;
mod error;
mod module;
@@ -6,3 +8,7 @@ mod response;
mod rt;
mod service;
mod util;
+
+pub mod prelude {
+ pub use crate::{error::*, module::*, request::*, response::*, rt::*};
+}
diff --git a/rust-lib/flowy-sys/src/module/data.rs b/rust-lib/flowy-sys/src/module/data.rs
index 0c07d1b044..adb01bf781 100644
--- a/rust-lib/flowy-sys/src/module/data.rs
+++ b/rust-lib/flowy-sys/src/module/data.rs
@@ -1,6 +1,6 @@
use crate::{
error::SystemError,
- request::{payload::Payload, FlowyRequest, FromRequest},
+ request::{payload::Payload, EventRequest, FromRequest},
util::ready::Ready,
};
use std::{ops::Deref, sync::Arc};
@@ -32,5 +32,5 @@ impl FromRequest for ModuleData {
type Future = Ready>;
#[inline]
- fn from_request(req: &FlowyRequest, _: &mut Payload) -> Self::Future { unimplemented!() }
+ fn from_request(_req: &EventRequest, _: &mut Payload) -> Self::Future { unimplemented!() }
}
diff --git a/rust-lib/flowy-sys/src/module/module.rs b/rust-lib/flowy-sys/src/module/module.rs
index 6182ced955..9921fb1370 100644
--- a/rust-lib/flowy-sys/src/module/module.rs
+++ b/rust-lib/flowy-sys/src/module/module.rs
@@ -8,44 +8,35 @@ use crate::{
};
use crate::{
- request::{payload::Payload, FlowyRequest},
- response::{FlowyResponse, FlowyResponseBuilder},
+ request::{payload::Payload, EventRequest},
+ response::EventResponse,
service::{factory, BoxServiceFactory, HandlerService},
};
use futures_core::{future::LocalBoxFuture, ready};
use pin_project::pin_project;
use std::{
- cell::RefCell,
collections::HashMap,
- fmt::Debug,
future::Future,
- hash::Hash,
- marker::PhantomData,
pin::Pin,
- rc::Rc,
- sync::Arc,
task::{Context, Poll},
};
-use tokio::sync::{
- mpsc,
- mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
-};
+use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
-pub type Command = String;
-pub type CommandServiceFactory = BoxServiceFactory<(), ServiceRequest, ServiceResponse, SystemError>;
+pub type Event = String;
+pub type EventServiceFactory = BoxServiceFactory<(), ServiceRequest, ServiceResponse, SystemError>;
pub struct Module {
name: String,
data: DataContainer,
- service_map: HashMap,
- req_tx: UnboundedSender,
- req_rx: UnboundedReceiver,
- resp_tx: UnboundedSender,
+ service_map: HashMap,
+ req_tx: UnboundedSender,
+ req_rx: UnboundedReceiver,
+ resp_tx: UnboundedSender,
}
impl Module {
- pub fn new(resp_tx: UnboundedSender) -> Self {
- let (req_tx, req_rx) = unbounded_channel::();
+ pub fn new(resp_tx: UnboundedSender) -> Self {
+ let (req_tx, req_rx) = unbounded_channel::();
Self {
name: "".to_owned(),
data: DataContainer::new(),
@@ -62,36 +53,38 @@ impl Module {
}
pub fn data(mut self, data: D) -> Self {
- let module_data = ModuleData::new(data);
- self.data.insert(module_data);
+ self.data.insert(ModuleData::new(data));
self
}
- pub fn event(mut self, command: Command, handler: H) -> Self
+ pub fn event(mut self, event: Event, handler: H) -> Self
where
H: Handler,
T: FromRequest + 'static,
R: Future + 'static,
R::Output: Responder + 'static,
{
- self.service_map.insert(command, factory(HandlerService::new(handler)));
+ if self.service_map.contains_key(&event) {
+ log::error!("Duplicate Event: {}", &event);
+ }
+
+ self.service_map.insert(event, factory(HandlerService::new(handler)));
self
}
- pub fn can_handle(&self, cmd: &Command) -> bool { self.service_map.contains_key(cmd) }
+ pub fn req_tx(&self) -> UnboundedSender { self.req_tx.clone() }
- pub fn req_tx(&self) -> UnboundedSender { self.req_tx.clone() }
-
- pub fn handle(&self, request: FlowyRequest) {
+ pub fn handle(&self, request: EventRequest) {
+ log::trace!("Module: {} receive request: {:?}", self.name, request);
match self.req_tx.send(request) {
Ok(_) => {},
Err(e) => {
- log::error!("{:?}", e);
+ log::error!("Module: {} with error: {:?}", self.name, e);
},
}
}
- pub fn service_sender_map(&self) -> HashMap> {
+ pub fn forward_map(&self) -> HashMap> {
self.service_map
.keys()
.map(|key| (key.clone(), self.req_tx()))
@@ -105,7 +98,7 @@ impl Future for Module {
loop {
match ready!(Pin::new(&mut self.req_rx).poll_recv(cx)) {
None => return Poll::Ready(()),
- Some(request) => match self.service_map.get(request.get_cmd()) {
+ Some(request) => match self.service_map.get(request.get_event()) {
Some(factory) => {
let fut = ModuleServiceFuture {
request,
@@ -113,14 +106,14 @@ impl Future for Module {
};
let resp_tx = self.resp_tx.clone();
tokio::task::spawn_local(async move {
- let resp = fut.await.unwrap_or_else(|e| panic!());
+ let resp = fut.await.unwrap_or_else(|_e| panic!());
if let Err(e) = resp_tx.send(resp) {
log::error!("{:?}", e);
}
});
},
None => {
- log::error!("Command: {} handler not found", request.get_cmd());
+ log::error!("Event: {} handler not found", request.get_event());
},
},
}
@@ -131,18 +124,19 @@ impl Future for Module {
type BoxModuleService = BoxService;
#[pin_project]
pub struct ModuleServiceFuture {
- request: FlowyRequest,
+ request: EventRequest,
#[pin]
fut: LocalBoxFuture<'static, Result>,
}
impl Future for ModuleServiceFuture {
- type Output = Result;
+ type Output = Result;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll {
loop {
let service = ready!(self.as_mut().project().fut.poll(cx))?;
let req = ServiceRequest::new(self.as_mut().request.clone(), Payload::None);
+ log::trace!("Call service to handle request {:?}", self.request);
let (_, resp) = ready!(Pin::new(&mut service.call(req)).poll(cx))?.into_parts();
return Poll::Ready(Ok(resp));
}
@@ -156,29 +150,23 @@ mod tests {
use futures_util::{future, pin_mut};
use tokio::sync::mpsc::unbounded_channel;
- pub async fn hello_service() -> String {
- println!("no params");
- "hello".to_string()
- }
-
- // #[tokio::test]
+ pub async fn hello_service() -> String { "hello".to_string() }
#[test]
fn test() {
let mut runtime = Runtime::new().unwrap();
runtime.block_on(async {
- let (resp_tx, mut resp_rx) = unbounded_channel::();
- let command = "hello".to_string();
- let mut module = Module::new(resp_tx).event(command.clone(), hello_service);
- assert_eq!(module.can_handle(&command), true);
+ let (resp_tx, mut resp_rx) = unbounded_channel::();
+ let event = "hello".to_string();
+ let mut module = Module::new(resp_tx).event(event.clone(), hello_service);
let req_tx = module.req_tx();
let mut event = async move {
- let request = FlowyRequest::new(command.clone());
+ let request = EventRequest::new(event.clone());
req_tx.send(request).unwrap();
match resp_rx.recv().await {
Some(resp) => {
- println!("{}", resp);
+ log::info!("{}", resp);
},
None => panic!(""),
}
diff --git a/rust-lib/flowy-sys/src/request/mod.rs b/rust-lib/flowy-sys/src/request/mod.rs
index cab7b63efc..1c5b08f2a3 100644
--- a/rust-lib/flowy-sys/src/request/mod.rs
+++ b/rust-lib/flowy-sys/src/request/mod.rs
@@ -1,4 +1,5 @@
-pub use request::*;
-
pub mod payload;
mod request;
+
+pub use payload::*;
+pub use request::*;
diff --git a/rust-lib/flowy-sys/src/request/request.rs b/rust-lib/flowy-sys/src/request/request.rs
index 033a962147..bf0b6bf710 100644
--- a/rust-lib/flowy-sys/src/request/request.rs
+++ b/rust-lib/flowy-sys/src/request/request.rs
@@ -5,32 +5,31 @@ use crate::{
request::payload::Payload,
util::ready::{ready, Ready},
};
-use std::hash::Hash;
#[derive(Clone, Debug)]
-pub struct FlowyRequest {
+pub struct EventRequest {
id: String,
- cmd: String,
+ event: String,
}
-impl FlowyRequest {
- pub fn new(cmd: String) -> FlowyRequest {
+impl EventRequest {
+ pub fn new(event: String) -> EventRequest {
Self {
id: uuid::Uuid::new_v4().to_string(),
- cmd,
+ event,
}
}
}
-impl FlowyRequest {
- pub fn get_cmd(&self) -> &str { &self.cmd }
+impl EventRequest {
+ pub fn get_event(&self) -> &str { &self.event }
}
pub trait FromRequest: Sized {
type Error: Into;
type Future: Future