add stream & prepare dart ffi

This commit is contained in:
appflowy 2021-06-28 14:27:16 +08:00
parent 6d2d24baf7
commit 97dde2d711
7 changed files with 243 additions and 120 deletions

View File

@ -11,7 +11,9 @@ mod util;
#[cfg(feature = "dart_ffi")] #[cfg(feature = "dart_ffi")]
mod dart_ffi; mod dart_ffi;
mod stream;
mod system;
pub mod prelude { pub mod prelude {
pub use crate::{error::*, module::*, request::*, response::*, rt::*}; pub use crate::{error::*, module::*, request::*, response::*, rt::*, stream::*};
} }

View File

@ -10,7 +10,7 @@ use crate::{
use crate::{ use crate::{
error::InternalError, error::InternalError,
request::{payload::Payload, EventRequest}, request::{payload::Payload, EventRequest},
response::{EventResponse, EventResponseBuilder}, response::EventResponse,
rt::SystemCommand, rt::SystemCommand,
service::{factory, BoxServiceFactory, HandlerService}, service::{factory, BoxServiceFactory, HandlerService},
}; };
@ -34,11 +34,10 @@ pub struct Module {
service_map: Rc<HashMap<Event, EventServiceFactory>>, service_map: Rc<HashMap<Event, EventServiceFactory>>,
req_tx: UnboundedSender<EventRequest>, req_tx: UnboundedSender<EventRequest>,
req_rx: UnboundedReceiver<EventRequest>, req_rx: UnboundedReceiver<EventRequest>,
sys_tx: UnboundedSender<SystemCommand>,
} }
impl Module { impl Module {
pub fn new(sys_tx: UnboundedSender<SystemCommand>) -> Self { pub fn new() -> Self {
let (req_tx, req_rx) = unbounded_channel::<EventRequest>(); let (req_tx, req_rx) = unbounded_channel::<EventRequest>();
Self { Self {
name: "".to_owned(), name: "".to_owned(),
@ -46,7 +45,6 @@ impl Module {
service_map: Rc::new(HashMap::new()), service_map: Rc::new(HashMap::new()),
req_tx, req_tx,
req_rx, req_rx,
sys_tx,
} }
} }
@ -95,6 +93,8 @@ impl Module {
.map(|key| (key.clone(), self.req_tx())) .map(|key| (key.clone(), self.req_tx()))
.collect::<HashMap<_, _>>() .collect::<HashMap<_, _>>()
} }
pub fn events(&self) -> Vec<Event> { self.service_map.keys().map(|key| key.clone()).collect::<Vec<_>>() }
} }
impl Future for Module { impl Future for Module {
@ -118,7 +118,7 @@ impl Future for Module {
} }
impl ServiceFactory<EventRequest> for Module { impl ServiceFactory<EventRequest> for Module {
type Response = (); type Response = EventResponse;
type Error = SystemError; type Error = SystemError;
type Service = BoxService<EventRequest, Self::Response, Self::Error>; type Service = BoxService<EventRequest, Self::Response, Self::Error>;
type Config = String; type Config = String;
@ -126,10 +126,9 @@ impl ServiceFactory<EventRequest> for Module {
fn new_service(&self, cfg: Self::Config) -> Self::Future { fn new_service(&self, cfg: Self::Config) -> Self::Future {
log::trace!("Create module service for request {}", cfg); log::trace!("Create module service for request {}", cfg);
let sys_tx = self.sys_tx.clone();
let service_map = self.service_map.clone(); let service_map = self.service_map.clone();
Box::pin(async move { Box::pin(async move {
let service = ModuleService { service_map, sys_tx }; let service = ModuleService { service_map };
let module_service = Box::new(service) as Self::Service; let module_service = Box::new(service) as Self::Service;
Ok(module_service) Ok(module_service)
}) })
@ -138,11 +137,10 @@ impl ServiceFactory<EventRequest> for Module {
pub struct ModuleService { pub struct ModuleService {
service_map: Rc<HashMap<Event, EventServiceFactory>>, service_map: Rc<HashMap<Event, EventServiceFactory>>,
sys_tx: UnboundedSender<SystemCommand>,
} }
impl Service<EventRequest> for ModuleService { impl Service<EventRequest> for ModuleService {
type Response = (); type Response = EventResponse;
type Error = SystemError; type Error = SystemError;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>; type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
@ -154,13 +152,7 @@ impl Service<EventRequest> for ModuleService {
request, request,
fut: factory.new_service(()), fut: factory.new_service(()),
}; };
Box::pin(async move { Ok(fut.await.unwrap_or_else(|e| e.into())) })
let sys_tx = self.sys_tx.clone();
Box::pin(async move {
let resp = fut.await.unwrap_or_else(|e| e.into());
sys_tx.send(SystemCommand::EventResponse(resp));
Ok(())
})
}, },
None => Box::pin(async { Err(InternalError::new("".to_string()).into()) }), None => Box::pin(async { Err(InternalError::new("".to_string()).into()) }),
} }
@ -190,35 +182,35 @@ impl Future for ModuleServiceFuture {
} }
} }
#[cfg(test)] // #[cfg(test)]
mod tests { // mod tests {
use super::*; // use super::*;
use crate::rt::Runtime; // use crate::rt::Runtime;
use futures_util::{future, pin_mut}; // use futures_util::{future, pin_mut};
use tokio::sync::mpsc::unbounded_channel; // use tokio::sync::mpsc::unbounded_channel;
pub async fn hello_service() -> String { "hello".to_string() } // pub async fn hello_service() -> String { "hello".to_string() }
#[test] // #[test]
fn test() { // fn test() {
let mut runtime = Runtime::new().unwrap(); // let runtime = Runtime::new().unwrap();
runtime.block_on(async { // runtime.block_on(async {
let (sys_tx, mut sys_rx) = unbounded_channel::<SystemCommand>(); // let (sys_tx, mut sys_rx) = unbounded_channel::<SystemCommand>();
let event = "hello".to_string(); // let event = "hello".to_string();
let mut module = Module::new(sys_tx).event(event.clone(), hello_service); // let module = Module::new(sys_tx).event(event.clone(),
let req_tx = module.req_tx(); // hello_service); let req_tx = module.req_tx();
let mut event = async move { // let event = async move {
let request = EventRequest::new(event.clone()); // let request = EventRequest::new(event.clone());
req_tx.send(request).unwrap(); // req_tx.send(request).unwrap();
//
match sys_rx.recv().await { // match sys_rx.recv().await {
Some(cmd) => { // Some(cmd) => {
log::info!("{:?}", cmd); // log::info!("{:?}", cmd);
}, // },
None => panic!(""), // None => panic!(""),
} // }
}; // };
//
pin_mut!(module, event); // pin_mut!(module, event);
future::select(module, event).await; // future::select(module, event).await;
}); // });
} // }
} // }

View File

@ -10,6 +10,7 @@ use crate::{
pub struct EventRequest { pub struct EventRequest {
id: String, id: String,
event: String, event: String,
data: Option<Vec<u8>>,
} }
impl EventRequest { impl EventRequest {
@ -17,12 +18,17 @@ impl EventRequest {
Self { Self {
id: uuid::Uuid::new_v4().to_string(), id: uuid::Uuid::new_v4().to_string(),
event, event,
data: None,
} }
} }
}
impl EventRequest { pub fn data(mut self, data: Vec<u8>) -> Self {
self.data = Some(data);
self
}
pub fn get_event(&self) -> &str { &self.event } pub fn get_event(&self) -> &str { &self.event }
pub fn get_id(&self) -> &str { &self.id } pub fn get_id(&self) -> &str { &self.id }
} }

View File

@ -1,5 +1,5 @@
mod runtime;
mod system;
pub use runtime::*; pub use runtime::*;
pub use system::*;
pub use crate::system::*;
mod runtime;

View File

@ -0,0 +1,131 @@
use crate::{
error::{InternalError, SystemError},
module::{Event, Module},
request::EventRequest,
response::EventResponse,
service::{BoxService, Service, ServiceFactory},
system::ModuleMap,
};
use futures_core::{future::LocalBoxFuture, ready, task::Context};
use std::{collections::HashMap, future::Future, rc::Rc};
use tokio::{
macros::support::{Pin, Poll},
sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
};
pub type BoxStreamCallback<T> = Box<dyn FnOnce(T, EventResponse) + 'static>;
pub struct StreamData<T>
where
T: 'static,
{
config: T,
request: Option<EventRequest>,
callback: BoxStreamCallback<T>,
}
impl<T> StreamData<T> {
pub fn new(config: T, request: Option<EventRequest>, callback: BoxStreamCallback<T>) -> Self {
Self {
config,
request,
callback,
}
}
}
pub struct CommandStream<T>
where
T: 'static,
{
module_map: ModuleMap,
pub data_tx: UnboundedSender<StreamData<T>>,
data_rx: UnboundedReceiver<StreamData<T>>,
}
impl<T> CommandStream<T> {
pub fn new(module_map: ModuleMap) -> Self {
let (data_tx, data_rx) = unbounded_channel::<StreamData<T>>();
Self {
module_map,
data_tx,
data_rx,
}
}
pub fn send(&self, data: StreamData<T>) { let _ = self.data_tx.send(data); }
}
impl<T> Future for CommandStream<T>
where
T: 'static,
{
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match ready!(Pin::new(&mut self.data_rx).poll_recv(cx)) {
None => return Poll::Ready(()),
Some(ctx) => {
let factory = self.new_service(());
tokio::task::spawn_local(async move {
let service = factory.await.unwrap();
let _ = service.call(ctx).await;
});
},
}
}
}
}
impl<T> ServiceFactory<StreamData<T>> for CommandStream<T>
where
T: 'static,
{
type Response = ();
type Error = SystemError;
type Service = BoxService<StreamData<T>, Self::Response, Self::Error>;
type Config = ();
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::Error>>;
fn new_service(&self, _cfg: Self::Config) -> Self::Future {
let module_map = self.module_map.clone();
let service = Box::new(CommandStreamService { module_map });
Box::pin(async move { Ok(service as Self::Service) })
}
}
pub struct CommandStreamService {
module_map: ModuleMap,
}
impl<T> Service<StreamData<T>> for CommandStreamService
where
T: 'static,
{
type Response = ();
type Error = SystemError;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
fn call(&self, mut data: StreamData<T>) -> Self::Future {
let module_map = self.module_map.clone();
let fut = async move {
let request = data.request.take().unwrap();
let result = || async {
match module_map.get(request.get_event()) {
Some(module) => {
let config = request.get_id().to_owned();
let fut = module.new_service(config);
let service_fut = fut.await?.call(request);
service_fut.await
},
None => Err(InternalError::new("".to_owned()).into()),
}
};
let resp = result().await.unwrap();
(data.callback)(data.config, resp);
Ok(())
};
Box::pin(fut)
}
}

View File

@ -1,13 +1,13 @@
use crate::{ use crate::{
error::SystemError,
module::{Event, Module}, module::{Event, Module},
request::EventRequest, request::EventRequest,
response::EventResponse, response::EventResponse,
rt::runtime::Runtime, rt::Runtime,
stream::CommandStreamService,
}; };
use futures_core::{ready, task::Context}; use futures_core::{ready, task::Context};
use std::{cell::RefCell, collections::HashMap, future::Future, io, rc::Rc, sync::Arc};
use crate::error::{InternalError, SystemError};
use std::{cell::RefCell, collections::HashMap, future::Future, io, sync::Arc};
use tokio::{ use tokio::{
macros::support::{Pin, Poll}, macros::support::{Pin, Poll},
sync::{ sync::{
@ -23,60 +23,52 @@ thread_local!(
#[derive(Debug)] #[derive(Debug)]
pub enum SystemCommand { pub enum SystemCommand {
Exit(i8), Exit(i8),
EventResponse(EventResponse), Response(EventResponse),
} }
pub type ModuleMap = Rc<HashMap<Event, Rc<Module>>>;
pub struct FlowySystem { pub struct FlowySystem {
sys_tx: UnboundedSender<SystemCommand>, sys_cmd_tx: UnboundedSender<SystemCommand>,
forward_map: HashMap<Event, UnboundedSender<EventRequest>>, module_map: ModuleMap,
} }
impl FlowySystem { impl FlowySystem {
pub fn construct<F>(module_factory: F, response_tx: Option<UnboundedSender<EventResponse>>) -> SystemRunner pub fn construct<F>(module_factory: F) -> SystemRunner
where where
F: FnOnce(UnboundedSender<SystemCommand>) -> Vec<Module>, F: FnOnce() -> Vec<Module>,
{ {
let runtime = Runtime::new().unwrap(); let runtime = Runtime::new().unwrap();
let (sys_tx, sys_rx) = unbounded_channel::<SystemCommand>(); let (sys_cmd_tx, sys_cmd_rx) = unbounded_channel::<SystemCommand>();
let (stop_tx, stop_rx) = oneshot::channel(); let (stop_tx, stop_rx) = oneshot::channel();
runtime.spawn(SystemController { runtime.spawn(SystemController {
stop_tx: Some(stop_tx), stop_tx: Some(stop_tx),
sys_rx, sys_cmd_rx,
response_tx,
}); });
let mut system = Self { let mut system = Self {
sys_tx: sys_tx.clone(), sys_cmd_tx: sys_cmd_tx.clone(),
forward_map: HashMap::default(), module_map: Rc::new(HashMap::default()),
}; };
let factory = module_factory(sys_tx.clone()); let factory = module_factory();
let mut module_map = HashMap::new();
factory.into_iter().for_each(|m| { factory.into_iter().for_each(|m| {
system.forward_map.extend(m.forward_map()); let events = m.events();
runtime.spawn(m); let rc_module = Rc::new(m);
events.into_iter().for_each(|e| {
module_map.insert(e, rc_module.clone());
}); });
});
system.module_map = Rc::new(module_map);
FlowySystem::set_current(system); FlowySystem::set_current(system);
let runner = SystemRunner { rt: runtime, stop_rx }; let runner = SystemRunner { rt: runtime, stop_rx };
runner runner
} }
pub fn sink(&self, event: Event, request: EventRequest) -> Result<(), SystemError> {
log::debug!("Sink event: {}", event);
let _ = self.forward_map.get(&event)?.send(request)?;
Ok(())
}
pub fn request_tx(&self, event: Event) -> Option<UnboundedSender<EventRequest>> {
match self.forward_map.get(&event) {
Some(tx) => Some(tx.clone()),
None => None,
}
}
pub fn stop(&self) { pub fn stop(&self) {
match self.sys_tx.send(SystemCommand::Exit(0)) { match self.sys_cmd_tx.send(SystemCommand::Exit(0)) {
Ok(_) => {}, Ok(_) => {},
Err(e) => { Err(e) => {
log::error!("Stop system error: {}", e); log::error!("Stop system error: {}", e);
@ -84,6 +76,8 @@ impl FlowySystem {
} }
} }
pub fn module_map(&self) -> ModuleMap { self.module_map.clone() }
#[doc(hidden)] #[doc(hidden)]
pub fn set_current(sys: FlowySystem) { pub fn set_current(sys: FlowySystem) {
CURRENT.with(|cell| { CURRENT.with(|cell| {
@ -101,15 +95,14 @@ impl FlowySystem {
struct SystemController { struct SystemController {
stop_tx: Option<oneshot::Sender<i8>>, stop_tx: Option<oneshot::Sender<i8>>,
sys_rx: UnboundedReceiver<SystemCommand>, sys_cmd_rx: UnboundedReceiver<SystemCommand>,
response_tx: Option<UnboundedSender<EventResponse>>,
} }
impl Future for SystemController { impl Future for SystemController {
type Output = (); type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop { loop {
match ready!(Pin::new(&mut self.sys_rx).poll_recv(cx)) { match ready!(Pin::new(&mut self.sys_cmd_rx).poll_recv(cx)) {
None => return Poll::Ready(()), None => return Poll::Ready(()),
Some(cmd) => match cmd { Some(cmd) => match cmd {
SystemCommand::Exit(code) => { SystemCommand::Exit(code) => {
@ -117,16 +110,8 @@ impl Future for SystemController {
let _ = tx.send(code); let _ = tx.send(code);
} }
}, },
SystemCommand::EventResponse(resp) => { SystemCommand::Response(resp) => {
log::debug!("Response: {:?}", resp); log::debug!("Response: {:?}", resp);
if let Some(tx) = &self.response_tx {
match tx.send(resp) {
Ok(_) => {},
Err(e) => {
log::error!("Response tx send fail: {:?}", e);
},
}
}
}, },
}, },
} }
@ -157,10 +142,7 @@ impl SystemRunner {
} }
} }
pub fn spawn<F>(self, future: F) -> Self pub fn spawn<F: Future<Output = ()> + 'static>(self, future: F) -> Self {
where
F: Future<Output = ()> + 'static,
{
self.rt.spawn(future); self.rt.spawn(future);
self self
} }

View File

@ -2,8 +2,8 @@ use crate::helper::*;
use flowy_sys::prelude::*; use flowy_sys::prelude::*;
pub async fn no_params() -> String { "no params function call".to_string() } pub async fn no_params() -> String { "no params function call".to_string() }
pub async fn one_params(s: String) -> String { "one params function call".to_string() } pub async fn one_params(_s: String) -> String { "one params function call".to_string() }
pub async fn two_params(s1: String, s2: String) -> String { "two params function call".to_string() } pub async fn two_params(_s1: String, _s2: String) -> String { "two params function call".to_string() }
#[test] #[test]
fn test() { fn test() {
@ -12,19 +12,29 @@ fn test() {
let no_params_command = "no params".to_string(); let no_params_command = "no params".to_string();
let one_params_command = "one params".to_string(); let one_params_command = "one params".to_string();
let two_params_command = "two params".to_string(); let two_params_command = "two params".to_string();
FlowySystem::construct(
|tx| { let runner = FlowySystem::construct(|| {
vec![Module::new(tx.clone()) vec![Module::new()
.event(no_params_command.clone(), no_params) .event(no_params_command.clone(), no_params)
.event(one_params_command.clone(), one_params) .event(one_params_command.clone(), one_params)
.event(two_params_command.clone(), two_params)] .event(two_params_command.clone(), two_params)]
}, });
None,
)
.spawn(async {
let request = EventRequest::new(no_params_command.clone());
FlowySystem::current().sink(no_params_command, request);
let stream = CommandStream::new(FlowySystem::current().module_map());
let tx = stream.data_tx.clone();
runner
.spawn(stream)
.spawn(async move {
let request = EventRequest::new(no_params_command.clone());
let stream_data = StreamData::new(
1,
Some(request),
Box::new(|config, response| {
log::info!("{:?}", response);
}),
);
tx.send(stream_data);
FlowySystem::current().stop(); FlowySystem::current().stop();
}) })
.run() .run()