chore: spawn task on local set (#6012)

* chore: spawn local

* chore: using multiple thread runtime

* chore: fix test
This commit is contained in:
Nathan.fooo 2024-08-20 14:16:24 +08:00 committed by GitHub
parent faf1e98d15
commit 6d09c33782
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 114 additions and 121 deletions

View File

@ -1,11 +1,16 @@
#![allow(clippy::not_unsafe_ptr_arg_deref)]
use allo_isolate::Isolate;
use futures::ready;
use lazy_static::lazy_static;
use semver::Version;
use std::sync::{mpsc, Arc, RwLock};
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll};
use std::{ffi::CStr, os::raw::c_char};
use tokio::runtime::Builder;
use tokio::sync::mpsc;
use tokio::task::LocalSet;
use tracing::{debug, error, info, trace, warn};
@ -42,7 +47,7 @@ pub struct Task {
dispatcher: Arc<AFPluginDispatcher>,
request: AFPluginRequest,
port: i64,
ret: Option<mpsc::Sender<DispatchFuture<AFPluginEventResponse>>>,
ret: Option<mpsc::Sender<AFPluginEventResponse>>,
}
unsafe impl Send for Task {}
@ -51,7 +56,7 @@ unsafe impl Sync for DartAppFlowyCore {}
struct DartAppFlowyCore {
core: Arc<RwLock<Option<AppFlowyCore>>>,
handle: RwLock<Option<std::thread::JoinHandle<()>>>,
sender: RwLock<Option<mpsc::Sender<Task>>>,
sender: RwLock<Option<mpsc::UnboundedSender<Task>>>,
}
impl DartAppFlowyCore {
@ -76,7 +81,7 @@ impl DartAppFlowyCore {
&self,
request: AFPluginRequest,
port: i64,
ret: Option<mpsc::Sender<DispatchFuture<AFPluginEventResponse>>>,
ret: Option<mpsc::Sender<AFPluginEventResponse>>,
) {
if let Ok(sender_guard) = self.sender.read() {
if let Err(e) = sender_guard.as_ref().unwrap().send(Task {
@ -138,32 +143,11 @@ pub extern "C" fn init_sdk(_port: i64, data: *mut c_char) -> i64 {
.unwrap()
.take()
.map(|isolate| Arc::new(LogStreamSenderImpl { isolate }) as Arc<dyn StreamLogSender>);
let (sender, task_rx) = mpsc::channel::<Task>();
let (sender, task_rx) = mpsc::unbounded_channel::<Task>();
let handle = std::thread::spawn(move || {
let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
let local_set = LocalSet::new();
while let Ok(task) = task_rx.recv() {
let Task {
dispatcher,
request,
port,
ret,
} = task;
let resp = AFPluginDispatcher::boxed_async_send_with_callback(
dispatcher.as_ref(),
request,
move |resp: AFPluginEventResponse| {
#[cfg(feature = "sync_verbose_log")]
trace!("[FFI]: Post data to dart through {} port", port);
Box::pin(post_to_flutter(resp, port))
},
&local_set,
);
if let Some(ret) = ret {
let _ = ret.send(resp);
}
}
runtime.block_on(local_set.run_until(Runner { rx: task_rx }));
});
*DART_APPFLOWY_CORE.sender.write().unwrap() = Some(sender);
@ -190,6 +174,48 @@ pub extern "C" fn async_event(port: i64, input: *const u8, len: usize) {
DART_APPFLOWY_CORE.dispatch(request, port, None);
}
/// A persistent future that processes [Arbiter] commands.
struct Runner {
rx: mpsc::UnboundedReceiver<Task>,
}
impl Future for Runner {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match ready!(self.rx.poll_recv(cx)) {
None => return Poll::Ready(()),
Some(task) => {
let Task {
dispatcher,
request,
port,
ret,
} = task;
tokio::task::spawn_local(async move {
let resp = AFPluginDispatcher::boxed_async_send_with_callback(
dispatcher.as_ref(),
request,
move |resp: AFPluginEventResponse| {
#[cfg(feature = "sync_verbose_log")]
trace!("[FFI]: Post data to dart through {} port", port);
Box::pin(post_to_flutter(resp, port))
},
)
.await;
if let Some(ret) = ret {
let _ = ret.send(resp);
}
});
},
}
}
}
}
#[no_mangle]
pub extern "C" fn sync_event(_input: *const u8, _len: usize) -> *const u8 {
error!("unimplemented sync_event");

View File

@ -11,15 +11,17 @@ use std::{
};
use tokio::task::LocalSet;
#[derive(Clone)]
// #[derive(Clone)]
pub struct EventBuilder {
context: TestContext,
local_set: LocalSet,
}
impl EventBuilder {
pub fn new(sdk: EventIntegrationTest) -> Self {
Self {
context: TestContext::new(sdk),
local_set: Default::default(),
}
}
@ -48,9 +50,14 @@ impl EventBuilder {
}
pub async fn async_send(mut self) -> Self {
let local_set = LocalSet::new();
let request = self.get_request();
let resp = AFPluginDispatcher::async_send(self.dispatch().as_ref(), request, &local_set).await;
let resp = self
.local_set
.run_until(AFPluginDispatcher::async_send(
self.dispatch().as_ref(),
request,
))
.await;
self.context.response = Some(resp);
self
}

View File

@ -5,7 +5,6 @@ use collab_document::document::Document;
use collab_entity::CollabType;
use std::env::temp_dir;
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
use std::sync::Arc;
use std::time::Duration;
@ -13,6 +12,7 @@ use std::time::Duration;
use nanoid::nanoid;
use semver::Version;
use tokio::select;
use tokio::task::LocalSet;
use tokio::time::sleep;
use flowy_core::config::AppFlowyCoreConfig;
@ -40,6 +40,7 @@ pub struct EventIntegrationTest {
#[allow(dead_code)]
cleaner: Arc<Cleaner>,
pub notification_sender: TestNotificationSender,
local_set: Arc<LocalSet>,
}
impl EventIntegrationTest {
@ -67,6 +68,7 @@ impl EventIntegrationTest {
authenticator,
notification_sender,
cleaner: Arc::new(Cleaner::new(PathBuf::from(clean_path))),
local_set: Arc::new(Default::default()),
}
}

View File

@ -4,12 +4,9 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use bytes::Bytes;
use flowy_folder::entities::{RepeatedViewPB, WorkspacePB};
use protobuf::ProtobufError;
use tokio::sync::broadcast::{channel, Sender};
use tokio::task::LocalSet;
use tracing::error;
use uuid::Uuid;
@ -74,12 +71,16 @@ impl EventIntegrationTest {
.unwrap();
let request = AFPluginRequest::new(UserEvent::SignUp).payload(payload);
let user_profile =
AFPluginDispatcher::async_send(&self.appflowy_core.dispatcher(), request, &LocalSet::new())
.await
.parse::<UserProfilePB, FlowyError>()
.unwrap()
.unwrap();
let user_profile = self
.local_set
.run_until(AFPluginDispatcher::async_send(
&self.appflowy_core.dispatcher(),
request,
))
.await
.parse::<UserProfilePB, FlowyError>()
.unwrap()
.unwrap();
// let _ = create_default_workspace_if_need(dispatch.clone(), &user_profile.id);
SignUpContext {

View File

@ -1,7 +1,6 @@
use std::sync::{Arc, Weak};
use collab_database::rows::RowId;
use lib_infra::box_any::BoxAny;
use std::sync::{Arc, Weak};
use tokio::sync::oneshot;
use tracing::{error, trace};

View File

@ -1309,7 +1309,7 @@ impl DatabaseEditor {
.get_all_field_orders()
.into_iter()
.map(FieldIdPB::from)
.collect();
.collect::<Vec<_>>();
let is_linked = database.is_inline_view(view_id);
(database_id, fields, is_linked)
};
@ -1318,6 +1318,13 @@ impl DatabaseEditor {
.into_iter()
.map(|detail| RowMetaPB::from(detail.as_ref()))
.collect::<Vec<RowMetaPB>>();
trace!(
"database: {}, num fields: {}, num row: {}",
database_id,
fields.len(),
rows.len()
);
Ok(DatabasePB {
id: database_id,
fields,

View File

@ -84,75 +84,31 @@ impl AFPluginDispatcher {
}
#[cfg(feature = "local_set")]
pub async fn async_send<Req>(
dispatch: &AFPluginDispatcher,
request: Req,
local_set: &tokio::task::LocalSet,
) -> AFPluginEventResponse
pub async fn async_send<Req>(dispatch: &AFPluginDispatcher, request: Req) -> AFPluginEventResponse
where
Req: Into<AFPluginRequest>,
Req: Into<AFPluginRequest> + 'static,
{
AFPluginDispatcher::async_send_with_callback(
dispatch,
request,
|_| Box::pin(async {}),
local_set,
)
.await
AFPluginDispatcher::async_send_with_callback(dispatch, request, |_| Box::pin(async {})).await
}
#[cfg(feature = "local_set")]
pub async fn async_send_with_callback<Req, Callback>(
dispatch: &AFPluginDispatcher,
request: Req,
callback: Callback,
local_set: &tokio::task::LocalSet,
) -> AFPluginEventResponse
where
Req: Into<AFPluginRequest>,
Req: Into<AFPluginRequest> + 'static,
Callback: FnOnce(AFPluginEventResponse) -> AFBoxFuture<'static, ()> + AFConcurrent + 'static,
{
let request: AFPluginRequest = request.into();
let plugins = dispatch.plugins.clone();
let service = Box::new(DispatchService { plugins });
tracing::trace!("Async event: {:?}", &request.event);
let service_ctx = DispatchContext {
request,
callback: Some(Box::new(callback)),
};
let handle = local_set.spawn_local(async move {
service.call(service_ctx).await.unwrap_or_else(|e| {
tracing::error!("Dispatch runtime error: {:?}", e);
InternalError::Other(format!("{:?}", e)).as_response()
})
});
// let handle = tokio::spawn(async move {
// service.call(service_ctx).await.unwrap_or_else(|e| {
// tracing::error!("Dispatch runtime error: {:?}", e);
// InternalError::Other(format!("{:?}", e)).as_response()
// })
// });
let result: Result<AFPluginEventResponse, DispatchError> = local_set
.run_until(handle)
.await
.map_err(|e| e.to_string().into());
result.unwrap_or_else(|e| {
let msg = format!("EVENT_DISPATCH join error: {:?}", e);
tracing::error!("{}", msg);
let error = InternalError::JoinError(msg);
error.as_response()
})
Self::boxed_async_send_with_callback(dispatch, request, callback).await
}
#[cfg(feature = "local_set")]
pub fn boxed_async_send_with_callback<Req, Callback>(
pub async fn boxed_async_send_with_callback<Req, Callback>(
dispatch: &AFPluginDispatcher,
request: Req,
callback: Callback,
local_set: &tokio::task::LocalSet,
) -> DispatchFuture<AFPluginEventResponse>
) -> AFPluginEventResponse
where
Req: Into<AFPluginRequest> + 'static,
Callback: FnOnce(AFPluginEventResponse) -> AFBoxFuture<'static, ()> + AFConcurrent + 'static,
@ -166,25 +122,20 @@ impl AFPluginDispatcher {
callback: Some(Box::new(callback)),
};
let handle = local_set.spawn_local(async move {
let result = tokio::task::spawn_local(async move {
service.call(service_ctx).await.unwrap_or_else(|e| {
tracing::error!("Dispatch runtime error: {:?}", e);
InternalError::Other(format!("{:?}", e)).as_response()
})
});
})
.await;
let fut = local_set.run_until(handle);
let result = local_set.block_on(&dispatch.runtime.inner, fut);
DispatchFuture {
fut: Box::pin(async move {
result.unwrap_or_else(|e| {
let msg = format!("EVENT_DISPATCH join error: {:?}", e);
tracing::error!("{}", msg);
let error = InternalError::JoinError(msg);
error.as_response()
})
}),
}
result.unwrap_or_else(|e| {
let msg = format!("EVENT_DISPATCH join error: {:?}", e);
tracing::error!("{}", msg);
let error = InternalError::JoinError(msg);
error.as_response()
})
}
#[cfg(not(feature = "local_set"))]
@ -272,7 +223,6 @@ impl AFPluginDispatcher {
dispatch.as_ref(),
request,
|_| Box::pin(async {}),
&tokio::task::LocalSet::new(),
))
}
}

View File

@ -16,17 +16,18 @@ async fn test() {
vec![AFPlugin::new().event(event, hello)],
));
let request = AFPluginRequest::new(event);
let _ = AFPluginDispatcher::async_send_with_callback(
dispatch.as_ref(),
request,
|resp| {
Box::pin(async move {
dbg!(&resp);
})
},
&LocalSet::new(),
)
.await;
let local_set = LocalSet::new();
local_set
.run_until(AFPluginDispatcher::async_send_with_callback(
dispatch.as_ref(),
request,
|resp| {
Box::pin(async move {
dbg!(&resp);
})
},
))
.await;
std::mem::forget(dispatch);
}