From 6d09c337820c1d49ed08c004ad6c5a615c4bbcdb Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Tue, 20 Aug 2024 14:16:24 +0800 Subject: [PATCH] chore: spawn task on local set (#6012) * chore: spawn local * chore: using multiple thread runtime * chore: fix test --- frontend/rust-lib/dart-ffi/src/lib.rs | 82 ++++++++++++------- .../src/event_builder.rs | 13 ++- .../event-integration-test/src/lib.rs | 4 +- .../event-integration-test/src/user_event.rs | 19 +++-- .../flowy-database2/src/event_handler.rs | 3 +- .../src/services/database/database_editor.rs | 9 +- .../rust-lib/lib-dispatch/src/dispatcher.rs | 82 ++++--------------- .../rust-lib/lib-dispatch/tests/api/module.rs | 23 +++--- 8 files changed, 114 insertions(+), 121 deletions(-) diff --git a/frontend/rust-lib/dart-ffi/src/lib.rs b/frontend/rust-lib/dart-ffi/src/lib.rs index 134290e76e..b9aca0fc90 100644 --- a/frontend/rust-lib/dart-ffi/src/lib.rs +++ b/frontend/rust-lib/dart-ffi/src/lib.rs @@ -1,11 +1,16 @@ #![allow(clippy::not_unsafe_ptr_arg_deref)] use allo_isolate::Isolate; +use futures::ready; use lazy_static::lazy_static; use semver::Version; -use std::sync::{mpsc, Arc, RwLock}; +use std::future::Future; +use std::pin::Pin; +use std::sync::{Arc, RwLock}; +use std::task::{Context, Poll}; use std::{ffi::CStr, os::raw::c_char}; use tokio::runtime::Builder; +use tokio::sync::mpsc; use tokio::task::LocalSet; use tracing::{debug, error, info, trace, warn}; @@ -42,7 +47,7 @@ pub struct Task { dispatcher: Arc, request: AFPluginRequest, port: i64, - ret: Option>>, + ret: Option>, } unsafe impl Send for Task {} @@ -51,7 +56,7 @@ unsafe impl Sync for DartAppFlowyCore {} struct DartAppFlowyCore { core: Arc>>, handle: RwLock>>, - sender: RwLock>>, + sender: RwLock>>, } impl DartAppFlowyCore { @@ -76,7 +81,7 @@ impl DartAppFlowyCore { &self, request: AFPluginRequest, port: i64, - ret: Option>>, + ret: Option>, ) { if let Ok(sender_guard) = self.sender.read() { if let Err(e) = sender_guard.as_ref().unwrap().send(Task { @@ -138,32 +143,11 @@ pub extern "C" fn init_sdk(_port: i64, data: *mut c_char) -> i64 { .unwrap() .take() .map(|isolate| Arc::new(LogStreamSenderImpl { isolate }) as Arc); - let (sender, task_rx) = mpsc::channel::(); + let (sender, task_rx) = mpsc::unbounded_channel::(); let handle = std::thread::spawn(move || { + let runtime = Builder::new_multi_thread().enable_all().build().unwrap(); let local_set = LocalSet::new(); - while let Ok(task) = task_rx.recv() { - let Task { - dispatcher, - request, - port, - ret, - } = task; - - let resp = AFPluginDispatcher::boxed_async_send_with_callback( - dispatcher.as_ref(), - request, - move |resp: AFPluginEventResponse| { - #[cfg(feature = "sync_verbose_log")] - trace!("[FFI]: Post data to dart through {} port", port); - Box::pin(post_to_flutter(resp, port)) - }, - &local_set, - ); - - if let Some(ret) = ret { - let _ = ret.send(resp); - } - } + runtime.block_on(local_set.run_until(Runner { rx: task_rx })); }); *DART_APPFLOWY_CORE.sender.write().unwrap() = Some(sender); @@ -190,6 +174,48 @@ pub extern "C" fn async_event(port: i64, input: *const u8, len: usize) { DART_APPFLOWY_CORE.dispatch(request, port, None); } +/// A persistent future that processes [Arbiter] commands. +struct Runner { + rx: mpsc::UnboundedReceiver, +} + +impl Future for Runner { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + loop { + match ready!(self.rx.poll_recv(cx)) { + None => return Poll::Ready(()), + Some(task) => { + let Task { + dispatcher, + request, + port, + ret, + } = task; + + tokio::task::spawn_local(async move { + let resp = AFPluginDispatcher::boxed_async_send_with_callback( + dispatcher.as_ref(), + request, + move |resp: AFPluginEventResponse| { + #[cfg(feature = "sync_verbose_log")] + trace!("[FFI]: Post data to dart through {} port", port); + Box::pin(post_to_flutter(resp, port)) + }, + ) + .await; + + if let Some(ret) = ret { + let _ = ret.send(resp); + } + }); + }, + } + } + } +} + #[no_mangle] pub extern "C" fn sync_event(_input: *const u8, _len: usize) -> *const u8 { error!("unimplemented sync_event"); diff --git a/frontend/rust-lib/event-integration-test/src/event_builder.rs b/frontend/rust-lib/event-integration-test/src/event_builder.rs index a50f3aa314..b3d4a313f0 100644 --- a/frontend/rust-lib/event-integration-test/src/event_builder.rs +++ b/frontend/rust-lib/event-integration-test/src/event_builder.rs @@ -11,15 +11,17 @@ use std::{ }; use tokio::task::LocalSet; -#[derive(Clone)] +// #[derive(Clone)] pub struct EventBuilder { context: TestContext, + local_set: LocalSet, } impl EventBuilder { pub fn new(sdk: EventIntegrationTest) -> Self { Self { context: TestContext::new(sdk), + local_set: Default::default(), } } @@ -48,9 +50,14 @@ impl EventBuilder { } pub async fn async_send(mut self) -> Self { - let local_set = LocalSet::new(); let request = self.get_request(); - let resp = AFPluginDispatcher::async_send(self.dispatch().as_ref(), request, &local_set).await; + let resp = self + .local_set + .run_until(AFPluginDispatcher::async_send( + self.dispatch().as_ref(), + request, + )) + .await; self.context.response = Some(resp); self } diff --git a/frontend/rust-lib/event-integration-test/src/lib.rs b/frontend/rust-lib/event-integration-test/src/lib.rs index 03e93bd90e..1aaf4a57db 100644 --- a/frontend/rust-lib/event-integration-test/src/lib.rs +++ b/frontend/rust-lib/event-integration-test/src/lib.rs @@ -5,7 +5,6 @@ use collab_document::document::Document; use collab_entity::CollabType; use std::env::temp_dir; use std::path::PathBuf; -use std::rc::Rc; use std::sync::atomic::{AtomicBool, AtomicU8, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -13,6 +12,7 @@ use std::time::Duration; use nanoid::nanoid; use semver::Version; use tokio::select; +use tokio::task::LocalSet; use tokio::time::sleep; use flowy_core::config::AppFlowyCoreConfig; @@ -40,6 +40,7 @@ pub struct EventIntegrationTest { #[allow(dead_code)] cleaner: Arc, pub notification_sender: TestNotificationSender, + local_set: Arc, } impl EventIntegrationTest { @@ -67,6 +68,7 @@ impl EventIntegrationTest { authenticator, notification_sender, cleaner: Arc::new(Cleaner::new(PathBuf::from(clean_path))), + local_set: Arc::new(Default::default()), } } diff --git a/frontend/rust-lib/event-integration-test/src/user_event.rs b/frontend/rust-lib/event-integration-test/src/user_event.rs index b2b3ac33e5..e11f645aec 100644 --- a/frontend/rust-lib/event-integration-test/src/user_event.rs +++ b/frontend/rust-lib/event-integration-test/src/user_event.rs @@ -4,12 +4,9 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use bytes::Bytes; - use flowy_folder::entities::{RepeatedViewPB, WorkspacePB}; - use protobuf::ProtobufError; use tokio::sync::broadcast::{channel, Sender}; -use tokio::task::LocalSet; use tracing::error; use uuid::Uuid; @@ -74,12 +71,16 @@ impl EventIntegrationTest { .unwrap(); let request = AFPluginRequest::new(UserEvent::SignUp).payload(payload); - let user_profile = - AFPluginDispatcher::async_send(&self.appflowy_core.dispatcher(), request, &LocalSet::new()) - .await - .parse::() - .unwrap() - .unwrap(); + let user_profile = self + .local_set + .run_until(AFPluginDispatcher::async_send( + &self.appflowy_core.dispatcher(), + request, + )) + .await + .parse::() + .unwrap() + .unwrap(); // let _ = create_default_workspace_if_need(dispatch.clone(), &user_profile.id); SignUpContext { diff --git a/frontend/rust-lib/flowy-database2/src/event_handler.rs b/frontend/rust-lib/flowy-database2/src/event_handler.rs index 84564473f2..89aef3a89a 100644 --- a/frontend/rust-lib/flowy-database2/src/event_handler.rs +++ b/frontend/rust-lib/flowy-database2/src/event_handler.rs @@ -1,7 +1,6 @@ -use std::sync::{Arc, Weak}; - use collab_database::rows::RowId; use lib_infra::box_any::BoxAny; +use std::sync::{Arc, Weak}; use tokio::sync::oneshot; use tracing::{error, trace}; diff --git a/frontend/rust-lib/flowy-database2/src/services/database/database_editor.rs b/frontend/rust-lib/flowy-database2/src/services/database/database_editor.rs index 2f084537cd..fcd6c253a1 100644 --- a/frontend/rust-lib/flowy-database2/src/services/database/database_editor.rs +++ b/frontend/rust-lib/flowy-database2/src/services/database/database_editor.rs @@ -1309,7 +1309,7 @@ impl DatabaseEditor { .get_all_field_orders() .into_iter() .map(FieldIdPB::from) - .collect(); + .collect::>(); let is_linked = database.is_inline_view(view_id); (database_id, fields, is_linked) }; @@ -1318,6 +1318,13 @@ impl DatabaseEditor { .into_iter() .map(|detail| RowMetaPB::from(detail.as_ref())) .collect::>(); + + trace!( + "database: {}, num fields: {}, num row: {}", + database_id, + fields.len(), + rows.len() + ); Ok(DatabasePB { id: database_id, fields, diff --git a/frontend/rust-lib/lib-dispatch/src/dispatcher.rs b/frontend/rust-lib/lib-dispatch/src/dispatcher.rs index 920a9f1e2a..8a1f1d0e31 100644 --- a/frontend/rust-lib/lib-dispatch/src/dispatcher.rs +++ b/frontend/rust-lib/lib-dispatch/src/dispatcher.rs @@ -84,75 +84,31 @@ impl AFPluginDispatcher { } #[cfg(feature = "local_set")] - pub async fn async_send( - dispatch: &AFPluginDispatcher, - request: Req, - local_set: &tokio::task::LocalSet, - ) -> AFPluginEventResponse + pub async fn async_send(dispatch: &AFPluginDispatcher, request: Req) -> AFPluginEventResponse where - Req: Into, + Req: Into + 'static, { - AFPluginDispatcher::async_send_with_callback( - dispatch, - request, - |_| Box::pin(async {}), - local_set, - ) - .await + AFPluginDispatcher::async_send_with_callback(dispatch, request, |_| Box::pin(async {})).await } #[cfg(feature = "local_set")] pub async fn async_send_with_callback( dispatch: &AFPluginDispatcher, request: Req, callback: Callback, - local_set: &tokio::task::LocalSet, ) -> AFPluginEventResponse where - Req: Into, + Req: Into + 'static, Callback: FnOnce(AFPluginEventResponse) -> AFBoxFuture<'static, ()> + AFConcurrent + 'static, { - let request: AFPluginRequest = request.into(); - let plugins = dispatch.plugins.clone(); - let service = Box::new(DispatchService { plugins }); - tracing::trace!("Async event: {:?}", &request.event); - let service_ctx = DispatchContext { - request, - callback: Some(Box::new(callback)), - }; - - let handle = local_set.spawn_local(async move { - service.call(service_ctx).await.unwrap_or_else(|e| { - tracing::error!("Dispatch runtime error: {:?}", e); - InternalError::Other(format!("{:?}", e)).as_response() - }) - }); - // let handle = tokio::spawn(async move { - // service.call(service_ctx).await.unwrap_or_else(|e| { - // tracing::error!("Dispatch runtime error: {:?}", e); - // InternalError::Other(format!("{:?}", e)).as_response() - // }) - // }); - - let result: Result = local_set - .run_until(handle) - .await - .map_err(|e| e.to_string().into()); - - result.unwrap_or_else(|e| { - let msg = format!("EVENT_DISPATCH join error: {:?}", e); - tracing::error!("{}", msg); - let error = InternalError::JoinError(msg); - error.as_response() - }) + Self::boxed_async_send_with_callback(dispatch, request, callback).await } #[cfg(feature = "local_set")] - pub fn boxed_async_send_with_callback( + pub async fn boxed_async_send_with_callback( dispatch: &AFPluginDispatcher, request: Req, callback: Callback, - local_set: &tokio::task::LocalSet, - ) -> DispatchFuture + ) -> AFPluginEventResponse where Req: Into + 'static, Callback: FnOnce(AFPluginEventResponse) -> AFBoxFuture<'static, ()> + AFConcurrent + 'static, @@ -166,25 +122,20 @@ impl AFPluginDispatcher { callback: Some(Box::new(callback)), }; - let handle = local_set.spawn_local(async move { + let result = tokio::task::spawn_local(async move { service.call(service_ctx).await.unwrap_or_else(|e| { tracing::error!("Dispatch runtime error: {:?}", e); InternalError::Other(format!("{:?}", e)).as_response() }) - }); + }) + .await; - let fut = local_set.run_until(handle); - let result = local_set.block_on(&dispatch.runtime.inner, fut); - DispatchFuture { - fut: Box::pin(async move { - result.unwrap_or_else(|e| { - let msg = format!("EVENT_DISPATCH join error: {:?}", e); - tracing::error!("{}", msg); - let error = InternalError::JoinError(msg); - error.as_response() - }) - }), - } + result.unwrap_or_else(|e| { + let msg = format!("EVENT_DISPATCH join error: {:?}", e); + tracing::error!("{}", msg); + let error = InternalError::JoinError(msg); + error.as_response() + }) } #[cfg(not(feature = "local_set"))] @@ -272,7 +223,6 @@ impl AFPluginDispatcher { dispatch.as_ref(), request, |_| Box::pin(async {}), - &tokio::task::LocalSet::new(), )) } } diff --git a/frontend/rust-lib/lib-dispatch/tests/api/module.rs b/frontend/rust-lib/lib-dispatch/tests/api/module.rs index f7c4a9f591..27ee94a9ee 100644 --- a/frontend/rust-lib/lib-dispatch/tests/api/module.rs +++ b/frontend/rust-lib/lib-dispatch/tests/api/module.rs @@ -16,17 +16,18 @@ async fn test() { vec![AFPlugin::new().event(event, hello)], )); let request = AFPluginRequest::new(event); - let _ = AFPluginDispatcher::async_send_with_callback( - dispatch.as_ref(), - request, - |resp| { - Box::pin(async move { - dbg!(&resp); - }) - }, - &LocalSet::new(), - ) - .await; + let local_set = LocalSet::new(); + local_set + .run_until(AFPluginDispatcher::async_send_with_callback( + dispatch.as_ref(), + request, + |resp| { + Box::pin(async move { + dbg!(&resp); + }) + }, + )) + .await; std::mem::forget(dispatch); }