From 6d2d24baf749a1f91f5639d3aee0eed9efa89ce9 Mon Sep 17 00:00:00 2001 From: appflowy Date: Sun, 27 Jun 2021 22:07:33 +0800 Subject: [PATCH] module as service --- rust-lib/flowy-sys/Cargo.toml | 11 +- rust-lib/flowy-sys/src/dart_ffi/ffi.rs | 34 ++++++ rust-lib/flowy-sys/src/dart_ffi/mod.rs | 2 + rust-lib/flowy-sys/src/lib.rs | 3 + rust-lib/flowy-sys/src/module/module.rs | 107 +++++++++++++------ rust-lib/flowy-sys/src/request/request.rs | 1 + rust-lib/flowy-sys/src/rt/runtime.rs | 23 +++- rust-lib/flowy-sys/src/rt/system.rs | 55 +++++----- rust-lib/flowy-sys/src/service/boxed.rs | 22 ++-- rust-lib/flowy-sys/tests/api/module_event.rs | 15 +-- 10 files changed, 195 insertions(+), 78 deletions(-) create mode 100644 rust-lib/flowy-sys/src/dart_ffi/ffi.rs create mode 100644 rust-lib/flowy-sys/src/dart_ffi/mod.rs diff --git a/rust-lib/flowy-sys/Cargo.toml b/rust-lib/flowy-sys/Cargo.toml index 8a49cbc079..c79e07dd88 100644 --- a/rust-lib/flowy-sys/Cargo.toml +++ b/rust-lib/flowy-sys/Cargo.toml @@ -20,7 +20,16 @@ env_logger = "0.8" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_with = "1.9.4" +thread-id = "3.3.0" + +#optional crate +allo-isolate = {version = "^0.1", features = ["catch-unwind",], optional = true} +byteorder = {version = "1.3.4", optional = true} +ffi-support = {version = "0.4.2", optional = true} [dev-dependencies] tokio = { version = "1", features = ["full"] } -futures-util = "0.3.15" \ No newline at end of file +futures-util = "0.3.15" + +[features] +dart_ffi = ["ffi-support", "allo-isolate", "byteorder"] \ No newline at end of file diff --git a/rust-lib/flowy-sys/src/dart_ffi/ffi.rs b/rust-lib/flowy-sys/src/dart_ffi/ffi.rs new file mode 100644 index 0000000000..bf1b49f0bb --- /dev/null +++ b/rust-lib/flowy-sys/src/dart_ffi/ffi.rs @@ -0,0 +1,34 @@ +use crate::{response::EventResponse, rt::SystemCommand}; +use futures_core::ready; +use std::{future::Future, task::Context}; +use tokio::{ + macros::support::{Pin, Poll}, + sync::{mpsc::UnboundedReceiver, oneshot}, +}; + +#[no_mangle] +pub extern "C" fn async_command(port: i64, input: *const u8, len: usize) {} + +#[no_mangle] +pub extern "C" fn free_rust(ptr: *mut u8, length: u32) { reclaim_rust(ptr, length) } + +#[no_mangle] +pub extern "C" fn init_stream(port: i64) -> i32 { return 0; } + +struct SystemFFI { + resp_rx: UnboundedReceiver, +} + +impl Future for SystemFFI { + type Output = (); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + loop { + match ready!(Pin::new(&mut self.resp_rx).poll_recv(cx)) { + None => return Poll::Ready(()), + Some(resp) => { + log::trace!("Response: {:?}", resp); + }, + } + } + } +} diff --git a/rust-lib/flowy-sys/src/dart_ffi/mod.rs b/rust-lib/flowy-sys/src/dart_ffi/mod.rs new file mode 100644 index 0000000000..9fd02ec7c1 --- /dev/null +++ b/rust-lib/flowy-sys/src/dart_ffi/mod.rs @@ -0,0 +1,2 @@ +#[cfg(feature = "dart_ffi")] +mod ffi; diff --git a/rust-lib/flowy-sys/src/lib.rs b/rust-lib/flowy-sys/src/lib.rs index d2c041e770..9f6852bbbe 100644 --- a/rust-lib/flowy-sys/src/lib.rs +++ b/rust-lib/flowy-sys/src/lib.rs @@ -9,6 +9,9 @@ mod rt; mod service; mod util; +#[cfg(feature = "dart_ffi")] +mod dart_ffi; + pub mod prelude { pub use crate::{error::*, module::*, request::*, response::*, rt::*}; } diff --git a/rust-lib/flowy-sys/src/module/module.rs b/rust-lib/flowy-sys/src/module/module.rs index 9921fb1370..fc4d5baf2a 100644 --- a/rust-lib/flowy-sys/src/module/module.rs +++ b/rust-lib/flowy-sys/src/module/module.rs @@ -8,8 +8,10 @@ use crate::{ }; use crate::{ + error::InternalError, request::{payload::Payload, EventRequest}, - response::EventResponse, + response::{EventResponse, EventResponseBuilder}, + rt::SystemCommand, service::{factory, BoxServiceFactory, HandlerService}, }; use futures_core::{future::LocalBoxFuture, ready}; @@ -18,6 +20,7 @@ use std::{ collections::HashMap, future::Future, pin::Pin, + rc::Rc, task::{Context, Poll}, }; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; @@ -28,22 +31,22 @@ pub type EventServiceFactory = BoxServiceFactory<(), ServiceRequest, ServiceResp pub struct Module { name: String, data: DataContainer, - service_map: HashMap, + service_map: Rc>, req_tx: UnboundedSender, req_rx: UnboundedReceiver, - resp_tx: UnboundedSender, + sys_tx: UnboundedSender, } impl Module { - pub fn new(resp_tx: UnboundedSender) -> Self { + pub fn new(sys_tx: UnboundedSender) -> Self { let (req_tx, req_rx) = unbounded_channel::(); Self { name: "".to_owned(), data: DataContainer::new(), - service_map: HashMap::new(), + service_map: Rc::new(HashMap::new()), req_tx, req_rx, - resp_tx, + sys_tx, } } @@ -68,14 +71,16 @@ impl Module { log::error!("Duplicate Event: {}", &event); } - self.service_map.insert(event, factory(HandlerService::new(handler))); + Rc::get_mut(&mut self.service_map) + .unwrap() + .insert(event, factory(HandlerService::new(handler))); self } pub fn req_tx(&self) -> UnboundedSender { self.req_tx.clone() } pub fn handle(&self, request: EventRequest) { - log::trace!("Module: {} receive request: {:?}", self.name, request); + log::debug!("Module: {} receive request: {:?}", self.name, request); match self.req_tx.send(request) { Ok(_) => {}, Err(e) => { @@ -98,30 +103,72 @@ impl Future for Module { loop { match ready!(Pin::new(&mut self.req_rx).poll_recv(cx)) { None => return Poll::Ready(()), - Some(request) => match self.service_map.get(request.get_event()) { - Some(factory) => { - let fut = ModuleServiceFuture { - request, - fut: factory.new_service(()), - }; - let resp_tx = self.resp_tx.clone(); + Some(request) => { + let mut service = self.new_service(request.get_id().to_string()); + if let Ok(service) = ready!(Pin::new(&mut service).poll(cx)) { + log::trace!("Spawn module service for request {}", request.get_id()); tokio::task::spawn_local(async move { - let resp = fut.await.unwrap_or_else(|_e| panic!()); - if let Err(e) = resp_tx.send(resp) { - log::error!("{:?}", e); - } + let _ = service.call(request).await; }); - }, - None => { - log::error!("Event: {} handler not found", request.get_event()); - }, + } }, } } } } +impl ServiceFactory for Module { + type Response = (); + type Error = SystemError; + type Service = BoxService; + type Config = String; + type Future = LocalBoxFuture<'static, Result>; + + fn new_service(&self, cfg: Self::Config) -> Self::Future { + log::trace!("Create module service for request {}", cfg); + let sys_tx = self.sys_tx.clone(); + let service_map = self.service_map.clone(); + Box::pin(async move { + let service = ModuleService { service_map, sys_tx }; + let module_service = Box::new(service) as Self::Service; + Ok(module_service) + }) + } +} + +pub struct ModuleService { + service_map: Rc>, + sys_tx: UnboundedSender, +} + +impl Service for ModuleService { + type Response = (); + type Error = SystemError; + type Future = LocalBoxFuture<'static, Result>; + + fn call(&self, request: EventRequest) -> Self::Future { + log::trace!("Call module service for request {}", request.get_id()); + match self.service_map.get(request.get_event()) { + Some(factory) => { + let fut = ModuleServiceFuture { + request, + fut: factory.new_service(()), + }; + + let sys_tx = self.sys_tx.clone(); + Box::pin(async move { + let resp = fut.await.unwrap_or_else(|e| e.into()); + sys_tx.send(SystemCommand::EventResponse(resp)); + Ok(()) + }) + }, + None => Box::pin(async { Err(InternalError::new("".to_string()).into()) }), + } + } +} + type BoxModuleService = BoxService; + #[pin_project] pub struct ModuleServiceFuture { request: EventRequest, @@ -136,7 +183,7 @@ impl Future for ModuleServiceFuture { loop { let service = ready!(self.as_mut().project().fut.poll(cx))?; let req = ServiceRequest::new(self.as_mut().request.clone(), Payload::None); - log::trace!("Call service to handle request {:?}", self.request); + log::debug!("Call service to handle request {:?}", self.request); let (_, resp) = ready!(Pin::new(&mut service.call(req)).poll(cx))?.into_parts(); return Poll::Ready(Ok(resp)); } @@ -149,24 +196,22 @@ mod tests { use crate::rt::Runtime; use futures_util::{future, pin_mut}; use tokio::sync::mpsc::unbounded_channel; - pub async fn hello_service() -> String { "hello".to_string() } - #[test] fn test() { let mut runtime = Runtime::new().unwrap(); runtime.block_on(async { - let (resp_tx, mut resp_rx) = unbounded_channel::(); + let (sys_tx, mut sys_rx) = unbounded_channel::(); let event = "hello".to_string(); - let mut module = Module::new(resp_tx).event(event.clone(), hello_service); + let mut module = Module::new(sys_tx).event(event.clone(), hello_service); let req_tx = module.req_tx(); let mut event = async move { let request = EventRequest::new(event.clone()); req_tx.send(request).unwrap(); - match resp_rx.recv().await { - Some(resp) => { - log::info!("{}", resp); + match sys_rx.recv().await { + Some(cmd) => { + log::info!("{:?}", cmd); }, None => panic!(""), } diff --git a/rust-lib/flowy-sys/src/request/request.rs b/rust-lib/flowy-sys/src/request/request.rs index bf0b6bf710..62d6646ff9 100644 --- a/rust-lib/flowy-sys/src/request/request.rs +++ b/rust-lib/flowy-sys/src/request/request.rs @@ -23,6 +23,7 @@ impl EventRequest { impl EventRequest { pub fn get_event(&self) -> &str { &self.event } + pub fn get_id(&self) -> &str { &self.id } } pub trait FromRequest: Sized { diff --git a/rust-lib/flowy-sys/src/rt/runtime.rs b/rust-lib/flowy-sys/src/rt/runtime.rs index c6066540d3..2c24213bb9 100644 --- a/rust-lib/flowy-sys/src/rt/runtime.rs +++ b/rust-lib/flowy-sys/src/rt/runtime.rs @@ -1,4 +1,5 @@ -use std::{future::Future, io}; +use std::{future::Future, io, thread}; +use thread_id; use tokio::{runtime, task::LocalSet}; #[derive(Debug)] @@ -9,7 +10,25 @@ pub struct Runtime { impl Runtime { pub fn new() -> io::Result { - let rt = runtime::Builder::new_multi_thread().enable_io().enable_time().build()?; + let rt = runtime::Builder::new_multi_thread() + .thread_name("flowy-sys") + .enable_io() + .enable_time() + .on_thread_start(move || { + log::trace!( + "{:?} thread started: thread_id= {}", + thread::current(), + thread_id::get() + ); + }) + .on_thread_stop(move || { + log::trace!( + "{:?} thread stopping: thread_id= {}", + thread::current(), + thread_id::get(), + ); + }) + .build()?; Ok(Runtime { rt, diff --git a/rust-lib/flowy-sys/src/rt/system.rs b/rust-lib/flowy-sys/src/rt/system.rs index 64b23a812b..8402c8be06 100644 --- a/rust-lib/flowy-sys/src/rt/system.rs +++ b/rust-lib/flowy-sys/src/rt/system.rs @@ -20,8 +20,10 @@ thread_local!( static CURRENT: RefCell>> = RefCell::new(None); ); -enum SystemCommand { +#[derive(Debug)] +pub enum SystemCommand { Exit(i8), + EventResponse(EventResponse), } pub struct FlowySystem { @@ -30,28 +32,26 @@ pub struct FlowySystem { } impl FlowySystem { - pub fn construct(module_factory: F) -> SystemRunner + pub fn construct(module_factory: F, response_tx: Option>) -> SystemRunner where - F: FnOnce(UnboundedSender) -> Vec, + F: FnOnce(UnboundedSender) -> Vec, { let runtime = Runtime::new().unwrap(); - let (resp_tx, resp_rx) = unbounded_channel::(); - let (sys_tx, sys_rx) = unbounded_channel::(); let (stop_tx, stop_rx) = oneshot::channel(); - runtime.spawn(SystemFFI { resp_rx }); runtime.spawn(SystemController { stop_tx: Some(stop_tx), sys_rx, + response_tx, }); let mut system = Self { - sys_tx, + sys_tx: sys_tx.clone(), forward_map: HashMap::default(), }; - let factory = module_factory(resp_tx.clone()); + let factory = module_factory(sys_tx.clone()); factory.into_iter().for_each(|m| { system.forward_map.extend(m.forward_map()); runtime.spawn(m); @@ -63,11 +63,18 @@ impl FlowySystem { } pub fn sink(&self, event: Event, request: EventRequest) -> Result<(), SystemError> { - log::trace!("Sink event: {}", event); + log::debug!("Sink event: {}", event); let _ = self.forward_map.get(&event)?.send(request)?; Ok(()) } + pub fn request_tx(&self, event: Event) -> Option> { + match self.forward_map.get(&event) { + Some(tx) => Some(tx.clone()), + None => None, + } + } + pub fn stop(&self) { match self.sys_tx.send(SystemCommand::Exit(0)) { Ok(_) => {}, @@ -92,27 +99,10 @@ impl FlowySystem { } } -struct SystemFFI { - resp_rx: UnboundedReceiver, -} - -impl Future for SystemFFI { - type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - loop { - match ready!(Pin::new(&mut self.resp_rx).poll_recv(cx)) { - None => return Poll::Ready(()), - Some(resp) => { - log::trace!("Response: {:?}", resp); - }, - } - } - } -} - struct SystemController { stop_tx: Option>, sys_rx: UnboundedReceiver, + response_tx: Option>, } impl Future for SystemController { @@ -127,6 +117,17 @@ impl Future for SystemController { let _ = tx.send(code); } }, + SystemCommand::EventResponse(resp) => { + log::debug!("Response: {:?}", resp); + if let Some(tx) = &self.response_tx { + match tx.send(resp) { + Ok(_) => {}, + Err(e) => { + log::error!("Response tx send fail: {:?}", e); + }, + } + } + }, }, } } diff --git a/rust-lib/flowy-sys/src/service/boxed.rs b/rust-lib/flowy-sys/src/service/boxed.rs index 834d40a91d..d1e6c4f7d4 100644 --- a/rust-lib/flowy-sys/src/service/boxed.rs +++ b/rust-lib/flowy-sys/src/service/boxed.rs @@ -13,6 +13,17 @@ where BoxServiceFactory(Box::new(FactoryWrapper(factory))) } +type Inner = Box< + dyn ServiceFactory< + Req, + Config = Cfg, + Response = Res, + Error = Err, + Service = BoxService, + Future = LocalBoxFuture<'static, Result, Err>>, + >, +>; + pub struct BoxServiceFactory(Inner); impl ServiceFactory for BoxServiceFactory where @@ -29,17 +40,6 @@ where fn new_service(&self, cfg: Cfg) -> Self::Future { self.0.new_service(cfg) } } -type Inner = Box< - dyn ServiceFactory< - Req, - Config = Cfg, - Response = Res, - Error = Err, - Service = BoxService, - Future = LocalBoxFuture<'static, Result, Err>>, - >, ->; - pub type BoxService = Box>>>; diff --git a/rust-lib/flowy-sys/tests/api/module_event.rs b/rust-lib/flowy-sys/tests/api/module_event.rs index 2ea0619474..a49b3b83db 100644 --- a/rust-lib/flowy-sys/tests/api/module_event.rs +++ b/rust-lib/flowy-sys/tests/api/module_event.rs @@ -12,12 +12,15 @@ fn test() { let no_params_command = "no params".to_string(); let one_params_command = "one params".to_string(); let two_params_command = "two params".to_string(); - FlowySystem::construct(|tx| { - vec![Module::new(tx.clone()) - .event(no_params_command.clone(), no_params) - .event(one_params_command.clone(), one_params) - .event(two_params_command.clone(), two_params)] - }) + FlowySystem::construct( + |tx| { + vec![Module::new(tx.clone()) + .event(no_params_command.clone(), no_params) + .event(one_params_command.clone(), one_params) + .event(two_params_command.clone(), two_params)] + }, + None, + ) .spawn(async { let request = EventRequest::new(no_params_command.clone()); FlowySystem::current().sink(no_params_command, request);