use anyhow::anyhow; use std::collections::HashMap; use std::sync::{Arc, Weak}; use collab::core::collab::{DocStateSource, MutexCollab}; use collab_database::blocks::BlockEvent; use collab_database::database::{DatabaseData, MutexDatabase}; use collab_database::error::DatabaseError; use collab_database::views::{CreateDatabaseParams, CreateViewParams, DatabaseLayout}; use collab_database::workspace_database::{ CollabDocStateByOid, CollabFuture, DatabaseCollabService, DatabaseMeta, WorkspaceDatabase, }; use collab_entity::CollabType; use collab_plugins::local_storage::kv::KVTransactionDB; use tokio::sync::{Mutex, RwLock}; use tracing::{event, instrument, trace}; use collab_integrate::collab_builder::{AppFlowyCollabBuilder, CollabBuilderConfig}; use collab_integrate::{CollabKVAction, CollabKVDB, CollabPersistenceConfig}; use flowy_database_pub::cloud::DatabaseCloudService; use flowy_error::{internal_error, FlowyError, FlowyResult}; use lib_dispatch::prelude::af_spawn; use lib_infra::priority_task::TaskDispatcher; use crate::entities::{DatabaseLayoutPB, DatabaseSnapshotPB, DidFetchRowPB}; use crate::notification::{send_notification, DatabaseNotification}; use crate::services::database::DatabaseEditor; use crate::services::database_view::DatabaseLayoutDepsResolver; use crate::services::field_settings::default_field_settings_by_layout_map; use crate::services::share::csv::{CSVFormat, CSVImporter, ImportResult}; pub trait DatabaseUser: Send + Sync { fn user_id(&self) -> Result; fn collab_db(&self, uid: i64) -> Result, FlowyError>; } pub struct DatabaseManager { user: Arc, workspace_database: Arc>>>, task_scheduler: Arc>, editors: Mutex>>, collab_builder: Arc, cloud_service: Arc, } impl DatabaseManager { pub fn new( database_user: Arc, task_scheduler: Arc>, collab_builder: Arc, cloud_service: Arc, ) -> Self { Self { user: database_user, workspace_database: Default::default(), task_scheduler, editors: Default::default(), collab_builder, cloud_service, } } fn is_collab_exist(&self, uid: i64, collab_db: &Weak, object_id: &str) -> bool { match collab_db.upgrade() { None => false, Some(collab_db) => { let read_txn = collab_db.read_txn(); read_txn.is_exist(uid, object_id) }, } } /// When initialize with new workspace, all the resources will be cleared. pub async fn initialize( &self, uid: i64, workspace_id: String, workspace_database_object_id: String, ) -> FlowyResult<()> { // 1. Clear all existing tasks self.task_scheduler.write().await.clear_task(); // 2. Release all existing editors for (_, editor) in self.editors.lock().await.iter() { editor.close_all_views().await; } self.editors.lock().await.clear(); // 3. Clear the workspace database *self.workspace_database.write().await = None; let collab_db = self.user.collab_db(uid)?; let collab_builder = UserDatabaseCollabServiceImpl { workspace_id: workspace_id.clone(), collab_builder: self.collab_builder.clone(), cloud_service: self.cloud_service.clone(), }; let config = CollabPersistenceConfig::new().snapshot_per_update(100); let mut workspace_database_doc_state = DocStateSource::FromDisk; // If the workspace database not exist in disk, try to fetch from remote. if !self.is_collab_exist(uid, &collab_db, &workspace_database_object_id) { trace!("workspace database not exist, try to fetch from remote"); match self .cloud_service .get_database_object_doc_state( &workspace_database_object_id, CollabType::WorkspaceDatabase, &workspace_id, ) .await { Ok(doc_state) => match doc_state { Some(doc_state) => { workspace_database_doc_state = DocStateSource::FromDocState(doc_state); }, None => { workspace_database_doc_state = DocStateSource::FromDisk; }, }, Err(err) => { return Err(FlowyError::record_not_found().with_context(format!( "get workspace database :{} failed: {}", workspace_database_object_id, err, ))); }, } } // Construct the workspace database. event!( tracing::Level::INFO, "open aggregate database views object: {}", &workspace_database_object_id ); let collab = collab_builder.build_collab_with_config( uid, &workspace_database_object_id, CollabType::WorkspaceDatabase, collab_db.clone(), workspace_database_doc_state, config.clone(), )?; let workspace_database = WorkspaceDatabase::open(uid, collab, collab_db, config, collab_builder); *self.workspace_database.write().await = Some(Arc::new(workspace_database)); Ok(()) } #[instrument( name = "database_initialize_with_new_user", level = "debug", skip_all, err )] pub async fn initialize_with_new_user( &self, user_id: i64, workspace_id: String, workspace_database_object_id: String, ) -> FlowyResult<()> { self .initialize(user_id, workspace_id, workspace_database_object_id) .await?; Ok(()) } pub async fn get_database_inline_view_id(&self, database_id: &str) -> FlowyResult { let wdb = self.get_workspace_database().await?; let database_collab = wdb.get_database(database_id).await.ok_or_else(|| { FlowyError::record_not_found().with_context(format!("The database:{} not found", database_id)) })?; let lock_guard = database_collab.lock(); Ok(lock_guard.get_inline_view_id()) } pub async fn get_all_databases_meta(&self) -> Vec { let mut items = vec![]; if let Ok(wdb) = self.get_workspace_database().await { items = wdb.get_all_database_meta() } items } pub async fn track_database( &self, view_ids_by_database_id: HashMap>, ) -> FlowyResult<()> { let wdb = self.get_workspace_database().await?; view_ids_by_database_id .into_iter() .for_each(|(database_id, view_ids)| { wdb.track_database(&database_id, view_ids); }); Ok(()) } pub async fn get_database_with_view_id(&self, view_id: &str) -> FlowyResult> { let database_id = self.get_database_id_with_view_id(view_id).await?; self.get_database(&database_id).await } pub async fn get_database_id_with_view_id(&self, view_id: &str) -> FlowyResult { let wdb = self.get_workspace_database().await?; wdb.get_database_id_with_view_id(view_id).ok_or_else(|| { FlowyError::record_not_found() .with_context(format!("The database for view id: {} not found", view_id)) }) } pub async fn get_database(&self, database_id: &str) -> FlowyResult> { if let Some(editor) = self.editors.lock().await.get(database_id).cloned() { return Ok(editor); } self.open_database(database_id).await } pub async fn open_database(&self, database_id: &str) -> FlowyResult> { trace!("open database editor:{}", database_id); let database = self .get_workspace_database() .await? .get_database(database_id) .await .ok_or_else(FlowyError::collab_not_sync)?; // Subscribe the [BlockEvent] subscribe_block_event(&database); let editor = Arc::new(DatabaseEditor::new(database, self.task_scheduler.clone()).await?); self .editors .lock() .await .insert(database_id.to_string(), editor.clone()); Ok(editor) } pub async fn open_database_view>(&self, view_id: T) -> FlowyResult<()> { let view_id = view_id.as_ref(); let wdb = self.get_workspace_database().await?; if let Some(database_id) = wdb.get_database_id_with_view_id(view_id) { wdb.open_database(&database_id); } Ok(()) } pub async fn close_database_view>(&self, view_id: T) -> FlowyResult<()> { let view_id = view_id.as_ref(); let wdb = self.get_workspace_database().await?; let database_id = wdb.get_database_id_with_view_id(view_id); if let Some(database_id) = database_id { let mut editors = self.editors.lock().await; let mut should_remove = false; if let Some(editor) = editors.get(&database_id) { editor.close_view(view_id).await; should_remove = editor.num_views().await == 0; } if should_remove { trace!("remove database editor:{}", database_id); editors.remove(&database_id); wdb.close_database(&database_id); } } Ok(()) } pub async fn delete_database_view(&self, view_id: &str) -> FlowyResult<()> { let database = self.get_database_with_view_id(view_id).await?; let _ = database.delete_database_view(view_id).await?; Ok(()) } pub async fn duplicate_database(&self, view_id: &str) -> FlowyResult> { let wdb = self.get_workspace_database().await?; let data = wdb.get_database_duplicated_data(view_id).await?; let json_bytes = data.to_json_bytes()?; Ok(json_bytes) } /// Create a new database with the given data that can be deserialized to [DatabaseData]. #[tracing::instrument(level = "trace", skip_all, err)] pub async fn create_database_with_database_data( &self, view_id: &str, data: Vec, ) -> FlowyResult<()> { let mut database_data = DatabaseData::from_json_bytes(data)?; database_data.view.id = view_id.to_string(); let wdb = self.get_workspace_database().await?; let _ = wdb.create_database_with_data(database_data)?; Ok(()) } pub async fn create_database_with_params(&self, params: CreateDatabaseParams) -> FlowyResult<()> { let wdb = self.get_workspace_database().await?; let _ = wdb.create_database(params)?; Ok(()) } /// A linked view is a view that is linked to existing database. #[tracing::instrument(level = "trace", skip(self), err)] pub async fn create_linked_view( &self, name: String, layout: DatabaseLayout, database_id: String, database_view_id: String, ) -> FlowyResult<()> { let wdb = self.get_workspace_database().await?; let mut params = CreateViewParams::new(database_id.clone(), database_view_id, name, layout); if let Some(database) = wdb.get_database(&database_id).await { let (field, layout_setting) = DatabaseLayoutDepsResolver::new(database, layout) .resolve_deps_when_create_database_linked_view(); if let Some(field) = field { params = params.with_deps_fields(vec![field], vec![default_field_settings_by_layout_map()]); } if let Some(layout_setting) = layout_setting { params = params.with_layout_setting(layout_setting); } }; wdb.create_database_linked_view(params).await?; Ok(()) } pub async fn import_csv( &self, view_id: String, content: String, format: CSVFormat, ) -> FlowyResult { let params = tokio::task::spawn_blocking(move || { CSVImporter.import_csv_from_string(view_id, content, format) }) .await .map_err(internal_error)??; let result = ImportResult { database_id: params.database_id.clone(), view_id: params.view_id.clone(), }; self.create_database_with_params(params).await?; Ok(result) } // will implement soon pub async fn import_csv_from_file( &self, _file_path: String, _format: CSVFormat, ) -> FlowyResult<()> { Ok(()) } pub async fn export_csv(&self, view_id: &str, style: CSVFormat) -> FlowyResult { let database = self.get_database_with_view_id(view_id).await?; database.export_csv(style).await } pub async fn update_database_layout( &self, view_id: &str, layout: DatabaseLayoutPB, ) -> FlowyResult<()> { let database = self.get_database_with_view_id(view_id).await?; database.update_view_layout(view_id, layout.into()).await } pub async fn get_database_snapshots( &self, view_id: &str, limit: usize, ) -> FlowyResult> { let database_id = self.get_database_id_with_view_id(view_id).await?; let snapshots = self .cloud_service .get_database_collab_object_snapshots(&database_id, limit) .await? .into_iter() .map(|snapshot| DatabaseSnapshotPB { snapshot_id: snapshot.snapshot_id, snapshot_desc: "".to_string(), created_at: snapshot.created_at, data: snapshot.data, }) .collect::>(); Ok(snapshots) } async fn get_workspace_database(&self) -> FlowyResult> { let database = self.workspace_database.read().await; match &*database { None => Err(FlowyError::internal().with_context("Workspace database not initialized")), Some(user_database) => Ok(user_database.clone()), } } /// Only expose this method for testing #[cfg(debug_assertions)] pub fn get_cloud_service(&self) -> &Arc { &self.cloud_service } } /// Send notification to all clients that are listening to the given object. fn subscribe_block_event(database: &Arc) { let mut block_event_rx = database.lock().subscribe_block_event(); af_spawn(async move { while let Ok(event) = block_event_rx.recv().await { match event { BlockEvent::DidFetchRow(row_details) => { for row_detail in row_details { trace!("Did fetch row: {:?}", row_detail.row.id); let row_id = row_detail.row.id.clone(); let pb = DidFetchRowPB::from(row_detail); send_notification(&row_id, DatabaseNotification::DidFetchRow) .payload(pb) .send(); } }, } } }); } struct UserDatabaseCollabServiceImpl { workspace_id: String, collab_builder: Arc, cloud_service: Arc, } impl DatabaseCollabService for UserDatabaseCollabServiceImpl { fn get_collab_doc_state( &self, object_id: &str, object_ty: CollabType, ) -> CollabFuture> { let workspace_id = self.workspace_id.clone(); let object_id = object_id.to_string(); let weak_cloud_service = Arc::downgrade(&self.cloud_service); Box::pin(async move { match weak_cloud_service.upgrade() { None => Err(DatabaseError::Internal(anyhow!("Cloud service is dropped"))), Some(cloud_service) => { let doc_state = cloud_service .get_database_object_doc_state(&object_id, object_ty, &workspace_id) .await?; match doc_state { None => Ok(DocStateSource::FromDisk), Some(doc_state) => Ok(DocStateSource::FromDocState(doc_state)), } }, } }) } fn batch_get_collab_update( &self, object_ids: Vec, object_ty: CollabType, ) -> CollabFuture> { let workspace_id = self.workspace_id.clone(); let weak_cloud_service = Arc::downgrade(&self.cloud_service); Box::pin(async move { match weak_cloud_service.upgrade() { None => { tracing::warn!("Cloud service is dropped"); Ok(CollabDocStateByOid::default()) }, Some(cloud_service) => { let updates = cloud_service .batch_get_database_object_doc_state(object_ids, object_ty, &workspace_id) .await?; Ok(updates) }, } }) } fn build_collab_with_config( &self, uid: i64, object_id: &str, object_type: CollabType, collab_db: Weak, collab_raw_data: DocStateSource, persistence_config: CollabPersistenceConfig, ) -> Result, DatabaseError> { let collab = self.collab_builder.build_with_config( uid, object_id, object_type.clone(), collab_db.clone(), collab_raw_data, persistence_config, CollabBuilderConfig::default().sync_enable(true), )?; Ok(collab) } }