diff --git a/frontend/appflowy_tauri/src-tauri/Cargo.lock b/frontend/appflowy_tauri/src-tauri/Cargo.lock index 61421256b0..625ccdecf3 100644 --- a/frontend/appflowy_tauri/src-tauri/Cargo.lock +++ b/frontend/appflowy_tauri/src-tauri/Cargo.lock @@ -963,7 +963,7 @@ dependencies = [ [[package]] name = "collab" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2ba00c1e430f6157a2b6cbda89992d3b154ea6fb#2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=b0cd69ec92a42a319a1fdb2e184151162db52cd6#b0cd69ec92a42a319a1fdb2e184151162db52cd6" dependencies = [ "anyhow", "arc-swap", @@ -988,7 +988,7 @@ dependencies = [ [[package]] name = "collab-database" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2ba00c1e430f6157a2b6cbda89992d3b154ea6fb#2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=b0cd69ec92a42a319a1fdb2e184151162db52cd6#b0cd69ec92a42a319a1fdb2e184151162db52cd6" dependencies = [ "anyhow", "async-trait", @@ -1018,7 +1018,7 @@ dependencies = [ [[package]] name = "collab-document" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2ba00c1e430f6157a2b6cbda89992d3b154ea6fb#2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=b0cd69ec92a42a319a1fdb2e184151162db52cd6#b0cd69ec92a42a319a1fdb2e184151162db52cd6" dependencies = [ "anyhow", "arc-swap", @@ -1038,7 +1038,7 @@ dependencies = [ [[package]] name = "collab-entity" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2ba00c1e430f6157a2b6cbda89992d3b154ea6fb#2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=b0cd69ec92a42a319a1fdb2e184151162db52cd6#b0cd69ec92a42a319a1fdb2e184151162db52cd6" dependencies = [ "anyhow", "bytes", @@ -1057,7 +1057,7 @@ dependencies = [ [[package]] name = "collab-folder" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2ba00c1e430f6157a2b6cbda89992d3b154ea6fb#2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=b0cd69ec92a42a319a1fdb2e184151162db52cd6#b0cd69ec92a42a319a1fdb2e184151162db52cd6" dependencies = [ "anyhow", "arc-swap", @@ -1100,7 +1100,7 @@ dependencies = [ [[package]] name = "collab-plugins" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2ba00c1e430f6157a2b6cbda89992d3b154ea6fb#2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=b0cd69ec92a42a319a1fdb2e184151162db52cd6#b0cd69ec92a42a319a1fdb2e184151162db52cd6" dependencies = [ "anyhow", "async-stream", @@ -1180,7 +1180,7 @@ dependencies = [ [[package]] name = "collab-user" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2ba00c1e430f6157a2b6cbda89992d3b154ea6fb#2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=b0cd69ec92a42a319a1fdb2e184151162db52cd6#b0cd69ec92a42a319a1fdb2e184151162db52cd6" dependencies = [ "anyhow", "collab", diff --git a/frontend/appflowy_tauri/src-tauri/Cargo.toml b/frontend/appflowy_tauri/src-tauri/Cargo.toml index eb8d21befc..450e065ce7 100644 --- a/frontend/appflowy_tauri/src-tauri/Cargo.toml +++ b/frontend/appflowy_tauri/src-tauri/Cargo.toml @@ -116,13 +116,13 @@ custom-protocol = ["tauri/custom-protocol"] # To switch to the local path, run: # scripts/tool/update_collab_source.sh # ⚠️⚠️⚠️️ -collab = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" } -collab-entity = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" } -collab-folder = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" } -collab-document = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" } -collab-database = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" } -collab-plugins = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" } -collab-user = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" } +collab = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b0cd69ec92a42a319a1fdb2e184151162db52cd6" } +collab-entity = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b0cd69ec92a42a319a1fdb2e184151162db52cd6" } +collab-folder = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b0cd69ec92a42a319a1fdb2e184151162db52cd6" } +collab-document = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b0cd69ec92a42a319a1fdb2e184151162db52cd6" } +collab-database = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b0cd69ec92a42a319a1fdb2e184151162db52cd6" } +collab-plugins = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b0cd69ec92a42a319a1fdb2e184151162db52cd6" } +collab-user = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b0cd69ec92a42a319a1fdb2e184151162db52cd6" } # Working directory: frontend # To update the commit ID, run: diff --git a/frontend/appflowy_tauri/src-tauri/src/init.rs b/frontend/appflowy_tauri/src-tauri/src/init.rs index 72e60b4a41..4903e1fe34 100644 --- a/frontend/appflowy_tauri/src-tauri/src/init.rs +++ b/frontend/appflowy_tauri/src-tauri/src/init.rs @@ -2,7 +2,6 @@ use dotenv::dotenv; use flowy_core::config::AppFlowyCoreConfig; use flowy_core::{AppFlowyCore, DEFAULT_NAME}; use lib_dispatch::runtime::AFPluginRuntime; -use std::rc::Rc; use std::sync::Mutex; pub fn read_env() { @@ -61,18 +60,18 @@ pub(crate) fn init_appflowy_core() -> MutexAppFlowyCore { ) .log_filter("trace", vec!["appflowy_tauri".to_string()]); - let runtime = Rc::new(AFPluginRuntime::new().unwrap()); + let runtime = Arc::new(AFPluginRuntime::new().unwrap()); let cloned_runtime = runtime.clone(); runtime.block_on(async move { MutexAppFlowyCore::new(AppFlowyCore::new(config, cloned_runtime, None).await) }) } -pub struct MutexAppFlowyCore(pub Rc>); +pub struct MutexAppFlowyCore(pub Arc>); impl MutexAppFlowyCore { fn new(appflowy_core: AppFlowyCore) -> Self { - Self(Rc::new(Mutex::new(appflowy_core))) + Self(Arc::new(Mutex::new(appflowy_core))) } } unsafe impl Sync for MutexAppFlowyCore {} diff --git a/frontend/appflowy_web_app/src-tauri/Cargo.lock b/frontend/appflowy_web_app/src-tauri/Cargo.lock index 149735e800..abef0c2e05 100644 --- a/frontend/appflowy_web_app/src-tauri/Cargo.lock +++ b/frontend/appflowy_web_app/src-tauri/Cargo.lock @@ -946,7 +946,7 @@ dependencies = [ [[package]] name = "collab" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2ba00c1e430f6157a2b6cbda89992d3b154ea6fb#2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=b0cd69ec92a42a319a1fdb2e184151162db52cd6#b0cd69ec92a42a319a1fdb2e184151162db52cd6" dependencies = [ "anyhow", "arc-swap", @@ -971,7 +971,7 @@ dependencies = [ [[package]] name = "collab-database" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2ba00c1e430f6157a2b6cbda89992d3b154ea6fb#2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=b0cd69ec92a42a319a1fdb2e184151162db52cd6#b0cd69ec92a42a319a1fdb2e184151162db52cd6" dependencies = [ "anyhow", "async-trait", @@ -1001,7 +1001,7 @@ dependencies = [ [[package]] name = "collab-document" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2ba00c1e430f6157a2b6cbda89992d3b154ea6fb#2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=b0cd69ec92a42a319a1fdb2e184151162db52cd6#b0cd69ec92a42a319a1fdb2e184151162db52cd6" dependencies = [ "anyhow", "arc-swap", @@ -1021,7 +1021,7 @@ dependencies = [ [[package]] name = "collab-entity" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2ba00c1e430f6157a2b6cbda89992d3b154ea6fb#2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=b0cd69ec92a42a319a1fdb2e184151162db52cd6#b0cd69ec92a42a319a1fdb2e184151162db52cd6" dependencies = [ "anyhow", "bytes", @@ -1040,7 +1040,7 @@ dependencies = [ [[package]] name = "collab-folder" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2ba00c1e430f6157a2b6cbda89992d3b154ea6fb#2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=b0cd69ec92a42a319a1fdb2e184151162db52cd6#b0cd69ec92a42a319a1fdb2e184151162db52cd6" dependencies = [ "anyhow", "arc-swap", @@ -1083,7 +1083,7 @@ dependencies = [ [[package]] name = "collab-plugins" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2ba00c1e430f6157a2b6cbda89992d3b154ea6fb#2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=b0cd69ec92a42a319a1fdb2e184151162db52cd6#b0cd69ec92a42a319a1fdb2e184151162db52cd6" dependencies = [ "anyhow", "async-stream", @@ -1163,7 +1163,7 @@ dependencies = [ [[package]] name = "collab-user" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2ba00c1e430f6157a2b6cbda89992d3b154ea6fb#2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=b0cd69ec92a42a319a1fdb2e184151162db52cd6#b0cd69ec92a42a319a1fdb2e184151162db52cd6" dependencies = [ "anyhow", "collab", diff --git a/frontend/appflowy_web_app/src-tauri/Cargo.toml b/frontend/appflowy_web_app/src-tauri/Cargo.toml index 566213747f..5abf8ea0fd 100644 --- a/frontend/appflowy_web_app/src-tauri/Cargo.toml +++ b/frontend/appflowy_web_app/src-tauri/Cargo.toml @@ -116,13 +116,13 @@ custom-protocol = ["tauri/custom-protocol"] # To switch to the local path, run: # scripts/tool/update_collab_source.sh # ⚠️⚠️⚠️️ -collab = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" } -collab-entity = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" } -collab-folder = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" } -collab-document = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" } -collab-database = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" } -collab-plugins = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" } -collab-user = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" } +collab = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b0cd69ec92a42a319a1fdb2e184151162db52cd6" } +collab-entity = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b0cd69ec92a42a319a1fdb2e184151162db52cd6" } +collab-folder = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b0cd69ec92a42a319a1fdb2e184151162db52cd6" } +collab-document = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b0cd69ec92a42a319a1fdb2e184151162db52cd6" } +collab-database = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b0cd69ec92a42a319a1fdb2e184151162db52cd6" } +collab-plugins = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b0cd69ec92a42a319a1fdb2e184151162db52cd6" } +collab-user = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b0cd69ec92a42a319a1fdb2e184151162db52cd6" } # Working directory: frontend # To update the commit ID, run: diff --git a/frontend/appflowy_web_app/src-tauri/src/init.rs b/frontend/appflowy_web_app/src-tauri/src/init.rs index b4c771b1b5..7af31af362 100644 --- a/frontend/appflowy_web_app/src-tauri/src/init.rs +++ b/frontend/appflowy_web_app/src-tauri/src/init.rs @@ -3,7 +3,7 @@ use flowy_core::config::AppFlowyCoreConfig; use flowy_core::{AppFlowyCore, DEFAULT_NAME}; use lib_dispatch::runtime::AFPluginRuntime; use std::rc::Rc; -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; pub fn read_env() { dotenv().ok(); @@ -61,18 +61,18 @@ pub fn init_appflowy_core() -> MutexAppFlowyCore { ) .log_filter("trace", vec!["appflowy_tauri".to_string()]); - let runtime = Rc::new(AFPluginRuntime::new().unwrap()); + let runtime = Arc::new(AFPluginRuntime::new().unwrap()); let cloned_runtime = runtime.clone(); runtime.block_on(async move { MutexAppFlowyCore::new(AppFlowyCore::new(config, cloned_runtime, None).await) }) } -pub struct MutexAppFlowyCore(pub Rc>); +pub struct MutexAppFlowyCore(pub Arc>); impl MutexAppFlowyCore { pub(crate) fn new(appflowy_core: AppFlowyCore) -> Self { - Self(Rc::new(Mutex::new(appflowy_core))) + Self(Arc::new(Mutex::new(appflowy_core))) } } unsafe impl Sync for MutexAppFlowyCore {} diff --git a/frontend/rust-lib/Cargo.lock b/frontend/rust-lib/Cargo.lock index 45bebc1af8..a53b2cc8ca 100644 --- a/frontend/rust-lib/Cargo.lock +++ b/frontend/rust-lib/Cargo.lock @@ -824,7 +824,7 @@ dependencies = [ [[package]] name = "collab" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2ba00c1e430f6157a2b6cbda89992d3b154ea6fb#2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=b0cd69ec92a42a319a1fdb2e184151162db52cd6#b0cd69ec92a42a319a1fdb2e184151162db52cd6" dependencies = [ "anyhow", "arc-swap", @@ -849,7 +849,7 @@ dependencies = [ [[package]] name = "collab-database" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2ba00c1e430f6157a2b6cbda89992d3b154ea6fb#2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=b0cd69ec92a42a319a1fdb2e184151162db52cd6#b0cd69ec92a42a319a1fdb2e184151162db52cd6" dependencies = [ "anyhow", "async-trait", @@ -879,7 +879,7 @@ dependencies = [ [[package]] name = "collab-document" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2ba00c1e430f6157a2b6cbda89992d3b154ea6fb#2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=b0cd69ec92a42a319a1fdb2e184151162db52cd6#b0cd69ec92a42a319a1fdb2e184151162db52cd6" dependencies = [ "anyhow", "arc-swap", @@ -899,7 +899,7 @@ dependencies = [ [[package]] name = "collab-entity" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2ba00c1e430f6157a2b6cbda89992d3b154ea6fb#2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=b0cd69ec92a42a319a1fdb2e184151162db52cd6#b0cd69ec92a42a319a1fdb2e184151162db52cd6" dependencies = [ "anyhow", "bytes", @@ -918,7 +918,7 @@ dependencies = [ [[package]] name = "collab-folder" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2ba00c1e430f6157a2b6cbda89992d3b154ea6fb#2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=b0cd69ec92a42a319a1fdb2e184151162db52cd6#b0cd69ec92a42a319a1fdb2e184151162db52cd6" dependencies = [ "anyhow", "arc-swap", @@ -961,7 +961,7 @@ dependencies = [ [[package]] name = "collab-plugins" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2ba00c1e430f6157a2b6cbda89992d3b154ea6fb#2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=b0cd69ec92a42a319a1fdb2e184151162db52cd6#b0cd69ec92a42a319a1fdb2e184151162db52cd6" dependencies = [ "anyhow", "async-stream", @@ -1041,7 +1041,7 @@ dependencies = [ [[package]] name = "collab-user" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2ba00c1e430f6157a2b6cbda89992d3b154ea6fb#2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=b0cd69ec92a42a319a1fdb2e184151162db52cd6#b0cd69ec92a42a319a1fdb2e184151162db52cd6" dependencies = [ "anyhow", "collab", @@ -1325,6 +1325,7 @@ dependencies = [ "flowy-server", "flowy-server-pub", "flowy-user", + "futures", "lazy_static", "lib-dispatch", "lib-log", diff --git a/frontend/rust-lib/Cargo.toml b/frontend/rust-lib/Cargo.toml index fdabfe8379..0016e596af 100644 --- a/frontend/rust-lib/Cargo.toml +++ b/frontend/rust-lib/Cargo.toml @@ -136,13 +136,13 @@ rocksdb = { git = "https://github.com/rust-rocksdb/rust-rocksdb", rev = "1710120 # To switch to the local path, run: # scripts/tool/update_collab_source.sh # ⚠️⚠️⚠️️ -collab = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" } -collab-entity = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" } -collab-folder = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" } -collab-document = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" } -collab-database = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" } -collab-plugins = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" } -collab-user = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2ba00c1e430f6157a2b6cbda89992d3b154ea6fb" } +collab = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b0cd69ec92a42a319a1fdb2e184151162db52cd6" } +collab-entity = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b0cd69ec92a42a319a1fdb2e184151162db52cd6" } +collab-folder = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b0cd69ec92a42a319a1fdb2e184151162db52cd6" } +collab-document = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b0cd69ec92a42a319a1fdb2e184151162db52cd6" } +collab-database = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b0cd69ec92a42a319a1fdb2e184151162db52cd6" } +collab-plugins = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b0cd69ec92a42a319a1fdb2e184151162db52cd6" } +collab-user = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b0cd69ec92a42a319a1fdb2e184151162db52cd6" } # Working directory: frontend # To update the commit ID, run: diff --git a/frontend/rust-lib/collab-integrate/src/collab_builder.rs b/frontend/rust-lib/collab-integrate/src/collab_builder.rs index 3742b2fc72..f539cca182 100644 --- a/frontend/rust-lib/collab-integrate/src/collab_builder.rs +++ b/frontend/rust-lib/collab-integrate/src/collab_builder.rs @@ -257,6 +257,7 @@ impl AppFlowyCollabBuilder { { let mut write_collab = collab.try_write()?; if !write_collab.borrow().get_state().is_uninitialized() { + warn!("{} is already initialized", object); drop(write_collab); return Ok(collab); } @@ -285,10 +286,7 @@ impl AppFlowyCollabBuilder { } } - if build_config.auto_initialize { - // at the moment when we get the lock, the collab object is not yet exposed outside - (*write_collab).borrow_mut().initialize(); - } + (*write_collab).borrow_mut().initialize(); drop(write_collab); Ok(collab) } @@ -296,19 +294,11 @@ impl AppFlowyCollabBuilder { pub struct CollabBuilderConfig { pub sync_enable: bool, - /// If auto_initialize is false, the collab object will not be initialized automatically. - /// You need to call collab.initialize() manually. - /// - /// Default is true. - pub auto_initialize: bool, } impl Default for CollabBuilderConfig { fn default() -> Self { - Self { - sync_enable: true, - auto_initialize: true, - } + Self { sync_enable: true } } } @@ -317,11 +307,6 @@ impl CollabBuilderConfig { self.sync_enable = sync_enable; self } - - pub fn auto_initialize(mut self, auto_initialize: bool) -> Self { - self.auto_initialize = auto_initialize; - self - } } pub struct KVDBCollabPersistenceImpl { diff --git a/frontend/rust-lib/dart-ffi/Cargo.toml b/frontend/rust-lib/dart-ffi/Cargo.toml index c60c09e0c9..91ed0d9bf6 100644 --- a/frontend/rust-lib/dart-ffi/Cargo.toml +++ b/frontend/rust-lib/dart-ffi/Cargo.toml @@ -45,6 +45,7 @@ collab-integrate = { workspace = true } flowy-derive.workspace = true serde_yaml = "0.9.27" flowy-error = { workspace = true, features = ["impl_from_sqlite", "impl_from_dispatch_error", "impl_from_appflowy_cloud", "impl_from_reqwest", "impl_from_serde", "dart"] } +futures = "0.3.26" [features] default = ["dart"] diff --git a/frontend/rust-lib/dart-ffi/src/lib.rs b/frontend/rust-lib/dart-ffi/src/lib.rs index 85281c8cb0..134290e76e 100644 --- a/frontend/rust-lib/dart-ffi/src/lib.rs +++ b/frontend/rust-lib/dart-ffi/src/lib.rs @@ -3,9 +3,10 @@ use allo_isolate::Isolate; use lazy_static::lazy_static; use semver::Version; -use std::rc::Rc; -use std::sync::{Arc, Mutex}; +use std::sync::{mpsc, Arc, RwLock}; use std::{ffi::CStr, os::raw::c_char}; +use tokio::runtime::Builder; +use tokio::task::LocalSet; use tracing::{debug, error, info, trace, warn}; use flowy_core::config::AppFlowyCoreConfig; @@ -33,34 +34,77 @@ mod notification; mod protobuf; lazy_static! { - static ref APPFLOWY_CORE: MutexAppFlowyCore = MutexAppFlowyCore::new(); - static ref LOG_STREAM_ISOLATE: Mutex> = Mutex::new(None); + static ref DART_APPFLOWY_CORE: DartAppFlowyCore = DartAppFlowyCore::new(); + static ref LOG_STREAM_ISOLATE: RwLock> = RwLock::new(None); } -unsafe impl Send for MutexAppFlowyCore {} -unsafe impl Sync for MutexAppFlowyCore {} +pub struct Task { + dispatcher: Arc, + request: AFPluginRequest, + port: i64, + ret: Option>>, +} -///FIXME: I'm pretty sure that there's a better way to do this -struct MutexAppFlowyCore(Rc>>); +unsafe impl Send for Task {} +unsafe impl Sync for DartAppFlowyCore {} -impl MutexAppFlowyCore { +struct DartAppFlowyCore { + core: Arc>>, + handle: RwLock>>, + sender: RwLock>>, +} + +impl DartAppFlowyCore { fn new() -> Self { - Self(Rc::new(Mutex::new(None))) + Self { + core: Arc::new(RwLock::new(None)), + handle: RwLock::new(None), + sender: RwLock::new(None), + } } - fn dispatcher(&self) -> Option> { - let binding = self.0.lock().unwrap(); + fn dispatcher(&self) -> Option> { + let binding = self + .core + .read() + .expect("Failed to acquire read lock for core"); let core = binding.as_ref(); core.map(|core| core.event_dispatcher.clone()) } + + fn dispatch( + &self, + request: AFPluginRequest, + port: i64, + ret: Option>>, + ) { + if let Ok(sender_guard) = self.sender.read() { + if let Err(e) = sender_guard.as_ref().unwrap().send(Task { + dispatcher: self.dispatcher().unwrap(), + request, + port, + ret, + }) { + error!("Failed to send task: {}", e); + } + } else { + warn!("Failed to acquire read lock for sender"); + return; + } + } } #[no_mangle] pub extern "C" fn init_sdk(_port: i64, data: *mut c_char) -> i64 { - // and sent it the `Rust's` result - // no need to convert anything :) - let c_str = unsafe { CStr::from_ptr(data) }; - let serde_str = c_str.to_str().unwrap(); + let c_str = unsafe { + if data.is_null() { + return -1; + } + CStr::from_ptr(data) + }; + let serde_str = c_str + .to_str() + .expect("Failed to convert C string to Rust string"); let configuration = AppFlowyDartConfiguration::from_str(serde_str); configuration.write_env(); @@ -85,26 +129,49 @@ pub extern "C" fn init_sdk(_port: i64, data: *mut c_char) -> i64 { DEFAULT_NAME.to_string(), ); - // Ensure that the database is closed before initialization. Also, verify that the init_sdk function can be called - // multiple times (is reentrant). Currently, only the database resource is exclusive. - if let Some(core) = &*APPFLOWY_CORE.0.lock().unwrap() { + if let Some(core) = &*DART_APPFLOWY_CORE.core.write().unwrap() { core.close_db(); } - let runtime = Rc::new(AFPluginRuntime::new().unwrap()); - let cloned_runtime = runtime.clone(); - let log_stream = LOG_STREAM_ISOLATE - .lock() + .write() .unwrap() .take() .map(|isolate| Arc::new(LogStreamSenderImpl { isolate }) as Arc); + let (sender, task_rx) = mpsc::channel::(); + let handle = std::thread::spawn(move || { + let local_set = LocalSet::new(); + while let Ok(task) = task_rx.recv() { + let Task { + dispatcher, + request, + port, + ret, + } = task; - // let isolate = allo_isolate::Isolate::new(port); - *APPFLOWY_CORE.0.lock().unwrap() = runtime.block_on(async move { - Some(AppFlowyCore::new(config, cloned_runtime, log_stream).await) - // isolate.post("".to_string()); + let resp = AFPluginDispatcher::boxed_async_send_with_callback( + dispatcher.as_ref(), + request, + move |resp: AFPluginEventResponse| { + #[cfg(feature = "sync_verbose_log")] + trace!("[FFI]: Post data to dart through {} port", port); + Box::pin(post_to_flutter(resp, port)) + }, + &local_set, + ); + + if let Some(ret) = ret { + let _ = ret.send(resp); + } + } }); + + *DART_APPFLOWY_CORE.sender.write().unwrap() = Some(sender); + *DART_APPFLOWY_CORE.handle.write().unwrap() = Some(handle); + let runtime = Arc::new(AFPluginRuntime::new().unwrap()); + let cloned_runtime = runtime.clone(); + *DART_APPFLOWY_CORE.core.write().unwrap() = runtime + .block_on(async move { Some(AppFlowyCore::new(config, cloned_runtime, log_stream).await) }); 0 } @@ -120,40 +187,13 @@ pub extern "C" fn async_event(port: i64, input: *const u8, len: usize) { port ); - let dispatcher = match APPFLOWY_CORE.dispatcher() { - None => { - error!("sdk not init yet."); - return; - }, - Some(dispatcher) => dispatcher, - }; - AFPluginDispatcher::boxed_async_send_with_callback( - dispatcher.as_ref(), - request, - move |resp: AFPluginEventResponse| { - #[cfg(feature = "sync_verbose_log")] - trace!("[FFI]: Post data to dart through {} port", port); - Box::pin(post_to_flutter(resp, port)) - }, - ); + DART_APPFLOWY_CORE.dispatch(request, port, None); } #[no_mangle] -pub extern "C" fn sync_event(input: *const u8, len: usize) -> *const u8 { - let request: AFPluginRequest = FFIRequest::from_u8_pointer(input, len).into(); - #[cfg(feature = "sync_verbose_log")] - trace!("[FFI]: {} Sync Event: {:?}", &request.id, &request.event,); +pub extern "C" fn sync_event(_input: *const u8, _len: usize) -> *const u8 { + error!("unimplemented sync_event"); - let dispatcher = match APPFLOWY_CORE.dispatcher() { - None => { - error!("sdk not init yet."); - return forget_rust(Vec::default()); - }, - Some(dispatcher) => dispatcher, - }; - let _response = AFPluginDispatcher::sync_send(dispatcher, request); - - // FFIResponse { } let response_bytes = vec![]; let result = extend_front_four_bytes_into_bytes(&response_bytes); forget_rust(result) @@ -161,7 +201,6 @@ pub extern "C" fn sync_event(input: *const u8, len: usize) -> *const u8 { #[no_mangle] pub extern "C" fn set_stream_port(notification_port: i64) -> i32 { - // Make sure hot reload won't register the notification sender twice unregister_all_notification_sender(); register_notification_sender(DartNotificationSender::new(notification_port)); 0 @@ -169,8 +208,7 @@ pub extern "C" fn set_stream_port(notification_port: i64) -> i32 { #[no_mangle] pub extern "C" fn set_log_stream_port(port: i64) -> i32 { - *LOG_STREAM_ISOLATE.lock().unwrap() = Some(Isolate::new(port)); - + *LOG_STREAM_ISOLATE.write().unwrap() = Some(Isolate::new(port)); 0 } @@ -181,31 +219,22 @@ pub extern "C" fn link_me_please() {} #[inline(always)] async fn post_to_flutter(response: AFPluginEventResponse, port: i64) { let isolate = allo_isolate::Isolate::new(port); - #[allow(clippy::blocks_in_conditions)] - match isolate + if let Ok(_) = isolate .catch_unwind(async { let ffi_resp = FFIResponse::from(response); ffi_resp.into_bytes().unwrap().to_vec() }) .await { - Ok(_success) => { - #[cfg(feature = "sync_verbose_log")] - trace!("[FFI]: Post data to dart success"); - }, - Err(e) => { - if let Some(msg) = e.downcast_ref::<&str>() { - error!("[FFI]: {:?}", msg); - } else { - error!("[FFI]: allo_isolate post panic"); - } - }, + #[cfg(feature = "sync_verbose_log")] + trace!("[FFI]: Post data to dart success"); + } else { + error!("[FFI]: allo_isolate post panic"); } } #[no_mangle] pub extern "C" fn rust_log(level: i64, data: *const c_char) { - // Check if the data pointer is not null if data.is_null() { error!("[flutter error]: null pointer provided to backend_log"); return; @@ -213,7 +242,6 @@ pub extern "C" fn rust_log(level: i64, data: *const c_char) { let log_result = unsafe { CStr::from_ptr(data) }.to_str(); - // Handle potential UTF-8 conversion error let log_str = match log_result { Ok(str) => str, Err(e) => { @@ -225,29 +253,13 @@ pub extern "C" fn rust_log(level: i64, data: *const c_char) { }, }; - // Simplify logging by determining the log level outside of the match - let log_level = match level { - 0 => "info", - 1 => "debug", - 2 => "trace", - 3 => "warn", - 4 => "error", - _ => { - warn!("[flutter error]: Unsupported log level: {}", level); - return; - }, - }; - - // Log the message at the appropriate level - match log_level { - "info" => info!("[Flutter]: {}", log_str), - "debug" => debug!("[Flutter]: {}", log_str), - "trace" => trace!("[Flutter]: {}", log_str), - "warn" => warn!("[Flutter]: {}", log_str), - "error" => error!("[Flutter]: {}", log_str), - _ => { - warn!("[flutter error]: Unsupported log level: {}", log_level); - }, + match level { + 0 => info!("[Flutter]: {}", log_str), + 1 => debug!("[Flutter]: {}", log_str), + 2 => trace!("[Flutter]: {}", log_str), + 3 => warn!("[Flutter]: {}", log_str), + 4 => error!("[Flutter]: {}", log_str), + _ => warn!("[flutter error]: Unsupported log level: {}", level), } } diff --git a/frontend/rust-lib/event-integration-test/src/event_builder.rs b/frontend/rust-lib/event-integration-test/src/event_builder.rs index c4149378e5..a50f3aa314 100644 --- a/frontend/rust-lib/event-integration-test/src/event_builder.rs +++ b/frontend/rust-lib/event-integration-test/src/event_builder.rs @@ -3,12 +3,13 @@ use flowy_user::errors::{internal_error, FlowyError}; use lib_dispatch::prelude::{ AFPluginDispatcher, AFPluginEventResponse, AFPluginFromBytes, AFPluginRequest, ToBytes, *, }; -use std::rc::Rc; +use std::sync::Arc; use std::{ convert::TryFrom, fmt::{Debug, Display}, hash::Hash, }; +use tokio::task::LocalSet; #[derive(Clone)] pub struct EventBuilder { @@ -47,8 +48,9 @@ impl EventBuilder { } pub async fn async_send(mut self) -> Self { + let local_set = LocalSet::new(); let request = self.get_request(); - let resp = AFPluginDispatcher::async_send(self.dispatch().as_ref(), request).await; + let resp = AFPluginDispatcher::async_send(self.dispatch().as_ref(), request, &local_set).await; self.context.response = Some(resp); self } @@ -84,7 +86,7 @@ impl EventBuilder { .map(|data| data.into_inner()) } - fn dispatch(&self) -> Rc { + fn dispatch(&self) -> Arc { self.context.sdk.dispatcher() } diff --git a/frontend/rust-lib/event-integration-test/src/lib.rs b/frontend/rust-lib/event-integration-test/src/lib.rs index 88100b2f85..03e93bd90e 100644 --- a/frontend/rust-lib/event-integration-test/src/lib.rs +++ b/frontend/rust-lib/event-integration-test/src/lib.rs @@ -164,7 +164,7 @@ pub fn document_from_document_doc_state(doc_id: &str, doc_state: Vec) -> Doc } async fn init_core(config: AppFlowyCoreConfig) -> AppFlowyCore { - let runtime = Rc::new(AFPluginRuntime::new().unwrap()); + let runtime = Arc::new(AFPluginRuntime::new().unwrap()); let cloned_runtime = runtime.clone(); AppFlowyCore::new(config, cloned_runtime, None).await } diff --git a/frontend/rust-lib/event-integration-test/src/user_event.rs b/frontend/rust-lib/event-integration-test/src/user_event.rs index d4de053426..b2b3ac33e5 100644 --- a/frontend/rust-lib/event-integration-test/src/user_event.rs +++ b/frontend/rust-lib/event-integration-test/src/user_event.rs @@ -9,6 +9,7 @@ use flowy_folder::entities::{RepeatedViewPB, WorkspacePB}; use protobuf::ProtobufError; use tokio::sync::broadcast::{channel, Sender}; +use tokio::task::LocalSet; use tracing::error; use uuid::Uuid; @@ -73,11 +74,12 @@ impl EventIntegrationTest { .unwrap(); let request = AFPluginRequest::new(UserEvent::SignUp).payload(payload); - let user_profile = AFPluginDispatcher::async_send(&self.appflowy_core.dispatcher(), request) - .await - .parse::() - .unwrap() - .unwrap(); + let user_profile = + AFPluginDispatcher::async_send(&self.appflowy_core.dispatcher(), request, &LocalSet::new()) + .await + .parse::() + .unwrap() + .unwrap(); // let _ = create_default_workspace_if_need(dispatch.clone(), &user_profile.id); SignUpContext { diff --git a/frontend/rust-lib/flowy-core/src/lib.rs b/frontend/rust-lib/flowy-core/src/lib.rs index 55b4753c66..a5d60ae703 100644 --- a/frontend/rust-lib/flowy-core/src/lib.rs +++ b/frontend/rust-lib/flowy-core/src/lib.rs @@ -2,7 +2,6 @@ use flowy_search::folder::indexer::FolderIndexManagerImpl; use flowy_search::services::manager::SearchManager; -use std::rc::Rc; use std::sync::{Arc, Weak}; use std::time::Duration; use sysinfo::System; @@ -55,7 +54,7 @@ pub struct AppFlowyCore { pub document_manager: Arc, pub folder_manager: Arc, pub database_manager: Arc, - pub event_dispatcher: Rc, + pub event_dispatcher: Arc, pub server_provider: Arc, pub task_dispatcher: Arc>, pub store_preference: Arc, @@ -67,7 +66,7 @@ pub struct AppFlowyCore { impl AppFlowyCore { pub async fn new( config: AppFlowyCoreConfig, - runtime: Rc, + runtime: Arc, stream_log_sender: Option>, ) -> Self { let platform = OperatingSystem::from(&config.platform); @@ -103,7 +102,7 @@ impl AppFlowyCore { } #[instrument(skip(config, runtime))] - async fn init(config: AppFlowyCoreConfig, runtime: Rc) -> Self { + async fn init(config: AppFlowyCoreConfig, runtime: Arc) -> Self { // Init the key value database let store_preference = Arc::new(KVStorePreferences::new(&config.storage_path).unwrap()); info!("🔥{:?}", &config); @@ -262,7 +261,7 @@ impl AppFlowyCore { error!("Init user failed: {}", err) } } - let event_dispatcher = Rc::new(AFPluginDispatcher::new( + let event_dispatcher = Arc::new(AFPluginDispatcher::new( runtime, make_plugins( Arc::downgrade(&folder_manager), @@ -291,7 +290,7 @@ impl AppFlowyCore { } /// Only expose the dispatcher in test - pub fn dispatcher(&self) -> Rc { + pub fn dispatcher(&self) -> Arc { self.event_dispatcher.clone() } } diff --git a/frontend/rust-lib/flowy-database2/src/services/database/database_editor.rs b/frontend/rust-lib/flowy-database2/src/services/database/database_editor.rs index c260a0fe14..2f084537cd 100644 --- a/frontend/rust-lib/flowy-database2/src/services/database/database_editor.rs +++ b/frontend/rust-lib/flowy-database2/src/services/database/database_editor.rs @@ -681,14 +681,16 @@ impl DatabaseEditor { } pub async fn init_database_row(&self, row_id: &RowId) -> FlowyResult<()> { - if self - .database - .read() - .await - .get_database_row(row_id) - .is_some() - { - return Ok(()); + if let Some(database_row) = self.database.read().await.get_database_row(row_id) { + if !database_row + .read() + .await + .collab + .get_state() + .is_uninitialized() + { + return Ok(()); + } } debug!("Init database row: {}", row_id); @@ -696,7 +698,7 @@ impl DatabaseEditor { .database .read() .await - .create_database_row(row_id) + .init_database_row(row_id) .ok_or_else(|| { FlowyError::record_not_found() .with_context(format!("The row:{} in database not found", row_id)) diff --git a/frontend/rust-lib/flowy-folder/src/manager.rs b/frontend/rust-lib/flowy-folder/src/manager.rs index bc691ab7dc..7a6966d306 100644 --- a/frontend/rust-lib/flowy-folder/src/manager.rs +++ b/frontend/rust-lib/flowy-folder/src/manager.rs @@ -157,9 +157,7 @@ impl FolderManager { ) -> Result>, FlowyError> { let folder_notifier = folder_notifier.into(); // only need the check the workspace id when the doc state is not from the disk. - let config = CollabBuilderConfig::default() - .sync_enable(true) - .auto_initialize(true); + let config = CollabBuilderConfig::default().sync_enable(true); let data_source = data_source .unwrap_or_else(|| KVDBCollabPersistenceImpl::new(collab_db.clone(), uid).into_data_source()); diff --git a/frontend/rust-lib/lib-dispatch/src/dispatcher.rs b/frontend/rust-lib/lib-dispatch/src/dispatcher.rs index e3e72ff2be..920a9f1e2a 100644 --- a/frontend/rust-lib/lib-dispatch/src/dispatcher.rs +++ b/frontend/rust-lib/lib-dispatch/src/dispatcher.rs @@ -3,7 +3,7 @@ use pin_project::pin_project; use std::any::Any; use std::future::Future; use std::pin::Pin; -use std::rc::Rc; +use std::sync::Arc; use std::task::{Context, Poll}; use tracing::event; @@ -17,10 +17,10 @@ use crate::{ }; #[cfg(feature = "local_set")] -pub trait AFConcurrent {} +pub trait AFConcurrent: Send {} #[cfg(feature = "local_set")] -impl AFConcurrent for T where T: ?Sized {} +impl AFConcurrent for T where T: Send + ?Sized {} #[cfg(not(feature = "local_set"))] pub trait AFConcurrent: Send + Sync {} @@ -47,7 +47,7 @@ pub(crate) fn downcast_owned(boxed: AFBox) -> Option; +pub(crate) type AFBox = Box; #[cfg(not(feature = "local_set"))] pub(crate) type AFBox = Box; @@ -70,11 +70,12 @@ where pub struct AFPluginDispatcher { plugins: AFPluginMap, - runtime: Rc, + #[allow(dead_code)] + runtime: Arc, } impl AFPluginDispatcher { - pub fn new(runtime: Rc, plugins: Vec) -> AFPluginDispatcher { + pub fn new(runtime: Arc, plugins: Vec) -> AFPluginDispatcher { tracing::trace!("{}", plugin_info(&plugins)); AFPluginDispatcher { plugins: plugin_map_or_crash(plugins), @@ -82,13 +83,111 @@ impl AFPluginDispatcher { } } - pub async fn async_send(dispatch: &AFPluginDispatcher, request: Req) -> AFPluginEventResponse + #[cfg(feature = "local_set")] + pub async fn async_send( + dispatch: &AFPluginDispatcher, + request: Req, + local_set: &tokio::task::LocalSet, + ) -> AFPluginEventResponse where Req: Into, { - AFPluginDispatcher::async_send_with_callback(dispatch, request, |_| Box::pin(async {})).await + AFPluginDispatcher::async_send_with_callback( + dispatch, + request, + |_| Box::pin(async {}), + local_set, + ) + .await + } + #[cfg(feature = "local_set")] + pub async fn async_send_with_callback( + dispatch: &AFPluginDispatcher, + request: Req, + callback: Callback, + local_set: &tokio::task::LocalSet, + ) -> AFPluginEventResponse + where + Req: Into, + Callback: FnOnce(AFPluginEventResponse) -> AFBoxFuture<'static, ()> + AFConcurrent + 'static, + { + let request: AFPluginRequest = request.into(); + let plugins = dispatch.plugins.clone(); + let service = Box::new(DispatchService { plugins }); + tracing::trace!("Async event: {:?}", &request.event); + let service_ctx = DispatchContext { + request, + callback: Some(Box::new(callback)), + }; + + let handle = local_set.spawn_local(async move { + service.call(service_ctx).await.unwrap_or_else(|e| { + tracing::error!("Dispatch runtime error: {:?}", e); + InternalError::Other(format!("{:?}", e)).as_response() + }) + }); + // let handle = tokio::spawn(async move { + // service.call(service_ctx).await.unwrap_or_else(|e| { + // tracing::error!("Dispatch runtime error: {:?}", e); + // InternalError::Other(format!("{:?}", e)).as_response() + // }) + // }); + + let result: Result = local_set + .run_until(handle) + .await + .map_err(|e| e.to_string().into()); + + result.unwrap_or_else(|e| { + let msg = format!("EVENT_DISPATCH join error: {:?}", e); + tracing::error!("{}", msg); + let error = InternalError::JoinError(msg); + error.as_response() + }) } + #[cfg(feature = "local_set")] + pub fn boxed_async_send_with_callback( + dispatch: &AFPluginDispatcher, + request: Req, + callback: Callback, + local_set: &tokio::task::LocalSet, + ) -> DispatchFuture + where + Req: Into + 'static, + Callback: FnOnce(AFPluginEventResponse) -> AFBoxFuture<'static, ()> + AFConcurrent + 'static, + { + let request: AFPluginRequest = request.into(); + let plugins = dispatch.plugins.clone(); + let service = Box::new(DispatchService { plugins }); + tracing::trace!("[dispatch]: Async event: {:?}", &request.event); + let service_ctx = DispatchContext { + request, + callback: Some(Box::new(callback)), + }; + + let handle = local_set.spawn_local(async move { + service.call(service_ctx).await.unwrap_or_else(|e| { + tracing::error!("Dispatch runtime error: {:?}", e); + InternalError::Other(format!("{:?}", e)).as_response() + }) + }); + + let fut = local_set.run_until(handle); + let result = local_set.block_on(&dispatch.runtime.inner, fut); + DispatchFuture { + fut: Box::pin(async move { + result.unwrap_or_else(|e| { + let msg = format!("EVENT_DISPATCH join error: {:?}", e); + tracing::error!("{}", msg); + let error = InternalError::JoinError(msg); + error.as_response() + }) + }), + } + } + + #[cfg(not(feature = "local_set"))] pub async fn async_send_with_callback( dispatch: &AFPluginDispatcher, request: Req, @@ -107,65 +206,25 @@ impl AFPluginDispatcher { callback: Some(Box::new(callback)), }; - // Spawns a future onto the runtime. - // - // This spawns the given future onto the runtime's executor, usually a - // thread pool. The thread pool is then responsible for polling the future - // until it completes. - // - // The provided future will start running in the background immediately - // when `spawn` is called, even if you don't await the returned - // `JoinHandle`. - let result: Result; - #[cfg(feature = "local_set")] - { - let handle = dispatch.runtime.local.spawn_local(async move { + dispatch + .runtime + .spawn(async move { service.call(service_ctx).await.unwrap_or_else(|e| { tracing::error!("Dispatch runtime error: {:?}", e); InternalError::Other(format!("{:?}", e)).as_response() }) - }); - - result = dispatch - .runtime - .local - .run_until(handle) - .await - .map_err(|e| e.to_string().into()) - } - - #[cfg(not(feature = "local_set"))] - { - result = dispatch - .runtime - .spawn(async move { - service.call(service_ctx).await.unwrap_or_else(|e| { - tracing::error!("Dispatch runtime error: {:?}", e); - InternalError::Other(format!("{:?}", e)).as_response() - }) - }) - .await; - } - - result.unwrap_or_else(|e| { - let msg = format!("EVENT_DISPATCH join error: {:?}", e); - tracing::error!("{}", msg); - let error = InternalError::JoinError(msg); - error.as_response() - }) + }) + .await + .unwrap_or_else(|e| { + let msg = format!("EVENT_DISPATCH join error: {:?}", e); + tracing::error!("{}", msg); + let error = InternalError::JoinError(msg); + error.as_response() + }) } - pub fn box_async_send( - dispatch: &AFPluginDispatcher, - request: Req, - ) -> DispatchFuture - where - Req: Into + 'static, - { - AFPluginDispatcher::boxed_async_send_with_callback(dispatch, request, |_| Box::pin(async {})) - } - - pub fn boxed_async_send_with_callback( + #[cfg(not(feature = "local_set"))] + pub async fn boxed_async_send_with_callback( dispatch: &AFPluginDispatcher, request: Req, callback: Callback, @@ -183,76 +242,39 @@ impl AFPluginDispatcher { callback: Some(Box::new(callback)), }; - #[cfg(feature = "local_set")] - { - let handle = dispatch.runtime.local.spawn_local(async move { - service.call(service_ctx).await.unwrap_or_else(|e| { - tracing::error!("Dispatch runtime error: {:?}", e); - InternalError::Other(format!("{:?}", e)).as_response() + let handle = dispatch.runtime.spawn(async move { + service.call(service_ctx).await.unwrap_or_else(|e| { + tracing::error!("[dispatch]: runtime error: {:?}", e); + InternalError::Other(format!("{:?}", e)).as_response() + }) + }); + + let runtime = dispatch.runtime.clone(); + DispatchFuture { + fut: Box::pin(async move { + let result = runtime.spawn(handle).await.unwrap(); + result.unwrap_or_else(|e| { + let msg = format!("EVENT_DISPATCH join error: {:?}", e); + tracing::error!("{}", msg); + let error = InternalError::JoinError(msg); + error.as_response() }) - }); - - let fut = dispatch.runtime.local.run_until(handle); - let result = dispatch.runtime.block_on(fut); - DispatchFuture { - fut: Box::pin(async move { - result.unwrap_or_else(|e| { - let msg = format!("EVENT_DISPATCH join error: {:?}", e); - tracing::error!("{}", msg); - let error = InternalError::JoinError(msg); - error.as_response() - }) - }), - } - } - - #[cfg(not(feature = "local_set"))] - { - let handle = dispatch.runtime.spawn(async move { - service - .call(crate::service::service::Service) - .await - .unwrap_or_else(|e| { - tracing::error!("[dispatch]: runtime error: {:?}", e); - InternalError::Other(format!("{:?}", e)).as_response() - }) - }); - - let runtime = dispatch.runtime.clone(); - DispatchFuture { - fut: Box::pin(async move { - let result = runtime.run_until(handle).await; - result.unwrap_or_else(|e| { - let msg = format!("EVENT_DISPATCH join error: {:?}", e); - tracing::error!("{}", msg); - let error = InternalError::JoinError(msg); - error.as_response() - }) - }), - } + }), } } - #[cfg(not(target_arch = "wasm32"))] + #[cfg(feature = "local_set")] pub fn sync_send( - dispatch: Rc, + dispatch: Arc, request: AFPluginRequest, ) -> AFPluginEventResponse { futures::executor::block_on(AFPluginDispatcher::async_send_with_callback( dispatch.as_ref(), request, |_| Box::pin(async {}), + &tokio::task::LocalSet::new(), )) } - - #[track_caller] - pub fn spawn(&self, future: F) -> tokio::task::JoinHandle - where - F: Future + Send + 'static, - ::Output: Send + 'static, - { - self.runtime.spawn(future) - } } #[derive(Derivative)] diff --git a/frontend/rust-lib/lib-dispatch/src/module/container.rs b/frontend/rust-lib/lib-dispatch/src/module/container.rs index d6fdf24d67..4082590345 100644 --- a/frontend/rust-lib/lib-dispatch/src/module/container.rs +++ b/frontend/rust-lib/lib-dispatch/src/module/container.rs @@ -13,7 +13,7 @@ impl AFPluginStateMap { pub fn insert(&mut self, val: T) -> Option where - T: 'static + AFConcurrent, + T: 'static + Send + Sync, { self .0 diff --git a/frontend/rust-lib/lib-dispatch/src/module/data.rs b/frontend/rust-lib/lib-dispatch/src/module/data.rs index 520c3e2494..3cf8f23d51 100644 --- a/frontend/rust-lib/lib-dispatch/src/module/data.rs +++ b/frontend/rust-lib/lib-dispatch/src/module/data.rs @@ -53,7 +53,7 @@ where impl FromAFPluginRequest for AFPluginState where - T: ?Sized + AFConcurrent + 'static, + T: ?Sized + Send + Sync + 'static, { type Error = DispatchError; type Future = Ready>; diff --git a/frontend/rust-lib/lib-dispatch/src/module/module.rs b/frontend/rust-lib/lib-dispatch/src/module/module.rs index 883225c1b0..12cbb0c150 100644 --- a/frontend/rust-lib/lib-dispatch/src/module/module.rs +++ b/frontend/rust-lib/lib-dispatch/src/module/module.rs @@ -13,7 +13,6 @@ use crate::{ use futures_core::ready; use nanoid::nanoid; use pin_project::pin_project; -use std::rc::Rc; use std::sync::Arc; use std::{ collections::HashMap, @@ -25,12 +24,12 @@ use std::{ task::{Context, Poll}, }; -pub type AFPluginMap = Rc>>; +pub type AFPluginMap = Arc>>; pub(crate) fn plugin_map_or_crash(plugins: Vec) -> AFPluginMap { - let mut plugin_map: HashMap> = HashMap::new(); + let mut plugin_map: HashMap> = HashMap::new(); plugins.into_iter().for_each(|m| { let events = m.events(); - let plugins = Rc::new(m); + let plugins = Arc::new(m); events.into_iter().for_each(|e| { if plugin_map.contains_key(&e) { let plugin_name = plugin_map.get(&e).map(|p| &p.name); @@ -39,7 +38,7 @@ pub(crate) fn plugin_map_or_crash(plugins: Vec) -> AFPluginMap { plugin_map.insert(e, plugins.clone()); }); }); - Rc::new(plugin_map) + Arc::new(plugin_map) } #[derive(PartialEq, Eq, Hash, Debug, Clone)] @@ -66,7 +65,7 @@ pub struct AFPlugin { /// Contains a list of factories that are used to generate the services used to handle the passed-in /// `ServiceRequest`. /// - event_service_factory: Rc< + event_service_factory: Arc< HashMap>, >, } @@ -76,7 +75,7 @@ impl std::default::Default for AFPlugin { Self { name: "".to_owned(), states: Default::default(), - event_service_factory: Rc::new(HashMap::new()), + event_service_factory: Arc::new(HashMap::new()), } } } @@ -91,7 +90,7 @@ impl AFPlugin { self } - pub fn state(mut self, data: D) -> Self { + pub fn state(mut self, data: D) -> Self { Arc::get_mut(&mut self.states) .unwrap() .insert(crate::module::AFPluginState::new(data)); @@ -112,7 +111,7 @@ impl AFPlugin { if self.event_service_factory.contains_key(&event) { panic!("Register duplicate Event: {:?}", &event); } else { - Rc::get_mut(&mut self.event_service_factory) + Arc::get_mut(&mut self.event_service_factory) .unwrap() .insert(event, factory(AFPluginHandlerService::new(handler))); } @@ -184,7 +183,7 @@ impl AFPluginServiceFactory for AFPlugin { } pub struct AFPluginService { - services: Rc< + services: Arc< HashMap>, >, states: AFStateMap, diff --git a/frontend/rust-lib/lib-dispatch/src/request/request.rs b/frontend/rust-lib/lib-dispatch/src/request/request.rs index c62950f65d..68aab764d4 100644 --- a/frontend/rust-lib/lib-dispatch/src/request/request.rs +++ b/frontend/rust-lib/lib-dispatch/src/request/request.rs @@ -8,7 +8,7 @@ use std::{ use derivative::*; use futures_core::ready; -use crate::prelude::{AFConcurrent, AFStateMap}; +use crate::prelude::AFStateMap; use crate::{ errors::{DispatchError, InternalError}, module::AFPluginEvent, @@ -39,7 +39,7 @@ impl AFPluginEventRequest { pub fn get_state(&self) -> Option where - T: AFConcurrent + 'static + Clone, + T: Send + Sync + 'static + Clone, { if let Some(data) = self.states.get::() { return Some(data.clone()); diff --git a/frontend/rust-lib/lib-dispatch/src/runtime.rs b/frontend/rust-lib/lib-dispatch/src/runtime.rs index eaa3223a20..e2f5cd56c3 100644 --- a/frontend/rust-lib/lib-dispatch/src/runtime.rs +++ b/frontend/rust-lib/lib-dispatch/src/runtime.rs @@ -7,17 +7,15 @@ use tokio::runtime::Runtime; use tokio::task::JoinHandle; pub struct AFPluginRuntime { - inner: Runtime, - #[cfg(feature = "local_set")] - pub(crate) local: tokio::task::LocalSet, + pub(crate) inner: Runtime, } impl Display for AFPluginRuntime { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { if cfg!(any(target_arch = "wasm32", feature = "local_set")) { - write!(f, "Runtime(current_thread)") + write!(f, "Runtime(local_set)") } else { - write!(f, "Runtime(multi_thread)") + write!(f, "Runtime") } } } @@ -25,11 +23,7 @@ impl Display for AFPluginRuntime { impl AFPluginRuntime { pub fn new() -> io::Result { let inner = default_tokio_runtime()?; - Ok(Self { - inner, - #[cfg(feature = "local_set")] - local: tokio::task::LocalSet::new(), - }) + Ok(Self { inner }) } #[track_caller] @@ -41,16 +35,6 @@ impl AFPluginRuntime { self.inner.spawn(future) } - #[cfg(feature = "local_set")] - #[track_caller] - pub fn block_on(&self, f: F) -> F::Output - where - F: Future, - { - self.local.block_on(&self.inner, f) - } - - #[cfg(not(feature = "local_set"))] #[track_caller] pub fn block_on(&self, f: F) -> F::Output where @@ -62,21 +46,11 @@ impl AFPluginRuntime { #[cfg(feature = "local_set")] pub fn default_tokio_runtime() -> io::Result { - #[cfg(not(target_arch = "wasm32"))] - { - runtime::Builder::new_multi_thread() - .enable_io() - .enable_time() - .thread_name("dispatch-rt-st") - .build() - } - - #[cfg(target_arch = "wasm32")] - { - runtime::Builder::new_current_thread() - .thread_name("dispatch-rt-st") - .build() - } + runtime::Builder::new_multi_thread() + .enable_io() + .enable_time() + .thread_name("dispatch-rt-st") + .build() } #[cfg(not(feature = "local_set"))] diff --git a/frontend/rust-lib/lib-dispatch/tests/api/module.rs b/frontend/rust-lib/lib-dispatch/tests/api/module.rs index fed8d75720..f7c4a9f591 100644 --- a/frontend/rust-lib/lib-dispatch/tests/api/module.rs +++ b/frontend/rust-lib/lib-dispatch/tests/api/module.rs @@ -1,7 +1,7 @@ -use std::rc::Rc; - use lib_dispatch::prelude::*; use lib_dispatch::runtime::AFPluginRuntime; +use std::sync::Arc; +use tokio::task::LocalSet; pub async fn hello() -> String { "say hello".to_string() @@ -10,17 +10,22 @@ pub async fn hello() -> String { #[tokio::test] async fn test() { let event = "1"; - let runtime = Rc::new(AFPluginRuntime::new().unwrap()); - let dispatch = Rc::new(AFPluginDispatcher::new( + let runtime = Arc::new(AFPluginRuntime::new().unwrap()); + let dispatch = Arc::new(AFPluginDispatcher::new( runtime, vec![AFPlugin::new().event(event, hello)], )); let request = AFPluginRequest::new(event); - let _ = AFPluginDispatcher::async_send_with_callback(dispatch.as_ref(), request, |resp| { - Box::pin(async move { - dbg!(&resp); - }) - }) + let _ = AFPluginDispatcher::async_send_with_callback( + dispatch.as_ref(), + request, + |resp| { + Box::pin(async move { + dbg!(&resp); + }) + }, + &LocalSet::new(), + ) .await; std::mem::forget(dispatch);