generice handle input params using In<> and Out<>, support protobuf with features=user_protobuf

This commit is contained in:
appflowy 2021-06-30 15:33:49 +08:00
parent 9371a3d31e
commit f73b3ded1d
22 changed files with 369 additions and 207 deletions

View File

@ -16,9 +16,9 @@ pub extern "C" fn init_sdk(path: *mut c_char) -> i64 {
#[no_mangle] #[no_mangle]
pub extern "C" fn async_command(port: i64, input: *const u8, len: usize) { pub extern "C" fn async_command(port: i64, input: *const u8, len: usize) {
let bytes = unsafe { std::slice::from_raw_parts(input, len) }.to_vec(); let bytes = unsafe { std::slice::from_raw_parts(input, len) }.to_vec();
let request = EventRequest::from_data(bytes); let payload = SenderPayload::from_data(bytes);
let stream_data = CommandData::new(port, Some(request)).with_callback(Box::new(|_config, response| { let stream_data = SenderData::new(port, payload).callback(Box::new(|_config, response| {
log::info!("async resp: {:?}", response); log::info!("async resp: {:?}", response);
})); }));

View File

@ -18,15 +18,13 @@ pub fn init_modules() -> Vec<Module> {
pub fn init_system<F>(modules: Vec<Module>) { pub fn init_system<F>(modules: Vec<Module>) {
FlowySystem::construct( FlowySystem::construct(
|| modules, || modules,
|module_map| { |module_map, runtime| {
let mut stream = CommandSender::<i64>::new(module_map.clone()); let mut sender = Sender::<i64>::new(module_map.clone());
let runner = CommandSenderRunner::new(module_map, stream.take_data_rx()); runtime.spawn(SenderRunner::new(module_map, sender.take_rx()));
CMD_SENDER.with(|cell| { SENDER.with(|cell| {
*cell.borrow_mut() = Some(stream); *cell.borrow_mut() = Some(sender);
}); });
runner
}, },
) )
.run() .run()
@ -34,18 +32,18 @@ pub fn init_system<F>(modules: Vec<Module>) {
} }
thread_local!( thread_local!(
static CMD_SENDER: RefCell<Option<CommandSender<i64>>> = RefCell::new(None); static SENDER: RefCell<Option<Sender<i64>>> = RefCell::new(None);
); );
pub fn sync_send(data: CommandData<i64>) -> EventResponse { pub fn sync_send(data: SenderData<i64>) -> EventResponse {
CMD_SENDER.with(|cell| match &*cell.borrow() { SENDER.with(|cell| match &*cell.borrow() {
Some(stream) => stream.sync_send(data), Some(stream) => stream.sync_send(data),
None => panic!(""), None => panic!(""),
}) })
} }
pub fn async_send(data: CommandData<i64>) { pub fn async_send(data: SenderData<i64>) {
CMD_SENDER.with(|cell| match &*cell.borrow() { SENDER.with(|cell| match &*cell.borrow() {
Some(stream) => { Some(stream) => {
stream.async_send(data); stream.async_send(data);
}, },

View File

@ -26,6 +26,7 @@ dyn-clone = "1.0"
bincode = { version = "1.3", optional = true} bincode = { version = "1.3", optional = true}
serde = { version = "1.0", features = ["derive"], optional = true } serde = { version = "1.0", features = ["derive"], optional = true }
serde_json = {version = "1.0", optional = true} serde_json = {version = "1.0", optional = true}
protobuf = {version = "2.24.1", optional = true}
[dev-dependencies] [dev-dependencies]
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
@ -33,3 +34,4 @@ futures-util = "0.3.15"
[features] [features]
use_serde = ["bincode", "serde", "serde_json"] use_serde = ["bincode", "serde", "serde_json"]
use_protobuf= ["protobuf"]

View File

@ -1 +0,0 @@
pub mod container;

View File

@ -1,6 +1,5 @@
#![feature(try_trait)] #![feature(try_trait)]
mod data;
mod error; mod error;
mod module; mod module;
mod request; mod request;
@ -9,11 +8,9 @@ mod rt;
mod service; mod service;
mod util; mod util;
#[cfg(feature = "dart_ffi")] mod sender;
pub mod dart_ffi;
mod stream;
mod system; mod system;
pub mod prelude { pub mod prelude {
pub use crate::{error::*, module::*, request::*, response::*, rt::*, stream::*}; pub use crate::{error::*, module::*, request::*, response::*, rt::*, sender::*};
} }

View File

@ -1,5 +1,7 @@
mod data; pub use container::*;
mod module;
pub use data::*; pub use data::*;
pub use module::*; pub use module::*;
mod container;
mod data;
mod module;

View File

@ -1,20 +1,3 @@
use crate::{
data::container::DataContainer,
error::SystemError,
module::ModuleData,
request::FromRequest,
response::Responder,
service::{BoxService, Handler, Service, ServiceFactory, ServiceRequest, ServiceResponse},
};
use crate::{
error::InternalError,
request::{payload::Payload, EventRequest},
response::EventResponse,
service::{factory, BoxServiceFactory, HandlerService},
};
use futures_core::{future::LocalBoxFuture, ready};
use pin_project::pin_project;
use std::{ use std::{
collections::HashMap, collections::HashMap,
fmt::{Debug, Display}, fmt::{Debug, Display},
@ -24,8 +7,29 @@ use std::{
rc::Rc, rc::Rc,
task::{Context, Poll}, task::{Context, Poll},
}; };
use futures_core::{future::LocalBoxFuture, ready};
use pin_project::pin_project;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use crate::{
error::{InternalError, SystemError},
module::{container::DataContainer, ModuleData},
request::{payload::Payload, EventRequest, FromRequest},
response::{EventResponse, Responder},
service::{
factory,
BoxService,
BoxServiceFactory,
Handler,
HandlerService,
Service,
ServiceFactory,
ServiceRequest,
ServiceResponse,
},
};
#[derive(PartialEq, Eq, Hash, Debug, Clone)] #[derive(PartialEq, Eq, Hash, Debug, Clone)]
pub struct Event(String); pub struct Event(String);
@ -39,13 +43,13 @@ pub struct Module {
name: String, name: String,
data: DataContainer, data: DataContainer,
service_map: Rc<HashMap<Event, EventServiceFactory>>, service_map: Rc<HashMap<Event, EventServiceFactory>>,
req_tx: UnboundedSender<EventRequest>, req_tx: UnboundedSender<ModuleRequest>,
req_rx: UnboundedReceiver<EventRequest>, req_rx: UnboundedReceiver<ModuleRequest>,
} }
impl Module { impl Module {
pub fn new() -> Self { pub fn new() -> Self {
let (req_tx, req_rx) = unbounded_channel::<EventRequest>(); let (req_tx, req_rx) = unbounded_channel::<ModuleRequest>();
Self { Self {
name: "".to_owned(), name: "".to_owned(),
data: DataContainer::new(), data: DataContainer::new(),
@ -84,22 +88,10 @@ impl Module {
self self
} }
pub fn req_tx(&self) -> UnboundedSender<EventRequest> { self.req_tx.clone() } pub fn forward_map(&self) -> HashMap<Event, UnboundedSender<ModuleRequest>> {
pub fn handle(&self, request: EventRequest) {
log::debug!("Module: {} receive request: {:?}", self.name, request);
match self.req_tx.send(request) {
Ok(_) => {},
Err(e) => {
log::error!("Module: {} with error: {:?}", self.name, e);
},
}
}
pub fn forward_map(&self) -> HashMap<Event, UnboundedSender<EventRequest>> {
self.service_map self.service_map
.keys() .keys()
.map(|key| (key.clone(), self.req_tx())) .map(|key| (key.clone(), self.req_tx.clone()))
.collect::<HashMap<_, _>>() .collect::<HashMap<_, _>>()
} }
@ -113,9 +105,9 @@ impl Future for Module {
match ready!(Pin::new(&mut self.req_rx).poll_recv(cx)) { match ready!(Pin::new(&mut self.req_rx).poll_recv(cx)) {
None => return Poll::Ready(()), None => return Poll::Ready(()),
Some(request) => { Some(request) => {
let mut service = self.new_service(request.get_id().to_string()); let mut service = self.new_service(());
if let Ok(service) = ready!(Pin::new(&mut service).poll(cx)) { if let Ok(service) = ready!(Pin::new(&mut service).poll(cx)) {
log::trace!("Spawn module service for request {}", request.get_id()); log::trace!("Spawn module service for request {}", request.id());
tokio::task::spawn_local(async move { tokio::task::spawn_local(async move {
let _ = service.call(request).await; let _ = service.call(request).await;
}); });
@ -126,15 +118,45 @@ impl Future for Module {
} }
} }
impl ServiceFactory<EventRequest> for Module { #[derive(Debug)]
pub struct ModuleRequest {
inner: EventRequest,
payload: Payload,
}
impl ModuleRequest {
pub fn new<E>(event: E) -> Self
where
E: Into<Event>,
{
Self {
inner: EventRequest::new(event),
payload: Payload::None,
}
}
pub fn payload(mut self, payload: Payload) -> Self {
self.payload = payload;
self
}
pub(crate) fn into_parts(self) -> (EventRequest, Payload) { (self.inner, self.payload) }
pub(crate) fn into_service_request(self) -> ServiceRequest { ServiceRequest::new(self.inner, self.payload) }
pub(crate) fn id(&self) -> &str { &self.inner.id }
pub(crate) fn event(&self) -> &Event { &self.inner.event }
}
impl ServiceFactory<ModuleRequest> for Module {
type Response = EventResponse; type Response = EventResponse;
type Error = SystemError; type Error = SystemError;
type Service = BoxService<EventRequest, Self::Response, Self::Error>; type Service = BoxService<ModuleRequest, Self::Response, Self::Error>;
type Config = String; type Context = ();
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::Error>>; type Future = LocalBoxFuture<'static, Result<Self::Service, Self::Error>>;
fn new_service(&self, cfg: Self::Config) -> Self::Future { fn new_service(&self, cfg: Self::Context) -> Self::Future {
log::trace!("Create module service for request {}", cfg);
let service_map = self.service_map.clone(); let service_map = self.service_map.clone();
Box::pin(async move { Box::pin(async move {
let service = ModuleService { service_map }; let service = ModuleService { service_map };
@ -148,18 +170,22 @@ pub struct ModuleService {
service_map: Rc<HashMap<Event, EventServiceFactory>>, service_map: Rc<HashMap<Event, EventServiceFactory>>,
} }
impl Service<EventRequest> for ModuleService { impl Service<ModuleRequest> for ModuleService {
type Response = EventResponse; type Response = EventResponse;
type Error = SystemError; type Error = SystemError;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>; type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
fn call(&self, request: EventRequest) -> Self::Future { fn call(&self, request: ModuleRequest) -> Self::Future {
log::trace!("Call module service for request {}", request.get_id()); log::trace!("Call module service for request {}", &request.id());
match self.service_map.get(request.get_event()) { match self.service_map.get(&request.event()) {
Some(factory) => { Some(factory) => {
let service_fut = factory.new_service(());
let fut = ModuleServiceFuture { let fut = ModuleServiceFuture {
request, fut: Box::pin(async {
fut: factory.new_service(()), let service = service_fut.await?;
let request = request.into_service_request();
service.call(request).await
}),
}; };
Box::pin(async move { Ok(fut.await.unwrap_or_else(|e| e.into())) }) Box::pin(async move { Ok(fut.await.unwrap_or_else(|e| e.into())) })
}, },
@ -168,13 +194,13 @@ impl Service<EventRequest> for ModuleService {
} }
} }
type BoxModuleService = BoxService<ServiceRequest, ServiceResponse, SystemError>; // type BoxModuleService = BoxService<ServiceRequest, ServiceResponse,
// SystemError>;
#[pin_project] #[pin_project]
pub struct ModuleServiceFuture { pub struct ModuleServiceFuture {
request: EventRequest,
#[pin] #[pin]
fut: LocalBoxFuture<'static, Result<BoxModuleService, SystemError>>, fut: LocalBoxFuture<'static, Result<ServiceResponse, SystemError>>,
} }
impl Future for ModuleServiceFuture { impl Future for ModuleServiceFuture {
@ -182,11 +208,8 @@ impl Future for ModuleServiceFuture {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop { loop {
let service = ready!(self.as_mut().project().fut.poll(cx))?; let (_, response) = ready!(self.as_mut().project().fut.poll(cx))?.into_parts();
let req = ServiceRequest::new(self.as_mut().request.clone(), Payload::None); return Poll::Ready(Ok(response));
log::debug!("Call service to handle request {:?}", self.request);
let (_, resp) = ready!(Pin::new(&mut service.call(req)).poll(cx))?.into_parts();
return Poll::Ready(Ok(resp));
} }
} }
} }

View File

@ -5,8 +5,9 @@ use futures::Stream;
pub enum PayloadError {} pub enum PayloadError {}
pub type PayloadStream = Pin<Box<dyn Stream<Item = Result<Bytes, PayloadError>>>>; // TODO: support stream data
pub enum Payload<S = PayloadStream> { #[derive(Clone, Debug)]
pub enum Payload {
None, None,
Stream(S), Bytes(Vec<u8>),
} }

View File

@ -3,45 +3,36 @@ use std::future::Future;
use crate::{ use crate::{
error::{InternalError, SystemError}, error::{InternalError, SystemError},
module::Event, module::Event,
request::payload::Payload, request::{payload::Payload, PayloadError},
response::Responder,
util::ready::{ready, Ready}, util::ready::{ready, Ready},
}; };
use futures_core::ready; use bytes::Bytes;
use futures_core::{ready, Stream};
use std::{ use std::{
fmt::{Debug, Display}, fmt::{Debug, Display},
hash::Hash, hash::Hash,
ops,
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
}; };
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct EventRequest { pub struct EventRequest {
id: String, pub(crate) id: String,
event: Event, pub(crate) event: Event,
data: Option<Vec<u8>>,
} }
impl EventRequest { impl EventRequest {
pub fn new<E>(event: E) -> EventRequest pub fn new<E>(event: E) -> EventRequest
where where
E: Eq + Hash + Debug + Clone + Display, E: Into<Event>,
{ {
Self { Self {
id: uuid::Uuid::new_v4().to_string(), id: uuid::Uuid::new_v4().to_string(),
event: event.into(), event: event.into(),
data: None,
} }
} }
pub fn data(mut self, data: Vec<u8>) -> Self {
self.data = Some(data);
self
}
pub fn get_event(&self) -> &Event { &self.event }
pub fn get_id(&self) -> &str { &self.id }
pub fn from_data(_data: Vec<u8>) -> Self { unimplemented!() }
} }
pub trait FromRequest: Sized { pub trait FromRequest: Sized {
@ -64,14 +55,16 @@ impl FromRequest for String {
type Error = SystemError; type Error = SystemError;
type Future = Ready<Result<Self, Self::Error>>; type Future = Ready<Result<Self, Self::Error>>;
fn from_request(req: &EventRequest, _payload: &mut Payload) -> Self::Future { fn from_request(req: &EventRequest, payload: &mut Payload) -> Self::Future {
match &req.data { match &payload {
None => ready(Err(InternalError::new("Expected string but request had data").into())), Payload::None => ready(Err(unexpected_none_payload())),
Some(buf) => ready(Ok(String::from_utf8_lossy(buf).into_owned())), Payload::Bytes(buf) => ready(Ok(String::from_utf8_lossy(buf).into_owned())),
} }
} }
} }
fn unexpected_none_payload() -> SystemError { InternalError::new("Expected string but request had data").into() }
#[doc(hidden)] #[doc(hidden)]
impl<T> FromRequest for Result<T, T::Error> impl<T> FromRequest for Result<T, T::Error>
where where
@ -105,3 +98,59 @@ where
Poll::Ready(Ok(res)) Poll::Ready(Ok(res))
} }
} }
pub struct In<T>(pub T);
impl<T> In<T> {
pub fn into_inner(self) -> T { self.0 }
}
impl<T> ops::Deref for In<T> {
type Target = T;
fn deref(&self) -> &T { &self.0 }
}
impl<T> ops::DerefMut for In<T> {
fn deref_mut(&mut self) -> &mut T { &mut self.0 }
}
#[cfg(feature = "use_serde")]
impl<T> FromRequest for In<T>
where
T: serde::de::DeserializeOwned + 'static,
{
type Error = SystemError;
type Future = Ready<Result<Self, SystemError>>;
#[inline]
fn from_request(req: &EventRequest, payload: &mut Payload) -> Self::Future {
match payload {
Payload::None => ready(Err(unexpected_none_payload())),
Payload::Bytes(bytes) => {
let data: T = bincode::deserialize(bytes).unwrap();
ready(Ok(In(data)))
},
}
}
}
#[cfg(feature = "use_protobuf")]
impl<T> FromRequest for In<T>
where
T: ::protobuf::Message + 'static,
{
type Error = SystemError;
type Future = Ready<Result<Self, SystemError>>;
#[inline]
fn from_request(req: &EventRequest, payload: &mut Payload) -> Self::Future {
match payload {
Payload::None => ready(Err(unexpected_none_payload())),
Payload::Bytes(bytes) => {
let data: T = ::protobuf::Message::parse_from_bytes(bytes).unwrap();
ready(Ok(In(data)))
},
}
}
}

View File

@ -4,6 +4,7 @@ use crate::{
response::{EventResponse, EventResponseBuilder}, response::{EventResponse, EventResponseBuilder},
}; };
use bytes::Bytes; use bytes::Bytes;
use std::ops;
pub trait Responder { pub trait Responder {
fn respond_to(self, req: &EventRequest) -> EventResponse; fn respond_to(self, req: &EventRequest) -> EventResponse;
@ -35,13 +36,40 @@ where
} }
} }
// #[cfg(feature = "use_serde")] pub struct Out<T>(pub T);
// impl<T> Responder for T
// where impl<T> Out<T> {
// T: serde::Serialize, pub fn into_inner(self) -> T { self.0 }
// { }
// fn respond_to(self, request: &EventRequest) -> EventResponse {
// let bytes = bincode::serialize(&self).unwrap(); impl<T> ops::Deref for Out<T> {
// EventResponseBuilder::Ok().data(bytes).build() type Target = T;
// }
// } fn deref(&self) -> &T { &self.0 }
}
impl<T> ops::DerefMut for Out<T> {
fn deref_mut(&mut self) -> &mut T { &mut self.0 }
}
#[cfg(feature = "use_serde")]
impl<T> Responder for Out<T>
where
T: serde::Serialize,
{
fn respond_to(self, request: &EventRequest) -> EventResponse {
let bytes: Vec<u8> = bincode::serialize(&self.0).unwrap();
EventResponseBuilder::Ok().data(bytes).build()
}
}
#[cfg(feature = "use_protobuf")]
impl<T> Responder for Out<T>
where
T: ::protobuf::Message,
{
fn respond_to(self, _request: &EventRequest) -> EventResponse {
let bytes: Vec<u8> = self.write_to_bytes().unwrap();
EventResponseBuilder::Ok().data(bytes).build()
}
}

View File

@ -0,0 +1,71 @@
use crate::{
module::{Event, ModuleRequest},
request::{EventRequest, Payload},
response::EventResponse,
};
use std::{
fmt::{Debug, Display},
hash::Hash,
};
#[derive(Debug)]
pub struct SenderPayload {
pub(crate) payload: Payload,
pub(crate) event: Event,
}
impl SenderPayload {
pub fn new<E>(event: E) -> SenderPayload
where
E: Eq + Hash + Debug + Clone + Display,
{
Self {
event: event.into(),
payload: Payload::None,
}
}
pub fn payload(mut self, payload: Payload) -> Self {
self.payload = payload;
self
}
pub fn from_bytes(bytes: Vec<u8>) -> Self { unimplemented!() }
}
impl std::convert::Into<ModuleRequest> for SenderPayload {
fn into(self) -> ModuleRequest { ModuleRequest::new(self.event).payload(self.payload) }
}
impl std::default::Default for SenderPayload {
fn default() -> Self { SenderPayload::new("").payload(Payload::None) }
}
impl std::convert::Into<EventRequest> for SenderPayload {
fn into(self) -> EventRequest { unimplemented!() }
}
pub type BoxStreamCallback<T> = Box<dyn FnOnce(T, EventResponse) + 'static + Send + Sync>;
pub struct SenderData<T>
where
T: 'static,
{
pub config: T,
pub payload: SenderPayload,
pub callback: Option<BoxStreamCallback<T>>,
}
impl<T> SenderData<T> {
pub fn new(config: T, payload: SenderPayload) -> Self {
Self {
config,
payload,
callback: None,
}
}
pub fn callback(mut self, callback: BoxStreamCallback<T>) -> Self {
self.callback = Some(callback);
self
}
}

View File

@ -0,0 +1,5 @@
mod data;
mod sender;
pub use data::*;
pub use sender::*;

View File

@ -1,7 +1,9 @@
use crate::{ use crate::{
error::{InternalError, SystemError}, error::{InternalError, SystemError},
request::EventRequest, module::{Event, ModuleRequest},
request::{EventRequest, Payload},
response::EventResponse, response::EventResponse,
sender::{SenderData, SenderPayload},
service::{BoxService, Service, ServiceFactory}, service::{BoxService, Service, ServiceFactory},
system::ModuleMap, system::ModuleMap,
}; };
@ -15,30 +17,30 @@ use tokio::{
macro_rules! service_factor_impl { macro_rules! service_factor_impl {
($name:ident) => { ($name:ident) => {
#[allow(non_snake_case, missing_docs)] #[allow(non_snake_case, missing_docs)]
impl<T> ServiceFactory<CommandData<T>> for $name<T> impl<T> ServiceFactory<SenderData<T>> for $name<T>
where where
T: 'static, T: 'static,
{ {
type Response = EventResponse; type Response = EventResponse;
type Error = SystemError; type Error = SystemError;
type Service = BoxService<CommandData<T>, Self::Response, Self::Error>; type Service = BoxService<SenderData<T>, Self::Response, Self::Error>;
type Config = (); type Context = ();
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::Error>>; type Future = LocalBoxFuture<'static, Result<Self::Service, Self::Error>>;
fn new_service(&self, _cfg: Self::Config) -> Self::Future { fn new_service(&self, _cfg: Self::Context) -> Self::Future {
let module_map = self.module_map.clone(); let module_map = self.module_map.clone();
let service = Box::new(CommandSenderService { module_map }); let service = Box::new(SenderService { module_map });
Box::pin(async move { Ok(service as Self::Service) }) Box::pin(async move { Ok(service as Self::Service) })
} }
} }
}; };
} }
struct CommandSenderService { struct SenderService {
module_map: ModuleMap, module_map: ModuleMap,
} }
impl<T> Service<CommandData<T>> for CommandSenderService impl<T> Service<SenderData<T>> for SenderService
where where
T: 'static, T: 'static,
{ {
@ -46,15 +48,22 @@ where
type Error = SystemError; type Error = SystemError;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>; type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
fn call(&self, mut data: CommandData<T>) -> Self::Future { fn call(&self, data: SenderData<T>) -> Self::Future {
let module_map = self.module_map.clone(); let module_map = self.module_map.clone();
let request = data.request.take().unwrap(); let SenderData {
config,
payload,
callback,
} = data;
let event = payload.event.clone();
let request = payload.into();
let fut = async move { let fut = async move {
let result = { let result = {
match module_map.get(request.get_event()) { match module_map.get(&event) {
Some(module) => { Some(module) => {
let config = request.get_id().to_owned(); let fut = module.new_service(());
let fut = module.new_service(config);
let service_fut = fut.await?.call(request); let service_fut = fut.await?.call(request);
service_fut.await service_fut.await
}, },
@ -66,8 +75,8 @@ where
}; };
let response = result.unwrap_or_else(|e| e.into()); let response = result.unwrap_or_else(|e| e.into());
if let Some(callback) = data.callback { if let Some(callback) = callback {
callback(data.config, response.clone()); callback(config, response.clone());
} }
Ok(response) Ok(response)
@ -76,48 +85,23 @@ where
} }
} }
pub type BoxStreamCallback<T> = Box<dyn FnOnce(T, EventResponse) + 'static + Send + Sync>; pub struct Sender<T>
pub struct CommandData<T>
where
T: 'static,
{
config: T,
request: Option<EventRequest>,
callback: Option<BoxStreamCallback<T>>,
}
impl<T> CommandData<T> {
pub fn new(config: T, request: Option<EventRequest>) -> Self {
Self {
config,
request,
callback: None,
}
}
pub fn with_callback(mut self, callback: BoxStreamCallback<T>) -> Self {
self.callback = Some(callback);
self
}
}
pub struct CommandSender<T>
where where
T: 'static, T: 'static,
{ {
module_map: ModuleMap, module_map: ModuleMap,
data_tx: UnboundedSender<CommandData<T>>, data_tx: UnboundedSender<SenderData<T>>,
data_rx: Option<UnboundedReceiver<CommandData<T>>>, data_rx: Option<UnboundedReceiver<SenderData<T>>>,
} }
service_factor_impl!(CommandSender); service_factor_impl!(Sender);
impl<T> CommandSender<T> impl<T> Sender<T>
where where
T: 'static, T: 'static,
{ {
pub fn new(module_map: ModuleMap) -> Self { pub fn new(module_map: ModuleMap) -> Self {
let (data_tx, data_rx) = unbounded_channel::<CommandData<T>>(); let (data_tx, data_rx) = unbounded_channel::<SenderData<T>>();
Self { Self {
module_map, module_map,
data_tx, data_tx,
@ -125,9 +109,9 @@ where
} }
} }
pub fn async_send(&self, data: CommandData<T>) { let _ = self.data_tx.send(data); } pub fn async_send(&self, data: SenderData<T>) { let _ = self.data_tx.send(data); }
pub fn sync_send(&self, data: CommandData<T>) -> EventResponse { pub fn sync_send(&self, data: SenderData<T>) -> EventResponse {
let factory = self.new_service(()); let factory = self.new_service(());
futures::executor::block_on(async { futures::executor::block_on(async {
@ -136,31 +120,29 @@ where
}) })
} }
pub fn tx(&self) -> UnboundedSender<CommandData<T>> { self.data_tx.clone() } pub fn take_rx(&mut self) -> UnboundedReceiver<SenderData<T>> { self.data_rx.take().unwrap() }
pub fn take_data_rx(&mut self) -> UnboundedReceiver<CommandData<T>> { self.data_rx.take().unwrap() }
} }
pub struct CommandSenderRunner<T> pub struct SenderRunner<T>
where where
T: 'static, T: 'static,
{ {
module_map: ModuleMap, module_map: ModuleMap,
data_rx: UnboundedReceiver<CommandData<T>>, data_rx: UnboundedReceiver<SenderData<T>>,
} }
service_factor_impl!(CommandSenderRunner); service_factor_impl!(SenderRunner);
impl<T> CommandSenderRunner<T> impl<T> SenderRunner<T>
where where
T: 'static, T: 'static,
{ {
pub fn new(module_map: ModuleMap, data_rx: UnboundedReceiver<CommandData<T>>) -> Self { pub fn new(module_map: ModuleMap, data_rx: UnboundedReceiver<SenderData<T>>) -> Self {
Self { module_map, data_rx } Self { module_map, data_rx }
} }
} }
impl<T> Future for CommandSenderRunner<T> impl<T> Future for SenderRunner<T>
where where
T: 'static, T: 'static,
{ {

View File

@ -1,7 +1,7 @@
use crate::service::{Service, ServiceFactory}; use crate::service::{Service, ServiceFactory};
use futures_core::future::LocalBoxFuture; use futures_core::future::LocalBoxFuture;
pub fn factory<SF, Req>(factory: SF) -> BoxServiceFactory<SF::Config, Req, SF::Response, SF::Error> pub fn factory<SF, Req>(factory: SF) -> BoxServiceFactory<SF::Context, Req, SF::Response, SF::Error>
where where
SF: ServiceFactory<Req> + 'static, SF: ServiceFactory<Req> + 'static,
Req: 'static, Req: 'static,
@ -16,7 +16,7 @@ where
type Inner<Cfg, Req, Res, Err> = Box< type Inner<Cfg, Req, Res, Err> = Box<
dyn ServiceFactory< dyn ServiceFactory<
Req, Req,
Config = Cfg, Context = Cfg,
Response = Res, Response = Res,
Error = Err, Error = Err,
Service = BoxService<Req, Res, Err>, Service = BoxService<Req, Res, Err>,
@ -34,7 +34,7 @@ where
type Response = Res; type Response = Res;
type Error = Err; type Error = Err;
type Service = BoxService<Req, Res, Err>; type Service = BoxService<Req, Res, Err>;
type Config = Cfg; type Context = Cfg;
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::Error>>; type Future = LocalBoxFuture<'static, Result<Self::Service, Self::Error>>;
fn new_service(&self, cfg: Cfg) -> Self::Future { self.0.new_service(cfg) } fn new_service(&self, cfg: Cfg) -> Self::Future { self.0.new_service(cfg) }
@ -91,7 +91,7 @@ where
Req: 'static, Req: 'static,
Res: 'static, Res: 'static,
Err: 'static, Err: 'static,
SF: ServiceFactory<Req, Config = Cfg, Response = Res, Error = Err>, SF: ServiceFactory<Req, Context = Cfg, Response = Res, Error = Err>,
SF::Future: 'static, SF::Future: 'static,
SF::Service: 'static, SF::Service: 'static,
<SF::Service as Service<Req>>::Future: 'static, <SF::Service as Service<Req>>::Future: 'static,
@ -99,7 +99,7 @@ where
type Response = Res; type Response = Res;
type Error = Err; type Error = Err;
type Service = BoxService<Req, Res, Err>; type Service = BoxService<Req, Res, Err>;
type Config = Cfg; type Context = Cfg;
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::Error>>; type Future = LocalBoxFuture<'static, Result<Self::Service, Self::Error>>;
fn new_service(&self, cfg: Cfg) -> Self::Future { fn new_service(&self, cfg: Cfg) -> Self::Future {

View File

@ -75,7 +75,7 @@ where
type Response = ServiceResponse; type Response = ServiceResponse;
type Error = SystemError; type Error = SystemError;
type Service = Self; type Service = Self;
type Config = (); type Context = ();
type Future = Ready<Result<Self::Service, Self::Error>>; type Future = Ready<Result<Self::Service, Self::Error>>;
fn new_service(&self, _: ()) -> Self::Future { ready(Ok(self.clone())) } fn new_service(&self, _: ()) -> Self::Future { ready(Ok(self.clone())) }
@ -139,10 +139,10 @@ where
}; };
}, },
HandlerServiceProj::Handle(fut, req) => { HandlerServiceProj::Handle(fut, req) => {
let res = ready!(fut.poll(cx)); let result = ready!(fut.poll(cx));
let req = req.take().unwrap(); let req = req.take().unwrap();
let res = res.respond_to(&req); let resp = result.respond_to(&req);
return Poll::Ready(Ok(ServiceResponse::new(req, res))); return Poll::Ready(Ok(ServiceResponse::new(req, resp)));
}, },
} }
} }

View File

@ -17,10 +17,10 @@ pub trait ServiceFactory<Request> {
type Response; type Response;
type Error; type Error;
type Service: Service<Request, Response = Self::Response, Error = Self::Error>; type Service: Service<Request, Response = Self::Response, Error = Self::Error>;
type Config; type Context;
type Future: Future<Output = Result<Self::Service, Self::Error>>; type Future: Future<Output = Result<Self::Service, Self::Error>>;
fn new_service(&self, cfg: Self::Config) -> Self::Future; fn new_service(&self, cfg: Self::Context) -> Self::Future;
} }
pub struct ServiceRequest { pub struct ServiceRequest {

View File

@ -1,7 +1,6 @@
use crate::{ use crate::{
module::{Event, Module}, module::{Event, Module},
rt::Runtime, rt::Runtime,
stream::CommandSenderRunner,
}; };
use futures_core::{ready, task::Context}; use futures_core::{ready, task::Context};
use std::{cell::RefCell, collections::HashMap, fmt::Debug, future::Future, io, rc::Rc, sync::Arc}; use std::{cell::RefCell, collections::HashMap, fmt::Debug, future::Future, io, rc::Rc, sync::Arc};
@ -28,12 +27,10 @@ pub struct FlowySystem {
} }
impl FlowySystem { impl FlowySystem {
pub fn construct<F, S, T>(module_factory: F, sender_factory: S) -> SystemRunner pub fn construct<F, S>(module_factory: F, sender_factory: S) -> SystemRunner
where where
// E: Into<Event<E>> + Eq + Hash + Debug + Clone + 'static,
F: FnOnce() -> Vec<Module>, F: FnOnce() -> Vec<Module>,
S: FnOnce(ModuleMap) -> CommandSenderRunner<T>, S: FnOnce(ModuleMap, &Runtime),
T: 'static,
{ {
let runtime = Runtime::new().unwrap(); let runtime = Runtime::new().unwrap();
let (sys_cmd_tx, sys_cmd_rx) = unbounded_channel::<SystemCommand>(); let (sys_cmd_tx, sys_cmd_rx) = unbounded_channel::<SystemCommand>();
@ -55,8 +52,7 @@ impl FlowySystem {
}); });
let system = Self { sys_cmd_tx }; let system = Self { sys_cmd_tx };
let sender_runner = sender_factory(Rc::new(module_map)); sender_factory(Rc::new(module_map), &runtime);
runtime.spawn(sender_runner);
FlowySystem::set_current(system); FlowySystem::set_current(system);
let runner = SystemRunner { rt: runtime, stop_rx }; let runner = SystemRunner { rt: runtime, stop_rx };

View File

@ -1,4 +1,4 @@
use flowy_sys::prelude::{CommandData, CommandSender, CommandSenderRunner, EventResponse, FlowySystem, Module}; use flowy_sys::prelude::{EventResponse, FlowySystem, Module, Sender, SenderData, SenderRunner};
use std::{cell::RefCell, sync::Once}; use std::{cell::RefCell, sync::Once};
#[allow(dead_code)] #[allow(dead_code)]
@ -17,18 +17,18 @@ pub struct ExecutorAction {
pub struct FlowySystemExecutor {} pub struct FlowySystemExecutor {}
thread_local!( thread_local!(
static CMD_SENDER: RefCell<Option<CommandSender<i64>>> = RefCell::new(None); static SENDER: RefCell<Option<Sender<i64>>> = RefCell::new(None);
); );
pub fn sync_send(data: CommandData<i64>) -> EventResponse { pub fn sync_send(data: SenderData<i64>) -> EventResponse {
CMD_SENDER.with(|cell| match &*cell.borrow() { SENDER.with(|cell| match &*cell.borrow() {
Some(stream) => stream.sync_send(data), Some(stream) => stream.sync_send(data),
None => panic!(""), None => panic!(""),
}) })
} }
pub fn async_send(data: CommandData<i64>) { pub fn async_send(data: SenderData<i64>) {
CMD_SENDER.with(|cell| match &*cell.borrow() { SENDER.with(|cell| match &*cell.borrow() {
Some(stream) => { Some(stream) => {
stream.async_send(data); stream.async_send(data);
}, },
@ -42,15 +42,13 @@ where
{ {
FlowySystem::construct( FlowySystem::construct(
|| modules, || modules,
|module_map| { |module_map, runtime| {
let mut stream = CommandSender::<i64>::new(module_map.clone()); let mut sender = Sender::<i64>::new(module_map.clone());
let runner = CommandSenderRunner::new(module_map, stream.take_data_rx()); runtime.spawn(SenderRunner::new(module_map, sender.take_rx()));
CMD_SENDER.with(|cell| { SENDER.with(|cell| {
*cell.borrow_mut() = Some(stream); *cell.borrow_mut() = Some(sender);
}); });
runner
}, },
) )
.spawn(async { f() }) .spawn(async { f() })

View File

@ -10,8 +10,9 @@ fn test_init() {
let modules = vec![Module::new().event(event, hello)]; let modules = vec![Module::new().event(event, hello)];
init_system(modules, move || { init_system(modules, move || {
let request = EventRequest::new(event); let payload = SenderPayload::new(event);
let stream_data = CommandData::new(1, Some(request)).with_callback(Box::new(|_config, response| {
let stream_data = SenderData::new(1, payload).callback(Box::new(|_config, response| {
log::info!("async resp: {:?}", response); log::info!("async resp: {:?}", response);
})); }));

View File

@ -7,7 +7,7 @@ edition = "2018"
[dependencies] [dependencies]
derive_more = {version = "0.99", features = ["display"]} derive_more = {version = "0.99", features = ["display"]}
flowy-sys = { path = "../flowy-sys" } flowy-sys = { path = "../flowy-sys", features = ["use_serde"] }
flowy-log = { path = "../flowy-log" } flowy-log = { path = "../flowy-log" }
tracing = { version = "0.1", features = ["log"] } tracing = { version = "0.1", features = ["log"] }
bytes = "0.5" bytes = "0.5"

View File

@ -1,7 +1,20 @@
use crate::domain::{User, UserEmail, UserName}; use crate::domain::{User, UserEmail, UserName};
use bytes::Bytes; use bytes::Bytes;
use flowy_sys::prelude::{In, Out};
use std::convert::TryInto; use std::convert::TryInto;
// tracing instrument 👉🏻 https://docs.rs/tracing/0.1.26/tracing/attr.instrument.html
#[tracing::instrument(
name = "User check",
skip(data),
fields(
email = %data.email,
name = %data.name
)
)]
pub async fn user_check(data: In<UserData>) -> Out<UserData> { panic!("") }
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct UserData { pub struct UserData {
name: String, name: String,
email: String, email: String,
@ -16,6 +29,3 @@ impl TryInto<User> for UserData {
Ok(User::new(name, email)) Ok(User::new(name, email))
} }
} }
#[tracing::instrument(name = "User check")]
pub async fn user_check(user_name: String) -> Bytes { unimplemented!() }