add frontend folder

This commit is contained in:
appflowy
2021-11-20 09:32:46 +08:00
parent f93f012bc8
commit 8f1d62f115
1697 changed files with 754 additions and 104 deletions

View File

@ -0,0 +1,41 @@
[package]
name = "lib-dispatch"
version = "0.1.0"
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
pin-project = "1.0.0"
futures-core = { version = "0.3", default-features = false }
paste = "1"
futures-channel = "0.3.15"
futures = "0.3.15"
futures-util = "0.3.15"
bytes = {version = "1.0", features = ["serde"]}
tokio = { version = "1", features = ["full"] }
uuid = { version = "0.8", features = ["serde", "v4"] }
log = "0.4.14"
env_logger = "0.8"
serde_with = "1.9.4"
thread-id = "3.3.0"
lazy_static = "1.4.0"
dyn-clone = "1.0"
derivative = "2.2.0"
serde_json = {version = "1.0"}
serde = { version = "1.0", features = ["derive"] }
dashmap = "4.0"
#optional crate
bincode = { version = "1.3", optional = true}
protobuf = {version = "2.24.1", optional = true}
tracing = { version = "0.1"}
[dev-dependencies]
tokio = { version = "1", features = ["full"] }
futures-util = "0.3.15"
[features]
default = ["use_protobuf"]
use_serde = ["bincode"]
use_protobuf= ["protobuf"]

View File

@ -0,0 +1,74 @@
use crate::errors::{DispatchError, InternalError};
use bytes::Bytes;
// To bytes
pub trait ToBytes {
fn into_bytes(self) -> Result<Bytes, DispatchError>;
}
#[cfg(feature = "use_protobuf")]
impl<T> ToBytes for T
where
T: std::convert::TryInto<Bytes, Error = protobuf::ProtobufError>,
{
fn into_bytes(self) -> Result<Bytes, DispatchError> {
match self.try_into() {
Ok(data) => Ok(data),
Err(e) => {
// let system_err: DispatchError = InternalError::new(format!("{:?}",
// e)).into(); system_err.into()
// Err(format!("{:?}", e))
Err(InternalError::ProtobufError(format!("{:?}", e)).into())
},
}
}
}
#[cfg(feature = "use_serde")]
impl<T> ToBytes for T
where
T: serde::Serialize,
{
fn into_bytes(self) -> Result<Bytes, DispatchError> {
match serde_json::to_string(&self.0) {
Ok(s) => Ok(Bytes::from(s)),
Err(e) => Err(InternalError::SerializeToBytes(format!("{:?}", e)).into()),
}
}
}
// From bytes
pub trait FromBytes: Sized {
fn parse_from_bytes(bytes: Bytes) -> Result<Self, DispatchError>;
}
#[cfg(feature = "use_protobuf")]
impl<T> FromBytes for T
where
// // https://stackoverflow.com/questions/62871045/tryfromu8-trait-bound-in-trait
// T: for<'a> std::convert::TryFrom<&'a Bytes, Error =
// protobuf::ProtobufError>,
T: std::convert::TryFrom<Bytes, Error = protobuf::ProtobufError>,
{
fn parse_from_bytes(bytes: Bytes) -> Result<Self, DispatchError> {
let data = T::try_from(bytes.clone())?;
Ok(data)
}
}
#[cfg(feature = "use_serde")]
impl<T> FromBytes for T
where
T: serde::de::DeserializeOwned + 'static,
{
fn parse_from_bytes(bytes: Bytes) -> Result<Self, String> {
let s = String::from_utf8_lossy(&bytes);
match serde_json::from_str::<T>(s.as_ref()) {
Ok(data) => Ok(data),
Err(e) => Err(format!("{:?}", e)),
}
}
}

View File

@ -0,0 +1,102 @@
use crate::{
byte_trait::*,
errors::{DispatchError, InternalError},
request::{unexpected_none_payload, EventRequest, FromRequest, Payload},
response::{EventResponse, Responder, ResponseBuilder},
util::ready::{ready, Ready},
};
use bytes::Bytes;
use std::ops;
pub struct Data<T>(pub T);
impl<T> Data<T> {
pub fn into_inner(self) -> T { self.0 }
}
impl<T> ops::Deref for Data<T> {
type Target = T;
fn deref(&self) -> &T { &self.0 }
}
impl<T> ops::DerefMut for Data<T> {
fn deref_mut(&mut self) -> &mut T { &mut self.0 }
}
impl<T> FromRequest for Data<T>
where
T: FromBytes + 'static,
{
type Error = DispatchError;
type Future = Ready<Result<Self, DispatchError>>;
#[inline]
fn from_request(req: &EventRequest, payload: &mut Payload) -> Self::Future {
match payload {
Payload::None => ready(Err(unexpected_none_payload(req))),
Payload::Bytes(bytes) => match T::parse_from_bytes(bytes.clone()) {
Ok(data) => ready(Ok(Data(data))),
Err(e) => ready(Err(InternalError::DeserializeFromBytes(format!("{}", e)).into())),
},
}
}
}
impl<T> Responder for Data<T>
where
T: ToBytes,
{
fn respond_to(self, _request: &EventRequest) -> EventResponse {
match self.into_inner().into_bytes() {
Ok(bytes) => ResponseBuilder::Ok().data(bytes).build(),
Err(e) => e.into(),
}
}
}
impl<T> std::convert::TryFrom<&Payload> for Data<T>
where
T: FromBytes,
{
type Error = DispatchError;
fn try_from(payload: &Payload) -> Result<Data<T>, Self::Error> { parse_payload(payload) }
}
impl<T> std::convert::TryFrom<Payload> for Data<T>
where
T: FromBytes,
{
type Error = DispatchError;
fn try_from(payload: Payload) -> Result<Data<T>, Self::Error> { parse_payload(&payload) }
}
fn parse_payload<T>(payload: &Payload) -> Result<Data<T>, DispatchError>
where
T: FromBytes,
{
match payload {
Payload::None => Err(InternalError::UnexpectedNone(format!("Parse fail, expected payload")).into()),
Payload::Bytes(bytes) => {
let data = T::parse_from_bytes(bytes.clone())?;
Ok(Data(data))
},
}
}
impl<T> std::convert::TryInto<Payload> for Data<T>
where
T: ToBytes,
{
type Error = DispatchError;
fn try_into(self) -> Result<Payload, Self::Error> {
let inner = self.into_inner();
let bytes = inner.into_bytes()?;
Ok(Payload::Bytes(bytes))
}
}
impl ToBytes for Data<String> {
fn into_bytes(self) -> Result<Bytes, DispatchError> { Ok(Bytes::from(self.0)) }
}

View File

@ -0,0 +1,184 @@
use crate::{
errors::{DispatchError, Error, InternalError},
module::{as_module_map, Module, ModuleMap, ModuleRequest},
response::EventResponse,
service::{Service, ServiceFactory},
util::tokio_default_runtime,
};
use derivative::*;
use futures_core::future::BoxFuture;
use futures_util::task::Context;
use pin_project::pin_project;
use std::{future::Future, sync::Arc};
use tokio::macros::support::{Pin, Poll};
pub struct EventDispatch {
module_map: ModuleMap,
runtime: tokio::runtime::Runtime,
}
impl EventDispatch {
pub fn construct<F>(module_factory: F) -> EventDispatch
where
F: FnOnce() -> Vec<Module>,
{
let runtime = tokio_default_runtime().unwrap();
let modules = module_factory();
tracing::trace!("{}", module_info(&modules));
let module_map = as_module_map(modules);
let dispatch = EventDispatch { module_map, runtime };
dispatch
}
pub fn async_send<Req>(dispatch: Arc<EventDispatch>, request: Req) -> DispatchFuture<EventResponse>
where
Req: std::convert::Into<ModuleRequest>,
{
EventDispatch::async_send_with_callback(dispatch, request, |_| Box::pin(async {}))
}
pub fn async_send_with_callback<Req, Callback>(
dispatch: Arc<EventDispatch>,
request: Req,
callback: Callback,
) -> DispatchFuture<EventResponse>
where
Req: std::convert::Into<ModuleRequest>,
Callback: FnOnce(EventResponse) -> BoxFuture<'static, ()> + 'static + Send + Sync,
{
let request: ModuleRequest = request.into();
let module_map = dispatch.module_map.clone();
let service = Box::new(DispatchService { module_map });
tracing::trace!("Async event: {:?}", &request.event);
let service_ctx = DispatchContext {
request,
callback: Some(Box::new(callback)),
};
let join_handle = dispatch.runtime.spawn(async move {
service
.call(service_ctx)
.await
.unwrap_or_else(|e| InternalError::Other(format!("{:?}", e)).as_response())
});
DispatchFuture {
fut: Box::pin(async move {
join_handle.await.unwrap_or_else(|e| {
let error = InternalError::JoinError(format!("EVENT_DISPATCH join error: {:?}", e));
error.as_response()
})
}),
}
}
pub fn sync_send(dispatch: Arc<EventDispatch>, request: ModuleRequest) -> EventResponse {
futures::executor::block_on(async {
EventDispatch::async_send_with_callback(dispatch, request, |_| Box::pin(async {})).await
})
}
pub fn spawn<F>(&self, f: F)
where
F: Future<Output = ()> + Send + 'static,
{
self.runtime.spawn(f);
}
}
#[pin_project]
pub struct DispatchFuture<T: Send + Sync> {
#[pin]
pub fut: Pin<Box<dyn Future<Output = T> + Sync + Send>>,
}
impl<T> Future for DispatchFuture<T>
where
T: Send + Sync,
{
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().project();
loop {
return Poll::Ready(futures_core::ready!(this.fut.poll(cx)));
}
}
}
pub type BoxFutureCallback = Box<dyn FnOnce(EventResponse) -> BoxFuture<'static, ()> + 'static + Send + Sync>;
#[derive(Derivative)]
#[derivative(Debug)]
pub struct DispatchContext {
pub request: ModuleRequest,
#[derivative(Debug = "ignore")]
pub callback: Option<BoxFutureCallback>,
}
impl DispatchContext {
pub(crate) fn into_parts(self) -> (ModuleRequest, Option<BoxFutureCallback>) {
let DispatchContext { request, callback } = self;
(request, callback)
}
}
pub(crate) struct DispatchService {
pub(crate) module_map: ModuleMap,
}
impl Service<DispatchContext> for DispatchService {
type Response = EventResponse;
type Error = DispatchError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
#[cfg_attr(
feature = "use_tracing",
tracing::instrument(name = "DispatchService", level = "debug", skip(self, ctx))
)]
fn call(&self, ctx: DispatchContext) -> Self::Future {
let module_map = self.module_map.clone();
let (request, callback) = ctx.into_parts();
Box::pin(async move {
let result = {
// print_module_map_info(&module_map);
match module_map.get(&request.event) {
Some(module) => {
let fut = module.new_service(());
let service_fut = fut.await?.call(request);
service_fut.await
},
None => {
let msg = format!("Can not find the event handler. {:?}", request);
log::error!("{}", msg);
Err(InternalError::HandleNotFound(msg).into())
},
}
};
let response = result.unwrap_or_else(|e| e.into());
tracing::trace!("Dispatch result: {:?}", response);
if let Some(callback) = callback {
callback(response.clone()).await;
}
Ok(response)
})
}
}
#[allow(dead_code)]
fn module_info(modules: &Vec<Module>) -> String {
let mut info = format!("{} modules loaded\n", modules.len());
for module in modules {
info.push_str(&format!("-> {} loaded \n", module.name));
}
info
}
#[allow(dead_code)]
fn print_module_map_info(module_map: &ModuleMap) {
module_map.iter().for_each(|(k, v)| {
tracing::info!("Event: {:?} module: {:?}", k, v.name);
})
}

View File

@ -0,0 +1,113 @@
use crate::{
byte_trait::FromBytes,
request::EventRequest,
response::{EventResponse, ResponseBuilder},
};
use bytes::Bytes;
use dyn_clone::DynClone;
use serde::{Serialize, Serializer};
use std::fmt;
use tokio::{sync::mpsc::error::SendError, task::JoinError};
pub trait Error: fmt::Debug + DynClone + Send + Sync {
fn as_response(&self) -> EventResponse;
}
dyn_clone::clone_trait_object!(Error);
impl<T: Error + 'static> From<T> for DispatchError {
fn from(err: T) -> DispatchError { DispatchError { inner: Box::new(err) } }
}
#[derive(Clone)]
pub struct DispatchError {
inner: Box<dyn Error>,
}
impl DispatchError {
pub fn inner_error(&self) -> &dyn Error { self.inner.as_ref() }
}
impl fmt::Display for DispatchError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{:?}", &self.inner) }
}
impl fmt::Debug for DispatchError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{:?}", &self.inner) }
}
impl std::error::Error for DispatchError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { None }
fn cause(&self) -> Option<&dyn std::error::Error> { None }
}
impl From<SendError<EventRequest>> for DispatchError {
fn from(err: SendError<EventRequest>) -> Self { InternalError::Other(format!("{}", err)).into() }
}
impl From<String> for DispatchError {
fn from(s: String) -> Self { InternalError::Other(s).into() }
}
#[cfg(feature = "use_protobuf")]
impl From<protobuf::ProtobufError> for DispatchError {
fn from(e: protobuf::ProtobufError) -> Self { InternalError::ProtobufError(format!("{:?}", e)).into() }
}
impl FromBytes for DispatchError {
fn parse_from_bytes(bytes: Bytes) -> Result<Self, DispatchError> {
let s = String::from_utf8(bytes.to_vec()).unwrap();
Ok(InternalError::DeserializeFromBytes(s).into())
}
}
impl From<DispatchError> for EventResponse {
fn from(err: DispatchError) -> Self { err.inner_error().as_response() }
}
impl Serialize for DispatchError {
fn serialize<S>(&self, serializer: S) -> Result<<S as Serializer>::Ok, <S as Serializer>::Error>
where
S: Serializer,
{
serializer.serialize_str(&format!("{}", self))
}
}
#[derive(Clone, Debug)]
pub(crate) enum InternalError {
ProtobufError(String),
UnexpectedNone(String),
DeserializeFromBytes(String),
JoinError(String),
ServiceNotFound(String),
HandleNotFound(String),
Other(String),
}
impl fmt::Display for InternalError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
InternalError::ProtobufError(s) => fmt::Display::fmt(&s, f),
InternalError::UnexpectedNone(s) => fmt::Display::fmt(&s, f),
InternalError::DeserializeFromBytes(s) => fmt::Display::fmt(&s, f),
InternalError::JoinError(s) => fmt::Display::fmt(&s, f),
InternalError::ServiceNotFound(s) => fmt::Display::fmt(&s, f),
InternalError::HandleNotFound(s) => fmt::Display::fmt(&s, f),
InternalError::Other(s) => fmt::Display::fmt(&s, f),
}
}
}
impl Error for InternalError {
fn as_response(&self) -> EventResponse {
let error = format!("{}", self).into_bytes();
ResponseBuilder::Internal().data(error).build()
}
}
impl std::convert::From<JoinError> for InternalError {
fn from(e: JoinError) -> Self { InternalError::JoinError(format!("{}", e)) }
}

View File

@ -0,0 +1,3 @@
mod errors;
pub use errors::*;

View File

@ -0,0 +1,20 @@
mod errors;
mod module;
mod request;
mod response;
mod service;
mod util;
mod byte_trait;
mod data;
mod dispatch;
mod system;
#[macro_use]
pub mod macros;
pub use errors::Error;
pub mod prelude {
pub use crate::{byte_trait::*, data::*, dispatch::*, errors::*, module::*, request::*, response::*};
}

View File

@ -0,0 +1,8 @@
#[macro_export]
macro_rules! dispatch_future {
($fut:expr) => {
DispatchFuture {
fut: Box::pin(async move { $fut.await }),
}
};
}

View File

@ -0,0 +1,63 @@
use std::{
any::{Any, TypeId},
collections::HashMap,
};
#[derive(Default, Debug)]
pub struct ModuleDataMap {
map: HashMap<TypeId, Box<dyn Any + Sync + Send>>,
}
impl ModuleDataMap {
#[inline]
pub fn new() -> ModuleDataMap {
ModuleDataMap {
map: HashMap::default(),
}
}
pub fn insert<T>(&mut self, val: T) -> Option<T>
where
T: 'static + Send + Sync,
{
self.map
.insert(TypeId::of::<T>(), Box::new(val))
.and_then(downcast_owned)
}
pub fn remove<T>(&mut self) -> Option<T>
where
T: 'static + Send + Sync,
{
self.map.remove(&TypeId::of::<T>()).and_then(downcast_owned)
}
pub fn get<T>(&self) -> Option<&T>
where
T: 'static + Send + Sync,
{
self.map.get(&TypeId::of::<T>()).and_then(|boxed| boxed.downcast_ref())
}
pub fn get_mut<T>(&mut self) -> Option<&mut T>
where
T: 'static + Send + Sync,
{
self.map
.get_mut(&TypeId::of::<T>())
.and_then(|boxed| boxed.downcast_mut())
}
pub fn contains<T>(&self) -> bool
where
T: 'static + Send + Sync,
{
self.map.contains_key(&TypeId::of::<T>())
}
pub fn extend(&mut self, other: ModuleDataMap) { self.map.extend(other.map); }
}
fn downcast_owned<T: 'static + Send + Sync>(boxed: Box<dyn Any + Send + Sync>) -> Option<T> {
boxed.downcast().ok().map(|boxed| *boxed)
}

View File

@ -0,0 +1,59 @@
use crate::{
errors::{DispatchError, InternalError},
request::{payload::Payload, EventRequest, FromRequest},
util::ready::{ready, Ready},
};
use std::{any::type_name, ops::Deref, sync::Arc};
pub struct Unit<T: ?Sized + Send + Sync>(Arc<T>);
impl<T> Unit<T>
where
T: Send + Sync,
{
pub fn new(data: T) -> Self { Unit(Arc::new(data)) }
pub fn get_ref(&self) -> &T { self.0.as_ref() }
}
impl<T> Deref for Unit<T>
where
T: ?Sized + Send + Sync,
{
type Target = Arc<T>;
fn deref(&self) -> &Arc<T> { &self.0 }
}
impl<T> Clone for Unit<T>
where
T: ?Sized + Send + Sync,
{
fn clone(&self) -> Unit<T> { Unit(self.0.clone()) }
}
impl<T> From<Arc<T>> for Unit<T>
where
T: ?Sized + Send + Sync,
{
fn from(arc: Arc<T>) -> Self { Unit(arc) }
}
impl<T> FromRequest for Unit<T>
where
T: ?Sized + Send + Sync + 'static,
{
type Error = DispatchError;
type Future = Ready<Result<Self, DispatchError>>;
#[inline]
fn from_request(req: &EventRequest, _: &mut Payload) -> Self::Future {
if let Some(data) = req.module_data::<Unit<T>>() {
ready(Ok(data.clone()))
} else {
let msg = format!("Failed to get the module data of type: {}", type_name::<T>());
log::error!("{}", msg,);
ready(Err(InternalError::Other(msg).into()))
}
}
}

View File

@ -0,0 +1,7 @@
pub use container::*;
pub use data::*;
pub use module::*;
mod container;
mod data;
mod module;

View File

@ -0,0 +1,241 @@
use std::{
collections::HashMap,
fmt,
fmt::{Debug, Display},
future::Future,
hash::Hash,
pin::Pin,
task::{Context, Poll},
};
use futures_core::ready;
use pin_project::pin_project;
use crate::{
errors::{DispatchError, InternalError},
module::{container::ModuleDataMap, Unit},
request::{payload::Payload, EventRequest, FromRequest},
response::{EventResponse, Responder},
service::{
factory,
BoxService,
BoxServiceFactory,
Handler,
HandlerService,
Service,
ServiceFactory,
ServiceRequest,
ServiceResponse,
},
};
use futures_core::future::BoxFuture;
use std::sync::Arc;
pub type ModuleMap = Arc<HashMap<Event, Arc<Module>>>;
pub(crate) fn as_module_map(modules: Vec<Module>) -> ModuleMap {
let mut module_map = HashMap::new();
modules.into_iter().for_each(|m| {
let events = m.events();
let module = Arc::new(m);
events.into_iter().for_each(|e| {
module_map.insert(e, module.clone());
});
});
Arc::new(module_map)
}
#[derive(PartialEq, Eq, Hash, Debug, Clone)]
pub struct Event(String);
impl<T: Display + Eq + Hash + Debug + Clone> std::convert::From<T> for Event {
fn from(t: T) -> Self { Event(format!("{}", t)) }
}
pub type EventServiceFactory = BoxServiceFactory<(), ServiceRequest, ServiceResponse, DispatchError>;
pub struct Module {
pub name: String,
module_data: Arc<ModuleDataMap>,
service_map: Arc<HashMap<Event, EventServiceFactory>>,
}
impl Module {
pub fn new() -> Self {
Self {
name: "".to_owned(),
module_data: Arc::new(ModuleDataMap::new()),
service_map: Arc::new(HashMap::new()),
}
}
pub fn name(mut self, s: &str) -> Self {
self.name = s.to_owned();
self
}
pub fn data<D: 'static + Send + Sync>(mut self, data: D) -> Self {
Arc::get_mut(&mut self.module_data).unwrap().insert(Unit::new(data));
self
}
pub fn event<E, H, T, R>(mut self, event: E, handler: H) -> Self
where
H: Handler<T, R>,
T: FromRequest + 'static + Send + Sync,
<T as FromRequest>::Future: Sync + Send,
R: Future + 'static + Send + Sync,
R::Output: Responder + 'static,
E: Eq + Hash + Debug + Clone + Display,
{
let event: Event = event.into();
if self.service_map.contains_key(&event) {
log::error!("Duplicate Event: {:?}", &event);
}
Arc::get_mut(&mut self.service_map)
.unwrap()
.insert(event, factory(HandlerService::new(handler)));
self
}
pub fn events(&self) -> Vec<Event> { self.service_map.keys().map(|key| key.clone()).collect::<Vec<_>>() }
}
#[derive(Debug, Clone)]
pub struct ModuleRequest {
pub id: String,
pub event: Event,
pub(crate) payload: Payload,
}
impl ModuleRequest {
pub fn new<E>(event: E) -> Self
where
E: Into<Event>,
{
Self {
id: uuid::Uuid::new_v4().to_string(),
event: event.into(),
payload: Payload::None,
}
}
pub fn payload<P>(mut self, payload: P) -> Self
where
P: Into<Payload>,
{
self.payload = payload.into();
self
}
}
impl std::fmt::Display for ModuleRequest {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}:{:?}", self.id, self.event) }
}
impl ServiceFactory<ModuleRequest> for Module {
type Response = EventResponse;
type Error = DispatchError;
type Service = BoxService<ModuleRequest, Self::Response, Self::Error>;
type Context = ();
type Future = BoxFuture<'static, Result<Self::Service, Self::Error>>;
fn new_service(&self, _cfg: Self::Context) -> Self::Future {
let service_map = self.service_map.clone();
let module_data = self.module_data.clone();
Box::pin(async move {
let service = ModuleService {
service_map,
module_data,
};
let module_service = Box::new(service) as Self::Service;
Ok(module_service)
})
}
}
pub struct ModuleService {
service_map: Arc<HashMap<Event, EventServiceFactory>>,
module_data: Arc<ModuleDataMap>,
}
impl Service<ModuleRequest> for ModuleService {
type Response = EventResponse;
type Error = DispatchError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn call(&self, request: ModuleRequest) -> Self::Future {
let ModuleRequest { id, event, payload } = request;
let module_data = self.module_data.clone();
let request = EventRequest::new(id.clone(), event, module_data);
match self.service_map.get(&request.event) {
Some(factory) => {
let service_fut = factory.new_service(());
let fut = ModuleServiceFuture {
fut: Box::pin(async {
let service = service_fut.await?;
let service_req = ServiceRequest::new(request, payload);
service.call(service_req).await
}),
};
Box::pin(async move { Ok(fut.await.unwrap_or_else(|e| e.into())) })
},
None => {
let msg = format!("Can not find service factory for event: {:?}", request.event);
Box::pin(async { Err(InternalError::ServiceNotFound(msg).into()) })
},
}
}
}
#[pin_project]
pub struct ModuleServiceFuture {
#[pin]
fut: BoxFuture<'static, Result<ServiceResponse, DispatchError>>,
}
impl Future for ModuleServiceFuture {
type Output = Result<EventResponse, DispatchError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
let (_, response) = ready!(self.as_mut().project().fut.poll(cx))?.into_parts();
return Poll::Ready(Ok(response));
}
}
}
// #[cfg(test)]
// mod tests {
// use super::*;
// use crate::rt::Runtime;
// use futures_util::{future, pin_mut};
// use tokio::sync::mpsc::unbounded_channel;
// pub async fn hello_service() -> String { "hello".to_string() }
// #[test]
// fn test() {
// let runtime = Runtime::new().unwrap();
// runtime.block_on(async {
// let (sys_tx, mut sys_rx) = unbounded_channel::<SystemCommand>();
// let event = "hello".to_string();
// let module = Module::new(sys_tx).event(event.clone(),
// hello_service); let req_tx = module.req_tx();
// let event = async move {
// let request = EventRequest::new(event.clone());
// req_tx.send(request).unwrap();
//
// match sys_rx.recv().await {
// Some(cmd) => {
// tracing::info!("{:?}", cmd);
// },
// None => panic!(""),
// }
// };
//
// pin_mut!(module, event);
// future::select(module, event).await;
// });
// }
// }

View File

@ -0,0 +1,5 @@
pub mod payload;
mod request;
pub use payload::*;
pub use request::*;

View File

@ -0,0 +1,50 @@
use bytes::Bytes;
use std::{fmt, fmt::Formatter};
pub enum PayloadError {}
// TODO: support stream data
#[derive(Clone, serde::Serialize)]
pub enum Payload {
None,
Bytes(Bytes),
}
impl std::fmt::Debug for Payload {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { format_payload_print(self, f) }
}
impl std::fmt::Display for Payload {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { format_payload_print(self, f) }
}
fn format_payload_print(payload: &Payload, f: &mut Formatter<'_>) -> fmt::Result {
match payload {
Payload::Bytes(bytes) => f.write_fmt(format_args!("{} bytes", bytes.len())),
Payload::None => f.write_str("Empty"),
}
}
impl std::convert::Into<Payload> for String {
fn into(self) -> Payload { Payload::Bytes(Bytes::from(self)) }
}
impl std::convert::Into<Payload> for &'_ String {
fn into(self) -> Payload { Payload::Bytes(Bytes::from(self.to_owned())) }
}
impl std::convert::Into<Payload> for Bytes {
fn into(self) -> Payload { Payload::Bytes(self) }
}
impl std::convert::Into<Payload> for () {
fn into(self) -> Payload { Payload::None }
}
impl std::convert::Into<Payload> for Vec<u8> {
fn into(self) -> Payload { Payload::Bytes(Bytes::from(self)) }
}
impl std::convert::Into<Payload> for &str {
fn into(self) -> Payload { self.to_string().into() }
}

View File

@ -0,0 +1,115 @@
use std::future::Future;
use crate::{
errors::{DispatchError, InternalError},
module::{Event, ModuleDataMap},
request::payload::Payload,
util::ready::{ready, Ready},
};
use derivative::*;
use futures_core::ready;
use std::{
fmt::Debug,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
#[derive(Clone, Debug, Derivative)]
pub struct EventRequest {
pub(crate) id: String,
pub(crate) event: Event,
#[derivative(Debug = "ignore")]
pub(crate) module_data: Arc<ModuleDataMap>,
}
impl EventRequest {
pub fn new<E>(id: String, event: E, module_data: Arc<ModuleDataMap>) -> EventRequest
where
E: Into<Event>,
{
Self {
id,
event: event.into(),
module_data,
}
}
pub fn module_data<T: 'static>(&self) -> Option<&T>
where
T: Send + Sync,
{
if let Some(data) = self.module_data.get::<T>() {
return Some(data);
}
None
}
}
pub trait FromRequest: Sized {
type Error: Into<DispatchError>;
type Future: Future<Output = Result<Self, Self::Error>>;
fn from_request(req: &EventRequest, payload: &mut Payload) -> Self::Future;
}
#[doc(hidden)]
impl FromRequest for () {
type Error = DispatchError;
type Future = Ready<Result<(), DispatchError>>;
fn from_request(_req: &EventRequest, _payload: &mut Payload) -> Self::Future { ready(Ok(())) }
}
#[doc(hidden)]
impl FromRequest for String {
type Error = DispatchError;
type Future = Ready<Result<Self, Self::Error>>;
fn from_request(req: &EventRequest, payload: &mut Payload) -> Self::Future {
match &payload {
Payload::None => ready(Err(unexpected_none_payload(req))),
Payload::Bytes(buf) => ready(Ok(String::from_utf8_lossy(buf).into_owned())),
}
}
}
pub fn unexpected_none_payload(request: &EventRequest) -> DispatchError {
log::warn!("{:?} expected payload", &request.event);
InternalError::UnexpectedNone("Expected payload".to_string()).into()
}
#[doc(hidden)]
impl<T> FromRequest for Result<T, T::Error>
where
T: FromRequest,
{
type Error = DispatchError;
type Future = FromRequestFuture<T::Future>;
fn from_request(req: &EventRequest, payload: &mut Payload) -> Self::Future {
FromRequestFuture {
fut: T::from_request(req, payload),
}
}
}
#[pin_project::pin_project]
pub struct FromRequestFuture<Fut> {
#[pin]
fut: Fut,
}
impl<Fut, T, E> Future for FromRequestFuture<Fut>
where
Fut: Future<Output = Result<T, E>>,
{
type Output = Result<Result<T, E>, DispatchError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let res = ready!(this.fut.poll(cx));
Poll::Ready(Ok(res))
}
}

View File

@ -0,0 +1,41 @@
use crate::{
request::Payload,
response::{EventResponse, StatusCode},
};
macro_rules! static_response {
($name:ident, $status:expr) => {
#[allow(non_snake_case, missing_docs)]
pub fn $name() -> ResponseBuilder { ResponseBuilder::new($status) }
};
}
pub struct ResponseBuilder<T = Payload> {
pub payload: T,
pub status: StatusCode,
}
impl ResponseBuilder {
pub fn new(status: StatusCode) -> Self {
ResponseBuilder {
payload: Payload::None,
status,
}
}
pub fn data<D: std::convert::Into<Payload>>(mut self, data: D) -> Self {
self.payload = data.into();
self
}
pub fn build(self) -> EventResponse {
EventResponse {
payload: self.payload,
status_code: self.status,
}
}
static_response!(Ok, StatusCode::Ok);
static_response!(Err, StatusCode::Err);
static_response!(Internal, StatusCode::Internal);
}

View File

@ -0,0 +1,7 @@
pub use builder::*;
pub use responder::*;
pub use response::*;
mod builder;
mod responder;
mod response;

View File

@ -0,0 +1,39 @@
#[allow(unused_imports)]
use crate::errors::{DispatchError, InternalError};
use crate::{
request::EventRequest,
response::{EventResponse, ResponseBuilder},
};
use bytes::Bytes;
pub trait Responder {
fn respond_to(self, req: &EventRequest) -> EventResponse;
}
macro_rules! impl_responder {
($res: ty) => {
impl Responder for $res {
fn respond_to(self, _: &EventRequest) -> EventResponse { ResponseBuilder::Ok().data(self).build() }
}
};
}
impl_responder!(&'static str);
impl_responder!(String);
impl_responder!(&'_ String);
impl_responder!(Bytes);
impl_responder!(());
impl_responder!(Vec<u8>);
impl<T, E> Responder for Result<T, E>
where
T: Responder,
E: Into<DispatchError>,
{
fn respond_to(self, request: &EventRequest) -> EventResponse {
match self {
Ok(val) => val.respond_to(request),
Err(e) => e.into().into(),
}
}
}

View File

@ -0,0 +1,77 @@
use crate::{
byte_trait::FromBytes,
data::Data,
errors::DispatchError,
request::{EventRequest, Payload},
response::Responder,
};
use derivative::*;
use std::{convert::TryFrom, fmt, fmt::Formatter};
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
pub enum StatusCode {
Ok = 0,
Err = 1,
Internal = 2,
}
// serde user guide: https://serde.rs/field-attrs.html
#[derive(Debug, Clone, serde::Serialize, Derivative)]
pub struct EventResponse {
#[derivative(Debug = "ignore")]
pub payload: Payload,
pub status_code: StatusCode,
}
impl EventResponse {
pub fn new(status_code: StatusCode) -> Self {
EventResponse {
payload: Payload::None,
status_code,
}
}
pub fn parse<T, E>(self) -> Result<Result<T, E>, DispatchError>
where
T: FromBytes,
E: FromBytes,
{
match self.status_code {
StatusCode::Ok => {
let data = <Data<T>>::try_from(self.payload)?;
Ok(Ok(data.into_inner()))
},
StatusCode::Err | StatusCode::Internal => {
let err = <Data<E>>::try_from(self.payload)?;
Ok(Err(err.into_inner()))
},
}
}
}
impl std::fmt::Display for EventResponse {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.write_fmt(format_args!("Status_Code: {:?}", self.status_code))?;
match &self.payload {
Payload::Bytes(b) => f.write_fmt(format_args!("Data: {} bytes", b.len()))?,
Payload::None => f.write_fmt(format_args!("Data: Empty"))?,
}
Ok(())
}
}
impl Responder for EventResponse {
#[inline]
fn respond_to(self, _: &EventRequest) -> EventResponse { self }
}
pub type DataResult<T, E> = std::result::Result<Data<T>, E>;
pub fn data_result<T, E>(data: T) -> Result<Data<T>, E>
where
E: Into<DispatchError>,
{
Ok(Data(data))
}

View File

@ -0,0 +1,114 @@
use crate::service::{Service, ServiceFactory};
use futures_core::future::BoxFuture;
pub fn factory<SF, Req>(factory: SF) -> BoxServiceFactory<SF::Context, Req, SF::Response, SF::Error>
where
SF: ServiceFactory<Req> + 'static + Sync + Send,
Req: 'static,
SF::Response: 'static,
SF::Service: 'static,
SF::Future: 'static,
SF::Error: 'static + Send + Sync,
<SF as ServiceFactory<Req>>::Service: Sync + Send,
<<SF as ServiceFactory<Req>>::Service as Service<Req>>::Future: Send + Sync,
<SF as ServiceFactory<Req>>::Future: Send + Sync,
{
BoxServiceFactory(Box::new(FactoryWrapper(factory)))
}
type Inner<Cfg, Req, Res, Err> = Box<
dyn ServiceFactory<
Req,
Context = Cfg,
Response = Res,
Error = Err,
Service = BoxService<Req, Res, Err>,
Future = BoxFuture<'static, Result<BoxService<Req, Res, Err>, Err>>,
> + Sync
+ Send,
>;
pub struct BoxServiceFactory<Cfg, Req, Res, Err>(Inner<Cfg, Req, Res, Err>);
impl<Cfg, Req, Res, Err> ServiceFactory<Req> for BoxServiceFactory<Cfg, Req, Res, Err>
where
Req: 'static,
Res: 'static,
Err: 'static,
{
type Response = Res;
type Error = Err;
type Service = BoxService<Req, Res, Err>;
type Context = Cfg;
type Future = BoxFuture<'static, Result<Self::Service, Self::Error>>;
fn new_service(&self, cfg: Cfg) -> Self::Future { self.0.new_service(cfg) }
}
pub type BoxService<Req, Res, Err> =
Box<dyn Service<Req, Response = Res, Error = Err, Future = BoxFuture<'static, Result<Res, Err>>> + Sync + Send>;
// #[allow(dead_code)]
// pub fn service<S, Req>(service: S) -> BoxService<Req, S::Response, S::Error>
// where
// S: Service<Req> + 'static,
// Req: 'static,
// S::Future: 'static,
// {
// Box::new(ServiceWrapper::new(service))
// }
impl<S, Req> Service<Req> for Box<S>
where
S: Service<Req> + ?Sized,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn call(&self, request: Req) -> S::Future { (**self).call(request) }
}
struct ServiceWrapper<S> {
inner: S,
}
impl<S> ServiceWrapper<S> {
fn new(inner: S) -> Self { Self { inner } }
}
impl<S, Req, Res, Err> Service<Req> for ServiceWrapper<S>
where
S: Service<Req, Response = Res, Error = Err>,
S::Future: 'static + Send + Sync,
{
type Response = Res;
type Error = Err;
type Future = BoxFuture<'static, Result<Res, Err>>;
fn call(&self, req: Req) -> Self::Future { Box::pin(self.inner.call(req)) }
}
struct FactoryWrapper<SF>(SF);
impl<SF, Req, Cfg, Res, Err> ServiceFactory<Req> for FactoryWrapper<SF>
where
Req: 'static,
Res: 'static,
Err: 'static,
SF: ServiceFactory<Req, Context = Cfg, Response = Res, Error = Err>,
SF::Future: 'static,
SF::Service: 'static + Send + Sync,
<<SF as ServiceFactory<Req>>::Service as Service<Req>>::Future: Send + Sync + 'static,
<SF as ServiceFactory<Req>>::Future: Send + Sync,
{
type Response = Res;
type Error = Err;
type Service = BoxService<Req, Res, Err>;
type Context = Cfg;
type Future = BoxFuture<'static, Result<Self::Service, Self::Error>>;
fn new_service(&self, cfg: Cfg) -> Self::Future {
let f = self.0.new_service(cfg);
Box::pin(async { f.await.map(|s| Box::new(ServiceWrapper::new(s)) as Self::Service) })
}
}

View File

@ -0,0 +1,242 @@
use std::{
future::Future,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
use futures_core::ready;
use pin_project::pin_project;
use crate::{
errors::DispatchError,
request::{payload::Payload, EventRequest, FromRequest},
response::{EventResponse, Responder},
service::{Service, ServiceFactory, ServiceRequest, ServiceResponse},
util::ready::*,
};
pub trait Handler<T, R>: Clone + 'static + Sync + Send
where
R: Future + Send + Sync,
R::Output: Responder,
{
fn call(&self, param: T) -> R;
}
pub struct HandlerService<H, T, R>
where
H: Handler<T, R>,
T: FromRequest,
R: Future + Sync + Send,
R::Output: Responder,
{
handler: H,
_phantom: PhantomData<(T, R)>,
}
impl<H, T, R> HandlerService<H, T, R>
where
H: Handler<T, R>,
T: FromRequest,
R: Future + Sync + Send,
R::Output: Responder,
{
pub fn new(handler: H) -> Self {
Self {
handler,
_phantom: PhantomData,
}
}
}
impl<H, T, R> Clone for HandlerService<H, T, R>
where
H: Handler<T, R>,
T: FromRequest,
R: Future + Sync + Send,
R::Output: Responder,
{
fn clone(&self) -> Self {
Self {
handler: self.handler.clone(),
_phantom: PhantomData,
}
}
}
impl<F, T, R> ServiceFactory<ServiceRequest> for HandlerService<F, T, R>
where
F: Handler<T, R>,
T: FromRequest,
R: Future + Send + Sync,
R::Output: Responder,
{
type Response = ServiceResponse;
type Error = DispatchError;
type Service = Self;
type Context = ();
type Future = Ready<Result<Self::Service, Self::Error>>;
fn new_service(&self, _: ()) -> Self::Future { ready(Ok(self.clone())) }
}
impl<H, T, R> Service<ServiceRequest> for HandlerService<H, T, R>
where
H: Handler<T, R>,
T: FromRequest,
R: Future + Sync + Send,
R::Output: Responder,
{
type Response = ServiceResponse;
type Error = DispatchError;
type Future = HandlerServiceFuture<H, T, R>;
fn call(&self, req: ServiceRequest) -> Self::Future {
let (req, mut payload) = req.into_parts();
let fut = T::from_request(&req, &mut payload);
HandlerServiceFuture::Extract(fut, Some(req), self.handler.clone())
}
}
#[pin_project(project = HandlerServiceProj)]
pub enum HandlerServiceFuture<H, T, R>
where
H: Handler<T, R>,
T: FromRequest,
R: Future + Sync + Send,
R::Output: Responder,
{
Extract(#[pin] T::Future, Option<EventRequest>, H),
Handle(#[pin] R, Option<EventRequest>),
}
impl<F, T, R> Future for HandlerServiceFuture<F, T, R>
where
F: Handler<T, R>,
T: FromRequest,
R: Future + Sync + Send,
R::Output: Responder,
{
type Output = Result<ServiceResponse, DispatchError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match self.as_mut().project() {
HandlerServiceProj::Extract(fut, req, handle) => {
match ready!(fut.poll(cx)) {
Ok(params) => {
let fut = handle.call(params);
let state = HandlerServiceFuture::Handle(fut, req.take());
self.as_mut().set(state);
},
Err(err) => {
let req = req.take().unwrap();
let system_err: DispatchError = err.into();
let res: EventResponse = system_err.into();
return Poll::Ready(Ok(ServiceResponse::new(req, res)));
},
};
},
HandlerServiceProj::Handle(fut, req) => {
let result = ready!(fut.poll(cx));
let req = req.take().unwrap();
let resp = result.respond_to(&req);
return Poll::Ready(Ok(ServiceResponse::new(req, resp)));
},
}
}
}
}
macro_rules! factory_tuple ({ $($param:ident)* } => {
impl<Func, $($param,)* Res> Handler<($($param,)*), Res> for Func
where Func: Fn($($param),*) -> Res + Clone + 'static + Sync + Send,
Res: Future + Sync + Send,
Res::Output: Responder,
{
#[allow(non_snake_case)]
fn call(&self, ($($param,)*): ($($param,)*)) -> Res {
(self)($($param,)*)
}
}
});
macro_rules! tuple_from_req ({$tuple_type:ident, $(($n:tt, $T:ident)),+} => {
#[allow(non_snake_case)]
mod $tuple_type {
use super::*;
#[pin_project::pin_project]
struct FromRequestFutures<$($T: FromRequest),+>($(#[pin] $T::Future),+);
/// FromRequest implementation for tuple
#[doc(hidden)]
#[allow(unused_parens)]
impl<$($T: FromRequest + 'static),+> FromRequest for ($($T,)+)
{
type Error = DispatchError;
type Future = $tuple_type<$($T),+>;
fn from_request(req: &EventRequest, payload: &mut Payload) -> Self::Future {
$tuple_type {
items: <($(Option<$T>,)+)>::default(),
futs: FromRequestFutures($($T::from_request(req, payload),)+),
}
}
}
#[doc(hidden)]
#[pin_project::pin_project]
pub struct $tuple_type<$($T: FromRequest),+> {
items: ($(Option<$T>,)+),
#[pin]
futs: FromRequestFutures<$($T,)+>,
}
impl<$($T: FromRequest),+> Future for $tuple_type<$($T),+>
{
type Output = Result<($($T,)+), DispatchError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
let mut ready = true;
$(
if this.items.$n.is_none() {
match this.futs.as_mut().project().$n.poll(cx) {
Poll::Ready(Ok(item)) => this.items.$n = Some(item),
Poll::Pending => ready = false,
Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())),
}
}
)+
if ready {
Poll::Ready(Ok(
($(this.items.$n.take().unwrap(),)+)
))
} else {
Poll::Pending
}
}
}
}
});
factory_tuple! {}
factory_tuple! { A }
factory_tuple! { A B }
factory_tuple! { A B C }
factory_tuple! { A B C D }
factory_tuple! { A B C D E }
#[rustfmt::skip]
mod m {
use super::*;
tuple_from_req!(TupleFromRequest1, (0, A));
tuple_from_req!(TupleFromRequest2, (0, A), (1, B));
tuple_from_req!(TupleFromRequest3, (0, A), (1, B), (2, C));
tuple_from_req!(TupleFromRequest4, (0, A), (1, B), (2, C), (3, D));
tuple_from_req!(TupleFromRequest5, (0, A), (1, B), (2, C), (3, D), (4, E));
}

View File

@ -0,0 +1,7 @@
mod boxed;
mod handler;
mod service;
pub use boxed::*;
pub use handler::*;
pub use service::*;

View File

@ -0,0 +1,47 @@
use std::future::Future;
use crate::{
request::{payload::Payload, EventRequest},
response::EventResponse,
};
pub trait Service<Request> {
type Response;
type Error;
type Future: Future<Output = Result<Self::Response, Self::Error>>;
fn call(&self, req: Request) -> Self::Future;
}
pub trait ServiceFactory<Request> {
type Response;
type Error;
type Service: Service<Request, Response = Self::Response, Error = Self::Error>;
type Context;
type Future: Future<Output = Result<Self::Service, Self::Error>>;
fn new_service(&self, cfg: Self::Context) -> Self::Future;
}
pub struct ServiceRequest {
req: EventRequest,
payload: Payload,
}
impl ServiceRequest {
pub fn new(req: EventRequest, payload: Payload) -> Self { Self { req, payload } }
#[inline]
pub fn into_parts(self) -> (EventRequest, Payload) { (self.req, self.payload) }
}
pub struct ServiceResponse {
request: EventRequest,
response: EventResponse,
}
impl ServiceResponse {
pub fn new(request: EventRequest, response: EventResponse) -> Self { ServiceResponse { request, response } }
pub fn into_parts(self) -> (EventRequest, EventResponse) { (self.request, self.response) }
}

View File

@ -0,0 +1,166 @@
use crate::module::{as_module_map, Module, ModuleMap};
use futures_core::{ready, task::Context};
use std::{cell::RefCell, fmt::Debug, future::Future, io, sync::Arc};
use tokio::{
macros::support::{Pin, Poll},
sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
oneshot,
},
};
thread_local!(
static CURRENT: RefCell<Option<Arc<FlowySystem>>> = RefCell::new(None);
);
#[derive(Debug)]
#[allow(dead_code)]
pub enum SystemCommand {
Exit(i8),
}
pub struct FlowySystem {
sys_cmd_tx: UnboundedSender<SystemCommand>,
}
impl FlowySystem {
#[allow(dead_code)]
pub fn construct<F, S>(module_factory: F, sender_factory: S) -> SystemRunner
where
F: FnOnce() -> Vec<Module>,
S: FnOnce(ModuleMap, &Runtime),
{
let runtime = Arc::new(Runtime::new().unwrap());
let (sys_cmd_tx, sys_cmd_rx) = unbounded_channel::<SystemCommand>();
let (stop_tx, stop_rx) = oneshot::channel();
runtime.spawn(SystemController {
stop_tx: Some(stop_tx),
sys_cmd_rx,
});
let module_map = as_module_map(module_factory());
sender_factory(module_map.clone(), &runtime);
let system = Self { sys_cmd_tx };
FlowySystem::set_current(system);
let runner = SystemRunner { rt: runtime, stop_rx };
runner
}
#[allow(dead_code)]
pub fn stop(&self) {
match self.sys_cmd_tx.send(SystemCommand::Exit(0)) {
Ok(_) => {},
Err(e) => {
log::error!("Stop system error: {}", e);
},
}
}
#[allow(dead_code)]
pub fn set_current(sys: FlowySystem) {
CURRENT.with(|cell| {
*cell.borrow_mut() = Some(Arc::new(sys));
})
}
#[allow(dead_code)]
pub fn current() -> Arc<FlowySystem> {
CURRENT.with(|cell| match *cell.borrow() {
Some(ref sys) => sys.clone(),
None => panic!("System is not running"),
})
}
}
struct SystemController {
stop_tx: Option<oneshot::Sender<i8>>,
sys_cmd_rx: UnboundedReceiver<SystemCommand>,
}
impl Future for SystemController {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match ready!(Pin::new(&mut self.sys_cmd_rx).poll_recv(cx)) {
None => return Poll::Ready(()),
Some(cmd) => match cmd {
SystemCommand::Exit(code) => {
if let Some(tx) = self.stop_tx.take() {
let _ = tx.send(code);
}
},
},
}
}
}
}
pub struct SystemRunner {
rt: Arc<Runtime>,
stop_rx: oneshot::Receiver<i8>,
}
impl SystemRunner {
#[allow(dead_code)]
pub fn run(self) -> io::Result<()> {
let SystemRunner { rt, stop_rx } = self;
match rt.block_on(stop_rx) {
Ok(code) => {
if code != 0 {
Err(io::Error::new(
io::ErrorKind::Other,
format!("Non-zero exit code: {}", code),
))
} else {
Ok(())
}
},
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
}
}
#[allow(dead_code)]
pub fn spawn<F: Future<Output = ()> + 'static>(self, future: F) -> Self {
self.rt.spawn(future);
self
}
}
use crate::util::tokio_default_runtime;
use tokio::{runtime, task::LocalSet};
#[derive(Debug)]
pub struct Runtime {
local: LocalSet,
rt: runtime::Runtime,
}
impl Runtime {
#[allow(dead_code)]
pub fn new() -> io::Result<Runtime> {
let rt = tokio_default_runtime()?;
Ok(Runtime {
rt,
local: LocalSet::new(),
})
}
#[allow(dead_code)]
pub fn spawn<F>(&self, future: F) -> &Self
where
F: Future<Output = ()> + 'static,
{
self.local.spawn_local(future);
self
}
#[allow(dead_code)]
pub fn block_on<F>(&self, f: F) -> F::Output
where
F: Future + 'static,
{
self.local.block_on(&self.rt, f)
}
}

View File

@ -0,0 +1,27 @@
use std::{io, thread};
use thread_id;
use tokio::runtime;
pub mod ready;
pub fn tokio_default_runtime() -> io::Result<tokio::runtime::Runtime> {
runtime::Builder::new_multi_thread()
.thread_name("flowy-rt")
.enable_io()
.enable_time()
.on_thread_start(move || {
tracing::trace!(
"{:?} thread started: thread_id= {}",
thread::current(),
thread_id::get()
);
})
.on_thread_stop(move || {
tracing::trace!(
"{:?} thread stopping: thread_id= {}",
thread::current(),
thread_id::get(),
);
})
.build()
}

View File

@ -0,0 +1,28 @@
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
pub struct Ready<T> {
val: Option<T>,
}
impl<T> Ready<T> {
#[inline]
pub fn into_inner(mut self) -> T { self.val.take().unwrap() }
}
impl<T> Unpin for Ready<T> {}
impl<T> Future for Ready<T> {
type Output = T;
#[inline]
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<T> {
let val = self.val.take().expect("Ready polled after completion");
Poll::Ready(val)
}
}
pub fn ready<T>(val: T) -> Ready<T> { Ready { val: Some(val) } }

View File

@ -0,0 +1 @@
mod module;

View File

@ -0,0 +1,21 @@
use lib_dispatch::prelude::*;
use std::sync::Arc;
pub async fn hello() -> String { "say hello".to_string() }
#[tokio::test]
async fn test() {
env_logger::init();
let event = "1";
let dispatch = Arc::new(EventDispatch::construct(|| vec![Module::new().event(event, hello)]));
let request = ModuleRequest::new(event);
let _ = EventDispatch::async_send_with_callback(dispatch.clone(), request, |resp| {
Box::pin(async move {
dbg!(&resp);
})
})
.await;
std::mem::forget(dispatch);
}