diff --git a/frontend/appflowy_tauri/src-tauri/src/init.rs b/frontend/appflowy_tauri/src-tauri/src/init.rs index 7591ba37ff..636735e5f4 100644 --- a/frontend/appflowy_tauri/src-tauri/src/init.rs +++ b/frontend/appflowy_tauri/src-tauri/src/init.rs @@ -1,7 +1,7 @@ use flowy_core::config::AppFlowyCoreConfig; -use flowy_core::{AppFlowyCore, DEFAULT_NAME}; +use flowy_core::{AppFlowyCore, MutexAppFlowyCore, DEFAULT_NAME}; use lib_dispatch::runtime::AFPluginRuntime; -use std::sync::Arc; +use std::rc::Rc; use dotenv::dotenv; @@ -25,7 +25,7 @@ pub fn read_env() { } } -pub fn init_flowy_core() -> AppFlowyCore { +pub fn init_flowy_core() -> MutexAppFlowyCore { let config_json = include_str!("../tauri.conf.json"); let config: tauri_utils::config::Config = serde_json::from_str(config_json).unwrap(); @@ -35,7 +35,8 @@ pub fn init_flowy_core() -> AppFlowyCore { .clone() .map(|v| v.to_string()) .unwrap_or_else(|| "0.5.8".to_string()); - let app_version = semver::Version::parse(&app_version).unwrap_or_else(|_| semver::Version::new(0, 5, 8)); + let app_version = + semver::Version::parse(&app_version).unwrap_or_else(|_| semver::Version::new(0, 5, 8)); let mut data_path = tauri::api::path::app_local_data_dir(&config).unwrap(); if cfg!(debug_assertions) { data_path.push("data_dev"); @@ -60,7 +61,9 @@ pub fn init_flowy_core() -> AppFlowyCore { ) .log_filter("trace", vec!["appflowy_tauri".to_string()]); - let runtime = Arc::new(AFPluginRuntime::new().unwrap()); + let runtime = Rc::new(AFPluginRuntime::new().unwrap()); let cloned_runtime = runtime.clone(); - runtime.block_on(async move { AppFlowyCore::new(config, cloned_runtime, None).await }) + runtime.block_on(async move { + MutexAppFlowyCore::new(AppFlowyCore::new(config, cloned_runtime, None).await) + }) } diff --git a/frontend/appflowy_tauri/src-tauri/src/request.rs b/frontend/appflowy_tauri/src-tauri/src/request.rs index 029e71c18c..6d2d01fb6e 100644 --- a/frontend/appflowy_tauri/src-tauri/src/request.rs +++ b/frontend/appflowy_tauri/src-tauri/src/request.rs @@ -1,4 +1,4 @@ -use flowy_core::AppFlowyCore; +use flowy_core::MutexAppFlowyCore; use lib_dispatch::prelude::{ AFPluginDispatcher, AFPluginEventResponse, AFPluginRequest, StatusCode, }; @@ -38,8 +38,8 @@ pub async fn invoke_request( app_handler: AppHandle, ) -> AFTauriResponse { let request: AFPluginRequest = request.into(); - let state: State = app_handler.state(); - let dispatcher = state.inner().dispatcher(); - let response = AFPluginDispatcher::async_send(dispatcher.as_ref(), request).await; + let state: State = app_handler.state(); + let dispatcher = state.0.lock().dispatcher(); + let response = AFPluginDispatcher::sync_send(dispatcher, request); response.into() } diff --git a/frontend/appflowy_web_app/src-tauri/src/init.rs b/frontend/appflowy_web_app/src-tauri/src/init.rs index 42c857abdf..636735e5f4 100644 --- a/frontend/appflowy_web_app/src-tauri/src/init.rs +++ b/frontend/appflowy_web_app/src-tauri/src/init.rs @@ -1,7 +1,7 @@ use flowy_core::config::AppFlowyCoreConfig; -use flowy_core::{AppFlowyCore, DEFAULT_NAME}; +use flowy_core::{AppFlowyCore, MutexAppFlowyCore, DEFAULT_NAME}; use lib_dispatch::runtime::AFPluginRuntime; -use std::sync::Arc; +use std::rc::Rc; use dotenv::dotenv; @@ -9,28 +9,34 @@ pub fn read_env() { dotenv().ok(); let env = if cfg!(debug_assertions) { - include_str!("../env.development") + include_str!("../env.development") } else { - include_str!("../env.production") + include_str!("../env.production") }; for line in env.lines() { - if let Some((key, value)) = line.split_once('=') { - // Check if the environment variable is not already set in the system - let current_value = std::env::var(key).unwrap_or_default(); - if current_value.is_empty() { - std::env::set_var(key, value); - } + if let Some((key, value)) = line.split_once('=') { + // Check if the environment variable is not already set in the system + let current_value = std::env::var(key).unwrap_or_default(); + if current_value.is_empty() { + std::env::set_var(key, value); } + } } } -pub fn init_flowy_core() -> AppFlowyCore { +pub fn init_flowy_core() -> MutexAppFlowyCore { let config_json = include_str!("../tauri.conf.json"); let config: tauri_utils::config::Config = serde_json::from_str(config_json).unwrap(); - let app_version = config.package.version.clone().map(|v| v.to_string()).unwrap_or_else(|| "0.0.0".to_string()); - let app_version = semver::Version::parse(&app_version).unwrap_or_else(|_| semver::Version::new(0, 5, 8)); + let app_version = config + .package + .version + .clone() + .map(|v| v.to_string()) + .unwrap_or_else(|| "0.5.8".to_string()); + let app_version = + semver::Version::parse(&app_version).unwrap_or_else(|_| semver::Version::new(0, 5, 8)); let mut data_path = tauri::api::path::app_local_data_dir(&config).unwrap(); if cfg!(debug_assertions) { data_path.push("data_dev"); @@ -50,12 +56,14 @@ pub fn init_flowy_core() -> AppFlowyCore { custom_application_path, application_path, device_id, - "web".to_string(), + "tauri".to_string(), DEFAULT_NAME.to_string(), ) .log_filter("trace", vec!["appflowy_tauri".to_string()]); - let runtime = Arc::new(AFPluginRuntime::new().unwrap()); + let runtime = Rc::new(AFPluginRuntime::new().unwrap()); let cloned_runtime = runtime.clone(); - runtime.block_on(async move { AppFlowyCore::new(config, cloned_runtime, None).await }) + runtime.block_on(async move { + MutexAppFlowyCore::new(AppFlowyCore::new(config, cloned_runtime, None).await) + }) } diff --git a/frontend/appflowy_web_app/src-tauri/src/request.rs b/frontend/appflowy_web_app/src-tauri/src/request.rs index 029e71c18c..0ec6a8dadc 100644 --- a/frontend/appflowy_web_app/src-tauri/src/request.rs +++ b/frontend/appflowy_web_app/src-tauri/src/request.rs @@ -1,4 +1,4 @@ -use flowy_core::AppFlowyCore; +use flowy_core::{AppFlowyCore, MutexAppFlowyCore}; use lib_dispatch::prelude::{ AFPluginDispatcher, AFPluginEventResponse, AFPluginRequest, StatusCode, }; @@ -38,8 +38,8 @@ pub async fn invoke_request( app_handler: AppHandle, ) -> AFTauriResponse { let request: AFPluginRequest = request.into(); - let state: State = app_handler.state(); - let dispatcher = state.inner().dispatcher(); - let response = AFPluginDispatcher::async_send(dispatcher.as_ref(), request).await; + let state: State = app_handler.state(); + let dispatcher = state.0.lock().dispatcher(); + let response = AFPluginDispatcher::sync_send(dispatcher, request); response.into() } diff --git a/frontend/rust-lib/dart-ffi/Cargo.toml b/frontend/rust-lib/dart-ffi/Cargo.toml index bca0489e7b..22e07f3483 100644 --- a/frontend/rust-lib/dart-ffi/Cargo.toml +++ b/frontend/rust-lib/dart-ffi/Cargo.toml @@ -28,7 +28,7 @@ lib-log.workspace = true semver = "1.0.22" # workspace -lib-dispatch = { workspace = true } +lib-dispatch = { workspace = true, features = ["local_set"] } # Core #flowy-core = { workspace = true, features = ["profiling"] } diff --git a/frontend/rust-lib/dart-ffi/src/lib.rs b/frontend/rust-lib/dart-ffi/src/lib.rs index aecd9bef28..14b5a13a24 100644 --- a/frontend/rust-lib/dart-ffi/src/lib.rs +++ b/frontend/rust-lib/dart-ffi/src/lib.rs @@ -4,6 +4,7 @@ use allo_isolate::Isolate; use lazy_static::lazy_static; use parking_lot::Mutex; use semver::Version; +use std::rc::Rc; use std::sync::Arc; use std::{ffi::CStr, os::raw::c_char}; use tracing::{debug, error, info, trace, warn}; @@ -37,14 +38,14 @@ lazy_static! { static ref LOG_STREAM_ISOLATE: Mutex> = Mutex::new(None); } -struct MutexAppFlowyCore(Arc>>); +struct MutexAppFlowyCore(Rc>>); impl MutexAppFlowyCore { fn new() -> Self { - Self(Arc::new(Mutex::new(None))) + Self(Rc::new(Mutex::new(None))) } - fn dispatcher(&self) -> Option> { + fn dispatcher(&self) -> Option> { let binding = self.0.lock(); let core = binding.as_ref(); core.map(|core| core.event_dispatcher.clone()) @@ -90,7 +91,7 @@ pub extern "C" fn init_sdk(_port: i64, data: *mut c_char) -> i64 { core.close_db(); } - let runtime = Arc::new(AFPluginRuntime::new().unwrap()); + let runtime = Rc::new(AFPluginRuntime::new().unwrap()); let cloned_runtime = runtime.clone(); let log_stream = LOG_STREAM_ISOLATE diff --git a/frontend/rust-lib/event-integration-test/src/event_builder.rs b/frontend/rust-lib/event-integration-test/src/event_builder.rs index 0d083b1037..5168723981 100644 --- a/frontend/rust-lib/event-integration-test/src/event_builder.rs +++ b/frontend/rust-lib/event-integration-test/src/event_builder.rs @@ -1,13 +1,12 @@ +use flowy_user::errors::{internal_error, FlowyError}; +use lib_dispatch::prelude::{ + AFPluginDispatcher, AFPluginEventResponse, AFPluginFromBytes, AFPluginRequest, ToBytes, *, +}; +use std::rc::Rc; use std::{ convert::TryFrom, fmt::{Debug, Display}, hash::Hash, - sync::Arc, -}; - -use flowy_user::errors::{internal_error, FlowyError}; -use lib_dispatch::prelude::{ - AFPluginDispatcher, AFPluginEventResponse, AFPluginFromBytes, AFPluginRequest, ToBytes, *, }; use crate::EventIntegrationTest; @@ -86,7 +85,7 @@ impl EventBuilder { .map(|data| data.into_inner()) } - fn dispatch(&self) -> Arc { + fn dispatch(&self) -> Rc { self.context.sdk.dispatcher() } diff --git a/frontend/rust-lib/event-integration-test/src/lib.rs b/frontend/rust-lib/event-integration-test/src/lib.rs index cd2a01d84f..e368c4168c 100644 --- a/frontend/rust-lib/event-integration-test/src/lib.rs +++ b/frontend/rust-lib/event-integration-test/src/lib.rs @@ -5,6 +5,7 @@ use collab_document::document::Document; use collab_entity::CollabType; use std::env::temp_dir; use std::path::PathBuf; +use std::rc::Rc; use std::sync::Arc; use std::time::Duration; @@ -163,13 +164,9 @@ pub fn document_from_document_doc_state(doc_id: &str, doc_state: Vec) -> Doc } async fn init_core(config: AppFlowyCoreConfig) -> AppFlowyCore { - std::thread::spawn(|| { - let runtime = Arc::new(AFPluginRuntime::new().unwrap()); - let cloned_runtime = runtime.clone(); - runtime.block_on(async move { AppFlowyCore::new(config, cloned_runtime, None).await }) - }) - .join() - .unwrap() + let runtime = Rc::new(AFPluginRuntime::new().unwrap()); + let cloned_runtime = runtime.clone(); + AppFlowyCore::new(config, cloned_runtime, None).await } impl std::ops::Deref for EventIntegrationTest { diff --git a/frontend/rust-lib/event-integration-test/tests/document/af_cloud_test/file_upload_test.rs b/frontend/rust-lib/event-integration-test/tests/document/af_cloud_test/file_upload_test.rs index cf0050341e..a2ab2d1245 100644 --- a/frontend/rust-lib/event-integration-test/tests/document/af_cloud_test/file_upload_test.rs +++ b/frontend/rust-lib/event-integration-test/tests/document/af_cloud_test/file_upload_test.rs @@ -94,30 +94,24 @@ async fn af_cloud_upload_6_files_test() { // Wait for all uploads to finish let uploads = Arc::new(Mutex::new(created_uploads)); let mut handles = vec![]; + for mut receiver in receivers { let cloned_uploads = uploads.clone(); - let cloned_test = test.clone(); + let state = test.storage_manager.get_file_state(&receiver.file_id).await; let handle = tokio::spawn(async move { - if let Some(state) = cloned_test - .storage_manager - .get_file_state(&receiver.file_id) - .await - { - if let FileUploadState::Finished { file_id } = state { + if let Some(FileUploadState::Finished { file_id }) = state { + cloned_uploads + .lock() + .await + .retain(|upload| upload.file_id != file_id); + } + while let Some(value) = receiver.recv().await { + if let FileUploadState::Finished { file_id } = value { cloned_uploads .lock() .await .retain(|upload| upload.file_id != file_id); - } - } else { - while let Some(value) = receiver.recv().await { - if let FileUploadState::Finished { file_id } = value { - cloned_uploads - .lock() - .await - .retain(|upload| upload.file_id != file_id); - break; - } + break; } } }); diff --git a/frontend/rust-lib/event-integration-test/tests/folder/local_test/subscription_test.rs b/frontend/rust-lib/event-integration-test/tests/folder/local_test/subscription_test.rs index 089bbae7ba..c9460d9db0 100644 --- a/frontend/rust-lib/event-integration-test/tests/folder/local_test/subscription_test.rs +++ b/frontend/rust-lib/event-integration-test/tests/folder/local_test/subscription_test.rs @@ -24,11 +24,9 @@ async fn create_child_view_in_workspace_subscription_test() { let cloned_test = test.clone(); let cloned_workspace_id = workspace.id.clone(); - test.appflowy_core.dispatcher().spawn(async move { - cloned_test - .create_view(&cloned_workspace_id, "workspace child view".to_string()) - .await; - }); + cloned_test + .create_view(&cloned_workspace_id, "workspace child view".to_string()) + .await; let views = receive_with_timeout(rx, Duration::from_secs(30)) .await @@ -50,14 +48,17 @@ async fn create_child_view_in_view_subscription_test() { let cloned_test = test.clone(); let child_view_id = workspace_child_view.id.clone(); - test.appflowy_core.dispatcher().spawn(async move { - cloned_test - .create_view( - &child_view_id, - "workspace child view's child view".to_string(), - ) - .await; - }); + let local_set = tokio::task::LocalSet::new(); + local_set + .run_until(async move { + cloned_test + .create_view( + &child_view_id, + "workspace child view's child view".to_string(), + ) + .await; + }) + .await; let update = receive_with_timeout(rx, Duration::from_secs(30)) .await @@ -81,22 +82,11 @@ async fn delete_view_subscription_test() { let cloned_test = test.clone(); let delete_view_id = workspace.views.first().unwrap().id.clone(); let cloned_delete_view_id = delete_view_id.clone(); - test - .appflowy_core - .dispatcher() - .spawn(async move { - cloned_test.delete_view(&cloned_delete_view_id).await; - }) + + cloned_test.delete_view(&cloned_delete_view_id).await; + let update = receive_with_timeout(rx, Duration::from_secs(60)) .await .unwrap(); - - let update = test - .appflowy_core - .dispatcher() - .run_until(receive_with_timeout(rx, Duration::from_secs(60))) - .await - .unwrap(); - assert_eq!(update.delete_child_views.len(), 1); assert_eq!(update.delete_child_views[0], delete_view_id); } @@ -114,17 +104,14 @@ async fn update_view_subscription_test() { assert!(!view.is_favorite); let update_view_id = view.id.clone(); - test.appflowy_core.dispatcher().spawn(async move { - cloned_test - .update_view(UpdateViewPayloadPB { - view_id: update_view_id, - name: Some("hello world".to_string()), - is_favorite: Some(true), - ..Default::default() - }) - .await; - }); - + cloned_test + .update_view(UpdateViewPayloadPB { + view_id: update_view_id, + name: Some("hello world".to_string()), + is_favorite: Some(true), + ..Default::default() + }) + .await; let update = receive_with_timeout(rx, Duration::from_secs(30)) .await .unwrap(); diff --git a/frontend/rust-lib/event-integration-test/tests/user/af_cloud_test/workspace_test.rs b/frontend/rust-lib/event-integration-test/tests/user/af_cloud_test/workspace_test.rs index b72ceba33f..c35224ea99 100644 --- a/frontend/rust-lib/event-integration-test/tests/user/af_cloud_test/workspace_test.rs +++ b/frontend/rust-lib/event-integration-test/tests/user/af_cloud_test/workspace_test.rs @@ -5,6 +5,7 @@ use collab_folder::Folder; use event_integration_test::user_event::user_localhost_af_cloud; use event_integration_test::EventIntegrationTest; use std::time::Duration; +use tokio::task::LocalSet; use tokio::time::sleep; use crate::user::af_cloud_test::util::get_synced_workspaces; @@ -158,9 +159,9 @@ async fn af_cloud_different_open_same_workspace_test() { user_localhost_af_cloud().await; // Set up the primary client and sign them up to the cloud. - let client_1 = EventIntegrationTest::new().await; - let owner_profile = client_1.af_cloud_sign_up().await; - let shared_workspace_id = client_1.get_current_workspace().await.id.clone(); + let test_runner = EventIntegrationTest::new().await; + let owner_profile = test_runner.af_cloud_sign_up().await; + let shared_workspace_id = test_runner.get_current_workspace().await.id.clone(); // Verify that the workspace ID from the profile matches the current session's workspace ID. assert_eq!(shared_workspace_id, owner_profile.workspace_id); @@ -181,7 +182,7 @@ async fn af_cloud_different_open_same_workspace_test() { client.delete_view(&view.id).await; } - client_1 + test_runner .add_workspace_member(&owner_profile.workspace_id, &client) .await; clients.push((client, client_profile)); @@ -195,9 +196,10 @@ async fn af_cloud_different_open_same_workspace_test() { // Simulate each client open different workspace 30 times let mut handles = vec![]; + let local_set = LocalSet::new(); for client in clients.clone() { let cloned_shared_workspace_id = shared_workspace_id.clone(); - let handle = tokio::spawn(async move { + let handle = local_set.spawn_local(async move { let (client, profile) = client; let all_workspaces = get_synced_workspaces(&client, profile.id).await; for i in 0..30 { @@ -216,10 +218,16 @@ async fn af_cloud_different_open_same_workspace_test() { }); handles.push(handle); } - futures::future::join_all(handles).await; + let results = local_set + .run_until(futures::future::join_all(handles)) + .await; + + for result in results { + assert!(result.is_ok()); + } // Retrieve and verify the collaborative document state for Client 1's workspace. - let doc_state = client_1 + let doc_state = test_runner .get_collab_doc_state(&shared_workspace_id, CollabType::Folder) .await .unwrap(); diff --git a/frontend/rust-lib/flowy-ai/src/event_handler.rs b/frontend/rust-lib/flowy-ai/src/event_handler.rs index 21f14070f4..3ec6eeb660 100644 --- a/frontend/rust-lib/flowy-ai/src/event_handler.rs +++ b/frontend/rust-lib/flowy-ai/src/event_handler.rs @@ -63,24 +63,18 @@ pub(crate) async fn stream_chat_message_handler( .collect::>(); trace!("Stream chat message with metadata: {:?}", metadata); - let (tx, rx) = oneshot::channel::>(); let ai_manager = upgrade_ai_manager(ai_manager)?; - tokio::spawn(async move { - let result = ai_manager - .stream_chat_message( - &data.chat_id, - &data.message, - message_type, - data.answer_stream_port, - data.question_stream_port, - metadata, - ) - .await; - let _ = tx.send(result); - }); - - let question = rx.await??; - data_result_ok(question) + let result = ai_manager + .stream_chat_message( + &data.chat_id, + &data.message, + message_type, + data.answer_stream_port, + data.question_stream_port, + metadata, + ) + .await?; + data_result_ok(result) } #[tracing::instrument(level = "debug", skip_all, err)] diff --git a/frontend/rust-lib/flowy-ai/src/local_ai/local_llm_resource.rs b/frontend/rust-lib/flowy-ai/src/local_ai/local_llm_resource.rs index 5528a00ac5..457322a111 100644 --- a/frontend/rust-lib/flowy-ai/src/local_ai/local_llm_resource.rs +++ b/frontend/rust-lib/flowy-ai/src/local_ai/local_llm_resource.rs @@ -288,7 +288,7 @@ impl LocalAIResourceController { while let Ok(value) = rx.recv().await { let is_finish = value == DOWNLOAD_FINISH; if let Err(err) = progress_sink.send(value).await { - error!("Failed to send progress: {:?}", err); + warn!("Failed to send progress: {:?}", err); break; } diff --git a/frontend/rust-lib/flowy-core/src/lib.rs b/frontend/rust-lib/flowy-core/src/lib.rs index 761708bfe5..ae5b1d801d 100644 --- a/frontend/rust-lib/flowy-core/src/lib.rs +++ b/frontend/rust-lib/flowy-core/src/lib.rs @@ -2,6 +2,8 @@ use flowy_search::folder::indexer::FolderIndexManagerImpl; use flowy_search::services::manager::SearchManager; +use parking_lot::Mutex; +use std::rc::Rc; use std::sync::{Arc, Weak}; use std::time::Duration; use sysinfo::System; @@ -54,7 +56,7 @@ pub struct AppFlowyCore { pub document_manager: Arc, pub folder_manager: Arc, pub database_manager: Arc, - pub event_dispatcher: Arc, + pub event_dispatcher: Rc, pub server_provider: Arc, pub task_dispatcher: Arc>, pub store_preference: Arc, @@ -66,7 +68,7 @@ pub struct AppFlowyCore { impl AppFlowyCore { pub async fn new( config: AppFlowyCoreConfig, - runtime: Arc, + runtime: Rc, stream_log_sender: Option>, ) -> Self { let platform = OperatingSystem::from(&config.platform); @@ -102,7 +104,7 @@ impl AppFlowyCore { } #[instrument(skip(config, runtime))] - async fn init(config: AppFlowyCoreConfig, runtime: Arc) -> Self { + async fn init(config: AppFlowyCoreConfig, runtime: Rc) -> Self { // Init the key value database let store_preference = Arc::new(KVStorePreferences::new(&config.storage_path).unwrap()); info!("🔥{:?}", &config); @@ -261,7 +263,7 @@ impl AppFlowyCore { error!("Init user failed: {}", err) } } - let event_dispatcher = Arc::new(AFPluginDispatcher::new( + let event_dispatcher = Rc::new(AFPluginDispatcher::new( runtime, make_plugins( Arc::downgrade(&folder_manager), @@ -290,7 +292,7 @@ impl AppFlowyCore { } /// Only expose the dispatcher in test - pub fn dispatcher(&self) -> Arc { + pub fn dispatcher(&self) -> Rc { self.event_dispatcher.clone() } } @@ -321,3 +323,13 @@ impl ServerUser for ServerUserImpl { self.upgrade_user()?.workspace_id() } } + +pub struct MutexAppFlowyCore(pub Rc>); + +impl MutexAppFlowyCore { + pub fn new(appflowy_core: AppFlowyCore) -> Self { + Self(Rc::new(Mutex::new(appflowy_core))) + } +} +unsafe impl Sync for MutexAppFlowyCore {} +unsafe impl Send for MutexAppFlowyCore {} diff --git a/frontend/rust-lib/lib-dispatch/Cargo.toml b/frontend/rust-lib/lib-dispatch/Cargo.toml index f81eb2d084..0d835915c7 100644 --- a/frontend/rust-lib/lib-dispatch/Cargo.toml +++ b/frontend/rust-lib/lib-dispatch/Cargo.toml @@ -42,7 +42,7 @@ tokio = { workspace = true, features = ["rt"] } futures-util = "0.3.26" [features] -default = ["use_protobuf"] +default = ["local_set", "use_protobuf"] use_serde = ["bincode", "serde_json", "serde", "serde_repr"] use_protobuf = ["protobuf"] local_set = [] diff --git a/frontend/rust-lib/lib-dispatch/src/dispatcher.rs b/frontend/rust-lib/lib-dispatch/src/dispatcher.rs index eb55bfc4fa..e3e72ff2be 100644 --- a/frontend/rust-lib/lib-dispatch/src/dispatcher.rs +++ b/frontend/rust-lib/lib-dispatch/src/dispatcher.rs @@ -1,10 +1,10 @@ -use std::any::Any; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::{future::Future, sync::Arc}; - use derivative::*; use pin_project::pin_project; +use std::any::Any; +use std::future::Future; +use std::pin::Pin; +use std::rc::Rc; +use std::task::{Context, Poll}; use tracing::event; use crate::module::AFPluginStateMap; @@ -16,60 +16,50 @@ use crate::{ service::{AFPluginServiceFactory, Service}, }; -#[cfg(any(target_arch = "wasm32", feature = "local_set"))] +#[cfg(feature = "local_set")] pub trait AFConcurrent {} -#[cfg(any(target_arch = "wasm32", feature = "local_set"))] +#[cfg(feature = "local_set")] impl AFConcurrent for T where T: ?Sized {} -#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] +#[cfg(not(feature = "local_set"))] pub trait AFConcurrent: Send + Sync {} -#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] +#[cfg(not(feature = "local_set"))] impl AFConcurrent for T where T: Send + Sync {} -#[cfg(any(target_arch = "wasm32", feature = "local_set"))] +#[cfg(feature = "local_set")] pub type AFBoxFuture<'a, T> = futures_core::future::LocalBoxFuture<'a, T>; -#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] +#[cfg(not(feature = "local_set"))] pub type AFBoxFuture<'a, T> = futures_core::future::BoxFuture<'a, T>; pub type AFStateMap = std::sync::Arc; -#[cfg(any(target_arch = "wasm32", feature = "local_set"))] +#[cfg(feature = "local_set")] pub(crate) fn downcast_owned(boxed: AFBox) -> Option { boxed.downcast().ok().map(|boxed| *boxed) } -#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] +#[cfg(not(feature = "local_set"))] pub(crate) fn downcast_owned(boxed: AFBox) -> Option { boxed.downcast().ok().map(|boxed| *boxed) } -#[cfg(any(target_arch = "wasm32", feature = "local_set"))] +#[cfg(feature = "local_set")] pub(crate) type AFBox = Box; -#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] +#[cfg(not(feature = "local_set"))] pub(crate) type AFBox = Box; -#[cfg(any(target_arch = "wasm32", feature = "local_set"))] +#[cfg(feature = "local_set")] pub type BoxFutureCallback = Box AFBoxFuture<'static, ()> + 'static>; -#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] +#[cfg(not(feature = "local_set"))] pub type BoxFutureCallback = Box AFBoxFuture<'static, ()> + Send + Sync + 'static>; -#[cfg(any(target_arch = "wasm32", feature = "local_set"))] -pub fn af_spawn(future: T) -> tokio::task::JoinHandle -where - T: Future + 'static, - T::Output: 'static, -{ - tokio::task::spawn_local(future) -} - -#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] pub fn af_spawn(future: T) -> tokio::task::JoinHandle where T: Future + Send + 'static, @@ -80,11 +70,11 @@ where pub struct AFPluginDispatcher { plugins: AFPluginMap, - runtime: Arc, + runtime: Rc, } impl AFPluginDispatcher { - pub fn new(runtime: Arc, plugins: Vec) -> AFPluginDispatcher { + pub fn new(runtime: Rc, plugins: Vec) -> AFPluginDispatcher { tracing::trace!("{}", plugin_info(&plugins)); AFPluginDispatcher { plugins: plugin_map_or_crash(plugins), @@ -126,14 +116,37 @@ impl AFPluginDispatcher { // The provided future will start running in the background immediately // when `spawn` is called, even if you don't await the returned // `JoinHandle`. - let handle = dispatch.runtime.spawn(async move { - service.call(service_ctx).await.unwrap_or_else(|e| { - tracing::error!("Dispatch runtime error: {:?}", e); - InternalError::Other(format!("{:?}", e)).as_response() - }) - }); + let result: Result; + #[cfg(feature = "local_set")] + { + let handle = dispatch.runtime.local.spawn_local(async move { + service.call(service_ctx).await.unwrap_or_else(|e| { + tracing::error!("Dispatch runtime error: {:?}", e); + InternalError::Other(format!("{:?}", e)).as_response() + }) + }); + + result = dispatch + .runtime + .local + .run_until(handle) + .await + .map_err(|e| e.to_string().into()) + } + + #[cfg(not(feature = "local_set"))] + { + result = dispatch + .runtime + .spawn(async move { + service.call(service_ctx).await.unwrap_or_else(|e| { + tracing::error!("Dispatch runtime error: {:?}", e); + InternalError::Other(format!("{:?}", e)).as_response() + }) + }) + .await; + } - let result = dispatch.runtime.run_until(handle).await; result.unwrap_or_else(|e| { let msg = format!("EVENT_DISPATCH join error: {:?}", e); tracing::error!("{}", msg); @@ -170,16 +183,17 @@ impl AFPluginDispatcher { callback: Some(Box::new(callback)), }; - let handle = dispatch.runtime.spawn(async move { - service.call(service_ctx).await.unwrap_or_else(|e| { - tracing::error!("[dispatch]: runtime error: {:?}", e); - InternalError::Other(format!("{:?}", e)).as_response() - }) - }); - - #[cfg(any(target_arch = "wasm32", feature = "local_set"))] + #[cfg(feature = "local_set")] { - let result = dispatch.runtime.block_on(handle); + let handle = dispatch.runtime.local.spawn_local(async move { + service.call(service_ctx).await.unwrap_or_else(|e| { + tracing::error!("Dispatch runtime error: {:?}", e); + InternalError::Other(format!("{:?}", e)).as_response() + }) + }); + + let fut = dispatch.runtime.local.run_until(handle); + let result = dispatch.runtime.block_on(fut); DispatchFuture { fut: Box::pin(async move { result.unwrap_or_else(|e| { @@ -192,8 +206,18 @@ impl AFPluginDispatcher { } } - #[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] + #[cfg(not(feature = "local_set"))] { + let handle = dispatch.runtime.spawn(async move { + service + .call(crate::service::service::Service) + .await + .unwrap_or_else(|e| { + tracing::error!("[dispatch]: runtime error: {:?}", e); + InternalError::Other(format!("{:?}", e)).as_response() + }) + }); + let runtime = dispatch.runtime.clone(); DispatchFuture { fut: Box::pin(async move { @@ -211,7 +235,7 @@ impl AFPluginDispatcher { #[cfg(not(target_arch = "wasm32"))] pub fn sync_send( - dispatch: Arc, + dispatch: Rc, request: AFPluginRequest, ) -> AFPluginEventResponse { futures::executor::block_on(AFPluginDispatcher::async_send_with_callback( @@ -221,16 +245,6 @@ impl AFPluginDispatcher { )) } - #[cfg(any(target_arch = "wasm32", feature = "local_set"))] - #[track_caller] - pub fn spawn(&self, future: F) -> tokio::task::JoinHandle - where - F: Future + 'static, - { - self.runtime.spawn(future) - } - - #[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] #[track_caller] pub fn spawn(&self, future: F) -> tokio::task::JoinHandle where @@ -239,24 +253,6 @@ impl AFPluginDispatcher { { self.runtime.spawn(future) } - - #[cfg(any(target_arch = "wasm32", feature = "local_set"))] - pub async fn run_until(&self, future: F) -> F::Output - where - F: Future + 'static, - { - let handle = self.runtime.spawn(future); - self.runtime.run_until(handle).await.unwrap() - } - - #[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] - pub async fn run_until<'a, F>(&self, future: F) -> F::Output - where - F: Future + Send + 'a, - ::Output: Send + 'a, - { - self.runtime.run_until(future).await - } } #[derive(Derivative)] diff --git a/frontend/rust-lib/lib-dispatch/src/module/module.rs b/frontend/rust-lib/lib-dispatch/src/module/module.rs index a5b2df234a..ae92cf9a0c 100644 --- a/frontend/rust-lib/lib-dispatch/src/module/module.rs +++ b/frontend/rust-lib/lib-dispatch/src/module/module.rs @@ -1,3 +1,7 @@ +use futures_core::ready; +use nanoid::nanoid; +use pin_project::pin_project; +use std::rc::Rc; use std::sync::Arc; use std::{ collections::HashMap, @@ -9,10 +13,6 @@ use std::{ task::{Context, Poll}, }; -use futures_core::ready; -use nanoid::nanoid; -use pin_project::pin_project; - use crate::dispatcher::AFConcurrent; use crate::prelude::{AFBoxFuture, AFStateMap}; use crate::service::AFPluginHandler; @@ -26,12 +26,12 @@ use crate::{ }, }; -pub type AFPluginMap = Arc>>; +pub type AFPluginMap = Rc>>; pub(crate) fn plugin_map_or_crash(plugins: Vec) -> AFPluginMap { - let mut plugin_map: HashMap> = HashMap::new(); + let mut plugin_map: HashMap> = HashMap::new(); plugins.into_iter().for_each(|m| { let events = m.events(); - let plugins = Arc::new(m); + let plugins = Rc::new(m); events.into_iter().for_each(|e| { if plugin_map.contains_key(&e) { let plugin_name = plugin_map.get(&e).map(|p| &p.name); @@ -40,7 +40,7 @@ pub(crate) fn plugin_map_or_crash(plugins: Vec) -> AFPluginMap { plugin_map.insert(e, plugins.clone()); }); }); - Arc::new(plugin_map) + Rc::new(plugin_map) } #[derive(PartialEq, Eq, Hash, Debug, Clone)] @@ -67,7 +67,7 @@ pub struct AFPlugin { /// Contains a list of factories that are used to generate the services used to handle the passed-in /// `ServiceRequest`. /// - event_service_factory: Arc< + event_service_factory: Rc< HashMap>, >, } @@ -77,7 +77,7 @@ impl std::default::Default for AFPlugin { Self { name: "".to_owned(), states: Default::default(), - event_service_factory: Arc::new(HashMap::new()), + event_service_factory: Rc::new(HashMap::new()), } } } @@ -113,7 +113,7 @@ impl AFPlugin { if self.event_service_factory.contains_key(&event) { panic!("Register duplicate Event: {:?}", &event); } else { - Arc::get_mut(&mut self.event_service_factory) + Rc::get_mut(&mut self.event_service_factory) .unwrap() .insert(event, factory(AFPluginHandlerService::new(handler))); } @@ -185,7 +185,7 @@ impl AFPluginServiceFactory for AFPlugin { } pub struct AFPluginService { - services: Arc< + services: Rc< HashMap>, >, states: AFStateMap, diff --git a/frontend/rust-lib/lib-dispatch/src/runtime.rs b/frontend/rust-lib/lib-dispatch/src/runtime.rs index fd3658517c..eaa3223a20 100644 --- a/frontend/rust-lib/lib-dispatch/src/runtime.rs +++ b/frontend/rust-lib/lib-dispatch/src/runtime.rs @@ -8,8 +8,8 @@ use tokio::task::JoinHandle; pub struct AFPluginRuntime { inner: Runtime, - #[cfg(any(target_arch = "wasm32", feature = "local_set"))] - local: tokio::task::LocalSet, + #[cfg(feature = "local_set")] + pub(crate) local: tokio::task::LocalSet, } impl Display for AFPluginRuntime { @@ -27,21 +27,11 @@ impl AFPluginRuntime { let inner = default_tokio_runtime()?; Ok(Self { inner, - #[cfg(any(target_arch = "wasm32", feature = "local_set"))] + #[cfg(feature = "local_set")] local: tokio::task::LocalSet::new(), }) } - #[cfg(any(target_arch = "wasm32", feature = "local_set"))] - #[track_caller] - pub fn spawn(&self, future: F) -> JoinHandle - where - F: Future + 'static, - { - self.local.spawn_local(future) - } - - #[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] #[track_caller] pub fn spawn(&self, future: F) -> JoinHandle where @@ -51,23 +41,7 @@ impl AFPluginRuntime { self.inner.spawn(future) } - #[cfg(any(target_arch = "wasm32", feature = "local_set"))] - pub async fn run_until(&self, future: F) -> F::Output - where - F: Future, - { - self.local.run_until(future).await - } - - #[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] - pub async fn run_until(&self, future: F) -> F::Output - where - F: Future, - { - future.await - } - - #[cfg(any(target_arch = "wasm32", feature = "local_set"))] + #[cfg(feature = "local_set")] #[track_caller] pub fn block_on(&self, f: F) -> F::Output where @@ -76,7 +50,7 @@ impl AFPluginRuntime { self.local.block_on(&self.inner, f) } - #[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] + #[cfg(not(feature = "local_set"))] #[track_caller] pub fn block_on(&self, f: F) -> F::Output where @@ -86,14 +60,26 @@ impl AFPluginRuntime { } } -#[cfg(any(target_arch = "wasm32", feature = "local_set"))] +#[cfg(feature = "local_set")] pub fn default_tokio_runtime() -> io::Result { - runtime::Builder::new_current_thread() - .thread_name("dispatch-rt-st") - .build() + #[cfg(not(target_arch = "wasm32"))] + { + runtime::Builder::new_multi_thread() + .enable_io() + .enable_time() + .thread_name("dispatch-rt-st") + .build() + } + + #[cfg(target_arch = "wasm32")] + { + runtime::Builder::new_current_thread() + .thread_name("dispatch-rt-st") + .build() + } } -#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] +#[cfg(not(feature = "local_set"))] pub fn default_tokio_runtime() -> io::Result { runtime::Builder::new_multi_thread() .thread_name("dispatch-rt-mt") diff --git a/frontend/rust-lib/lib-dispatch/src/service/boxed.rs b/frontend/rust-lib/lib-dispatch/src/service/boxed.rs index 7ff7a7c116..811b995082 100644 --- a/frontend/rust-lib/lib-dispatch/src/service/boxed.rs +++ b/frontend/rust-lib/lib-dispatch/src/service/boxed.rs @@ -16,7 +16,7 @@ where BoxServiceFactory(Box::new(FactoryWrapper(factory))) } -#[cfg(any(target_arch = "wasm32", feature = "local_set"))] +#[cfg(feature = "local_set")] type Inner = Box< dyn AFPluginServiceFactory< Req, @@ -27,7 +27,7 @@ type Inner = Box< Future = AFBoxFuture<'static, Result, Err>>, >, >; -#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] +#[cfg(not(feature = "local_set"))] type Inner = Box< dyn AFPluginServiceFactory< Req, @@ -58,12 +58,12 @@ where } } -#[cfg(any(target_arch = "wasm32", feature = "local_set"))] +#[cfg(feature = "local_set")] pub type BoxService = Box< dyn Service>>, >; -#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] +#[cfg(not(feature = "local_set"))] pub type BoxService = Box< dyn Service>> + Sync diff --git a/frontend/rust-lib/lib-dispatch/tests/api/module.rs b/frontend/rust-lib/lib-dispatch/tests/api/module.rs index 2c4539bd7e..fed8d75720 100644 --- a/frontend/rust-lib/lib-dispatch/tests/api/module.rs +++ b/frontend/rust-lib/lib-dispatch/tests/api/module.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::rc::Rc; use lib_dispatch::prelude::*; use lib_dispatch::runtime::AFPluginRuntime; @@ -10,8 +10,8 @@ pub async fn hello() -> String { #[tokio::test] async fn test() { let event = "1"; - let runtime = Arc::new(AFPluginRuntime::new().unwrap()); - let dispatch = Arc::new(AFPluginDispatcher::new( + let runtime = Rc::new(AFPluginRuntime::new().unwrap()); + let dispatch = Rc::new(AFPluginDispatcher::new( runtime, vec![AFPlugin::new().event(event, hello)], ));