diff --git a/rust-lib/dart-ffi/src/lib.rs b/rust-lib/dart-ffi/src/lib.rs index fd72252596..b14fc8bdde 100644 --- a/rust-lib/dart-ffi/src/lib.rs +++ b/rust-lib/dart-ffi/src/lib.rs @@ -18,7 +18,7 @@ pub extern "C" fn async_command(port: i64, input: *const u8, len: usize) { let bytes = unsafe { std::slice::from_raw_parts(input, len) }.to_vec(); let request = EventRequest::from_data(bytes); - let stream_data = StreamData::new(port, Some(request)).with_callback(Box::new(|_config, response| { + let stream_data = CommandData::new(port, Some(request)).with_callback(Box::new(|_config, response| { log::info!("async resp: {:?}", response); })); diff --git a/rust-lib/flowy-sdk/src/lib.rs b/rust-lib/flowy-sdk/src/lib.rs index c5adb37c35..9dc1a60208 100644 --- a/rust-lib/flowy-sdk/src/lib.rs +++ b/rust-lib/flowy-sdk/src/lib.rs @@ -19,14 +19,14 @@ pub fn init_system(modules: Vec) { FlowySystem::construct( || modules, |module_map| { - let mut stream = CommandStream::::new(module_map.clone()); - let stream_fut = CommandStreamFuture::new(module_map, stream.take_data_rx()); + let mut stream = CommandSender::::new(module_map.clone()); + let runner = CommandSenderRunner::new(module_map, stream.take_data_rx()); - STREAM_SENDER.with(|cell| { + CMD_SENDER.with(|cell| { *cell.borrow_mut() = Some(stream); }); - stream_fut + runner }, ) .run() @@ -34,18 +34,18 @@ pub fn init_system(modules: Vec) { } thread_local!( - static STREAM_SENDER: RefCell>> = RefCell::new(None); + static CMD_SENDER: RefCell>> = RefCell::new(None); ); -pub fn sync_send(data: StreamData) -> EventResponse { - STREAM_SENDER.with(|cell| match &*cell.borrow() { +pub fn sync_send(data: CommandData) -> EventResponse { + CMD_SENDER.with(|cell| match &*cell.borrow() { Some(stream) => stream.sync_send(data), None => panic!(""), }) } -pub fn async_send(data: StreamData) { - STREAM_SENDER.with(|cell| match &*cell.borrow() { +pub fn async_send(data: CommandData) { + CMD_SENDER.with(|cell| match &*cell.borrow() { Some(stream) => { stream.async_send(data); }, diff --git a/rust-lib/flowy-sys/src/stream.rs b/rust-lib/flowy-sys/src/stream.rs index 9dc0445514..91c6cc515c 100644 --- a/rust-lib/flowy-sys/src/stream.rs +++ b/rust-lib/flowy-sys/src/stream.rs @@ -15,131 +15,35 @@ use tokio::{ macro_rules! service_factor_impl { ($name:ident) => { #[allow(non_snake_case, missing_docs)] - impl ServiceFactory> for $name + impl ServiceFactory> for $name where T: 'static, { type Response = EventResponse; type Error = SystemError; - type Service = BoxService, Self::Response, Self::Error>; + type Service = BoxService, Self::Response, Self::Error>; type Config = (); type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, _cfg: Self::Config) -> Self::Future { let module_map = self.module_map.clone(); - let service = Box::new(CommandStreamService { module_map }); + let service = Box::new(CommandSenderService { module_map }); Box::pin(async move { Ok(service as Self::Service) }) } } }; } -pub type BoxStreamCallback = Box; -pub struct StreamData -where - T: 'static, -{ - config: T, - request: Option, - callback: Option>, -} - -impl StreamData { - pub fn new(config: T, request: Option) -> Self { - Self { - config, - request, - callback: None, - } - } - - pub fn with_callback(mut self, callback: BoxStreamCallback) -> Self { - self.callback = Some(callback); - self - } -} - -pub struct CommandStream -where - T: 'static, -{ - module_map: ModuleMap, - data_tx: UnboundedSender>, - data_rx: Option>>, -} - -service_factor_impl!(CommandStream); - -impl CommandStream { - pub fn new(module_map: ModuleMap) -> Self { - let (data_tx, data_rx) = unbounded_channel::>(); - Self { - module_map, - data_tx, - data_rx: Some(data_rx), - } - } - - pub fn async_send(&self, data: StreamData) { let _ = self.data_tx.send(data); } - - pub fn sync_send(&self, data: StreamData) -> EventResponse { - let factory = self.new_service(()); - - futures::executor::block_on(async { - let service = factory.await.unwrap(); - service.call(data).await.unwrap() - }) - } - - pub fn tx(&self) -> UnboundedSender> { self.data_tx.clone() } - - pub fn take_data_rx(&mut self) -> UnboundedReceiver> { self.data_rx.take().unwrap() } -} - -pub struct CommandStreamFuture { - module_map: ModuleMap, - data_rx: UnboundedReceiver>, -} - -service_factor_impl!(CommandStreamFuture); - -impl CommandStreamFuture { - pub fn new(module_map: ModuleMap, data_rx: UnboundedReceiver>) -> Self { - Self { module_map, data_rx } - } -} - -impl Future for CommandStreamFuture -where - T: 'static, -{ - type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - loop { - match ready!(Pin::new(&mut self.data_rx).poll_recv(cx)) { - None => return Poll::Ready(()), - Some(ctx) => { - let factory = self.new_service(()); - tokio::task::spawn_local(async move { - let service = factory.await.unwrap(); - let _ = service.call(ctx).await; - }); - }, - } - } - } -} - -pub struct CommandStreamService { +struct CommandSenderService { module_map: ModuleMap, } -impl Service> for CommandStreamService { +impl Service> for CommandSenderService { type Response = EventResponse; type Error = SystemError; type Future = LocalBoxFuture<'static, Result>; - fn call(&self, mut data: StreamData) -> Self::Future { + fn call(&self, mut data: CommandData) -> Self::Future { let module_map = self.module_map.clone(); let request = data.request.take().unwrap(); let fut = async move { @@ -168,3 +72,99 @@ impl Service> for CommandStreamService { Box::pin(fut) } } + +pub type BoxStreamCallback = Box; +pub struct CommandData +where + T: 'static, +{ + config: T, + request: Option, + callback: Option>, +} + +impl CommandData { + pub fn new(config: T, request: Option) -> Self { + Self { + config, + request, + callback: None, + } + } + + pub fn with_callback(mut self, callback: BoxStreamCallback) -> Self { + self.callback = Some(callback); + self + } +} + +pub struct CommandSender +where + T: 'static, +{ + module_map: ModuleMap, + data_tx: UnboundedSender>, + data_rx: Option>>, +} + +service_factor_impl!(CommandSender); + +impl CommandSender { + pub fn new(module_map: ModuleMap) -> Self { + let (data_tx, data_rx) = unbounded_channel::>(); + Self { + module_map, + data_tx, + data_rx: Some(data_rx), + } + } + + pub fn async_send(&self, data: CommandData) { let _ = self.data_tx.send(data); } + + pub fn sync_send(&self, data: CommandData) -> EventResponse { + let factory = self.new_service(()); + + futures::executor::block_on(async { + let service = factory.await.unwrap(); + service.call(data).await.unwrap() + }) + } + + pub fn tx(&self) -> UnboundedSender> { self.data_tx.clone() } + + pub fn take_data_rx(&mut self) -> UnboundedReceiver> { self.data_rx.take().unwrap() } +} + +pub struct CommandSenderRunner { + module_map: ModuleMap, + data_rx: UnboundedReceiver>, +} + +service_factor_impl!(CommandSenderRunner); + +impl CommandSenderRunner { + pub fn new(module_map: ModuleMap, data_rx: UnboundedReceiver>) -> Self { + Self { module_map, data_rx } + } +} + +impl Future for CommandSenderRunner +where + T: 'static, +{ + type Output = (); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + loop { + match ready!(Pin::new(&mut self.data_rx).poll_recv(cx)) { + None => return Poll::Ready(()), + Some(ctx) => { + let factory = self.new_service(()); + tokio::task::spawn_local(async move { + let service = factory.await.unwrap(); + let _ = service.call(ctx).await; + }); + }, + } + } + } +} diff --git a/rust-lib/flowy-sys/src/system.rs b/rust-lib/flowy-sys/src/system.rs index 6dbb12ae6d..2db8c18fd6 100644 --- a/rust-lib/flowy-sys/src/system.rs +++ b/rust-lib/flowy-sys/src/system.rs @@ -1,7 +1,7 @@ use crate::{ module::{Event, Module}, rt::Runtime, - stream::CommandStreamFuture, + stream::CommandSenderRunner, }; use futures_core::{ready, task::Context}; use std::{cell::RefCell, collections::HashMap, future::Future, io, rc::Rc, sync::Arc}; @@ -28,10 +28,10 @@ pub struct FlowySystem { } impl FlowySystem { - pub fn construct(module_factory: F, stream_factory: S) -> SystemRunner + pub fn construct(module_factory: F, sender_factory: S) -> SystemRunner where F: FnOnce() -> Vec, - S: FnOnce(ModuleMap) -> CommandStreamFuture, + S: FnOnce(ModuleMap) -> CommandSenderRunner, T: 'static, { let runtime = Runtime::new().unwrap(); @@ -54,8 +54,8 @@ impl FlowySystem { }); let system = Self { sys_cmd_tx }; - let stream_fut = stream_factory(Rc::new(module_map)); - runtime.spawn(stream_fut); + let sender_runner = sender_factory(Rc::new(module_map)); + runtime.spawn(sender_runner); FlowySystem::set_current(system); let runner = SystemRunner { rt: runtime, stop_rx }; diff --git a/rust-lib/flowy-sys/tests/api/helper.rs b/rust-lib/flowy-sys/tests/api/helper.rs index ac93b9e529..d8c43c976d 100644 --- a/rust-lib/flowy-sys/tests/api/helper.rs +++ b/rust-lib/flowy-sys/tests/api/helper.rs @@ -1,4 +1,4 @@ -use flowy_sys::prelude::{CommandStream, CommandStreamFuture, EventResponse, FlowySystem, Module, StreamData}; +use flowy_sys::prelude::{CommandData, CommandSender, CommandSenderRunner, EventResponse, FlowySystem, Module}; use std::{ cell::RefCell, sync::{Once, RwLock}, @@ -21,18 +21,18 @@ pub struct ExecutorAction { pub struct FlowySystemExecutor {} thread_local!( - static STREAM_SENDER: RefCell>> = RefCell::new(None); + static CMD_SENDER: RefCell>> = RefCell::new(None); ); -pub fn sync_send(data: StreamData) -> EventResponse { - STREAM_SENDER.with(|cell| match &*cell.borrow() { +pub fn sync_send(data: CommandData) -> EventResponse { + CMD_SENDER.with(|cell| match &*cell.borrow() { Some(stream) => stream.sync_send(data), None => panic!(""), }) } -pub fn async_send(data: StreamData) { - STREAM_SENDER.with(|cell| match &*cell.borrow() { +pub fn async_send(data: CommandData) { + CMD_SENDER.with(|cell| match &*cell.borrow() { Some(stream) => { stream.async_send(data); }, @@ -40,8 +40,6 @@ pub fn async_send(data: StreamData) { }); } -pub fn stop_system() { FlowySystem::current().stop(); } - pub fn init_system(modules: Vec, f: F) where F: FnOnce() + 'static, @@ -49,17 +47,19 @@ where FlowySystem::construct( || modules, |module_map| { - let mut stream = CommandStream::::new(module_map.clone()); - let stream_fut = CommandStreamFuture::new(module_map, stream.take_data_rx()); + let mut stream = CommandSender::::new(module_map.clone()); + let runner = CommandSenderRunner::new(module_map, stream.take_data_rx()); - STREAM_SENDER.with(|cell| { + CMD_SENDER.with(|cell| { *cell.borrow_mut() = Some(stream); }); - stream_fut + runner }, ) .spawn(async { f() }) .run() .unwrap(); } + +pub fn stop_system() { FlowySystem::current().stop(); } diff --git a/rust-lib/flowy-sys/tests/api/module_event.rs b/rust-lib/flowy-sys/tests/api/module_event.rs index 2e23ebce6f..5cf14a570a 100644 --- a/rust-lib/flowy-sys/tests/api/module_event.rs +++ b/rust-lib/flowy-sys/tests/api/module_event.rs @@ -20,7 +20,7 @@ fn test_init() { init_system(modules, || { let request = EventRequest::new(no_params_command); - let stream_data = StreamData::new(1, Some(request)).with_callback(Box::new(|_config, response| { + let stream_data = CommandData::new(1, Some(request)).with_callback(Box::new(|_config, response| { log::info!("async resp: {:?}", response); }));