feat: enable dispatch event using single thread (#3828)

* refactor: lib dispatch

* chore: type def

* chore: type def

* fix: local set spawn

* chore: replace tokio spawn

* chore: update log

* chore: boxed event

* chore: tauri lock
This commit is contained in:
Nathan.fooo 2023-10-30 12:35:06 +08:00 committed by GitHub
parent 7f4e7e6aa0
commit e08a1a6974
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
67 changed files with 822 additions and 554 deletions

View File

@ -129,15 +129,6 @@ dependencies = [
"libc",
]
[[package]]
name = "ansi_term"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2"
dependencies = [
"winapi",
]
[[package]]
name = "anyhow"
version = "1.0.75"
@ -454,7 +445,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4114279215a005bc675e386011e594e1d9b800918cea18fcadadcce864a2046b"
dependencies = [
"borsh-derive",
"hashbrown 0.12.3",
"hashbrown 0.13.2",
]
[[package]]
@ -1939,6 +1930,7 @@ dependencies = [
"flowy-task",
"flowy-user",
"flowy-user-deps",
"futures",
"futures-core",
"lib-dispatch",
"lib-infra",
@ -2201,6 +2193,7 @@ dependencies = [
"hex",
"hyper",
"lazy_static",
"lib-dispatch",
"lib-infra",
"mime_guess",
"parking_lot",
@ -3484,8 +3477,8 @@ dependencies = [
"tracing-appender",
"tracing-bunyan-formatter",
"tracing-core",
"tracing-log",
"tracing-subscriber 0.2.25",
"tracing-log 0.2.0",
"tracing-subscriber",
]
[[package]]
@ -3607,7 +3600,7 @@ dependencies = [
"serde",
"serde_json",
"tracing",
"tracing-subscriber 0.3.17",
"tracing-subscriber",
]
[[package]]
@ -3662,15 +3655,6 @@ dependencies = [
"tendril",
]
[[package]]
name = "matchers"
version = "0.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1"
dependencies = [
"regex-automata",
]
[[package]]
name = "matchers"
version = "0.1.0"
@ -6653,13 +6637,13 @@ dependencies = [
[[package]]
name = "tracing-appender"
version = "0.1.2"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9965507e507f12c8901432a33e31131222abac31edd90cabbcf85cf544b7127a"
checksum = "09d48f71a791638519505cefafe162606f706c25592e4bde4d97600c0195312e"
dependencies = [
"chrono",
"crossbeam-channel",
"tracing-subscriber 0.2.25",
"time",
"tracing-subscriber",
]
[[package]]
@ -6675,19 +6659,20 @@ dependencies = [
[[package]]
name = "tracing-bunyan-formatter"
version = "0.2.6"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c408910c9b7eabc0215fe2b4a89f8ec95581a91cea1f7619f7c78caf14cbc2a1"
checksum = "b5c266b9ac83dedf0e0385ad78514949e6d89491269e7065bee51d2bb8ec7373"
dependencies = [
"chrono",
"ahash 0.8.3",
"gethostname",
"log",
"serde",
"serde_json",
"time",
"tracing",
"tracing-core",
"tracing-log",
"tracing-subscriber 0.2.25",
"tracing-log 0.1.3",
"tracing-subscriber",
]
[[package]]
@ -6711,6 +6696,17 @@ dependencies = [
"tracing-core",
]
[[package]]
name = "tracing-log"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
dependencies = [
"log",
"once_cell",
"tracing-core",
]
[[package]]
name = "tracing-serde"
version = "0.1.3"
@ -6723,14 +6719,13 @@ dependencies = [
[[package]]
name = "tracing-subscriber"
version = "0.2.25"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e0d2eaa99c3c2e41547cfa109e910a68ea03823cccad4a0525dcbc9b01e8c71"
checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77"
dependencies = [
"ansi_term",
"chrono",
"lazy_static",
"matchers 0.0.1",
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"serde",
"serde_json",
@ -6739,28 +6734,10 @@ dependencies = [
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
"tracing-log 0.1.3",
"tracing-serde",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77"
dependencies = [
"matchers 0.1.0",
"nu-ansi-term",
"once_cell",
"regex",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
]
[[package]]
name = "treediff"
version = "4.0.2"

View File

@ -115,15 +115,6 @@ dependencies = [
"libc",
]
[[package]]
name = "ansi_term"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2"
dependencies = [
"winapi",
]
[[package]]
name = "anyhow"
version = "1.0.75"
@ -461,7 +452,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4114279215a005bc675e386011e594e1d9b800918cea18fcadadcce864a2046b"
dependencies = [
"borsh-derive",
"hashbrown 0.12.3",
"hashbrown 0.13.2",
]
[[package]]
@ -979,7 +970,7 @@ dependencies = [
"tonic",
"tracing",
"tracing-core",
"tracing-subscriber 0.3.17",
"tracing-subscriber",
]
[[package]]
@ -1138,7 +1129,7 @@ dependencies = [
"cssparser-macros",
"dtoa-short",
"itoa",
"phf 0.8.0",
"phf 0.11.2",
"smallvec",
]
@ -1759,6 +1750,7 @@ dependencies = [
"flowy-task",
"flowy-user",
"flowy-user-deps",
"futures",
"futures-core",
"lib-dispatch",
"lib-infra",
@ -1900,7 +1892,7 @@ dependencies = [
"tokio",
"tokio-stream",
"tracing",
"tracing-subscriber 0.3.17",
"tracing-subscriber",
"uuid",
]
@ -2026,6 +2018,7 @@ dependencies = [
"hex",
"hyper",
"lazy_static",
"lib-dispatch",
"lib-infra",
"mime_guess",
"parking_lot",
@ -2040,7 +2033,7 @@ dependencies = [
"tokio-stream",
"tokio-util",
"tracing",
"tracing-subscriber 0.3.17",
"tracing-subscriber",
"url",
"uuid",
"yrs",
@ -2242,9 +2235,9 @@ dependencies = [
[[package]]
name = "futures-channel"
version = "0.3.28"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2"
checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb"
dependencies = [
"futures-core",
"futures-sink",
@ -2252,9 +2245,9 @@ dependencies = [
[[package]]
name = "futures-core"
version = "0.3.28"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c"
checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c"
[[package]]
name = "futures-executor"
@ -2280,15 +2273,15 @@ dependencies = [
[[package]]
name = "futures-io"
version = "0.3.28"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964"
checksum = "8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa"
[[package]]
name = "futures-macro"
version = "0.3.28"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb"
dependencies = [
"proc-macro2",
"quote",
@ -2297,15 +2290,15 @@ dependencies = [
[[package]]
name = "futures-sink"
version = "0.3.28"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e"
checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817"
[[package]]
name = "futures-task"
version = "0.3.28"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65"
checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2"
[[package]]
name = "futures-timer"
@ -2315,9 +2308,9 @@ checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c"
[[package]]
name = "futures-util"
version = "0.3.28"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533"
checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104"
dependencies = [
"futures-channel",
"futures-core",
@ -2984,8 +2977,8 @@ dependencies = [
"tracing-appender",
"tracing-bunyan-formatter",
"tracing-core",
"tracing-log",
"tracing-subscriber 0.2.25",
"tracing-log 0.2.0",
"tracing-subscriber",
]
[[package]]
@ -3115,15 +3108,6 @@ dependencies = [
"tendril",
]
[[package]]
name = "matchers"
version = "0.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1"
dependencies = [
"regex-automata 0.1.10",
]
[[package]]
name = "matchers"
version = "0.1.0"
@ -3626,7 +3610,7 @@ version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3dfb61232e34fcb633f43d12c58f83c1df82962dcdfa565a4e866ffc17dafe12"
dependencies = [
"phf_macros",
"phf_macros 0.8.0",
"phf_shared 0.8.0",
"proc-macro-hack",
]
@ -3646,6 +3630,7 @@ version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc"
dependencies = [
"phf_macros 0.11.2",
"phf_shared 0.11.2",
]
@ -3713,6 +3698,19 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "phf_macros"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3444646e286606587e49f3bcf1679b8cef1dc2c5ecc29ddacaffc305180d464b"
dependencies = [
"phf_generator 0.11.2",
"phf_shared 0.11.2",
"proc-macro2",
"quote",
"syn 2.0.31",
]
[[package]]
name = "phf_shared"
version = "0.8.0"
@ -5604,13 +5602,13 @@ dependencies = [
[[package]]
name = "tracing-appender"
version = "0.1.2"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9965507e507f12c8901432a33e31131222abac31edd90cabbcf85cf544b7127a"
checksum = "09d48f71a791638519505cefafe162606f706c25592e4bde4d97600c0195312e"
dependencies = [
"chrono",
"crossbeam-channel",
"tracing-subscriber 0.2.25",
"time",
"tracing-subscriber",
]
[[package]]
@ -5626,19 +5624,20 @@ dependencies = [
[[package]]
name = "tracing-bunyan-formatter"
version = "0.2.6"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c408910c9b7eabc0215fe2b4a89f8ec95581a91cea1f7619f7c78caf14cbc2a1"
checksum = "b5c266b9ac83dedf0e0385ad78514949e6d89491269e7065bee51d2bb8ec7373"
dependencies = [
"chrono",
"ahash 0.8.3",
"gethostname",
"log",
"serde",
"serde_json",
"time",
"tracing",
"tracing-core",
"tracing-log",
"tracing-subscriber 0.2.25",
"tracing-log 0.1.3",
"tracing-subscriber",
]
[[package]]
@ -5662,6 +5661,17 @@ dependencies = [
"tracing-core",
]
[[package]]
name = "tracing-log"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
dependencies = [
"log",
"once_cell",
"tracing-core",
]
[[package]]
name = "tracing-serde"
version = "0.1.3"
@ -5674,14 +5684,13 @@ dependencies = [
[[package]]
name = "tracing-subscriber"
version = "0.2.25"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e0d2eaa99c3c2e41547cfa109e910a68ea03823cccad4a0525dcbc9b01e8c71"
checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77"
dependencies = [
"ansi_term",
"chrono",
"lazy_static",
"matchers 0.0.1",
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"serde",
"serde_json",
@ -5690,28 +5699,10 @@ dependencies = [
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
"tracing-log 0.1.3",
"tracing-serde",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77"
dependencies = [
"matchers 0.1.0",
"nu-ansi-term",
"once_cell",
"regex",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
]
[[package]]
name = "try-lock"
version = "0.2.4"

View File

@ -41,10 +41,7 @@ pub enum CollabPluginContext {
pub trait CollabStorageProvider: Send + Sync + 'static {
fn storage_source(&self) -> CollabSource;
fn get_plugins(
&self,
context: CollabPluginContext,
) -> Fut<Vec<Arc<dyn collab::core::collab_plugin::CollabPlugin>>>;
fn get_plugins(&self, context: CollabPluginContext) -> Fut<Vec<Arc<dyn CollabPlugin>>>;
fn is_sync_enabled(&self) -> bool;
}

View File

@ -1,9 +1,12 @@
#![allow(clippy::not_unsafe_ptr_arg_deref)]
use std::sync::Arc;
use std::{ffi::CStr, os::raw::c_char};
use lazy_static::lazy_static;
use parking_lot::RwLock;
use log::error;
use parking_lot::Mutex;
use tracing::trace;
use flowy_core::*;
use flowy_notification::{register_notification_sender, unregister_all_notification_sender};
@ -25,9 +28,26 @@ mod protobuf;
mod util;
lazy_static! {
static ref APPFLOWY_CORE: RwLock<Option<AppFlowyCore>> = RwLock::new(None);
static ref APPFLOWY_CORE: MutexAppFlowyCore = MutexAppFlowyCore::new();
}
struct MutexAppFlowyCore(Arc<Mutex<Option<AppFlowyCore>>>);
impl MutexAppFlowyCore {
fn new() -> Self {
Self(Arc::new(Mutex::new(None)))
}
fn dispatcher(&self) -> Option<Arc<AFPluginDispatcher>> {
let binding = self.0.lock();
let core = binding.as_ref();
core.map(|core| core.event_dispatcher.clone())
}
}
unsafe impl Sync for MutexAppFlowyCore {}
unsafe impl Send for MutexAppFlowyCore {}
#[no_mangle]
pub extern "C" fn init_sdk(path: *mut c_char) -> i64 {
let c_str: &CStr = unsafe { CStr::from_ptr(path) };
@ -36,32 +56,33 @@ pub extern "C" fn init_sdk(path: *mut c_char) -> i64 {
let log_crates = vec!["flowy-ffi".to_string()];
let config =
AppFlowyCoreConfig::new(path, DEFAULT_NAME.to_string()).log_filter("info", log_crates);
*APPFLOWY_CORE.write() = Some(AppFlowyCore::new(config));
*APPFLOWY_CORE.0.lock() = Some(AppFlowyCore::new(config));
0
}
#[no_mangle]
#[allow(clippy::let_underscore_future)]
pub extern "C" fn async_event(port: i64, input: *const u8, len: usize) {
let request: AFPluginRequest = FFIRequest::from_u8_pointer(input, len).into();
log::trace!(
trace!(
"[FFI]: {} Async Event: {:?} with {} port",
&request.id,
&request.event,
port
);
let dispatcher = match APPFLOWY_CORE.read().as_ref() {
let dispatcher = match APPFLOWY_CORE.dispatcher() {
None => {
log::error!("sdk not init yet.");
error!("sdk not init yet.");
return;
},
Some(e) => e.event_dispatcher.clone(),
Some(dispatcher) => dispatcher,
};
AFPluginDispatcher::async_send_with_callback(
AFPluginDispatcher::boxed_async_send_with_callback(
dispatcher,
request,
move |resp: AFPluginEventResponse| {
log::trace!("[FFI]: Post data to dart through {} port", port);
trace!("[FFI]: Post data to dart through {} port", port);
Box::pin(post_to_flutter(resp, port))
},
);
@ -70,14 +91,14 @@ pub extern "C" fn async_event(port: i64, input: *const u8, len: usize) {
#[no_mangle]
pub extern "C" fn sync_event(input: *const u8, len: usize) -> *const u8 {
let request: AFPluginRequest = FFIRequest::from_u8_pointer(input, len).into();
log::trace!("[FFI]: {} Sync Event: {:?}", &request.id, &request.event,);
trace!("[FFI]: {} Sync Event: {:?}", &request.id, &request.event,);
let dispatcher = match APPFLOWY_CORE.read().as_ref() {
let dispatcher = match APPFLOWY_CORE.dispatcher() {
None => {
log::error!("sdk not init yet.");
error!("sdk not init yet.");
return forget_rust(Vec::default());
},
Some(e) => e.event_dispatcher.clone(),
Some(dispatcher) => dispatcher,
};
let _response = AFPluginDispatcher::sync_send(dispatcher, request);
@ -110,13 +131,13 @@ async fn post_to_flutter(response: AFPluginEventResponse, port: i64) {
.await
{
Ok(_success) => {
log::trace!("[FFI]: Post data to dart success");
trace!("[FFI]: Post data to dart success");
},
Err(e) => {
if let Some(msg) = e.downcast_ref::<&str>() {
log::error!("[FFI]: {:?}", msg);
error!("[FFI]: {:?}", msg);
} else {
log::error!("[FFI]: allo_isolate post panic");
error!("[FFI]: allo_isolate post panic");
}
},
}

View File

@ -53,4 +53,5 @@ zip = "0.6.6"
[features]
default = ["supabase_cloud_test"]
dart = ["flowy-core/dart"]
supabase_cloud_test = []
supabase_cloud_test = []
single_thread = ["flowy-core/single_thread"]

View File

@ -48,13 +48,6 @@ impl EventBuilder {
self
}
pub fn sync_send(mut self) -> Self {
let request = self.get_request();
let resp = AFPluginDispatcher::sync_send(self.dispatch(), request);
self.context.response = Some(resp);
self
}
pub async fn async_send(mut self) -> Self {
let request = self.get_request();
let resp = AFPluginDispatcher::async_send(self.dispatch(), request).await;

View File

@ -27,30 +27,23 @@ pub struct EventIntegrationTest {
pub notification_sender: TestNotificationSender,
}
impl Default for EventIntegrationTest {
fn default() -> Self {
impl EventIntegrationTest {
pub async fn new() -> Self {
let temp_dir = temp_dir().join(nanoid!(6));
std::fs::create_dir_all(&temp_dir).unwrap();
Self::new_with_user_data_path(temp_dir, nanoid!(6))
Self::new_with_user_data_path(temp_dir, nanoid!(6)).await
}
}
impl EventIntegrationTest {
pub fn new() -> Self {
Self::default()
}
pub fn new_with_user_data_path(path: PathBuf, name: String) -> Self {
pub async fn new_with_user_data_path(path: PathBuf, name: String) -> Self {
let config = AppFlowyCoreConfig::new(path.to_str().unwrap(), name).log_filter(
"trace",
vec![
"flowy_test".to_string(),
// "lib_dispatch".to_string()
"tokio".to_string(),
"lib_dispatch".to_string(),
],
);
let inner = std::thread::spawn(|| AppFlowyCore::new(config))
.join()
.unwrap();
let inner = init_core(config).await;
let notification_sender = TestNotificationSender::new();
let auth_type = Arc::new(RwLock::new(AuthTypePB::Local));
register_notification_sender(notification_sender.clone());
@ -64,6 +57,21 @@ impl EventIntegrationTest {
}
}
#[cfg(feature = "single_thread")]
async fn init_core(config: AppFlowyCoreConfig) -> AppFlowyCore {
// let runtime = tokio::runtime::Runtime::new().unwrap();
// let local_set = tokio::task::LocalSet::new();
// runtime.block_on(AppFlowyCore::new(config))
AppFlowyCore::new(config).await
}
#[cfg(not(feature = "single_thread"))]
async fn init_core(config: AppFlowyCoreConfig) -> AppFlowyCore {
std::thread::spawn(|| AppFlowyCore::new(config))
.join()
.unwrap()
}
impl std::ops::Deref for EventIntegrationTest {
type Target = AppFlowyCore;

View File

@ -6,6 +6,7 @@ use bytes::Bytes;
use nanoid::nanoid;
use protobuf::ProtobufError;
use tokio::sync::broadcast::{channel, Sender};
use tracing::error;
use uuid::Uuid;
use flowy_notification::entities::SubscribeObject;
@ -17,7 +18,7 @@ use flowy_user::entities::{
};
use flowy_user::errors::{FlowyError, FlowyResult};
use flowy_user::event_map::UserEvent::*;
use lib_dispatch::prelude::{AFPluginDispatcher, AFPluginRequest, ToBytes};
use lib_dispatch::prelude::{af_spawn, AFPluginDispatcher, AFPluginRequest, ToBytes};
use crate::event_builder::EventBuilder;
use crate::EventIntegrationTest;
@ -44,7 +45,7 @@ impl EventIntegrationTest {
}
pub async fn new_with_guest_user() -> Self {
let test = Self::default();
let test = Self::new().await;
test.sign_up_as_guest().await;
test
}
@ -213,7 +214,7 @@ impl TestNotificationSender {
let (tx, rx) = tokio::sync::mpsc::channel::<T>(10);
let mut receiver = self.sender.subscribe();
let ty = ty.into();
tokio::spawn(async move {
af_spawn(async move {
// DatabaseNotification::DidUpdateDatabaseSnapshotState
while let Ok(value) = receiver.recv().await {
if value.id == id && value.ty == ty {
@ -245,7 +246,7 @@ impl TestNotificationSender {
let id = id.to_string();
let (tx, rx) = tokio::sync::mpsc::channel::<T>(10);
let mut receiver = self.sender.subscribe();
tokio::spawn(async move {
af_spawn(async move {
while let Ok(value) = receiver.recv().await {
if value.id == id {
if let Some(payload) = value.payload {
@ -263,7 +264,9 @@ impl TestNotificationSender {
}
impl NotificationSender for TestNotificationSender {
fn send_subject(&self, subject: SubscribeObject) -> Result<(), String> {
let _ = self.sender.send(subject);
if let Err(err) = self.sender.send(subject) {
error!("Failed to send notification: {:?}", err);
}
Ok(())
}
}

View File

@ -22,13 +22,13 @@ pub struct FlowySupabaseDatabaseTest {
impl FlowySupabaseDatabaseTest {
#[allow(dead_code)]
pub async fn new_with_user(uuid: String) -> Option<Self> {
let inner = FlowySupabaseTest::new()?;
let inner = FlowySupabaseTest::new().await?;
inner.supabase_sign_up_with_uuid(&uuid, None).await.unwrap();
Some(Self { uuid, inner })
}
pub async fn new_with_new_user() -> Option<Self> {
let inner = FlowySupabaseTest::new()?;
let inner = FlowySupabaseTest::new().await?;
let uuid = uuid::Uuid::new_v4().to_string();
let _ = inner.supabase_sign_up_with_uuid(&uuid, None).await.unwrap();
Some(Self { uuid, inner })

View File

@ -14,11 +14,11 @@ use crate::util::receive_with_timeout;
async fn supabase_initial_database_snapshot_test() {
if let Some(test) = FlowySupabaseDatabaseTest::new_with_new_user().await {
let (view, database) = test.create_database().await;
let mut rx = test
let rx = test
.notification_sender
.subscribe::<DatabaseSnapshotStatePB>(&database.id, DidUpdateDatabaseSnapshotState);
receive_with_timeout(&mut rx, Duration::from_secs(30))
receive_with_timeout(rx, Duration::from_secs(30))
.await
.unwrap();
@ -51,10 +51,10 @@ async fn supabase_edit_database_test() {
.await;
// wait all updates are send to the remote
let mut rx = test
let rx = test
.notification_sender
.subscribe_with_condition::<DatabaseSyncStatePB, _>(&database.id, |pb| pb.is_finish);
receive_with_timeout(&mut rx, Duration::from_secs(30))
receive_with_timeout(rx, Duration::from_secs(30))
.await
.unwrap();

View File

@ -12,17 +12,17 @@ async fn af_cloud_edit_document_test() {
let document_id = test.create_document().await;
let cloned_test = test.clone();
let cloned_document_id = document_id.clone();
tokio::spawn(async move {
test.inner.dispatcher().spawn(async move {
cloned_test
.insert_document_text(&cloned_document_id, "hello world", 0)
.await;
});
// wait all update are send to the remote
let mut rx = test
let rx = test
.notification_sender
.subscribe_with_condition::<DocumentSyncStatePB, _>(&document_id, |pb| pb.is_finish);
receive_with_timeout(&mut rx, Duration::from_secs(15))
receive_with_timeout(rx, Duration::from_secs(15))
.await
.unwrap();

View File

@ -8,7 +8,7 @@ pub struct AFCloudDocumentTest {
impl AFCloudDocumentTest {
pub async fn new() -> Option<Self> {
let inner = AFCloudTest::new()?;
let inner = AFCloudTest::new().await?;
let email = generate_test_email();
let _ = inner.af_cloud_sign_in_with_email(&email).await.unwrap();
Some(Self { inner })

View File

@ -14,17 +14,17 @@ async fn supabase_document_edit_sync_test() {
let cloned_test = test.clone();
let cloned_document_id = document_id.clone();
tokio::spawn(async move {
test.inner.dispatcher().spawn(async move {
cloned_test
.insert_document_text(&cloned_document_id, "hello world", 0)
.await;
});
// wait all update are send to the remote
let mut rx = test
let rx = test
.notification_sender
.subscribe_with_condition::<DocumentSyncStatePB, _>(&document_id, |pb| pb.is_finish);
receive_with_timeout(&mut rx, Duration::from_secs(30))
receive_with_timeout(rx, Duration::from_secs(30))
.await
.unwrap();
@ -47,10 +47,10 @@ async fn supabase_document_edit_sync_test2() {
}
// wait all update are send to the remote
let mut rx = test
let rx = test
.notification_sender
.subscribe_with_condition::<DocumentSyncStatePB, _>(&document_id, |pb| pb.is_finish);
receive_with_timeout(&mut rx, Duration::from_secs(30))
receive_with_timeout(rx, Duration::from_secs(30))
.await
.unwrap();

View File

@ -13,7 +13,7 @@ pub struct FlowySupabaseDocumentTest {
impl FlowySupabaseDocumentTest {
pub async fn new() -> Option<Self> {
let inner = FlowySupabaseTest::new()?;
let inner = FlowySupabaseTest::new().await?;
let uuid = uuid::Uuid::new_v4().to_string();
let _ = inner.supabase_sign_up_with_uuid(&uuid, None).await;
Some(Self { inner })

View File

@ -75,7 +75,7 @@ pub struct FolderTest {
impl FolderTest {
pub async fn new() -> Self {
let sdk = EventIntegrationTest::new();
let sdk = EventIntegrationTest::new().await;
let _ = sdk.init_anon_user().await;
let workspace = create_workspace(&sdk, "FolderWorkspace", "Folder test workspace").await;
let parent_view = create_app(&sdk, &workspace.id, "Folder App", "Folder test app").await;

View File

@ -18,19 +18,19 @@ use crate::util::receive_with_timeout;
async fn create_child_view_in_workspace_subscription_test() {
let test = EventIntegrationTest::new_with_guest_user().await;
let workspace = test.get_current_workspace().await.workspace;
let mut rx = test
let rx = test
.notification_sender
.subscribe::<RepeatedViewPB>(&workspace.id, FolderNotification::DidUpdateWorkspaceViews);
let cloned_test = test.clone();
let cloned_workspace_id = workspace.id.clone();
tokio::spawn(async move {
test.inner.dispatcher().spawn(async move {
cloned_test
.create_view(&cloned_workspace_id, "workspace child view".to_string())
.await;
});
let views = receive_with_timeout(&mut rx, Duration::from_secs(30))
let views = receive_with_timeout(rx, Duration::from_secs(30))
.await
.unwrap()
.items;
@ -43,14 +43,14 @@ async fn create_child_view_in_view_subscription_test() {
let test = EventIntegrationTest::new_with_guest_user().await;
let mut workspace = test.get_current_workspace().await.workspace;
let workspace_child_view = workspace.views.pop().unwrap();
let mut rx = test.notification_sender.subscribe::<ChildViewUpdatePB>(
let rx = test.notification_sender.subscribe::<ChildViewUpdatePB>(
&workspace_child_view.id,
FolderNotification::DidUpdateChildViews,
);
let cloned_test = test.clone();
let child_view_id = workspace_child_view.id.clone();
tokio::spawn(async move {
test.inner.dispatcher().spawn(async move {
cloned_test
.create_view(
&child_view_id,
@ -59,7 +59,7 @@ async fn create_child_view_in_view_subscription_test() {
.await;
});
let update = receive_with_timeout(&mut rx, Duration::from_secs(30))
let update = receive_with_timeout(rx, Duration::from_secs(30))
.await
.unwrap();
@ -74,20 +74,29 @@ async fn create_child_view_in_view_subscription_test() {
async fn delete_view_subscription_test() {
let test = EventIntegrationTest::new_with_guest_user().await;
let workspace = test.get_current_workspace().await.workspace;
let mut rx = test
let rx = test
.notification_sender
.subscribe::<ChildViewUpdatePB>(&workspace.id, FolderNotification::DidUpdateChildViews);
let cloned_test = test.clone();
let delete_view_id = workspace.views.first().unwrap().id.clone();
let cloned_delete_view_id = delete_view_id.clone();
tokio::spawn(async move {
cloned_test.delete_view(&cloned_delete_view_id).await;
});
let update = receive_with_timeout(&mut rx, Duration::from_secs(30))
test
.inner
.dispatcher()
.spawn(async move {
cloned_test.delete_view(&cloned_delete_view_id).await;
})
.await
.unwrap();
let update = test
.inner
.dispatcher()
.run_until(receive_with_timeout(rx, Duration::from_secs(30)))
.await
.unwrap();
assert_eq!(update.delete_child_views.len(), 1);
assert_eq!(update.delete_child_views[0], delete_view_id);
}
@ -96,7 +105,7 @@ async fn delete_view_subscription_test() {
async fn update_view_subscription_test() {
let test = EventIntegrationTest::new_with_guest_user().await;
let mut workspace = test.get_current_workspace().await.workspace;
let mut rx = test
let rx = test
.notification_sender
.subscribe::<ChildViewUpdatePB>(&workspace.id, FolderNotification::DidUpdateChildViews);
@ -105,7 +114,7 @@ async fn update_view_subscription_test() {
assert!(!view.is_favorite);
let update_view_id = view.id.clone();
tokio::spawn(async move {
test.inner.dispatcher().spawn(async move {
cloned_test
.update_view(UpdateViewPayloadPB {
view_id: update_view_id,
@ -116,7 +125,7 @@ async fn update_view_subscription_test() {
.await;
});
let update = receive_with_timeout(&mut rx, Duration::from_secs(30))
let update = receive_with_timeout(rx, Duration::from_secs(30))
.await
.unwrap();
assert_eq!(update.update_child_views.len(), 1);

View File

@ -466,7 +466,7 @@ async fn move_view_event_after_delete_view_test2() {
#[tokio::test]
async fn create_parent_view_with_invalid_name() {
for (name, code) in invalid_workspace_name_test_case() {
let sdk = EventIntegrationTest::new();
let sdk = EventIntegrationTest::new().await;
let request = CreateWorkspacePayloadPB {
name,
desc: "".to_owned(),

View File

@ -19,7 +19,7 @@ pub struct FlowySupabaseFolderTest {
impl FlowySupabaseFolderTest {
pub async fn new() -> Option<Self> {
let inner = FlowySupabaseTest::new()?;
let inner = FlowySupabaseTest::new().await?;
let uuid = uuid::Uuid::new_v4().to_string();
let _ = inner.supabase_sign_up_with_uuid(&uuid, None).await;
Some(Self { inner })

View File

@ -34,11 +34,11 @@ async fn supabase_decrypt_folder_data_test() {
.create_view(&workspace_id, "encrypt view".to_string())
.await;
let mut rx = test
let rx = test
.notification_sender
.subscribe_with_condition::<FolderSyncStatePB, _>(&workspace_id, |pb| pb.is_finish);
receive_with_timeout(&mut rx, Duration::from_secs(10))
receive_with_timeout(rx, Duration::from_secs(10))
.await
.unwrap();
let folder_data = get_folder_data_from_server(&workspace_id, secret)
@ -59,10 +59,10 @@ async fn supabase_decrypt_with_invalid_secret_folder_data_test() {
test
.create_view(&workspace_id, "encrypt view".to_string())
.await;
let mut rx = test
let rx = test
.notification_sender
.subscribe_with_condition::<FolderSyncStatePB, _>(&workspace_id, |pb| pb.is_finish);
receive_with_timeout(&mut rx, Duration::from_secs(10))
receive_with_timeout(rx, Duration::from_secs(10))
.await
.unwrap();
@ -75,10 +75,10 @@ async fn supabase_decrypt_with_invalid_secret_folder_data_test() {
async fn supabase_folder_snapshot_test() {
if let Some(test) = FlowySupabaseFolderTest::new().await {
let workspace_id = test.get_current_workspace().await.workspace.id;
let mut rx = test
let rx = test
.notification_sender
.subscribe::<FolderSnapshotStatePB>(&workspace_id, DidUpdateFolderSnapshotState);
receive_with_timeout(&mut rx, Duration::from_secs(10))
receive_with_timeout(rx, Duration::from_secs(10))
.await
.unwrap();
@ -104,11 +104,11 @@ async fn supabase_initial_folder_snapshot_test2() {
.create_view(&workspace_id, "supabase test view3".to_string())
.await;
let mut rx = test
let rx = test
.notification_sender
.subscribe_with_condition::<FolderSyncStatePB, _>(&workspace_id, |pb| pb.is_finish);
receive_with_timeout(&mut rx, Duration::from_secs(10))
receive_with_timeout(rx, Duration::from_secs(10))
.await
.unwrap();

View File

@ -6,7 +6,7 @@ use crate::util::{generate_test_email, get_af_cloud_config};
#[tokio::test]
async fn af_cloud_sign_up_test() {
if get_af_cloud_config().is_some() {
let test = EventIntegrationTest::new();
let test = EventIntegrationTest::new().await;
let email = generate_test_email();
let user = test.af_cloud_sign_in_with_email(&email).await.unwrap();
assert_eq!(user.email, email);
@ -16,7 +16,7 @@ async fn af_cloud_sign_up_test() {
#[tokio::test]
async fn af_cloud_update_user_metadata() {
if get_af_cloud_config().is_some() {
let test = EventIntegrationTest::new();
let test = EventIntegrationTest::new().await;
let user = test.af_cloud_sign_up().await;
let old_profile = test.get_user_profile().await.unwrap();

View File

@ -5,10 +5,10 @@ use crate::util::get_af_cloud_config;
#[tokio::test]
async fn af_cloud_add_workspace_member_test() {
if get_af_cloud_config().is_some() {
let test_1 = EventIntegrationTest::new();
let test_1 = EventIntegrationTest::new().await;
let user_1 = test_1.af_cloud_sign_up().await;
let test_2 = EventIntegrationTest::new();
let test_2 = EventIntegrationTest::new().await;
let user_2 = test_2.af_cloud_sign_up().await;
let members = test_1.get_workspace_members(&user_1.workspace_id).await;
@ -29,10 +29,10 @@ async fn af_cloud_add_workspace_member_test() {
#[tokio::test]
async fn af_cloud_delete_workspace_member_test() {
if get_af_cloud_config().is_some() {
let test_1 = EventIntegrationTest::new();
let test_1 = EventIntegrationTest::new().await;
let user_1 = test_1.af_cloud_sign_up().await;
let test_2 = EventIntegrationTest::new();
let test_2 = EventIntegrationTest::new().await;
let user_2 = test_2.af_cloud_sign_up().await;
test_1

View File

@ -9,7 +9,7 @@ use crate::user::local_test::helper::*;
#[tokio::test]
async fn sign_up_with_invalid_email() {
for email in invalid_email_test_case() {
let sdk = EventIntegrationTest::new();
let sdk = EventIntegrationTest::new().await;
let request = SignUpPayloadPB {
email: email.to_string(),
name: valid_name(),
@ -33,7 +33,7 @@ async fn sign_up_with_invalid_email() {
}
#[tokio::test]
async fn sign_up_with_long_password() {
let sdk = EventIntegrationTest::new();
let sdk = EventIntegrationTest::new().await;
let request = SignUpPayloadPB {
email: unique_email(),
name: valid_name(),
@ -58,7 +58,7 @@ async fn sign_up_with_long_password() {
#[tokio::test]
async fn sign_in_with_invalid_email() {
for email in invalid_email_test_case() {
let sdk = EventIntegrationTest::new();
let sdk = EventIntegrationTest::new().await;
let request = SignInPayloadPB {
email: email.to_string(),
password: login_password(),
@ -84,7 +84,7 @@ async fn sign_in_with_invalid_email() {
#[tokio::test]
async fn sign_in_with_invalid_password() {
for password in invalid_password_test_case() {
let sdk = EventIntegrationTest::new();
let sdk = EventIntegrationTest::new().await;
let request = SignInPayloadPB {
email: unique_email(),

View File

@ -6,8 +6,8 @@ use flowy_user::entities::{ReminderPB, RepeatedReminderPB};
use flowy_user::event_map::UserEvent::*;
#[tokio::test]
async fn user_update_with_name() {
let sdk = EventIntegrationTest::new();
async fn user_update_with_reminder() {
let sdk = EventIntegrationTest::new().await;
let _ = sdk.sign_up_as_guest().await;
let mut meta = HashMap::new();
meta.insert("object_id".to_string(), "".to_string());

View File

@ -10,7 +10,7 @@ use crate::user::local_test::helper::*;
#[tokio::test]
async fn user_profile_get_failed() {
let sdk = EventIntegrationTest::new();
let sdk = EventIntegrationTest::new().await;
let result = EventBuilder::new(sdk)
.event(GetUserProfile)
.async_send()
@ -21,11 +21,12 @@ async fn user_profile_get_failed() {
#[tokio::test]
async fn anon_user_profile_get() {
let test = EventIntegrationTest::new();
let test = EventIntegrationTest::new().await;
let user_profile = test.init_anon_user().await;
let user = EventBuilder::new(test.clone())
.event(GetUserProfile)
.sync_send()
.async_send()
.await
.parse::<UserProfilePB>();
assert_eq!(user_profile.id, user.id);
assert_eq!(user_profile.openai_key, user.openai_key);
@ -36,18 +37,20 @@ async fn anon_user_profile_get() {
#[tokio::test]
async fn user_update_with_name() {
let sdk = EventIntegrationTest::new();
let sdk = EventIntegrationTest::new().await;
let user = sdk.init_anon_user().await;
let new_name = "hello_world".to_owned();
let request = UpdateUserProfilePayloadPB::new(user.id).name(&new_name);
let _ = EventBuilder::new(sdk.clone())
.event(UpdateUserProfile)
.payload(request)
.sync_send();
.async_send()
.await;
let user_profile = EventBuilder::new(sdk.clone())
.event(GetUserProfile)
.sync_send()
.async_send()
.await
.parse::<UserProfilePB>();
assert_eq!(user_profile.name, new_name,);
@ -55,7 +58,7 @@ async fn user_update_with_name() {
#[tokio::test]
async fn user_update_with_ai_key() {
let sdk = EventIntegrationTest::new();
let sdk = EventIntegrationTest::new().await;
let user = sdk.init_anon_user().await;
let openai_key = "openai_key".to_owned();
let stability_ai_key = "stability_ai_key".to_owned();
@ -65,11 +68,13 @@ async fn user_update_with_ai_key() {
let _ = EventBuilder::new(sdk.clone())
.event(UpdateUserProfile)
.payload(request)
.sync_send();
.async_send()
.await;
let user_profile = EventBuilder::new(sdk.clone())
.event(GetUserProfile)
.sync_send()
.async_send()
.await
.parse::<UserProfilePB>();
assert_eq!(user_profile.openai_key, openai_key,);
@ -78,17 +83,19 @@ async fn user_update_with_ai_key() {
#[tokio::test]
async fn anon_user_update_with_email() {
let sdk = EventIntegrationTest::new();
let sdk = EventIntegrationTest::new().await;
let user = sdk.init_anon_user().await;
let new_email = format!("{}@gmail.com", nanoid!(6));
let request = UpdateUserProfilePayloadPB::new(user.id).email(&new_email);
let _ = EventBuilder::new(sdk.clone())
.event(UpdateUserProfile)
.payload(request)
.sync_send();
.async_send()
.await;
let user_profile = EventBuilder::new(sdk.clone())
.event(GetUserProfile)
.sync_send()
.async_send()
.await
.parse::<UserProfilePB>();
// When the user is anonymous, the email is empty no matter what you set
@ -97,7 +104,7 @@ async fn anon_user_update_with_email() {
#[tokio::test]
async fn user_update_with_invalid_email() {
let test = EventIntegrationTest::new();
let test = EventIntegrationTest::new().await;
let user = test.init_anon_user().await;
for email in invalid_email_test_case() {
let request = UpdateUserProfilePayloadPB::new(user.id).email(&email);
@ -105,7 +112,8 @@ async fn user_update_with_invalid_email() {
EventBuilder::new(test.clone())
.event(UpdateUserProfile)
.payload(request)
.sync_send()
.async_send()
.await
.error()
.unwrap()
.code,
@ -116,7 +124,7 @@ async fn user_update_with_invalid_email() {
#[tokio::test]
async fn user_update_with_invalid_password() {
let test = EventIntegrationTest::new();
let test = EventIntegrationTest::new().await;
let user = test.init_anon_user().await;
for password in invalid_password_test_case() {
let request = UpdateUserProfilePayloadPB::new(user.id).password(&password);
@ -133,13 +141,14 @@ async fn user_update_with_invalid_password() {
#[tokio::test]
async fn user_update_with_invalid_name() {
let test = EventIntegrationTest::new();
let test = EventIntegrationTest::new().await;
let user = test.init_anon_user().await;
let request = UpdateUserProfilePayloadPB::new(user.id).name("");
assert!(EventBuilder::new(test.clone())
.event(UpdateUserProfile)
.payload(request)
.sync_send()
.async_send()
.await
.error()
.is_some())
}

View File

@ -11,7 +11,8 @@ async fn migrate_historical_empty_document_test() {
"historical_empty_document",
)
.unwrap();
let test = EventIntegrationTest::new_with_user_data_path(user_db_path, DEFAULT_NAME.to_string());
let test =
EventIntegrationTest::new_with_user_data_path(user_db_path, DEFAULT_NAME.to_string()).await;
let views = test.get_all_workspace_views().await;
assert_eq!(views.len(), 3);

View File

@ -11,7 +11,8 @@ async fn migrate_020_historical_empty_document_test() {
"020_historical_user_data",
)
.unwrap();
let test = EventIntegrationTest::new_with_user_data_path(user_db_path, DEFAULT_NAME.to_string());
let test =
EventIntegrationTest::new_with_user_data_path(user_db_path, DEFAULT_NAME.to_string()).await;
let mut views = test.get_all_workspace_views().await;
assert_eq!(views.len(), 1);

View File

@ -13,7 +13,7 @@ use event_integration::event_builder::EventBuilder;
use event_integration::EventIntegrationTest;
use flowy_core::DEFAULT_NAME;
use flowy_encrypt::decrypt_text;
use flowy_server::supabase::define::{USER_EMAIL, USER_UUID};
use flowy_server::supabase::define::{USER_DEVICE_ID, USER_EMAIL, USER_UUID};
use flowy_user::entities::{AuthTypePB, OauthSignInPB, UpdateUserProfilePayloadPB, UserProfilePB};
use flowy_user::errors::ErrorCode;
use flowy_user::event_map::UserEvent::*;
@ -23,13 +23,14 @@ use crate::util::*;
#[tokio::test]
async fn third_party_sign_up_test() {
if get_supabase_config().is_some() {
let test = EventIntegrationTest::new();
let test = EventIntegrationTest::new().await;
let mut map = HashMap::new();
map.insert(USER_UUID.to_string(), uuid::Uuid::new_v4().to_string());
map.insert(
USER_EMAIL.to_string(),
format!("{}@appflowy.io", nanoid!(6)),
);
map.insert(USER_DEVICE_ID.to_string(), uuid::Uuid::new_v4().to_string());
let payload = OauthSignInPB {
map,
auth_type: AuthTypePB::Supabase,
@ -48,7 +49,7 @@ async fn third_party_sign_up_test() {
#[tokio::test]
async fn third_party_sign_up_with_encrypt_test() {
if get_supabase_config().is_some() {
let test = EventIntegrationTest::new();
let test = EventIntegrationTest::new().await;
test.supabase_party_sign_up().await;
let user_profile = test.get_user_profile().await.unwrap();
assert!(user_profile.encryption_sign.is_empty());
@ -65,11 +66,12 @@ async fn third_party_sign_up_with_encrypt_test() {
#[tokio::test]
async fn third_party_sign_up_with_duplicated_uuid() {
if get_supabase_config().is_some() {
let test = EventIntegrationTest::new();
let test = EventIntegrationTest::new().await;
let email = format!("{}@appflowy.io", nanoid!(6));
let mut map = HashMap::new();
map.insert(USER_UUID.to_string(), uuid::Uuid::new_v4().to_string());
map.insert(USER_EMAIL.to_string(), email.clone());
map.insert(USER_DEVICE_ID.to_string(), uuid::Uuid::new_v4().to_string());
let response_1 = EventBuilder::new(test.clone())
.event(OauthSignIn)
@ -98,7 +100,7 @@ async fn third_party_sign_up_with_duplicated_uuid() {
#[tokio::test]
async fn third_party_sign_up_with_duplicated_email() {
if get_supabase_config().is_some() {
let test = EventIntegrationTest::new();
let test = EventIntegrationTest::new().await;
let email = format!("{}@appflowy.io", nanoid!(6));
test
.supabase_sign_up_with_uuid(&uuid::Uuid::new_v4().to_string(), Some(email.clone()))
@ -138,7 +140,6 @@ async fn sign_up_as_guest_and_then_update_to_new_cloud_user_test() {
assert_eq!(old_workspace.views.len(), new_workspace.views.len());
for (index, view) in old_views.iter().enumerate() {
assert_eq!(view.name, new_views[index].name);
assert_eq!(view.id, new_views[index].id);
assert_eq!(view.layout, new_views[index].layout);
assert_eq!(view.create_time, new_views[index].create_time);
}
@ -196,7 +197,7 @@ async fn sign_up_as_guest_and_then_update_to_existing_cloud_user_test() {
#[tokio::test]
async fn get_user_profile_test() {
if let Some(test) = FlowySupabaseTest::new() {
if let Some(test) = FlowySupabaseTest::new().await {
let uuid = uuid::Uuid::new_v4().to_string();
test.supabase_sign_up_with_uuid(&uuid, None).await.unwrap();
@ -207,7 +208,7 @@ async fn get_user_profile_test() {
#[tokio::test]
async fn update_user_profile_test() {
if let Some(test) = FlowySupabaseTest::new() {
if let Some(test) = FlowySupabaseTest::new().await {
let uuid = uuid::Uuid::new_v4().to_string();
let profile = test.supabase_sign_up_with_uuid(&uuid, None).await.unwrap();
test
@ -221,7 +222,7 @@ async fn update_user_profile_test() {
#[tokio::test]
async fn update_user_profile_with_existing_email_test() {
if let Some(test) = FlowySupabaseTest::new() {
if let Some(test) = FlowySupabaseTest::new().await {
let email = format!("{}@appflowy.io", nanoid!(6));
let _ = test
.supabase_sign_up_with_uuid(&uuid::Uuid::new_v4().to_string(), Some(email.clone()))
@ -249,7 +250,7 @@ async fn update_user_profile_with_existing_email_test() {
#[tokio::test]
async fn migrate_anon_document_on_cloud_signup() {
if get_supabase_config().is_some() {
let test = EventIntegrationTest::new();
let test = EventIntegrationTest::new().await;
let user_profile = test.sign_up_as_guest().await.user_profile;
let view = test
@ -295,7 +296,7 @@ async fn migrate_anon_data_on_cloud_signup() {
)
.unwrap();
let test =
EventIntegrationTest::new_with_user_data_path(user_db_path, DEFAULT_NAME.to_string());
EventIntegrationTest::new_with_user_data_path(user_db_path, DEFAULT_NAME.to_string()).await;
let user_profile = test.supabase_party_sign_up().await;
// Get the folder data from remote

View File

@ -12,7 +12,7 @@ use crate::util::*;
#[tokio::test]
async fn initial_workspace_test() {
if get_supabase_config().is_some() {
let test = EventIntegrationTest::new();
let test = EventIntegrationTest::new().await;
let mut map = HashMap::new();
map.insert(USER_UUID.to_string(), uuid::Uuid::new_v4().to_string());
map.insert(

View File

@ -40,9 +40,9 @@ pub struct FlowySupabaseTest {
}
impl FlowySupabaseTest {
pub fn new() -> Option<Self> {
pub async fn new() -> Option<Self> {
let _ = get_supabase_config()?;
let test = EventIntegrationTest::new();
let test = EventIntegrationTest::new().await;
test.set_auth_type(AuthTypePB::Supabase);
test.server_provider.set_auth_type(AuthType::Supabase);
@ -71,12 +71,10 @@ impl Deref for FlowySupabaseTest {
}
pub async fn receive_with_timeout<T>(
receiver: &mut Receiver<T>,
mut receiver: Receiver<T>,
duration: Duration,
) -> Result<T, Box<dyn std::error::Error>> {
let res = timeout(duration, receiver.recv())
.await?
.ok_or(anyhow::anyhow!("recv timeout"))?;
) -> Result<T, Box<dyn std::error::Error + Send>> {
let res = timeout(duration, receiver.recv()).await.unwrap().unwrap();
Ok(res)
}
@ -206,9 +204,9 @@ pub struct AFCloudTest {
}
impl AFCloudTest {
pub fn new() -> Option<Self> {
pub async fn new() -> Option<Self> {
let _ = get_af_cloud_config()?;
let test = EventIntegrationTest::new();
let test = EventIntegrationTest::new().await;
test.set_auth_type(AuthTypePB::AFCloud);
test.server_provider.set_auth_type(AuthType::AFCloud);

View File

@ -46,6 +46,7 @@ lib-infra = { path = "../../../shared-lib/lib-infra" }
serde = "1.0"
serde_json = "1.0"
serde_repr = "0.1"
futures = "0.3.28"
[features]
default = ["rev-sqlite"]
@ -71,4 +72,4 @@ ts = [
]
rev-sqlite = ["flowy-user/rev-sqlite"]
openssl_vendored = ["flowy-sqlite/openssl_vendored"]
single_thread = ["lib-dispatch/single_thread"]

View File

@ -34,6 +34,7 @@ pub(crate) fn create_log_filter(level: String, with_crates: Vec<String>) -> Stri
filters.push(format!("flowy_notification={}", "info"));
filters.push(format!("lib_infra={}", level));
filters.push(format!("flowy_task={}", level));
// filters.push(format!("lib_dispatch={}", level));
filters.push(format!("dart_ffi={}", "info"));
filters.push(format!("flowy_sqlite={}", "info"));

View File

@ -5,7 +5,7 @@ use std::time::Duration;
use std::{fmt, sync::Arc};
use tokio::sync::RwLock;
use tracing::error;
use tracing::{error, event, instrument};
use collab_integrate::collab_builder::{AppFlowyCollabBuilder, CollabSource};
use flowy_database2::DatabaseManager;
@ -17,7 +17,7 @@ use flowy_task::{TaskDispatcher, TaskRunner};
use flowy_user::event_map::UserCloudServiceProvider;
use flowy_user::manager::{UserManager, UserSessionConfig};
use lib_dispatch::prelude::*;
use lib_dispatch::runtime::tokio_default_runtime;
use lib_dispatch::runtime::AFPluginRuntime;
use module::make_plugins;
pub use module::*;
@ -82,7 +82,21 @@ pub struct AppFlowyCore {
}
impl AppFlowyCore {
#[cfg(feature = "single_thread")]
pub async fn new(config: AppFlowyCoreConfig) -> Self {
let runtime = Arc::new(AFPluginRuntime::new().unwrap());
Self::init(config, runtime).await
}
#[cfg(not(feature = "single_thread"))]
pub fn new(config: AppFlowyCoreConfig) -> Self {
let runtime = Arc::new(AFPluginRuntime::new().unwrap());
let cloned_runtime = runtime.clone();
runtime.block_on(Self::init(config, cloned_runtime))
}
#[instrument(skip(config, runtime))]
async fn init(config: AppFlowyCoreConfig, runtime: Arc<AFPluginRuntime>) -> Self {
/// The profiling can be used to tracing the performance of the application.
/// Check out the [Link](https://appflowy.gitbook.io/docs/essential-documentation/contribute-to-appflowy/architecture/backend/profiling)
/// for more information.
@ -95,8 +109,8 @@ impl AppFlowyCore {
// Init the key value database
let store_preference = Arc::new(StorePreferences::new(&config.storage_path).unwrap());
tracing::info!("🔥 {:?}", &config);
let runtime = tokio_default_runtime().unwrap();
tracing::info!("🔥db {:?}", &config);
tracing::debug!("🔥{}", runtime);
let task_scheduler = TaskDispatcher::new(Duration::from_secs(2));
let task_dispatcher = Arc::new(RwLock::new(task_scheduler));
runtime.spawn(TaskRunner::run(task_dispatcher.clone()));
@ -108,6 +122,7 @@ impl AppFlowyCore {
Arc::downgrade(&store_preference),
));
event!(tracing::Level::DEBUG, "Init managers",);
let (
user_manager,
folder_manager,
@ -115,7 +130,7 @@ impl AppFlowyCore {
database_manager,
document_manager,
collab_builder,
) = runtime.block_on(async {
) = async {
/// The shared collab builder is used to build the [Collab] instance. The plugins will be loaded
/// on demand based on the [CollabPluginConfig].
let collab_builder = Arc::new(AppFlowyCollabBuilder::new(server_provider.clone()));
@ -162,7 +177,8 @@ impl AppFlowyCore {
document_manager,
collab_builder,
)
});
}
.await;
let user_status_callback = UserStatusCallbackImpl {
collab_builder,
@ -179,17 +195,15 @@ impl AppFlowyCore {
};
let cloned_user_session = Arc::downgrade(&user_manager);
runtime.block_on(async move {
if let Some(user_manager) = cloned_user_session.upgrade() {
if let Err(err) = user_manager
.init(user_status_callback, collab_interact_impl)
.await
{
error!("Init user failed: {}", err)
}
if let Some(user_session) = cloned_user_session.upgrade() {
event!(tracing::Level::DEBUG, "init user session",);
if let Err(err) = user_session
.init(user_status_callback, collab_interact_impl)
.await
{
error!("Init user failed: {}", err)
}
});
}
let event_dispatcher = Arc::new(AFPluginDispatcher::construct(runtime, || {
make_plugins(
Arc::downgrade(&folder_manager),

View File

@ -5,7 +5,7 @@ use collab_database::rows::RowId;
use tokio::sync::oneshot;
use flowy_error::{FlowyError, FlowyResult};
use lib_dispatch::prelude::{data_result_ok, AFPluginData, AFPluginState, DataResult};
use lib_dispatch::prelude::{af_spawn, data_result_ok, AFPluginData, AFPluginState, DataResult};
use lib_infra::util::timestamp;
use crate::entities::*;
@ -697,7 +697,7 @@ pub(crate) async fn update_group_handler(
let database_editor = manager.get_database_with_view_id(&view_id).await?;
let group_changeset = GroupChangeset::from(params);
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
af_spawn(async move {
let result = database_editor
.update_group(&view_id, vec![group_changeset].into())
.await;

View File

@ -20,6 +20,7 @@ use collab_integrate::{CollabPersistenceConfig, RocksCollabDB};
use flowy_database_deps::cloud::DatabaseCloudService;
use flowy_error::{internal_error, FlowyError, FlowyResult};
use flowy_task::TaskDispatcher;
use lib_dispatch::prelude::af_spawn;
use crate::entities::{
DatabaseDescriptionPB, DatabaseLayoutPB, DatabaseSnapshotPB, DidFetchRowPB,
@ -361,7 +362,7 @@ impl DatabaseManager {
/// Send notification to all clients that are listening to the given object.
fn subscribe_block_event(workspace_database: &WorkspaceDatabase) {
let mut block_event_rx = workspace_database.subscribe_block_event();
tokio::spawn(async move {
af_spawn(async move {
while let Ok(event) = block_event_rx.recv().await {
match event {
BlockEvent::DidFetchRow(row_details) => {

View File

@ -12,6 +12,7 @@ use tracing::{event, warn};
use flowy_error::{internal_error, ErrorCode, FlowyError, FlowyResult};
use flowy_task::TaskDispatcher;
use lib_dispatch::prelude::af_spawn;
use lib_infra::future::{to_fut, Fut, FutureResult};
use crate::entities::*;
@ -56,7 +57,7 @@ impl DatabaseEditor {
// Receive database sync state and send to frontend via the notification
let mut sync_state = database.lock().subscribe_sync_state();
let cloned_database_id = database_id.clone();
tokio::spawn(async move {
af_spawn(async move {
while let Some(sync_state) = sync_state.next().await {
send_notification(
&cloned_database_id,
@ -69,7 +70,7 @@ impl DatabaseEditor {
// Receive database snapshot state and send to frontend via the notification
let mut snapshot_state = database.lock().subscribe_snapshot_state();
tokio::spawn(async move {
af_spawn(async move {
while let Some(snapshot_state) = snapshot_state.next().await {
if let Some(new_snapshot_id) = snapshot_state.snapshot_id() {
tracing::debug!(

View File

@ -9,6 +9,7 @@ use collab_database::views::{DatabaseLayout, DatabaseView};
use tokio::sync::{broadcast, RwLock};
use flowy_error::{FlowyError, FlowyResult};
use lib_dispatch::prelude::af_spawn;
use crate::entities::{
CalendarEventPB, DatabaseLayoutMetaPB, DatabaseLayoutSettingPB, DeleteFilterParams,
@ -60,7 +61,7 @@ impl DatabaseViewEditor {
cell_cache: CellCache,
) -> FlowyResult<Self> {
let (notifier, _) = broadcast::channel(100);
tokio::spawn(DatabaseViewChangedReceiverRunner(Some(notifier.subscribe())).run());
af_spawn(DatabaseViewChangedReceiverRunner(Some(notifier.subscribe())).run());
// Group
let group_controller = Arc::new(RwLock::new(
new_group_controller(view_id.clone(), delegate.clone()).await?,
@ -237,7 +238,7 @@ impl DatabaseViewEditor {
let row_id = row_detail.row.id.clone();
let weak_filter_controller = Arc::downgrade(&self.filter_controller);
let weak_sort_controller = Arc::downgrade(&self.sort_controller);
tokio::spawn(async move {
af_spawn(async move {
if let Some(filter_controller) = weak_filter_controller.upgrade() {
filter_controller
.did_receive_row_changed(row_id.clone())
@ -645,7 +646,7 @@ impl DatabaseViewEditor {
let filter_type = UpdatedFilterType::new(Some(old), new);
let filter_changeset = FilterChangeset::from_update(filter_type);
let filter_controller = self.filter_controller.clone();
tokio::spawn(async move {
af_spawn(async move {
if let Some(notification) = filter_controller
.did_receive_changes(filter_changeset)
.await

View File

@ -12,6 +12,7 @@ use serde::Serialize;
use tracing::event;
use flowy_error::{FlowyError, FlowyResult};
use lib_dispatch::prelude::af_spawn;
use lib_infra::future::Fut;
use crate::entities::{GroupChangesPB, GroupPB, InsertedGroupPB};
@ -415,7 +416,7 @@ where
let configuration = (*self.setting).clone();
let writer = self.writer.clone();
let view_id = self.view_id.clone();
tokio::spawn(async move {
af_spawn(async move {
match writer.save_configuration(&view_id, configuration).await {
Ok(_) => {},
Err(e) => {

View File

@ -37,7 +37,7 @@ pub struct DatabaseEditorTest {
impl DatabaseEditorTest {
pub async fn new_grid() -> Self {
let sdk = EventIntegrationTest::new();
let sdk = EventIntegrationTest::new().await;
let _ = sdk.init_anon_user().await;
let params = make_test_grid();
@ -46,7 +46,7 @@ impl DatabaseEditorTest {
}
pub async fn new_no_date_grid() -> Self {
let sdk = EventIntegrationTest::new();
let sdk = EventIntegrationTest::new().await;
let _ = sdk.init_anon_user().await;
let params = make_no_date_test_grid();
@ -55,7 +55,7 @@ impl DatabaseEditorTest {
}
pub async fn new_board() -> Self {
let sdk = EventIntegrationTest::new();
let sdk = EventIntegrationTest::new().await;
let _ = sdk.init_anon_user().await;
let params = make_test_board();
@ -64,7 +64,7 @@ impl DatabaseEditorTest {
}
pub async fn new_calendar() -> Self {
let sdk = EventIntegrationTest::new();
let sdk = EventIntegrationTest::new().await;
let _ = sdk.init_anon_user().await;
let params = make_test_calendar();

View File

@ -14,6 +14,7 @@ use flowy_database2::entities::{CheckboxFilterConditionPB, CheckboxFilterPB, Che
use flowy_database2::services::database_view::DatabaseViewChanged;
use flowy_database2::services::field::SelectOption;
use flowy_database2::services::filter::FilterType;
use lib_dispatch::prelude::af_spawn;
use crate::database::database_editor::DatabaseEditorTest;
@ -278,7 +279,7 @@ impl DatabaseFilterTest {
if change.is_none() {return;}
let change = change.unwrap();
let mut receiver = self.recv.take().unwrap();
tokio::spawn(async move {
af_spawn(async move {
match tokio::time::timeout(Duration::from_secs(2), receiver.recv()).await {
Ok(changed) => {
match changed.unwrap() { DatabaseViewChanged::FilterNotification(notification) => {

View File

@ -9,6 +9,7 @@ use futures::StreamExt;
use parking_lot::Mutex;
use flowy_error::FlowyResult;
use lib_dispatch::prelude::af_spawn;
use crate::entities::{DocEventPB, DocumentSnapshotStatePB, DocumentSyncStatePB};
use crate::notification::{send_notification, DocumentNotification};
@ -61,7 +62,7 @@ fn subscribe_document_changed(doc_id: &str, document: &MutexDocument) {
fn subscribe_document_snapshot_state(collab: &Arc<MutexCollab>) {
let document_id = collab.lock().object_id.clone();
let mut snapshot_state = collab.lock().subscribe_snapshot_state();
tokio::spawn(async move {
af_spawn(async move {
while let Some(snapshot_state) = snapshot_state.next().await {
if let Some(new_snapshot_id) = snapshot_state.snapshot_id() {
tracing::debug!("Did create document remote snapshot: {}", new_snapshot_id);
@ -79,7 +80,7 @@ fn subscribe_document_snapshot_state(collab: &Arc<MutexCollab>) {
fn subscribe_document_sync_state(collab: &Arc<MutexCollab>) {
let document_id = collab.lock().object_id.clone();
let mut sync_state_stream = collab.lock().subscribe_sync_state();
tokio::spawn(async move {
af_spawn(async move {
while let Some(sync_state) = sync_state_stream.next().await {
send_notification(
&document_id,

View File

@ -18,6 +18,7 @@ use collab_integrate::collab_builder::AppFlowyCollabBuilder;
use collab_integrate::{CollabPersistenceConfig, RocksCollabDB, YrsDocAction};
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
use flowy_folder_deps::cloud::{gen_view_id, FolderCloudService};
use lib_dispatch::prelude::af_spawn;
use crate::entities::icon::UpdateViewIconParams;
use crate::entities::{
@ -1027,7 +1028,7 @@ fn subscribe_folder_view_changed(
weak_mutex_folder: &Weak<MutexFolder>,
) {
let weak_mutex_folder = weak_mutex_folder.clone();
tokio::spawn(async move {
af_spawn(async move {
while let Ok(value) = rx.recv().await {
if let Some(folder) = weak_mutex_folder.upgrade() {
tracing::trace!("Did receive view change: {:?}", value);
@ -1065,7 +1066,7 @@ fn subscribe_folder_snapshot_state_changed(
weak_mutex_folder: &Weak<MutexFolder>,
) {
let weak_mutex_folder = weak_mutex_folder.clone();
tokio::spawn(async move {
af_spawn(async move {
if let Some(mutex_folder) = weak_mutex_folder.upgrade() {
let stream = mutex_folder
.lock()
@ -1093,7 +1094,7 @@ fn subscribe_folder_sync_state_changed(
mut folder_sync_state_rx: WatchStream<SyncState>,
_weak_mutex_folder: &Weak<MutexFolder>,
) {
tokio::spawn(async move {
af_spawn(async move {
while let Some(state) = folder_sync_state_rx.next().await {
send_notification(&workspace_id, FolderNotification::DidUpdateFolderSyncUpdate)
.payload(FolderSyncStatePB::from(state))
@ -1108,7 +1109,7 @@ fn subscribe_folder_trash_changed(
weak_mutex_folder: &Weak<MutexFolder>,
) {
let weak_mutex_folder = weak_mutex_folder.clone();
tokio::spawn(async move {
af_spawn(async move {
while let Ok(value) = rx.recv().await {
if let Some(folder) = weak_mutex_folder.upgrade() {
let mut unique_ids = HashSet::new();

View File

@ -44,6 +44,7 @@ url = "2.4"
tokio-util = "0.7"
tokio-stream = { version = "0.1.14", features = ["sync"] }
client-api = { version = "0.1.0", features = ["collab-sync"] }
lib-dispatch = { workspace = true }
[dev-dependencies]
uuid = { version = "1.3.3", features = ["v4"] }

View File

@ -19,6 +19,7 @@ use flowy_server_config::af_cloud_config::AFCloudConfiguration;
use flowy_storage::FileStorageService;
use flowy_user_deps::cloud::UserCloudService;
use flowy_user_deps::entities::UserTokenState;
use lib_dispatch::prelude::af_spawn;
use lib_infra::future::FutureResult;
use crate::af_cloud::impls::{
@ -94,7 +95,7 @@ impl AppFlowyServer for AFCloudServer {
let mut token_state_rx = self.client.subscribe_token_state();
let (watch_tx, watch_rx) = watch::channel(UserTokenState::Invalid);
let weak_client = Arc::downgrade(&self.client);
tokio::spawn(async move {
af_spawn(async move {
while let Ok(token_state) = token_state_rx.recv().await {
if let Some(client) = weak_client.upgrade() {
match token_state {
@ -185,7 +186,7 @@ fn spawn_ws_conn(
let weak_api_client = Arc::downgrade(api_client);
let enable_sync = enable_sync.clone();
tokio::spawn(async move {
af_spawn(async move {
if let Some(ws_client) = weak_ws_client.upgrade() {
let mut state_recv = ws_client.subscribe_connect_state();
while let Ok(state) = state_recv.recv().await {
@ -215,7 +216,7 @@ fn spawn_ws_conn(
let weak_device_id = Arc::downgrade(device_id);
let weak_ws_client = Arc::downgrade(ws_client);
let weak_api_client = Arc::downgrade(api_client);
tokio::spawn(async move {
af_spawn(async move {
while let Ok(token_state) = token_state_rx.recv().await {
match token_state {
TokenState::Refresh => {

View File

@ -5,6 +5,7 @@ use tokio::sync::oneshot::channel;
use flowy_database_deps::cloud::{
CollabObjectUpdate, CollabObjectUpdateByOid, DatabaseCloudService, DatabaseSnapshot,
};
use lib_dispatch::prelude::af_spawn;
use lib_infra::future::FutureResult;
use crate::supabase::api::request::{
@ -35,7 +36,7 @@ where
let try_get_postgrest = self.server.try_get_weak_postgrest();
let object_id = object_id.to_string();
let (tx, rx) = channel();
tokio::spawn(async move {
af_spawn(async move {
tx.send(
async move {
let postgrest = try_get_postgrest?;
@ -58,7 +59,7 @@ where
) -> FutureResult<CollabObjectUpdateByOid, Error> {
let try_get_postgrest = self.server.try_get_weak_postgrest();
let (tx, rx) = channel();
tokio::spawn(async move {
af_spawn(async move {
tx.send(
async move {
let postgrest = try_get_postgrest?;

View File

@ -7,6 +7,7 @@ use tokio::sync::oneshot::channel;
use flowy_document_deps::cloud::{DocumentCloudService, DocumentSnapshot};
use flowy_error::FlowyError;
use lib_dispatch::prelude::af_spawn;
use lib_infra::future::FutureResult;
use crate::supabase::api::request::{get_snapshots_from_server, FetchObjectUpdateAction};
@ -35,7 +36,7 @@ where
let try_get_postgrest = self.server.try_get_weak_postgrest();
let document_id = document_id.to_string();
let (tx, rx) = channel();
tokio::spawn(async move {
af_spawn(async move {
tx.send(
async move {
let postgrest = try_get_postgrest?;
@ -85,7 +86,7 @@ where
let try_get_postgrest = self.server.try_get_weak_postgrest();
let document_id = document_id.to_string();
let (tx, rx) = channel();
tokio::spawn(async move {
af_spawn(async move {
tx.send(
async move {
let postgrest = try_get_postgrest?;

View File

@ -10,6 +10,7 @@ use tokio::sync::oneshot::channel;
use flowy_folder_deps::cloud::{
gen_workspace_id, Folder, FolderCloudService, FolderData, FolderSnapshot, Workspace,
};
use lib_dispatch::prelude::af_spawn;
use lib_infra::future::FutureResult;
use crate::response::ExtendedResponse;
@ -116,7 +117,7 @@ where
let try_get_postgrest = self.server.try_get_weak_postgrest();
let workspace_id = workspace_id.to_string();
let (tx, rx) = channel();
tokio::spawn(async move {
af_spawn(async move {
tx.send(
async move {
let postgrest = try_get_postgrest?;

View File

@ -21,6 +21,7 @@ use flowy_folder_deps::cloud::{Folder, Workspace};
use flowy_user_deps::cloud::*;
use flowy_user_deps::entities::*;
use flowy_user_deps::DEFAULT_USER_NAME;
use lib_dispatch::prelude::af_spawn;
use lib_infra::box_any::BoxAny;
use lib_infra::future::FutureResult;
use lib_infra::util::timestamp;
@ -238,7 +239,7 @@ where
let try_get_postgrest = self.server.try_get_weak_postgrest();
let awareness_id = uid.to_string();
let (tx, rx) = channel();
tokio::spawn(async move {
af_spawn(async move {
tx.send(
async move {
let postgrest = try_get_postgrest?;
@ -278,7 +279,7 @@ where
let try_get_postgrest = self.server.try_get_weak_postgrest();
let (tx, rx) = channel();
let init_update = empty_workspace_update(&collab_object);
tokio::spawn(async move {
af_spawn(async move {
tx.send(
async move {
let postgrest = try_get_postgrest?
@ -316,7 +317,7 @@ where
let try_get_postgrest = self.server.try_get_weak_postgrest();
let cloned_collab_object = collab_object.clone();
let (tx, rx) = channel();
tokio::spawn(async move {
af_spawn(async move {
tx.send(
async move {
CreateCollabAction::new(cloned_collab_object, try_get_postgrest?, update)

View File

@ -93,7 +93,7 @@ pub async fn get_user_profile_handler(
let cloned_user_profile = user_profile.clone();
// Refresh the user profile in the background
tokio::spawn(async move {
af_spawn(async move {
if let Some(manager) = weak_manager.upgrade() {
let _ = manager.refresh_user_profile(&cloned_user_profile).await;
}

View File

@ -16,6 +16,7 @@ use flowy_sqlite::ConnectionPool;
use flowy_sqlite::{query_dsl::*, DBConnection, ExpressionMethods};
use flowy_user_deps::cloud::UserUpdate;
use flowy_user_deps::entities::*;
use lib_dispatch::prelude::af_spawn;
use lib_infra::box_any::BoxAny;
use crate::entities::{AuthStateChangedPB, AuthStatePB, UserProfilePB, UserSettingPB};
@ -93,7 +94,7 @@ impl UserManager {
let weak_user_manager = Arc::downgrade(&user_manager);
if let Ok(user_service) = user_manager.cloud_services.get_user_service() {
if let Some(mut rx) = user_service.subscribe_user_update() {
tokio::spawn(async move {
af_spawn(async move {
while let Ok(update) = rx.recv().await {
if let Some(user_manager) = weak_user_manager.upgrade() {
if let Err(err) = user_manager.handler_user_update(update).await {
@ -133,7 +134,7 @@ impl UserManager {
// Subscribe the token state
let weak_pool = Arc::downgrade(&self.db_pool(user.uid)?);
if let Some(mut token_state_rx) = self.cloud_services.subscribe_token_state() {
tokio::spawn(async move {
af_spawn(async move {
while let Some(token_state) = token_state_rx.next().await {
match token_state {
UserTokenState::Refresh { token } => {
@ -401,7 +402,7 @@ impl UserManager {
self.set_session(None)?;
let server = self.cloud_services.get_user_service()?;
tokio::spawn(async move {
af_spawn(async move {
if let Err(err) = server.sign_out(None).await {
event!(tracing::Level::ERROR, "{:?}", err);
}
@ -536,7 +537,7 @@ impl UserManager {
params: UpdateUserProfileParams,
) -> Result<(), FlowyError> {
let server = self.cloud_services.get_user_service()?;
tokio::spawn(async move {
af_spawn(async move {
let credentials = UserCredentials::new(Some(token), Some(uid), None);
server.update_user(credentials, params).await
})

View File

@ -7,6 +7,7 @@ use flowy_error::{FlowyError, FlowyResult};
use flowy_sqlite::schema::user_workspace_table;
use flowy_sqlite::{query_dsl::*, ConnectionPool, ExpressionMethods};
use flowy_user_deps::entities::{Role, UserWorkspace, WorkspaceMember};
use lib_dispatch::prelude::af_spawn;
use crate::entities::{RepeatedUserWorkspacePB, ResetWorkspacePB};
use crate::manager::UserManager;
@ -99,7 +100,7 @@ impl UserManager {
if let Ok(service) = self.cloud_services.get_user_service() {
if let Ok(pool) = self.db_pool(uid) {
tokio::spawn(async move {
af_spawn(async move {
if let Ok(new_user_workspaces) = service.get_all_user_workspaces(uid).await {
let _ = save_user_workspaces(uid, pool, &new_user_workspaces);
let repeated_workspace_pbs = RepeatedUserWorkspacePB::from(new_user_workspaces);

View File

@ -22,17 +22,18 @@ serde_json = {version = "1.0", optional = true }
serde = { version = "1.0", features = ["derive"], optional = true }
serde_repr = { version = "0.1", optional = true }
validator = "0.16.1"
tracing = { version = "0.1"}
#optional crate
bincode = { version = "1.3", optional = true}
protobuf = {version = "2.28.0", optional = true}
tracing = { version = "0.1"}
[dev-dependencies]
tokio = { version = "1.26", features = ["full"] }
futures-util = "0.3.26"
[features]
default = ["use_protobuf"]
default = ["use_protobuf", ]
use_serde = ["bincode", "serde_json", "serde", "serde_repr"]
use_protobuf= ["protobuf"]
single_thread = []

View File

@ -1,6 +1,7 @@
use crate::errors::{DispatchError, InternalError};
use bytes::Bytes;
use crate::errors::{DispatchError, InternalError};
// To bytes
pub trait ToBytes {
fn into_bytes(self) -> Result<Bytes, DispatchError>;
@ -26,21 +27,6 @@ where
}
}
// #[cfg(feature = "use_serde")]
// impl<T> ToBytes for T
// where
// T: serde::Serialize,
// {
// fn into_bytes(self) -> Result<Bytes, DispatchError> {
// match serde_json::to_string(&self.0) {
// Ok(s) => Ok(Bytes::from(s)),
// Err(e) => Err(InternalError::SerializeToBytes(format!("{:?}", e)).into()),
// }
// }
// }
// From bytes
pub trait AFPluginFromBytes: Sized {
fn parse_from_bytes(bytes: Bytes) -> Result<Self, DispatchError>;
}

View File

@ -1,3 +1,13 @@
use std::any::Any;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{future::Future, sync::Arc};
use derivative::*;
use pin_project::pin_project;
use tracing::event;
use crate::module::AFPluginStateMap;
use crate::runtime::AFPluginRuntime;
use crate::{
errors::{DispatchError, Error, InternalError},
@ -5,20 +15,76 @@ use crate::{
response::AFPluginEventResponse,
service::{AFPluginServiceFactory, Service},
};
use derivative::*;
use futures_core::future::BoxFuture;
use futures_util::task::Context;
use pin_project::pin_project;
use std::{future::Future, sync::Arc};
use tokio::macros::support::{Pin, Poll};
#[cfg(feature = "single_thread")]
pub trait AFConcurrent {}
#[cfg(feature = "single_thread")]
impl<T> AFConcurrent for T where T: ?Sized {}
#[cfg(not(feature = "single_thread"))]
pub trait AFConcurrent: Send + Sync {}
#[cfg(not(feature = "single_thread"))]
impl<T> AFConcurrent for T where T: Send + Sync {}
#[cfg(feature = "single_thread")]
pub type AFBoxFuture<'a, T> = futures_core::future::LocalBoxFuture<'a, T>;
#[cfg(not(feature = "single_thread"))]
pub type AFBoxFuture<'a, T> = futures_core::future::BoxFuture<'a, T>;
pub type AFStateMap = std::sync::Arc<AFPluginStateMap>;
#[cfg(feature = "single_thread")]
pub(crate) fn downcast_owned<T: 'static>(boxed: AFBox) -> Option<T> {
boxed.downcast().ok().map(|boxed| *boxed)
}
#[cfg(not(feature = "single_thread"))]
pub(crate) fn downcast_owned<T: 'static + Send + Sync>(boxed: AFBox) -> Option<T> {
boxed.downcast().ok().map(|boxed| *boxed)
}
#[cfg(feature = "single_thread")]
pub(crate) type AFBox = Box<dyn Any>;
#[cfg(not(feature = "single_thread"))]
pub(crate) type AFBox = Box<dyn Any + Send + Sync>;
#[cfg(feature = "single_thread")]
pub type BoxFutureCallback =
Box<dyn FnOnce(AFPluginEventResponse) -> AFBoxFuture<'static, ()> + 'static>;
#[cfg(not(feature = "single_thread"))]
pub type BoxFutureCallback =
Box<dyn FnOnce(AFPluginEventResponse) -> AFBoxFuture<'static, ()> + Send + Sync + 'static>;
#[cfg(feature = "single_thread")]
pub fn af_spawn<T>(future: T) -> tokio::task::JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
tokio::spawn(future)
}
#[cfg(not(feature = "single_thread"))]
pub fn af_spawn<T>(future: T) -> tokio::task::JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
tokio::spawn(future)
}
pub struct AFPluginDispatcher {
plugins: AFPluginMap,
runtime: AFPluginRuntime,
runtime: Arc<AFPluginRuntime>,
}
impl AFPluginDispatcher {
pub fn construct<F>(runtime: AFPluginRuntime, module_factory: F) -> AFPluginDispatcher
pub fn construct<F>(runtime: Arc<AFPluginRuntime>, module_factory: F) -> AFPluginDispatcher
where
F: FnOnce() -> Vec<AFPlugin>,
{
@ -30,24 +96,24 @@ impl AFPluginDispatcher {
}
}
pub fn async_send<Req>(
pub async fn async_send<Req>(
dispatch: Arc<AFPluginDispatcher>,
request: Req,
) -> DispatchFuture<AFPluginEventResponse>
) -> AFPluginEventResponse
where
Req: std::convert::Into<AFPluginRequest>,
Req: Into<AFPluginRequest>,
{
AFPluginDispatcher::async_send_with_callback(dispatch, request, |_| Box::pin(async {}))
AFPluginDispatcher::async_send_with_callback(dispatch, request, |_| Box::pin(async {})).await
}
pub fn async_send_with_callback<Req, Callback>(
pub async fn async_send_with_callback<Req, Callback>(
dispatch: Arc<AFPluginDispatcher>,
request: Req,
callback: Callback,
) -> DispatchFuture<AFPluginEventResponse>
) -> AFPluginEventResponse
where
Req: std::convert::Into<AFPluginRequest>,
Callback: FnOnce(AFPluginEventResponse) -> BoxFuture<'static, ()> + 'static + Send + Sync,
Req: Into<AFPluginRequest>,
Callback: FnOnce(AFPluginEventResponse) -> AFBoxFuture<'static, ()> + AFConcurrent + 'static,
{
let request: AFPluginRequest = request.into();
let plugins = dispatch.plugins.clone();
@ -57,7 +123,70 @@ impl AFPluginDispatcher {
request,
callback: Some(Box::new(callback)),
};
let join_handle = dispatch.runtime.spawn(async move {
// Spawns a future onto the runtime.
//
// This spawns the given future onto the runtime's executor, usually a
// thread pool. The thread pool is then responsible for polling the future
// until it completes.
//
// The provided future will start running in the background immediately
// when `spawn` is called, even if you don't await the returned
// `JoinHandle`.
let handle = dispatch.runtime.spawn(async move {
service.call(service_ctx).await.unwrap_or_else(|e| {
tracing::error!("Dispatch runtime error: {:?}", e);
InternalError::Other(format!("{:?}", e)).as_response()
})
});
let result = dispatch.runtime.run_until(handle).await;
result.unwrap_or_else(|e| {
let msg = format!("EVENT_DISPATCH join error: {:?}", e);
tracing::error!("{}", msg);
let error = InternalError::JoinError(msg);
error.as_response()
})
}
pub fn box_async_send<Req>(
dispatch: Arc<AFPluginDispatcher>,
request: Req,
) -> DispatchFuture<AFPluginEventResponse>
where
Req: Into<AFPluginRequest> + 'static,
{
AFPluginDispatcher::boxed_async_send_with_callback(dispatch, request, |_| Box::pin(async {}))
}
pub fn boxed_async_send_with_callback<Req, Callback>(
dispatch: Arc<AFPluginDispatcher>,
request: Req,
callback: Callback,
) -> DispatchFuture<AFPluginEventResponse>
where
Req: Into<AFPluginRequest> + 'static,
Callback: FnOnce(AFPluginEventResponse) -> AFBoxFuture<'static, ()> + AFConcurrent + 'static,
{
let request: AFPluginRequest = request.into();
let plugins = dispatch.plugins.clone();
let service = Box::new(DispatchService { plugins });
tracing::trace!("Async event: {:?}", &request.event);
let service_ctx = DispatchContext {
request,
callback: Some(Box::new(callback)),
};
// Spawns a future onto the runtime.
//
// This spawns the given future onto the runtime's executor, usually a
// thread pool. The thread pool is then responsible for polling the future
// until it completes.
//
// The provided future will start running in the background immediately
// when `spawn` is called, even if you don't await the returned
// `JoinHandle`.
let handle = dispatch.runtime.spawn(async move {
service.call(service_ctx).await.unwrap_or_else(|e| {
tracing::error!("Dispatch runtime error: {:?}", e);
InternalError::Other(format!("{:?}", e)).as_response()
@ -66,7 +195,8 @@ impl AFPluginDispatcher {
DispatchFuture {
fut: Box::pin(async move {
join_handle.await.unwrap_or_else(|e| {
let result = dispatch.runtime.run_until(handle).await;
result.unwrap_or_else(|e| {
let msg = format!("EVENT_DISPATCH join error: {:?}", e);
tracing::error!("{}", msg);
let error = InternalError::JoinError(msg);
@ -76,44 +206,56 @@ impl AFPluginDispatcher {
}
}
#[cfg(not(feature = "single_thread"))]
pub fn sync_send(
dispatch: Arc<AFPluginDispatcher>,
request: AFPluginRequest,
) -> AFPluginEventResponse {
futures::executor::block_on(async {
AFPluginDispatcher::async_send_with_callback(dispatch, request, |_| Box::pin(async {})).await
})
futures::executor::block_on(AFPluginDispatcher::async_send_with_callback(
dispatch,
request,
|_| Box::pin(async {}),
))
}
pub fn spawn<F>(&self, f: F)
#[cfg(feature = "single_thread")]
#[track_caller]
pub fn spawn<F>(&self, future: F) -> tokio::task::JoinHandle<F::Output>
where
F: Future<Output = ()> + Send + 'static,
F: Future + 'static,
{
self.runtime.spawn(f);
self.runtime.spawn(future)
}
#[cfg(not(feature = "single_thread"))]
#[track_caller]
pub fn spawn<F>(&self, future: F) -> tokio::task::JoinHandle<F::Output>
where
F: Future + Send + 'static,
<F as Future>::Output: Send + 'static,
{
self.runtime.spawn(future)
}
#[cfg(feature = "single_thread")]
pub async fn run_until<F>(&self, future: F) -> F::Output
where
F: Future + 'static,
{
let handle = self.runtime.spawn(future);
self.runtime.run_until(handle).await.unwrap()
}
#[cfg(not(feature = "single_thread"))]
pub async fn run_until<'a, F>(&self, future: F) -> F::Output
where
F: Future + Send + 'a,
<F as Future>::Output: Send + 'a,
{
self.runtime.run_until(future).await
}
}
#[pin_project]
pub struct DispatchFuture<T: Send + Sync> {
#[pin]
pub fut: Pin<Box<dyn Future<Output = T> + Sync + Send>>,
}
impl<T> Future for DispatchFuture<T>
where
T: Send + Sync,
{
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().project();
Poll::Ready(futures_core::ready!(this.fut.poll(cx)))
}
}
pub type BoxFutureCallback =
Box<dyn FnOnce(AFPluginEventResponse) -> BoxFuture<'static, ()> + 'static + Send + Sync>;
#[derive(Derivative)]
#[derivative(Debug)]
pub struct DispatchContext {
@ -136,36 +278,37 @@ pub(crate) struct DispatchService {
impl Service<DispatchContext> for DispatchService {
type Response = AFPluginEventResponse;
type Error = DispatchError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
type Future = AFBoxFuture<'static, Result<Self::Response, Self::Error>>;
#[cfg_attr(
feature = "use_tracing",
tracing::instrument(name = "DispatchService", level = "debug", skip(self, ctx))
)]
#[tracing::instrument(name = "DispatchService", level = "debug", skip(self, ctx))]
fn call(&self, ctx: DispatchContext) -> Self::Future {
let module_map = self.plugins.clone();
let (request, callback) = ctx.into_parts();
Box::pin(async move {
let result = {
// print_module_map_info(&module_map);
match module_map.get(&request.event) {
Some(module) => {
tracing::trace!("Handle event: {:?} by {:?}", &request.event, module.name);
event!(
tracing::Level::TRACE,
"Handle event: {:?} by {:?}",
&request.event,
module.name
);
let fut = module.new_service(());
let service_fut = fut.await?.call(request);
service_fut.await
},
None => {
let msg = format!("Can not find the event handler. {:?}", request);
tracing::error!("{}", msg);
event!(tracing::Level::ERROR, "{}", msg);
Err(InternalError::HandleNotFound(msg).into())
},
}
};
let response = result.unwrap_or_else(|e| e.into());
tracing::trace!("Dispatch result: {:?}", response);
event!(tracing::Level::TRACE, "Dispatch result: {:?}", response);
if let Some(callback) = callback {
callback(response.clone()).await;
}
@ -190,3 +333,21 @@ fn print_plugins(plugins: &AFPluginMap) {
tracing::info!("Event: {:?} plugin : {:?}", k, v.name);
})
}
#[pin_project]
pub struct DispatchFuture<T: AFConcurrent> {
#[pin]
pub fut: Pin<Box<dyn Future<Output = T> + 'static>>,
}
impl<T> Future for DispatchFuture<T>
where
T: AFConcurrent + 'static,
{
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().project();
Poll::Ready(futures_core::ready!(this.fut.poll(cx)))
}
}

View File

@ -1,15 +1,17 @@
use std::fmt;
use bytes::Bytes;
use dyn_clone::DynClone;
use tokio::{sync::mpsc::error::SendError, task::JoinError};
use crate::prelude::AFConcurrent;
use crate::{
byte_trait::AFPluginFromBytes,
request::AFPluginEventRequest,
response::{AFPluginEventResponse, ResponseBuilder},
};
use bytes::Bytes;
use dyn_clone::DynClone;
use std::fmt;
use tokio::{sync::mpsc::error::SendError, task::JoinError};
pub trait Error: fmt::Debug + DynClone + Send + Sync {
pub trait Error: fmt::Debug + DynClone + AFConcurrent {
fn as_response(&self) -> AFPluginEventResponse;
}

View File

@ -1,10 +1,9 @@
use std::{
any::{Any, TypeId},
collections::HashMap,
};
use std::{any::TypeId, collections::HashMap};
use crate::prelude::{downcast_owned, AFBox, AFConcurrent};
#[derive(Default, Debug)]
pub struct AFPluginStateMap(HashMap<TypeId, Box<dyn Any + Sync + Send>>);
pub struct AFPluginStateMap(HashMap<TypeId, AFBox>);
impl AFPluginStateMap {
#[inline]
@ -14,7 +13,7 @@ impl AFPluginStateMap {
pub fn insert<T>(&mut self, val: T) -> Option<T>
where
T: 'static + Send + Sync,
T: 'static + AFConcurrent,
{
self
.0
@ -24,14 +23,14 @@ impl AFPluginStateMap {
pub fn remove<T>(&mut self) -> Option<T>
where
T: 'static + Send + Sync,
T: 'static + AFConcurrent,
{
self.0.remove(&TypeId::of::<T>()).and_then(downcast_owned)
}
pub fn get<T>(&self) -> Option<&T>
where
T: 'static + Send + Sync,
T: 'static,
{
self
.0
@ -41,7 +40,7 @@ impl AFPluginStateMap {
pub fn get_mut<T>(&mut self) -> Option<&mut T>
where
T: 'static + Send + Sync,
T: 'static + AFConcurrent,
{
self
.0
@ -51,7 +50,7 @@ impl AFPluginStateMap {
pub fn contains<T>(&self) -> bool
where
T: 'static + Send + Sync,
T: 'static + AFConcurrent,
{
self.0.contains_key(&TypeId::of::<T>())
}
@ -60,7 +59,3 @@ impl AFPluginStateMap {
self.0.extend(other.0);
}
}
fn downcast_owned<T: 'static + Send + Sync>(boxed: Box<dyn Any + Send + Sync>) -> Option<T> {
boxed.downcast().ok().map(|boxed| *boxed)
}

View File

@ -1,15 +1,17 @@
use std::{any::type_name, ops::Deref, sync::Arc};
use crate::prelude::AFConcurrent;
use crate::{
errors::{DispatchError, InternalError},
request::{payload::Payload, AFPluginEventRequest, FromAFPluginRequest},
util::ready::{ready, Ready},
};
use std::{any::type_name, ops::Deref, sync::Arc};
pub struct AFPluginState<T: ?Sized + Send + Sync>(Arc<T>);
pub struct AFPluginState<T: ?Sized + AFConcurrent>(Arc<T>);
impl<T> AFPluginState<T>
where
T: Send + Sync,
T: AFConcurrent,
{
pub fn new(data: T) -> Self {
AFPluginState(Arc::new(data))
@ -22,7 +24,7 @@ where
impl<T> Deref for AFPluginState<T>
where
T: ?Sized + Send + Sync,
T: ?Sized + AFConcurrent,
{
type Target = Arc<T>;
@ -33,7 +35,7 @@ where
impl<T> Clone for AFPluginState<T>
where
T: ?Sized + Send + Sync,
T: ?Sized + AFConcurrent,
{
fn clone(&self) -> AFPluginState<T> {
AFPluginState(self.0.clone())
@ -42,7 +44,7 @@ where
impl<T> From<Arc<T>> for AFPluginState<T>
where
T: ?Sized + Send + Sync,
T: ?Sized + AFConcurrent,
{
fn from(arc: Arc<T>) -> Self {
AFPluginState(arc)
@ -51,7 +53,7 @@ where
impl<T> FromAFPluginRequest for AFPluginState<T>
where
T: ?Sized + Send + Sync + 'static,
T: ?Sized + AFConcurrent + 'static,
{
type Error = DispatchError;
type Future = Ready<Result<Self, DispatchError>>;
@ -59,7 +61,7 @@ where
#[inline]
fn from_request(req: &AFPluginEventRequest, _: &mut Payload) -> Self::Future {
if let Some(state) = req.get_state::<AFPluginState<T>>() {
ready(Ok(state.clone()))
ready(Ok(state))
} else {
let msg = format!(
"Failed to get the plugin state of type: {}",

View File

@ -1,4 +1,5 @@
#![allow(clippy::module_inception)]
pub use container::*;
pub use data::*;
pub use module::*;

View File

@ -9,15 +9,15 @@ use std::{
task::{Context, Poll},
};
use futures_core::future::BoxFuture;
use futures_core::ready;
use nanoid::nanoid;
use pin_project::pin_project;
use crate::dispatcher::AFConcurrent;
use crate::prelude::{AFBoxFuture, AFStateMap};
use crate::service::AFPluginHandler;
use crate::{
errors::{DispatchError, InternalError},
module::{container::AFPluginStateMap, AFPluginState},
request::{payload::Payload, AFPluginEventRequest, FromAFPluginRequest},
response::{AFPluginEventResponse, AFPluginResponder},
service::{
@ -58,7 +58,7 @@ pub struct AFPlugin {
pub name: String,
/// a list of `AFPluginState` that the plugin registers. The state can be read by the plugin's handler.
states: Arc<AFPluginStateMap>,
states: AFStateMap,
/// Contains a list of factories that are used to generate the services used to handle the passed-in
/// `ServiceRequest`.
@ -72,7 +72,7 @@ impl std::default::Default for AFPlugin {
fn default() -> Self {
Self {
name: "".to_owned(),
states: Arc::new(AFPluginStateMap::new()),
states: Default::default(),
event_service_factory: Arc::new(HashMap::new()),
}
}
@ -88,11 +88,10 @@ impl AFPlugin {
self
}
pub fn state<D: 'static + Send + Sync>(mut self, data: D) -> Self {
pub fn state<D: AFConcurrent + 'static>(mut self, data: D) -> Self {
Arc::get_mut(&mut self.states)
.unwrap()
.insert(AFPluginState::new(data));
.insert(crate::module::AFPluginState::new(data));
self
}
@ -100,9 +99,9 @@ impl AFPlugin {
pub fn event<E, H, T, R>(mut self, event: E, handler: H) -> Self
where
H: AFPluginHandler<T, R>,
T: FromAFPluginRequest + 'static + Send + Sync,
<T as FromAFPluginRequest>::Future: Sync + Send,
R: Future + 'static + Send + Sync,
T: FromAFPluginRequest + 'static + AFConcurrent,
<T as FromAFPluginRequest>::Future: AFConcurrent,
R: Future + AFConcurrent + 'static,
R::Output: AFPluginResponder + 'static,
E: Eq + Hash + Debug + Clone + Display,
{
@ -169,7 +168,7 @@ impl AFPluginServiceFactory<AFPluginRequest> for AFPlugin {
type Error = DispatchError;
type Service = BoxService<AFPluginRequest, Self::Response, Self::Error>;
type Context = ();
type Future = BoxFuture<'static, Result<Self::Service, Self::Error>>;
type Future = AFBoxFuture<'static, Result<Self::Service, Self::Error>>;
fn new_service(&self, _cfg: Self::Context) -> Self::Future {
let services = self.event_service_factory.clone();
@ -185,13 +184,14 @@ pub struct AFPluginService {
services: Arc<
HashMap<AFPluginEvent, BoxServiceFactory<(), ServiceRequest, ServiceResponse, DispatchError>>,
>,
states: Arc<AFPluginStateMap>,
states: AFStateMap,
}
impl Service<AFPluginRequest> for AFPluginService {
type Response = AFPluginEventResponse;
type Error = DispatchError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
type Future = AFBoxFuture<'static, Result<Self::Response, Self::Error>>;
fn call(&self, request: AFPluginRequest) -> Self::Future {
let AFPluginRequest { id, event, payload } = request;
@ -224,7 +224,7 @@ impl Service<AFPluginRequest> for AFPluginService {
#[pin_project]
pub struct AFPluginServiceFuture {
#[pin]
fut: BoxFuture<'static, Result<ServiceResponse, DispatchError>>,
fut: AFBoxFuture<'static, Result<ServiceResponse, DispatchError>>,
}
impl Future for AFPluginServiceFuture {

View File

@ -1,9 +1,7 @@
use bytes::Bytes;
use std::{fmt, fmt::Formatter};
pub enum PayloadError {}
use bytes::Bytes;
// TODO: support stream data
#[derive(Clone)]
#[cfg_attr(feature = "use_serde", derive(serde::Serialize))]
pub enum Payload {

View File

@ -1,47 +1,48 @@
use std::future::Future;
use crate::{
errors::{DispatchError, InternalError},
module::{AFPluginEvent, AFPluginStateMap},
request::payload::Payload,
util::ready::{ready, Ready},
};
use derivative::*;
use futures_core::ready;
use std::{
fmt::Debug,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use derivative::*;
use futures_core::ready;
use crate::prelude::{AFConcurrent, AFStateMap};
use crate::{
errors::{DispatchError, InternalError},
module::AFPluginEvent,
request::payload::Payload,
util::ready::{ready, Ready},
};
#[derive(Clone, Debug, Derivative)]
pub struct AFPluginEventRequest {
#[allow(dead_code)]
pub(crate) id: String,
pub(crate) event: AFPluginEvent,
#[derivative(Debug = "ignore")]
pub(crate) states: Arc<AFPluginStateMap>,
pub(crate) states: AFStateMap,
}
impl AFPluginEventRequest {
pub fn new<E>(id: String, event: E, module_data: Arc<AFPluginStateMap>) -> AFPluginEventRequest
pub fn new<E>(id: String, event: E, states: AFStateMap) -> AFPluginEventRequest
where
E: Into<AFPluginEvent>,
{
Self {
id,
event: event.into(),
states: module_data,
states,
}
}
pub fn get_state<T: 'static>(&self) -> Option<&T>
pub fn get_state<T>(&self) -> Option<T>
where
T: Send + Sync,
T: AFConcurrent + 'static + Clone,
{
if let Some(data) = self.states.get::<T>() {
return Some(data);
return Some(data.clone());
}
None

View File

@ -1,24 +1,117 @@
use std::{io, thread};
use std::fmt::{Display, Formatter};
use std::future::Future;
use std::io;
use tokio::runtime;
use tokio::runtime::Runtime;
use tokio::task::JoinHandle;
pub type AFPluginRuntime = tokio::runtime::Runtime;
pub struct AFPluginRuntime {
inner: Runtime,
#[cfg(feature = "single_thread")]
local: tokio::task::LocalSet,
}
pub fn tokio_default_runtime() -> io::Result<AFPluginRuntime> {
impl Display for AFPluginRuntime {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
if cfg!(feature = "single_thread") {
write!(f, "Runtime(single_thread)")
} else {
write!(f, "Runtime(multi_thread)")
}
}
}
impl AFPluginRuntime {
pub fn new() -> io::Result<Self> {
let inner = default_tokio_runtime()?;
Ok(Self {
inner,
#[cfg(feature = "single_thread")]
local: tokio::task::LocalSet::new(),
})
}
#[cfg(feature = "single_thread")]
#[track_caller]
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + 'static,
{
self.local.spawn_local(future)
}
#[cfg(not(feature = "single_thread"))]
#[track_caller]
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
<F as Future>::Output: Send + 'static,
{
self.inner.spawn(future)
}
#[cfg(feature = "single_thread")]
pub async fn run_until<F>(&self, future: F) -> F::Output
where
F: Future,
{
self.local.run_until(future).await
}
#[cfg(not(feature = "single_thread"))]
pub async fn run_until<F>(&self, future: F) -> F::Output
where
F: Future,
{
future.await
}
#[cfg(feature = "single_thread")]
#[track_caller]
pub fn block_on<F>(&self, f: F) -> F::Output
where
F: Future,
{
self.local.block_on(&self.inner, f)
}
#[cfg(not(feature = "single_thread"))]
#[track_caller]
pub fn block_on<F>(&self, f: F) -> F::Output
where
F: Future,
{
self.inner.block_on(f)
}
}
#[cfg(feature = "single_thread")]
pub fn default_tokio_runtime() -> io::Result<Runtime> {
runtime::Builder::new_current_thread()
.thread_name("dispatch-rt-st")
.enable_io()
.enable_time()
.build()
}
#[cfg(not(feature = "single_thread"))]
pub fn default_tokio_runtime() -> io::Result<Runtime> {
runtime::Builder::new_multi_thread()
.thread_name("dispatch-rt")
.thread_name("dispatch-rt-mt")
.enable_io()
.enable_time()
.on_thread_start(move || {
tracing::trace!(
"{:?} thread started: thread_id= {}",
thread::current(),
std::thread::current(),
thread_id::get()
);
})
.on_thread_stop(move || {
tracing::trace!(
"{:?} thread stopping: thread_id= {}",
thread::current(),
std::thread::current(),
thread_id::get(),
);
})

View File

@ -1,21 +1,33 @@
use crate::prelude::{AFBoxFuture, AFConcurrent};
use crate::service::{AFPluginServiceFactory, Service};
use futures_core::future::BoxFuture;
pub fn factory<SF, Req>(factory: SF) -> BoxServiceFactory<SF::Context, Req, SF::Response, SF::Error>
where
SF: AFPluginServiceFactory<Req> + 'static + Sync + Send,
SF: AFPluginServiceFactory<Req> + 'static + AFConcurrent,
Req: 'static,
SF::Response: 'static,
SF::Service: 'static,
SF::Future: 'static,
SF::Error: 'static + Send + Sync,
<SF as AFPluginServiceFactory<Req>>::Service: Sync + Send,
<<SF as AFPluginServiceFactory<Req>>::Service as Service<Req>>::Future: Send + Sync,
<SF as AFPluginServiceFactory<Req>>::Future: Send + Sync,
SF::Error: 'static,
<SF as AFPluginServiceFactory<Req>>::Service: AFConcurrent,
<<SF as AFPluginServiceFactory<Req>>::Service as Service<Req>>::Future: AFConcurrent,
<SF as AFPluginServiceFactory<Req>>::Future: AFConcurrent,
{
BoxServiceFactory(Box::new(FactoryWrapper(factory)))
}
#[cfg(feature = "single_thread")]
type Inner<Cfg, Req, Res, Err> = Box<
dyn AFPluginServiceFactory<
Req,
Context = Cfg,
Response = Res,
Error = Err,
Service = BoxService<Req, Res, Err>,
Future = AFBoxFuture<'static, Result<BoxService<Req, Res, Err>, Err>>,
>,
>;
#[cfg(not(feature = "single_thread"))]
type Inner<Cfg, Req, Res, Err> = Box<
dyn AFPluginServiceFactory<
Req,
@ -23,9 +35,9 @@ type Inner<Cfg, Req, Res, Err> = Box<
Response = Res,
Error = Err,
Service = BoxService<Req, Res, Err>,
Future = BoxFuture<'static, Result<BoxService<Req, Res, Err>, Err>>,
> + Sync
+ Send,
Future = AFBoxFuture<'static, Result<BoxService<Req, Res, Err>, Err>>,
> + Send
+ Sync,
>;
pub struct BoxServiceFactory<Cfg, Req, Res, Err>(Inner<Cfg, Req, Res, Err>);
@ -39,15 +51,21 @@ where
type Error = Err;
type Service = BoxService<Req, Res, Err>;
type Context = Cfg;
type Future = BoxFuture<'static, Result<Self::Service, Self::Error>>;
type Future = AFBoxFuture<'static, Result<Self::Service, Self::Error>>;
fn new_service(&self, cfg: Cfg) -> Self::Future {
self.0.new_service(cfg)
}
}
#[cfg(feature = "single_thread")]
pub type BoxService<Req, Res, Err> = Box<
dyn Service<Req, Response = Res, Error = Err, Future = BoxFuture<'static, Result<Res, Err>>>
dyn Service<Req, Response = Res, Error = Err, Future = AFBoxFuture<'static, Result<Res, Err>>>,
>;
#[cfg(not(feature = "single_thread"))]
pub type BoxService<Req, Res, Err> = Box<
dyn Service<Req, Response = Res, Error = Err, Future = AFBoxFuture<'static, Result<Res, Err>>>
+ Sync
+ Send,
>;
@ -88,11 +106,11 @@ impl<S> ServiceWrapper<S> {
impl<S, Req, Res, Err> Service<Req> for ServiceWrapper<S>
where
S: Service<Req, Response = Res, Error = Err>,
S::Future: 'static + Send + Sync,
S::Future: 'static + AFConcurrent,
{
type Response = Res;
type Error = Err;
type Future = BoxFuture<'static, Result<Res, Err>>;
type Future = AFBoxFuture<'static, Result<Res, Err>>;
fn call(&self, req: Req) -> Self::Future {
Box::pin(self.inner.call(req))
@ -108,15 +126,15 @@ where
Err: 'static,
SF: AFPluginServiceFactory<Req, Context = Cfg, Response = Res, Error = Err>,
SF::Future: 'static,
SF::Service: 'static + Send + Sync,
<<SF as AFPluginServiceFactory<Req>>::Service as Service<Req>>::Future: Send + Sync + 'static,
<SF as AFPluginServiceFactory<Req>>::Future: Send + Sync,
SF::Service: 'static + AFConcurrent,
<<SF as AFPluginServiceFactory<Req>>::Service as Service<Req>>::Future: AFConcurrent + 'static,
<SF as AFPluginServiceFactory<Req>>::Future: AFConcurrent,
{
type Response = Res;
type Error = Err;
type Service = BoxService<Req, Res, Err>;
type Context = Cfg;
type Future = BoxFuture<'static, Result<Self::Service, Self::Error>>;
type Future = AFBoxFuture<'static, Result<Self::Service, Self::Error>>;
fn new_service(&self, cfg: Cfg) -> Self::Future {
let f = self.0.new_service(cfg);

View File

@ -8,18 +8,19 @@ use std::{
use futures_core::ready;
use pin_project::pin_project;
use crate::dispatcher::AFConcurrent;
use crate::{
errors::DispatchError,
request::{payload::Payload, AFPluginEventRequest, FromAFPluginRequest},
request::{AFPluginEventRequest, FromAFPluginRequest},
response::{AFPluginEventResponse, AFPluginResponder},
service::{AFPluginServiceFactory, Service, ServiceRequest, ServiceResponse},
util::ready::*,
};
/// A closure that is run every time for the specified plugin event
pub trait AFPluginHandler<T, R>: Clone + 'static + Sync + Send
pub trait AFPluginHandler<T, R>: Clone + AFConcurrent + 'static
where
R: Future + Send + Sync,
R: Future + AFConcurrent,
R::Output: AFPluginResponder,
{
fn call(&self, param: T) -> R;
@ -29,7 +30,7 @@ pub struct AFPluginHandlerService<H, T, R>
where
H: AFPluginHandler<T, R>,
T: FromAFPluginRequest,
R: Future + Sync + Send,
R: Future + AFConcurrent,
R::Output: AFPluginResponder,
{
handler: H,
@ -40,7 +41,7 @@ impl<H, T, R> AFPluginHandlerService<H, T, R>
where
H: AFPluginHandler<T, R>,
T: FromAFPluginRequest,
R: Future + Sync + Send,
R: Future + AFConcurrent,
R::Output: AFPluginResponder,
{
pub fn new(handler: H) -> Self {
@ -55,7 +56,7 @@ impl<H, T, R> Clone for AFPluginHandlerService<H, T, R>
where
H: AFPluginHandler<T, R>,
T: FromAFPluginRequest,
R: Future + Sync + Send,
R: Future + AFConcurrent,
R::Output: AFPluginResponder,
{
fn clone(&self) -> Self {
@ -70,7 +71,7 @@ impl<F, T, R> AFPluginServiceFactory<ServiceRequest> for AFPluginHandlerService<
where
F: AFPluginHandler<T, R>,
T: FromAFPluginRequest,
R: Future + Send + Sync,
R: Future + AFConcurrent,
R::Output: AFPluginResponder,
{
type Response = ServiceResponse;
@ -88,7 +89,7 @@ impl<H, T, R> Service<ServiceRequest> for AFPluginHandlerService<H, T, R>
where
H: AFPluginHandler<T, R>,
T: FromAFPluginRequest,
R: Future + Sync + Send,
R: Future + AFConcurrent,
R::Output: AFPluginResponder,
{
type Response = ServiceResponse;
@ -107,7 +108,7 @@ pub enum HandlerServiceFuture<H, T, R>
where
H: AFPluginHandler<T, R>,
T: FromAFPluginRequest,
R: Future + Sync + Send,
R: Future + AFConcurrent,
R::Output: AFPluginResponder,
{
Extract(#[pin] T::Future, Option<AFPluginEventRequest>, H),
@ -118,7 +119,7 @@ impl<F, T, R> Future for HandlerServiceFuture<F, T, R>
where
F: AFPluginHandler<T, R>,
T: FromAFPluginRequest,
R: Future + Sync + Send,
R: Future + AFConcurrent,
R::Output: AFPluginResponder,
{
type Output = Result<ServiceResponse, DispatchError>;
@ -154,8 +155,8 @@ where
macro_rules! factory_tuple ({ $($param:ident)* } => {
impl<Func, $($param,)* Res> AFPluginHandler<($($param,)*), Res> for Func
where Func: Fn($($param),*) -> Res + Clone + 'static + Sync + Send,
Res: Future + Sync + Send,
where Func: Fn($($param),*) -> Res + Clone + 'static + AFConcurrent,
Res: Future + AFConcurrent,
Res::Output: AFPluginResponder,
{
#[allow(non_snake_case)]
@ -181,7 +182,7 @@ macro_rules! tuple_from_req ({$tuple_type:ident, $(($n:tt, $T:ident)),+} => {
type Error = DispatchError;
type Future = $tuple_type<$($T),+>;
fn from_request(req: &AFPluginEventRequest, payload: &mut Payload) -> Self::Future {
fn from_request(req: &AFPluginEventRequest, payload: &mut crate::prelude::Payload) -> Self::Future {
$tuple_type {
items: <($(Option<$T>,)+)>::default(),
futs: FromRequestFutures($($T::from_request(req, payload),)+),

View File

@ -1,7 +1,8 @@
use lib_dispatch::prelude::*;
use lib_dispatch::runtime::tokio_default_runtime;
use std::sync::Arc;
use lib_dispatch::prelude::*;
use lib_dispatch::runtime::AFPluginRuntime;
pub async fn hello() -> String {
"say hello".to_string()
}
@ -9,7 +10,7 @@ pub async fn hello() -> String {
#[tokio::test]
async fn test() {
let event = "1";
let runtime = tokio_default_runtime().unwrap();
let runtime = Arc::new(AFPluginRuntime::new().unwrap());
let dispatch = Arc::new(AFPluginDispatcher::construct(runtime, || {
vec![AFPlugin::new().event(event, hello)]
}));

View File

@ -7,10 +7,10 @@ edition = "2018"
[dependencies]
tracing-log = { version = "0.1.3"}
tracing-subscriber = { version = "0.2.25", features = ["registry", "env-filter", "ansi", "json"] }
tracing-bunyan-formatter = "0.2.6"
tracing-appender = "0.1"
tracing-log = { version = "0.2"}
tracing-subscriber = { version = "0.3.17", features = ["registry", "env-filter", "ansi", "json"] }
tracing-bunyan-formatter = "0.3.9"
tracing-appender = "0.2.2"
tracing-core = "0.1"
tracing = { version = "0.1", features = ["log"] }
log = "0.4.17"

View File

@ -4,7 +4,7 @@ use serde::ser::{SerializeMap, Serializer};
use serde_json::Value;
use tracing::{Event, Id, Subscriber};
use tracing_bunyan_formatter::JsonStorage;
use tracing_core::{metadata::Level, span::Attributes};
use tracing_core::metadata::Level;
use tracing_subscriber::{fmt::MakeWriter, layer::Context, registry::SpanRef, Layer};
const LEVEL: &str = "level";
@ -17,17 +17,22 @@ const LOG_TARGET_PATH: &str = "log.target";
const RESERVED_FIELDS: [&str; 3] = [LEVEL, TIME, MESSAGE];
const IGNORE_FIELDS: [&str; 2] = [LOG_MODULE_PATH, LOG_TARGET_PATH];
pub struct FlowyFormattingLayer<W: MakeWriter + 'static> {
pub struct FlowyFormattingLayer<'a, W: MakeWriter<'static> + 'static> {
make_writer: W,
with_target: bool,
phantom: std::marker::PhantomData<&'a ()>,
}
impl<W: MakeWriter + 'static> FlowyFormattingLayer<W> {
impl<'a, W> FlowyFormattingLayer<'a, W>
where
W: for<'writer> MakeWriter<'writer> + 'static,
{
#[allow(dead_code)]
pub fn new(make_writer: W) -> Self {
Self {
make_writer,
with_target: false,
phantom: std::marker::PhantomData,
}
}
@ -43,9 +48,9 @@ impl<W: MakeWriter + 'static> FlowyFormattingLayer<W> {
Ok(())
}
fn serialize_span<S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>>(
fn serialize_span<S: Subscriber + for<'b> tracing_subscriber::registry::LookupSpan<'b>>(
&self,
span: &SpanRef<S>,
span: &SpanRef<'a, S>,
ty: Type,
ctx: &Context<'_, S>,
) -> Result<Vec<u8>, std::io::Error> {
@ -86,6 +91,7 @@ impl<W: MakeWriter + 'static> FlowyFormattingLayer<W> {
/// The type of record we are dealing with: entering a span, exiting a span, an
/// event.
#[allow(dead_code)]
#[derive(Clone, Debug)]
pub enum Type {
EnterSpan,
@ -104,8 +110,8 @@ impl fmt::Display for Type {
}
}
fn format_span_context<S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>>(
span: &SpanRef<S>,
fn format_span_context<'b, S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>>(
span: &SpanRef<'b, S>,
ty: Type,
context: &Context<'_, S>,
) -> String {
@ -153,10 +159,10 @@ fn format_event_message<S: Subscriber + for<'a> tracing_subscriber::registry::Lo
message
}
impl<S, W> Layer<S> for FlowyFormattingLayer<W>
impl<S, W> Layer<S> for FlowyFormattingLayer<'static, W>
where
S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
W: MakeWriter + 'static,
W: for<'writer> MakeWriter<'writer> + 'static,
{
fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
// Events do not necessarily happen in the context of a span, hence
@ -221,13 +227,6 @@ where
}
}
fn new_span(&self, _attrs: &Attributes, id: &Id, ctx: Context<'_, S>) {
let span = ctx.span(id).expect("Span not found, this is a bug");
if let Ok(serialized) = self.serialize_span(&span, Type::EnterSpan, &ctx) {
let _ = self.emit(serialized);
}
}
fn on_close(&self, id: Id, ctx: Context<'_, S>) {
let span = ctx.span(&id).expect("Span not found, this is a bug");
if let Ok(serialized) = self.serialize_span(&span, Type::ExitSpan, &ctx) {

View File

@ -1,16 +1,15 @@
use std::sync::RwLock;
use lazy_static::lazy_static;
use log::LevelFilter;
use tracing::subscriber::set_global_default;
use tracing_appender::{non_blocking::WorkerGuard, rolling::RollingFileAppender};
use tracing_bunyan_formatter::JsonStorageLayer;
use tracing_log::LogTracer;
use tracing_subscriber::{layer::SubscriberExt, EnvFilter};
use crate::layer::FlowyFormattingLayer;
mod layer;
lazy_static! {
static ref LOG_GUARD: RwLock<Option<WorkerGuard>> = RwLock::new(None);
}
@ -47,48 +46,17 @@ impl Builder {
.with_ansi(true)
.with_target(true)
.with_max_level(tracing::Level::TRACE)
.with_thread_ids(false)
.with_file(false)
.with_writer(std::io::stderr)
.with_thread_ids(true)
.json()
.with_current_span(true)
.with_span_list(true)
.compact()
.pretty()
.with_env_filter(env_filter)
.finish()
.with(env_filter)
.with(JsonStorageLayer)
.with(FlowyFormattingLayer::new(std::io::stdout))
.with(FlowyFormattingLayer::new(non_blocking));
set_global_default(subscriber).map_err(|e| format!("{:?}", e))?;
LogTracer::builder()
.with_max_level(LevelFilter::Trace)
.init()
.map_err(|e| format!("{:?}", e))?;
*LOG_GUARD.write().unwrap() = Some(guard);
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
// run cargo test --features="use_bunyan" or cargo test
#[test]
fn test_log() {
Builder::new("flowy", ".")
.env_filter("debug")
.build()
.unwrap();
tracing::info!("😁 tracing::info call");
log::debug!("😁 log::debug call");
say("hello world");
}
#[tracing::instrument(level = "trace", name = "say")]
fn say(s: &str) {
tracing::info!("{}", s);
}
}