feat: sync the documents and databases after batch importing documents and databases (#5644)

* feat: support batch import

* feat: support batch import database

* chore: revert launch.json

* chore: fix rust ci

* fix: rust ci
This commit is contained in:
Lucas.Xu 2024-07-01 14:44:08 +08:00 committed by GitHub
parent c78f23e1c0
commit 2b8dca209e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 291 additions and 164 deletions

View File

@ -4,34 +4,29 @@ import 'package:appflowy_backend/protobuf/flowy-folder/import.pb.dart';
import 'package:appflowy_backend/protobuf/flowy-folder/view.pbenum.dart';
import 'package:appflowy_result/appflowy_result.dart';
class ImportBackendService {
static Future<FlowyResult<void, FlowyError>> importData(
List<int> data,
String name,
String parentViewId,
ImportTypePB importType,
) async {
final payload = ImportPB.create()
..data = data
..parentViewId = parentViewId
..viewLayout = importType.toLayout()
..name = name
..importType = importType;
return FolderEventImportData(payload).send();
}
class ImportPayload {
ImportPayload({
required this.name,
required this.data,
required this.layout,
});
final String name;
final List<int> data;
final ViewLayoutPB layout;
}
extension on ImportTypePB {
ViewLayoutPB toLayout() {
switch (this) {
case ImportTypePB.HistoryDocument:
return ViewLayoutPB.Document;
case ImportTypePB.HistoryDatabase ||
ImportTypePB.CSV ||
ImportTypePB.RawDatabase:
return ViewLayoutPB.Grid;
default:
throw UnimplementedError('Unsupported import type $this');
}
class ImportBackendService {
static Future<FlowyResult<void, FlowyError>> importPages(
String parentViewId,
List<ImportValuePayloadPB> values,
) async {
final request = ImportPayloadPB(
parentViewId: parentViewId,
values: values,
syncAfterCreate: true,
);
return FolderEventImportData(request).send();
}
}

View File

@ -7,8 +7,6 @@ import 'package:appflowy/plugins/document/presentation/editor_plugins/migration/
import 'package:appflowy/startup/startup.dart';
import 'package:appflowy/workspace/application/settings/share/import_service.dart';
import 'package:appflowy/workspace/presentation/home/menu/sidebar/import/import_type.dart';
import 'package:appflowy/workspace/presentation/home/toast.dart';
import 'package:appflowy_backend/log.dart';
import 'package:appflowy_backend/protobuf/flowy-folder/protobuf.dart';
import 'package:appflowy_editor/appflowy_editor.dart' hide Log;
import 'package:easy_localization/easy_localization.dart';
@ -153,6 +151,8 @@ class _ImportPanelState extends State<ImportPanel> {
showLoading.value = true;
final importValues = <ImportValuePayloadPB>[];
for (final file in result.files) {
final path = file.path;
if (path == null) {
@ -166,59 +166,52 @@ class _ImportPanelState extends State<ImportPanel> {
case ImportType.historyDocument:
final bytes = _documentDataFrom(importType, data);
if (bytes != null) {
final result = await ImportBackendService.importData(
bytes,
name,
parentViewId,
ImportTypePB.HistoryDocument,
importValues.add(
ImportValuePayloadPB.create()
..name = name
..data = bytes
..viewLayout = ViewLayoutPB.Document
..importType = ImportTypePB.HistoryDocument,
);
result.onFailure((error) {
showSnackBarMessage(context, error.msg);
Log.error('Failed to import markdown $error');
});
}
break;
case ImportType.historyDatabase:
final result = await ImportBackendService.importData(
utf8.encode(data),
name,
parentViewId,
ImportTypePB.HistoryDatabase,
importValues.add(
ImportValuePayloadPB.create()
..name = name
..data = utf8.encode(data)
..viewLayout = ViewLayoutPB.Grid
..importType = ImportTypePB.HistoryDatabase,
);
result.onFailure((error) {
showSnackBarMessage(context, error.msg);
Log.error('Failed to import history database $error');
});
break;
case ImportType.databaseRawData:
final result = await ImportBackendService.importData(
utf8.encode(data),
name,
parentViewId,
ImportTypePB.RawDatabase,
importValues.add(
ImportValuePayloadPB.create()
..name = name
..data = utf8.encode(data)
..viewLayout = ViewLayoutPB.Grid
..importType = ImportTypePB.RawDatabase,
);
result.onFailure((error) {
showSnackBarMessage(context, error.msg);
Log.error('Failed to import database raw data $error');
});
break;
case ImportType.databaseCSV:
final result = await ImportBackendService.importData(
utf8.encode(data),
name,
parentViewId,
ImportTypePB.CSV,
importValues.add(
ImportValuePayloadPB.create()
..name = name
..data = utf8.encode(data)
..viewLayout = ViewLayoutPB.Grid
..importType = ImportTypePB.CSV,
);
result.onFailure((error) {
showSnackBarMessage(context, error.msg);
Log.error('Failed to import CSV $error');
});
break;
default:
assert(false, 'Unsupported Type $importType');
}
}
await ImportBackendService.importPages(
parentViewId,
importValues,
);
showLoading.value = false;
widget.importCallback(importType, '', null);
}

View File

@ -259,13 +259,14 @@ impl EventIntegrationTest {
.parse::<ViewPB>()
}
pub async fn import_data(&self, data: ImportPB) -> ViewPB {
pub async fn import_data(&self, data: ImportPayloadPB) -> Vec<ViewPB> {
EventBuilder::new(self.clone())
.event(FolderEvent::ImportData)
.payload(data)
.async_send()
.await
.parse::<ViewPB>()
.parse::<RepeatedViewPB>()
.items
}
pub async fn get_view_ancestors(&self, view_id: &str) -> Vec<ViewPB> {

View File

@ -1,7 +1,7 @@
use crate::util::unzip;
use event_integration_test::EventIntegrationTest;
use flowy_core::DEFAULT_NAME;
use flowy_folder::entities::{ImportPB, ImportTypePB, ViewLayoutPB};
use flowy_folder::entities::{ImportPayloadPB, ImportTypePB, ImportValuePayloadPB, ViewLayoutPB};
#[tokio::test]
async fn import_492_row_csv_file_test() {
@ -16,8 +16,9 @@ async fn import_492_row_csv_file_test() {
let workspace_id = test.get_current_workspace().await.id;
let import_data = gen_import_data(file_name, csv_string, workspace_id);
let view = test.import_data(import_data).await;
let database = test.get_database(&view.id).await;
let views = test.import_data(import_data).await;
let view_id = views[0].clone().id;
let database = test.get_database(&view_id).await;
assert_eq!(database.rows.len(), 492);
drop(cleaner);
}
@ -35,21 +36,24 @@ async fn import_10240_row_csv_file_test() {
let workspace_id = test.get_current_workspace().await.id;
let import_data = gen_import_data(file_name, csv_string, workspace_id);
let view = test.import_data(import_data).await;
let database = test.get_database(&view.id).await;
let views = test.import_data(import_data).await;
let view_id = views[0].clone().id;
let database = test.get_database(&view_id).await;
assert_eq!(database.rows.len(), 10240);
drop(cleaner);
}
fn gen_import_data(file_name: String, csv_string: String, workspace_id: String) -> ImportPB {
let import_data = ImportPB {
fn gen_import_data(file_name: String, csv_string: String, workspace_id: String) -> ImportPayloadPB {
ImportPayloadPB {
parent_view_id: workspace_id.clone(),
name: file_name,
data: Some(csv_string.as_bytes().to_vec()),
file_path: None,
view_layout: ViewLayoutPB::Grid,
import_type: ImportTypePB::CSV,
};
import_data
sync_after_create: false,
values: vec![ImportValuePayloadPB {
name: file_name,
data: Some(csv_string.as_bytes().to_vec()),
file_path: None,
view_layout: ViewLayoutPB::Grid,
import_type: ImportTypePB::CSV,
}],
}
}

View File

@ -1,4 +1,5 @@
use bytes::Bytes;
use collab_entity::EncodedCollab;
use collab_integrate::collab_builder::AppFlowyCollabBuilder;
use collab_integrate::CollabKVDB;
use flowy_chat::chat_manager::ChatManager;
@ -229,15 +230,15 @@ impl FolderOperationHandler for DocumentFolderOperation {
_name: &str,
_import_type: ImportType,
bytes: Vec<u8>,
) -> FutureResult<(), FlowyError> {
) -> FutureResult<EncodedCollab, FlowyError> {
let view_id = view_id.to_string();
let manager = self.0.clone();
FutureResult::new(async move {
let data = DocumentDataPB::try_from(Bytes::from(bytes))?;
manager
let encoded_collab = manager
.create_document(uid, &view_id, Some(data.into()))
.await?;
Ok(())
Ok(encoded_collab)
})
}
@ -392,7 +393,7 @@ impl FolderOperationHandler for DatabaseFolderOperation {
_name: &str,
import_type: ImportType,
bytes: Vec<u8>,
) -> FutureResult<(), FlowyError> {
) -> FutureResult<EncodedCollab, FlowyError> {
let database_manager = self.0.clone();
let view_id = view_id.to_string();
let format = match import_type {
@ -406,11 +407,10 @@ impl FolderOperationHandler for DatabaseFolderOperation {
String::from_utf8(bytes).map_err(|err| FlowyError::internal().with_context(err))
})
.await??;
database_manager
let result = database_manager
.import_csv(view_id, content, format)
.await?;
Ok(())
Ok(result.encoded_collab)
})
}
@ -531,7 +531,7 @@ impl FolderOperationHandler for ChatFolderOperation {
_name: &str,
_import_type: ImportType,
_bytes: Vec<u8>,
) -> FutureResult<(), FlowyError> {
) -> FutureResult<EncodedCollab, FlowyError> {
FutureResult::new(async move { Err(FlowyError::not_support()) })
}

View File

@ -3,7 +3,7 @@ use std::collections::HashMap;
use std::sync::{Arc, Weak};
use collab::core::collab::{DataSource, MutexCollab};
use collab_database::database::DatabaseData;
use collab_database::database::{DatabaseData, MutexDatabase};
use collab_database::error::DatabaseError;
use collab_database::rows::RowId;
use collab_database::views::{CreateDatabaseParams, CreateViewParams, DatabaseLayout};
@ -309,10 +309,13 @@ impl DatabaseManager {
Ok(())
}
pub async fn create_database_with_params(&self, params: CreateDatabaseParams) -> FlowyResult<()> {
pub async fn create_database_with_params(
&self,
params: CreateDatabaseParams,
) -> FlowyResult<Arc<MutexDatabase>> {
let wdb = self.get_database_indexer().await?;
let _ = wdb.create_database(params)?;
Ok(())
let database = wdb.create_database(params)?;
Ok(database)
}
/// A linked view is a view that is linked to existing database.
@ -362,11 +365,19 @@ impl DatabaseManager {
return Err(FlowyError::internal().with_context("The number of rows exceeds the limit"));
}
let view_id = params.inline_view_id.clone();
let database_id = params.database_id.clone();
let database = self.create_database_with_params(params).await?;
let encoded_collab = database
.lock()
.get_collab()
.lock()
.encode_collab_v1(|collab| CollabType::Database.validate_require_data(collab))?;
let result = ImportResult {
database_id: params.database_id.clone(),
view_id: params.inline_view_id.clone(),
database_id,
view_id,
encoded_collab,
};
self.create_database_with_params(params).await?;
Ok(result)
}

View File

@ -5,6 +5,7 @@ use collab_database::fields::Field;
use collab_database::rows::{new_cell_builder, Cell, CreateRowParams};
use collab_database::views::{CreateDatabaseParams, CreateViewParams, DatabaseLayout};
use collab_entity::EncodedCollab;
use flowy_error::{FlowyError, FlowyResult};
use crate::entities::FieldType;
@ -166,6 +167,7 @@ impl FieldsRows {
pub struct ImportResult {
pub database_id: String,
pub view_id: String,
pub encoded_collab: EncodedCollab,
}
#[cfg(test)]

View File

@ -110,25 +110,30 @@ impl DocumentManager {
uid: i64,
doc_id: &str,
data: Option<DocumentData>,
) -> FlowyResult<()> {
) -> FlowyResult<EncodedCollab> {
if self.is_doc_exist(doc_id).await.unwrap_or(false) {
Err(FlowyError::new(
ErrorCode::RecordAlreadyExists,
format!("document {} already exists", doc_id),
))
} else {
let doc_state = doc_state_from_document_data(
let encoded_collab = doc_state_from_document_data(
doc_id,
data.unwrap_or_else(|| default_document_data(doc_id)),
)
.await?
.doc_state
.to_vec();
.await?;
let doc_state = encoded_collab.doc_state.to_vec();
let collab = self
.collab_for_document(uid, doc_id, DataSource::DocStateV1(doc_state), false)
.collab_for_document(
uid,
doc_id,
DataSource::DocStateV1(doc_state.clone()),
false,
)
.await?;
collab.lock().flush();
Ok(())
Ok(encoded_collab)
}
}

View File

@ -1,6 +1,7 @@
use crate::entities::parser::empty_str::NotEmptyStr;
use crate::entities::ViewLayoutPB;
use crate::share::{ImportParams, ImportType};
use crate::share::{ImportParams, ImportType, ImportValue};
use collab_entity::CollabType;
use flowy_derive::{ProtoBuf, ProtoBuf_Enum};
use flowy_error::FlowyError;
@ -23,6 +24,17 @@ impl From<ImportTypePB> for ImportType {
}
}
impl From<ImportType> for CollabType {
fn from(import_type: ImportType) -> Self {
match import_type {
ImportType::HistoryDocument => CollabType::Document,
ImportType::HistoryDatabase => CollabType::Database,
ImportType::RawDatabase => CollabType::Database,
ImportType::CSV => CollabType::Database,
}
}
}
impl Default for ImportTypePB {
fn default() -> Self {
Self::HistoryDocument
@ -30,27 +42,43 @@ impl Default for ImportTypePB {
}
#[derive(Clone, Debug, ProtoBuf, Default)]
pub struct ImportPB {
pub struct ImportValuePayloadPB {
// the name of the import page
#[pb(index = 1)]
pub name: String,
// the data of the import page
// if the data is empty, the file_path must be provided
#[pb(index = 2, one_of)]
pub data: Option<Vec<u8>>,
// the file path of the import page
// if the file_path is empty, the data must be provided
#[pb(index = 3, one_of)]
pub file_path: Option<String>,
// the layout of the import page
#[pb(index = 4)]
pub view_layout: ViewLayoutPB,
// the type of the import page
#[pb(index = 5)]
pub import_type: ImportTypePB,
}
#[derive(Clone, Debug, ProtoBuf, Default)]
pub struct ImportPayloadPB {
#[pb(index = 1)]
pub parent_view_id: String,
#[pb(index = 2)]
pub name: String,
pub values: Vec<ImportValuePayloadPB>,
#[pb(index = 3, one_of)]
pub data: Option<Vec<u8>>,
#[pb(index = 4, one_of)]
pub file_path: Option<String>,
#[pb(index = 5)]
pub view_layout: ViewLayoutPB,
#[pb(index = 6)]
pub import_type: ImportTypePB,
#[pb(index = 3)]
pub sync_after_create: bool,
}
impl TryInto<ImportParams> for ImportPB {
impl TryInto<ImportParams> for ImportPayloadPB {
type Error = FlowyError;
fn try_into(self) -> Result<ImportParams, Self::Error> {
@ -58,28 +86,39 @@ impl TryInto<ImportParams> for ImportPB {
.map_err(|_| FlowyError::invalid_view_id())?
.0;
let name = if self.name.is_empty() {
"Untitled".to_string()
} else {
self.name
};
let mut values = Vec::new();
let file_path = match self.file_path {
None => None,
Some(file_path) => Some(
NotEmptyStr::parse(file_path)
.map_err(|_| FlowyError::invalid_data().with_context("The import file path is empty"))?
.0,
),
};
for value in self.values {
let name = if value.name.is_empty() {
"Untitled".to_string()
} else {
value.name
};
let file_path = match value.file_path {
None => None,
Some(file_path) => Some(
NotEmptyStr::parse(file_path)
.map_err(|_| FlowyError::invalid_data().with_context("The import file path is empty"))?
.0,
),
};
let params = ImportValue {
name,
data: value.data,
file_path,
view_layout: value.view_layout.into(),
import_type: value.import_type.into(),
};
values.push(params);
}
Ok(ImportParams {
parent_view_id,
name,
data: self.data,
file_path,
view_layout: self.view_layout.into(),
import_type: self.import_type.into(),
values,
sync_after_create: self.sync_after_create,
})
}
}

View File

@ -363,14 +363,13 @@ pub(crate) async fn delete_my_trash_handler(
#[tracing::instrument(level = "debug", skip(data, folder), err)]
pub(crate) async fn import_data_handler(
data: AFPluginData<ImportPB>,
data: AFPluginData<ImportPayloadPB>,
folder: AFPluginState<Weak<FolderManager>>,
) -> DataResult<ViewPB, FlowyError> {
) -> DataResult<RepeatedViewPB, FlowyError> {
let folder = upgrade_folder(folder)?;
let params: ImportParams = data.into_inner().try_into()?;
let view = folder.import(params).await?;
let view_pb = view_pb_without_child_views(view);
data_result_ok(view_pb)
let views = folder.import(params).await?;
data_result_ok(views)
}
#[tracing::instrument(level = "debug", skip(folder), err)]

View File

@ -132,7 +132,7 @@ pub enum FolderEvent {
#[event()]
PermanentlyDeleteAllTrashItem = 27,
#[event(input = "ImportPB", output = "ViewPB")]
#[event(input = "ImportPayloadPB", output = "RepeatedViewPB")]
ImportData = 30,
#[event(input = "WorkspaceIdPB", output = "RepeatedFolderSnapshotPB")]

View File

@ -12,13 +12,13 @@ use crate::manager_observer::{
use crate::notification::{
send_notification, send_workspace_setting_notification, FolderNotification,
};
use crate::share::ImportParams;
use crate::share::{ImportParams, ImportValue};
use crate::util::{
folder_not_init_error, insert_parent_child_views, workspace_data_not_sync_error,
};
use crate::view_operation::{create_view, FolderOperationHandler, FolderOperationHandlers};
use collab::core::collab::{DataSource, MutexCollab};
use collab_entity::CollabType;
use collab_entity::{CollabType, EncodedCollab};
use collab_folder::error::FolderError;
use collab_folder::{
Folder, FolderNotify, Section, SectionItem, TrashInfo, UserId, View, ViewLayout, ViewUpdate,
@ -26,8 +26,8 @@ use collab_folder::{
};
use collab_integrate::collab_builder::{AppFlowyCollabBuilder, CollabBuilderConfig};
use collab_integrate::CollabKVDB;
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
use flowy_folder_pub::cloud::{gen_view_id, FolderCloudService};
use flowy_error::{internal_error, ErrorCode, FlowyError, FlowyResult};
use flowy_folder_pub::cloud::{gen_view_id, FolderCloudService, FolderCollabParams};
use flowy_folder_pub::folder_builder::ParentChildViews;
use flowy_search_pub::entities::FolderIndexManager;
use flowy_sqlite::kv::KVStorePreferences;
@ -1035,38 +1035,50 @@ impl FolderManager {
Ok(())
}
pub(crate) async fn import(&self, import_data: ImportParams) -> FlowyResult<View> {
let workspace_id = self.user.workspace_id()?;
/// Imports a single file to the folder and returns the encoded collab for immediate cloud sync.
pub(crate) async fn import_single_file(
&self,
parent_view_id: String,
import_data: ImportValue,
) -> FlowyResult<(View, Option<EncodedCollab>)> {
// Ensure either data or file_path is provided
if import_data.data.is_none() && import_data.file_path.is_none() {
return Err(FlowyError::new(
ErrorCode::InvalidParams,
"data or file_path is required",
"Either data or file_path is required",
));
}
let handler = self.get_handler(&import_data.view_layout)?;
let view_id = gen_view_id().to_string();
let uid = self.user.user_id()?;
let mut encoded_collab: Option<EncodedCollab> = None;
// Import data from bytes if available
if let Some(data) = import_data.data {
handler
.import_from_bytes(
uid,
&view_id,
&import_data.name,
import_data.import_type,
data,
)
.await?;
encoded_collab = Some(
handler
.import_from_bytes(
uid,
&view_id,
&import_data.name,
import_data.import_type,
data,
)
.await?,
);
}
// Import data from file path if available
if let Some(file_path) = import_data.file_path {
// TODO(Lucas): return the collab
handler
.import_from_file_path(&view_id, &import_data.name, file_path)
.await?;
}
let params = CreateViewParams {
parent_view_id: import_data.parent_view_id,
parent_view_id,
name: import_data.name,
desc: "".to_string(),
layout: import_data.view_layout.clone().into(),
@ -1081,18 +1093,75 @@ impl FolderManager {
};
let view = create_view(self.user.user_id()?, params, import_data.view_layout);
// Insert the new view into the folder
self.with_folder(
|| (),
|folder| {
folder.insert_view(view.clone(), None);
},
);
Ok((view, encoded_collab))
}
/// Import function to handle the import of data.
pub(crate) async fn import(&self, import_data: ImportParams) -> FlowyResult<RepeatedViewPB> {
let workspace_id = self.user.workspace_id()?;
// Initialize an empty vector to store the objects
let sync_after_create = import_data.sync_after_create;
let mut objects = vec![];
let mut views = vec![];
// Iterate over the values in the import data
for data in import_data.values {
let collab_type = data.import_type.clone().into();
// Import a single file and get the view and encoded collab data
let (view, encoded_collab) = self
.import_single_file(import_data.parent_view_id.clone(), data)
.await?;
let object_id = view.id.clone();
views.push(view_pb_without_child_views(view));
if sync_after_create {
if let Some(encoded_collab) = encoded_collab {
// Try to encode the collaboration data to bytes
let encode_collab_v1 = encoded_collab.encode_to_bytes().map_err(internal_error);
// If the view can't be encoded, skip it and don't block the whole import process
match encode_collab_v1 {
Ok(encode_collab_v1) => objects.push(FolderCollabParams {
object_id,
encoded_collab_v1: encode_collab_v1,
collab_type,
}),
Err(e) => {
error!("import error {}", e)
},
}
}
}
}
// Sync the view to the cloud
if sync_after_create {
self
.cloud_service
.batch_create_folder_collab_objects(&workspace_id, objects)
.await?;
}
// Notify that the parent view has changed
notify_parent_view_did_change(
&workspace_id,
self.mutex_folder.clone(),
vec![view.parent_view_id.clone()],
vec![import_data.parent_view_id],
);
Ok(view)
Ok(RepeatedViewPB { items: views })
}
/// Update the view with the provided view_id using the specified function.

View File

@ -9,11 +9,17 @@ pub enum ImportType {
}
#[derive(Clone, Debug)]
pub struct ImportParams {
pub parent_view_id: String,
pub struct ImportValue {
pub name: String,
pub data: Option<Vec<u8>>,
pub file_path: Option<String>,
pub view_layout: ViewLayout,
pub import_type: ImportType,
}
#[derive(Clone, Debug)]
pub struct ImportParams {
pub parent_view_id: String,
pub values: Vec<ImportValue>,
pub sync_after_create: bool,
}

View File

@ -3,6 +3,7 @@ use std::sync::Arc;
use bytes::Bytes;
use collab_entity::EncodedCollab;
pub use collab_folder::View;
use collab_folder::ViewLayout;
use tokio::sync::RwLock;
@ -78,6 +79,8 @@ pub trait FolderOperationHandler {
) -> FutureResult<(), FlowyError>;
/// Create a view by importing data
///
/// The return value
fn import_from_bytes(
&self,
uid: i64,
@ -85,7 +88,7 @@ pub trait FolderOperationHandler {
name: &str,
import_type: ImportType,
bytes: Vec<u8>,
) -> FutureResult<(), FlowyError>;
) -> FutureResult<EncodedCollab, FlowyError>;
/// Create a view by importing data from a file
fn import_from_file_path(