AppFlowy/frontend/rust-lib/flowy-database2/src/manager.rs
Nathan.fooo adc2ee755e
chore: remove lru (#5008)
* chore: remove lru

* chore: update logs

* chore: clippy
2024-03-30 16:28:24 +08:00

503 lines
16 KiB
Rust

use anyhow::anyhow;
use std::collections::HashMap;
use std::sync::{Arc, Weak};
use collab::core::collab::{DocStateSource, MutexCollab};
use collab_database::blocks::BlockEvent;
use collab_database::database::{DatabaseData, MutexDatabase};
use collab_database::error::DatabaseError;
use collab_database::views::{CreateDatabaseParams, CreateViewParams, DatabaseLayout};
use collab_database::workspace_database::{
CollabDocStateByOid, CollabFuture, DatabaseCollabService, DatabaseMeta, WorkspaceDatabase,
};
use collab_entity::CollabType;
use collab_plugins::local_storage::kv::KVTransactionDB;
use tokio::sync::{Mutex, RwLock};
use tracing::{event, instrument, trace};
use collab_integrate::collab_builder::{AppFlowyCollabBuilder, CollabBuilderConfig};
use collab_integrate::{CollabKVAction, CollabKVDB, CollabPersistenceConfig};
use flowy_database_pub::cloud::DatabaseCloudService;
use flowy_error::{internal_error, FlowyError, FlowyResult};
use lib_dispatch::prelude::af_spawn;
use lib_infra::priority_task::TaskDispatcher;
use crate::entities::{DatabaseLayoutPB, DatabaseSnapshotPB, DidFetchRowPB};
use crate::notification::{send_notification, DatabaseNotification};
use crate::services::database::DatabaseEditor;
use crate::services::database_view::DatabaseLayoutDepsResolver;
use crate::services::field_settings::default_field_settings_by_layout_map;
use crate::services::share::csv::{CSVFormat, CSVImporter, ImportResult};
pub trait DatabaseUser: Send + Sync {
fn user_id(&self) -> Result<i64, FlowyError>;
fn collab_db(&self, uid: i64) -> Result<Weak<CollabKVDB>, FlowyError>;
}
pub struct DatabaseManager {
user: Arc<dyn DatabaseUser>,
workspace_database: Arc<RwLock<Option<Arc<WorkspaceDatabase>>>>,
task_scheduler: Arc<RwLock<TaskDispatcher>>,
editors: Mutex<HashMap<String, Arc<DatabaseEditor>>>,
collab_builder: Arc<AppFlowyCollabBuilder>,
cloud_service: Arc<dyn DatabaseCloudService>,
}
impl DatabaseManager {
pub fn new(
database_user: Arc<dyn DatabaseUser>,
task_scheduler: Arc<RwLock<TaskDispatcher>>,
collab_builder: Arc<AppFlowyCollabBuilder>,
cloud_service: Arc<dyn DatabaseCloudService>,
) -> Self {
Self {
user: database_user,
workspace_database: Default::default(),
task_scheduler,
editors: Default::default(),
collab_builder,
cloud_service,
}
}
fn is_collab_exist(&self, uid: i64, collab_db: &Weak<CollabKVDB>, object_id: &str) -> bool {
match collab_db.upgrade() {
None => false,
Some(collab_db) => {
let read_txn = collab_db.read_txn();
read_txn.is_exist(uid, object_id)
},
}
}
/// When initialize with new workspace, all the resources will be cleared.
pub async fn initialize(
&self,
uid: i64,
workspace_id: String,
workspace_database_object_id: String,
) -> FlowyResult<()> {
// 1. Clear all existing tasks
self.task_scheduler.write().await.clear_task();
// 2. Release all existing editors
for (_, editor) in self.editors.lock().await.iter() {
editor.close_all_views().await;
}
self.editors.lock().await.clear();
// 3. Clear the workspace database
*self.workspace_database.write().await = None;
let collab_db = self.user.collab_db(uid)?;
let collab_builder = UserDatabaseCollabServiceImpl {
workspace_id: workspace_id.clone(),
collab_builder: self.collab_builder.clone(),
cloud_service: self.cloud_service.clone(),
};
let config = CollabPersistenceConfig::new().snapshot_per_update(100);
let mut workspace_database_doc_state = DocStateSource::FromDisk;
// If the workspace database not exist in disk, try to fetch from remote.
if !self.is_collab_exist(uid, &collab_db, &workspace_database_object_id) {
trace!("workspace database not exist, try to fetch from remote");
match self
.cloud_service
.get_database_object_doc_state(
&workspace_database_object_id,
CollabType::WorkspaceDatabase,
&workspace_id,
)
.await
{
Ok(doc_state) => match doc_state {
Some(doc_state) => {
workspace_database_doc_state = DocStateSource::FromDocState(doc_state);
},
None => {
workspace_database_doc_state = DocStateSource::FromDisk;
},
},
Err(err) => {
return Err(FlowyError::record_not_found().with_context(format!(
"get workspace database :{} failed: {}",
workspace_database_object_id, err,
)));
},
}
}
// Construct the workspace database.
event!(
tracing::Level::INFO,
"open aggregate database views object: {}",
&workspace_database_object_id
);
let collab = collab_builder.build_collab_with_config(
uid,
&workspace_database_object_id,
CollabType::WorkspaceDatabase,
collab_db.clone(),
workspace_database_doc_state,
config.clone(),
)?;
let workspace_database =
WorkspaceDatabase::open(uid, collab, collab_db, config, collab_builder);
*self.workspace_database.write().await = Some(Arc::new(workspace_database));
Ok(())
}
#[instrument(
name = "database_initialize_with_new_user",
level = "debug",
skip_all,
err
)]
pub async fn initialize_with_new_user(
&self,
user_id: i64,
workspace_id: String,
workspace_database_object_id: String,
) -> FlowyResult<()> {
self
.initialize(user_id, workspace_id, workspace_database_object_id)
.await?;
Ok(())
}
pub async fn get_database_inline_view_id(&self, database_id: &str) -> FlowyResult<String> {
let wdb = self.get_workspace_database().await?;
let database_collab = wdb.get_database(database_id).await.ok_or_else(|| {
FlowyError::record_not_found().with_context(format!("The database:{} not found", database_id))
})?;
let lock_guard = database_collab.lock();
Ok(lock_guard.get_inline_view_id())
}
pub async fn get_all_databases_meta(&self) -> Vec<DatabaseMeta> {
let mut items = vec![];
if let Ok(wdb) = self.get_workspace_database().await {
items = wdb.get_all_database_meta()
}
items
}
pub async fn track_database(
&self,
view_ids_by_database_id: HashMap<String, Vec<String>>,
) -> FlowyResult<()> {
let wdb = self.get_workspace_database().await?;
view_ids_by_database_id
.into_iter()
.for_each(|(database_id, view_ids)| {
wdb.track_database(&database_id, view_ids);
});
Ok(())
}
pub async fn get_database_with_view_id(&self, view_id: &str) -> FlowyResult<Arc<DatabaseEditor>> {
let database_id = self.get_database_id_with_view_id(view_id).await?;
self.get_database(&database_id).await
}
pub async fn get_database_id_with_view_id(&self, view_id: &str) -> FlowyResult<String> {
let wdb = self.get_workspace_database().await?;
wdb.get_database_id_with_view_id(view_id).ok_or_else(|| {
FlowyError::record_not_found()
.with_context(format!("The database for view id: {} not found", view_id))
})
}
pub async fn get_database(&self, database_id: &str) -> FlowyResult<Arc<DatabaseEditor>> {
if let Some(editor) = self.editors.lock().await.get(database_id).cloned() {
return Ok(editor);
}
self.open_database(database_id).await
}
pub async fn open_database(&self, database_id: &str) -> FlowyResult<Arc<DatabaseEditor>> {
trace!("open database editor:{}", database_id);
let database = self
.get_workspace_database()
.await?
.get_database(database_id)
.await
.ok_or_else(FlowyError::collab_not_sync)?;
// Subscribe the [BlockEvent]
subscribe_block_event(&database);
let editor = Arc::new(DatabaseEditor::new(database, self.task_scheduler.clone()).await?);
self
.editors
.lock()
.await
.insert(database_id.to_string(), editor.clone());
Ok(editor)
}
pub async fn open_database_view<T: AsRef<str>>(&self, view_id: T) -> FlowyResult<()> {
let view_id = view_id.as_ref();
let wdb = self.get_workspace_database().await?;
if let Some(database_id) = wdb.get_database_id_with_view_id(view_id) {
wdb.open_database(&database_id);
}
Ok(())
}
pub async fn close_database_view<T: AsRef<str>>(&self, view_id: T) -> FlowyResult<()> {
let view_id = view_id.as_ref();
let wdb = self.get_workspace_database().await?;
let database_id = wdb.get_database_id_with_view_id(view_id);
if let Some(database_id) = database_id {
let mut editors = self.editors.lock().await;
let mut should_remove = false;
if let Some(editor) = editors.get(&database_id) {
editor.close_view(view_id).await;
should_remove = editor.num_views().await == 0;
}
if should_remove {
trace!("remove database editor:{}", database_id);
editors.remove(&database_id);
wdb.close_database(&database_id);
}
}
Ok(())
}
pub async fn delete_database_view(&self, view_id: &str) -> FlowyResult<()> {
let database = self.get_database_with_view_id(view_id).await?;
let _ = database.delete_database_view(view_id).await?;
Ok(())
}
pub async fn duplicate_database(&self, view_id: &str) -> FlowyResult<Vec<u8>> {
let wdb = self.get_workspace_database().await?;
let data = wdb.get_database_duplicated_data(view_id).await?;
let json_bytes = data.to_json_bytes()?;
Ok(json_bytes)
}
/// Create a new database with the given data that can be deserialized to [DatabaseData].
#[tracing::instrument(level = "trace", skip_all, err)]
pub async fn create_database_with_database_data(
&self,
view_id: &str,
data: Vec<u8>,
) -> FlowyResult<()> {
let mut database_data = DatabaseData::from_json_bytes(data)?;
database_data.view.id = view_id.to_string();
let wdb = self.get_workspace_database().await?;
let _ = wdb.create_database_with_data(database_data)?;
Ok(())
}
pub async fn create_database_with_params(&self, params: CreateDatabaseParams) -> FlowyResult<()> {
let wdb = self.get_workspace_database().await?;
let _ = wdb.create_database(params)?;
Ok(())
}
/// A linked view is a view that is linked to existing database.
#[tracing::instrument(level = "trace", skip(self), err)]
pub async fn create_linked_view(
&self,
name: String,
layout: DatabaseLayout,
database_id: String,
database_view_id: String,
) -> FlowyResult<()> {
let wdb = self.get_workspace_database().await?;
let mut params = CreateViewParams::new(database_id.clone(), database_view_id, name, layout);
if let Some(database) = wdb.get_database(&database_id).await {
let (field, layout_setting) = DatabaseLayoutDepsResolver::new(database, layout)
.resolve_deps_when_create_database_linked_view();
if let Some(field) = field {
params = params.with_deps_fields(vec![field], vec![default_field_settings_by_layout_map()]);
}
if let Some(layout_setting) = layout_setting {
params = params.with_layout_setting(layout_setting);
}
};
wdb.create_database_linked_view(params).await?;
Ok(())
}
pub async fn import_csv(
&self,
view_id: String,
content: String,
format: CSVFormat,
) -> FlowyResult<ImportResult> {
let params = tokio::task::spawn_blocking(move || {
CSVImporter.import_csv_from_string(view_id, content, format)
})
.await
.map_err(internal_error)??;
let result = ImportResult {
database_id: params.database_id.clone(),
view_id: params.view_id.clone(),
};
self.create_database_with_params(params).await?;
Ok(result)
}
// will implement soon
pub async fn import_csv_from_file(
&self,
_file_path: String,
_format: CSVFormat,
) -> FlowyResult<()> {
Ok(())
}
pub async fn export_csv(&self, view_id: &str, style: CSVFormat) -> FlowyResult<String> {
let database = self.get_database_with_view_id(view_id).await?;
database.export_csv(style).await
}
pub async fn update_database_layout(
&self,
view_id: &str,
layout: DatabaseLayoutPB,
) -> FlowyResult<()> {
let database = self.get_database_with_view_id(view_id).await?;
database.update_view_layout(view_id, layout.into()).await
}
pub async fn get_database_snapshots(
&self,
view_id: &str,
limit: usize,
) -> FlowyResult<Vec<DatabaseSnapshotPB>> {
let database_id = self.get_database_id_with_view_id(view_id).await?;
let snapshots = self
.cloud_service
.get_database_collab_object_snapshots(&database_id, limit)
.await?
.into_iter()
.map(|snapshot| DatabaseSnapshotPB {
snapshot_id: snapshot.snapshot_id,
snapshot_desc: "".to_string(),
created_at: snapshot.created_at,
data: snapshot.data,
})
.collect::<Vec<_>>();
Ok(snapshots)
}
async fn get_workspace_database(&self) -> FlowyResult<Arc<WorkspaceDatabase>> {
let database = self.workspace_database.read().await;
match &*database {
None => Err(FlowyError::internal().with_context("Workspace database not initialized")),
Some(user_database) => Ok(user_database.clone()),
}
}
/// Only expose this method for testing
#[cfg(debug_assertions)]
pub fn get_cloud_service(&self) -> &Arc<dyn DatabaseCloudService> {
&self.cloud_service
}
}
/// Send notification to all clients that are listening to the given object.
fn subscribe_block_event(database: &Arc<MutexDatabase>) {
let mut block_event_rx = database.lock().subscribe_block_event();
af_spawn(async move {
while let Ok(event) = block_event_rx.recv().await {
match event {
BlockEvent::DidFetchRow(row_details) => {
for row_detail in row_details {
trace!("Did fetch row: {:?}", row_detail.row.id);
let row_id = row_detail.row.id.clone();
let pb = DidFetchRowPB::from(row_detail);
send_notification(&row_id, DatabaseNotification::DidFetchRow)
.payload(pb)
.send();
}
},
}
}
});
}
struct UserDatabaseCollabServiceImpl {
workspace_id: String,
collab_builder: Arc<AppFlowyCollabBuilder>,
cloud_service: Arc<dyn DatabaseCloudService>,
}
impl DatabaseCollabService for UserDatabaseCollabServiceImpl {
fn get_collab_doc_state(
&self,
object_id: &str,
object_ty: CollabType,
) -> CollabFuture<Result<DocStateSource, DatabaseError>> {
let workspace_id = self.workspace_id.clone();
let object_id = object_id.to_string();
let weak_cloud_service = Arc::downgrade(&self.cloud_service);
Box::pin(async move {
match weak_cloud_service.upgrade() {
None => Err(DatabaseError::Internal(anyhow!("Cloud service is dropped"))),
Some(cloud_service) => {
let doc_state = cloud_service
.get_database_object_doc_state(&object_id, object_ty, &workspace_id)
.await?;
match doc_state {
None => Ok(DocStateSource::FromDisk),
Some(doc_state) => Ok(DocStateSource::FromDocState(doc_state)),
}
},
}
})
}
fn batch_get_collab_update(
&self,
object_ids: Vec<String>,
object_ty: CollabType,
) -> CollabFuture<Result<CollabDocStateByOid, DatabaseError>> {
let workspace_id = self.workspace_id.clone();
let weak_cloud_service = Arc::downgrade(&self.cloud_service);
Box::pin(async move {
match weak_cloud_service.upgrade() {
None => {
tracing::warn!("Cloud service is dropped");
Ok(CollabDocStateByOid::default())
},
Some(cloud_service) => {
let updates = cloud_service
.batch_get_database_object_doc_state(object_ids, object_ty, &workspace_id)
.await?;
Ok(updates)
},
}
})
}
fn build_collab_with_config(
&self,
uid: i64,
object_id: &str,
object_type: CollabType,
collab_db: Weak<CollabKVDB>,
collab_raw_data: DocStateSource,
persistence_config: CollabPersistenceConfig,
) -> Result<Arc<MutexCollab>, DatabaseError> {
let collab = self.collab_builder.build_with_config(
uid,
object_id,
object_type.clone(),
collab_db.clone(),
collab_raw_data,
persistence_config,
CollabBuilderConfig::default().sync_enable(true),
)?;
Ok(collab)
}
}