[refactor]:

1. replace unbounded sender with directory call using static runtime
2. sync + send for handler
This commit is contained in:
appflowy 2021-07-02 20:45:51 +08:00
parent 3328e29241
commit 7e1cf1222f
21 changed files with 449 additions and 481 deletions

View File

@ -12,7 +12,7 @@ paste = "1"
futures-channel = "0.3.15"
futures = "0.3.15"
futures-util = "0.3.15"
bytes = "0.5"
bytes = "1.0"
tokio = { version = "1", features = ["full"] }
uuid = { version = "0.8", features = ["serde", "v4"] }
log = "0.4.14"

View File

@ -0,0 +1,174 @@
use crate::{
error::{Error, InternalError, SystemError},
module::{as_module_map, Event, Module, ModuleMap, ModuleRequest},
request::Payload,
response::EventResponse,
service::{Service, ServiceFactory},
util::tokio_default_runtime,
};
use derivative::*;
use futures_core::future::BoxFuture;
use lazy_static::lazy_static;
use std::{
fmt::{Debug, Display},
hash::Hash,
sync::RwLock,
};
lazy_static! {
pub static ref EVENT_DISPATCH: RwLock<Option<EventDispatch>> = RwLock::new(None);
}
pub struct EventDispatch {
module_map: ModuleMap,
runtime: tokio::runtime::Runtime,
}
impl EventDispatch {
pub fn construct<F>(module_factory: F)
where
F: FnOnce() -> Vec<Module>,
{
let modules = module_factory();
let module_map = as_module_map(modules);
let runtime = tokio_default_runtime().unwrap();
let dispatch = EventDispatch {
module_map,
runtime,
};
*(EVENT_DISPATCH.write().unwrap()) = Some(dispatch);
}
pub async fn async_send<T>(request: DispatchRequest<T>) -> Result<EventResponse, SystemError>
where
T: 'static + Debug + Send + Sync,
{
match EVENT_DISPATCH.read() {
Ok(dispatch) => {
let dispatch = dispatch.as_ref().unwrap();
let module_map = dispatch.module_map.clone();
let service = Box::new(DispatchService { module_map });
dispatch
.runtime
.spawn(async move { service.call(request).await })
.await
.unwrap_or_else(|e| {
let msg = format!("{:?}", e);
Ok(InternalError::new(msg).as_response())
})
},
Err(e) => {
let msg = format!("{:?}", e);
Err(InternalError::new(msg).into())
},
}
}
pub fn sync_send<T>(request: DispatchRequest<T>) -> Result<EventResponse, SystemError>
where
T: 'static + Debug + Send + Sync,
{
futures::executor::block_on(async { EventDispatch::async_send(request).await })
}
}
pub type BoxStreamCallback<T> = Box<dyn FnOnce(T, EventResponse) + 'static + Send + Sync>;
#[derive(Derivative)]
#[derivative(Debug)]
pub struct DispatchRequest<T>
where
T: 'static + Debug,
{
pub config: T,
pub event: Event,
pub payload: Option<Payload>,
#[derivative(Debug = "ignore")]
pub callback: Option<BoxStreamCallback<T>>,
}
impl<T> DispatchRequest<T>
where
T: 'static + Debug,
{
pub fn new<E>(config: T, event: E) -> Self
where
E: Eq + Hash + Debug + Clone + Display,
{
Self {
config,
payload: None,
event: event.into(),
callback: None,
}
}
pub fn payload(mut self, payload: Payload) -> Self {
self.payload = Some(payload);
self
}
pub fn callback<F>(mut self, callback: F) -> Self
where
F: FnOnce(T, EventResponse) + 'static + Send + Sync,
{
self.callback = Some(Box::new(callback));
self
}
}
pub(crate) struct DispatchService {
pub(crate) module_map: ModuleMap,
}
impl<T> Service<DispatchRequest<T>> for DispatchService
where
T: 'static + Debug + Send + Sync,
{
type Response = EventResponse;
type Error = SystemError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn call(&self, dispatch_request: DispatchRequest<T>) -> Self::Future {
let module_map = self.module_map.clone();
let DispatchRequest {
config,
event,
payload,
callback,
} = dispatch_request;
let mut request = ModuleRequest::new(event.clone());
if let Some(payload) = payload {
request = request.payload(payload);
};
Box::pin(async move {
let result = {
match module_map.get(&event) {
Some(module) => {
let fut = module.new_service(());
let service_fut = fut.await?.call(request);
service_fut.await
},
None => {
let msg = format!(
"Can not find the module to handle the request:{:?}",
request
);
Err(InternalError::new(msg).into())
},
}
};
let response = result.unwrap_or_else(|e| e.into());
if let Some(callback) = callback {
callback(config, response.clone());
}
Ok(response)
})
}
}

View File

@ -7,7 +7,7 @@ use serde::{Serialize, Serializer};
use std::{fmt, option::NoneError};
use tokio::sync::mpsc::error::SendError;
pub trait Error: fmt::Debug + fmt::Display + DynClone {
pub trait Error: fmt::Debug + fmt::Display + DynClone + Send + Sync {
fn status_code(&self) -> StatusCode;
fn as_response(&self) -> EventResponse { EventResponse::new(self.status_code()) }
@ -83,21 +83,21 @@ impl<T: Clone> InternalError<T> {
impl<T> fmt::Debug for InternalError<T>
where
T: fmt::Debug + 'static + Clone,
T: fmt::Debug + 'static + Clone + Send + Sync,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fmt::Debug::fmt(&self.inner, f) }
}
impl<T> fmt::Display for InternalError<T>
where
T: fmt::Debug + fmt::Display + 'static + Clone,
T: fmt::Debug + fmt::Display + 'static + Clone + Send + Sync,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fmt::Display::fmt(&self.inner, f) }
}
impl<T> Error for InternalError<T>
where
T: fmt::Debug + fmt::Display + 'static + Clone,
T: fmt::Debug + fmt::Display + 'static + Clone + Send + Sync,
{
fn status_code(&self) -> StatusCode { StatusCode::Err }

View File

@ -8,9 +8,9 @@ mod rt;
mod service;
mod util;
mod sender;
mod dispatch;
mod system;
pub mod prelude {
pub use crate::{error::*, module::*, request::*, response::*, rt::*, sender::*};
pub use crate::{dispatch::*, error::*, module::*, request::*, response::*, rt::*};
}

View File

@ -5,7 +5,7 @@ use std::{
#[derive(Default)]
pub struct DataContainer {
map: HashMap<TypeId, Box<dyn Any>>,
map: HashMap<TypeId, Box<dyn Any + Sync + Send>>,
}
impl DataContainer {
@ -16,27 +16,112 @@ impl DataContainer {
}
}
pub fn insert<T: 'static>(&mut self, val: T) -> Option<T> {
pub fn insert<T>(&mut self, val: T) -> Option<T>
where
T: 'static + Send + Sync,
{
self.map
.insert(TypeId::of::<T>(), Box::new(val))
.and_then(downcast_owned)
}
pub fn remove<T: 'static>(&mut self) -> Option<T> { self.map.remove(&TypeId::of::<T>()).and_then(downcast_owned) }
pub fn get<T: 'static>(&self) -> Option<&T> {
self.map.get(&TypeId::of::<T>()).and_then(|boxed| boxed.downcast_ref())
pub fn remove<T>(&mut self) -> Option<T>
where
T: 'static + Send + Sync,
{
self.map.remove(&TypeId::of::<T>()).and_then(downcast_owned)
}
pub fn get_mut<T: 'static>(&mut self) -> Option<&mut T> {
pub fn get<T>(&self) -> Option<&T>
where
T: 'static + Send + Sync,
{
self.map
.get(&TypeId::of::<T>())
.and_then(|boxed| boxed.downcast_ref())
}
pub fn get_mut<T>(&mut self) -> Option<&mut T>
where
T: 'static + Send + Sync,
{
self.map
.get_mut(&TypeId::of::<T>())
.and_then(|boxed| boxed.downcast_mut())
}
pub fn contains<T: 'static>(&self) -> bool { self.map.contains_key(&TypeId::of::<T>()) }
pub fn contains<T>(&self) -> bool
where
T: 'static + Send + Sync,
{
self.map.contains_key(&TypeId::of::<T>())
}
pub fn extend(&mut self, other: DataContainer) { self.map.extend(other.map); }
}
fn downcast_owned<T: 'static>(boxed: Box<dyn Any>) -> Option<T> { boxed.downcast().ok().map(|boxed| *boxed) }
fn downcast_owned<T: 'static + Send + Sync>(boxed: Box<dyn Any + Send + Sync>) -> Option<T> {
boxed.downcast().ok().map(|boxed| *boxed)
}
// use std::{
// any::{Any, TypeId},
// collections::HashMap,
// sync::RwLock,
// };
//
// #[derive(Default)]
// pub struct DataContainer {
// map: RwLock<HashMap<TypeId, Box<dyn Any>>>,
// }
//
// impl DataContainer {
// #[inline]
// pub fn new() -> DataContainer {
// DataContainer {
// map: RwLock::new(HashMap::default()),
// }
// }
//
// pub fn insert<T: 'static>(&mut self, val: T) -> Option<T> {
// self.map
// .write()
// .unwrap()
// .insert(TypeId::of::<T>(), Box::new(val))
// .and_then(downcast_owned)
// }
//
// pub fn remove<T: 'static>(&mut self) -> Option<T> {
// self.map
// .write()
// .unwrap()
// .remove(&TypeId::of::<T>())
// .and_then(downcast_owned)
// }
//
// pub fn get<T: 'static>(&self) -> Option<&T> {
// self.map
// .read()
// .unwrap()
// .get(&TypeId::of::<T>())
// .and_then(|boxed| boxed.downcast_ref())
// }
//
// pub fn get_mut<T: 'static>(&mut self) -> Option<&mut T> {
// self.map
// .write()
// .unwrap()
// .get_mut(&TypeId::of::<T>())
// .and_then(|boxed| boxed.downcast_mut())
// }
//
// pub fn contains<T: 'static>(&self) -> bool {
// self.map.read().unwrap().contains_key(&TypeId::of::<T>())
// }
//
// pub fn extend(&mut self, other: DataContainer) {
// self.map.write().unwrap().extend(other.map); } }
//
// fn downcast_owned<T: 'static>(boxed: Box<dyn Any>) -> Option<T> {
// boxed.downcast().ok().map(|boxed| *boxed)
// }

View File

@ -5,29 +5,44 @@ use crate::{
};
use std::{ops::Deref, sync::Arc};
pub struct ModuleData<T: ?Sized>(Arc<T>);
pub struct ModuleData<T: ?Sized + Send + Sync>(Arc<T>);
impl<T> ModuleData<T> {
impl<T> ModuleData<T>
where
T: Send + Sync,
{
pub fn new(data: T) -> Self { ModuleData(Arc::new(data)) }
pub fn get_ref(&self) -> &T { self.0.as_ref() }
}
impl<T: ?Sized> Deref for ModuleData<T> {
impl<T> Deref for ModuleData<T>
where
T: ?Sized + Send + Sync,
{
type Target = Arc<T>;
fn deref(&self) -> &Arc<T> { &self.0 }
}
impl<T: ?Sized> Clone for ModuleData<T> {
impl<T> Clone for ModuleData<T>
where
T: ?Sized + Send + Sync,
{
fn clone(&self) -> ModuleData<T> { ModuleData(self.0.clone()) }
}
impl<T: ?Sized> From<Arc<T>> for ModuleData<T> {
impl<T> From<Arc<T>> for ModuleData<T>
where
T: ?Sized + Send + Sync,
{
fn from(arc: Arc<T>) -> Self { ModuleData(arc) }
}
impl<T: ?Sized + 'static> FromRequest for ModuleData<T> {
impl<T> FromRequest for ModuleData<T>
where
T: ?Sized + Send + Sync + 'static,
{
type Error = SystemError;
type Future = Ready<Result<Self, SystemError>>;

View File

@ -4,13 +4,11 @@ use std::{
future::Future,
hash::Hash,
pin::Pin,
rc::Rc,
task::{Context, Poll},
};
use futures_core::{future::LocalBoxFuture, ready};
use futures_core::ready;
use pin_project::pin_project;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use crate::{
error::{InternalError, SystemError},
@ -29,6 +27,21 @@ use crate::{
ServiceResponse,
},
};
use futures_core::future::BoxFuture;
use std::sync::Arc;
pub type ModuleMap = Arc<HashMap<Event, Arc<Module>>>;
pub(crate) fn as_module_map(modules: Vec<Module>) -> ModuleMap {
let mut module_map = HashMap::new();
modules.into_iter().for_each(|m| {
let events = m.events();
let module = Arc::new(m);
events.into_iter().for_each(|e| {
module_map.insert(e, module.clone());
});
});
Arc::new(module_map)
}
#[derive(PartialEq, Eq, Hash, Debug, Clone)]
pub struct Event(String);
@ -42,20 +55,15 @@ pub type EventServiceFactory = BoxServiceFactory<(), ServiceRequest, ServiceResp
pub struct Module {
name: String,
data: DataContainer,
service_map: Rc<HashMap<Event, EventServiceFactory>>,
req_tx: UnboundedSender<ModuleRequest>,
req_rx: UnboundedReceiver<ModuleRequest>,
service_map: Arc<HashMap<Event, EventServiceFactory>>,
}
impl Module {
pub fn new() -> Self {
let (req_tx, req_rx) = unbounded_channel::<ModuleRequest>();
Self {
name: "".to_owned(),
data: DataContainer::new(),
service_map: Rc::new(HashMap::new()),
req_tx,
req_rx,
service_map: Arc::new(HashMap::new()),
}
}
@ -64,7 +72,7 @@ impl Module {
self
}
pub fn data<D: 'static>(mut self, data: D) -> Self {
pub fn data<D: 'static + Send + Sync>(mut self, data: D) -> Self {
self.data.insert(ModuleData::new(data));
self
}
@ -72,8 +80,9 @@ impl Module {
pub fn event<E, H, T, R>(mut self, event: E, handler: H) -> Self
where
H: Handler<T, R>,
T: FromRequest + 'static,
R: Future + 'static,
T: FromRequest + 'static + Send + Sync,
<T as FromRequest>::Future: Sync + Send,
R: Future + 'static + Send + Sync,
R::Output: Responder + 'static,
E: Eq + Hash + Debug + Clone + Display,
{
@ -82,39 +91,17 @@ impl Module {
log::error!("Duplicate Event: {:?}", &event);
}
Rc::get_mut(&mut self.service_map)
Arc::get_mut(&mut self.service_map)
.unwrap()
.insert(event, factory(HandlerService::new(handler)));
self
}
pub fn forward_map(&self) -> HashMap<Event, UnboundedSender<ModuleRequest>> {
pub fn events(&self) -> Vec<Event> {
self.service_map
.keys()
.map(|key| (key.clone(), self.req_tx.clone()))
.collect::<HashMap<_, _>>()
}
pub fn events(&self) -> Vec<Event> { self.service_map.keys().map(|key| key.clone()).collect::<Vec<_>>() }
}
impl Future for Module {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match ready!(Pin::new(&mut self.req_rx).poll_recv(cx)) {
None => return Poll::Ready(()),
Some(request) => {
let mut service = self.new_service(());
if let Ok(service) = ready!(Pin::new(&mut service).poll(cx)) {
log::trace!("Spawn module service for request {}", request.id());
tokio::task::spawn_local(async move {
let _ = service.call(request).await;
});
}
},
}
}
.map(|key| key.clone())
.collect::<Vec<_>>()
}
}
@ -140,23 +127,23 @@ impl ModuleRequest {
self
}
pub(crate) fn into_parts(self) -> (EventRequest, Payload) { (self.inner, self.payload) }
pub(crate) fn into_service_request(self) -> ServiceRequest { ServiceRequest::new(self.inner, self.payload) }
pub(crate) fn id(&self) -> &str { &self.inner.id }
pub(crate) fn event(&self) -> &Event { &self.inner.event }
}
impl std::convert::Into<ServiceRequest> for ModuleRequest {
fn into(self) -> ServiceRequest { ServiceRequest::new(self.inner, self.payload) }
}
impl ServiceFactory<ModuleRequest> for Module {
type Response = EventResponse;
type Error = SystemError;
type Service = BoxService<ModuleRequest, Self::Response, Self::Error>;
type Context = ();
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::Error>>;
type Future = BoxFuture<'static, Result<Self::Service, Self::Error>>;
fn new_service(&self, cfg: Self::Context) -> Self::Future {
fn new_service(&self, _cfg: Self::Context) -> Self::Future {
let service_map = self.service_map.clone();
Box::pin(async move {
let service = ModuleService { service_map };
@ -167,13 +154,13 @@ impl ServiceFactory<ModuleRequest> for Module {
}
pub struct ModuleService {
service_map: Rc<HashMap<Event, EventServiceFactory>>,
service_map: Arc<HashMap<Event, EventServiceFactory>>,
}
impl Service<ModuleRequest> for ModuleService {
type Response = EventResponse;
type Error = SystemError;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn call(&self, request: ModuleRequest) -> Self::Future {
log::trace!("Call module service for request {}", &request.id());
@ -183,8 +170,7 @@ impl Service<ModuleRequest> for ModuleService {
let fut = ModuleServiceFuture {
fut: Box::pin(async {
let service = service_fut.await?;
let request = request.into_service_request();
service.call(request).await
service.call(request.into()).await
}),
};
Box::pin(async move { Ok(fut.await.unwrap_or_else(|e| e.into())) })
@ -200,7 +186,7 @@ impl Service<ModuleRequest> for ModuleService {
#[pin_project]
pub struct ModuleServiceFuture {
#[pin]
fut: LocalBoxFuture<'static, Result<ServiceResponse, SystemError>>,
fut: BoxFuture<'static, Result<ServiceResponse, SystemError>>,
}
impl Future for ModuleServiceFuture {

View File

@ -4,15 +4,13 @@ use crate::{
error::{InternalError, SystemError},
module::Event,
request::payload::Payload,
response::Responder,
util::ready::{ready, Ready},
};
use crate::response::{EventResponse, ResponseBuilder};
use futures_core::ready;
use std::{
fmt::Debug,
hash::Hash,
ops,
pin::Pin,
task::{Context, Poll},
@ -135,8 +133,11 @@ where
match payload {
Payload::None => ready(Err(unexpected_none_payload(req))),
Payload::Bytes(bytes) => {
let data: T = bincode::deserialize(bytes).unwrap();
ready(Ok(Data(data)))
let s = String::from_utf8_lossy(bytes);
match serde_json::from_str(s.as_ref()) {
Ok(data) => ready(Ok(Data(data))),
Err(e) => ready(Err(InternalError::new(format!("{:?}", e)).into())),
}
},
}
}

View File

@ -1,5 +1,5 @@
use bytes::{Buf, Bytes};
use serde::{Deserialize, Serialize};
use bytes::{Bytes};
use std::{fmt, fmt::Formatter};
#[derive(Debug, Clone)]
@ -27,7 +27,10 @@ impl std::convert::Into<ResponseData> for &'_ String {
}
impl std::convert::Into<ResponseData> for Bytes {
fn into(self) -> ResponseData { ResponseData::Bytes(self.bytes().to_vec()) }
fn into(self) -> ResponseData {
// Opti(nathan): do not copy the bytes?
ResponseData::Bytes(self.as_ref().to_vec())
}
}
impl std::convert::Into<ResponseData> for Vec<u8> {

View File

@ -4,7 +4,7 @@ use crate::{
response::{EventResponse, ResponseBuilder},
};
use bytes::Bytes;
use std::ops;
pub trait Responder {
fn respond_to(self, req: &EventRequest) -> EventResponse;

View File

@ -5,7 +5,7 @@ use crate::{
};
use crate::request::Data;
use serde::{Deserialize, Serialize, Serializer};
use std::{fmt, fmt::Formatter};
#[derive(Clone, Debug, Eq, PartialEq)]

View File

@ -1,5 +1,5 @@
use std::{future::Future, io, thread};
use thread_id;
use crate::util::tokio_default_runtime;
use std::{future::Future, io};
use tokio::{runtime, task::LocalSet};
#[derive(Debug)]
@ -10,26 +10,7 @@ pub struct Runtime {
impl Runtime {
pub fn new() -> io::Result<Runtime> {
let rt = runtime::Builder::new_multi_thread()
.thread_name("flowy-sys")
.enable_io()
.enable_time()
.on_thread_start(move || {
log::trace!(
"{:?} thread started: thread_id= {}",
thread::current(),
thread_id::get()
);
})
.on_thread_stop(move || {
log::trace!(
"{:?} thread stopping: thread_id= {}",
thread::current(),
thread_id::get(),
);
})
.build()?;
let rt = tokio_default_runtime()?;
Ok(Runtime {
rt,
local: LocalSet::new(),

View File

@ -1,99 +0,0 @@
use crate::{module::Event, request::Payload, response::EventResponse};
use derivative::*;
use std::{
fmt::{Debug, Display},
hash::Hash,
};
// #[derive(Debug)]
// pub struct SenderPayload {
// pub(crate) payload: Payload,
// pub(crate) event: Event,
// }
//
// impl SenderPayload {
// pub fn new<E>(event: E) -> SenderPayload
// where
// E: Eq + Hash + Debug + Clone + Display,
// {
// Self {
// event: event.into(),
// payload: Payload::None,
// }
// }
//
// pub fn payload(mut self, payload: Payload) -> Self {
// self.payload = payload;
// self
// }
//
// pub fn from_bytes(bytes: Vec<u8>) -> Self { unimplemented!() }
// }
//
// impl std::convert::Into<ModuleRequest> for SenderPayload {
// fn into(self) -> ModuleRequest {
// ModuleRequest::new(self.event).payload(self.payload) } }
//
// impl std::default::Default for SenderPayload {
// fn default() -> Self { SenderPayload::new("").payload(Payload::None) }
// }
//
// impl std::convert::Into<EventRequest> for SenderPayload {
// fn into(self) -> EventRequest { unimplemented!() }
// }
pub type BoxStreamCallback<T> = Box<dyn FnOnce(T, EventResponse) + 'static + Send + Sync>;
// #[derive(Debug)]
// pub struct SenderRequest2<T, C>
// where
// T: 'static + Debug,
// C: FnOnce(T, EventResponse) + 'static,
// {
// pub config: T,
// pub event: Event,
// pub payload: Option<Payload>,
// pub callback: Box<dyn C>,
// }
#[derive(Derivative)]
#[derivative(Debug)]
pub struct SenderRequest<T>
where
T: 'static + Debug,
{
pub config: T,
pub event: Event,
pub payload: Option<Payload>,
#[derivative(Debug = "ignore")]
pub callback: Option<BoxStreamCallback<T>>,
}
impl<T> SenderRequest<T>
where
T: 'static + Debug,
{
pub fn new<E>(config: T, event: E) -> Self
where
E: Eq + Hash + Debug + Clone + Display,
{
Self {
config,
payload: None,
event: event.into(),
callback: None,
}
}
pub fn payload(mut self, payload: Payload) -> Self {
self.payload = Some(payload);
self
}
pub fn callback<F>(mut self, callback: F) -> Self
where
F: FnOnce(T, EventResponse) + 'static + Send + Sync,
{
self.callback = Some(Box::new(callback));
self
}
}

View File

@ -1,5 +0,0 @@
mod data;
mod sender;
pub use data::*;
pub use sender::*;

View File

@ -1,167 +0,0 @@
use crate::{
error::{InternalError, SystemError},
module::ModuleRequest,
request::{EventRequest, Payload},
response::EventResponse,
sender::SenderRequest,
service::{BoxService, Service, ServiceFactory},
system::ModuleMap,
};
use futures_core::{future::LocalBoxFuture, ready, task::Context};
use std::{fmt::Debug, future::Future};
use tokio::{
macros::support::{Pin, Poll},
sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
};
macro_rules! service_factor_impl {
($name:ident) => {
#[allow(non_snake_case, missing_docs)]
impl<T> ServiceFactory<SenderRequest<T>> for $name<T>
where
T: 'static + Debug,
{
type Response = EventResponse;
type Error = SystemError;
type Service = BoxService<SenderRequest<T>, Self::Response, Self::Error>;
type Context = ();
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::Error>>;
fn new_service(&self, _cfg: Self::Context) -> Self::Future {
let module_map = self.module_map.clone();
let service = Box::new(SenderService { module_map });
Box::pin(async move { Ok(service as Self::Service) })
}
}
};
}
struct SenderService {
module_map: ModuleMap,
}
impl<T> Service<SenderRequest<T>> for SenderService
where
T: 'static + Debug,
{
type Response = EventResponse;
type Error = SystemError;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
fn call(&self, data: SenderRequest<T>) -> Self::Future {
let module_map = self.module_map.clone();
let SenderRequest {
config,
event,
payload,
callback,
} = data;
let mut request = ModuleRequest::new(event.clone());
if let Some(payload) = payload {
request = request.payload(payload);
}
let fut = async move {
let result = {
match module_map.get(&event) {
Some(module) => {
let fut = module.new_service(());
let service_fut = fut.await?.call(request);
service_fut.await
},
None => {
let msg = format!("Can not find the module to handle the request:{:?}", request);
Err(InternalError::new(msg).into())
},
}
};
let response = result.unwrap_or_else(|e| e.into());
if let Some(callback) = callback {
callback(config, response.clone());
}
Ok(response)
};
Box::pin(fut)
}
}
pub struct Sender<T>
where
T: 'static + Debug,
{
module_map: ModuleMap,
data_tx: UnboundedSender<SenderRequest<T>>,
data_rx: Option<UnboundedReceiver<SenderRequest<T>>>,
}
service_factor_impl!(Sender);
impl<T> Sender<T>
where
T: 'static + Debug,
{
pub fn new(module_map: ModuleMap) -> Self {
let (data_tx, data_rx) = unbounded_channel::<SenderRequest<T>>();
Self {
module_map,
data_tx,
data_rx: Some(data_rx),
}
}
pub fn async_send(&self, data: SenderRequest<T>) { let _ = self.data_tx.send(data); }
pub fn sync_send(&self, data: SenderRequest<T>) -> EventResponse {
let factory = self.new_service(());
futures::executor::block_on(async {
let service = factory.await.unwrap();
service.call(data).await.unwrap()
})
}
pub fn take_rx(&mut self) -> UnboundedReceiver<SenderRequest<T>> { self.data_rx.take().unwrap() }
}
pub struct SenderRunner<T>
where
T: 'static + Debug,
{
module_map: ModuleMap,
data_rx: UnboundedReceiver<SenderRequest<T>>,
}
service_factor_impl!(SenderRunner);
impl<T> SenderRunner<T>
where
T: 'static + Debug,
{
pub fn new(module_map: ModuleMap, data_rx: UnboundedReceiver<SenderRequest<T>>) -> Self {
Self { module_map, data_rx }
}
}
impl<T> Future for SenderRunner<T>
where
T: 'static + Debug,
{
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match ready!(Pin::new(&mut self.data_rx).poll_recv(cx)) {
None => return Poll::Ready(()),
Some(ctx) => {
let factory = self.new_service(());
tokio::task::spawn_local(async move {
let service = factory.await.unwrap();
let _ = service.call(ctx).await;
});
},
}
}
}
}

View File

@ -1,27 +1,31 @@
use crate::service::{Service, ServiceFactory};
use futures_core::future::LocalBoxFuture;
use futures_core::future::{BoxFuture};
pub fn factory<SF, Req>(factory: SF) -> BoxServiceFactory<SF::Context, Req, SF::Response, SF::Error>
where
SF: ServiceFactory<Req> + 'static,
SF: ServiceFactory<Req> + 'static + Sync + Send,
Req: 'static,
SF::Response: 'static,
SF::Service: 'static,
SF::Future: 'static,
SF::Error: 'static,
SF::Error: 'static + Send + Sync,
<SF as ServiceFactory<Req>>::Service: Sync + Send,
<<SF as ServiceFactory<Req>>::Service as Service<Req>>::Future: Send + Sync,
<SF as ServiceFactory<Req>>::Future: Send + Sync,
{
BoxServiceFactory(Box::new(FactoryWrapper(factory)))
}
type Inner<Cfg, Req, Res, Err> = Box<
dyn ServiceFactory<
Req,
Context = Cfg,
Response = Res,
Error = Err,
Service = BoxService<Req, Res, Err>,
Future = LocalBoxFuture<'static, Result<BoxService<Req, Res, Err>, Err>>,
>,
Req,
Context = Cfg,
Response = Res,
Error = Err,
Service = BoxService<Req, Res, Err>,
Future = BoxFuture<'static, Result<BoxService<Req, Res, Err>, Err>>,
> + Sync
+ Send,
>;
pub struct BoxServiceFactory<Cfg, Req, Res, Err>(Inner<Cfg, Req, Res, Err>);
@ -35,23 +39,26 @@ where
type Error = Err;
type Service = BoxService<Req, Res, Err>;
type Context = Cfg;
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::Error>>;
type Future = BoxFuture<'static, Result<Self::Service, Self::Error>>;
fn new_service(&self, cfg: Cfg) -> Self::Future { self.0.new_service(cfg) }
}
pub type BoxService<Req, Res, Err> =
Box<dyn Service<Req, Response = Res, Error = Err, Future = LocalBoxFuture<'static, Result<Res, Err>>>>;
pub type BoxService<Req, Res, Err> = Box<
dyn Service<Req, Response = Res, Error = Err, Future = BoxFuture<'static, Result<Res, Err>>>
+ Sync
+ Send,
>;
#[allow(dead_code)]
pub fn service<S, Req>(service: S) -> BoxService<Req, S::Response, S::Error>
where
S: Service<Req> + 'static,
Req: 'static,
S::Future: 'static,
{
Box::new(ServiceWrapper::new(service))
}
// #[allow(dead_code)]
// pub fn service<S, Req>(service: S) -> BoxService<Req, S::Response, S::Error>
// where
// S: Service<Req> + 'static,
// Req: 'static,
// S::Future: 'static,
// {
// Box::new(ServiceWrapper::new(service))
// }
impl<S, Req> Service<Req> for Box<S>
where
@ -75,11 +82,11 @@ impl<S> ServiceWrapper<S> {
impl<S, Req, Res, Err> Service<Req> for ServiceWrapper<S>
where
S: Service<Req, Response = Res, Error = Err>,
S::Future: 'static,
S::Future: 'static + Send + Sync,
{
type Response = Res;
type Error = Err;
type Future = LocalBoxFuture<'static, Result<Res, Err>>;
type Future = BoxFuture<'static, Result<Res, Err>>;
fn call(&self, req: Req) -> Self::Future { Box::pin(self.inner.call(req)) }
}
@ -93,17 +100,21 @@ where
Err: 'static,
SF: ServiceFactory<Req, Context = Cfg, Response = Res, Error = Err>,
SF::Future: 'static,
SF::Service: 'static,
<SF::Service as Service<Req>>::Future: 'static,
SF::Service: 'static + Send + Sync,
<<SF as ServiceFactory<Req>>::Service as Service<Req>>::Future: Send + Sync + 'static,
<SF as ServiceFactory<Req>>::Future: Send + Sync,
{
type Response = Res;
type Error = Err;
type Service = BoxService<Req, Res, Err>;
type Context = Cfg;
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::Error>>;
type Future = BoxFuture<'static, Result<Self::Service, Self::Error>>;
fn new_service(&self, cfg: Cfg) -> Self::Future {
let f = self.0.new_service(cfg);
Box::pin(async { f.await.map(|s| Box::new(ServiceWrapper::new(s)) as Self::Service) })
Box::pin(async {
f.await
.map(|s| Box::new(ServiceWrapper::new(s)) as Self::Service)
})
}
}

View File

@ -16,9 +16,9 @@ use crate::{
util::ready::*,
};
pub trait Handler<T, R>: Clone + 'static
pub trait Handler<T, R>: Clone + 'static + Sync + Send
where
R: Future,
R: Future + Send + Sync,
R::Output: Responder,
{
fn call(&self, param: T) -> R;
@ -28,7 +28,7 @@ pub struct HandlerService<H, T, R>
where
H: Handler<T, R>,
T: FromRequest,
R: Future,
R: Future + Sync + Send,
R::Output: Responder,
{
handler: H,
@ -39,7 +39,7 @@ impl<H, T, R> HandlerService<H, T, R>
where
H: Handler<T, R>,
T: FromRequest,
R: Future,
R: Future + Sync + Send,
R::Output: Responder,
{
pub fn new(handler: H) -> Self {
@ -54,7 +54,7 @@ impl<H, T, R> Clone for HandlerService<H, T, R>
where
H: Handler<T, R>,
T: FromRequest,
R: Future,
R: Future + Sync + Send,
R::Output: Responder,
{
fn clone(&self) -> Self {
@ -69,7 +69,7 @@ impl<F, T, R> ServiceFactory<ServiceRequest> for HandlerService<F, T, R>
where
F: Handler<T, R>,
T: FromRequest,
R: Future,
R: Future + Send + Sync,
R::Output: Responder,
{
type Response = ServiceResponse;
@ -85,7 +85,7 @@ impl<H, T, R> Service<ServiceRequest> for HandlerService<H, T, R>
where
H: Handler<T, R>,
T: FromRequest,
R: Future,
R: Future + Sync + Send,
R::Output: Responder,
{
type Response = ServiceResponse;
@ -104,7 +104,7 @@ pub enum HandlerServiceFuture<H, T, R>
where
H: Handler<T, R>,
T: FromRequest,
R: Future,
R: Future + Sync + Send,
R::Output: Responder,
{
Extract(#[pin] T::Future, Option<EventRequest>, H),
@ -115,7 +115,7 @@ impl<F, T, R> Future for HandlerServiceFuture<F, T, R>
where
F: Handler<T, R>,
T: FromRequest,
R: Future,
R: Future + Sync + Send,
R::Output: Responder,
{
type Output = Result<ServiceResponse, SystemError>;
@ -151,8 +151,8 @@ where
macro_rules! factory_tuple ({ $($param:ident)* } => {
impl<Func, $($param,)* Res> Handler<($($param,)*), Res> for Func
where Func: Fn($($param),*) -> Res + Clone + 'static,
Res: Future,
where Func: Fn($($param),*) -> Res + Clone + 'static + Sync + Send,
Res: Future + Sync + Send,
Res::Output: Responder,
{
#[allow(non_snake_case)]

View File

@ -1,9 +1,9 @@
use crate::{
module::{Event, Module},
module::{as_module_map, Module, ModuleMap},
rt::Runtime,
};
use futures_core::{ready, task::Context};
use std::{cell::RefCell, collections::HashMap, fmt::Debug, future::Future, io, rc::Rc, sync::Arc};
use std::{cell::RefCell, fmt::Debug, future::Future, io, sync::Arc};
use tokio::{
macros::support::{Pin, Poll},
sync::{
@ -21,7 +21,6 @@ pub enum SystemCommand {
Exit(i8),
}
pub type ModuleMap = Rc<HashMap<Event, Rc<Module>>>;
pub struct FlowySystem {
sys_cmd_tx: UnboundedSender<SystemCommand>,
}
@ -32,7 +31,7 @@ impl FlowySystem {
F: FnOnce() -> Vec<Module>,
S: FnOnce(ModuleMap, &Runtime),
{
let runtime = Runtime::new().unwrap();
let runtime = Arc::new(Runtime::new().unwrap());
let (sys_cmd_tx, sys_cmd_rx) = unbounded_channel::<SystemCommand>();
let (stop_tx, stop_rx) = oneshot::channel();
@ -41,21 +40,15 @@ impl FlowySystem {
sys_cmd_rx,
});
let factory = module_factory();
let mut module_map = HashMap::new();
factory.into_iter().for_each(|m| {
let events = m.events();
let rc_module = Rc::new(m);
events.into_iter().for_each(|e| {
module_map.insert(e, rc_module.clone());
});
});
let module_map = as_module_map(module_factory());
sender_factory(module_map.clone(), &runtime);
let system = Self { sys_cmd_tx };
sender_factory(Rc::new(module_map), &runtime);
FlowySystem::set_current(system);
let runner = SystemRunner { rt: runtime, stop_rx };
let runner = SystemRunner {
rt: runtime,
stop_rx,
};
runner
}
@ -107,7 +100,7 @@ impl Future for SystemController {
}
pub struct SystemRunner {
rt: Runtime,
rt: Arc<Runtime>,
stop_rx: oneshot::Receiver<i8>,
}

View File

@ -1 +1,27 @@
use std::{io, thread};
use thread_id;
use tokio::runtime;
pub mod ready;
pub(crate) fn tokio_default_runtime() -> io::Result<tokio::runtime::Runtime> {
runtime::Builder::new_multi_thread()
.thread_name("flowy-sys")
.enable_io()
.enable_time()
.on_thread_start(move || {
log::trace!(
"{:?} thread started: thread_id= {}",
thread::current(),
thread_id::get()
);
})
.on_thread_stop(move || {
log::trace!(
"{:?} thread stopping: thread_id= {}",
thread::current(),
thread_id::get(),
);
})
.build()
}

View File

@ -1,4 +1,4 @@
use flowy_sys::prelude::{EventResponse, FlowySystem, Module, Sender, SenderRequest, SenderRunner};
use flowy_sys::prelude::*;
use std::{cell::RefCell, sync::Once};
#[allow(dead_code)]
@ -10,44 +10,14 @@ pub fn setup_env() {
});
}
thread_local!(
static SENDER: RefCell<Option<Sender<i64>>> = RefCell::new(None);
);
pub fn sync_send(data: SenderRequest<i64>) -> EventResponse {
SENDER.with(|cell| match &*cell.borrow() {
Some(stream) => stream.sync_send(data),
None => panic!(""),
})
pub async fn async_send(data: DispatchRequest<i64>) -> Result<EventResponse, SystemError> {
EventDispatch::async_send(data).await
}
pub fn async_send(data: SenderRequest<i64>) {
SENDER.with(|cell| match &*cell.borrow() {
Some(stream) => {
stream.async_send(data);
},
None => panic!(""),
});
}
pub fn init_system<F>(modules: Vec<Module>, f: F)
pub fn init_system<F>(module_factory: F)
where
F: FnOnce() + 'static,
F: FnOnce() -> Vec<Module>,
{
FlowySystem::construct(
|| modules,
|module_map, runtime| {
let mut sender = Sender::<i64>::new(module_map.clone());
runtime.spawn(SenderRunner::new(module_map, sender.take_rx()));
SENDER.with(|cell| {
*cell.borrow_mut() = Some(sender);
});
},
)
.spawn(async { f() })
.run()
.unwrap();
let system = EventDispatch::new(module_factory);
EventDispatch::set_current(system);
}
pub fn stop_system() { FlowySystem::current().stop(); }

View File

@ -2,20 +2,14 @@ use crate::helper::*;
use flowy_sys::prelude::*;
pub async fn hello() -> String { "say hello".to_string() }
#[test]
fn test_init() {
#[tokio::test]
async fn test_init() {
setup_env();
let event = "1";
let modules = vec![Module::new().event(event, hello)];
init_system(|| vec![Module::new().event(event, hello)]);
init_system(modules, move || {
let request = SenderRequest::new(1, event).callback(|_config, response| {
log::info!("async resp: {:?}", response);
});
let resp = sync_send(request);
log::info!("sync resp: {:?}", resp);
stop_system();
});
let request = DispatchRequest::new(1, event);
let resp = async_send(request).await.unwrap();
log::info!("sync resp: {:?}", resp);
}