From 13e52ae3506672a220881ec5123554658af9047d Mon Sep 17 00:00:00 2001 From: appflowy Date: Fri, 9 Jul 2021 17:47:15 +0800 Subject: [PATCH] enable module data passed in --- rust-lib/flowy-dispatch/src/dispatch.rs | 4 +- .../flowy-dispatch/src/module/container.rs | 74 ++----------------- rust-lib/flowy-dispatch/src/module/data.rs | 16 +++- rust-lib/flowy-dispatch/src/module/module.rs | 45 ++++++----- .../flowy-dispatch/src/request/request.rs | 23 +++++- 5 files changed, 65 insertions(+), 97 deletions(-) diff --git a/rust-lib/flowy-dispatch/src/dispatch.rs b/rust-lib/flowy-dispatch/src/dispatch.rs index 40985ab726..bc9cd31059 100644 --- a/rust-lib/flowy-dispatch/src/dispatch.rs +++ b/rust-lib/flowy-dispatch/src/dispatch.rs @@ -49,7 +49,7 @@ impl EventDispatch { let dispatch = dispatch.as_ref().unwrap(); let module_map = dispatch.module_map.clone(); let service = Box::new(DispatchService { module_map }); - log::trace!("Async event: {:?}", &request.event()); + log::trace!("Async event: {:?}", &request.event); let service_ctx = DispatchContext { request, callback: Some(Box::new(callback)), @@ -142,7 +142,7 @@ impl Service for DispatchService { Box::pin(async move { let result = { - match module_map.get(&request.event()) { + match module_map.get(&request.event) { Some(module) => { let fut = module.new_service(()); let service_fut = fut.await?.call(request); diff --git a/rust-lib/flowy-dispatch/src/module/container.rs b/rust-lib/flowy-dispatch/src/module/container.rs index 118dea0455..156b98d82c 100644 --- a/rust-lib/flowy-dispatch/src/module/container.rs +++ b/rust-lib/flowy-dispatch/src/module/container.rs @@ -3,15 +3,15 @@ use std::{ collections::HashMap, }; -#[derive(Default)] -pub struct DataContainer { +#[derive(Default, Debug)] +pub struct ModuleDataMap { map: HashMap>, } -impl DataContainer { +impl ModuleDataMap { #[inline] - pub fn new() -> DataContainer { - DataContainer { + pub fn new() -> ModuleDataMap { + ModuleDataMap { map: HashMap::default(), } } @@ -57,71 +57,9 @@ impl DataContainer { self.map.contains_key(&TypeId::of::()) } - pub fn extend(&mut self, other: DataContainer) { self.map.extend(other.map); } + pub fn extend(&mut self, other: ModuleDataMap) { self.map.extend(other.map); } } fn downcast_owned(boxed: Box) -> Option { boxed.downcast().ok().map(|boxed| *boxed) } - -// use std::{ -// any::{Any, TypeId}, -// collections::HashMap, -// sync::RwLock, -// }; -// -// #[derive(Default)] -// pub struct DataContainer { -// map: RwLock>>, -// } -// -// impl DataContainer { -// #[inline] -// pub fn new() -> DataContainer { -// DataContainer { -// map: RwLock::new(HashMap::default()), -// } -// } -// -// pub fn insert(&mut self, val: T) -> Option { -// self.map -// .write() -// .unwrap() -// .insert(TypeId::of::(), Box::new(val)) -// .and_then(downcast_owned) -// } -// -// pub fn remove(&mut self) -> Option { -// self.map -// .write() -// .unwrap() -// .remove(&TypeId::of::()) -// .and_then(downcast_owned) -// } -// -// pub fn get(&self) -> Option<&T> { -// self.map -// .read() -// .unwrap() -// .get(&TypeId::of::()) -// .and_then(|boxed| boxed.downcast_ref()) -// } -// -// pub fn get_mut(&mut self) -> Option<&mut T> { -// self.map -// .write() -// .unwrap() -// .get_mut(&TypeId::of::()) -// .and_then(|boxed| boxed.downcast_mut()) -// } -// -// pub fn contains(&self) -> bool { -// self.map.read().unwrap().contains_key(&TypeId::of::()) -// } -// -// pub fn extend(&mut self, other: DataContainer) { -// self.map.write().unwrap().extend(other.map); } } -// -// fn downcast_owned(boxed: Box) -> Option { -// boxed.downcast().ok().map(|boxed| *boxed) -// } diff --git a/rust-lib/flowy-dispatch/src/module/data.rs b/rust-lib/flowy-dispatch/src/module/data.rs index d46e75aded..59a8ece442 100644 --- a/rust-lib/flowy-dispatch/src/module/data.rs +++ b/rust-lib/flowy-dispatch/src/module/data.rs @@ -1,9 +1,9 @@ use crate::{ - error::SystemError, + error::{InternalError, SystemError}, request::{payload::Payload, EventRequest, FromRequest}, - util::ready::Ready, + util::ready::{ready, Ready}, }; -use std::{ops::Deref, sync::Arc}; +use std::{any::type_name, ops::Deref, sync::Arc}; pub struct ModuleData(Arc); @@ -47,5 +47,13 @@ where type Future = Ready>; #[inline] - fn from_request(_req: &EventRequest, _: &mut Payload) -> Self::Future { unimplemented!() } + fn from_request(req: &EventRequest, _: &mut Payload) -> Self::Future { + if let Some(data) = req.module_data::>() { + ready(Ok(data.clone())) + } else { + let msg = format!("Failed to get the module data(type: {})", type_name::()); + log::error!("{}", msg,); + ready(Err(InternalError::new(msg).into())) + } + } } diff --git a/rust-lib/flowy-dispatch/src/module/module.rs b/rust-lib/flowy-dispatch/src/module/module.rs index 30152e2c0e..b4e74b3c89 100644 --- a/rust-lib/flowy-dispatch/src/module/module.rs +++ b/rust-lib/flowy-dispatch/src/module/module.rs @@ -13,7 +13,7 @@ use pin_project::pin_project; use crate::{ error::{InternalError, SystemError}, - module::{container::DataContainer, ModuleData}, + module::{container::ModuleDataMap, ModuleData}, request::{payload::Payload, EventRequest, FromRequest}, response::{EventResponse, Responder}, service::{ @@ -55,7 +55,7 @@ pub type EventServiceFactory = BoxServiceFactory<(), ServiceRequest, ServiceResp pub struct Module { pub name: String, - module_data: DataContainer, + module_data: Arc, service_map: Arc>, } @@ -63,7 +63,7 @@ impl Module { pub fn new() -> Self { Self { name: "".to_owned(), - module_data: DataContainer::new(), + module_data: Arc::new(ModuleDataMap::new()), service_map: Arc::new(HashMap::new()), } } @@ -74,7 +74,10 @@ impl Module { } pub fn data(mut self, data: D) -> Self { - self.module_data.insert(ModuleData::new(data)); + Arc::get_mut(&mut self.module_data) + .unwrap() + .insert(ModuleData::new(data)); + self } @@ -108,8 +111,9 @@ impl Module { #[derive(Debug)] pub struct ModuleRequest { - inner: EventRequest, - payload: Payload, + pub(crate) id: String, + pub(crate) event: Event, + pub(crate) payload: Payload, } impl ModuleRequest { @@ -118,7 +122,8 @@ impl ModuleRequest { E: Into, { Self { - inner: EventRequest::new(event, uuid::Uuid::new_v4().to_string()), + id: uuid::Uuid::new_v4().to_string(), + event: event.into(), payload: Payload::None, } } @@ -130,22 +135,14 @@ impl ModuleRequest { self.payload = payload.into(); self } - - pub fn id(&self) -> &str { &self.inner.id } - - pub fn event(&self) -> &Event { &self.inner.event } } impl std::fmt::Display for ModuleRequest { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}:{:?}", self.inner.id, self.inner.event) + write!(f, "{}:{:?}", self.id, self.event) } } -impl std::convert::Into for ModuleRequest { - fn into(self) -> ServiceRequest { ServiceRequest::new(self.inner, self.payload) } -} - impl ServiceFactory for Module { type Response = EventResponse; type Error = SystemError; @@ -155,8 +152,12 @@ impl ServiceFactory for Module { fn new_service(&self, _cfg: Self::Context) -> Self::Future { let service_map = self.service_map.clone(); + let module_data = self.module_data.clone(); Box::pin(async move { - let service = ModuleService { service_map }; + let service = ModuleService { + service_map, + module_data, + }; let module_service = Box::new(service) as Self::Service; Ok(module_service) }) @@ -165,6 +166,7 @@ impl ServiceFactory for Module { pub struct ModuleService { service_map: Arc>, + module_data: Arc, } impl Service for ModuleService { @@ -173,13 +175,18 @@ impl Service for ModuleService { type Future = BoxFuture<'static, Result>; fn call(&self, request: ModuleRequest) -> Self::Future { - match self.service_map.get(&request.event()) { + let ModuleRequest { id, event, payload } = request; + let module_data = self.module_data.clone(); + let request = EventRequest::new(id.clone(), event, module_data); + + match self.service_map.get(&request.event) { Some(factory) => { let service_fut = factory.new_service(()); let fut = ModuleServiceFuture { fut: Box::pin(async { let service = service_fut.await?; - service.call(request.into()).await + let service_req = ServiceRequest::new(request, payload); + service.call(service_req).await }), }; Box::pin(async move { Ok(fut.await.unwrap_or_else(|e| e.into())) }) diff --git a/rust-lib/flowy-dispatch/src/request/request.rs b/rust-lib/flowy-dispatch/src/request/request.rs index 6f1d82ac89..805f12492c 100644 --- a/rust-lib/flowy-dispatch/src/request/request.rs +++ b/rust-lib/flowy-dispatch/src/request/request.rs @@ -2,34 +2,49 @@ use std::future::Future; use crate::{ error::{InternalError, SystemError}, - module::Event, + module::{Event, ModuleDataMap}, request::payload::Payload, util::ready::{ready, Ready}, }; - +use derivative::*; use futures_core::ready; use std::{ fmt::Debug, pin::Pin, + sync::Arc, task::{Context, Poll}, }; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Derivative)] pub struct EventRequest { pub(crate) id: String, pub(crate) event: Event, + #[derivative(Debug = "ignore")] + pub(crate) module_data: Arc, } impl EventRequest { - pub fn new(event: E, id: String) -> EventRequest + pub fn new(id: String, event: E, module_data: Arc) -> EventRequest where E: Into, { Self { id, event: event.into(), + module_data, } } + + pub fn module_data(&self) -> Option<&T> + where + T: Send + Sync, + { + if let Some(data) = self.module_data.get::() { + return Some(data); + } + + None + } } pub trait FromRequest: Sized {