feat: migrate user data to cloud (#3078)

* refactor: weak passed-in params in handler

* refactor: rename struct

* chore: update tables

* chore: update schema

* chore: add permission

* chore: update tables

* chore: support transaction mode

* chore: workspace database id

* chore: add user workspace

* feat: return list of workspaces

* chore: add user to workspace

* feat: separate database row table

* refactor: update schema

* chore: partition table

* chore: use transaction

* refactor: dir

* refactor: collab db ref

* fix: collab db lock

* chore: rename files

* chore: add tables descriptions

* chore: update readme

* docs: update documentation

* chore: rename crate

* chore: update ref

* chore: update tests

* chore: update tests

* refactor: crate deps

* chore: update crate ref

* chore: remove unused deps

* chore: remove unused deps

* chore: update collab crate refs

* chore: replace client with transaction in pooler

* refactor: return error type

* refactor: use anyhow error in deps

* feat: supabase postgrest user signin (wip)

* fix: Cargo.toml source git deps, changed Error to anyhow::Error

* fix: uuid serialization

* chore: fix conflict

* chore: extend the response

* feat: add implementation place holders

* feat: impl get_user_workspaces

* feat: impl get_user_profile

* test: create workspace

* fix: postgrest: field names and alias

* chore: implement folder restful api

* chore: implement collab storate with restful api

* feat: added placeholders for impl: update_user_profile, check_user

* feat: impl: update_user_profile

* feat: impl: check_user

* fix: use UidResponse, add more debug info for serde serialization error

* fix: get_user_profile: use Optional<UserProfileResponse>

* chore: imple init sync

* chore: support soft delete

* feat: postgresql: add migration test

* feat: postgresql migration test: added UID display and colored output

* feat: postgresql migration test: workspace role

* feat: postgresql migration test: create shared common utils

* feat: postgresql migration test: fixed shebang

* chore: add flush_collab_update pg function

* chore: implement datbaase and document restful api

* chore: migrate to use restful api

* chore: update table schema

* chore: fix tests

* chore: remove unused code

* chore: format code

* chore: remove unused env

* fix: tauri build

* fix: tauri build

---------

Co-authored-by: Fu Zi Xiang <speed2exe@live.com.sg>
This commit is contained in:
Nathan.fooo
2023-07-29 09:46:24 +08:00
committed by GitHub
parent a885170869
commit 2cd88594e8
179 changed files with 4999 additions and 5314 deletions

View File

@ -1,38 +0,0 @@
use std::sync::Arc;
use appflowy_integrate::RocksCollabDB;
pub use collab_database::user::CollabObjectUpdate;
pub use collab_database::user::CollabObjectUpdateByOid;
use flowy_error::FlowyError;
use lib_infra::future::FutureResult;
pub trait DatabaseUser2: Send + Sync {
fn user_id(&self) -> Result<i64, FlowyError>;
fn token(&self) -> Result<Option<String>, FlowyError>;
fn collab_db(&self, uid: i64) -> Result<Arc<RocksCollabDB>, FlowyError>;
}
/// A trait for database cloud service.
/// Each kind of server should implement this trait. Check out the [AppFlowyServerProvider] of
/// [flowy-server] crate for more information.
pub trait DatabaseCloudService: Send + Sync {
fn get_collab_update(&self, object_id: &str) -> FutureResult<CollabObjectUpdate, FlowyError>;
fn batch_get_collab_updates(
&self,
object_ids: Vec<String>,
) -> FutureResult<CollabObjectUpdateByOid, FlowyError>;
fn get_collab_latest_snapshot(
&self,
object_id: &str,
) -> FutureResult<Option<DatabaseSnapshot>, FlowyError>;
}
pub struct DatabaseSnapshot {
pub snapshot_id: i64,
pub database_id: String,
pub data: Vec<u8>,
pub created_at: i64,
}

View File

@ -1,4 +1,4 @@
use std::sync::Arc;
use std::sync::{Arc, Weak};
use collab_database::database::gen_row_id;
use collab_database::rows::RowId;
@ -8,7 +8,7 @@ use lib_dispatch::prelude::{data_result_ok, AFPluginData, AFPluginState, DataRes
use lib_infra::util::timestamp;
use crate::entities::*;
use crate::manager::DatabaseManager2;
use crate::manager::DatabaseManager;
use crate::services::cell::CellBuilder;
use crate::services::field::checklist_type_option::ChecklistCellChangeset;
use crate::services::field::{
@ -17,11 +17,21 @@ use crate::services::field::{
use crate::services::group::{GroupChangeset, GroupSettingChangeset};
use crate::services::share::csv::CSVFormat;
fn upgrade_manager(
database_manager: AFPluginState<Weak<DatabaseManager>>,
) -> FlowyResult<Arc<DatabaseManager>> {
let manager = database_manager
.upgrade()
.ok_or(FlowyError::internal().context("The database manager is already dropped"))?;
Ok(manager)
}
#[tracing::instrument(level = "trace", skip_all, err)]
pub(crate) async fn get_database_data_handler(
data: AFPluginData<DatabaseViewIdPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> DataResult<DatabasePB, FlowyError> {
let manager = upgrade_manager(manager)?;
let view_id: DatabaseViewIdPB = data.into_inner();
let database_editor = manager.get_database_with_view_id(view_id.as_ref()).await?;
let data = database_editor.get_database_data(view_id.as_ref()).await?;
@ -31,8 +41,9 @@ pub(crate) async fn get_database_data_handler(
#[tracing::instrument(level = "trace", skip_all, err)]
pub(crate) async fn open_database_handler(
data: AFPluginData<DatabaseViewIdPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> Result<(), FlowyError> {
let manager = upgrade_manager(manager)?;
let view_id: DatabaseViewIdPB = data.into_inner();
let database_id = manager
.get_database_id_with_view_id(view_id.as_ref())
@ -44,8 +55,9 @@ pub(crate) async fn open_database_handler(
#[tracing::instrument(level = "trace", skip_all, err)]
pub(crate) async fn get_database_id_handler(
data: AFPluginData<DatabaseViewIdPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> DataResult<DatabaseIdPB, FlowyError> {
let manager = upgrade_manager(manager)?;
let view_id: DatabaseViewIdPB = data.into_inner();
let database_id = manager
.get_database_id_with_view_id(view_id.as_ref())
@ -56,8 +68,9 @@ pub(crate) async fn get_database_id_handler(
#[tracing::instrument(level = "trace", skip(data, manager), err)]
pub(crate) async fn get_database_setting_handler(
data: AFPluginData<DatabaseViewIdPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> DataResult<DatabaseViewSettingPB, FlowyError> {
let manager = upgrade_manager(manager)?;
let view_id: DatabaseViewIdPB = data.into_inner();
let database_editor = manager.get_database_with_view_id(view_id.as_ref()).await?;
let data = database_editor
@ -69,8 +82,9 @@ pub(crate) async fn get_database_setting_handler(
#[tracing::instrument(level = "trace", skip(data, manager), err)]
pub(crate) async fn update_database_setting_handler(
data: AFPluginData<DatabaseSettingChangesetPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> Result<(), FlowyError> {
let manager = upgrade_manager(manager)?;
let params: DatabaseSettingChangesetParams = data.into_inner().try_into()?;
let editor = manager.get_database_with_view_id(&params.view_id).await?;
@ -100,8 +114,9 @@ pub(crate) async fn update_database_setting_handler(
#[tracing::instrument(level = "trace", skip(data, manager), err)]
pub(crate) async fn get_all_filters_handler(
data: AFPluginData<DatabaseViewIdPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> DataResult<RepeatedFilterPB, FlowyError> {
let manager = upgrade_manager(manager)?;
let view_id: DatabaseViewIdPB = data.into_inner();
let database_editor = manager.get_database_with_view_id(view_id.as_ref()).await?;
let filters = database_editor.get_all_filters(view_id.as_ref()).await;
@ -111,8 +126,9 @@ pub(crate) async fn get_all_filters_handler(
#[tracing::instrument(level = "trace", skip(data, manager), err)]
pub(crate) async fn get_all_sorts_handler(
data: AFPluginData<DatabaseViewIdPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> DataResult<RepeatedSortPB, FlowyError> {
let manager = upgrade_manager(manager)?;
let view_id: DatabaseViewIdPB = data.into_inner();
let database_editor = manager.get_database_with_view_id(view_id.as_ref()).await?;
let sorts = database_editor.get_all_sorts(view_id.as_ref()).await;
@ -122,8 +138,9 @@ pub(crate) async fn get_all_sorts_handler(
#[tracing::instrument(level = "trace", skip(data, manager), err)]
pub(crate) async fn delete_all_sorts_handler(
data: AFPluginData<DatabaseViewIdPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> Result<(), FlowyError> {
let manager = upgrade_manager(manager)?;
let view_id: DatabaseViewIdPB = data.into_inner();
let database_editor = manager.get_database_with_view_id(view_id.as_ref()).await?;
database_editor.delete_all_sorts(view_id.as_ref()).await;
@ -133,8 +150,9 @@ pub(crate) async fn delete_all_sorts_handler(
#[tracing::instrument(level = "trace", skip(data, manager), err)]
pub(crate) async fn get_fields_handler(
data: AFPluginData<GetFieldPayloadPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> DataResult<RepeatedFieldPB, FlowyError> {
let manager = upgrade_manager(manager)?;
let params: GetFieldParams = data.into_inner().try_into()?;
let database_editor = manager.get_database_with_view_id(&params.view_id).await?;
let fields = database_editor
@ -149,8 +167,9 @@ pub(crate) async fn get_fields_handler(
#[tracing::instrument(level = "trace", skip(data, manager), err)]
pub(crate) async fn get_primary_field_handler(
data: AFPluginData<DatabaseViewIdPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> DataResult<FieldPB, FlowyError> {
let manager = upgrade_manager(manager)?;
let view_id = data.into_inner().value;
let database_editor = manager.get_database_with_view_id(&view_id).await?;
let mut fields = database_editor
@ -177,8 +196,9 @@ pub(crate) async fn get_primary_field_handler(
#[tracing::instrument(level = "trace", skip(data, manager), err)]
pub(crate) async fn update_field_handler(
data: AFPluginData<FieldChangesetPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> Result<(), FlowyError> {
let manager = upgrade_manager(manager)?;
let params: FieldChangesetParams = data.into_inner().try_into()?;
let database_editor = manager.get_database_with_view_id(&params.view_id).await?;
database_editor.update_field(params).await?;
@ -188,8 +208,9 @@ pub(crate) async fn update_field_handler(
#[tracing::instrument(level = "trace", skip(data, manager), err)]
pub(crate) async fn update_field_type_option_handler(
data: AFPluginData<TypeOptionChangesetPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> Result<(), FlowyError> {
let manager = upgrade_manager(manager)?;
let params: TypeOptionChangesetParams = data.into_inner().try_into()?;
let database_editor = manager.get_database_with_view_id(&params.view_id).await?;
if let Some(old_field) = database_editor.get_field(&params.field_id) {
@ -211,8 +232,9 @@ pub(crate) async fn update_field_type_option_handler(
#[tracing::instrument(level = "trace", skip(data, manager), err)]
pub(crate) async fn delete_field_handler(
data: AFPluginData<DeleteFieldPayloadPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> Result<(), FlowyError> {
let manager = upgrade_manager(manager)?;
let params: FieldIdParams = data.into_inner().try_into()?;
let database_editor = manager.get_database_with_view_id(&params.view_id).await?;
database_editor.delete_field(&params.field_id).await?;
@ -222,8 +244,9 @@ pub(crate) async fn delete_field_handler(
#[tracing::instrument(level = "debug", skip(data, manager), err)]
pub(crate) async fn switch_to_field_handler(
data: AFPluginData<UpdateFieldTypePayloadPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> Result<(), FlowyError> {
let manager = upgrade_manager(manager)?;
let params: EditFieldParams = data.into_inner().try_into()?;
let database_editor = manager.get_database_with_view_id(&params.view_id).await?;
let old_field = database_editor.get_field(&params.field_id);
@ -257,8 +280,9 @@ pub(crate) async fn switch_to_field_handler(
#[tracing::instrument(level = "trace", skip(data, manager), err)]
pub(crate) async fn duplicate_field_handler(
data: AFPluginData<DuplicateFieldPayloadPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> Result<(), FlowyError> {
let manager = upgrade_manager(manager)?;
let params: FieldIdParams = data.into_inner().try_into()?;
let database_editor = manager.get_database_with_view_id(&params.view_id).await?;
database_editor
@ -271,8 +295,9 @@ pub(crate) async fn duplicate_field_handler(
#[tracing::instrument(level = "trace", skip(data, manager), err)]
pub(crate) async fn get_field_type_option_data_handler(
data: AFPluginData<TypeOptionPathPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> DataResult<TypeOptionPB, FlowyError> {
let manager = upgrade_manager(manager)?;
let params: TypeOptionPathParams = data.into_inner().try_into()?;
let database_editor = manager.get_database_with_view_id(&params.view_id).await?;
if let Some((field, data)) = database_editor
@ -294,8 +319,9 @@ pub(crate) async fn get_field_type_option_data_handler(
#[tracing::instrument(level = "trace", skip(data, manager), err)]
pub(crate) async fn create_field_type_option_data_handler(
data: AFPluginData<CreateFieldPayloadPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> DataResult<TypeOptionPB, FlowyError> {
let manager = upgrade_manager(manager)?;
let params: CreateFieldParams = data.into_inner().try_into()?;
let database_editor = manager.get_database_with_view_id(&params.view_id).await?;
let (field, data) = database_editor
@ -313,8 +339,9 @@ pub(crate) async fn create_field_type_option_data_handler(
#[tracing::instrument(level = "trace", skip(data, manager), err)]
pub(crate) async fn move_field_handler(
data: AFPluginData<MoveFieldPayloadPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> Result<(), FlowyError> {
let manager = upgrade_manager(manager)?;
let params: MoveFieldParams = data.into_inner().try_into()?;
let database_editor = manager.get_database_with_view_id(&params.view_id).await?;
database_editor
@ -331,8 +358,9 @@ pub(crate) async fn move_field_handler(
// #[tracing::instrument(level = "debug", skip(data, manager), err)]
pub(crate) async fn get_row_handler(
data: AFPluginData<RowIdPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> DataResult<OptionalRowPB, FlowyError> {
let manager = upgrade_manager(manager)?;
let params: RowIdParams = data.into_inner().try_into()?;
let database_editor = manager.get_database_with_view_id(&params.view_id).await?;
let row = database_editor
@ -343,8 +371,9 @@ pub(crate) async fn get_row_handler(
pub(crate) async fn get_row_meta_handler(
data: AFPluginData<RowIdPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> DataResult<RowMetaPB, FlowyError> {
let manager = upgrade_manager(manager)?;
let params: RowIdParams = data.into_inner().try_into()?;
let database_editor = manager.get_database_with_view_id(&params.view_id).await?;
match database_editor.get_row_meta(&params.view_id, &params.row_id) {
@ -355,8 +384,9 @@ pub(crate) async fn get_row_meta_handler(
pub(crate) async fn update_row_meta_handler(
data: AFPluginData<UpdateRowMetaChangesetPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> FlowyResult<()> {
let manager = upgrade_manager(manager)?;
let params: UpdateRowMetaParams = data.into_inner().try_into()?;
let database_editor = manager.get_database_with_view_id(&params.view_id).await?;
let row_id = RowId::from(params.id.clone());
@ -367,8 +397,9 @@ pub(crate) async fn update_row_meta_handler(
#[tracing::instrument(level = "debug", skip(data, manager), err)]
pub(crate) async fn delete_row_handler(
data: AFPluginData<RowIdPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> Result<(), FlowyError> {
let manager = upgrade_manager(manager)?;
let params: RowIdParams = data.into_inner().try_into()?;
let database_editor = manager.get_database_with_view_id(&params.view_id).await?;
database_editor.delete_row(&params.row_id).await;
@ -378,8 +409,9 @@ pub(crate) async fn delete_row_handler(
#[tracing::instrument(level = "debug", skip(data, manager), err)]
pub(crate) async fn duplicate_row_handler(
data: AFPluginData<RowIdPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> Result<(), FlowyError> {
let manager = upgrade_manager(manager)?;
let params: RowIdParams = data.into_inner().try_into()?;
let database_editor = manager.get_database_with_view_id(&params.view_id).await?;
database_editor
@ -391,8 +423,9 @@ pub(crate) async fn duplicate_row_handler(
#[tracing::instrument(level = "debug", skip(data, manager), err)]
pub(crate) async fn move_row_handler(
data: AFPluginData<MoveRowPayloadPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> Result<(), FlowyError> {
let manager = upgrade_manager(manager)?;
let params: MoveRowParams = data.into_inner().try_into()?;
let database_editor = manager.get_database_with_view_id(&params.view_id).await?;
database_editor
@ -404,8 +437,9 @@ pub(crate) async fn move_row_handler(
#[tracing::instrument(level = "debug", skip(data, manager), err)]
pub(crate) async fn create_row_handler(
data: AFPluginData<CreateRowPayloadPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> DataResult<RowMetaPB, FlowyError> {
let manager = upgrade_manager(manager)?;
let params: CreateRowParams = data.into_inner().try_into()?;
let database_editor = manager.get_database_with_view_id(&params.view_id).await?;
let fields = database_editor.get_fields(&params.view_id, None);
@ -433,8 +467,9 @@ pub(crate) async fn create_row_handler(
// #[tracing::instrument(level = "trace", skip_all, err)]
pub(crate) async fn get_cell_handler(
data: AFPluginData<CellIdPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> DataResult<CellPB, FlowyError> {
let manager = upgrade_manager(manager)?;
let params: CellIdParams = data.into_inner().try_into()?;
let database_editor = manager.get_database_with_view_id(&params.view_id).await?;
let cell = database_editor
@ -447,8 +482,9 @@ pub(crate) async fn get_cell_handler(
#[tracing::instrument(level = "debug", skip_all, err)]
pub(crate) async fn update_cell_handler(
data: AFPluginData<CellChangesetPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> Result<(), FlowyError> {
let manager = upgrade_manager(manager)?;
let params: CellChangesetPB = data.into_inner();
let database_editor = manager.get_database_with_view_id(&params.view_id).await?;
database_editor
@ -465,8 +501,9 @@ pub(crate) async fn update_cell_handler(
#[tracing::instrument(level = "trace", skip_all, err)]
pub(crate) async fn new_select_option_handler(
data: AFPluginData<CreateSelectOptionPayloadPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> DataResult<SelectOptionPB, FlowyError> {
let manager = upgrade_manager(manager)?;
let params: CreateSelectOptionParams = data.into_inner().try_into()?;
let database_editor = manager.get_database_with_view_id(&params.view_id).await?;
let result = database_editor
@ -483,8 +520,9 @@ pub(crate) async fn new_select_option_handler(
#[tracing::instrument(level = "trace", skip_all, err)]
pub(crate) async fn insert_or_update_select_option_handler(
data: AFPluginData<RepeatedSelectOptionPayload>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> Result<(), FlowyError> {
let manager = upgrade_manager(manager)?;
let params = data.into_inner();
let database_editor = manager.get_database_with_view_id(&params.view_id).await?;
database_editor
@ -501,8 +539,9 @@ pub(crate) async fn insert_or_update_select_option_handler(
#[tracing::instrument(level = "trace", skip_all, err)]
pub(crate) async fn delete_select_option_handler(
data: AFPluginData<RepeatedSelectOptionPayload>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> Result<(), FlowyError> {
let manager = upgrade_manager(manager)?;
let params = data.into_inner();
let database_editor = manager.get_database_with_view_id(&params.view_id).await?;
database_editor
@ -519,8 +558,9 @@ pub(crate) async fn delete_select_option_handler(
#[tracing::instrument(level = "trace", skip(data, manager), err)]
pub(crate) async fn get_select_option_handler(
data: AFPluginData<CellIdPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> DataResult<SelectOptionCellDataPB, FlowyError> {
let manager = upgrade_manager(manager)?;
let params: CellIdParams = data.into_inner().try_into()?;
let database_editor = manager.get_database_with_view_id(&params.view_id).await?;
let options = database_editor
@ -532,8 +572,9 @@ pub(crate) async fn get_select_option_handler(
#[tracing::instrument(level = "trace", skip_all, err)]
pub(crate) async fn update_select_option_cell_handler(
data: AFPluginData<SelectOptionCellChangesetPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> Result<(), FlowyError> {
let manager = upgrade_manager(manager)?;
let params: SelectOptionCellChangesetParams = data.into_inner().try_into()?;
let database_editor = manager
.get_database_with_view_id(&params.cell_identifier.view_id)
@ -556,8 +597,9 @@ pub(crate) async fn update_select_option_cell_handler(
#[tracing::instrument(level = "trace", skip_all, err)]
pub(crate) async fn get_checklist_cell_data_handler(
data: AFPluginData<CellIdPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> DataResult<ChecklistCellDataPB, FlowyError> {
let manager = upgrade_manager(manager)?;
let params: CellIdParams = data.into_inner().try_into()?;
let database_editor = manager.get_database_with_view_id(&params.view_id).await?;
let data = database_editor
@ -569,8 +611,9 @@ pub(crate) async fn get_checklist_cell_data_handler(
#[tracing::instrument(level = "trace", skip_all, err)]
pub(crate) async fn update_checklist_cell_handler(
data: AFPluginData<ChecklistCellDataChangesetPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> Result<(), FlowyError> {
let manager = upgrade_manager(manager)?;
let params: ChecklistCellDataChangesetParams = data.into_inner().try_into()?;
let database_editor = manager.get_database_with_view_id(&params.view_id).await?;
let changeset = ChecklistCellChangeset {
@ -588,8 +631,9 @@ pub(crate) async fn update_checklist_cell_handler(
#[tracing::instrument(level = "trace", skip_all, err)]
pub(crate) async fn update_date_cell_handler(
data: AFPluginData<DateChangesetPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> Result<(), FlowyError> {
let manager = upgrade_manager(manager)?;
let data = data.into_inner();
let cell_id: CellIdParams = data.cell_id.try_into()?;
let cell_changeset = DateCellChangeset {
@ -612,8 +656,9 @@ pub(crate) async fn update_date_cell_handler(
#[tracing::instrument(level = "trace", skip_all, err)]
pub(crate) async fn get_groups_handler(
data: AFPluginData<DatabaseViewIdPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> DataResult<RepeatedGroupPB, FlowyError> {
let manager = upgrade_manager(manager)?;
let params: DatabaseViewIdPB = data.into_inner();
let database_editor = manager.get_database_with_view_id(params.as_ref()).await?;
let groups = database_editor.load_groups(params.as_ref()).await?;
@ -623,8 +668,9 @@ pub(crate) async fn get_groups_handler(
#[tracing::instrument(level = "trace", skip_all, err)]
pub(crate) async fn get_group_handler(
data: AFPluginData<DatabaseGroupIdPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> DataResult<GroupPB, FlowyError> {
let manager = upgrade_manager(manager)?;
let params: DatabaseGroupIdParams = data.into_inner().try_into()?;
let database_editor = manager.get_database_with_view_id(&params.view_id).await?;
let group = database_editor
@ -636,8 +682,9 @@ pub(crate) async fn get_group_handler(
#[tracing::instrument(level = "trace", skip_all, err)]
pub(crate) async fn set_group_by_field_handler(
data: AFPluginData<GroupByFieldPayloadPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> FlowyResult<()> {
let manager = upgrade_manager(manager)?;
let params: GroupByFieldParams = data.into_inner().try_into()?;
let database_editor = manager.get_database_with_view_id(&params.view_id).await?;
database_editor
@ -649,8 +696,9 @@ pub(crate) async fn set_group_by_field_handler(
#[tracing::instrument(level = "trace", skip_all, err)]
pub(crate) async fn update_group_handler(
data: AFPluginData<UpdateGroupPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> FlowyResult<()> {
let manager = upgrade_manager(manager)?;
let params: UpdateGroupParams = data.into_inner().try_into()?;
let view_id = params.view_id.clone();
let database_editor = manager.get_database_with_view_id(&view_id).await?;
@ -666,8 +714,9 @@ pub(crate) async fn update_group_handler(
#[tracing::instrument(level = "debug", skip(data, manager), err)]
pub(crate) async fn move_group_handler(
data: AFPluginData<MoveGroupPayloadPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> FlowyResult<()> {
let manager = upgrade_manager(manager)?;
let params: MoveGroupParams = data.into_inner().try_into()?;
let database_editor = manager.get_database_with_view_id(&params.view_id).await?;
database_editor
@ -679,8 +728,9 @@ pub(crate) async fn move_group_handler(
#[tracing::instrument(level = "debug", skip(data, manager), err)]
pub(crate) async fn move_group_row_handler(
data: AFPluginData<MoveGroupRowPayloadPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> FlowyResult<()> {
let manager = upgrade_manager(manager)?;
let params: MoveGroupRowParams = data.into_inner().try_into()?;
let database_editor = manager.get_database_with_view_id(&params.view_id).await?;
database_editor
@ -696,8 +746,9 @@ pub(crate) async fn move_group_row_handler(
#[tracing::instrument(level = "debug", skip(manager), err)]
pub(crate) async fn get_databases_handler(
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> DataResult<RepeatedDatabaseDescriptionPB, FlowyError> {
let manager = upgrade_manager(manager)?;
let data = manager.get_all_databases_description().await;
data_result_ok(data)
}
@ -705,8 +756,9 @@ pub(crate) async fn get_databases_handler(
#[tracing::instrument(level = "debug", skip(data, manager), err)]
pub(crate) async fn set_layout_setting_handler(
data: AFPluginData<LayoutSettingChangesetPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> FlowyResult<()> {
let manager = upgrade_manager(manager)?;
let params: LayoutSettingChangeset = data.into_inner().try_into()?;
let database_editor = manager.get_database_with_view_id(&params.view_id).await?;
let layout_params = LayoutSettingParams {
@ -721,8 +773,9 @@ pub(crate) async fn set_layout_setting_handler(
pub(crate) async fn get_layout_setting_handler(
data: AFPluginData<DatabaseLayoutMetaPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> DataResult<DatabaseLayoutSettingPB, FlowyError> {
let manager = upgrade_manager(manager)?;
let params: DatabaseLayoutMeta = data.into_inner().try_into()?;
let database_editor = manager.get_database_with_view_id(&params.view_id).await?;
let layout_setting_pb = database_editor
@ -736,8 +789,9 @@ pub(crate) async fn get_layout_setting_handler(
#[tracing::instrument(level = "debug", skip(data, manager), err)]
pub(crate) async fn get_calendar_events_handler(
data: AFPluginData<CalendarEventRequestPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> DataResult<RepeatedCalendarEventPB, FlowyError> {
let manager = upgrade_manager(manager)?;
let params: CalendarEventRequestParams = data.into_inner().try_into()?;
let database_editor = manager.get_database_with_view_id(&params.view_id).await?;
let events = database_editor
@ -749,8 +803,9 @@ pub(crate) async fn get_calendar_events_handler(
#[tracing::instrument(level = "debug", skip(data, manager), err)]
pub(crate) async fn get_no_date_calendar_events_handler(
data: AFPluginData<CalendarEventRequestPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> DataResult<RepeatedNoDateCalendarEventPB, FlowyError> {
let manager = upgrade_manager(manager)?;
let params: CalendarEventRequestParams = data.into_inner().try_into()?;
let database_editor = manager.get_database_with_view_id(&params.view_id).await?;
let _events = database_editor
@ -762,8 +817,9 @@ pub(crate) async fn get_no_date_calendar_events_handler(
#[tracing::instrument(level = "debug", skip(data, manager), err)]
pub(crate) async fn get_calendar_event_handler(
data: AFPluginData<RowIdPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> DataResult<CalendarEventPB, FlowyError> {
let manager = upgrade_manager(manager)?;
let params: RowIdParams = data.into_inner().try_into()?;
let database_editor = manager.get_database_with_view_id(&params.view_id).await?;
let event = database_editor
@ -778,8 +834,9 @@ pub(crate) async fn get_calendar_event_handler(
#[tracing::instrument(level = "debug", skip(data, manager), err)]
pub(crate) async fn move_calendar_event_handler(
data: AFPluginData<MoveCalendarEventPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> FlowyResult<()> {
let manager = upgrade_manager(manager)?;
let data = data.into_inner();
let cell_id: CellIdParams = data.cell_path.try_into()?;
let cell_changeset = DateCellChangeset {
@ -801,7 +858,7 @@ pub(crate) async fn move_calendar_event_handler(
#[tracing::instrument(level = "debug", skip_all, err)]
pub(crate) async fn create_database_view(
_data: AFPluginData<CreateDatabaseViewPayloadPB>,
_manager: AFPluginState<Arc<DatabaseManager2>>,
_manager: AFPluginState<Weak<DatabaseManager>>,
) -> FlowyResult<()> {
// let data: CreateDatabaseViewParams = data.into_inner().try_into()?;
Ok(())
@ -810,8 +867,9 @@ pub(crate) async fn create_database_view(
#[tracing::instrument(level = "debug", skip_all, err)]
pub(crate) async fn export_csv_handler(
data: AFPluginData<DatabaseViewIdPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> DataResult<DatabaseExportDataPB, FlowyError> {
let manager = upgrade_manager(manager)?;
let view_id = data.into_inner().value;
let database = manager.get_database_with_view_id(&view_id).await?;
let data = database.export_csv(CSVFormat::Original).await?;
@ -824,8 +882,9 @@ pub(crate) async fn export_csv_handler(
#[tracing::instrument(level = "debug", skip_all, err)]
pub(crate) async fn get_snapshots_handler(
data: AFPluginData<DatabaseViewIdPB>,
manager: AFPluginState<Arc<DatabaseManager2>>,
manager: AFPluginState<Weak<DatabaseManager>>,
) -> DataResult<RepeatedDatabaseSnapshotPB, FlowyError> {
let manager = upgrade_manager(manager)?;
let view_id = data.into_inner().value;
let snapshots = manager.get_database_snapshots(&view_id).await?;
data_result_ok(RepeatedDatabaseSnapshotPB { items: snapshots })

View File

@ -1,4 +1,4 @@
use std::sync::Arc;
use std::sync::Weak;
use strum_macros::Display;
@ -6,9 +6,9 @@ use flowy_derive::{Flowy_Event, ProtoBuf_Enum};
use lib_dispatch::prelude::*;
use crate::event_handler::*;
use crate::manager::DatabaseManager2;
use crate::manager::DatabaseManager;
pub fn init(database_manager: Arc<DatabaseManager2>) -> AFPlugin {
pub fn init(database_manager: Weak<DatabaseManager>) -> AFPlugin {
let plugin = AFPlugin::new()
.name(env!("CARGO_PKG_NAME"))
.state(database_manager);

View File

@ -1,11 +1,10 @@
pub use manager::*;
pub mod deps;
pub mod entities;
mod event_handler;
pub mod event_map;
mod manager;
mod notification;
pub mod notification;
mod protobuf;
pub mod services;
pub mod template;

View File

@ -1,23 +1,23 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::{Arc, Weak};
use appflowy_integrate::collab_builder::AppFlowyCollabBuilder;
use appflowy_integrate::{CollabPersistenceConfig, RocksCollabDB};
use appflowy_integrate::{CollabPersistenceConfig, CollabType, RocksCollabDB};
use collab::core::collab::{CollabRawData, MutexCollab};
use collab_database::blocks::BlockEvent;
use collab_database::database::{DatabaseData, YrsDocAction};
use collab_database::error::DatabaseError;
use collab_database::user::{
make_workspace_database_id, CollabFuture, CollabObjectUpdate, CollabObjectUpdateByOid,
DatabaseCollabService, WorkspaceDatabase,
CollabFuture, CollabObjectUpdate, CollabObjectUpdateByOid, DatabaseCollabService,
WorkspaceDatabase,
};
use collab_database::views::{CreateDatabaseParams, CreateViewParams, DatabaseLayout};
use tokio::sync::RwLock;
use flowy_database_deps::cloud::DatabaseCloudService;
use flowy_error::{internal_error, FlowyError, FlowyResult};
use flowy_task::TaskDispatcher;
use crate::deps::{DatabaseCloudService, DatabaseUser2};
use crate::entities::{
DatabaseDescriptionPB, DatabaseLayoutPB, DatabaseSnapshotPB, DidFetchRowPB,
RepeatedDatabaseDescriptionPB,
@ -27,8 +27,14 @@ use crate::services::database::DatabaseEditor;
use crate::services::database_view::DatabaseLayoutDepsResolver;
use crate::services::share::csv::{CSVFormat, CSVImporter, ImportResult};
pub struct DatabaseManager2 {
user: Arc<dyn DatabaseUser2>,
pub trait DatabaseUser: Send + Sync {
fn user_id(&self) -> Result<i64, FlowyError>;
fn token(&self) -> Result<Option<String>, FlowyError>;
fn collab_db(&self, uid: i64) -> Result<Weak<RocksCollabDB>, FlowyError>;
}
pub struct DatabaseManager {
user: Arc<dyn DatabaseUser>,
workspace_database: Arc<RwLock<Option<Arc<WorkspaceDatabase>>>>,
task_scheduler: Arc<RwLock<TaskDispatcher>>,
editors: RwLock<HashMap<String, Arc<DatabaseEditor>>>,
@ -36,9 +42,9 @@ pub struct DatabaseManager2 {
cloud_service: Arc<dyn DatabaseCloudService>,
}
impl DatabaseManager2 {
impl DatabaseManager {
pub fn new(
database_user: Arc<dyn DatabaseUser2>,
database_user: Arc<dyn DatabaseUser>,
task_scheduler: Arc<RwLock<TaskDispatcher>>,
collab_builder: Arc<AppFlowyCollabBuilder>,
cloud_service: Arc<dyn DatabaseCloudService>,
@ -53,14 +59,23 @@ impl DatabaseManager2 {
}
}
fn is_collab_exist(&self, uid: i64, collab_db: &Arc<RocksCollabDB>, object_id: &str) -> bool {
let read_txn = collab_db.read_txn();
read_txn.is_exist(uid, object_id)
fn is_collab_exist(&self, uid: i64, collab_db: &Weak<RocksCollabDB>, object_id: &str) -> bool {
match collab_db.upgrade() {
None => false,
Some(collab_db) => {
let read_txn = collab_db.read_txn();
read_txn.is_exist(uid, object_id)
},
}
}
pub async fn initialize(&self, uid: i64) -> FlowyResult<()> {
pub async fn initialize(
&self,
uid: i64,
_workspace_id: String,
workspace_database_id: String,
) -> FlowyResult<()> {
let collab_db = self.user.collab_db(uid)?;
let workspace_database_id = make_workspace_database_id(uid);
let collab_builder = UserDatabaseCollabServiceImpl {
collab_builder: self.collab_builder.clone(),
cloud_service: self.cloud_service.clone(),
@ -73,7 +88,7 @@ impl DatabaseManager2 {
tracing::trace!("workspace database not exist, try to fetch from remote");
match self
.cloud_service
.get_collab_update(&workspace_database_id)
.get_collab_update(&workspace_database_id, CollabType::WorkspaceDatabase)
.await
{
Ok(updates) => collab_raw_data = updates,
@ -91,7 +106,7 @@ impl DatabaseManager2 {
let collab = collab_builder.build_collab_with_config(
uid,
&workspace_database_id,
"databases",
CollabType::WorkspaceDatabase,
collab_db.clone(),
collab_raw_data,
&config,
@ -100,11 +115,21 @@ impl DatabaseManager2 {
WorkspaceDatabase::open(uid, collab, collab_db, config, collab_builder);
subscribe_block_event(&workspace_database);
*self.workspace_database.write().await = Some(Arc::new(workspace_database));
// Remove all existing editors
self.editors.write().await.clear();
Ok(())
}
pub async fn initialize_with_new_user(&self, user_id: i64, _token: &str) -> FlowyResult<()> {
self.initialize(user_id).await?;
pub async fn initialize_with_new_user(
&self,
user_id: i64,
workspace_id: String,
database_storage_id: String,
) -> FlowyResult<()> {
self
.initialize(user_id, workspace_id, database_storage_id)
.await?;
Ok(())
}
@ -346,6 +371,7 @@ impl DatabaseCollabService for UserDatabaseCollabServiceImpl {
fn get_collab_update(
&self,
object_id: &str,
object_ty: CollabType,
) -> CollabFuture<Result<CollabObjectUpdate, DatabaseError>> {
let object_id = object_id.to_string();
let weak_cloud_service = Arc::downgrade(&self.cloud_service);
@ -357,9 +383,8 @@ impl DatabaseCollabService for UserDatabaseCollabServiceImpl {
},
Some(cloud_service) => {
let updates = cloud_service
.get_collab_update(&object_id)
.await
.map_err(|e| DatabaseError::Internal(Box::new(e)))?;
.get_collab_update(&object_id, object_ty)
.await?;
Ok(updates)
},
}
@ -369,6 +394,7 @@ impl DatabaseCollabService for UserDatabaseCollabServiceImpl {
fn batch_get_collab_update(
&self,
object_ids: Vec<String>,
object_ty: CollabType,
) -> CollabFuture<Result<CollabObjectUpdateByOid, DatabaseError>> {
let weak_cloud_service = Arc::downgrade(&self.cloud_service);
Box::pin(async move {
@ -379,9 +405,8 @@ impl DatabaseCollabService for UserDatabaseCollabServiceImpl {
},
Some(cloud_service) => {
let updates = cloud_service
.batch_get_collab_updates(object_ids)
.await
.map_err(|e| DatabaseError::Internal(Box::new(e)))?;
.batch_get_collab_updates(object_ids, object_ty)
.await?;
Ok(updates)
},
}
@ -392,8 +417,8 @@ impl DatabaseCollabService for UserDatabaseCollabServiceImpl {
&self,
uid: i64,
object_id: &str,
object_name: &str,
collab_db: Arc<RocksCollabDB>,
object_type: CollabType,
collab_db: Weak<RocksCollabDB>,
collab_raw_data: CollabRawData,
config: &CollabPersistenceConfig,
) -> Arc<MutexCollab> {
@ -402,7 +427,7 @@ impl DatabaseCollabService for UserDatabaseCollabServiceImpl {
.build_with_config(
uid,
object_id,
object_name,
object_type,
collab_db,
collab_raw_data,
config,

View File

@ -74,7 +74,11 @@ impl DatabaseEditor {
tokio::spawn(async move {
while let Some(snapshot_state) = snapshot_state.next().await {
if let Some(new_snapshot_id) = snapshot_state.snapshot_id() {
tracing::debug!("Did create database remote snapshot: {}", new_snapshot_id);
tracing::debug!(
"Did create {} database remote snapshot: {}",
database_id,
new_snapshot_id
);
send_notification(
&database_id,
DatabaseNotification::DidUpdateDatabaseSnapshotState,