chore: bump client api (#5217)

* chore: bump client api

* chore: fix compile

* chore: fix compile
Nathan.fooo
2024-04-27 21:55:12 +08:00
committed by GitHub
parent d4278a7549
commit 044dad1d3e
31 changed files with 597 additions and 428 deletions

View File

@@ -35,6 +35,7 @@ pub fn migration_anon_user_on_sign_up(
.with_write_txn(|new_collab_w_txn| {
let old_collab_r_txn = old_collab_db.read_txn();
let old_to_new_id_map = Arc::new(Mutex::new(OldToNewIdMap::new()));
migrate_user_awareness(
old_to_new_id_map.lock().deref_mut(),
old_user,
@@ -57,7 +58,7 @@ pub fn migration_anon_user_on_sign_up(
// Migration of all objects except the folder and database_with_views
object_ids.retain(|id| {
id != &old_user.session.user_workspace.id
&& id != &old_user.session.user_workspace.workspace_database_object_id
&& id != &old_user.session.user_workspace.database_indexer_id
});
info!("migrate collab objects: {:?}", object_ids.len());
@@ -84,7 +85,7 @@ pub fn migration_anon_user_on_sign_up(
// Migrate other collab objects
for object_id in &object_ids {
if let Some(collab) = collab_by_oid.get(object_id) {
let new_object_id = old_to_new_id_map.lock().get_new_id(object_id);
let new_object_id = old_to_new_id_map.lock().exchange_new_id(object_id);
tracing::debug!("migrate from: {}, to: {}", object_id, new_object_id,);
migrate_collab_object(
collab,
@@ -109,7 +110,7 @@ impl OldToNewIdMap {
fn new() -> Self {
Self::default()
}
fn get_new_id(&mut self, old_id: &str) -> String {
fn exchange_new_id(&mut self, old_id: &str) -> String {
let view_id = self
.0
.entry(old_id.to_string())
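For orientation, here is a minimal sketch of what the renamed helper most likely does, assuming OldToNewIdMap wraps a HashMap<String, String> and mints a fresh UUID the first time an old id is seen (the or_insert_with body is cut off by the hunk above, and uuid stands in for the actual id generator):

use std::collections::HashMap;
use uuid::Uuid;

#[derive(Default)]
struct OldToNewIdMap(HashMap<String, String>);

impl OldToNewIdMap {
  // Return a stable new id for `old_id`: generated on first lookup, cached
  // afterwards, so every reference to one old id resolves to the same new id.
  fn exchange_new_id(&mut self, old_id: &str) -> String {
    self
      .0
      .entry(old_id.to_string())
      .or_insert_with(|| Uuid::new_v4().to_string())
      .clone()
  }
}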
@@ -148,7 +149,7 @@ where
{
let database_with_views_collab = Collab::new(
old_user.session.user_id,
&old_user.session.user_workspace.workspace_database_object_id,
&old_user.session.user_workspace.database_indexer_id,
"phantom",
vec![],
false,
@@ -156,13 +157,13 @@ where
database_with_views_collab.with_origin_transact_mut(|txn| {
old_collab_r_txn.load_doc_with_txn(
old_user.session.user_id,
&old_user.session.user_workspace.workspace_database_object_id,
&old_user.session.user_workspace.database_indexer_id,
txn,
)
})?;
let new_uid = new_user_session.user_id;
let new_object_id = &new_user_session.user_workspace.workspace_database_object_id;
let new_object_id = &new_user_session.user_workspace.database_indexer_id;
let array = DatabaseMetaList::from_collab(&database_with_views_collab);
for database_meta in array.get_all_database_meta() {
@@ -170,9 +171,9 @@ where
let new_linked_views = update
.linked_views
.iter()
.map(|view_id| old_to_new_id_map.get_new_id(view_id))
.map(|view_id| old_to_new_id_map.exchange_new_id(view_id))
.collect();
update.database_id = old_to_new_id_map.get_new_id(&update.database_id);
update.database_id = old_to_new_id_map.exchange_new_id(&update.database_id);
update.linked_views = new_linked_views;
})
}
@@ -237,7 +238,7 @@ where
let fav_map = old_fav_map
.into_iter()
.map(|mut item| {
let new_view_id = old_to_new_id_map.get_new_id(&item.id);
let new_view_id = old_to_new_id_map.exchange_new_id(&item.id);
item.id = new_view_id;
item
})
@@ -248,7 +249,7 @@ where
let trash_map = old_trash_map
.into_iter()
.map(|mut item| {
let new_view_id = old_to_new_id_map.get_new_id(&item.id);
let new_view_id = old_to_new_id_map.exchange_new_id(&item.id);
item.id = new_view_id;
item
})
@@ -260,7 +261,7 @@ where
let recent_map = old_recent_map
.into_iter()
.map(|mut item| {
let new_view_id = old_to_new_id_map.get_new_id(&item.id);
let new_view_id = old_to_new_id_map.exchange_new_id(&item.id);
item.id = new_view_id;
item
})
@@ -279,19 +280,19 @@ where
.child_views
.iter_mut()
.for_each(|view_identifier| {
view_identifier.id = old_to_new_id_map.get_new_id(&view_identifier.id);
view_identifier.id = old_to_new_id_map.exchange_new_id(&view_identifier.id);
});
folder_data.views.iter_mut().for_each(|view| {
// 2. replace the old parent view id of the view
view.parent_view_id = old_to_new_id_map.get_new_id(&view.parent_view_id);
view.parent_view_id = old_to_new_id_map.exchange_new_id(&view.parent_view_id);
// 3. replace the old id of the view
view.id = old_to_new_id_map.get_new_id(&view.id);
view.id = old_to_new_id_map.exchange_new_id(&view.id);
// 4. replace the old id of the children views
view.children.iter_mut().for_each(|view_identifier| {
view_identifier.id = old_to_new_id_map.get_new_id(&view_identifier.id);
view_identifier.id = old_to_new_id_map.exchange_new_id(&view_identifier.id);
});
});
@@ -349,7 +350,8 @@ where
{
// Migrate databases
let mut database_object_ids = vec![];
let database_row_object_ids = RwLock::new(HashSet::new());
let imported_database_row_object_ids: RwLock<HashMap<String, HashSet<String>>> =
RwLock::new(HashMap::new());
for object_id in &mut *object_ids {
if let Some(collab) = collab_by_oid.get(object_id) {
@@ -359,14 +361,17 @@ where
database_object_ids.push(object_id.clone());
reset_inline_view_id(collab, |old_inline_view_id| {
old_to_new_id_map.lock().get_new_id(&old_inline_view_id)
old_to_new_id_map
.lock()
.exchange_new_id(&old_inline_view_id)
});
mut_database_views_with_collab(collab, |database_view| {
let new_view_id = old_to_new_id_map.lock().get_new_id(&database_view.id);
let old_database_id = database_view.database_id.clone();
let new_view_id = old_to_new_id_map.lock().exchange_new_id(&database_view.id);
let new_database_id = old_to_new_id_map
.lock()
.get_new_id(&database_view.database_id);
.exchange_new_id(&database_view.database_id);
tracing::trace!(
"migrate database view id from: {}, to: {}",
@@ -384,7 +389,7 @@ where
database_view.row_orders.iter_mut().for_each(|row_order| {
let old_row_id = String::from(row_order.id.clone());
let old_row_document_id = database_row_document_id_from_row_id(&old_row_id);
let new_row_id = old_to_new_id_map.lock().get_new_id(&old_row_id);
let new_row_id = old_to_new_id_map.lock().exchange_new_id(&old_row_id);
let new_row_document_id = database_row_document_id_from_row_id(&new_row_id);
tracing::debug!("migrate row id: {} to {}", row_order.id, new_row_id);
tracing::debug!(
@@ -397,11 +402,15 @@ where
.insert(old_row_document_id, new_row_document_id);
row_order.id = RowId::from(new_row_id);
database_row_object_ids.write().insert(old_row_id);
imported_database_row_object_ids
.write()
.entry(old_database_id.clone())
.or_default()
.insert(old_row_id);
});
});
let new_object_id = old_to_new_id_map.lock().get_new_id(object_id);
let new_object_id = old_to_new_id_map.lock().exchange_new_id(object_id);
tracing::debug!(
"migrate database from: {}, to: {}",
object_id,
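The hunk above replaces the flat set of row ids with a map keyed by the old database id. A standalone sketch of the entry().or_default() grouping idiom it relies on, with illustrative names:

use std::collections::{HashMap, HashSet};

fn main() {
  let mut rows_by_database: HashMap<String, HashSet<String>> = HashMap::new();
  // or_default() lazily creates the HashSet the first time a database id is seen.
  for (database_id, row_id) in [("db_a", "row_1"), ("db_a", "row_2"), ("db_b", "row_3")] {
    rows_by_database
      .entry(database_id.to_string())
      .or_default()
      .insert(row_id.to_string());
  }
  assert_eq!(rows_by_database["db_a"].len(), 2);
}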
@@ -415,29 +424,48 @@ where
);
}
}
let imported_database_row_object_ids = imported_database_row_object_ids.read();
// remove the database object ids from the object ids
object_ids.retain(|id| !database_object_ids.contains(id));
let database_row_object_ids = database_row_object_ids.read();
for object_id in &*database_row_object_ids {
if let Some(collab) = collab_by_oid.get(object_id) {
let new_object_id = old_to_new_id_map.lock().get_new_id(object_id);
tracing::info!(
"migrate database row from: {}, to: {}",
object_id,
new_object_id,
);
mut_row_with_collab(collab, |row_update| {
row_update.set_row_id(RowId::from(new_object_id.clone()));
});
migrate_collab_object(
collab,
new_user_session.user_id,
&new_object_id,
new_collab_w_txn,
);
// remove database row object ids from the imported object ids
object_ids.retain(|id| {
!imported_database_row_object_ids
.values()
.flatten()
.any(|row_id| row_id == id)
});
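A small design note on the retain above: values().flatten().any(..) rescans every row id for each candidate, so the filter is O(ids × rows). A hypothetical variant that flattens once into a HashSet does the same job with O(1) membership checks:

use std::collections::{HashMap, HashSet};

fn retain_non_row_ids(
  object_ids: &mut Vec<String>,
  rows_by_database: &HashMap<String, HashSet<String>>,
) {
  // Build the lookup set once instead of rescanning the map per retained id.
  let all_row_ids: HashSet<&str> = rows_by_database
    .values()
    .flatten()
    .map(String::as_str)
    .collect();
  object_ids.retain(|id| !all_row_ids.contains(id.as_str()));
}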
for (database_id, imported_row_ids) in &*imported_database_row_object_ids {
for imported_row_id in imported_row_ids {
if let Some(imported_collab) = collab_by_oid.get(imported_row_id) {
let new_database_id = old_to_new_id_map.lock().exchange_new_id(database_id);
let new_row_id = old_to_new_id_map.lock().exchange_new_id(imported_row_id);
info!(
"import database row from: {}, to: {}",
imported_row_id, new_row_id,
);
mut_row_with_collab(imported_collab, |row_update| {
row_update.set_row_id(RowId::from(new_row_id.clone()), new_database_id.clone());
});
migrate_collab_object(
imported_collab,
new_user_session.user_id,
&new_row_id,
new_collab_w_txn,
);
}
// imported_collab_by_oid contains all the collab object ids, including the row document collab object ids.
// So, if an id exists in imported_collab_by_oid, the corresponding row document collab object exists.
let imported_row_document_id = database_row_document_id_from_row_id(imported_row_id);
if collab_by_oid.get(&imported_row_document_id).is_some() {
let _ = old_to_new_id_map
.lock()
.exchange_new_id(&imported_row_document_id);
}
}
}
object_ids.retain(|id| !database_row_object_ids.contains(id));
Ok(())
}
@@ -456,7 +484,7 @@ where
let collab = Collab::new(
old_user.session.user_id,
object_id,
"phantom",
"migrate_device",
vec![],
false,
);

View File

@@ -43,7 +43,7 @@ pub async fn sync_supabase_user_data_to_cloud(
uid,
&workspace_id,
device_id,
&new_user_session.user_workspace.workspace_database_object_id,
&new_user_session.user_workspace.database_indexer_id,
collab_db,
user_service.clone(),
)

View File

@@ -14,7 +14,7 @@ use crate::notification::{send_notification, UserNotification};
use crate::services::cloud_config::{
get_cloud_config, get_or_create_cloud_config, save_cloud_config,
};
use crate::services::data_import::get_appflowy_data_folder_import_context;
use crate::services::data_import::prepare_import;
use crate::user_manager::UserManager;
fn upgrade_manager(manager: AFPluginState<Weak<UserManager>>) -> FlowyResult<Arc<UserManager>> {
@@ -266,10 +267,11 @@ pub async fn import_appflowy_data_folder_handler(
af_spawn(async move {
let result = async {
let manager = upgrade_manager(manager)?;
let context = get_appflowy_data_folder_import_context(&data.path)
let imported_folder = prepare_import(&data.path)
.map_err(|err| FlowyError::new(ErrorCode::AppFlowyDataFolderImportError, err.to_string()))?
.with_container_name(data.import_container_name);
manager.import_appflowy_data_folder(context).await?;
manager.perform_import(imported_folder).await?;
Ok::<(), FlowyError>(())
}
.await;

View File

@@ -69,7 +69,7 @@ impl AuthenticateUser {
pub fn workspace_database_object_id(&self) -> FlowyResult<String> {
let session = self.get_session()?;
Ok(session.user_workspace.workspace_database_object_id.clone())
Ok(session.user_workspace.database_indexer_id.clone())
}
pub fn get_collab_db(&self, uid: i64) -> FlowyResult<Weak<CollabKVDB>> {

View File

@@ -1,4 +1,5 @@
use crate::migrations::session_migration::migrate_session_with_user_uuid;
use crate::services::data_import::importer::load_collab_by_oid;
use crate::services::db::UserDBPath;
use crate::services::entities::UserPaths;
@@ -20,6 +21,7 @@ use collab_entity::CollabType;
use collab_folder::{Folder, UserId, View, ViewIdentifier, ViewLayout};
use collab_integrate::{CollabKVAction, CollabKVDB, PersistenceError};
use collab_plugins::local_storage::kv::KVTransactionDB;
use flowy_error::FlowyError;
use flowy_folder_pub::cloud::gen_view_id;
use flowy_folder_pub::entities::{AppFlowyData, ImportData};
@@ -35,20 +37,27 @@ use std::path::Path;
use std::sync::{Arc, Weak};
use tracing::{debug, error, event, info, instrument, warn};
pub(crate) struct ImportContext {
pub(crate) struct ImportedFolder {
pub imported_session: Session,
pub imported_collab_db: Arc<CollabKVDB>,
pub container_name: Option<String>,
pub source: ImportedSource,
}
impl ImportContext {
#[derive(Clone)]
pub(crate) enum ImportedSource {
ExternalFolder,
AnonUser,
}
impl ImportedFolder {
pub fn with_container_name(mut self, container_name: Option<String>) -> Self {
self.container_name = container_name;
self
}
}
pub(crate) fn get_appflowy_data_folder_import_context(path: &str) -> anyhow::Result<ImportContext> {
pub(crate) fn prepare_import(path: &str) -> anyhow::Result<ImportedFolder> {
if !Path::new(path).exists() {
return Err(anyhow!("The path: {} does not exist", path));
}
@@ -83,70 +92,106 @@ pub(crate) fn get_appflowy_data_folder_import_context(path: &str) -> anyhow::Res
None,
);
Ok(ImportContext {
Ok(ImportedFolder {
imported_session,
imported_collab_db,
container_name: None,
source: ImportedSource::ExternalFolder,
})
}
#[allow(dead_code)]
fn migrate_user_awareness(
old_to_new_id_map: &mut OldToNewIdMap,
old_user_session: &Session,
new_user_session: &Session,
) -> Result<(), PersistenceError> {
let old_uid = old_user_session.user_id;
let new_uid = new_user_session.user_id;
old_to_new_id_map.insert(old_uid.to_string(), new_uid.to_string());
Ok(())
}
/// This path refers to the directory where AppFlowy stores its data. The directory structure is as follows:
/// root folder:
/// - cache.db
/// - log (log files with unique identifiers)
/// - 2761499 (other relevant files or directories, identified by unique numbers)
pub(crate) fn import_appflowy_data_folder(
session: &Session,
pub(crate) fn generate_import_data(
current_session: &Session,
workspace_id: &str,
collab_db: &Arc<CollabKVDB>,
import_context: ImportContext,
imported_folder: ImportedFolder,
) -> anyhow::Result<ImportData> {
let imported_session = import_context.imported_session;
let imported_collab_db = import_context.imported_collab_db;
let container_name = import_context.container_name;
let imported_session = imported_folder.imported_session.clone();
let imported_collab_db = imported_folder.imported_collab_db.clone();
let imported_container_view_name = imported_folder.container_name.clone();
let mut database_view_ids_by_database_id: HashMap<String, Vec<String>> = HashMap::new();
let row_object_ids = Mutex::new(HashSet::new());
let row_document_object_ids = Mutex::new(HashSet::new());
let document_object_ids = Mutex::new(HashSet::new());
let database_object_ids = Mutex::new(HashSet::new());
let import_container_view_id = match &container_name {
None => workspace_id.to_string(),
Some(_) => gen_view_id().to_string(),
// All the imported views will be attached to the container view. If the container view name is not provided,
// the container view will be the workspace, which means the root of the workspace.
let import_container_view_id = match imported_folder.source {
ImportedSource::ExternalFolder => match &imported_container_view_name {
None => workspace_id.to_string(),
Some(_) => gen_view_id().to_string(),
},
ImportedSource::AnonUser => workspace_id.to_string(),
};
let views = collab_db.with_write_txn(|collab_write_txn| {
let imported_collab_read_txn = imported_collab_db.read_txn();
// use old_to_new_id_map to keep track of the mapping from each old collab object id to its new id
let old_to_new_id_map = Arc::new(Mutex::new(OldToNewIdMap::new()));
// 1. Get all the imported collab object ids
let mut all_imported_object_ids = imported_collab_read_txn
.get_all_docs()
.map(|iter| iter.collect::<Vec<String>>())
.unwrap_or_default();
// when doing import, we don't want to import the user workspace, database view tracker and the user awareness
all_imported_object_ids.retain(|id| id != &imported_session.user_workspace.id);
all_imported_object_ids
.retain(|id| id != &imported_session.user_workspace.workspace_database_object_id);
// when doing import, we don't want to import these objects:
// 1. user workspace
// 2. database view tracker
// 3. the user awareness
// So we remove these object ids from the list
let user_workspace_id = &imported_session.user_workspace.id;
let database_indexer_id = &imported_session.user_workspace.database_indexer_id;
let user_awareness_id =
user_awareness_object_id(&imported_session.user_uuid, user_workspace_id).to_string();
all_imported_object_ids.retain(|id| {
id != &user_awareness_object_id(
&imported_session.user_uuid,
&imported_session.user_workspace.id,
)
.to_string()
id != user_workspace_id && id != database_indexer_id && id != &user_awareness_id
});
// import database view tracker
migrate_database_view_tracker(
&mut old_to_new_id_map.lock(),
&imported_session,
&imported_collab_read_txn,
&mut database_view_ids_by_database_id,
&database_object_ids,
)?;
match imported_folder.source {
ImportedSource::ExternalFolder => {
// 2. mapping the database indexer ids
mapping_database_indexer_ids(
&mut old_to_new_id_map.lock(),
&imported_session,
&imported_collab_read_txn,
&mut database_view_ids_by_database_id,
&database_object_ids,
)?;
},
ImportedSource::AnonUser => {
// 2. migrate the database with views object
migrate_database_with_views_object(
&mut old_to_new_id_map.lock(),
&imported_session,
&imported_collab_read_txn,
current_session,
collab_write_txn,
)?;
},
}
// remove the database view ids from the object ids. Because there are no collab object for the database view
// remove the database view ids from the object ids, because there are no physical collab objects
// for the database views
let database_view_ids: Vec<String> = database_view_ids_by_database_id
.values()
.flatten()
@@ -154,99 +199,74 @@ pub(crate) fn import_appflowy_data_folder(
.collect();
all_imported_object_ids.retain(|id| !database_view_ids.contains(id));
// load other collab objects
// 3. load imported collab objects data.
let imported_collab_by_oid = load_collab_by_oid(
imported_session.user_id,
&imported_collab_read_txn,
&all_imported_object_ids,
);
// import the database
migrate_databases(
&old_to_new_id_map,
session,
current_session,
collab_write_txn,
&mut all_imported_object_ids,
&imported_collab_by_oid,
&row_object_ids,
&row_document_object_ids,
)?;
debug!(
"import row document ids: {:?}",
row_document_object_ids
.lock()
.iter()
.collect::<Vec<&String>>()
);
// the object ids now only contain the document collab object ids
for object_id in &all_imported_object_ids {
if let Some(imported_collab) = imported_collab_by_oid.get(object_id) {
let new_object_id = old_to_new_id_map.lock().renew_id(object_id);
let new_object_id = old_to_new_id_map.lock().exchange_new_id(object_id);
document_object_ids.lock().insert(new_object_id.clone());
debug!("import from: {}, to: {}", object_id, new_object_id,);
write_collab_object(
imported_collab,
session.user_id,
current_session.user_id,
&new_object_id,
collab_write_txn,
);
}
}
// create a root view that contains all the views
let (mut child_views, orphan_views) = import_workspace_views(
// Update the parent view IDs of all top-level views to match the new container view ID, making
// them child views of the container. This ensures that the hierarchy within the imported
// structure is correctly maintained.
let (mut child_views, orphan_views) = mapping_folder_views(
&import_container_view_id,
&mut old_to_new_id_map.lock(),
&imported_session,
&imported_collab_read_txn,
)?;
match container_name {
None => {
match imported_folder.source {
ImportedSource::ExternalFolder => match imported_container_view_name {
None => {
child_views.extend(orphan_views);
Ok(child_views)
},
Some(container_name) => {
// create a new view with the given name and then attach the imported views to it
attach_to_new_view(
current_session,
&document_object_ids,
&import_container_view_id,
collab_write_txn,
child_views,
orphan_views,
container_name,
)
},
},
ImportedSource::AnonUser => {
child_views.extend(orphan_views);
Ok(child_views)
},
Some(container_name) => {
let name = if container_name.is_empty() {
format!(
"import_{}",
chrono::Local::now().format("%Y-%m-%d %H:%M:%S")
)
} else {
container_name
};
// create the content for the container view
let import_container_doc_state = default_document_collab_data(&import_container_view_id)
.map_err(|err| PersistenceError::InvalidData(err.to_string()))?
.doc_state
.to_vec();
import_collab_object_with_doc_state(
import_container_doc_state,
session.user_id,
&import_container_view_id,
collab_write_txn,
)?;
document_object_ids
.lock()
.insert(import_container_view_id.clone());
let mut import_container_views =
vec![
ViewBuilder::new(session.user_id, session.user_workspace.id.clone())
.with_view_id(import_container_view_id)
.with_layout(ViewLayout::Document)
.with_name(name)
.with_child_views(child_views)
.build(),
];
import_container_views.extend(orphan_views);
Ok(import_container_views)
},
}
})?;
Ok(ImportData::AppFlowyDataFolder {
items: vec![
AppFlowyData::Folder {
@@ -261,11 +281,61 @@
],
})
}
fn attach_to_new_view<'a, W>(
current_session: &Session,
document_object_ids: &Mutex<HashSet<String>>,
import_container_view_id: &str,
collab_write_txn: &'a W,
child_views: Vec<ParentChildViews>,
orphan_views: Vec<ParentChildViews>,
container_name: String,
) -> Result<Vec<ParentChildViews>, PersistenceError>
where
W: CollabKVAction<'a>,
PersistenceError: From<W::Error>,
{
let name = if container_name.is_empty() {
format!(
"import_{}",
chrono::Local::now().format("%Y-%m-%d %H:%M:%S")
)
} else {
container_name
};
fn migrate_database_view_tracker<'a, W>(
// create the content for the container view
let import_container_doc_state = default_document_collab_data(import_container_view_id)
.map_err(|err| PersistenceError::InvalidData(err.to_string()))?
.doc_state
.to_vec();
import_collab_object_with_doc_state(
import_container_doc_state,
current_session.user_id,
import_container_view_id,
collab_write_txn,
)?;
document_object_ids
.lock()
.insert(import_container_view_id.to_string());
let mut import_container_views = vec![ViewBuilder::new(
current_session.user_id,
current_session.user_workspace.id.clone(),
)
.with_view_id(import_container_view_id)
.with_layout(ViewLayout::Document)
.with_name(name)
.with_child_views(child_views)
.build()];
import_container_views.extend(orphan_views);
Ok(import_container_views)
}
fn mapping_database_indexer_ids<'a, W>(
old_to_new_id_map: &mut OldToNewIdMap,
other_session: &Session,
other_collab_read_txn: &W,
imported_session: &Session,
imported_collab_read_txn: &W,
database_view_ids_by_database_id: &mut HashMap<String, Vec<String>>,
database_object_ids: &Mutex<HashSet<String>>,
) -> Result<(), PersistenceError>
@@ -273,29 +343,29 @@ where
W: CollabKVAction<'a>,
PersistenceError: From<W::Error>,
{
let database_view_tracker_collab = Collab::new(
other_session.user_id,
&other_session.user_workspace.workspace_database_object_id,
"phantom",
let imported_database_indexer = Collab::new(
imported_session.user_id,
&imported_session.user_workspace.database_indexer_id,
"import_device",
vec![],
false,
);
database_view_tracker_collab.with_origin_transact_mut(|txn| {
other_collab_read_txn.load_doc_with_txn(
other_session.user_id,
&other_session.user_workspace.workspace_database_object_id,
imported_database_indexer.with_origin_transact_mut(|txn| {
imported_collab_read_txn.load_doc_with_txn(
imported_session.user_id,
&imported_session.user_workspace.database_indexer_id,
txn,
)
})?;
let array = DatabaseMetaList::from_collab(&database_view_tracker_collab);
for database_meta in array.get_all_database_meta() {
let array = DatabaseMetaList::from_collab(&imported_database_indexer);
for database_meta_list in array.get_all_database_meta() {
database_view_ids_by_database_id.insert(
old_to_new_id_map.renew_id(&database_meta.database_id),
database_meta
old_to_new_id_map.exchange_new_id(&database_meta_list.database_id),
database_meta_list
.linked_views
.into_iter()
.map(|view_id| old_to_new_id_map.renew_id(&view_id))
.map(|view_id| old_to_new_id_map.exchange_new_id(&view_id))
.collect(),
);
}
@@ -308,6 +378,59 @@ where
Ok(())
}
fn migrate_database_with_views_object<'a, 'b, W, R>(
old_to_new_id_map: &mut OldToNewIdMap,
old_user_session: &Session,
old_collab_r_txn: &R,
new_user_session: &Session,
new_collab_w_txn: &W,
) -> Result<(), PersistenceError>
where
'a: 'b,
W: CollabKVAction<'a>,
R: CollabKVAction<'b>,
PersistenceError: From<W::Error>,
PersistenceError: From<R::Error>,
{
let database_with_views_collab = Collab::new(
old_user_session.user_id,
&old_user_session.user_workspace.database_indexer_id,
"migrate_device",
vec![],
false,
);
database_with_views_collab.with_origin_transact_mut(|txn| {
old_collab_r_txn.load_doc_with_txn(
old_user_session.user_id,
&old_user_session.user_workspace.database_indexer_id,
txn,
)
})?;
let new_uid = new_user_session.user_id;
let new_object_id = &new_user_session.user_workspace.database_indexer_id;
let array = DatabaseMetaList::from_collab(&database_with_views_collab);
for database_meta in array.get_all_database_meta() {
array.update_database(&database_meta.database_id, |update| {
let new_linked_views = update
.linked_views
.iter()
.map(|view_id| old_to_new_id_map.exchange_new_id(view_id))
.collect();
update.database_id = old_to_new_id_map.exchange_new_id(&update.database_id);
update.linked_views = new_linked_views;
})
}
let txn = database_with_views_collab.transact();
if let Err(err) = new_collab_w_txn.create_new_doc(new_uid, new_object_id, &txn) {
error!("🔴migrate database storage failed: {:?}", err);
}
drop(txn);
Ok(())
}
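This function repeats the copy pattern used throughout the commit: hydrate a throwaway Collab from the source store's read transaction, then persist it into the destination store under the remapped object id. A condensed sketch of that pattern, using only the call shapes visible in this diff (the exact signatures are assumptions):

use collab::preclude::Collab;
use collab_integrate::{CollabKVAction, PersistenceError};

fn copy_collab_doc<'a, 'b, W, R>(
  uid: i64,
  source_object_id: &str,
  dest_object_id: &str,
  source_txn: &R,
  dest_txn: &W,
) -> Result<(), PersistenceError>
where
  'a: 'b,
  W: CollabKVAction<'a>,
  R: CollabKVAction<'b>,
  PersistenceError: From<W::Error>,
  PersistenceError: From<R::Error>,
{
  // "migrate_device" mirrors the placeholder origin string used in this commit.
  let collab = Collab::new(uid, source_object_id, "migrate_device", vec![], false);
  // Hydrate the in-memory collab from the source store.
  collab.with_origin_transact_mut(|txn| source_txn.load_doc_with_txn(uid, source_object_id, txn))?;
  // Persist it into the destination store under the new id.
  let txn = collab.transact();
  dest_txn.create_new_doc(uid, dest_object_id, &txn)?;
  Ok(())
}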
fn migrate_databases<'a, W>(
old_to_new_id_map: &Arc<Mutex<OldToNewIdMap>>,
session: &Session,
@@ -315,15 +438,16 @@ fn migrate_databases<'a, W>(
imported_object_ids: &mut Vec<String>,
imported_collab_by_oid: &HashMap<String, Collab>,
row_object_ids: &Mutex<HashSet<String>>,
row_document_object_ids: &Mutex<HashSet<String>>,
) -> Result<(), PersistenceError>
where
W: CollabKVAction<'a>,
PersistenceError: From<W::Error>,
{
// Migrate databases
let row_document_object_ids = Mutex::new(HashSet::new());
let mut database_object_ids = vec![];
let imported_database_row_object_ids = RwLock::new(HashSet::new());
let imported_database_row_object_ids: RwLock<HashMap<String, HashSet<String>>> =
RwLock::new(HashMap::new());
for object_id in &mut *imported_object_ids {
if let Some(database_collab) = imported_collab_by_oid.get(object_id) {
@@ -333,21 +457,24 @@ where
database_object_ids.push(object_id.clone());
reset_inline_view_id(database_collab, |old_inline_view_id| {
old_to_new_id_map.lock().renew_id(&old_inline_view_id)
old_to_new_id_map
.lock()
.exchange_new_id(&old_inline_view_id)
});
mut_database_views_with_collab(database_collab, |database_view| {
let new_view_id = old_to_new_id_map.lock().renew_id(&database_view.id);
let new_view_id = old_to_new_id_map.lock().exchange_new_id(&database_view.id);
let old_database_id = database_view.database_id.clone();
let new_database_id = old_to_new_id_map
.lock()
.renew_id(&database_view.database_id);
.exchange_new_id(&database_view.database_id);
database_view.id = new_view_id;
database_view.database_id = new_database_id;
database_view.row_orders.iter_mut().for_each(|row_order| {
let old_row_id = String::from(row_order.id.clone());
let old_row_document_id = database_row_document_id_from_row_id(&old_row_id);
let new_row_id = old_to_new_id_map.lock().renew_id(&old_row_id);
let new_row_id = old_to_new_id_map.lock().exchange_new_id(&old_row_id);
// The row document might not exist for a given database row, but by querying the
// old_row_document_id we can check whether the row's document exists.
let new_row_document_id = database_row_document_id_from_row_id(&new_row_id);
@@ -357,7 +484,12 @@ where
.insert(old_row_document_id.clone(), new_row_document_id);
row_order.id = RowId::from(new_row_id);
imported_database_row_object_ids.write().insert(old_row_id);
imported_database_row_object_ids
.write()
.entry(old_database_id.clone())
.or_default()
.insert(old_row_id);
});
// collect the ids
@@ -369,7 +501,7 @@ where
row_object_ids.lock().extend(new_row_ids);
});
let new_object_id = old_to_new_id_map.lock().renew_id(object_id);
let new_object_id = old_to_new_id_map.lock().exchange_new_id(object_id);
debug!(
"migrate database from: {}, to: {}",
object_id, new_object_id,
@@ -386,38 +518,59 @@ where
// remove the database object ids from the object ids
imported_object_ids.retain(|id| !database_object_ids.contains(id));
imported_object_ids.retain(|id| !imported_database_row_object_ids.contains(id));
for imported_row_id in &*imported_database_row_object_ids {
if let Some(imported_collab) = imported_collab_by_oid.get(imported_row_id) {
let new_row_id = old_to_new_id_map.lock().renew_id(imported_row_id);
info!(
"import database row from: {}, to: {}",
imported_row_id, new_row_id,
);
mut_row_with_collab(imported_collab, |row_update| {
row_update.set_row_id(RowId::from(new_row_id.clone()));
});
write_collab_object(
imported_collab,
session.user_id,
&new_row_id,
collab_write_txn,
);
}
// remove database row object ids from the imported object ids
imported_object_ids.retain(|id| {
!imported_database_row_object_ids
.values()
.flatten()
.any(|row_id| row_id == id)
});
// imported_collab_by_oid contains all the collab object ids, including the row document collab object ids.
// So, if an id exists in imported_collab_by_oid, the corresponding row document collab object exists.
let imported_row_document_id = database_row_document_id_from_row_id(imported_row_id);
if imported_collab_by_oid
.get(&imported_row_document_id)
.is_some()
{
let new_row_document_id = old_to_new_id_map.lock().renew_id(&imported_row_document_id);
row_document_object_ids.lock().insert(new_row_document_id);
for (database_id, imported_row_ids) in &*imported_database_row_object_ids {
for imported_row_id in imported_row_ids {
if let Some(imported_collab) = imported_collab_by_oid.get(imported_row_id) {
let new_database_id = old_to_new_id_map.lock().exchange_new_id(database_id);
let new_row_id = old_to_new_id_map.lock().exchange_new_id(imported_row_id);
info!(
"import database row from: {}, to: {}",
imported_row_id, new_row_id,
);
mut_row_with_collab(imported_collab, |row_update| {
row_update.set_row_id(RowId::from(new_row_id.clone()), new_database_id.clone());
});
write_collab_object(
imported_collab,
session.user_id,
&new_row_id,
collab_write_txn,
);
}
// imported_collab_by_oid contains all the collab object ids, including the row document collab object ids.
// So, if an id exists in imported_collab_by_oid, the corresponding row document collab object exists.
let imported_row_document_id = database_row_document_id_from_row_id(imported_row_id);
if imported_collab_by_oid
.get(&imported_row_document_id)
.is_some()
{
let new_row_document_id = old_to_new_id_map
.lock()
.exchange_new_id(&imported_row_document_id);
row_document_object_ids.lock().insert(new_row_document_id);
}
}
}
debug!(
"import row document ids: {:?}",
row_document_object_ids
.lock()
.iter()
.collect::<Vec<&String>>()
);
Ok(())
}
@@ -476,58 +629,59 @@ where
Ok(())
}
fn import_workspace_views<'a, W>(
parent_view_id: &str,
fn mapping_folder_views<'a, W>(
root_view_id: &str,
old_to_new_id_map: &mut OldToNewIdMap,
other_session: &Session,
other_collab_read_txn: &W,
imported_session: &Session,
imported_collab_read_txn: &W,
) -> Result<(Vec<ParentChildViews>, Vec<ParentChildViews>), PersistenceError>
where
W: CollabKVAction<'a>,
PersistenceError: From<W::Error>,
{
let other_folder_collab = Collab::new(
other_session.user_id,
&other_session.user_workspace.id,
"phantom",
let imported_folder_collab = Collab::new(
imported_session.user_id,
&imported_session.user_workspace.id,
"migrate_device",
vec![],
false,
);
other_folder_collab.with_origin_transact_mut(|txn| {
other_collab_read_txn.load_doc_with_txn(
other_session.user_id,
&other_session.user_workspace.id,
imported_folder_collab.with_origin_transact_mut(|txn| {
imported_collab_read_txn.load_doc_with_txn(
imported_session.user_id,
&imported_session.user_workspace.id,
txn,
)
})?;
let other_user_id = UserId::from(other_session.user_id);
let other_folder = Folder::open(
let other_user_id = UserId::from(imported_session.user_id);
let imported_folder = Folder::open(
other_user_id,
Arc::new(MutexCollab::new(other_folder_collab)),
Arc::new(MutexCollab::new(imported_folder_collab)),
None,
)
.map_err(|err| PersistenceError::InvalidData(err.to_string()))?;
let other_folder_data = other_folder
.get_folder_data(&other_session.user_workspace.id)
let imported_folder_data = imported_folder
.get_folder_data(&imported_session.user_workspace.id)
.ok_or(PersistenceError::Internal(anyhow!(
"Can't read the folder data"
)))?;
// replace the old parent view id of the workspace
old_to_new_id_map.0.insert(
other_session.user_workspace.id.clone(),
parent_view_id.to_string(),
imported_session.user_workspace.id.clone(),
root_view_id.to_string(),
);
let trash_ids = other_folder_data
let trash_ids = imported_folder_data
.trash
.into_values()
.flatten()
.map(|item| old_to_new_id_map.renew_id(&item.id))
.map(|item| old_to_new_id_map.exchange_new_id(&item.id))
.collect::<Vec<String>>();
// 1. Replace the workspace views id to new id
let mut first_level_views = other_folder_data
// 1. Replace the view ids with new ids
let mut first_level_views = imported_folder_data
.workspace
.child_views
.items
@@ -536,20 +690,20 @@ where
.collect::<Vec<ViewIdentifier>>();
first_level_views.iter_mut().for_each(|view_identifier| {
view_identifier.id = old_to_new_id_map.renew_id(&view_identifier.id);
view_identifier.id = old_to_new_id_map.exchange_new_id(&view_identifier.id);
});
let mut all_views = other_folder_data.views;
let mut all_views = imported_folder_data.views;
all_views.iter_mut().for_each(|view| {
// 2. replace the old parent view id of the view
view.parent_view_id = old_to_new_id_map.renew_id(&view.parent_view_id);
view.parent_view_id = old_to_new_id_map.exchange_new_id(&view.parent_view_id);
// 3. replace the old id of the view
view.id = old_to_new_id_map.renew_id(&view.id);
view.id = old_to_new_id_map.exchange_new_id(&view.id);
// 4. replace the old id of the children views
view.children.iter_mut().for_each(|view_identifier| {
view_identifier.id = old_to_new_id_map.renew_id(&view_identifier.id);
view_identifier.id = old_to_new_id_map.exchange_new_id(&view_identifier.id);
});
});
@@ -559,6 +713,7 @@ where
.map(|view| (view.id.clone(), view))
.collect::<HashMap<String, View>>();
// 5. create the parent views. Each parent view contains the children views.
let parent_views = first_level_views
.into_iter()
.flat_map(
@ -569,7 +724,7 @@ where
)
.collect::<Vec<ParentChildViews>>();
// the views in the all_views_map now, should be the orphan views
// 6. after the parent views are created, the all_views_map only contains the orphan views
debug!("create orphan views: {:?}", all_views_map.keys());
let mut orphan_views = vec![];
for orphan_view in all_views_map.into_values() {
@@ -610,7 +765,7 @@ impl OldToNewIdMap {
fn new() -> Self {
Self::default()
}
fn renew_id(&mut self, old_id: &str) -> String {
fn exchange_new_id(&mut self, old_id: &str) -> String {
let view_id = self
.0
.entry(old_id.to_string())

View File

@@ -1,25 +1,8 @@
use crate::services::data_import::appflowy_data_import::import_appflowy_data_folder;
use collab_integrate::{CollabKVAction, CollabKVDB, PersistenceError};
use flowy_user_pub::session::Session;
use std::collections::HashMap;
use crate::services::data_import::ImportContext;
use collab::preclude::Collab;
use flowy_folder_pub::entities::ImportData;
use std::sync::Arc;
use collab_integrate::{CollabKVAction, PersistenceError};
use std::collections::HashMap;
use tracing::instrument;
/// Import appflowy data from the given path.
/// If the container name is not empty, then the data will be imported to the given container.
/// Otherwise, the data will be imported to the current workspace.
pub(crate) fn import_data(
session: &Session,
context: ImportContext,
collab_db: Arc<CollabKVDB>,
) -> anyhow::Result<ImportData> {
import_appflowy_data_folder(session, &session.user_workspace.id, &collab_db, context)
}
#[instrument(level = "debug", skip_all)]
pub fn load_collab_by_oid<'a, R>(
uid: i64,

View File

@@ -81,7 +81,7 @@ impl TryFrom<(i64, &UserWorkspace)> for UserWorkspaceTable {
if value.1.id.is_empty() {
return Err(FlowyError::invalid_data().with_context("The id is empty"));
}
if value.1.workspace_database_object_id.is_empty() {
if value.1.database_indexer_id.is_empty() {
return Err(FlowyError::invalid_data().with_context("The database storage id is empty"));
}
@@ -90,7 +90,7 @@ impl TryFrom<(i64, &UserWorkspace)> for UserWorkspaceTable {
name: value.1.name.clone(),
uid: value.0,
created_at: value.1.created_at.timestamp(),
database_storage_id: value.1.workspace_database_object_id.clone(),
database_storage_id: value.1.database_indexer_id.clone(),
icon: value.1.icon.clone(),
})
}
@@ -105,7 +105,7 @@ impl From<UserWorkspaceTable> for UserWorkspace {
.timestamp_opt(value.created_at, 0)
.single()
.unwrap_or_default(),
workspace_database_object_id: value.database_storage_id,
database_indexer_id: value.database_storage_id,
icon: value.icon,
}
}

View File

@@ -2,7 +2,7 @@ use collab_integrate::collab_builder::AppFlowyCollabBuilder;
use collab_integrate::CollabKVDB;
use collab_user::core::MutexUserAwareness;
use flowy_error::{internal_error, ErrorCode, FlowyResult};
use flowy_folder_pub::entities::ImportData;
use flowy_server_pub::AuthenticatorType;
use flowy_sqlite::kv::StorePreferences;
use flowy_sqlite::schema::user_table;
@@ -36,8 +36,6 @@ use crate::migrations::AnonUser;
use crate::services::authenticate_user::AuthenticateUser;
use crate::services::cloud_config::get_cloud_config;
use crate::services::collab_interact::{CollabInteract, DefaultCollabInteract};
use crate::services::data_import::importer::import_data;
use crate::services::data_import::ImportContext;
use crate::services::sqlite_sql::user_sql::{select_user_profile, UserTable, UserTableChangeset};
use crate::user_manager::manager_user_encryption::validate_encryption_sign;
@@ -791,24 +789,6 @@ impl UserManager {
)?;
Ok(())
}
pub(crate) async fn import_appflowy_data(
&self,
context: ImportContext,
) -> Result<ImportData, FlowyError> {
let session = self.get_session()?;
let user_collab_db = self
.authenticate_user
.database
.get_collab_db(session.user_id)?;
let import_data = tokio::task::spawn_blocking(move || {
import_data(&session, context, user_collab_db)
.map_err(|err| FlowyError::new(ErrorCode::AppFlowyDataFolderImportError, err.to_string()))
})
.await
.map_err(internal_error)??;
Ok(import_data)
}
}
fn current_authenticator() -> Authenticator {

View File

@@ -5,7 +5,7 @@ use collab_entity::{CollabObject, CollabType};
use collab_integrate::CollabKVDB;
use tracing::{error, info, instrument, warn};
use flowy_error::{FlowyError, FlowyResult};
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
use flowy_folder_pub::entities::{AppFlowyData, ImportData};
use flowy_sqlite::schema::user_workspace_table;
use flowy_sqlite::{query_dsl::*, DBConnection, ExpressionMethods};
@@ -17,7 +17,9 @@ use lib_dispatch::prelude::af_spawn;
use crate::entities::{RepeatedUserWorkspacePB, ResetWorkspacePB};
use crate::migrations::AnonUser;
use crate::notification::{send_notification, UserNotification};
use crate::services::data_import::{upload_collab_objects_data, ImportContext};
use crate::services::data_import::{
generate_import_data, upload_collab_objects_data, ImportedFolder, ImportedSource,
};
use crate::services::sqlite_sql::workspace_sql::{
get_all_user_workspace_op, get_user_workspace_op, insert_new_workspaces_op, UserWorkspaceTable,
};
@@ -29,16 +31,31 @@ impl UserManager {
/// If the container name is not empty, then the data will be imported to the given container.
/// Otherwise, the data will be imported to the current workspace.
#[instrument(skip_all, err)]
pub(crate) async fn import_appflowy_data_folder(
&self,
context: ImportContext,
) -> FlowyResult<()> {
let session = self.get_session()?;
let import_data = self.import_appflowy_data(context).await?;
pub(crate) async fn perform_import(&self, imported_folder: ImportedFolder) -> FlowyResult<()> {
let current_session = self.get_session()?;
let user_collab_db = self
.authenticate_user
.database
.get_collab_db(current_session.user_id)?;
let cloned_current_session = current_session.clone();
let import_data = tokio::task::spawn_blocking(move || {
generate_import_data(
&cloned_current_session,
&cloned_current_session.user_workspace.id,
&user_collab_db,
imported_folder,
)
.map_err(|err| FlowyError::new(ErrorCode::AppFlowyDataFolderImportError, err.to_string()))
})
.await??;
match import_data {
ImportData::AppFlowyDataFolder { items } => {
for item in items {
self.upload_appflowy_data_item(&session, item).await?;
self
.upload_appflowy_data_item(&current_session, item)
.await?;
}
},
}
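One detail in the rewritten perform_import worth unpacking: the double ? after spawn_blocking. The outer Result carries tokio's JoinError, the inner one the import's own FlowyError. A minimal, self-contained illustration of the same pattern, assuming a tokio runtime:

use tokio::task;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
  let value = task::spawn_blocking(|| -> Result<i32, std::io::Error> {
    // Heavy synchronous work runs on the blocking pool, keeping async workers free.
    Ok(40 + 2)
  })
  .await??; // the first ? unwraps the JoinError, the second ? the closure's own Result
  assert_eq!(value, 42);
  Ok(())
}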
@@ -47,7 +64,7 @@ impl UserManager {
async fn upload_appflowy_data_item(
&self,
session: &Session,
current_session: &Session,
item: AppFlowyData,
) -> Result<(), FlowyError> {
match item {
@@ -79,13 +96,15 @@
document_object_ids,
database_object_ids,
} => {
let user = self.get_user_profile_from_disk(session.user_id).await?;
let user = self
.get_user_profile_from_disk(current_session.user_id)
.await?;
let user_collab_db = self
.get_collab_db(session.user_id)?
.get_collab_db(current_session.user_id)?
.upgrade()
.ok_or_else(|| FlowyError::internal().with_context("Collab db not found"))?;
let user_id = session.user_id;
let user_id = current_session.user_id;
let weak_user_collab_db = Arc::downgrade(&user_collab_db);
let weak_user_cloud_service = self.cloud_services.get_user_service()?;
match upload_collab_objects_data(
@@ -124,12 +143,13 @@ impl UserManager {
old_user: &AnonUser,
old_collab_db: &Arc<CollabKVDB>,
) -> FlowyResult<()> {
let import_context = ImportContext {
let import_context = ImportedFolder {
imported_session: old_user.session.clone(),
imported_collab_db: old_collab_db.clone(),
container_name: None,
source: ImportedSource::AnonUser,
};
self.import_appflowy_data_folder(import_context).await?;
self.perform_import(import_context).await?;
Ok(())
}