From 85e8e4b8afcee2ce09e21e0bcc9453b24d9bc087 Mon Sep 17 00:00:00 2001 From: appflowy Date: Sat, 26 Jun 2021 23:52:03 +0800 Subject: [PATCH] add system --- rust-lib/flowy-sys/Cargo.toml | 9 +- rust-lib/flowy-sys/src/lib.rs | 1 + rust-lib/flowy-sys/src/module/module.rs | 137 ++++++++++++----- rust-lib/flowy-sys/src/request/request.rs | 13 +- rust-lib/flowy-sys/src/response/data.rs | 13 ++ rust-lib/flowy-sys/src/response/response.rs | 18 ++- rust-lib/flowy-sys/src/rt/mod.rs | 4 + rust-lib/flowy-sys/src/rt/runtime.rs | 34 +++++ rust-lib/flowy-sys/src/rt/system.rs | 157 ++++++++++++++++++++ rust-lib/flowy-sys/src/service/handler.rs | 4 +- rust-lib/flowy-sys/src/service/service.rs | 2 + 11 files changed, 351 insertions(+), 41 deletions(-) create mode 100644 rust-lib/flowy-sys/src/rt/mod.rs create mode 100644 rust-lib/flowy-sys/src/rt/runtime.rs create mode 100644 rust-lib/flowy-sys/src/rt/system.rs diff --git a/rust-lib/flowy-sys/Cargo.toml b/rust-lib/flowy-sys/Cargo.toml index 23728a4d63..8a49cbc079 100644 --- a/rust-lib/flowy-sys/Cargo.toml +++ b/rust-lib/flowy-sys/Cargo.toml @@ -14,6 +14,13 @@ futures = "0.3.15" futures-util = "0.3.15" bytes = "0.5" tokio = { version = "1", features = ["sync"] } +uuid = { version = "0.8", features = ["serde", "v4"] } +log = "0.4.14" +env_logger = "0.8" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +serde_with = "1.9.4" [dev-dependencies] -tokio = { version = "1", features = ["full"] } \ No newline at end of file +tokio = { version = "1", features = ["full"] } +futures-util = "0.3.15" \ No newline at end of file diff --git a/rust-lib/flowy-sys/src/lib.rs b/rust-lib/flowy-sys/src/lib.rs index 931712c2c2..176c8c18a8 100644 --- a/rust-lib/flowy-sys/src/lib.rs +++ b/rust-lib/flowy-sys/src/lib.rs @@ -3,5 +3,6 @@ mod error; mod module; mod request; mod response; +mod rt; mod service; mod util; diff --git a/rust-lib/flowy-sys/src/module/module.rs b/rust-lib/flowy-sys/src/module/module.rs index 86ec74dbf8..78069e3a80 100644 --- a/rust-lib/flowy-sys/src/module/module.rs +++ b/rust-lib/flowy-sys/src/module/module.rs @@ -7,43 +7,52 @@ use crate::{ service::{BoxService, Handler, Service, ServiceFactory, ServiceRequest, ServiceResponse}, }; +use crate::{ + request::{payload::Payload, FlowyRequest}, + response::{FlowyResponse, FlowyResponseBuilder}, + service::{factory, BoxServiceFactory, HandlerService}, +}; use futures_core::{future::LocalBoxFuture, ready}; +use pin_project::pin_project; use std::{ + cell::RefCell, collections::HashMap, + fmt::Debug, future::Future, hash::Hash, marker::PhantomData, pin::Pin, rc::Rc, + sync::Arc, task::{Context, Poll}, }; -use tokio::sync::{mpsc, mpsc::UnboundedReceiver}; - -use crate::{ - request::{payload::Payload, FlowyRequest}, - service::{factory, BoxServiceFactory, HandlerService}, +use tokio::sync::{ + mpsc, + mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, }; -use pin_project::pin_project; -use std::fmt::Debug; pub type Command = String; -pub type ModuleServiceFactory = BoxServiceFactory<(), ServiceRequest, ServiceResponse, SystemError>; +pub type CommandServiceFactory = BoxServiceFactory<(), ServiceRequest, ServiceResponse, SystemError>; -#[pin_project::pin_project] pub struct Module { name: String, data: DataContainer, - fact_map: HashMap, - cmd_rx: UnboundedReceiver, + factory_map: HashMap, + req_tx: UnboundedSender, + req_rx: UnboundedReceiver, + resp_tx: UnboundedSender, } impl Module { - pub fn new(cmd_rx: UnboundedReceiver) -> Self { + pub fn new(resp_tx: UnboundedSender) -> Self { + let (req_tx, req_rx) = unbounded_channel::(); Self { name: "".to_owned(), data: DataContainer::new(), - fact_map: HashMap::new(), - cmd_rx, + factory_map: HashMap::new(), + req_tx, + req_rx, + resp_tx, } } @@ -65,51 +74,111 @@ impl Module { R: Future + 'static, R::Output: Responder + 'static, { - self.fact_map.insert(command, factory(HandlerService::new(handler))); + self.factory_map.insert(command, factory(HandlerService::new(handler))); self } + + pub fn can_handle(&self, cmd: &Command) -> bool { self.factory_map.contains_key(cmd) } + + pub fn req_tx(&self) -> UnboundedSender { self.req_tx.clone() } + + pub fn handle(&self, request: FlowyRequest) { + match self.req_tx.send(request) { + Ok(_) => {}, + Err(e) => { + log::error!("{:?}", e); + }, + } + } } impl Future for Module { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - match ready!(Pin::new(&mut self.cmd_rx).poll_recv(cx)) { + match ready!(Pin::new(&mut self.req_rx).poll_recv(cx)) { None => return Poll::Ready(()), - Some(request) => match self.fact_map.get(request.get_id()) { + Some(request) => match self.factory_map.get(request.get_cmd()) { Some(factory) => { - let service_future = factory.new_service(()); - tokio::task::spawn_local(ModuleServiceFuture { + let fut = ModuleServiceFuture { request, - service_future, + fut: factory.new_service(()), + }; + let resp_tx = self.resp_tx.clone(); + tokio::task::spawn_local(async move { + let resp = fut.await.unwrap_or_else(|e| panic!()); + if let Err(e) = resp_tx.send(resp) { + log::error!("{:?}", e); + } }); }, - None => {}, + None => { + log::error!("Command: {} handler not found", request.get_cmd()); + }, }, } } } } -#[pin_project(project = HandlerServiceProj)] -pub struct ModuleServiceFuture { +type BoxModuleService = BoxService; +#[pin_project] +pub struct ModuleServiceFuture { request: FlowyRequest, #[pin] - service_future: LocalBoxFuture<'static, Result>, + fut: LocalBoxFuture<'static, Result>, } -impl Future for ModuleServiceFuture { - type Output = (); +impl Future for ModuleServiceFuture { + type Output = Result; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { unimplemented!() } + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + loop { + let service = ready!(self.as_mut().project().fut.poll(cx))?; + let req = ServiceRequest::new(self.as_mut().request.clone(), Payload::None); + let (_, resp) = ready!(Pin::new(&mut service.call(req)).poll(cx))?.into_parts(); + return Poll::Ready(Ok(resp)); + } + } } -impl ServiceFactory for Module { - type Response = ServiceResponse; - type Error = SystemError; - type Service = BoxService; - type Config = (); - type Future = LocalBoxFuture<'static, Result>; +#[cfg(test)] +mod tests { + use super::*; + use crate::rt::Runtime; + use futures_util::{future, pin_mut}; + use tokio::sync::mpsc::unbounded_channel; - fn new_service(&self, cfg: Self::Config) -> Self::Future { unimplemented!() } + pub async fn hello_service() -> String { + println!("no params"); + "hello".to_string() + } + + // #[tokio::test] + + #[test] + fn test() { + let mut runtime = Runtime::new().unwrap(); + runtime.block_on(async { + let (resp_tx, mut resp_rx) = unbounded_channel::(); + let command = "hello".to_string(); + let mut module = Module::new(resp_tx).event(command.clone(), hello_service); + assert_eq!(module.can_handle(&command), true); + let req_tx = module.req_tx(); + let mut event = async move { + let request = FlowyRequest::new(command.clone()); + req_tx.send(request).unwrap(); + + match resp_rx.recv().await { + Some(resp) => { + println!("{}", resp); + }, + None => panic!(""), + } + }; + + pin_mut!(module, event); + future::select(module, event).await; + }); + } } diff --git a/rust-lib/flowy-sys/src/request/request.rs b/rust-lib/flowy-sys/src/request/request.rs index a0a839708c..033a962147 100644 --- a/rust-lib/flowy-sys/src/request/request.rs +++ b/rust-lib/flowy-sys/src/request/request.rs @@ -7,16 +7,23 @@ use crate::{ }; use std::hash::Hash; +#[derive(Clone, Debug)] pub struct FlowyRequest { id: String, + cmd: String, } impl FlowyRequest { - pub fn get_id(&self) -> &str { &self.id } + pub fn new(cmd: String) -> FlowyRequest { + Self { + id: uuid::Uuid::new_v4().to_string(), + cmd, + } + } } -impl std::default::Default for FlowyRequest { - fn default() -> Self { Self { id: "".to_string() } } +impl FlowyRequest { + pub fn get_cmd(&self) -> &str { &self.cmd } } pub trait FromRequest: Sized { diff --git a/rust-lib/flowy-sys/src/response/data.rs b/rust-lib/flowy-sys/src/response/data.rs index 87b2b12762..03746285fa 100644 --- a/rust-lib/flowy-sys/src/response/data.rs +++ b/rust-lib/flowy-sys/src/response/data.rs @@ -1,8 +1,21 @@ +use serde::{Deserialize, Serialize}; +use std::{fmt, fmt::Formatter}; + +#[derive(Debug, Serialize, Deserialize)] pub enum ResponseData { Bytes(Vec), None, } +impl std::fmt::Display for ResponseData { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + ResponseData::Bytes(bytes) => f.write_fmt(format_args!("{} bytes", bytes.len())), + ResponseData::None => f.write_str("Empty"), + } + } +} + impl std::convert::Into for String { fn into(self) -> ResponseData { ResponseData::Bytes(self.into_bytes()) } } diff --git a/rust-lib/flowy-sys/src/response/response.rs b/rust-lib/flowy-sys/src/response/response.rs index 14bc6e1e66..abd606c8e2 100644 --- a/rust-lib/flowy-sys/src/response/response.rs +++ b/rust-lib/flowy-sys/src/response/response.rs @@ -3,16 +3,22 @@ use crate::{ request::FlowyRequest, response::{data::ResponseData, Responder}, }; +use serde::{Deserialize, Serialize}; +use serde_with::skip_serializing_none; +use std::{fmt, fmt::Formatter}; -#[derive(Clone, Copy)] +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] pub enum StatusCode { Success, Error, } +#[skip_serializing_none] +#[derive(Serialize, Deserialize, Debug)] pub struct FlowyResponse { pub data: T, pub status: StatusCode, + #[serde(skip)] pub error: Option, } @@ -26,6 +32,16 @@ impl FlowyResponse { } } +impl std::fmt::Display for FlowyResponse { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match serde_json::to_string(self) { + Ok(json) => f.write_fmt(format_args!("{:?}", json))?, + Err(e) => f.write_fmt(format_args!("{:?}", e))?, + } + Ok(()) + } +} + impl Responder for FlowyResponse { #[inline] fn respond_to(self, _: &FlowyRequest) -> FlowyResponse { self } diff --git a/rust-lib/flowy-sys/src/rt/mod.rs b/rust-lib/flowy-sys/src/rt/mod.rs new file mode 100644 index 0000000000..d16a48555d --- /dev/null +++ b/rust-lib/flowy-sys/src/rt/mod.rs @@ -0,0 +1,4 @@ +mod runtime; +mod system; + +pub use runtime::*; diff --git a/rust-lib/flowy-sys/src/rt/runtime.rs b/rust-lib/flowy-sys/src/rt/runtime.rs new file mode 100644 index 0000000000..c6066540d3 --- /dev/null +++ b/rust-lib/flowy-sys/src/rt/runtime.rs @@ -0,0 +1,34 @@ +use std::{future::Future, io}; +use tokio::{runtime, task::LocalSet}; + +#[derive(Debug)] +pub struct Runtime { + local: LocalSet, + rt: runtime::Runtime, +} + +impl Runtime { + pub fn new() -> io::Result { + let rt = runtime::Builder::new_multi_thread().enable_io().enable_time().build()?; + + Ok(Runtime { + rt, + local: LocalSet::new(), + }) + } + + pub fn spawn(&self, future: F) -> &Self + where + F: Future + 'static, + { + self.local.spawn_local(future); + self + } + + pub fn block_on(&self, f: F) -> F::Output + where + F: Future + 'static, + { + self.local.block_on(&self.rt, f) + } +} diff --git a/rust-lib/flowy-sys/src/rt/system.rs b/rust-lib/flowy-sys/src/rt/system.rs new file mode 100644 index 0000000000..8cbe170b05 --- /dev/null +++ b/rust-lib/flowy-sys/src/rt/system.rs @@ -0,0 +1,157 @@ +use crate::{ + module::{Command, Module}, + request::FlowyRequest, + response::FlowyResponse, + rt::Runtime, +}; +use futures_core::{future::LocalBoxFuture, ready, task::Context}; +use futures_util::{future, pin_mut}; +use std::{cell::RefCell, future::Future, io, sync::Arc}; +use tokio::{ + macros::support::{Pin, Poll}, + sync::{ + mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, + oneshot, + }, +}; + +thread_local!( + static CURRENT: RefCell>> = RefCell::new(None); +); + +pub struct FlowySystem { + resp_tx: UnboundedSender, + modules: Vec, +} + +impl FlowySystem { + pub fn construct(module_factory: F) -> SystemRunner + where + F: FnOnce(UnboundedSender) -> Vec, + { + let runtime = Runtime::new().unwrap(); + let (resp_tx, mut resp_rx) = unbounded_channel::(); + let (stop_tx, stop_rx) = oneshot::channel(); + let controller = SystemController { resp_rx, stop_tx }; + runtime.spawn(controller); + + let mut system = Self { + resp_tx: resp_tx.clone(), + modules: vec![], + }; + + let factory = module_factory(resp_tx.clone()); + factory.into_iter().for_each(|m| { + runtime.spawn(m); + // system.add_module(m); + }); + + FlowySystem::set_current(system); + + let runner = SystemRunner { rt: runtime, stop_rx }; + runner + } + + pub fn handle_command(&self, cmd: Command, request: FlowyRequest) { + self.modules.iter().for_each(|m| { + if m.can_handle(&cmd) { + m.handle(request.clone()); + } + }) + } + + pub fn add_module(&mut self, module: Module) { self.modules.push(module); } + + #[doc(hidden)] + pub fn set_current(sys: FlowySystem) { + CURRENT.with(|cell| { + *cell.borrow_mut() = Some(Arc::new(sys)); + }) + } + + pub fn current() -> Arc { + CURRENT.with(|cell| match *cell.borrow() { + Some(ref sys) => sys.clone(), + None => panic!("System is not running"), + }) + } + + pub(crate) fn resp_tx(&self) -> UnboundedSender { self.resp_tx.clone() } +} + +struct SystemController { + resp_rx: UnboundedReceiver, + stop_tx: oneshot::Sender, +} + +impl Future for SystemController { + type Output = (); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + loop { + match ready!(Pin::new(&mut self.resp_rx).poll_recv(cx)) { + None => return Poll::Ready(()), + Some(resp) => { + // FFI + println!("Receive response: {:?}", resp); + }, + } + } + } +} + +pub struct SystemRunner { + rt: Runtime, + stop_rx: oneshot::Receiver, +} + +impl SystemRunner { + pub fn run(self) -> io::Result<()> { + let SystemRunner { rt, stop_rx } = self; + match rt.block_on(stop_rx) { + Ok(code) => { + if code != 0 { + Err(io::Error::new( + io::ErrorKind::Other, + format!("Non-zero exit code: {}", code), + )) + } else { + Ok(()) + } + }, + Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), + } + } + + pub fn spawn(self, future: F) -> Self + where + F: Future + 'static, + { + self.rt.spawn(future); + self + } +} + +#[cfg(test)] +mod tests { + use super::*; + + pub async fn hello_service() -> String { "hello".to_string() } + + #[test] + fn test() { + let command = "Hello".to_string(); + + FlowySystem::construct(|tx| { + vec![ + Module::new(tx.clone()).event(command.clone(), hello_service), + // Module::new(tx.clone()).event(command.clone(), hello_service), + ] + }) + .spawn(async { + let request = FlowyRequest::new(command.clone()); + FlowySystem::current().handle_command(command, request); + }) + .run() + .unwrap(); + } +} diff --git a/rust-lib/flowy-sys/src/service/handler.rs b/rust-lib/flowy-sys/src/service/handler.rs index a695402677..64ed3eb4d5 100644 --- a/rust-lib/flowy-sys/src/service/handler.rs +++ b/rust-lib/flowy-sys/src/service/handler.rs @@ -125,8 +125,8 @@ where match self.as_mut().project() { HandlerServiceProj::Extract(fut, req, handle) => { match ready!(fut.poll(cx)) { - Ok(item) => { - let fut = handle.call(item); + Ok(params) => { + let fut = handle.call(params); let state = HandlerServiceFuture::Handle(fut, req.take()); self.as_mut().set(state); }, diff --git a/rust-lib/flowy-sys/src/service/service.rs b/rust-lib/flowy-sys/src/service/service.rs index f66e5ac50b..421828deb0 100644 --- a/rust-lib/flowy-sys/src/service/service.rs +++ b/rust-lib/flowy-sys/src/service/service.rs @@ -42,4 +42,6 @@ pub struct ServiceResponse { impl ServiceResponse { pub fn new(request: FlowyRequest, response: FlowyResponse) -> Self { ServiceResponse { request, response } } + + pub fn into_parts(self) -> (FlowyRequest, FlowyResponse) { (self.request, self.response) } }