From f73b3ded1d9a0dfff470e31db71565a9bb2bd6db Mon Sep 17 00:00:00 2001 From: appflowy Date: Wed, 30 Jun 2021 15:33:49 +0800 Subject: [PATCH] generice handle input params using In<> and Out<>, support protobuf with features=user_protobuf --- rust-lib/dart-ffi/src/lib.rs | 4 +- rust-lib/flowy-sdk/src/lib.rs | 22 ++- rust-lib/flowy-sys/Cargo.toml | 2 + rust-lib/flowy-sys/src/data/mod.rs | 1 - rust-lib/flowy-sys/src/lib.rs | 7 +- .../src/{data => module}/container.rs | 0 rust-lib/flowy-sys/src/module/mod.rs | 8 +- rust-lib/flowy-sys/src/module/module.rs | 133 ++++++++++-------- rust-lib/flowy-sys/src/request/payload.rs | 7 +- rust-lib/flowy-sys/src/request/request.rs | 93 +++++++++--- rust-lib/flowy-sys/src/response/responder.rs | 48 +++++-- rust-lib/flowy-sys/src/sender/data.rs | 71 ++++++++++ rust-lib/flowy-sys/src/sender/mod.rs | 5 + .../src/{stream.rs => sender/sender.rs} | 96 +++++-------- rust-lib/flowy-sys/src/service/boxed.rs | 10 +- rust-lib/flowy-sys/src/service/handler.rs | 8 +- rust-lib/flowy-sys/src/service/service.rs | 4 +- rust-lib/flowy-sys/src/system.rs | 10 +- rust-lib/flowy-sys/tests/api/helper.rs | 24 ++-- rust-lib/flowy-sys/tests/api/module.rs | 5 +- rust-lib/flowy-user/Cargo.toml | 2 +- rust-lib/flowy-user/src/handlers/auth.rs | 16 ++- 22 files changed, 369 insertions(+), 207 deletions(-) delete mode 100644 rust-lib/flowy-sys/src/data/mod.rs rename rust-lib/flowy-sys/src/{data => module}/container.rs (100%) create mode 100644 rust-lib/flowy-sys/src/sender/data.rs create mode 100644 rust-lib/flowy-sys/src/sender/mod.rs rename rust-lib/flowy-sys/src/{stream.rs => sender/sender.rs} (57%) diff --git a/rust-lib/dart-ffi/src/lib.rs b/rust-lib/dart-ffi/src/lib.rs index b14fc8bdde..a3dddad424 100644 --- a/rust-lib/dart-ffi/src/lib.rs +++ b/rust-lib/dart-ffi/src/lib.rs @@ -16,9 +16,9 @@ pub extern "C" fn init_sdk(path: *mut c_char) -> i64 { #[no_mangle] pub extern "C" fn async_command(port: i64, input: *const u8, len: usize) { let bytes = unsafe { std::slice::from_raw_parts(input, len) }.to_vec(); - let request = EventRequest::from_data(bytes); + let payload = SenderPayload::from_data(bytes); - let stream_data = CommandData::new(port, Some(request)).with_callback(Box::new(|_config, response| { + let stream_data = SenderData::new(port, payload).callback(Box::new(|_config, response| { log::info!("async resp: {:?}", response); })); diff --git a/rust-lib/flowy-sdk/src/lib.rs b/rust-lib/flowy-sdk/src/lib.rs index 9dc1a60208..035f8a9639 100644 --- a/rust-lib/flowy-sdk/src/lib.rs +++ b/rust-lib/flowy-sdk/src/lib.rs @@ -18,15 +18,13 @@ pub fn init_modules() -> Vec { pub fn init_system(modules: Vec) { FlowySystem::construct( || modules, - |module_map| { - let mut stream = CommandSender::::new(module_map.clone()); - let runner = CommandSenderRunner::new(module_map, stream.take_data_rx()); + |module_map, runtime| { + let mut sender = Sender::::new(module_map.clone()); + runtime.spawn(SenderRunner::new(module_map, sender.take_rx())); - CMD_SENDER.with(|cell| { - *cell.borrow_mut() = Some(stream); + SENDER.with(|cell| { + *cell.borrow_mut() = Some(sender); }); - - runner }, ) .run() @@ -34,18 +32,18 @@ pub fn init_system(modules: Vec) { } thread_local!( - static CMD_SENDER: RefCell>> = RefCell::new(None); + static SENDER: RefCell>> = RefCell::new(None); ); -pub fn sync_send(data: CommandData) -> EventResponse { - CMD_SENDER.with(|cell| match &*cell.borrow() { +pub fn sync_send(data: SenderData) -> EventResponse { + SENDER.with(|cell| match &*cell.borrow() { Some(stream) => stream.sync_send(data), None => panic!(""), }) } -pub fn async_send(data: CommandData) { - CMD_SENDER.with(|cell| match &*cell.borrow() { +pub fn async_send(data: SenderData) { + SENDER.with(|cell| match &*cell.borrow() { Some(stream) => { stream.async_send(data); }, diff --git a/rust-lib/flowy-sys/Cargo.toml b/rust-lib/flowy-sys/Cargo.toml index f6c7d7c5e1..c54769d817 100644 --- a/rust-lib/flowy-sys/Cargo.toml +++ b/rust-lib/flowy-sys/Cargo.toml @@ -26,6 +26,7 @@ dyn-clone = "1.0" bincode = { version = "1.3", optional = true} serde = { version = "1.0", features = ["derive"], optional = true } serde_json = {version = "1.0", optional = true} +protobuf = {version = "2.24.1", optional = true} [dev-dependencies] tokio = { version = "1", features = ["full"] } @@ -33,3 +34,4 @@ futures-util = "0.3.15" [features] use_serde = ["bincode", "serde", "serde_json"] +use_protobuf= ["protobuf"] diff --git a/rust-lib/flowy-sys/src/data/mod.rs b/rust-lib/flowy-sys/src/data/mod.rs deleted file mode 100644 index 18581c4bb7..0000000000 --- a/rust-lib/flowy-sys/src/data/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod container; diff --git a/rust-lib/flowy-sys/src/lib.rs b/rust-lib/flowy-sys/src/lib.rs index 1fc8dffdc8..9479f5308c 100644 --- a/rust-lib/flowy-sys/src/lib.rs +++ b/rust-lib/flowy-sys/src/lib.rs @@ -1,6 +1,5 @@ #![feature(try_trait)] -mod data; mod error; mod module; mod request; @@ -9,11 +8,9 @@ mod rt; mod service; mod util; -#[cfg(feature = "dart_ffi")] -pub mod dart_ffi; -mod stream; +mod sender; mod system; pub mod prelude { - pub use crate::{error::*, module::*, request::*, response::*, rt::*, stream::*}; + pub use crate::{error::*, module::*, request::*, response::*, rt::*, sender::*}; } diff --git a/rust-lib/flowy-sys/src/data/container.rs b/rust-lib/flowy-sys/src/module/container.rs similarity index 100% rename from rust-lib/flowy-sys/src/data/container.rs rename to rust-lib/flowy-sys/src/module/container.rs diff --git a/rust-lib/flowy-sys/src/module/mod.rs b/rust-lib/flowy-sys/src/module/mod.rs index b37a3cf92a..cff3899cbc 100644 --- a/rust-lib/flowy-sys/src/module/mod.rs +++ b/rust-lib/flowy-sys/src/module/mod.rs @@ -1,5 +1,7 @@ -mod data; -mod module; - +pub use container::*; pub use data::*; pub use module::*; + +mod container; +mod data; +mod module; diff --git a/rust-lib/flowy-sys/src/module/module.rs b/rust-lib/flowy-sys/src/module/module.rs index 4f2b57fd43..edd8f9d6e5 100644 --- a/rust-lib/flowy-sys/src/module/module.rs +++ b/rust-lib/flowy-sys/src/module/module.rs @@ -1,20 +1,3 @@ -use crate::{ - data::container::DataContainer, - error::SystemError, - module::ModuleData, - request::FromRequest, - response::Responder, - service::{BoxService, Handler, Service, ServiceFactory, ServiceRequest, ServiceResponse}, -}; - -use crate::{ - error::InternalError, - request::{payload::Payload, EventRequest}, - response::EventResponse, - service::{factory, BoxServiceFactory, HandlerService}, -}; -use futures_core::{future::LocalBoxFuture, ready}; -use pin_project::pin_project; use std::{ collections::HashMap, fmt::{Debug, Display}, @@ -24,8 +7,29 @@ use std::{ rc::Rc, task::{Context, Poll}, }; + +use futures_core::{future::LocalBoxFuture, ready}; +use pin_project::pin_project; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use crate::{ + error::{InternalError, SystemError}, + module::{container::DataContainer, ModuleData}, + request::{payload::Payload, EventRequest, FromRequest}, + response::{EventResponse, Responder}, + service::{ + factory, + BoxService, + BoxServiceFactory, + Handler, + HandlerService, + Service, + ServiceFactory, + ServiceRequest, + ServiceResponse, + }, +}; + #[derive(PartialEq, Eq, Hash, Debug, Clone)] pub struct Event(String); @@ -39,13 +43,13 @@ pub struct Module { name: String, data: DataContainer, service_map: Rc>, - req_tx: UnboundedSender, - req_rx: UnboundedReceiver, + req_tx: UnboundedSender, + req_rx: UnboundedReceiver, } impl Module { pub fn new() -> Self { - let (req_tx, req_rx) = unbounded_channel::(); + let (req_tx, req_rx) = unbounded_channel::(); Self { name: "".to_owned(), data: DataContainer::new(), @@ -84,22 +88,10 @@ impl Module { self } - pub fn req_tx(&self) -> UnboundedSender { self.req_tx.clone() } - - pub fn handle(&self, request: EventRequest) { - log::debug!("Module: {} receive request: {:?}", self.name, request); - match self.req_tx.send(request) { - Ok(_) => {}, - Err(e) => { - log::error!("Module: {} with error: {:?}", self.name, e); - }, - } - } - - pub fn forward_map(&self) -> HashMap> { + pub fn forward_map(&self) -> HashMap> { self.service_map .keys() - .map(|key| (key.clone(), self.req_tx())) + .map(|key| (key.clone(), self.req_tx.clone())) .collect::>() } @@ -113,9 +105,9 @@ impl Future for Module { match ready!(Pin::new(&mut self.req_rx).poll_recv(cx)) { None => return Poll::Ready(()), Some(request) => { - let mut service = self.new_service(request.get_id().to_string()); + let mut service = self.new_service(()); if let Ok(service) = ready!(Pin::new(&mut service).poll(cx)) { - log::trace!("Spawn module service for request {}", request.get_id()); + log::trace!("Spawn module service for request {}", request.id()); tokio::task::spawn_local(async move { let _ = service.call(request).await; }); @@ -126,15 +118,45 @@ impl Future for Module { } } -impl ServiceFactory for Module { +#[derive(Debug)] +pub struct ModuleRequest { + inner: EventRequest, + payload: Payload, +} + +impl ModuleRequest { + pub fn new(event: E) -> Self + where + E: Into, + { + Self { + inner: EventRequest::new(event), + payload: Payload::None, + } + } + + pub fn payload(mut self, payload: Payload) -> Self { + self.payload = payload; + self + } + + pub(crate) fn into_parts(self) -> (EventRequest, Payload) { (self.inner, self.payload) } + + pub(crate) fn into_service_request(self) -> ServiceRequest { ServiceRequest::new(self.inner, self.payload) } + + pub(crate) fn id(&self) -> &str { &self.inner.id } + + pub(crate) fn event(&self) -> &Event { &self.inner.event } +} + +impl ServiceFactory for Module { type Response = EventResponse; type Error = SystemError; - type Service = BoxService; - type Config = String; + type Service = BoxService; + type Context = (); type Future = LocalBoxFuture<'static, Result>; - fn new_service(&self, cfg: Self::Config) -> Self::Future { - log::trace!("Create module service for request {}", cfg); + fn new_service(&self, cfg: Self::Context) -> Self::Future { let service_map = self.service_map.clone(); Box::pin(async move { let service = ModuleService { service_map }; @@ -148,18 +170,22 @@ pub struct ModuleService { service_map: Rc>, } -impl Service for ModuleService { +impl Service for ModuleService { type Response = EventResponse; type Error = SystemError; type Future = LocalBoxFuture<'static, Result>; - fn call(&self, request: EventRequest) -> Self::Future { - log::trace!("Call module service for request {}", request.get_id()); - match self.service_map.get(request.get_event()) { + fn call(&self, request: ModuleRequest) -> Self::Future { + log::trace!("Call module service for request {}", &request.id()); + match self.service_map.get(&request.event()) { Some(factory) => { + let service_fut = factory.new_service(()); let fut = ModuleServiceFuture { - request, - fut: factory.new_service(()), + fut: Box::pin(async { + let service = service_fut.await?; + let request = request.into_service_request(); + service.call(request).await + }), }; Box::pin(async move { Ok(fut.await.unwrap_or_else(|e| e.into())) }) }, @@ -168,13 +194,13 @@ impl Service for ModuleService { } } -type BoxModuleService = BoxService; +// type BoxModuleService = BoxService; #[pin_project] pub struct ModuleServiceFuture { - request: EventRequest, #[pin] - fut: LocalBoxFuture<'static, Result>, + fut: LocalBoxFuture<'static, Result>, } impl Future for ModuleServiceFuture { @@ -182,11 +208,8 @@ impl Future for ModuleServiceFuture { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - let service = ready!(self.as_mut().project().fut.poll(cx))?; - let req = ServiceRequest::new(self.as_mut().request.clone(), Payload::None); - log::debug!("Call service to handle request {:?}", self.request); - let (_, resp) = ready!(Pin::new(&mut service.call(req)).poll(cx))?.into_parts(); - return Poll::Ready(Ok(resp)); + let (_, response) = ready!(self.as_mut().project().fut.poll(cx))?.into_parts(); + return Poll::Ready(Ok(response)); } } } diff --git a/rust-lib/flowy-sys/src/request/payload.rs b/rust-lib/flowy-sys/src/request/payload.rs index 3e2f6bae3b..a6f4801d0a 100644 --- a/rust-lib/flowy-sys/src/request/payload.rs +++ b/rust-lib/flowy-sys/src/request/payload.rs @@ -5,8 +5,9 @@ use futures::Stream; pub enum PayloadError {} -pub type PayloadStream = Pin>>>; -pub enum Payload { +// TODO: support stream data +#[derive(Clone, Debug)] +pub enum Payload { None, - Stream(S), + Bytes(Vec), } diff --git a/rust-lib/flowy-sys/src/request/request.rs b/rust-lib/flowy-sys/src/request/request.rs index 7776a3f65e..800bfce3d9 100644 --- a/rust-lib/flowy-sys/src/request/request.rs +++ b/rust-lib/flowy-sys/src/request/request.rs @@ -3,45 +3,36 @@ use std::future::Future; use crate::{ error::{InternalError, SystemError}, module::Event, - request::payload::Payload, + request::{payload::Payload, PayloadError}, + response::Responder, util::ready::{ready, Ready}, }; -use futures_core::ready; +use bytes::Bytes; +use futures_core::{ready, Stream}; use std::{ fmt::{Debug, Display}, hash::Hash, + ops, pin::Pin, task::{Context, Poll}, }; + #[derive(Clone, Debug)] pub struct EventRequest { - id: String, - event: Event, - data: Option>, + pub(crate) id: String, + pub(crate) event: Event, } impl EventRequest { pub fn new(event: E) -> EventRequest where - E: Eq + Hash + Debug + Clone + Display, + E: Into, { Self { id: uuid::Uuid::new_v4().to_string(), event: event.into(), - data: None, } } - - pub fn data(mut self, data: Vec) -> Self { - self.data = Some(data); - self - } - - pub fn get_event(&self) -> &Event { &self.event } - - pub fn get_id(&self) -> &str { &self.id } - - pub fn from_data(_data: Vec) -> Self { unimplemented!() } } pub trait FromRequest: Sized { @@ -64,14 +55,16 @@ impl FromRequest for String { type Error = SystemError; type Future = Ready>; - fn from_request(req: &EventRequest, _payload: &mut Payload) -> Self::Future { - match &req.data { - None => ready(Err(InternalError::new("Expected string but request had data").into())), - Some(buf) => ready(Ok(String::from_utf8_lossy(buf).into_owned())), + fn from_request(req: &EventRequest, payload: &mut Payload) -> Self::Future { + match &payload { + Payload::None => ready(Err(unexpected_none_payload())), + Payload::Bytes(buf) => ready(Ok(String::from_utf8_lossy(buf).into_owned())), } } } +fn unexpected_none_payload() -> SystemError { InternalError::new("Expected string but request had data").into() } + #[doc(hidden)] impl FromRequest for Result where @@ -105,3 +98,59 @@ where Poll::Ready(Ok(res)) } } + +pub struct In(pub T); + +impl In { + pub fn into_inner(self) -> T { self.0 } +} + +impl ops::Deref for In { + type Target = T; + + fn deref(&self) -> &T { &self.0 } +} + +impl ops::DerefMut for In { + fn deref_mut(&mut self) -> &mut T { &mut self.0 } +} + +#[cfg(feature = "use_serde")] +impl FromRequest for In +where + T: serde::de::DeserializeOwned + 'static, +{ + type Error = SystemError; + type Future = Ready>; + + #[inline] + fn from_request(req: &EventRequest, payload: &mut Payload) -> Self::Future { + match payload { + Payload::None => ready(Err(unexpected_none_payload())), + Payload::Bytes(bytes) => { + let data: T = bincode::deserialize(bytes).unwrap(); + ready(Ok(In(data))) + }, + } + } +} + +#[cfg(feature = "use_protobuf")] +impl FromRequest for In +where + T: ::protobuf::Message + 'static, +{ + type Error = SystemError; + type Future = Ready>; + + #[inline] + fn from_request(req: &EventRequest, payload: &mut Payload) -> Self::Future { + match payload { + Payload::None => ready(Err(unexpected_none_payload())), + Payload::Bytes(bytes) => { + let data: T = ::protobuf::Message::parse_from_bytes(bytes).unwrap(); + ready(Ok(In(data))) + }, + } + } +} diff --git a/rust-lib/flowy-sys/src/response/responder.rs b/rust-lib/flowy-sys/src/response/responder.rs index 2e9d5a09c2..8b3c373c3e 100644 --- a/rust-lib/flowy-sys/src/response/responder.rs +++ b/rust-lib/flowy-sys/src/response/responder.rs @@ -4,6 +4,7 @@ use crate::{ response::{EventResponse, EventResponseBuilder}, }; use bytes::Bytes; +use std::ops; pub trait Responder { fn respond_to(self, req: &EventRequest) -> EventResponse; @@ -35,13 +36,40 @@ where } } -// #[cfg(feature = "use_serde")] -// impl Responder for T -// where -// T: serde::Serialize, -// { -// fn respond_to(self, request: &EventRequest) -> EventResponse { -// let bytes = bincode::serialize(&self).unwrap(); -// EventResponseBuilder::Ok().data(bytes).build() -// } -// } +pub struct Out(pub T); + +impl Out { + pub fn into_inner(self) -> T { self.0 } +} + +impl ops::Deref for Out { + type Target = T; + + fn deref(&self) -> &T { &self.0 } +} + +impl ops::DerefMut for Out { + fn deref_mut(&mut self) -> &mut T { &mut self.0 } +} + +#[cfg(feature = "use_serde")] +impl Responder for Out +where + T: serde::Serialize, +{ + fn respond_to(self, request: &EventRequest) -> EventResponse { + let bytes: Vec = bincode::serialize(&self.0).unwrap(); + EventResponseBuilder::Ok().data(bytes).build() + } +} + +#[cfg(feature = "use_protobuf")] +impl Responder for Out +where + T: ::protobuf::Message, +{ + fn respond_to(self, _request: &EventRequest) -> EventResponse { + let bytes: Vec = self.write_to_bytes().unwrap(); + EventResponseBuilder::Ok().data(bytes).build() + } +} diff --git a/rust-lib/flowy-sys/src/sender/data.rs b/rust-lib/flowy-sys/src/sender/data.rs new file mode 100644 index 0000000000..b9b4bfedb2 --- /dev/null +++ b/rust-lib/flowy-sys/src/sender/data.rs @@ -0,0 +1,71 @@ +use crate::{ + module::{Event, ModuleRequest}, + request::{EventRequest, Payload}, + response::EventResponse, +}; +use std::{ + fmt::{Debug, Display}, + hash::Hash, +}; + +#[derive(Debug)] +pub struct SenderPayload { + pub(crate) payload: Payload, + pub(crate) event: Event, +} + +impl SenderPayload { + pub fn new(event: E) -> SenderPayload + where + E: Eq + Hash + Debug + Clone + Display, + { + Self { + event: event.into(), + payload: Payload::None, + } + } + + pub fn payload(mut self, payload: Payload) -> Self { + self.payload = payload; + self + } + + pub fn from_bytes(bytes: Vec) -> Self { unimplemented!() } +} + +impl std::convert::Into for SenderPayload { + fn into(self) -> ModuleRequest { ModuleRequest::new(self.event).payload(self.payload) } +} + +impl std::default::Default for SenderPayload { + fn default() -> Self { SenderPayload::new("").payload(Payload::None) } +} + +impl std::convert::Into for SenderPayload { + fn into(self) -> EventRequest { unimplemented!() } +} + +pub type BoxStreamCallback = Box; +pub struct SenderData +where + T: 'static, +{ + pub config: T, + pub payload: SenderPayload, + pub callback: Option>, +} + +impl SenderData { + pub fn new(config: T, payload: SenderPayload) -> Self { + Self { + config, + payload, + callback: None, + } + } + + pub fn callback(mut self, callback: BoxStreamCallback) -> Self { + self.callback = Some(callback); + self + } +} diff --git a/rust-lib/flowy-sys/src/sender/mod.rs b/rust-lib/flowy-sys/src/sender/mod.rs new file mode 100644 index 0000000000..9ddbdec49f --- /dev/null +++ b/rust-lib/flowy-sys/src/sender/mod.rs @@ -0,0 +1,5 @@ +mod data; +mod sender; + +pub use data::*; +pub use sender::*; diff --git a/rust-lib/flowy-sys/src/stream.rs b/rust-lib/flowy-sys/src/sender/sender.rs similarity index 57% rename from rust-lib/flowy-sys/src/stream.rs rename to rust-lib/flowy-sys/src/sender/sender.rs index f6999f6ba8..956e46d24e 100644 --- a/rust-lib/flowy-sys/src/stream.rs +++ b/rust-lib/flowy-sys/src/sender/sender.rs @@ -1,7 +1,9 @@ use crate::{ error::{InternalError, SystemError}, - request::EventRequest, + module::{Event, ModuleRequest}, + request::{EventRequest, Payload}, response::EventResponse, + sender::{SenderData, SenderPayload}, service::{BoxService, Service, ServiceFactory}, system::ModuleMap, }; @@ -15,30 +17,30 @@ use tokio::{ macro_rules! service_factor_impl { ($name:ident) => { #[allow(non_snake_case, missing_docs)] - impl ServiceFactory> for $name + impl ServiceFactory> for $name where T: 'static, { type Response = EventResponse; type Error = SystemError; - type Service = BoxService, Self::Response, Self::Error>; - type Config = (); + type Service = BoxService, Self::Response, Self::Error>; + type Context = (); type Future = LocalBoxFuture<'static, Result>; - fn new_service(&self, _cfg: Self::Config) -> Self::Future { + fn new_service(&self, _cfg: Self::Context) -> Self::Future { let module_map = self.module_map.clone(); - let service = Box::new(CommandSenderService { module_map }); + let service = Box::new(SenderService { module_map }); Box::pin(async move { Ok(service as Self::Service) }) } } }; } -struct CommandSenderService { +struct SenderService { module_map: ModuleMap, } -impl Service> for CommandSenderService +impl Service> for SenderService where T: 'static, { @@ -46,15 +48,22 @@ where type Error = SystemError; type Future = LocalBoxFuture<'static, Result>; - fn call(&self, mut data: CommandData) -> Self::Future { + fn call(&self, data: SenderData) -> Self::Future { let module_map = self.module_map.clone(); - let request = data.request.take().unwrap(); + let SenderData { + config, + payload, + callback, + } = data; + + let event = payload.event.clone(); + let request = payload.into(); + let fut = async move { let result = { - match module_map.get(request.get_event()) { + match module_map.get(&event) { Some(module) => { - let config = request.get_id().to_owned(); - let fut = module.new_service(config); + let fut = module.new_service(()); let service_fut = fut.await?.call(request); service_fut.await }, @@ -66,8 +75,8 @@ where }; let response = result.unwrap_or_else(|e| e.into()); - if let Some(callback) = data.callback { - callback(data.config, response.clone()); + if let Some(callback) = callback { + callback(config, response.clone()); } Ok(response) @@ -76,48 +85,23 @@ where } } -pub type BoxStreamCallback = Box; -pub struct CommandData -where - T: 'static, -{ - config: T, - request: Option, - callback: Option>, -} - -impl CommandData { - pub fn new(config: T, request: Option) -> Self { - Self { - config, - request, - callback: None, - } - } - - pub fn with_callback(mut self, callback: BoxStreamCallback) -> Self { - self.callback = Some(callback); - self - } -} - -pub struct CommandSender +pub struct Sender where T: 'static, { module_map: ModuleMap, - data_tx: UnboundedSender>, - data_rx: Option>>, + data_tx: UnboundedSender>, + data_rx: Option>>, } -service_factor_impl!(CommandSender); +service_factor_impl!(Sender); -impl CommandSender +impl Sender where T: 'static, { pub fn new(module_map: ModuleMap) -> Self { - let (data_tx, data_rx) = unbounded_channel::>(); + let (data_tx, data_rx) = unbounded_channel::>(); Self { module_map, data_tx, @@ -125,9 +109,9 @@ where } } - pub fn async_send(&self, data: CommandData) { let _ = self.data_tx.send(data); } + pub fn async_send(&self, data: SenderData) { let _ = self.data_tx.send(data); } - pub fn sync_send(&self, data: CommandData) -> EventResponse { + pub fn sync_send(&self, data: SenderData) -> EventResponse { let factory = self.new_service(()); futures::executor::block_on(async { @@ -136,31 +120,29 @@ where }) } - pub fn tx(&self) -> UnboundedSender> { self.data_tx.clone() } - - pub fn take_data_rx(&mut self) -> UnboundedReceiver> { self.data_rx.take().unwrap() } + pub fn take_rx(&mut self) -> UnboundedReceiver> { self.data_rx.take().unwrap() } } -pub struct CommandSenderRunner +pub struct SenderRunner where T: 'static, { module_map: ModuleMap, - data_rx: UnboundedReceiver>, + data_rx: UnboundedReceiver>, } -service_factor_impl!(CommandSenderRunner); +service_factor_impl!(SenderRunner); -impl CommandSenderRunner +impl SenderRunner where T: 'static, { - pub fn new(module_map: ModuleMap, data_rx: UnboundedReceiver>) -> Self { + pub fn new(module_map: ModuleMap, data_rx: UnboundedReceiver>) -> Self { Self { module_map, data_rx } } } -impl Future for CommandSenderRunner +impl Future for SenderRunner where T: 'static, { diff --git a/rust-lib/flowy-sys/src/service/boxed.rs b/rust-lib/flowy-sys/src/service/boxed.rs index d1e6c4f7d4..5de36f2de1 100644 --- a/rust-lib/flowy-sys/src/service/boxed.rs +++ b/rust-lib/flowy-sys/src/service/boxed.rs @@ -1,7 +1,7 @@ use crate::service::{Service, ServiceFactory}; use futures_core::future::LocalBoxFuture; -pub fn factory(factory: SF) -> BoxServiceFactory +pub fn factory(factory: SF) -> BoxServiceFactory where SF: ServiceFactory + 'static, Req: 'static, @@ -16,7 +16,7 @@ where type Inner = Box< dyn ServiceFactory< Req, - Config = Cfg, + Context = Cfg, Response = Res, Error = Err, Service = BoxService, @@ -34,7 +34,7 @@ where type Response = Res; type Error = Err; type Service = BoxService; - type Config = Cfg; + type Context = Cfg; type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, cfg: Cfg) -> Self::Future { self.0.new_service(cfg) } @@ -91,7 +91,7 @@ where Req: 'static, Res: 'static, Err: 'static, - SF: ServiceFactory, + SF: ServiceFactory, SF::Future: 'static, SF::Service: 'static, >::Future: 'static, @@ -99,7 +99,7 @@ where type Response = Res; type Error = Err; type Service = BoxService; - type Config = Cfg; + type Context = Cfg; type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, cfg: Cfg) -> Self::Future { diff --git a/rust-lib/flowy-sys/src/service/handler.rs b/rust-lib/flowy-sys/src/service/handler.rs index 648036afa3..9954b3984f 100644 --- a/rust-lib/flowy-sys/src/service/handler.rs +++ b/rust-lib/flowy-sys/src/service/handler.rs @@ -75,7 +75,7 @@ where type Response = ServiceResponse; type Error = SystemError; type Service = Self; - type Config = (); + type Context = (); type Future = Ready>; fn new_service(&self, _: ()) -> Self::Future { ready(Ok(self.clone())) } @@ -139,10 +139,10 @@ where }; }, HandlerServiceProj::Handle(fut, req) => { - let res = ready!(fut.poll(cx)); + let result = ready!(fut.poll(cx)); let req = req.take().unwrap(); - let res = res.respond_to(&req); - return Poll::Ready(Ok(ServiceResponse::new(req, res))); + let resp = result.respond_to(&req); + return Poll::Ready(Ok(ServiceResponse::new(req, resp))); }, } } diff --git a/rust-lib/flowy-sys/src/service/service.rs b/rust-lib/flowy-sys/src/service/service.rs index 1258c20c15..740612cd45 100644 --- a/rust-lib/flowy-sys/src/service/service.rs +++ b/rust-lib/flowy-sys/src/service/service.rs @@ -17,10 +17,10 @@ pub trait ServiceFactory { type Response; type Error; type Service: Service; - type Config; + type Context; type Future: Future>; - fn new_service(&self, cfg: Self::Config) -> Self::Future; + fn new_service(&self, cfg: Self::Context) -> Self::Future; } pub struct ServiceRequest { diff --git a/rust-lib/flowy-sys/src/system.rs b/rust-lib/flowy-sys/src/system.rs index d300aaac84..3965bafa1f 100644 --- a/rust-lib/flowy-sys/src/system.rs +++ b/rust-lib/flowy-sys/src/system.rs @@ -1,7 +1,6 @@ use crate::{ module::{Event, Module}, rt::Runtime, - stream::CommandSenderRunner, }; use futures_core::{ready, task::Context}; use std::{cell::RefCell, collections::HashMap, fmt::Debug, future::Future, io, rc::Rc, sync::Arc}; @@ -28,12 +27,10 @@ pub struct FlowySystem { } impl FlowySystem { - pub fn construct(module_factory: F, sender_factory: S) -> SystemRunner + pub fn construct(module_factory: F, sender_factory: S) -> SystemRunner where - // E: Into> + Eq + Hash + Debug + Clone + 'static, F: FnOnce() -> Vec, - S: FnOnce(ModuleMap) -> CommandSenderRunner, - T: 'static, + S: FnOnce(ModuleMap, &Runtime), { let runtime = Runtime::new().unwrap(); let (sys_cmd_tx, sys_cmd_rx) = unbounded_channel::(); @@ -55,8 +52,7 @@ impl FlowySystem { }); let system = Self { sys_cmd_tx }; - let sender_runner = sender_factory(Rc::new(module_map)); - runtime.spawn(sender_runner); + sender_factory(Rc::new(module_map), &runtime); FlowySystem::set_current(system); let runner = SystemRunner { rt: runtime, stop_rx }; diff --git a/rust-lib/flowy-sys/tests/api/helper.rs b/rust-lib/flowy-sys/tests/api/helper.rs index 3b9ea955ce..2b317877bc 100644 --- a/rust-lib/flowy-sys/tests/api/helper.rs +++ b/rust-lib/flowy-sys/tests/api/helper.rs @@ -1,4 +1,4 @@ -use flowy_sys::prelude::{CommandData, CommandSender, CommandSenderRunner, EventResponse, FlowySystem, Module}; +use flowy_sys::prelude::{EventResponse, FlowySystem, Module, Sender, SenderData, SenderRunner}; use std::{cell::RefCell, sync::Once}; #[allow(dead_code)] @@ -17,18 +17,18 @@ pub struct ExecutorAction { pub struct FlowySystemExecutor {} thread_local!( - static CMD_SENDER: RefCell>> = RefCell::new(None); + static SENDER: RefCell>> = RefCell::new(None); ); -pub fn sync_send(data: CommandData) -> EventResponse { - CMD_SENDER.with(|cell| match &*cell.borrow() { +pub fn sync_send(data: SenderData) -> EventResponse { + SENDER.with(|cell| match &*cell.borrow() { Some(stream) => stream.sync_send(data), None => panic!(""), }) } -pub fn async_send(data: CommandData) { - CMD_SENDER.with(|cell| match &*cell.borrow() { +pub fn async_send(data: SenderData) { + SENDER.with(|cell| match &*cell.borrow() { Some(stream) => { stream.async_send(data); }, @@ -42,15 +42,13 @@ where { FlowySystem::construct( || modules, - |module_map| { - let mut stream = CommandSender::::new(module_map.clone()); - let runner = CommandSenderRunner::new(module_map, stream.take_data_rx()); + |module_map, runtime| { + let mut sender = Sender::::new(module_map.clone()); + runtime.spawn(SenderRunner::new(module_map, sender.take_rx())); - CMD_SENDER.with(|cell| { - *cell.borrow_mut() = Some(stream); + SENDER.with(|cell| { + *cell.borrow_mut() = Some(sender); }); - - runner }, ) .spawn(async { f() }) diff --git a/rust-lib/flowy-sys/tests/api/module.rs b/rust-lib/flowy-sys/tests/api/module.rs index e7095b6809..1750c16f50 100644 --- a/rust-lib/flowy-sys/tests/api/module.rs +++ b/rust-lib/flowy-sys/tests/api/module.rs @@ -10,8 +10,9 @@ fn test_init() { let modules = vec![Module::new().event(event, hello)]; init_system(modules, move || { - let request = EventRequest::new(event); - let stream_data = CommandData::new(1, Some(request)).with_callback(Box::new(|_config, response| { + let payload = SenderPayload::new(event); + + let stream_data = SenderData::new(1, payload).callback(Box::new(|_config, response| { log::info!("async resp: {:?}", response); })); diff --git a/rust-lib/flowy-user/Cargo.toml b/rust-lib/flowy-user/Cargo.toml index 1303c3c075..dc6734b9a1 100644 --- a/rust-lib/flowy-user/Cargo.toml +++ b/rust-lib/flowy-user/Cargo.toml @@ -7,7 +7,7 @@ edition = "2018" [dependencies] derive_more = {version = "0.99", features = ["display"]} -flowy-sys = { path = "../flowy-sys" } +flowy-sys = { path = "../flowy-sys", features = ["use_serde"] } flowy-log = { path = "../flowy-log" } tracing = { version = "0.1", features = ["log"] } bytes = "0.5" diff --git a/rust-lib/flowy-user/src/handlers/auth.rs b/rust-lib/flowy-user/src/handlers/auth.rs index d1d4e4b301..fe7c5eb1dc 100644 --- a/rust-lib/flowy-user/src/handlers/auth.rs +++ b/rust-lib/flowy-user/src/handlers/auth.rs @@ -1,7 +1,20 @@ use crate::domain::{User, UserEmail, UserName}; use bytes::Bytes; +use flowy_sys::prelude::{In, Out}; use std::convert::TryInto; +// tracing instrument 👉🏻 https://docs.rs/tracing/0.1.26/tracing/attr.instrument.html +#[tracing::instrument( + name = "User check", + skip(data), + fields( + email = %data.email, + name = %data.name + ) +)] +pub async fn user_check(data: In) -> Out { panic!("") } + +#[derive(Debug, serde::Deserialize, serde::Serialize)] pub struct UserData { name: String, email: String, @@ -16,6 +29,3 @@ impl TryInto for UserData { Ok(User::new(name, email)) } } - -#[tracing::instrument(name = "User check")] -pub async fn user_check(user_name: String) -> Bytes { unimplemented!() }