chore: async load database row (#6051)

* chore: update

* chore: fix test

* chore: clippy

* chore: clippy
This commit is contained in:
Nathan.fooo 2024-08-24 00:40:57 +08:00 committed by GitHub
parent 3fa72106e9
commit d3b7c5fea5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 177 additions and 151 deletions

View File

@ -964,7 +964,7 @@ dependencies = [
[[package]]
name = "collab"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79e649061da9245d81b9114f3aaaf0064c265c97#79e649061da9245d81b9114f3aaaf0064c265c97"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c714b6bd458420663c7a5b29370d6892902e995c#c714b6bd458420663c7a5b29370d6892902e995c"
dependencies = [
"anyhow",
"arc-swap",
@ -989,7 +989,7 @@ dependencies = [
[[package]]
name = "collab-database"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79e649061da9245d81b9114f3aaaf0064c265c97#79e649061da9245d81b9114f3aaaf0064c265c97"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c714b6bd458420663c7a5b29370d6892902e995c#c714b6bd458420663c7a5b29370d6892902e995c"
dependencies = [
"anyhow",
"async-trait",
@ -1018,7 +1018,7 @@ dependencies = [
[[package]]
name = "collab-document"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79e649061da9245d81b9114f3aaaf0064c265c97#79e649061da9245d81b9114f3aaaf0064c265c97"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c714b6bd458420663c7a5b29370d6892902e995c#c714b6bd458420663c7a5b29370d6892902e995c"
dependencies = [
"anyhow",
"arc-swap",
@ -1038,7 +1038,7 @@ dependencies = [
[[package]]
name = "collab-entity"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79e649061da9245d81b9114f3aaaf0064c265c97#79e649061da9245d81b9114f3aaaf0064c265c97"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c714b6bd458420663c7a5b29370d6892902e995c#c714b6bd458420663c7a5b29370d6892902e995c"
dependencies = [
"anyhow",
"bytes",
@ -1057,7 +1057,7 @@ dependencies = [
[[package]]
name = "collab-folder"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79e649061da9245d81b9114f3aaaf0064c265c97#79e649061da9245d81b9114f3aaaf0064c265c97"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c714b6bd458420663c7a5b29370d6892902e995c#c714b6bd458420663c7a5b29370d6892902e995c"
dependencies = [
"anyhow",
"arc-swap",
@ -1100,7 +1100,7 @@ dependencies = [
[[package]]
name = "collab-plugins"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79e649061da9245d81b9114f3aaaf0064c265c97#79e649061da9245d81b9114f3aaaf0064c265c97"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c714b6bd458420663c7a5b29370d6892902e995c#c714b6bd458420663c7a5b29370d6892902e995c"
dependencies = [
"anyhow",
"async-stream",
@ -1180,7 +1180,7 @@ dependencies = [
[[package]]
name = "collab-user"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79e649061da9245d81b9114f3aaaf0064c265c97#79e649061da9245d81b9114f3aaaf0064c265c97"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c714b6bd458420663c7a5b29370d6892902e995c#c714b6bd458420663c7a5b29370d6892902e995c"
dependencies = [
"anyhow",
"collab",

View File

@ -116,13 +116,13 @@ custom-protocol = ["tauri/custom-protocol"]
# To switch to the local path, run:
# scripts/tool/update_collab_source.sh
# ⚠️⚠️⚠️️
collab = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79e649061da9245d81b9114f3aaaf0064c265c97" }
collab-entity = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79e649061da9245d81b9114f3aaaf0064c265c97" }
collab-folder = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79e649061da9245d81b9114f3aaaf0064c265c97" }
collab-document = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79e649061da9245d81b9114f3aaaf0064c265c97" }
collab-database = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79e649061da9245d81b9114f3aaaf0064c265c97" }
collab-plugins = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79e649061da9245d81b9114f3aaaf0064c265c97" }
collab-user = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79e649061da9245d81b9114f3aaaf0064c265c97" }
collab = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c714b6bd458420663c7a5b29370d6892902e995c" }
collab-entity = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c714b6bd458420663c7a5b29370d6892902e995c" }
collab-folder = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c714b6bd458420663c7a5b29370d6892902e995c" }
collab-document = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c714b6bd458420663c7a5b29370d6892902e995c" }
collab-database = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c714b6bd458420663c7a5b29370d6892902e995c" }
collab-plugins = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c714b6bd458420663c7a5b29370d6892902e995c" }
collab-user = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c714b6bd458420663c7a5b29370d6892902e995c" }
# Working directory: frontend
# To update the commit ID, run:

View File

@ -947,7 +947,7 @@ dependencies = [
[[package]]
name = "collab"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79e649061da9245d81b9114f3aaaf0064c265c97#79e649061da9245d81b9114f3aaaf0064c265c97"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c714b6bd458420663c7a5b29370d6892902e995c#c714b6bd458420663c7a5b29370d6892902e995c"
dependencies = [
"anyhow",
"arc-swap",
@ -972,7 +972,7 @@ dependencies = [
[[package]]
name = "collab-database"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79e649061da9245d81b9114f3aaaf0064c265c97#79e649061da9245d81b9114f3aaaf0064c265c97"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c714b6bd458420663c7a5b29370d6892902e995c#c714b6bd458420663c7a5b29370d6892902e995c"
dependencies = [
"anyhow",
"async-trait",
@ -1001,7 +1001,7 @@ dependencies = [
[[package]]
name = "collab-document"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79e649061da9245d81b9114f3aaaf0064c265c97#79e649061da9245d81b9114f3aaaf0064c265c97"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c714b6bd458420663c7a5b29370d6892902e995c#c714b6bd458420663c7a5b29370d6892902e995c"
dependencies = [
"anyhow",
"arc-swap",
@ -1021,7 +1021,7 @@ dependencies = [
[[package]]
name = "collab-entity"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79e649061da9245d81b9114f3aaaf0064c265c97#79e649061da9245d81b9114f3aaaf0064c265c97"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c714b6bd458420663c7a5b29370d6892902e995c#c714b6bd458420663c7a5b29370d6892902e995c"
dependencies = [
"anyhow",
"bytes",
@ -1040,7 +1040,7 @@ dependencies = [
[[package]]
name = "collab-folder"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79e649061da9245d81b9114f3aaaf0064c265c97#79e649061da9245d81b9114f3aaaf0064c265c97"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c714b6bd458420663c7a5b29370d6892902e995c#c714b6bd458420663c7a5b29370d6892902e995c"
dependencies = [
"anyhow",
"arc-swap",
@ -1083,7 +1083,7 @@ dependencies = [
[[package]]
name = "collab-plugins"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79e649061da9245d81b9114f3aaaf0064c265c97#79e649061da9245d81b9114f3aaaf0064c265c97"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c714b6bd458420663c7a5b29370d6892902e995c#c714b6bd458420663c7a5b29370d6892902e995c"
dependencies = [
"anyhow",
"async-stream",
@ -1163,7 +1163,7 @@ dependencies = [
[[package]]
name = "collab-user"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79e649061da9245d81b9114f3aaaf0064c265c97#79e649061da9245d81b9114f3aaaf0064c265c97"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c714b6bd458420663c7a5b29370d6892902e995c#c714b6bd458420663c7a5b29370d6892902e995c"
dependencies = [
"anyhow",
"collab",

View File

@ -116,13 +116,13 @@ custom-protocol = ["tauri/custom-protocol"]
# To switch to the local path, run:
# scripts/tool/update_collab_source.sh
# ⚠️⚠️⚠️️
collab = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79e649061da9245d81b9114f3aaaf0064c265c97" }
collab-entity = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79e649061da9245d81b9114f3aaaf0064c265c97" }
collab-folder = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79e649061da9245d81b9114f3aaaf0064c265c97" }
collab-document = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79e649061da9245d81b9114f3aaaf0064c265c97" }
collab-database = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79e649061da9245d81b9114f3aaaf0064c265c97" }
collab-plugins = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79e649061da9245d81b9114f3aaaf0064c265c97" }
collab-user = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79e649061da9245d81b9114f3aaaf0064c265c97" }
collab = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c714b6bd458420663c7a5b29370d6892902e995c" }
collab-entity = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c714b6bd458420663c7a5b29370d6892902e995c" }
collab-folder = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c714b6bd458420663c7a5b29370d6892902e995c" }
collab-document = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c714b6bd458420663c7a5b29370d6892902e995c" }
collab-database = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c714b6bd458420663c7a5b29370d6892902e995c" }
collab-plugins = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c714b6bd458420663c7a5b29370d6892902e995c" }
collab-user = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c714b6bd458420663c7a5b29370d6892902e995c" }
# Working directory: frontend
# To update the commit ID, run:

View File

@ -825,7 +825,7 @@ dependencies = [
[[package]]
name = "collab"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79e649061da9245d81b9114f3aaaf0064c265c97#79e649061da9245d81b9114f3aaaf0064c265c97"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c714b6bd458420663c7a5b29370d6892902e995c#c714b6bd458420663c7a5b29370d6892902e995c"
dependencies = [
"anyhow",
"arc-swap",
@ -850,7 +850,7 @@ dependencies = [
[[package]]
name = "collab-database"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79e649061da9245d81b9114f3aaaf0064c265c97#79e649061da9245d81b9114f3aaaf0064c265c97"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c714b6bd458420663c7a5b29370d6892902e995c#c714b6bd458420663c7a5b29370d6892902e995c"
dependencies = [
"anyhow",
"async-trait",
@ -879,7 +879,7 @@ dependencies = [
[[package]]
name = "collab-document"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79e649061da9245d81b9114f3aaaf0064c265c97#79e649061da9245d81b9114f3aaaf0064c265c97"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c714b6bd458420663c7a5b29370d6892902e995c#c714b6bd458420663c7a5b29370d6892902e995c"
dependencies = [
"anyhow",
"arc-swap",
@ -899,7 +899,7 @@ dependencies = [
[[package]]
name = "collab-entity"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79e649061da9245d81b9114f3aaaf0064c265c97#79e649061da9245d81b9114f3aaaf0064c265c97"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c714b6bd458420663c7a5b29370d6892902e995c#c714b6bd458420663c7a5b29370d6892902e995c"
dependencies = [
"anyhow",
"bytes",
@ -918,7 +918,7 @@ dependencies = [
[[package]]
name = "collab-folder"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79e649061da9245d81b9114f3aaaf0064c265c97#79e649061da9245d81b9114f3aaaf0064c265c97"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c714b6bd458420663c7a5b29370d6892902e995c#c714b6bd458420663c7a5b29370d6892902e995c"
dependencies = [
"anyhow",
"arc-swap",
@ -961,7 +961,7 @@ dependencies = [
[[package]]
name = "collab-plugins"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79e649061da9245d81b9114f3aaaf0064c265c97#79e649061da9245d81b9114f3aaaf0064c265c97"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c714b6bd458420663c7a5b29370d6892902e995c#c714b6bd458420663c7a5b29370d6892902e995c"
dependencies = [
"anyhow",
"async-stream",
@ -1041,7 +1041,7 @@ dependencies = [
[[package]]
name = "collab-user"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=79e649061da9245d81b9114f3aaaf0064c265c97#79e649061da9245d81b9114f3aaaf0064c265c97"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c714b6bd458420663c7a5b29370d6892902e995c#c714b6bd458420663c7a5b29370d6892902e995c"
dependencies = [
"anyhow",
"collab",

View File

@ -136,13 +136,13 @@ rocksdb = { git = "https://github.com/rust-rocksdb/rust-rocksdb", rev = "1710120
# To switch to the local path, run:
# scripts/tool/update_collab_source.sh
# ⚠️⚠️⚠️️
collab = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79e649061da9245d81b9114f3aaaf0064c265c97" }
collab-entity = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79e649061da9245d81b9114f3aaaf0064c265c97" }
collab-folder = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79e649061da9245d81b9114f3aaaf0064c265c97" }
collab-document = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79e649061da9245d81b9114f3aaaf0064c265c97" }
collab-database = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79e649061da9245d81b9114f3aaaf0064c265c97" }
collab-plugins = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79e649061da9245d81b9114f3aaaf0064c265c97" }
collab-user = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "79e649061da9245d81b9114f3aaaf0064c265c97" }
collab = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c714b6bd458420663c7a5b29370d6892902e995c" }
collab-entity = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c714b6bd458420663c7a5b29370d6892902e995c" }
collab-folder = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c714b6bd458420663c7a5b29370d6892902e995c" }
collab-document = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c714b6bd458420663c7a5b29370d6892902e995c" }
collab-database = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c714b6bd458420663c7a5b29370d6892902e995c" }
collab-plugins = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c714b6bd458420663c7a5b29370d6892902e995c" }
collab-user = { version = "0.2", git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c714b6bd458420663c7a5b29370d6892902e995c" }
# Working directory: frontend
# To update the commit ID, run:

View File

@ -8,9 +8,7 @@ use arc_swap::{ArcSwap, ArcSwapOption};
use collab::core::collab::DataSource;
use collab::core::collab_plugin::CollabPersistence;
use collab::preclude::{Collab, CollabBuilder};
use collab_database::workspace_database::{
DatabaseCollabCloudService, DatabaseCollabService, WorkspaceDatabase,
};
use collab_database::workspace_database::{DatabaseCollabService, WorkspaceDatabase};
use collab_document::blocks::DocumentData;
use collab_document::document::Document;
use collab_entity::{CollabObject, CollabType};
@ -242,16 +240,14 @@ impl AppFlowyCollabBuilder {
pub fn create_workspace_database(
&self,
object: CollabObject,
doc_state: DataSource,
collab: Collab,
collab_db: Weak<CollabKVDB>,
builder_config: CollabBuilderConfig,
collab_service: impl DatabaseCollabService,
cloud_service: impl DatabaseCollabCloudService,
) -> Result<Arc<RwLock<WorkspaceDatabase>>, Error> {
let expected_collab_type = CollabType::WorkspaceDatabase;
assert_eq!(object.collab_type, expected_collab_type);
let collab = self.build_collab(&object, &collab_db, doc_state)?;
let workspace = WorkspaceDatabase::open(collab, collab_service, cloud_service);
let workspace = WorkspaceDatabase::open(collab, collab_service);
self.flush_collab_if_not_exist(
object.uid,

View File

@ -13,17 +13,18 @@ use collab_database::error::DatabaseError;
use collab_database::rows::RowId;
use collab_database::views::DatabaseLayout;
use collab_database::workspace_database::{
DatabaseCollabCloudService, DatabaseCollabPersistenceService, DatabaseCollabService,
DatabaseMeta, EncodeCollabByOid, WorkspaceDatabase,
CollabPersistenceImpl, DatabaseCollabPersistenceService, DatabaseCollabService, DatabaseMeta,
EncodeCollabByOid, WorkspaceDatabase,
};
use collab_entity::{CollabType, EncodedCollab};
use collab_entity::{CollabObject, CollabType, EncodedCollab};
use collab_plugins::local_storage::kv::KVTransactionDB;
use dashmap::DashMap;
use tokio::select;
use tokio::sync::{Mutex, RwLock};
use tracing::{error, event, instrument, trace};
use tokio_util::sync::CancellationToken;
use tracing::{error, instrument, trace};
use collab_integrate::collab_builder::{
AppFlowyCollabBuilder, CollabBuilderConfig, KVDBCollabPersistenceImpl,
};
use collab_integrate::collab_builder::{AppFlowyCollabBuilder, CollabBuilderConfig};
use collab_integrate::{CollabKVAction, CollabKVDB};
use flowy_database_pub::cloud::{
DatabaseAIService, DatabaseCloudService, SummaryRowContent, TranslateItem, TranslateRowContent,
@ -80,16 +81,6 @@ impl DatabaseManager {
}
}
fn is_collab_exist(&self, uid: i64, collab_db: &Weak<CollabKVDB>, object_id: &str) -> bool {
match collab_db.upgrade() {
None => false,
Some(collab_db) => {
let read_txn = collab_db.read_txn();
read_txn.is_exist(uid, object_id)
},
}
}
/// When initialize with new workspace, all the resources will be cleared.
pub async fn initialize(&self, uid: i64) -> FlowyResult<()> {
// 1. Clear all existing tasks
@ -107,72 +98,30 @@ impl DatabaseManager {
}
let collab_db = self.user.collab_db(uid)?;
let collab_service = WorkspaceDatabaseCollabServiceImpl {
user: self.user.clone(),
collab_builder: self.collab_builder.clone(),
};
let collab_cloud_service = WorkspaceDatabaseCloudServiceImpl {
user: self.user.clone(),
cloud_service: self.cloud_service.clone(),
};
let workspace_id = self.user.workspace_id()?;
let workspace_database_object_id = self.user.workspace_database_object_id()?;
let mut workspace_database_data_source =
KVDBCollabPersistenceImpl::new(collab_db.clone(), uid).into_data_source();
let is_exist_in_disk = self.is_collab_exist(uid, &collab_db, &workspace_database_object_id);
// 4.If the workspace database not exist in disk, try to fetch from remote.
if !is_exist_in_disk {
trace!("workspace database not exist, try to fetch from remote");
match self
.cloud_service
.get_database_encode_collab(
&workspace_database_object_id,
CollabType::WorkspaceDatabase,
&workspace_id,
)
.await
{
Ok(value) => {
if let Some(encode_collab) = value {
workspace_database_data_source = DataSource::from(encode_collab);
}
},
Err(err) => {
return Err(FlowyError::record_not_found().with_context(format!(
"get workspace database :{} failed: {}",
workspace_database_object_id, err,
)));
},
}
}
// Construct the workspace database.
event!(
tracing::Level::INFO,
"create workspace database object: {}",
&workspace_database_object_id
let collab_service = WorkspaceDatabaseCollabServiceImpl::new(
self.user.clone(),
self.collab_builder.clone(),
self.cloud_service.clone(),
);
let workspace_id = self
.user
.workspace_id()
.map_err(|err| DatabaseError::Internal(err.into()))?;
let collab_object = self.collab_builder.collab_object(
&workspace_id,
uid,
&workspace_database_object_id,
let workspace_database_object_id = self.user.workspace_database_object_id()?;
let workspace_database_collab = collab_service
.build_collab(
workspace_database_object_id.as_str(),
CollabType::WorkspaceDatabase,
)?;
false,
)
.await?;
let collab_object = collab_service
.build_collab_object(&workspace_database_object_id, CollabType::WorkspaceDatabase)?;
let workspace_database = self.collab_builder.create_workspace_database(
collab_object,
workspace_database_data_source,
workspace_database_collab,
collab_db,
CollabBuilderConfig::default().sync_enable(true),
collab_service,
collab_cloud_service,
)?;
self.workspace_database.store(Some(workspace_database));
Ok(())
}
@ -677,13 +626,30 @@ impl DatabaseManager {
}
}
struct WorkspaceDatabaseCloudServiceImpl {
struct WorkspaceDatabaseCollabServiceImpl {
user: Arc<dyn DatabaseUser>,
collab_builder: Arc<AppFlowyCollabBuilder>,
persistence: Arc<dyn DatabaseCollabPersistenceService>,
cloud_service: Arc<dyn DatabaseCloudService>,
cancellation: Arc<DashMap<String, Option<CancellationToken>>>,
}
#[async_trait]
impl DatabaseCollabCloudService for WorkspaceDatabaseCloudServiceImpl {
impl WorkspaceDatabaseCollabServiceImpl {
fn new(
user: Arc<dyn DatabaseUser>,
collab_builder: Arc<AppFlowyCollabBuilder>,
cloud_service: Arc<dyn DatabaseCloudService>,
) -> Self {
let persistence = DatabasePersistenceImpl { user: user.clone() };
Self {
user,
collab_builder,
persistence: Arc::new(persistence),
cloud_service,
cancellation: Arc::new(Default::default()),
}
}
async fn get_encode_collab(
&self,
object_id: &str,
@ -693,18 +659,35 @@ impl DatabaseCollabCloudService for WorkspaceDatabaseCloudServiceImpl {
let object_id = object_id.to_string();
trace!("[Database]: fetch {}:{} from remote", object_id, object_ty);
let weak_cloud_service = Arc::downgrade(&self.cloud_service);
match weak_cloud_service.upgrade() {
None => Err(DatabaseError::Internal(anyhow!("Cloud service is dropped"))),
Some(cloud_service) => {
let encode_collab = cloud_service
.get_database_encode_collab(&object_id, object_ty, &workspace_id)
.await?;
Ok(encode_collab)
if let Some((_, Some(token))) = self.cancellation.remove(&object_id) {
trace!("cancel previous request for object:{}", object_id);
token.cancel();
}
let token = CancellationToken::new();
self
.cancellation
.insert(object_id.clone(), Some(token.clone()));
let cloned_cancellation = self.cancellation.clone();
tokio::spawn(async move {
select! {
_ = token.cancelled() => {
Err(DatabaseError::ActionCancelled)
},
encode_collab = cloud_service.get_database_encode_collab(&object_id, object_ty, &workspace_id) => {
cloned_cancellation.remove(&object_id);
Ok(encode_collab?)
}
}
}).await.map_err(|err| DatabaseError::Internal(err.into()))?
},
}
}
#[allow(dead_code)]
async fn batch_get_encode_collab(
&self,
object_ids: Vec<String>,
@ -729,23 +712,23 @@ impl DatabaseCollabCloudService for WorkspaceDatabaseCloudServiceImpl {
},
}
}
}
struct WorkspaceDatabaseCollabServiceImpl {
user: Arc<dyn DatabaseUser>,
collab_builder: Arc<AppFlowyCollabBuilder>,
}
fn collab_db(&self) -> Result<Weak<CollabKVDB>, DatabaseError> {
let uid = self
.user
.user_id()
.map_err(|err| DatabaseError::Internal(err.into()))?;
self
.user
.collab_db(uid)
.map_err(|err| DatabaseError::Internal(err.into()))
}
#[async_trait]
impl DatabaseCollabService for WorkspaceDatabaseCollabServiceImpl {
///NOTE: this method doesn't initialize plugins, however it is passed into WorkspaceDatabase,
/// therefore all Database/DatabaseRow creation methods must initialize plugins thmselves.
fn build_collab(
fn build_collab_object(
&self,
object_id: &str,
object_type: CollabType,
data_source: DataSource,
) -> Result<Collab, DatabaseError> {
) -> Result<CollabObject, DatabaseError> {
let uid = self
.user
.user_id()
@ -756,11 +739,54 @@ impl DatabaseCollabService for WorkspaceDatabaseCollabServiceImpl {
.map_err(|err| DatabaseError::Internal(err.into()))?;
let object = self
.collab_builder
.collab_object(&workspace_id, uid, object_id, object_type)?;
let collab_db = self
.user
.collab_db(uid)
.map_err(|err| DatabaseError::Internal(anyhow!("Failed to get collab db: {}", err)))?;
.collab_object(&workspace_id, uid, object_id, object_type)
.map_err(|err| DatabaseError::Internal(anyhow!("Failed to build collab object: {}", err)))?;
Ok(object)
}
}
#[async_trait]
impl DatabaseCollabService for WorkspaceDatabaseCollabServiceImpl {
///NOTE: this method doesn't initialize plugins, however it is passed into WorkspaceDatabase,
/// therefore all Database/DatabaseRow creation methods must initialize plugins thmselves.
async fn build_collab(
&self,
object_id: &str,
collab_type: CollabType,
is_new: bool,
) -> Result<Collab, DatabaseError> {
let object = self.build_collab_object(object_id, collab_type.clone())?;
let data_source: DataSource = if is_new {
DataSource::Disk(None)
} else if self.persistence.is_collab_exist(object_id) {
CollabPersistenceImpl {
persistence: Some(self.persistence.clone()),
}
.into()
} else {
match self.get_encode_collab(object_id, collab_type).await {
Ok(Some(encode_collab)) => {
self
.persistence
.flush_collab(object_id, encode_collab.clone())
.map_err(|err| DatabaseError::Internal(anyhow!("Failed to flush collab: {}", err)))?;
DataSource::from(encode_collab)
},
Ok(None) => {
// when collab not exist, create a default collab
CollabPersistenceImpl {
persistence: Some(self.persistence.clone()),
}
.into()
},
Err(err) => {
error!("Failed to get encode collab: {}", err);
return Err(err);
},
}
};
let collab_db = self.collab_db()?;
let collab = self
.collab_builder
.build_collab(&object, &collab_db, data_source)?;
@ -793,6 +819,7 @@ impl DatabaseCollabPersistenceService for DatabasePersistenceImpl {
"[Database]: collab:{} not exist in local storage",
object_id
);
return;
}
trace!("[Database]: start loading collab:{}", object_id);

View File

@ -507,7 +507,7 @@ impl DatabaseEditor {
.await
.ok_or_else(|| FlowyError::internal().with_context("error while copying row"))?;
let (index, row_order) = database.create_row_in_view(view_id, params)?;
let (index, row_order) = database.create_row_in_view(view_id, params).await?;
trace!(
"duplicate row: {:?} at index:{}, new row:{:?}",
row_id,
@ -577,7 +577,9 @@ impl DatabaseEditor {
} = view_editor.v_will_create_row(params).await?;
let mut database = self.database.write().await;
let (index, order_id) = database.create_row_in_view(&view_editor.view_id, collab_params)?;
let (index, order_id) = database
.create_row_in_view(&view_editor.view_id, collab_params)
.await?;
let row_detail = database.get_row_detail(&order_id.id).await;
drop(database);
@ -698,6 +700,7 @@ impl DatabaseEditor {
.read()
.await
.init_database_row(row_id)
.await
.ok_or_else(|| {
FlowyError::record_not_found()
.with_context(format!("The row:{} in database not found", row_id))