feat: enable local set (#5955)

* chore: enable local set

* chore: fix test

* chore: clippy

* chore: fix tauri build

* chore: fix tauri build
This commit is contained in:
Nathan.fooo 2024-08-13 23:36:44 +08:00 committed by GitHub
parent e2359cf047
commit 463c8c7ee4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 250 additions and 265 deletions

View File

@ -1,7 +1,7 @@
use flowy_core::config::AppFlowyCoreConfig; use flowy_core::config::AppFlowyCoreConfig;
use flowy_core::{AppFlowyCore, DEFAULT_NAME}; use flowy_core::{AppFlowyCore, MutexAppFlowyCore, DEFAULT_NAME};
use lib_dispatch::runtime::AFPluginRuntime; use lib_dispatch::runtime::AFPluginRuntime;
use std::sync::Arc; use std::rc::Rc;
use dotenv::dotenv; use dotenv::dotenv;
@ -25,7 +25,7 @@ pub fn read_env() {
} }
} }
pub fn init_flowy_core() -> AppFlowyCore { pub fn init_flowy_core() -> MutexAppFlowyCore {
let config_json = include_str!("../tauri.conf.json"); let config_json = include_str!("../tauri.conf.json");
let config: tauri_utils::config::Config = serde_json::from_str(config_json).unwrap(); let config: tauri_utils::config::Config = serde_json::from_str(config_json).unwrap();
@ -35,7 +35,8 @@ pub fn init_flowy_core() -> AppFlowyCore {
.clone() .clone()
.map(|v| v.to_string()) .map(|v| v.to_string())
.unwrap_or_else(|| "0.5.8".to_string()); .unwrap_or_else(|| "0.5.8".to_string());
let app_version = semver::Version::parse(&app_version).unwrap_or_else(|_| semver::Version::new(0, 5, 8)); let app_version =
semver::Version::parse(&app_version).unwrap_or_else(|_| semver::Version::new(0, 5, 8));
let mut data_path = tauri::api::path::app_local_data_dir(&config).unwrap(); let mut data_path = tauri::api::path::app_local_data_dir(&config).unwrap();
if cfg!(debug_assertions) { if cfg!(debug_assertions) {
data_path.push("data_dev"); data_path.push("data_dev");
@ -60,7 +61,9 @@ pub fn init_flowy_core() -> AppFlowyCore {
) )
.log_filter("trace", vec!["appflowy_tauri".to_string()]); .log_filter("trace", vec!["appflowy_tauri".to_string()]);
let runtime = Arc::new(AFPluginRuntime::new().unwrap()); let runtime = Rc::new(AFPluginRuntime::new().unwrap());
let cloned_runtime = runtime.clone(); let cloned_runtime = runtime.clone();
runtime.block_on(async move { AppFlowyCore::new(config, cloned_runtime, None).await }) runtime.block_on(async move {
MutexAppFlowyCore::new(AppFlowyCore::new(config, cloned_runtime, None).await)
})
} }

View File

@ -1,4 +1,4 @@
use flowy_core::AppFlowyCore; use flowy_core::MutexAppFlowyCore;
use lib_dispatch::prelude::{ use lib_dispatch::prelude::{
AFPluginDispatcher, AFPluginEventResponse, AFPluginRequest, StatusCode, AFPluginDispatcher, AFPluginEventResponse, AFPluginRequest, StatusCode,
}; };
@ -38,8 +38,8 @@ pub async fn invoke_request(
app_handler: AppHandle<Wry>, app_handler: AppHandle<Wry>,
) -> AFTauriResponse { ) -> AFTauriResponse {
let request: AFPluginRequest = request.into(); let request: AFPluginRequest = request.into();
let state: State<AppFlowyCore> = app_handler.state(); let state: State<MutexAppFlowyCore> = app_handler.state();
let dispatcher = state.inner().dispatcher(); let dispatcher = state.0.lock().dispatcher();
let response = AFPluginDispatcher::async_send(dispatcher.as_ref(), request).await; let response = AFPluginDispatcher::sync_send(dispatcher, request);
response.into() response.into()
} }

View File

@ -1,7 +1,7 @@
use flowy_core::config::AppFlowyCoreConfig; use flowy_core::config::AppFlowyCoreConfig;
use flowy_core::{AppFlowyCore, DEFAULT_NAME}; use flowy_core::{AppFlowyCore, MutexAppFlowyCore, DEFAULT_NAME};
use lib_dispatch::runtime::AFPluginRuntime; use lib_dispatch::runtime::AFPluginRuntime;
use std::sync::Arc; use std::rc::Rc;
use dotenv::dotenv; use dotenv::dotenv;
@ -25,12 +25,18 @@ pub fn read_env() {
} }
} }
pub fn init_flowy_core() -> AppFlowyCore { pub fn init_flowy_core() -> MutexAppFlowyCore {
let config_json = include_str!("../tauri.conf.json"); let config_json = include_str!("../tauri.conf.json");
let config: tauri_utils::config::Config = serde_json::from_str(config_json).unwrap(); let config: tauri_utils::config::Config = serde_json::from_str(config_json).unwrap();
let app_version = config.package.version.clone().map(|v| v.to_string()).unwrap_or_else(|| "0.0.0".to_string()); let app_version = config
let app_version = semver::Version::parse(&app_version).unwrap_or_else(|_| semver::Version::new(0, 5, 8)); .package
.version
.clone()
.map(|v| v.to_string())
.unwrap_or_else(|| "0.5.8".to_string());
let app_version =
semver::Version::parse(&app_version).unwrap_or_else(|_| semver::Version::new(0, 5, 8));
let mut data_path = tauri::api::path::app_local_data_dir(&config).unwrap(); let mut data_path = tauri::api::path::app_local_data_dir(&config).unwrap();
if cfg!(debug_assertions) { if cfg!(debug_assertions) {
data_path.push("data_dev"); data_path.push("data_dev");
@ -50,12 +56,14 @@ pub fn init_flowy_core() -> AppFlowyCore {
custom_application_path, custom_application_path,
application_path, application_path,
device_id, device_id,
"web".to_string(), "tauri".to_string(),
DEFAULT_NAME.to_string(), DEFAULT_NAME.to_string(),
) )
.log_filter("trace", vec!["appflowy_tauri".to_string()]); .log_filter("trace", vec!["appflowy_tauri".to_string()]);
let runtime = Arc::new(AFPluginRuntime::new().unwrap()); let runtime = Rc::new(AFPluginRuntime::new().unwrap());
let cloned_runtime = runtime.clone(); let cloned_runtime = runtime.clone();
runtime.block_on(async move { AppFlowyCore::new(config, cloned_runtime, None).await }) runtime.block_on(async move {
MutexAppFlowyCore::new(AppFlowyCore::new(config, cloned_runtime, None).await)
})
} }

View File

@ -1,4 +1,4 @@
use flowy_core::AppFlowyCore; use flowy_core::{AppFlowyCore, MutexAppFlowyCore};
use lib_dispatch::prelude::{ use lib_dispatch::prelude::{
AFPluginDispatcher, AFPluginEventResponse, AFPluginRequest, StatusCode, AFPluginDispatcher, AFPluginEventResponse, AFPluginRequest, StatusCode,
}; };
@ -38,8 +38,8 @@ pub async fn invoke_request(
app_handler: AppHandle<Wry>, app_handler: AppHandle<Wry>,
) -> AFTauriResponse { ) -> AFTauriResponse {
let request: AFPluginRequest = request.into(); let request: AFPluginRequest = request.into();
let state: State<AppFlowyCore> = app_handler.state(); let state: State<MutexAppFlowyCore> = app_handler.state();
let dispatcher = state.inner().dispatcher(); let dispatcher = state.0.lock().dispatcher();
let response = AFPluginDispatcher::async_send(dispatcher.as_ref(), request).await; let response = AFPluginDispatcher::sync_send(dispatcher, request);
response.into() response.into()
} }

View File

@ -28,7 +28,7 @@ lib-log.workspace = true
semver = "1.0.22" semver = "1.0.22"
# workspace # workspace
lib-dispatch = { workspace = true } lib-dispatch = { workspace = true, features = ["local_set"] }
# Core # Core
#flowy-core = { workspace = true, features = ["profiling"] } #flowy-core = { workspace = true, features = ["profiling"] }

View File

@ -4,6 +4,7 @@ use allo_isolate::Isolate;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use parking_lot::Mutex; use parking_lot::Mutex;
use semver::Version; use semver::Version;
use std::rc::Rc;
use std::sync::Arc; use std::sync::Arc;
use std::{ffi::CStr, os::raw::c_char}; use std::{ffi::CStr, os::raw::c_char};
use tracing::{debug, error, info, trace, warn}; use tracing::{debug, error, info, trace, warn};
@ -37,14 +38,14 @@ lazy_static! {
static ref LOG_STREAM_ISOLATE: Mutex<Option<Isolate>> = Mutex::new(None); static ref LOG_STREAM_ISOLATE: Mutex<Option<Isolate>> = Mutex::new(None);
} }
struct MutexAppFlowyCore(Arc<Mutex<Option<AppFlowyCore>>>); struct MutexAppFlowyCore(Rc<Mutex<Option<AppFlowyCore>>>);
impl MutexAppFlowyCore { impl MutexAppFlowyCore {
fn new() -> Self { fn new() -> Self {
Self(Arc::new(Mutex::new(None))) Self(Rc::new(Mutex::new(None)))
} }
fn dispatcher(&self) -> Option<Arc<AFPluginDispatcher>> { fn dispatcher(&self) -> Option<Rc<AFPluginDispatcher>> {
let binding = self.0.lock(); let binding = self.0.lock();
let core = binding.as_ref(); let core = binding.as_ref();
core.map(|core| core.event_dispatcher.clone()) core.map(|core| core.event_dispatcher.clone())
@ -90,7 +91,7 @@ pub extern "C" fn init_sdk(_port: i64, data: *mut c_char) -> i64 {
core.close_db(); core.close_db();
} }
let runtime = Arc::new(AFPluginRuntime::new().unwrap()); let runtime = Rc::new(AFPluginRuntime::new().unwrap());
let cloned_runtime = runtime.clone(); let cloned_runtime = runtime.clone();
let log_stream = LOG_STREAM_ISOLATE let log_stream = LOG_STREAM_ISOLATE

View File

@ -1,13 +1,12 @@
use flowy_user::errors::{internal_error, FlowyError};
use lib_dispatch::prelude::{
AFPluginDispatcher, AFPluginEventResponse, AFPluginFromBytes, AFPluginRequest, ToBytes, *,
};
use std::rc::Rc;
use std::{ use std::{
convert::TryFrom, convert::TryFrom,
fmt::{Debug, Display}, fmt::{Debug, Display},
hash::Hash, hash::Hash,
sync::Arc,
};
use flowy_user::errors::{internal_error, FlowyError};
use lib_dispatch::prelude::{
AFPluginDispatcher, AFPluginEventResponse, AFPluginFromBytes, AFPluginRequest, ToBytes, *,
}; };
use crate::EventIntegrationTest; use crate::EventIntegrationTest;
@ -86,7 +85,7 @@ impl EventBuilder {
.map(|data| data.into_inner()) .map(|data| data.into_inner())
} }
fn dispatch(&self) -> Arc<AFPluginDispatcher> { fn dispatch(&self) -> Rc<AFPluginDispatcher> {
self.context.sdk.dispatcher() self.context.sdk.dispatcher()
} }

View File

@ -5,6 +5,7 @@ use collab_document::document::Document;
use collab_entity::CollabType; use collab_entity::CollabType;
use std::env::temp_dir; use std::env::temp_dir;
use std::path::PathBuf; use std::path::PathBuf;
use std::rc::Rc;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@ -163,13 +164,9 @@ pub fn document_from_document_doc_state(doc_id: &str, doc_state: Vec<u8>) -> Doc
} }
async fn init_core(config: AppFlowyCoreConfig) -> AppFlowyCore { async fn init_core(config: AppFlowyCoreConfig) -> AppFlowyCore {
std::thread::spawn(|| { let runtime = Rc::new(AFPluginRuntime::new().unwrap());
let runtime = Arc::new(AFPluginRuntime::new().unwrap());
let cloned_runtime = runtime.clone(); let cloned_runtime = runtime.clone();
runtime.block_on(async move { AppFlowyCore::new(config, cloned_runtime, None).await }) AppFlowyCore::new(config, cloned_runtime, None).await
})
.join()
.unwrap()
} }
impl std::ops::Deref for EventIntegrationTest { impl std::ops::Deref for EventIntegrationTest {

View File

@ -94,22 +94,17 @@ async fn af_cloud_upload_6_files_test() {
// Wait for all uploads to finish // Wait for all uploads to finish
let uploads = Arc::new(Mutex::new(created_uploads)); let uploads = Arc::new(Mutex::new(created_uploads));
let mut handles = vec![]; let mut handles = vec![];
for mut receiver in receivers { for mut receiver in receivers {
let cloned_uploads = uploads.clone(); let cloned_uploads = uploads.clone();
let cloned_test = test.clone(); let state = test.storage_manager.get_file_state(&receiver.file_id).await;
let handle = tokio::spawn(async move { let handle = tokio::spawn(async move {
if let Some(state) = cloned_test if let Some(FileUploadState::Finished { file_id }) = state {
.storage_manager
.get_file_state(&receiver.file_id)
.await
{
if let FileUploadState::Finished { file_id } = state {
cloned_uploads cloned_uploads
.lock() .lock()
.await .await
.retain(|upload| upload.file_id != file_id); .retain(|upload| upload.file_id != file_id);
} }
} else {
while let Some(value) = receiver.recv().await { while let Some(value) = receiver.recv().await {
if let FileUploadState::Finished { file_id } = value { if let FileUploadState::Finished { file_id } = value {
cloned_uploads cloned_uploads
@ -119,7 +114,6 @@ async fn af_cloud_upload_6_files_test() {
break; break;
} }
} }
}
}); });
handles.push(handle); handles.push(handle);
} }

View File

@ -24,11 +24,9 @@ async fn create_child_view_in_workspace_subscription_test() {
let cloned_test = test.clone(); let cloned_test = test.clone();
let cloned_workspace_id = workspace.id.clone(); let cloned_workspace_id = workspace.id.clone();
test.appflowy_core.dispatcher().spawn(async move {
cloned_test cloned_test
.create_view(&cloned_workspace_id, "workspace child view".to_string()) .create_view(&cloned_workspace_id, "workspace child view".to_string())
.await; .await;
});
let views = receive_with_timeout(rx, Duration::from_secs(30)) let views = receive_with_timeout(rx, Duration::from_secs(30))
.await .await
@ -50,14 +48,17 @@ async fn create_child_view_in_view_subscription_test() {
let cloned_test = test.clone(); let cloned_test = test.clone();
let child_view_id = workspace_child_view.id.clone(); let child_view_id = workspace_child_view.id.clone();
test.appflowy_core.dispatcher().spawn(async move { let local_set = tokio::task::LocalSet::new();
local_set
.run_until(async move {
cloned_test cloned_test
.create_view( .create_view(
&child_view_id, &child_view_id,
"workspace child view's child view".to_string(), "workspace child view's child view".to_string(),
) )
.await; .await;
}); })
.await;
let update = receive_with_timeout(rx, Duration::from_secs(30)) let update = receive_with_timeout(rx, Duration::from_secs(30))
.await .await
@ -81,22 +82,11 @@ async fn delete_view_subscription_test() {
let cloned_test = test.clone(); let cloned_test = test.clone();
let delete_view_id = workspace.views.first().unwrap().id.clone(); let delete_view_id = workspace.views.first().unwrap().id.clone();
let cloned_delete_view_id = delete_view_id.clone(); let cloned_delete_view_id = delete_view_id.clone();
test
.appflowy_core
.dispatcher()
.spawn(async move {
cloned_test.delete_view(&cloned_delete_view_id).await; cloned_test.delete_view(&cloned_delete_view_id).await;
}) let update = receive_with_timeout(rx, Duration::from_secs(60))
.await .await
.unwrap(); .unwrap();
let update = test
.appflowy_core
.dispatcher()
.run_until(receive_with_timeout(rx, Duration::from_secs(60)))
.await
.unwrap();
assert_eq!(update.delete_child_views.len(), 1); assert_eq!(update.delete_child_views.len(), 1);
assert_eq!(update.delete_child_views[0], delete_view_id); assert_eq!(update.delete_child_views[0], delete_view_id);
} }
@ -114,7 +104,6 @@ async fn update_view_subscription_test() {
assert!(!view.is_favorite); assert!(!view.is_favorite);
let update_view_id = view.id.clone(); let update_view_id = view.id.clone();
test.appflowy_core.dispatcher().spawn(async move {
cloned_test cloned_test
.update_view(UpdateViewPayloadPB { .update_view(UpdateViewPayloadPB {
view_id: update_view_id, view_id: update_view_id,
@ -123,8 +112,6 @@ async fn update_view_subscription_test() {
..Default::default() ..Default::default()
}) })
.await; .await;
});
let update = receive_with_timeout(rx, Duration::from_secs(30)) let update = receive_with_timeout(rx, Duration::from_secs(30))
.await .await
.unwrap(); .unwrap();

View File

@ -5,6 +5,7 @@ use collab_folder::Folder;
use event_integration_test::user_event::user_localhost_af_cloud; use event_integration_test::user_event::user_localhost_af_cloud;
use event_integration_test::EventIntegrationTest; use event_integration_test::EventIntegrationTest;
use std::time::Duration; use std::time::Duration;
use tokio::task::LocalSet;
use tokio::time::sleep; use tokio::time::sleep;
use crate::user::af_cloud_test::util::get_synced_workspaces; use crate::user::af_cloud_test::util::get_synced_workspaces;
@ -158,9 +159,9 @@ async fn af_cloud_different_open_same_workspace_test() {
user_localhost_af_cloud().await; user_localhost_af_cloud().await;
// Set up the primary client and sign them up to the cloud. // Set up the primary client and sign them up to the cloud.
let client_1 = EventIntegrationTest::new().await; let test_runner = EventIntegrationTest::new().await;
let owner_profile = client_1.af_cloud_sign_up().await; let owner_profile = test_runner.af_cloud_sign_up().await;
let shared_workspace_id = client_1.get_current_workspace().await.id.clone(); let shared_workspace_id = test_runner.get_current_workspace().await.id.clone();
// Verify that the workspace ID from the profile matches the current session's workspace ID. // Verify that the workspace ID from the profile matches the current session's workspace ID.
assert_eq!(shared_workspace_id, owner_profile.workspace_id); assert_eq!(shared_workspace_id, owner_profile.workspace_id);
@ -181,7 +182,7 @@ async fn af_cloud_different_open_same_workspace_test() {
client.delete_view(&view.id).await; client.delete_view(&view.id).await;
} }
client_1 test_runner
.add_workspace_member(&owner_profile.workspace_id, &client) .add_workspace_member(&owner_profile.workspace_id, &client)
.await; .await;
clients.push((client, client_profile)); clients.push((client, client_profile));
@ -195,9 +196,10 @@ async fn af_cloud_different_open_same_workspace_test() {
// Simulate each client open different workspace 30 times // Simulate each client open different workspace 30 times
let mut handles = vec![]; let mut handles = vec![];
let local_set = LocalSet::new();
for client in clients.clone() { for client in clients.clone() {
let cloned_shared_workspace_id = shared_workspace_id.clone(); let cloned_shared_workspace_id = shared_workspace_id.clone();
let handle = tokio::spawn(async move { let handle = local_set.spawn_local(async move {
let (client, profile) = client; let (client, profile) = client;
let all_workspaces = get_synced_workspaces(&client, profile.id).await; let all_workspaces = get_synced_workspaces(&client, profile.id).await;
for i in 0..30 { for i in 0..30 {
@ -216,10 +218,16 @@ async fn af_cloud_different_open_same_workspace_test() {
}); });
handles.push(handle); handles.push(handle);
} }
futures::future::join_all(handles).await; let results = local_set
.run_until(futures::future::join_all(handles))
.await;
for result in results {
assert!(result.is_ok());
}
// Retrieve and verify the collaborative document state for Client 1's workspace. // Retrieve and verify the collaborative document state for Client 1's workspace.
let doc_state = client_1 let doc_state = test_runner
.get_collab_doc_state(&shared_workspace_id, CollabType::Folder) .get_collab_doc_state(&shared_workspace_id, CollabType::Folder)
.await .await
.unwrap(); .unwrap();

View File

@ -63,9 +63,7 @@ pub(crate) async fn stream_chat_message_handler(
.collect::<Vec<_>>(); .collect::<Vec<_>>();
trace!("Stream chat message with metadata: {:?}", metadata); trace!("Stream chat message with metadata: {:?}", metadata);
let (tx, rx) = oneshot::channel::<Result<ChatMessagePB, FlowyError>>();
let ai_manager = upgrade_ai_manager(ai_manager)?; let ai_manager = upgrade_ai_manager(ai_manager)?;
tokio::spawn(async move {
let result = ai_manager let result = ai_manager
.stream_chat_message( .stream_chat_message(
&data.chat_id, &data.chat_id,
@ -75,12 +73,8 @@ pub(crate) async fn stream_chat_message_handler(
data.question_stream_port, data.question_stream_port,
metadata, metadata,
) )
.await; .await?;
let _ = tx.send(result); data_result_ok(result)
});
let question = rx.await??;
data_result_ok(question)
} }
#[tracing::instrument(level = "debug", skip_all, err)] #[tracing::instrument(level = "debug", skip_all, err)]

View File

@ -288,7 +288,7 @@ impl LocalAIResourceController {
while let Ok(value) = rx.recv().await { while let Ok(value) = rx.recv().await {
let is_finish = value == DOWNLOAD_FINISH; let is_finish = value == DOWNLOAD_FINISH;
if let Err(err) = progress_sink.send(value).await { if let Err(err) = progress_sink.send(value).await {
error!("Failed to send progress: {:?}", err); warn!("Failed to send progress: {:?}", err);
break; break;
} }

View File

@ -2,6 +2,8 @@
use flowy_search::folder::indexer::FolderIndexManagerImpl; use flowy_search::folder::indexer::FolderIndexManagerImpl;
use flowy_search::services::manager::SearchManager; use flowy_search::services::manager::SearchManager;
use parking_lot::Mutex;
use std::rc::Rc;
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use std::time::Duration; use std::time::Duration;
use sysinfo::System; use sysinfo::System;
@ -54,7 +56,7 @@ pub struct AppFlowyCore {
pub document_manager: Arc<DocumentManager>, pub document_manager: Arc<DocumentManager>,
pub folder_manager: Arc<FolderManager>, pub folder_manager: Arc<FolderManager>,
pub database_manager: Arc<DatabaseManager>, pub database_manager: Arc<DatabaseManager>,
pub event_dispatcher: Arc<AFPluginDispatcher>, pub event_dispatcher: Rc<AFPluginDispatcher>,
pub server_provider: Arc<ServerProvider>, pub server_provider: Arc<ServerProvider>,
pub task_dispatcher: Arc<RwLock<TaskDispatcher>>, pub task_dispatcher: Arc<RwLock<TaskDispatcher>>,
pub store_preference: Arc<KVStorePreferences>, pub store_preference: Arc<KVStorePreferences>,
@ -66,7 +68,7 @@ pub struct AppFlowyCore {
impl AppFlowyCore { impl AppFlowyCore {
pub async fn new( pub async fn new(
config: AppFlowyCoreConfig, config: AppFlowyCoreConfig,
runtime: Arc<AFPluginRuntime>, runtime: Rc<AFPluginRuntime>,
stream_log_sender: Option<Arc<dyn StreamLogSender>>, stream_log_sender: Option<Arc<dyn StreamLogSender>>,
) -> Self { ) -> Self {
let platform = OperatingSystem::from(&config.platform); let platform = OperatingSystem::from(&config.platform);
@ -102,7 +104,7 @@ impl AppFlowyCore {
} }
#[instrument(skip(config, runtime))] #[instrument(skip(config, runtime))]
async fn init(config: AppFlowyCoreConfig, runtime: Arc<AFPluginRuntime>) -> Self { async fn init(config: AppFlowyCoreConfig, runtime: Rc<AFPluginRuntime>) -> Self {
// Init the key value database // Init the key value database
let store_preference = Arc::new(KVStorePreferences::new(&config.storage_path).unwrap()); let store_preference = Arc::new(KVStorePreferences::new(&config.storage_path).unwrap());
info!("🔥{:?}", &config); info!("🔥{:?}", &config);
@ -261,7 +263,7 @@ impl AppFlowyCore {
error!("Init user failed: {}", err) error!("Init user failed: {}", err)
} }
} }
let event_dispatcher = Arc::new(AFPluginDispatcher::new( let event_dispatcher = Rc::new(AFPluginDispatcher::new(
runtime, runtime,
make_plugins( make_plugins(
Arc::downgrade(&folder_manager), Arc::downgrade(&folder_manager),
@ -290,7 +292,7 @@ impl AppFlowyCore {
} }
/// Only expose the dispatcher in test /// Only expose the dispatcher in test
pub fn dispatcher(&self) -> Arc<AFPluginDispatcher> { pub fn dispatcher(&self) -> Rc<AFPluginDispatcher> {
self.event_dispatcher.clone() self.event_dispatcher.clone()
} }
} }
@ -321,3 +323,13 @@ impl ServerUser for ServerUserImpl {
self.upgrade_user()?.workspace_id() self.upgrade_user()?.workspace_id()
} }
} }
pub struct MutexAppFlowyCore(pub Rc<Mutex<AppFlowyCore>>);
impl MutexAppFlowyCore {
pub fn new(appflowy_core: AppFlowyCore) -> Self {
Self(Rc::new(Mutex::new(appflowy_core)))
}
}
unsafe impl Sync for MutexAppFlowyCore {}
unsafe impl Send for MutexAppFlowyCore {}

View File

@ -42,7 +42,7 @@ tokio = { workspace = true, features = ["rt"] }
futures-util = "0.3.26" futures-util = "0.3.26"
[features] [features]
default = ["use_protobuf"] default = ["local_set", "use_protobuf"]
use_serde = ["bincode", "serde_json", "serde", "serde_repr"] use_serde = ["bincode", "serde_json", "serde", "serde_repr"]
use_protobuf = ["protobuf"] use_protobuf = ["protobuf"]
local_set = [] local_set = []

View File

@ -1,10 +1,10 @@
use std::any::Any;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{future::Future, sync::Arc};
use derivative::*; use derivative::*;
use pin_project::pin_project; use pin_project::pin_project;
use std::any::Any;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use tracing::event; use tracing::event;
use crate::module::AFPluginStateMap; use crate::module::AFPluginStateMap;
@ -16,60 +16,50 @@ use crate::{
service::{AFPluginServiceFactory, Service}, service::{AFPluginServiceFactory, Service},
}; };
#[cfg(any(target_arch = "wasm32", feature = "local_set"))] #[cfg(feature = "local_set")]
pub trait AFConcurrent {} pub trait AFConcurrent {}
#[cfg(any(target_arch = "wasm32", feature = "local_set"))] #[cfg(feature = "local_set")]
impl<T> AFConcurrent for T where T: ?Sized {} impl<T> AFConcurrent for T where T: ?Sized {}
#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] #[cfg(not(feature = "local_set"))]
pub trait AFConcurrent: Send + Sync {} pub trait AFConcurrent: Send + Sync {}
#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] #[cfg(not(feature = "local_set"))]
impl<T> AFConcurrent for T where T: Send + Sync {} impl<T> AFConcurrent for T where T: Send + Sync {}
#[cfg(any(target_arch = "wasm32", feature = "local_set"))] #[cfg(feature = "local_set")]
pub type AFBoxFuture<'a, T> = futures_core::future::LocalBoxFuture<'a, T>; pub type AFBoxFuture<'a, T> = futures_core::future::LocalBoxFuture<'a, T>;
#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] #[cfg(not(feature = "local_set"))]
pub type AFBoxFuture<'a, T> = futures_core::future::BoxFuture<'a, T>; pub type AFBoxFuture<'a, T> = futures_core::future::BoxFuture<'a, T>;
pub type AFStateMap = std::sync::Arc<AFPluginStateMap>; pub type AFStateMap = std::sync::Arc<AFPluginStateMap>;
#[cfg(any(target_arch = "wasm32", feature = "local_set"))] #[cfg(feature = "local_set")]
pub(crate) fn downcast_owned<T: 'static>(boxed: AFBox) -> Option<T> { pub(crate) fn downcast_owned<T: 'static>(boxed: AFBox) -> Option<T> {
boxed.downcast().ok().map(|boxed| *boxed) boxed.downcast().ok().map(|boxed| *boxed)
} }
#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] #[cfg(not(feature = "local_set"))]
pub(crate) fn downcast_owned<T: 'static + Send + Sync>(boxed: AFBox) -> Option<T> { pub(crate) fn downcast_owned<T: 'static + Send + Sync>(boxed: AFBox) -> Option<T> {
boxed.downcast().ok().map(|boxed| *boxed) boxed.downcast().ok().map(|boxed| *boxed)
} }
#[cfg(any(target_arch = "wasm32", feature = "local_set"))] #[cfg(feature = "local_set")]
pub(crate) type AFBox = Box<dyn Any>; pub(crate) type AFBox = Box<dyn Any>;
#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] #[cfg(not(feature = "local_set"))]
pub(crate) type AFBox = Box<dyn Any + Send + Sync>; pub(crate) type AFBox = Box<dyn Any + Send + Sync>;
#[cfg(any(target_arch = "wasm32", feature = "local_set"))] #[cfg(feature = "local_set")]
pub type BoxFutureCallback = pub type BoxFutureCallback =
Box<dyn FnOnce(AFPluginEventResponse) -> AFBoxFuture<'static, ()> + 'static>; Box<dyn FnOnce(AFPluginEventResponse) -> AFBoxFuture<'static, ()> + 'static>;
#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] #[cfg(not(feature = "local_set"))]
pub type BoxFutureCallback = pub type BoxFutureCallback =
Box<dyn FnOnce(AFPluginEventResponse) -> AFBoxFuture<'static, ()> + Send + Sync + 'static>; Box<dyn FnOnce(AFPluginEventResponse) -> AFBoxFuture<'static, ()> + Send + Sync + 'static>;
#[cfg(any(target_arch = "wasm32", feature = "local_set"))]
pub fn af_spawn<T>(future: T) -> tokio::task::JoinHandle<T::Output>
where
T: Future + 'static,
T::Output: 'static,
{
tokio::task::spawn_local(future)
}
#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))]
pub fn af_spawn<T>(future: T) -> tokio::task::JoinHandle<T::Output> pub fn af_spawn<T>(future: T) -> tokio::task::JoinHandle<T::Output>
where where
T: Future + Send + 'static, T: Future + Send + 'static,
@ -80,11 +70,11 @@ where
pub struct AFPluginDispatcher { pub struct AFPluginDispatcher {
plugins: AFPluginMap, plugins: AFPluginMap,
runtime: Arc<AFPluginRuntime>, runtime: Rc<AFPluginRuntime>,
} }
impl AFPluginDispatcher { impl AFPluginDispatcher {
pub fn new(runtime: Arc<AFPluginRuntime>, plugins: Vec<AFPlugin>) -> AFPluginDispatcher { pub fn new(runtime: Rc<AFPluginRuntime>, plugins: Vec<AFPlugin>) -> AFPluginDispatcher {
tracing::trace!("{}", plugin_info(&plugins)); tracing::trace!("{}", plugin_info(&plugins));
AFPluginDispatcher { AFPluginDispatcher {
plugins: plugin_map_or_crash(plugins), plugins: plugin_map_or_crash(plugins),
@ -126,14 +116,37 @@ impl AFPluginDispatcher {
// The provided future will start running in the background immediately // The provided future will start running in the background immediately
// when `spawn` is called, even if you don't await the returned // when `spawn` is called, even if you don't await the returned
// `JoinHandle`. // `JoinHandle`.
let handle = dispatch.runtime.spawn(async move { let result: Result<AFPluginEventResponse, DispatchError>;
#[cfg(feature = "local_set")]
{
let handle = dispatch.runtime.local.spawn_local(async move {
service.call(service_ctx).await.unwrap_or_else(|e| { service.call(service_ctx).await.unwrap_or_else(|e| {
tracing::error!("Dispatch runtime error: {:?}", e); tracing::error!("Dispatch runtime error: {:?}", e);
InternalError::Other(format!("{:?}", e)).as_response() InternalError::Other(format!("{:?}", e)).as_response()
}) })
}); });
let result = dispatch.runtime.run_until(handle).await; result = dispatch
.runtime
.local
.run_until(handle)
.await
.map_err(|e| e.to_string().into())
}
#[cfg(not(feature = "local_set"))]
{
result = dispatch
.runtime
.spawn(async move {
service.call(service_ctx).await.unwrap_or_else(|e| {
tracing::error!("Dispatch runtime error: {:?}", e);
InternalError::Other(format!("{:?}", e)).as_response()
})
})
.await;
}
result.unwrap_or_else(|e| { result.unwrap_or_else(|e| {
let msg = format!("EVENT_DISPATCH join error: {:?}", e); let msg = format!("EVENT_DISPATCH join error: {:?}", e);
tracing::error!("{}", msg); tracing::error!("{}", msg);
@ -170,16 +183,17 @@ impl AFPluginDispatcher {
callback: Some(Box::new(callback)), callback: Some(Box::new(callback)),
}; };
let handle = dispatch.runtime.spawn(async move { #[cfg(feature = "local_set")]
{
let handle = dispatch.runtime.local.spawn_local(async move {
service.call(service_ctx).await.unwrap_or_else(|e| { service.call(service_ctx).await.unwrap_or_else(|e| {
tracing::error!("[dispatch]: runtime error: {:?}", e); tracing::error!("Dispatch runtime error: {:?}", e);
InternalError::Other(format!("{:?}", e)).as_response() InternalError::Other(format!("{:?}", e)).as_response()
}) })
}); });
#[cfg(any(target_arch = "wasm32", feature = "local_set"))] let fut = dispatch.runtime.local.run_until(handle);
{ let result = dispatch.runtime.block_on(fut);
let result = dispatch.runtime.block_on(handle);
DispatchFuture { DispatchFuture {
fut: Box::pin(async move { fut: Box::pin(async move {
result.unwrap_or_else(|e| { result.unwrap_or_else(|e| {
@ -192,8 +206,18 @@ impl AFPluginDispatcher {
} }
} }
#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] #[cfg(not(feature = "local_set"))]
{ {
let handle = dispatch.runtime.spawn(async move {
service
.call(crate::service::service::Service)
.await
.unwrap_or_else(|e| {
tracing::error!("[dispatch]: runtime error: {:?}", e);
InternalError::Other(format!("{:?}", e)).as_response()
})
});
let runtime = dispatch.runtime.clone(); let runtime = dispatch.runtime.clone();
DispatchFuture { DispatchFuture {
fut: Box::pin(async move { fut: Box::pin(async move {
@ -211,7 +235,7 @@ impl AFPluginDispatcher {
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
pub fn sync_send( pub fn sync_send(
dispatch: Arc<AFPluginDispatcher>, dispatch: Rc<AFPluginDispatcher>,
request: AFPluginRequest, request: AFPluginRequest,
) -> AFPluginEventResponse { ) -> AFPluginEventResponse {
futures::executor::block_on(AFPluginDispatcher::async_send_with_callback( futures::executor::block_on(AFPluginDispatcher::async_send_with_callback(
@ -221,16 +245,6 @@ impl AFPluginDispatcher {
)) ))
} }
#[cfg(any(target_arch = "wasm32", feature = "local_set"))]
#[track_caller]
pub fn spawn<F>(&self, future: F) -> tokio::task::JoinHandle<F::Output>
where
F: Future + 'static,
{
self.runtime.spawn(future)
}
#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))]
#[track_caller] #[track_caller]
pub fn spawn<F>(&self, future: F) -> tokio::task::JoinHandle<F::Output> pub fn spawn<F>(&self, future: F) -> tokio::task::JoinHandle<F::Output>
where where
@ -239,24 +253,6 @@ impl AFPluginDispatcher {
{ {
self.runtime.spawn(future) self.runtime.spawn(future)
} }
#[cfg(any(target_arch = "wasm32", feature = "local_set"))]
pub async fn run_until<F>(&self, future: F) -> F::Output
where
F: Future + 'static,
{
let handle = self.runtime.spawn(future);
self.runtime.run_until(handle).await.unwrap()
}
#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))]
pub async fn run_until<'a, F>(&self, future: F) -> F::Output
where
F: Future + Send + 'a,
<F as Future>::Output: Send + 'a,
{
self.runtime.run_until(future).await
}
} }
#[derive(Derivative)] #[derive(Derivative)]

View File

@ -1,3 +1,7 @@
use futures_core::ready;
use nanoid::nanoid;
use pin_project::pin_project;
use std::rc::Rc;
use std::sync::Arc; use std::sync::Arc;
use std::{ use std::{
collections::HashMap, collections::HashMap,
@ -9,10 +13,6 @@ use std::{
task::{Context, Poll}, task::{Context, Poll},
}; };
use futures_core::ready;
use nanoid::nanoid;
use pin_project::pin_project;
use crate::dispatcher::AFConcurrent; use crate::dispatcher::AFConcurrent;
use crate::prelude::{AFBoxFuture, AFStateMap}; use crate::prelude::{AFBoxFuture, AFStateMap};
use crate::service::AFPluginHandler; use crate::service::AFPluginHandler;
@ -26,12 +26,12 @@ use crate::{
}, },
}; };
pub type AFPluginMap = Arc<HashMap<AFPluginEvent, Arc<AFPlugin>>>; pub type AFPluginMap = Rc<HashMap<AFPluginEvent, Rc<AFPlugin>>>;
pub(crate) fn plugin_map_or_crash(plugins: Vec<AFPlugin>) -> AFPluginMap { pub(crate) fn plugin_map_or_crash(plugins: Vec<AFPlugin>) -> AFPluginMap {
let mut plugin_map: HashMap<AFPluginEvent, Arc<AFPlugin>> = HashMap::new(); let mut plugin_map: HashMap<AFPluginEvent, Rc<AFPlugin>> = HashMap::new();
plugins.into_iter().for_each(|m| { plugins.into_iter().for_each(|m| {
let events = m.events(); let events = m.events();
let plugins = Arc::new(m); let plugins = Rc::new(m);
events.into_iter().for_each(|e| { events.into_iter().for_each(|e| {
if plugin_map.contains_key(&e) { if plugin_map.contains_key(&e) {
let plugin_name = plugin_map.get(&e).map(|p| &p.name); let plugin_name = plugin_map.get(&e).map(|p| &p.name);
@ -40,7 +40,7 @@ pub(crate) fn plugin_map_or_crash(plugins: Vec<AFPlugin>) -> AFPluginMap {
plugin_map.insert(e, plugins.clone()); plugin_map.insert(e, plugins.clone());
}); });
}); });
Arc::new(plugin_map) Rc::new(plugin_map)
} }
#[derive(PartialEq, Eq, Hash, Debug, Clone)] #[derive(PartialEq, Eq, Hash, Debug, Clone)]
@ -67,7 +67,7 @@ pub struct AFPlugin {
/// Contains a list of factories that are used to generate the services used to handle the passed-in /// Contains a list of factories that are used to generate the services used to handle the passed-in
/// `ServiceRequest`. /// `ServiceRequest`.
/// ///
event_service_factory: Arc< event_service_factory: Rc<
HashMap<AFPluginEvent, BoxServiceFactory<(), ServiceRequest, ServiceResponse, DispatchError>>, HashMap<AFPluginEvent, BoxServiceFactory<(), ServiceRequest, ServiceResponse, DispatchError>>,
>, >,
} }
@ -77,7 +77,7 @@ impl std::default::Default for AFPlugin {
Self { Self {
name: "".to_owned(), name: "".to_owned(),
states: Default::default(), states: Default::default(),
event_service_factory: Arc::new(HashMap::new()), event_service_factory: Rc::new(HashMap::new()),
} }
} }
} }
@ -113,7 +113,7 @@ impl AFPlugin {
if self.event_service_factory.contains_key(&event) { if self.event_service_factory.contains_key(&event) {
panic!("Register duplicate Event: {:?}", &event); panic!("Register duplicate Event: {:?}", &event);
} else { } else {
Arc::get_mut(&mut self.event_service_factory) Rc::get_mut(&mut self.event_service_factory)
.unwrap() .unwrap()
.insert(event, factory(AFPluginHandlerService::new(handler))); .insert(event, factory(AFPluginHandlerService::new(handler)));
} }
@ -185,7 +185,7 @@ impl AFPluginServiceFactory<AFPluginRequest> for AFPlugin {
} }
pub struct AFPluginService { pub struct AFPluginService {
services: Arc< services: Rc<
HashMap<AFPluginEvent, BoxServiceFactory<(), ServiceRequest, ServiceResponse, DispatchError>>, HashMap<AFPluginEvent, BoxServiceFactory<(), ServiceRequest, ServiceResponse, DispatchError>>,
>, >,
states: AFStateMap, states: AFStateMap,

View File

@ -8,8 +8,8 @@ use tokio::task::JoinHandle;
pub struct AFPluginRuntime { pub struct AFPluginRuntime {
inner: Runtime, inner: Runtime,
#[cfg(any(target_arch = "wasm32", feature = "local_set"))] #[cfg(feature = "local_set")]
local: tokio::task::LocalSet, pub(crate) local: tokio::task::LocalSet,
} }
impl Display for AFPluginRuntime { impl Display for AFPluginRuntime {
@ -27,21 +27,11 @@ impl AFPluginRuntime {
let inner = default_tokio_runtime()?; let inner = default_tokio_runtime()?;
Ok(Self { Ok(Self {
inner, inner,
#[cfg(any(target_arch = "wasm32", feature = "local_set"))] #[cfg(feature = "local_set")]
local: tokio::task::LocalSet::new(), local: tokio::task::LocalSet::new(),
}) })
} }
#[cfg(any(target_arch = "wasm32", feature = "local_set"))]
#[track_caller]
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + 'static,
{
self.local.spawn_local(future)
}
#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))]
#[track_caller] #[track_caller]
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where where
@ -51,23 +41,7 @@ impl AFPluginRuntime {
self.inner.spawn(future) self.inner.spawn(future)
} }
#[cfg(any(target_arch = "wasm32", feature = "local_set"))] #[cfg(feature = "local_set")]
pub async fn run_until<F>(&self, future: F) -> F::Output
where
F: Future,
{
self.local.run_until(future).await
}
#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))]
pub async fn run_until<F>(&self, future: F) -> F::Output
where
F: Future,
{
future.await
}
#[cfg(any(target_arch = "wasm32", feature = "local_set"))]
#[track_caller] #[track_caller]
pub fn block_on<F>(&self, f: F) -> F::Output pub fn block_on<F>(&self, f: F) -> F::Output
where where
@ -76,7 +50,7 @@ impl AFPluginRuntime {
self.local.block_on(&self.inner, f) self.local.block_on(&self.inner, f)
} }
#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] #[cfg(not(feature = "local_set"))]
#[track_caller] #[track_caller]
pub fn block_on<F>(&self, f: F) -> F::Output pub fn block_on<F>(&self, f: F) -> F::Output
where where
@ -86,14 +60,26 @@ impl AFPluginRuntime {
} }
} }
#[cfg(any(target_arch = "wasm32", feature = "local_set"))] #[cfg(feature = "local_set")]
pub fn default_tokio_runtime() -> io::Result<Runtime> { pub fn default_tokio_runtime() -> io::Result<Runtime> {
runtime::Builder::new_current_thread() #[cfg(not(target_arch = "wasm32"))]
{
runtime::Builder::new_multi_thread()
.enable_io()
.enable_time()
.thread_name("dispatch-rt-st") .thread_name("dispatch-rt-st")
.build() .build()
} }
#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] #[cfg(target_arch = "wasm32")]
{
runtime::Builder::new_current_thread()
.thread_name("dispatch-rt-st")
.build()
}
}
#[cfg(not(feature = "local_set"))]
pub fn default_tokio_runtime() -> io::Result<Runtime> { pub fn default_tokio_runtime() -> io::Result<Runtime> {
runtime::Builder::new_multi_thread() runtime::Builder::new_multi_thread()
.thread_name("dispatch-rt-mt") .thread_name("dispatch-rt-mt")

View File

@ -16,7 +16,7 @@ where
BoxServiceFactory(Box::new(FactoryWrapper(factory))) BoxServiceFactory(Box::new(FactoryWrapper(factory)))
} }
#[cfg(any(target_arch = "wasm32", feature = "local_set"))] #[cfg(feature = "local_set")]
type Inner<Cfg, Req, Res, Err> = Box< type Inner<Cfg, Req, Res, Err> = Box<
dyn AFPluginServiceFactory< dyn AFPluginServiceFactory<
Req, Req,
@ -27,7 +27,7 @@ type Inner<Cfg, Req, Res, Err> = Box<
Future = AFBoxFuture<'static, Result<BoxService<Req, Res, Err>, Err>>, Future = AFBoxFuture<'static, Result<BoxService<Req, Res, Err>, Err>>,
>, >,
>; >;
#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] #[cfg(not(feature = "local_set"))]
type Inner<Cfg, Req, Res, Err> = Box< type Inner<Cfg, Req, Res, Err> = Box<
dyn AFPluginServiceFactory< dyn AFPluginServiceFactory<
Req, Req,
@ -58,12 +58,12 @@ where
} }
} }
#[cfg(any(target_arch = "wasm32", feature = "local_set"))] #[cfg(feature = "local_set")]
pub type BoxService<Req, Res, Err> = Box< pub type BoxService<Req, Res, Err> = Box<
dyn Service<Req, Response = Res, Error = Err, Future = AFBoxFuture<'static, Result<Res, Err>>>, dyn Service<Req, Response = Res, Error = Err, Future = AFBoxFuture<'static, Result<Res, Err>>>,
>; >;
#[cfg(all(not(target_arch = "wasm32"), not(feature = "local_set")))] #[cfg(not(feature = "local_set"))]
pub type BoxService<Req, Res, Err> = Box< pub type BoxService<Req, Res, Err> = Box<
dyn Service<Req, Response = Res, Error = Err, Future = AFBoxFuture<'static, Result<Res, Err>>> dyn Service<Req, Response = Res, Error = Err, Future = AFBoxFuture<'static, Result<Res, Err>>>
+ Sync + Sync

View File

@ -1,4 +1,4 @@
use std::sync::Arc; use std::rc::Rc;
use lib_dispatch::prelude::*; use lib_dispatch::prelude::*;
use lib_dispatch::runtime::AFPluginRuntime; use lib_dispatch::runtime::AFPluginRuntime;
@ -10,8 +10,8 @@ pub async fn hello() -> String {
#[tokio::test] #[tokio::test]
async fn test() { async fn test() {
let event = "1"; let event = "1";
let runtime = Arc::new(AFPluginRuntime::new().unwrap()); let runtime = Rc::new(AFPluginRuntime::new().unwrap());
let dispatch = Arc::new(AFPluginDispatcher::new( let dispatch = Rc::new(AFPluginDispatcher::new(
runtime, runtime,
vec![AFPlugin::new().event(event, hello)], vec![AFPlugin::new().event(event, hello)],
)); ));