2021-07-02 12:45:51 +00:00
|
|
|
use crate::{
|
2021-07-10 08:27:20 +00:00
|
|
|
errors::{DispatchError, Error, InternalError},
|
2021-07-08 05:47:11 +00:00
|
|
|
module::{as_module_map, Module, ModuleMap, ModuleRequest},
|
2021-07-02 12:45:51 +00:00
|
|
|
response::EventResponse,
|
|
|
|
service::{Service, ServiceFactory},
|
|
|
|
util::tokio_default_runtime,
|
|
|
|
};
|
|
|
|
use derivative::*;
|
|
|
|
use futures_core::future::BoxFuture;
|
2021-07-03 06:14:10 +00:00
|
|
|
use futures_util::task::Context;
|
2021-07-02 12:45:51 +00:00
|
|
|
use lazy_static::lazy_static;
|
2021-07-03 06:14:10 +00:00
|
|
|
use pin_project::pin_project;
|
2021-07-08 05:47:11 +00:00
|
|
|
use std::{future::Future, sync::RwLock};
|
2021-07-03 14:24:02 +00:00
|
|
|
use tokio::macros::support::{Pin, Poll};
|
2021-07-02 12:45:51 +00:00
|
|
|
|
|
|
|
lazy_static! {
    /// Process-wide dispatcher slot.
    ///
    /// Starts out as `None`; `EventDispatch::construct` stores the built
    /// dispatcher here and `EventDispatch::async_send` reads it back.
    pub static ref EVENT_DISPATCH: RwLock<Option<EventDispatch>> = RwLock::new(None);
}
|
|
|
|
|
|
|
|
pub struct EventDispatch {
|
|
|
|
module_map: ModuleMap,
|
|
|
|
runtime: tokio::runtime::Runtime,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl EventDispatch {
|
|
|
|
pub fn construct<F>(module_factory: F)
|
|
|
|
where
|
|
|
|
F: FnOnce() -> Vec<Module>,
|
|
|
|
{
|
|
|
|
let modules = module_factory();
|
2021-07-08 13:23:44 +00:00
|
|
|
log::trace!("{}", module_info(&modules));
|
2021-07-02 12:45:51 +00:00
|
|
|
let module_map = as_module_map(modules);
|
|
|
|
let runtime = tokio_default_runtime().unwrap();
|
|
|
|
let dispatch = EventDispatch {
|
|
|
|
module_map,
|
|
|
|
runtime,
|
|
|
|
};
|
|
|
|
*(EVENT_DISPATCH.write().unwrap()) = Some(dispatch);
|
|
|
|
}
|
|
|
|
|
2021-07-07 14:24:26 +00:00
|
|
|
pub fn async_send<Req, Callback>(request: Req, callback: Callback) -> DispatchFuture
|
|
|
|
where
|
2021-07-08 05:47:11 +00:00
|
|
|
Req: std::convert::Into<ModuleRequest>,
|
2021-07-07 14:24:26 +00:00
|
|
|
Callback: FnOnce(EventResponse) -> BoxFuture<'static, ()> + 'static + Send + Sync,
|
|
|
|
{
|
2021-07-08 05:47:11 +00:00
|
|
|
let request: ModuleRequest = request.into();
|
2021-07-02 12:45:51 +00:00
|
|
|
match EVENT_DISPATCH.read() {
|
|
|
|
Ok(dispatch) => {
|
|
|
|
let dispatch = dispatch.as_ref().unwrap();
|
|
|
|
let module_map = dispatch.module_map.clone();
|
|
|
|
let service = Box::new(DispatchService { module_map });
|
2021-07-09 09:47:15 +00:00
|
|
|
log::trace!("Async event: {:?}", &request.event);
|
2021-07-08 05:47:11 +00:00
|
|
|
let service_ctx = DispatchContext {
|
|
|
|
request,
|
|
|
|
callback: Some(Box::new(callback)),
|
|
|
|
};
|
2021-07-03 06:14:10 +00:00
|
|
|
let join_handle = dispatch.runtime.spawn(async move {
|
|
|
|
service
|
2021-07-08 05:47:11 +00:00
|
|
|
.call(service_ctx)
|
2021-07-03 06:14:10 +00:00
|
|
|
.await
|
|
|
|
.unwrap_or_else(|e| InternalError::new(format!("{:?}", e)).as_response())
|
|
|
|
});
|
|
|
|
|
|
|
|
DispatchFuture {
|
|
|
|
fut: Box::pin(async move {
|
|
|
|
join_handle.await.unwrap_or_else(|e| {
|
2021-07-09 06:02:42 +00:00
|
|
|
InternalError::new(format!("EVENT_DISPATCH join error: {:?}", e))
|
2021-07-03 06:14:10 +00:00
|
|
|
.as_response()
|
|
|
|
})
|
|
|
|
}),
|
|
|
|
}
|
2021-07-02 12:45:51 +00:00
|
|
|
},
|
|
|
|
|
|
|
|
Err(e) => {
|
2021-07-08 13:23:44 +00:00
|
|
|
let msg = format!("EVENT_DISPATCH read failed. {:?}", e);
|
|
|
|
log::error!("{}", msg);
|
2021-07-03 06:14:10 +00:00
|
|
|
DispatchFuture {
|
|
|
|
fut: Box::pin(async { InternalError::new(msg).as_response() }),
|
|
|
|
}
|
2021-07-02 12:45:51 +00:00
|
|
|
},
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-07-08 05:47:11 +00:00
|
|
|
pub fn sync_send(request: ModuleRequest) -> EventResponse {
|
2021-07-07 14:24:26 +00:00
|
|
|
futures::executor::block_on(async {
|
2021-07-08 13:23:44 +00:00
|
|
|
EventDispatch::async_send(request, |_| Box::pin(async {})).await
|
2021-07-07 14:24:26 +00:00
|
|
|
})
|
2021-07-02 12:45:51 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-07-03 06:14:10 +00:00
|
|
|
#[pin_project]
|
|
|
|
pub struct DispatchFuture {
|
|
|
|
#[pin]
|
|
|
|
fut: BoxFuture<'static, EventResponse>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Future for DispatchFuture {
|
|
|
|
type Output = EventResponse;
|
|
|
|
|
|
|
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
|
|
let this = self.as_mut().project();
|
|
|
|
loop {
|
|
|
|
return Poll::Ready(futures_core::ready!(this.fut.poll(cx)));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub type BoxFutureCallback =
|
|
|
|
Box<dyn FnOnce(EventResponse) -> BoxFuture<'static, ()> + 'static + Send + Sync>;
|
2021-07-02 12:45:51 +00:00
|
|
|
|
|
|
|
#[derive(Derivative)]
|
|
|
|
#[derivative(Debug)]
|
2021-07-08 05:47:11 +00:00
|
|
|
pub struct DispatchContext {
|
|
|
|
pub request: ModuleRequest,
|
2021-07-02 12:45:51 +00:00
|
|
|
#[derivative(Debug = "ignore")]
|
2021-07-03 06:14:10 +00:00
|
|
|
pub callback: Option<BoxFutureCallback>,
|
2021-07-02 12:45:51 +00:00
|
|
|
}
|
|
|
|
|
2021-07-08 05:47:11 +00:00
|
|
|
impl DispatchContext {
|
2021-07-03 06:14:10 +00:00
|
|
|
pub(crate) fn into_parts(self) -> (ModuleRequest, Option<BoxFutureCallback>) {
|
2021-07-08 05:47:11 +00:00
|
|
|
let DispatchContext { request, callback } = self;
|
|
|
|
(request, callback)
|
2021-07-03 06:14:10 +00:00
|
|
|
}
|
2021-07-02 12:45:51 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
pub(crate) struct DispatchService {
|
|
|
|
pub(crate) module_map: ModuleMap,
|
|
|
|
}
|
|
|
|
|
2021-07-08 05:47:11 +00:00
|
|
|
impl Service<DispatchContext> for DispatchService {
|
2021-07-02 12:45:51 +00:00
|
|
|
type Response = EventResponse;
|
2021-07-10 08:27:20 +00:00
|
|
|
type Error = DispatchError;
|
2021-07-02 12:45:51 +00:00
|
|
|
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
|
|
|
|
|
2021-07-03 13:40:13 +00:00
|
|
|
#[cfg_attr(
|
|
|
|
feature = "use_tracing",
|
2021-07-08 05:47:11 +00:00
|
|
|
tracing::instrument(name = "DispatchService", level = "debug", skip(self, ctx))
|
2021-07-03 13:40:13 +00:00
|
|
|
)]
|
2021-07-08 05:47:11 +00:00
|
|
|
fn call(&self, ctx: DispatchContext) -> Self::Future {
|
2021-07-02 12:45:51 +00:00
|
|
|
let module_map = self.module_map.clone();
|
2021-07-08 05:47:11 +00:00
|
|
|
let (request, callback) = ctx.into_parts();
|
|
|
|
|
2021-07-02 12:45:51 +00:00
|
|
|
Box::pin(async move {
|
|
|
|
let result = {
|
2021-07-09 09:47:15 +00:00
|
|
|
match module_map.get(&request.event) {
|
2021-07-02 12:45:51 +00:00
|
|
|
Some(module) => {
|
|
|
|
let fut = module.new_service(());
|
|
|
|
let service_fut = fut.await?.call(request);
|
|
|
|
service_fut.await
|
|
|
|
},
|
|
|
|
None => {
|
2021-07-08 13:23:44 +00:00
|
|
|
let msg = format!("Can not find the event handler. {:?}", request);
|
2021-07-03 06:14:10 +00:00
|
|
|
log::trace!("{}", msg);
|
2021-07-02 12:45:51 +00:00
|
|
|
Err(InternalError::new(msg).into())
|
|
|
|
},
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
let response = result.unwrap_or_else(|e| e.into());
|
2021-07-03 06:14:10 +00:00
|
|
|
log::trace!("Dispatch result: {:?}", response);
|
2021-07-02 12:45:51 +00:00
|
|
|
if let Some(callback) = callback {
|
2021-07-03 06:14:10 +00:00
|
|
|
callback(response.clone()).await;
|
2021-07-02 12:45:51 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
Ok(response)
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
2021-07-03 06:14:10 +00:00
|
|
|
|
|
|
|
fn module_info(modules: &Vec<Module>) -> String {
|
|
|
|
let mut info = format!("{} modules loaded\n", modules.len());
|
|
|
|
for module in modules {
|
|
|
|
info.push_str(&format!("-> {} loaded \n", module.name));
|
|
|
|
}
|
|
|
|
info
|
|
|
|
}
|