add dart ffi

This commit is contained in:
appflowy 2021-06-28 16:27:46 +08:00
parent 97dde2d711
commit 42b8add240
11 changed files with 188 additions and 73 deletions

View File

@ -26,10 +26,12 @@ thread-id = "3.3.0"
allo-isolate = {version = "^0.1", features = ["catch-unwind",], optional = true} allo-isolate = {version = "^0.1", features = ["catch-unwind",], optional = true}
byteorder = {version = "1.3.4", optional = true} byteorder = {version = "1.3.4", optional = true}
ffi-support = {version = "0.4.2", optional = true} ffi-support = {version = "0.4.2", optional = true}
protobuf = {version = "2.20.0", optional = true}
lazy_static = {version = "1.4.0", optional = true}
[dev-dependencies] [dev-dependencies]
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
futures-util = "0.3.15" futures-util = "0.3.15"
[features] [features]
dart_ffi = ["ffi-support", "allo-isolate", "byteorder"] dart_ffi = ["ffi-support", "allo-isolate", "byteorder", "protobuf", "lazy_static"]

View File

@ -1,13 +1,33 @@
use crate::{response::EventResponse, rt::SystemCommand}; use crate::{
module::Module,
request::EventRequest,
response::EventResponse,
rt::SystemCommand,
stream::*,
system::FlowySystem,
};
use futures_core::ready; use futures_core::ready;
use std::{future::Future, task::Context}; use lazy_static::lazy_static;
use protobuf::Message;
use std::{
cell::RefCell,
future::Future,
sync::{Arc, RwLock},
task::Context,
};
use tokio::{ use tokio::{
macros::support::{Pin, Poll}, macros::support::{Pin, Poll},
sync::{mpsc::UnboundedReceiver, oneshot}, sync::{mpsc::UnboundedSender, oneshot},
}; };
#[no_mangle] #[no_mangle]
pub extern "C" fn async_command(port: i64, input: *const u8, len: usize) {} pub extern "C" fn async_command(port: i64, input: *const u8, len: usize) {
let bytes = unsafe { std::slice::from_raw_parts(input, len) }.to_vec();
let request = EventRequest::from_data(bytes);
let stream_data = StreamData::new(port, Some(request), Box::new(|port, response| {}));
send(stream_data);
}
#[no_mangle] #[no_mangle]
pub extern "C" fn free_rust(ptr: *mut u8, length: u32) { reclaim_rust(ptr, length) } pub extern "C" fn free_rust(ptr: *mut u8, length: u32) { reclaim_rust(ptr, length) }
@ -15,20 +35,59 @@ pub extern "C" fn free_rust(ptr: *mut u8, length: u32) { reclaim_rust(ptr, lengt
#[no_mangle] #[no_mangle]
pub extern "C" fn init_stream(port: i64) -> i32 { return 0; } pub extern "C" fn init_stream(port: i64) -> i32 { return 0; }
struct SystemFFI { #[allow(unused_attributes)]
resp_rx: UnboundedReceiver<EventResponse>, pub fn reclaim_rust(ptr: *mut u8, length: u32) {
} unsafe {
let len: usize = length as usize;
impl Future for SystemFFI { Vec::from_raw_parts(ptr, len, len);
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match ready!(Pin::new(&mut self.resp_rx).poll_recv(cx)) {
None => return Poll::Ready(()),
Some(resp) => {
log::trace!("Response: {:?}", resp);
},
}
}
} }
} }
thread_local!(
static STREAM_SENDER: RefCell<Option<UnboundedSender<StreamData<i64>>>> = RefCell::new(None);
);
pub fn send(data: StreamData<i64>) {
STREAM_SENDER.with(|cell| match &*cell.borrow() {
Some(tx) => {
tx.send(data);
},
None => panic!(""),
});
}
pub fn init_dart<F>(modules: Vec<Module>, f: F)
where
F: FnOnce() + 'static,
{
let mut stream = CommandStream::<i64>::new();
let stream = CommandStream::<i64>::new();
let tx = stream.tx();
STREAM_SENDER.with(|cell| {
*cell.borrow_mut() = Some(tx);
});
FlowySystem::construct(|| modules, stream)
.spawn(async { f() })
.run()
.unwrap();
// FlowySystem::construct(|| modules, stream)
// .spawn(async move {
// let request = EventRequest::new("1".to_string());
// let stream_data = StreamData::new(
// 1,
// Some(request),
// Box::new(|config, response| {
// log::info!("😁{:?}", response);
// }),
// );
//
// send(stream_data);
//
// FlowySystem::current().stop();
// })
// .run()
// .unwrap();
}

View File

@ -1,2 +1,4 @@
#[cfg(feature = "dart_ffi")] #[cfg(feature = "dart_ffi")]
mod ffi; mod ffi;
pub use ffi::*;

View File

@ -10,7 +10,7 @@ mod service;
mod util; mod util;
#[cfg(feature = "dart_ffi")] #[cfg(feature = "dart_ffi")]
mod dart_ffi; pub mod dart_ffi;
mod stream; mod stream;
mod system; mod system;

View File

@ -21,6 +21,7 @@ use std::{
future::Future, future::Future,
pin::Pin, pin::Pin,
rc::Rc, rc::Rc,
sync::Arc,
task::{Context, Poll}, task::{Context, Poll},
}; };
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};

View File

@ -30,6 +30,8 @@ impl EventRequest {
pub fn get_event(&self) -> &str { &self.event } pub fn get_event(&self) -> &str { &self.event }
pub fn get_id(&self) -> &str { &self.id } pub fn get_id(&self) -> &str { &self.id }
pub fn from_data(data: Vec<u8>) -> Self { unimplemented!() }
} }
pub trait FromRequest: Sized { pub trait FromRequest: Sized {

View File

@ -4,7 +4,7 @@ use crate::{
request::EventRequest, request::EventRequest,
response::EventResponse, response::EventResponse,
service::{BoxService, Service, ServiceFactory}, service::{BoxService, Service, ServiceFactory},
system::ModuleMap, system::ModuleServiceMap,
}; };
use futures_core::{future::LocalBoxFuture, ready, task::Context}; use futures_core::{future::LocalBoxFuture, ready, task::Context};
use std::{collections::HashMap, future::Future, rc::Rc}; use std::{collections::HashMap, future::Future, rc::Rc};
@ -13,7 +13,7 @@ use tokio::{
sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
}; };
pub type BoxStreamCallback<T> = Box<dyn FnOnce(T, EventResponse) + 'static>; pub type BoxStreamCallback<T> = Box<dyn FnOnce(T, EventResponse) + 'static + Send + Sync>;
pub struct StreamData<T> pub struct StreamData<T>
where where
T: 'static, T: 'static,
@ -37,22 +37,26 @@ pub struct CommandStream<T>
where where
T: 'static, T: 'static,
{ {
module_map: ModuleMap, module_map: Option<ModuleServiceMap>,
pub data_tx: UnboundedSender<StreamData<T>>, data_tx: UnboundedSender<StreamData<T>>,
data_rx: UnboundedReceiver<StreamData<T>>, data_rx: UnboundedReceiver<StreamData<T>>,
} }
impl<T> CommandStream<T> { impl<T> CommandStream<T> {
pub fn new(module_map: ModuleMap) -> Self { pub fn new() -> Self {
let (data_tx, data_rx) = unbounded_channel::<StreamData<T>>(); let (data_tx, data_rx) = unbounded_channel::<StreamData<T>>();
Self { Self {
module_map, module_map: None,
data_tx, data_tx,
data_rx, data_rx,
} }
} }
pub fn send(&self, data: StreamData<T>) { let _ = self.data_tx.send(data); } pub fn send(&self, data: StreamData<T>) { let _ = self.data_tx.send(data); }
pub fn module_service_map(&mut self, map: ModuleServiceMap) { self.module_map = Some(map) }
pub fn tx(&self) -> UnboundedSender<StreamData<T>> { self.data_tx.clone() }
} }
impl<T> Future for CommandStream<T> impl<T> Future for CommandStream<T>
@ -87,14 +91,14 @@ where
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::Error>>; type Future = LocalBoxFuture<'static, Result<Self::Service, Self::Error>>;
fn new_service(&self, _cfg: Self::Config) -> Self::Future { fn new_service(&self, _cfg: Self::Config) -> Self::Future {
let module_map = self.module_map.clone(); let module_map = self.module_map.as_ref().unwrap().clone();
let service = Box::new(CommandStreamService { module_map }); let service = Box::new(CommandStreamService { module_map });
Box::pin(async move { Ok(service as Self::Service) }) Box::pin(async move { Ok(service as Self::Service) })
} }
} }
pub struct CommandStreamService { pub struct CommandStreamService {
module_map: ModuleMap, module_map: ModuleServiceMap,
} }
impl<T> Service<StreamData<T>> for CommandStreamService impl<T> Service<StreamData<T>> for CommandStreamService
@ -118,12 +122,18 @@ where
let service_fut = fut.await?.call(request); let service_fut = fut.await?.call(request);
service_fut.await service_fut.await
}, },
None => Err(InternalError::new("".to_owned()).into()), None => {
let msg = format!("Can not find the module to handle the request:{:?}", request);
Err(InternalError::new(msg).into())
},
} }
}; };
let resp = result().await.unwrap(); match result().await {
(data.callback)(data.config, resp); Ok(resp) => (data.callback)(data.config, resp),
Err(e) => log::error!("{:?}", e),
}
Ok(()) Ok(())
}; };
Box::pin(fut) Box::pin(fut)

View File

@ -4,7 +4,7 @@ use crate::{
request::EventRequest, request::EventRequest,
response::EventResponse, response::EventResponse,
rt::Runtime, rt::Runtime,
stream::CommandStreamService, stream::{CommandStream, CommandStreamService, StreamData},
}; };
use futures_core::{ready, task::Context}; use futures_core::{ready, task::Context};
use std::{cell::RefCell, collections::HashMap, future::Future, io, rc::Rc, sync::Arc}; use std::{cell::RefCell, collections::HashMap, future::Future, io, rc::Rc, sync::Arc};
@ -26,14 +26,14 @@ pub enum SystemCommand {
Response(EventResponse), Response(EventResponse),
} }
pub type ModuleMap = Rc<HashMap<Event, Rc<Module>>>; pub type ModuleServiceMap = Rc<HashMap<Event, Rc<Module>>>;
pub struct FlowySystem { pub struct FlowySystem {
sys_cmd_tx: UnboundedSender<SystemCommand>, sys_cmd_tx: UnboundedSender<SystemCommand>,
module_map: ModuleMap, module_map: ModuleServiceMap,
} }
impl FlowySystem { impl FlowySystem {
pub fn construct<F>(module_factory: F) -> SystemRunner pub fn construct<F, T>(module_factory: F, mut stream: CommandStream<T>) -> SystemRunner
where where
F: FnOnce() -> Vec<Module>, F: FnOnce() -> Vec<Module>,
{ {
@ -46,21 +46,26 @@ impl FlowySystem {
sys_cmd_rx, sys_cmd_rx,
}); });
let factory = module_factory();
let mut module_service_map = HashMap::new();
factory.into_iter().for_each(|m| {
let events = m.events();
let rc_module = Rc::new(m);
events.into_iter().for_each(|e| {
module_service_map.insert(e, rc_module.clone());
});
});
let mut system = Self { let mut system = Self {
sys_cmd_tx: sys_cmd_tx.clone(), sys_cmd_tx: sys_cmd_tx.clone(),
module_map: Rc::new(HashMap::default()), module_map: Rc::new(HashMap::default()),
}; };
let factory = module_factory(); let map = Rc::new(module_service_map);
let mut module_map = HashMap::new(); system.module_map = map.clone();
factory.into_iter().for_each(|m| { stream.module_service_map(map.clone());
let events = m.events();
let rc_module = Rc::new(m); runtime.spawn(stream);
events.into_iter().for_each(|e| {
module_map.insert(e, rc_module.clone());
});
});
system.module_map = Rc::new(module_map);
FlowySystem::set_current(system); FlowySystem::set_current(system);
let runner = SystemRunner { rt: runtime, stop_rx }; let runner = SystemRunner { rt: runtime, stop_rx };
@ -76,7 +81,7 @@ impl FlowySystem {
} }
} }
pub fn module_map(&self) -> ModuleMap { self.module_map.clone() } pub fn module_map(&self) -> ModuleServiceMap { self.module_map.clone() }
#[doc(hidden)] #[doc(hidden)]
pub fn set_current(sys: FlowySystem) { pub fn set_current(sys: FlowySystem) {

View File

@ -0,0 +1,34 @@
use crate::helper::*;
use flowy_sys::{dart_ffi::*, prelude::*};
pub async fn no_params() -> String { "no params function call".to_string() }
pub async fn one_params(_s: String) -> String { "one params function call".to_string() }
pub async fn two_params(_s1: String, _s2: String) -> String { "two params function call".to_string() }
#[test]
fn test_init() {
setup_env();
let no_params_command = "no params".to_string();
let one_params_command = "one params".to_string();
let two_params_command = "two params".to_string();
let modules = vec![Module::new()
.event(no_params_command.clone(), no_params)
.event(one_params_command.clone(), one_params)
.event(two_params_command.clone(), two_params)];
init_dart(modules, || {
let request = EventRequest::new(no_params_command);
let stream_data = StreamData::new(
1,
Some(request),
Box::new(|config, response| {
log::info!("😁😁😁 {:?}", response);
}),
);
send(stream_data);
FlowySystem::current().stop();
});
}

View File

@ -1,2 +1,3 @@
mod dart_ffi;
mod helper; mod helper;
mod module_event; mod module_event;

View File

@ -13,30 +13,29 @@ fn test() {
let one_params_command = "one params".to_string(); let one_params_command = "one params".to_string();
let two_params_command = "two params".to_string(); let two_params_command = "two params".to_string();
let runner = FlowySystem::construct(|| { let stream = CommandStream::<i64>::new();
vec![Module::new() let tx = stream.tx();
.event(no_params_command.clone(), no_params) FlowySystem::construct(
.event(one_params_command.clone(), one_params) || {
.event(two_params_command.clone(), two_params)] vec![Module::new()
}); .event(no_params_command.clone(), no_params)
.event(one_params_command.clone(), one_params)
let stream = CommandStream::new(FlowySystem::current().module_map()); .event(two_params_command.clone(), two_params)]
let tx = stream.data_tx.clone(); },
stream,
runner )
.spawn(stream) .spawn(async move {
.spawn(async move { let request = EventRequest::new(no_params_command.clone());
let request = EventRequest::new(no_params_command.clone()); let stream_data = StreamData::new(
let stream_data = StreamData::new( 1,
1, Some(request),
Some(request), Box::new(|config, response| {
Box::new(|config, response| { log::info!("{:?}", response);
log::info!("{:?}", response); }),
}), );
); tx.send(stream_data);
tx.send(stream_data); FlowySystem::current().stop();
FlowySystem::current().stop(); })
}) .run()
.run() .unwrap();
.unwrap();
} }