diff --git a/rust-lib/flowy-sys/Cargo.toml b/rust-lib/flowy-sys/Cargo.toml index c79e07dd88..6343f6c6c4 100644 --- a/rust-lib/flowy-sys/Cargo.toml +++ b/rust-lib/flowy-sys/Cargo.toml @@ -26,10 +26,12 @@ thread-id = "3.3.0" allo-isolate = {version = "^0.1", features = ["catch-unwind",], optional = true} byteorder = {version = "1.3.4", optional = true} ffi-support = {version = "0.4.2", optional = true} +protobuf = {version = "2.20.0", optional = true} +lazy_static = {version = "1.4.0", optional = true} [dev-dependencies] tokio = { version = "1", features = ["full"] } futures-util = "0.3.15" [features] -dart_ffi = ["ffi-support", "allo-isolate", "byteorder"] \ No newline at end of file +dart_ffi = ["ffi-support", "allo-isolate", "byteorder", "protobuf", "lazy_static"] \ No newline at end of file diff --git a/rust-lib/flowy-sys/src/dart_ffi/ffi.rs b/rust-lib/flowy-sys/src/dart_ffi/ffi.rs index bf1b49f0bb..ae1425833d 100644 --- a/rust-lib/flowy-sys/src/dart_ffi/ffi.rs +++ b/rust-lib/flowy-sys/src/dart_ffi/ffi.rs @@ -1,13 +1,33 @@ -use crate::{response::EventResponse, rt::SystemCommand}; +use crate::{ + module::Module, + request::EventRequest, + response::EventResponse, + rt::SystemCommand, + stream::*, + system::FlowySystem, +}; use futures_core::ready; -use std::{future::Future, task::Context}; +use lazy_static::lazy_static; +use protobuf::Message; +use std::{ + cell::RefCell, + future::Future, + sync::{Arc, RwLock}, + task::Context, +}; use tokio::{ macros::support::{Pin, Poll}, - sync::{mpsc::UnboundedReceiver, oneshot}, + sync::{mpsc::UnboundedSender, oneshot}, }; #[no_mangle] -pub extern "C" fn async_command(port: i64, input: *const u8, len: usize) {} +pub extern "C" fn async_command(port: i64, input: *const u8, len: usize) { + let bytes = unsafe { std::slice::from_raw_parts(input, len) }.to_vec(); + let request = EventRequest::from_data(bytes); + + let stream_data = StreamData::new(port, Some(request), Box::new(|port, response| {})); + send(stream_data); +} #[no_mangle] pub extern "C" fn free_rust(ptr: *mut u8, length: u32) { reclaim_rust(ptr, length) } @@ -15,20 +35,59 @@ pub extern "C" fn free_rust(ptr: *mut u8, length: u32) { reclaim_rust(ptr, lengt #[no_mangle] pub extern "C" fn init_stream(port: i64) -> i32 { return 0; } -struct SystemFFI { - resp_rx: UnboundedReceiver, -} - -impl Future for SystemFFI { - type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - loop { - match ready!(Pin::new(&mut self.resp_rx).poll_recv(cx)) { - None => return Poll::Ready(()), - Some(resp) => { - log::trace!("Response: {:?}", resp); - }, - } - } +#[allow(unused_attributes)] +pub fn reclaim_rust(ptr: *mut u8, length: u32) { + unsafe { + let len: usize = length as usize; + Vec::from_raw_parts(ptr, len, len); } } + +thread_local!( + static STREAM_SENDER: RefCell>>> = RefCell::new(None); +); + +pub fn send(data: StreamData) { + STREAM_SENDER.with(|cell| match &*cell.borrow() { + Some(tx) => { + tx.send(data); + }, + None => panic!(""), + }); +} + +pub fn init_dart(modules: Vec, f: F) +where + F: FnOnce() + 'static, +{ + let mut stream = CommandStream::::new(); + let stream = CommandStream::::new(); + let tx = stream.tx(); + + STREAM_SENDER.with(|cell| { + *cell.borrow_mut() = Some(tx); + }); + + FlowySystem::construct(|| modules, stream) + .spawn(async { f() }) + .run() + .unwrap(); + + // FlowySystem::construct(|| modules, stream) + // .spawn(async move { + // let request = EventRequest::new("1".to_string()); + // let stream_data = StreamData::new( + // 1, + // Some(request), + // Box::new(|config, response| { + // log::info!("😁{:?}", response); + // }), + // ); + // + // send(stream_data); + // + // FlowySystem::current().stop(); + // }) + // .run() + // .unwrap(); +} diff --git a/rust-lib/flowy-sys/src/dart_ffi/mod.rs b/rust-lib/flowy-sys/src/dart_ffi/mod.rs index 9fd02ec7c1..873bec1e08 100644 --- a/rust-lib/flowy-sys/src/dart_ffi/mod.rs +++ b/rust-lib/flowy-sys/src/dart_ffi/mod.rs @@ -1,2 +1,4 @@ #[cfg(feature = "dart_ffi")] mod ffi; + +pub use ffi::*; diff --git a/rust-lib/flowy-sys/src/lib.rs b/rust-lib/flowy-sys/src/lib.rs index 375785d38c..1fc8dffdc8 100644 --- a/rust-lib/flowy-sys/src/lib.rs +++ b/rust-lib/flowy-sys/src/lib.rs @@ -10,7 +10,7 @@ mod service; mod util; #[cfg(feature = "dart_ffi")] -mod dart_ffi; +pub mod dart_ffi; mod stream; mod system; diff --git a/rust-lib/flowy-sys/src/module/module.rs b/rust-lib/flowy-sys/src/module/module.rs index 19b72cabcc..42aaa650ef 100644 --- a/rust-lib/flowy-sys/src/module/module.rs +++ b/rust-lib/flowy-sys/src/module/module.rs @@ -21,6 +21,7 @@ use std::{ future::Future, pin::Pin, rc::Rc, + sync::Arc, task::{Context, Poll}, }; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; diff --git a/rust-lib/flowy-sys/src/request/request.rs b/rust-lib/flowy-sys/src/request/request.rs index de343170f0..7dfcc91bfb 100644 --- a/rust-lib/flowy-sys/src/request/request.rs +++ b/rust-lib/flowy-sys/src/request/request.rs @@ -30,6 +30,8 @@ impl EventRequest { pub fn get_event(&self) -> &str { &self.event } pub fn get_id(&self) -> &str { &self.id } + + pub fn from_data(data: Vec) -> Self { unimplemented!() } } pub trait FromRequest: Sized { diff --git a/rust-lib/flowy-sys/src/stream.rs b/rust-lib/flowy-sys/src/stream.rs index 63dc9ae06d..233ee3e6fe 100644 --- a/rust-lib/flowy-sys/src/stream.rs +++ b/rust-lib/flowy-sys/src/stream.rs @@ -4,7 +4,7 @@ use crate::{ request::EventRequest, response::EventResponse, service::{BoxService, Service, ServiceFactory}, - system::ModuleMap, + system::ModuleServiceMap, }; use futures_core::{future::LocalBoxFuture, ready, task::Context}; use std::{collections::HashMap, future::Future, rc::Rc}; @@ -13,7 +13,7 @@ use tokio::{ sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, }; -pub type BoxStreamCallback = Box; +pub type BoxStreamCallback = Box; pub struct StreamData where T: 'static, @@ -37,22 +37,26 @@ pub struct CommandStream where T: 'static, { - module_map: ModuleMap, - pub data_tx: UnboundedSender>, + module_map: Option, + data_tx: UnboundedSender>, data_rx: UnboundedReceiver>, } impl CommandStream { - pub fn new(module_map: ModuleMap) -> Self { + pub fn new() -> Self { let (data_tx, data_rx) = unbounded_channel::>(); Self { - module_map, + module_map: None, data_tx, data_rx, } } pub fn send(&self, data: StreamData) { let _ = self.data_tx.send(data); } + + pub fn module_service_map(&mut self, map: ModuleServiceMap) { self.module_map = Some(map) } + + pub fn tx(&self) -> UnboundedSender> { self.data_tx.clone() } } impl Future for CommandStream @@ -87,14 +91,14 @@ where type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, _cfg: Self::Config) -> Self::Future { - let module_map = self.module_map.clone(); + let module_map = self.module_map.as_ref().unwrap().clone(); let service = Box::new(CommandStreamService { module_map }); Box::pin(async move { Ok(service as Self::Service) }) } } pub struct CommandStreamService { - module_map: ModuleMap, + module_map: ModuleServiceMap, } impl Service> for CommandStreamService @@ -118,12 +122,18 @@ where let service_fut = fut.await?.call(request); service_fut.await }, - None => Err(InternalError::new("".to_owned()).into()), + None => { + let msg = format!("Can not find the module to handle the request:{:?}", request); + Err(InternalError::new(msg).into()) + }, } }; - let resp = result().await.unwrap(); - (data.callback)(data.config, resp); + match result().await { + Ok(resp) => (data.callback)(data.config, resp), + Err(e) => log::error!("{:?}", e), + } + Ok(()) }; Box::pin(fut) diff --git a/rust-lib/flowy-sys/src/system.rs b/rust-lib/flowy-sys/src/system.rs index 3e961cf055..d35187681e 100644 --- a/rust-lib/flowy-sys/src/system.rs +++ b/rust-lib/flowy-sys/src/system.rs @@ -4,7 +4,7 @@ use crate::{ request::EventRequest, response::EventResponse, rt::Runtime, - stream::CommandStreamService, + stream::{CommandStream, CommandStreamService, StreamData}, }; use futures_core::{ready, task::Context}; use std::{cell::RefCell, collections::HashMap, future::Future, io, rc::Rc, sync::Arc}; @@ -26,14 +26,14 @@ pub enum SystemCommand { Response(EventResponse), } -pub type ModuleMap = Rc>>; +pub type ModuleServiceMap = Rc>>; pub struct FlowySystem { sys_cmd_tx: UnboundedSender, - module_map: ModuleMap, + module_map: ModuleServiceMap, } impl FlowySystem { - pub fn construct(module_factory: F) -> SystemRunner + pub fn construct(module_factory: F, mut stream: CommandStream) -> SystemRunner where F: FnOnce() -> Vec, { @@ -46,21 +46,26 @@ impl FlowySystem { sys_cmd_rx, }); + let factory = module_factory(); + let mut module_service_map = HashMap::new(); + factory.into_iter().for_each(|m| { + let events = m.events(); + let rc_module = Rc::new(m); + events.into_iter().for_each(|e| { + module_service_map.insert(e, rc_module.clone()); + }); + }); + let mut system = Self { sys_cmd_tx: sys_cmd_tx.clone(), module_map: Rc::new(HashMap::default()), }; - let factory = module_factory(); - let mut module_map = HashMap::new(); - factory.into_iter().for_each(|m| { - let events = m.events(); - let rc_module = Rc::new(m); - events.into_iter().for_each(|e| { - module_map.insert(e, rc_module.clone()); - }); - }); - system.module_map = Rc::new(module_map); + let map = Rc::new(module_service_map); + system.module_map = map.clone(); + stream.module_service_map(map.clone()); + + runtime.spawn(stream); FlowySystem::set_current(system); let runner = SystemRunner { rt: runtime, stop_rx }; @@ -76,7 +81,7 @@ impl FlowySystem { } } - pub fn module_map(&self) -> ModuleMap { self.module_map.clone() } + pub fn module_map(&self) -> ModuleServiceMap { self.module_map.clone() } #[doc(hidden)] pub fn set_current(sys: FlowySystem) { diff --git a/rust-lib/flowy-sys/tests/api/dart_ffi.rs b/rust-lib/flowy-sys/tests/api/dart_ffi.rs new file mode 100644 index 0000000000..1b645abaf1 --- /dev/null +++ b/rust-lib/flowy-sys/tests/api/dart_ffi.rs @@ -0,0 +1,34 @@ +use crate::helper::*; +use flowy_sys::{dart_ffi::*, prelude::*}; + +pub async fn no_params() -> String { "no params function call".to_string() } +pub async fn one_params(_s: String) -> String { "one params function call".to_string() } +pub async fn two_params(_s1: String, _s2: String) -> String { "two params function call".to_string() } + +#[test] +fn test_init() { + setup_env(); + + let no_params_command = "no params".to_string(); + let one_params_command = "one params".to_string(); + let two_params_command = "two params".to_string(); + + let modules = vec![Module::new() + .event(no_params_command.clone(), no_params) + .event(one_params_command.clone(), one_params) + .event(two_params_command.clone(), two_params)]; + + init_dart(modules, || { + let request = EventRequest::new(no_params_command); + let stream_data = StreamData::new( + 1, + Some(request), + Box::new(|config, response| { + log::info!("😁😁😁 {:?}", response); + }), + ); + + send(stream_data); + FlowySystem::current().stop(); + }); +} diff --git a/rust-lib/flowy-sys/tests/api/main.rs b/rust-lib/flowy-sys/tests/api/main.rs index 21ac62726e..8814b112ec 100644 --- a/rust-lib/flowy-sys/tests/api/main.rs +++ b/rust-lib/flowy-sys/tests/api/main.rs @@ -1,2 +1,3 @@ +mod dart_ffi; mod helper; mod module_event; diff --git a/rust-lib/flowy-sys/tests/api/module_event.rs b/rust-lib/flowy-sys/tests/api/module_event.rs index 14a5a49532..0b6e2227b6 100644 --- a/rust-lib/flowy-sys/tests/api/module_event.rs +++ b/rust-lib/flowy-sys/tests/api/module_event.rs @@ -13,30 +13,29 @@ fn test() { let one_params_command = "one params".to_string(); let two_params_command = "two params".to_string(); - let runner = FlowySystem::construct(|| { - vec![Module::new() - .event(no_params_command.clone(), no_params) - .event(one_params_command.clone(), one_params) - .event(two_params_command.clone(), two_params)] - }); - - let stream = CommandStream::new(FlowySystem::current().module_map()); - let tx = stream.data_tx.clone(); - - runner - .spawn(stream) - .spawn(async move { - let request = EventRequest::new(no_params_command.clone()); - let stream_data = StreamData::new( - 1, - Some(request), - Box::new(|config, response| { - log::info!("{:?}", response); - }), - ); - tx.send(stream_data); - FlowySystem::current().stop(); - }) - .run() - .unwrap(); + let stream = CommandStream::::new(); + let tx = stream.tx(); + FlowySystem::construct( + || { + vec![Module::new() + .event(no_params_command.clone(), no_params) + .event(one_params_command.clone(), one_params) + .event(two_params_command.clone(), two_params)] + }, + stream, + ) + .spawn(async move { + let request = EventRequest::new(no_params_command.clone()); + let stream_data = StreamData::new( + 1, + Some(request), + Box::new(|config, response| { + log::info!("{:?}", response); + }), + ); + tx.send(stream_data); + FlowySystem::current().stop(); + }) + .run() + .unwrap(); }