feat: Data sync on signup (#3283)

* chore: sync all data

* chore: sync database row and document in the row

* chore: sync inline view id

* chore: sync row and document in row

* fix: tests

* fix: migrate document in row

* chore: retry when create collab fail

* fix: invalid secret cause rerun application

* chore: fix clippy warnings
This commit is contained in:
Nathan.fooo
2023-08-28 13:28:24 +08:00
committed by GitHub
parent 41ec2d992e
commit 4e67282f2b
66 changed files with 1492 additions and 513 deletions

View File

@ -10,6 +10,7 @@ flowy-derive = { path = "../../../shared-lib/flowy-derive" }
flowy-sqlite = { path = "../flowy-sqlite", optional = true }
flowy-encrypt = { path = "../flowy-encrypt" }
flowy-error = { path = "../flowy-error", features = ["impl_from_sqlite", "impl_from_dispatch_error"] }
flowy-folder-deps = { path = "../flowy-folder-deps" }
lib-infra = { path = "../../../shared-lib/lib-infra" }
flowy-notification = { path = "../flowy-notification" }
flowy-server-config = { path = "../flowy-server-config" }
@ -18,8 +19,10 @@ appflowy-integrate = { version = "0.1.0" }
collab = { version = "0.1.0" }
collab-folder = { version = "0.1.0" }
collab-document = { version = "0.1.0" }
collab-database = { version = "0.1.0" }
collab-user = { version = "0.1.0" }
flowy-user-deps = { path = "../flowy-user-deps" }
anyhow = "1.0.75"
tracing = { version = "0.1", features = ["log"] }
bytes = "1.4"

View File

@ -88,7 +88,7 @@ pub trait UserStatusCallback: Send + Sync + 'static {
/// Will be called after the user signed up.
fn did_sign_up(
&self,
context: SignUpContext,
is_new_user: bool,
user_profile: &UserProfile,
user_workspace: &UserWorkspace,
device_id: &str,
@ -163,7 +163,7 @@ impl UserStatusCallback for DefaultUserStatusCallback {
fn did_sign_up(
&self,
_context: SignUpContext,
_is_new_user: bool,
_user_profile: &UserProfile,
_user_workspace: &UserWorkspace,
_device_id: &str,

View File

@ -3,7 +3,6 @@ use std::sync::{Arc, Weak};
use appflowy_integrate::collab_builder::AppFlowyCollabBuilder;
use appflowy_integrate::RocksCollabDB;
use collab_folder::core::FolderData;
use collab_user::core::MutexUserAwareness;
use serde_json::Value;
use tokio::sync::{Mutex, RwLock};
@ -19,12 +18,11 @@ use flowy_user_deps::entities::*;
use lib_infra::box_any::BoxAny;
use crate::entities::{AuthStateChangedPB, AuthStatePB, UserProfilePB, UserSettingPB};
use crate::event_map::{
DefaultUserStatusCallback, SignUpContext, UserCloudServiceProvider, UserStatusCallback,
};
use crate::event_map::{DefaultUserStatusCallback, UserCloudServiceProvider, UserStatusCallback};
use crate::migrations::historical_document::HistoricalEmptyDocumentMigration;
use crate::migrations::local_user_to_cloud::migration_user_to_cloud;
use crate::migrations::migrate_to_new_user::migration_local_user_on_sign_up;
use crate::migrations::migration::UserLocalDataMigration;
use crate::migrations::sync_new_user::sync_user_data_to_cloud;
use crate::migrations::MigrationUser;
use crate::services::cloud_config::get_cloud_config;
use crate::services::database::UserDB;
@ -305,10 +303,7 @@ impl UserManager {
} else {
UserAwarenessDataSource::Remote
};
let mut sign_up_context = SignUpContext {
is_new: response.is_new_user,
local_folder: None,
};
if response.is_new_user {
if let Some(old_user) = migration_user {
let new_user = MigrationUser {
@ -320,10 +315,9 @@ impl UserManager {
old_user.user_profile.uid,
new_user.user_profile.uid
);
match self.migrate_local_user_to_cloud(&old_user, &new_user).await {
Ok(folder_data) => sign_up_context.local_folder = folder_data,
Err(e) => tracing::error!("{:?}", e),
}
self
.migrate_local_user_to_cloud(&old_user, &new_user)
.await?;
let _ = self.database.close(old_user.session.user_id);
}
}
@ -331,20 +325,20 @@ impl UserManager {
.initialize_user_awareness(&new_session, user_awareness_source)
.await;
self
.save_auth_data(&response, auth_type, &new_session)
.await?;
self
.user_status_callback
.read()
.await
.did_sign_up(
sign_up_context,
response.is_new_user,
user_profile,
&new_session.user_workspace,
&new_session.device_id,
)
.await?;
self
.save_auth_data(&response, auth_type, &new_session)
.await?;
send_auth_state_notification(AuthStateChangedPB {
state: AuthStatePB::AuthStateSignIn,
@ -596,17 +590,28 @@ impl UserManager {
&self,
old_user: &MigrationUser,
new_user: &MigrationUser,
) -> Result<Option<FolderData>, FlowyError> {
) -> Result<(), FlowyError> {
let old_collab_db = self.database.get_collab_db(old_user.session.user_id)?;
let new_collab_db = self.database.get_collab_db(new_user.session.user_id)?;
let folder_data = migration_user_to_cloud(old_user, &old_collab_db, new_user, &new_collab_db)?;
migration_local_user_on_sign_up(old_user, &old_collab_db, new_user, &new_collab_db)?;
if let Err(err) = sync_user_data_to_cloud(
self.cloud_services.get_user_service()?,
new_user,
&new_collab_db,
)
.await
{
tracing::error!("Sync user data to cloud failed: {:?}", err);
}
// Save the old user workspace setting.
save_user_workspaces(
old_user.session.user_id,
self.database.get_pool(old_user.session.user_id)?,
&[old_user.session.user_workspace.clone()],
)?;
Ok(folder_data)
Ok(())
}
}

View File

@ -1,143 +0,0 @@
use std::sync::Arc;
use appflowy_integrate::{PersistenceError, RocksCollabDB, YrsDocAction};
use collab::core::collab::{CollabRawData, MutexCollab};
use collab::core::origin::{CollabClient, CollabOrigin};
use collab::preclude::Collab;
use collab_folder::core::{Folder, FolderData};
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
use crate::migrations::MigrationUser;
/// Migrates the collab objects of the old user to the new user. Currently this only
/// happens when a local user starts using the AppFlowy cloud service.
///
/// All documents are copied from `old_collab_db` into `new_collab_db` inside a single
/// write transaction. Two objects need special handling because their ids differ between
/// users: the workspace folder (handled by `migrate_folder`, which also produces the
/// returned [`FolderData`]) and the database storage object. Every other object is copied
/// verbatim under the new uid.
///
/// Returns `Ok(None)` when the old workspace folder could not be read or decoded.
pub fn migration_user_to_cloud(
  old_user: &MigrationUser,
  old_collab_db: &Arc<RocksCollabDB>,
  new_user: &MigrationUser,
  new_collab_db: &Arc<RocksCollabDB>,
) -> FlowyResult<Option<FolderData>> {
  let mut folder_data = None;
  new_collab_db
    .with_write_txn(|w_txn| {
      let read_txn = old_collab_db.read_txn();
      if let Ok(object_ids) = read_txn.get_all_docs() {
        // Migration of all objects
        for object_id in object_ids {
          tracing::debug!("migrate object: {:?}", object_id);
          if let Ok(updates) = read_txn.get_all_updates(old_user.session.user_id, &object_id) {
            // If the object is a folder, migrate the folder data
            if object_id == old_user.session.user_workspace.id {
              folder_data = migrate_folder(
                old_user.session.user_id,
                &object_id,
                &new_user.session.user_workspace.id,
                updates,
              );
            } else if object_id == old_user.session.user_workspace.database_storage_id {
              // The database storage object moves to the new user's storage id.
              migrate_database_storage(
                old_user.session.user_id,
                &object_id,
                new_user.session.user_id,
                &new_user.session.user_workspace.database_storage_id,
                updates,
                w_txn,
              );
            } else {
              // Any other collab object keeps its id; only the owning uid changes.
              migrate_object(
                old_user.session.user_id,
                new_user.session.user_id,
                &object_id,
                updates,
                w_txn,
              );
            }
          }
        }
      }
      Ok(())
    })
    .map_err(|err| FlowyError::new(ErrorCode::Internal, err))?;
  Ok(folder_data)
}
/// Copies the old user's database storage object into the new user's db under
/// `new_object_id`.
///
/// Failures are only logged, so a single bad object does not abort the surrounding
/// write transaction.
fn migrate_database_storage<'a, W>(
  old_uid: i64,
  old_object_id: &str,
  new_uid: i64,
  new_object_id: &str,
  updates: CollabRawData,
  w_txn: &'a W,
) where
  W: YrsDocAction<'a>,
  PersistenceError: From<W::Error>,
{
  // "phantom" is a placeholder device id; the origin is only needed to rebuild the
  // collab from its raw updates.
  let origin = CollabOrigin::Client(CollabClient::new(old_uid, "phantom"));
  match Collab::new_with_raw_data(origin, old_object_id, updates, vec![]) {
    Ok(collab) => {
      let txn = collab.transact();
      if let Err(err) = w_txn.create_new_doc(new_uid, new_object_id, &txn) {
        tracing::error!("🔴migrate database storage failed: {:?}", err);
      }
    },
    Err(err) => tracing::error!("🔴construct migration database storage failed: {:?} ", err),
  }
}
/// Copies a collab object from the old user to the new user, keeping the object id and
/// changing only the owning uid.
///
/// Failures are only logged, so a single bad object does not abort the surrounding
/// write transaction.
fn migrate_object<'a, W>(
  old_uid: i64,
  new_uid: i64,
  object_id: &str,
  updates: CollabRawData,
  w_txn: &'a W,
) where
  W: YrsDocAction<'a>,
  PersistenceError: From<W::Error>,
{
  // "phantom" is a placeholder device id used when reconstructing the collab.
  let origin = CollabOrigin::Client(CollabClient::new(old_uid, "phantom"));
  match Collab::new_with_raw_data(origin, object_id, updates, vec![]) {
    Ok(collab) => {
      let txn = collab.transact();
      if let Err(err) = w_txn.create_new_doc(new_uid, object_id, &txn) {
        tracing::error!("🔴migrate collab failed: {:?}", err);
      }
    },
    Err(err) => tracing::error!("🔴construct migration collab failed: {:?} ", err),
  }
}
/// Rebuilds the old user's folder data so it can be attached to the new user's workspace.
///
/// Opens the old folder from its raw updates, then rewrites the ids tied to the old
/// workspace: the current-workspace id, the (single) kept workspace's own id, and the
/// parent id of every top-level view. Returns `None` when the updates cannot be decoded
/// or the folder has no data / no workspace.
fn migrate_folder(
  old_uid: i64,
  old_object_id: &str,
  new_workspace_id: &str,
  updates: CollabRawData,
) -> Option<FolderData> {
  let origin = CollabOrigin::Client(CollabClient::new(old_uid, "phantom"));
  let old_folder_collab = Collab::new_with_raw_data(origin, old_object_id, updates, vec![]).ok()?;
  let mutex_collab = Arc::new(MutexCollab::from_collab(old_folder_collab));
  let old_folder = Folder::open(mutex_collab, None);
  let mut folder_data = old_folder.get_folder_data()?;
  let old_workspace_id = folder_data.current_workspace_id;
  folder_data.current_workspace_id = new_workspace_id.to_string();
  // Takes the LAST workspace in the list.
  let mut workspace = folder_data.workspaces.pop()?;
  // NOTE(review): this check runs *after* the pop above, so it only fires when there
  // were three or more workspaces originally — it looks like it was meant to run before
  // the pop (or compare against 0). Confirm intent before relying on the warning.
  if folder_data.workspaces.len() > 1 {
    tracing::error!("🔴migrate folder: more than one workspace");
  }
  workspace.id = new_workspace_id.to_string();
  // Only take one workspace
  folder_data.workspaces.clear();
  folder_data.workspaces.push(workspace);
  // Update the view's parent view id to new workspace id
  folder_data.views.iter_mut().for_each(|view| {
    if view.parent_view_id == old_workspace_id {
      view.parent_view_id = new_workspace_id.to_string();
    }
  });
  Some(folder_data)
}

View File

@ -0,0 +1,430 @@
use std::collections::{HashMap, HashSet};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use anyhow::anyhow;
use appflowy_integrate::{PersistenceError, RocksCollabDB, YrsDocAction};
use collab::core::collab::MutexCollab;
use collab::core::origin::{CollabClient, CollabOrigin};
use collab::preclude::Collab;
use collab_database::database::{
is_database_collab, mut_database_views_with_collab, reset_inline_view_id,
};
use collab_database::rows::{database_row_document_id_from_row_id, mut_row_with_collab, RowId};
use collab_database::user::DatabaseWithViewsArray;
use collab_folder::core::Folder;
use parking_lot::{Mutex, RwLock};
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
use flowy_folder_deps::cloud::gen_view_id;
use crate::migrations::MigrationUser;
/// Migrates the collab objects of the old user to the new user. Currently this only
/// happens when a local user signs up for the AppFlowy cloud service.
///
/// Everything runs inside a single write transaction on the new user's db, in order:
/// 1. map the old uid to the new uid (used by the user-awareness object),
/// 2. migrate the workspace-database (views aggregate) object,
/// 3. migrate every database and its rows, regenerating their ids,
/// 4. migrate the folder, rewriting all view ids,
/// 5. copy every remaining object under a freshly generated id.
pub fn migration_local_user_on_sign_up(
  old_user: &MigrationUser,
  old_collab_db: &Arc<RocksCollabDB>,
  new_user: &MigrationUser,
  new_collab_db: &Arc<RocksCollabDB>,
) -> FlowyResult<()> {
  new_collab_db
    .with_write_txn(|new_collab_w_txn| {
      let old_collab_r_txn = old_collab_db.read_txn();
      // Shared old-id -> new-id table; wrapped in Arc<Mutex<..>> so the callback
      // closures used further down can all consult/extend the same mapping.
      let old_to_new_id_map = Arc::new(Mutex::new(OldToNewIdMap::new()));
      migrate_user_awareness(old_to_new_id_map.lock().deref_mut(), old_user, new_user)?;
      migrate_database_with_views_object(
        &mut old_to_new_id_map.lock(),
        old_user,
        &old_collab_r_txn,
        new_user,
        new_collab_w_txn,
      )?;
      let mut object_ids = old_collab_r_txn
        .get_all_docs()
        .map(|iter| iter.collect::<Vec<String>>())
        .unwrap_or_default();
      // Migration of all objects except the folder and database_with_views
      object_ids.retain(|id| {
        id != &old_user.session.user_workspace.id
          && id != &old_user.session.user_workspace.database_views_aggregate_id
      });
      tracing::info!("migrate collab objects: {:?}", object_ids.len());
      // Load every remaining object once; later passes index into this map.
      let collab_by_oid = make_collab_by_oid(old_user, &old_collab_r_txn, &object_ids);
      // Databases (and their rows) are removed from `object_ids` as they are migrated,
      // so the final pass below does not copy them a second time.
      migrate_databases(
        &old_to_new_id_map,
        new_user,
        new_collab_w_txn,
        &mut object_ids,
        &collab_by_oid,
      )?;
      // Migrates the folder, replacing all existing view IDs with new ones.
      // This function handles the process of migrating folder data between two users. As a part of this migration,
      // all existing view IDs associated with the old user will be replaced by new IDs relevant to the new user.
      migrate_workspace_folder(
        &mut old_to_new_id_map.lock(),
        old_user,
        &old_collab_r_txn,
        new_user,
        new_collab_w_txn,
      )?;
      // Migrate other collab objects
      for object_id in &object_ids {
        if let Some(collab) = collab_by_oid.get(object_id) {
          let new_object_id = old_to_new_id_map.lock().get_new_id(object_id);
          tracing::debug!("migrate from: {}, to: {}", object_id, new_object_id,);
          migrate_collab_object(
            collab,
            new_user.session.user_id,
            &new_object_id,
            new_collab_w_txn,
          );
        }
      }
      Ok(())
    })
    .map_err(|err| FlowyError::new(ErrorCode::Internal, err))?;
  Ok(())
}
/// Memoized mapping from the old user's object/view ids to freshly generated ids for the
/// new user.
///
/// The first lookup for a given old id generates a new view id and caches it; every
/// subsequent lookup returns the same new id, so all migration passes agree on the ids.
#[derive(Default)]
pub struct OldToNewIdMap(HashMap<String, String>);
impl OldToNewIdMap {
  fn new() -> Self {
    Self::default()
  }
  /// Returns the new id for `old_id`, generating (and caching) one on first use.
  fn get_new_id(&mut self, old_id: &str) -> String {
    // `or_insert_with` defers `gen_view_id()` until the entry is actually vacant;
    // the previous `or_insert(gen_view_id().to_string())` generated (and discarded)
    // a fresh id on every lookup (clippy::or_fun_call).
    let view_id = self
      .0
      .entry(old_id.to_string())
      .or_insert_with(|| gen_view_id().to_string());
    view_id.clone()
  }
}
impl Deref for OldToNewIdMap {
  type Target = HashMap<String, String>;
  fn deref(&self) -> &Self::Target {
    &self.0
  }
}
impl DerefMut for OldToNewIdMap {
  fn deref_mut(&mut self) -> &mut Self::Target {
    &mut self.0
  }
}
/// Migrates the workspace-database object (the aggregate recording every database and
/// its linked views), rewriting each database id and linked-view id through the shared
/// id map before writing the document under the new user's aggregate id.
fn migrate_database_with_views_object<'a, W>(
  old_to_new_id_map: &mut OldToNewIdMap,
  old_user: &MigrationUser,
  old_collab_r_txn: &'a W,
  new_user: &MigrationUser,
  new_collab_w_txn: &'a W,
) -> Result<(), PersistenceError>
where
  W: YrsDocAction<'a>,
  PersistenceError: From<W::Error>,
{
  // Empty collab bound to the old aggregate id; `load_doc` fills it from the old db.
  let database_with_views_collab = Collab::new(
    old_user.session.user_id,
    &old_user.session.user_workspace.database_views_aggregate_id,
    "phantom",
    vec![],
  );
  database_with_views_collab.with_origin_transact_mut(|txn| {
    old_collab_r_txn.load_doc(
      old_user.session.user_id,
      &old_user.session.user_workspace.database_views_aggregate_id,
      txn,
    )
  })?;
  let new_uid = new_user.session.user_id;
  let new_object_id = &new_user.session.user_workspace.database_views_aggregate_id;
  let array = DatabaseWithViewsArray::from_collab(&database_with_views_collab);
  for database_view in array.get_all_databases() {
    array.update_database(&database_view.database_id, |update| {
      // Map the linked-view ids first, then the database id itself; both go through
      // the shared id map so later migration passes see consistent ids.
      let new_linked_views = update
        .linked_views
        .iter()
        .map(|view_id| old_to_new_id_map.get_new_id(view_id))
        .collect();
      update.database_id = old_to_new_id_map.get_new_id(&update.database_id);
      update.linked_views = new_linked_views;
    })
  }
  let txn = database_with_views_collab.transact();
  // A write failure is only logged; the migration transaction continues.
  if let Err(err) = new_collab_w_txn.create_new_doc(new_uid, new_object_id, &txn) {
    tracing::error!("🔴migrate database storage failed: {:?}", err);
  }
  // Explicitly end the read transaction before returning.
  drop(txn);
  Ok(())
}
/// Writes `collab`'s current state into the new user's db as a new document owned by
/// `new_uid` under `new_object_id`.
///
/// Failures are only logged, so one bad object does not abort the surrounding
/// migration transaction.
fn migrate_collab_object<'a, W>(collab: &Collab, new_uid: i64, new_object_id: &str, w_txn: &'a W)
where
  W: YrsDocAction<'a>,
  PersistenceError: From<W::Error>,
{
  let txn = collab.transact();
  // `new_object_id` is already a `&str`; the former `&new_object_id` was a needless
  // double borrow (clippy::needless_borrow).
  if let Err(err) = w_txn.create_new_doc(new_uid, new_object_id, &txn) {
    tracing::error!("🔴migrate collab failed: {:?}", err);
  }
}
/// Migrates the old user's workspace folder into the new user's db, replacing every id
/// that references the old workspace or its views with the mapped/generated new id.
///
/// The old folder is loaded from disk, its plain [`FolderData`] is rewritten in place,
/// and a brand-new folder collab is created from that data and written under the new
/// workspace id.
fn migrate_workspace_folder<'a, W>(
  old_to_new_id_map: &mut OldToNewIdMap,
  old_user: &MigrationUser,
  old_collab_r_txn: &'a W,
  new_user: &MigrationUser,
  new_collab_w_txn: &'a W,
) -> Result<(), PersistenceError>
where
  W: YrsDocAction<'a>,
  PersistenceError: From<W::Error>,
{
  let old_uid = old_user.session.user_id;
  let old_workspace_id = &old_user.session.user_workspace.id;
  let new_uid = new_user.session.user_id;
  let new_workspace_id = &new_user.session.user_workspace.id;
  // Load the old folder from disk into a fresh collab, then extract its plain data.
  let old_folder_collab = Collab::new(old_uid, old_workspace_id, "phantom", vec![]);
  old_folder_collab
    .with_origin_transact_mut(|txn| old_collab_r_txn.load_doc(old_uid, old_workspace_id, txn))?;
  let old_folder = Folder::open(Arc::new(MutexCollab::from_collab(old_folder_collab)), None);
  let mut folder_data = old_folder
    .get_folder_data()
    .ok_or(PersistenceError::Internal(
      anyhow!("Can't migrate the folder data").into(),
    ))?;
  // The old workspace id maps to the new user's workspace id rather than a generated one.
  old_to_new_id_map
    .0
    .insert(old_workspace_id.to_string(), new_workspace_id.to_string());
  // 1. Replace the workspace views id to new id
  debug_assert!(folder_data.workspaces.len() == 1);
  folder_data
    .workspaces
    .iter_mut()
    .enumerate()
    .for_each(|(index, workspace)| {
      if index == 0 {
        workspace.id = new_workspace_id.to_string();
      } else {
        // Extra workspaces are unexpected; keep them but give them generated ids.
        tracing::warn!("🔴migrate folder: more than one workspace");
        workspace.id = old_to_new_id_map.get_new_id(&workspace.id);
      }
      workspace
        .child_views
        .iter_mut()
        .for_each(|view_identifier| {
          view_identifier.id = old_to_new_id_map.get_new_id(&view_identifier.id);
        });
    });
  folder_data.views.iter_mut().for_each(|view| {
    // 2. replace the old parent view id of the view
    view.parent_view_id = old_to_new_id_map.get_new_id(&view.parent_view_id);
    // 3. replace the old id of the view
    view.id = old_to_new_id_map.get_new_id(&view.id);
    // 4. replace the old id of the children views
    view.children.iter_mut().for_each(|view_identifier| {
      view_identifier.id = old_to_new_id_map.get_new_id(&view_identifier.id);
    });
  });
  // The current workspace/view pointers are looked up (not generated): by this point
  // every id they could reference should already be in the map.
  match old_to_new_id_map.get(&folder_data.current_workspace_id) {
    Some(new_workspace_id) => {
      folder_data.current_workspace_id = new_workspace_id.clone();
    },
    None => {
      tracing::error!("🔴migrate folder: current workspace id not found");
    },
  }
  match old_to_new_id_map.get(&folder_data.current_view) {
    Some(new_view_id) => {
      folder_data.current_view = new_view_id.clone();
    },
    None => {
      // Fall back to "no current view" rather than keeping a dangling old id.
      tracing::error!("🔴migrate folder: current view id not found");
      folder_data.current_view = "".to_string();
    },
  }
  // Build a brand-new folder collab from the rewritten data and persist it.
  let origin = CollabOrigin::Client(CollabClient::new(new_uid, "phantom"));
  let new_folder_collab = Collab::new_with_raw_data(origin, new_workspace_id, vec![], vec![])
    .map_err(|err| PersistenceError::Internal(Box::new(err)))?;
  let mutex_collab = Arc::new(MutexCollab::from_collab(new_folder_collab));
  let _ = Folder::create(mutex_collab.clone(), None, Some(folder_data));
  {
    // Scope the lock and read transaction so both end before returning.
    let mutex_collab = mutex_collab.lock();
    let txn = mutex_collab.transact();
    if let Err(err) = new_collab_w_txn.create_new_doc(new_uid, new_workspace_id, &txn) {
      tracing::error!("🔴migrate folder failed: {:?}", err);
    }
  }
  Ok(())
}
/// Registers the uid mapping used for the user-awareness object.
///
/// The awareness object is keyed by the user id itself, so its "migration" here is just
/// recording old-uid -> new-uid in the shared id map; the object's data is copied along
/// with the other collab objects.
fn migrate_user_awareness(
  old_to_new_id_map: &mut OldToNewIdMap,
  old_user: &MigrationUser,
  new_user: &MigrationUser,
) -> Result<(), PersistenceError> {
  let (old_uid, new_uid) = (old_user.session.user_id, new_user.session.user_id);
  tracing::debug!("migrate user awareness from: {}, to: {}", old_uid, new_uid);
  old_to_new_id_map.insert(old_uid.to_string(), new_uid.to_string());
  Ok(())
}
/// Migrates every database collab found in `collab_by_oid`, then every database row
/// discovered while rewriting those databases. All ids (views, databases, rows, and the
/// row documents' derived ids) are regenerated through `old_to_new_id_map`.
///
/// Migrated object ids are removed from `object_ids` so the caller's final pass does
/// not copy them a second time.
fn migrate_databases<'a, W>(
  old_to_new_id_map: &Arc<Mutex<OldToNewIdMap>>,
  new_user: &MigrationUser,
  new_collab_w_txn: &'a W,
  object_ids: &mut Vec<String>,
  collab_by_oid: &HashMap<String, Collab>,
) -> Result<(), PersistenceError>
where
  W: YrsDocAction<'a>,
  PersistenceError: From<W::Error>,
{
  // Migrate databases
  let mut database_object_ids = vec![];
  // Row ids discovered while rewriting the databases' row orders; the row objects
  // themselves are migrated in the second loop below.
  // NOTE(review): this runs on one thread inside a write txn — the RwLock/Mutex here
  // presumably exist to satisfy the callback closures' bounds; confirm before removing.
  let database_row_object_ids = RwLock::new(HashSet::new());
  for object_id in &mut *object_ids {
    if let Some(collab) = collab_by_oid.get(object_id) {
      if !is_database_collab(collab) {
        continue;
      }
      database_object_ids.push(object_id.clone());
      // The inline view id references a view, so it maps through the same id table.
      reset_inline_view_id(collab, |old_inline_view_id| {
        old_to_new_id_map.lock().get_new_id(&old_inline_view_id)
      });
      mut_database_views_with_collab(collab, |database_view| {
        let new_view_id = old_to_new_id_map.lock().get_new_id(&database_view.id);
        let new_database_id = old_to_new_id_map
          .lock()
          .get_new_id(&database_view.database_id);
        tracing::trace!(
          "migrate database view id from: {}, to: {}",
          database_view.id,
          new_view_id,
        );
        tracing::trace!(
          "migrate database view database id from: {}, to: {}",
          database_view.database_id,
          new_database_id,
        );
        database_view.id = new_view_id;
        database_view.database_id = new_database_id;
        database_view.row_orders.iter_mut().for_each(|row_order| {
          let old_row_id = String::from(row_order.id.clone());
          // Each row may carry an embedded document whose id is derived from the row
          // id; pre-register old-doc-id -> new-doc-id so the document object is
          // migrated under the matching id later.
          let old_row_document_id = database_row_document_id_from_row_id(&old_row_id);
          let new_row_id = old_to_new_id_map.lock().get_new_id(&old_row_id);
          let new_row_document_id = database_row_document_id_from_row_id(&new_row_id);
          tracing::debug!("migrate row id: {} to {}", row_order.id, new_row_id);
          tracing::debug!(
            "migrate row document id: {} to {}",
            old_row_document_id,
            new_row_document_id
          );
          old_to_new_id_map
            .lock()
            .insert(old_row_document_id, new_row_document_id);
          row_order.id = RowId::from(new_row_id);
          database_row_object_ids.write().insert(old_row_id);
        });
      });
      let new_object_id = old_to_new_id_map.lock().get_new_id(object_id);
      tracing::debug!(
        "migrate database from: {}, to: {}",
        object_id,
        new_object_id,
      );
      migrate_collab_object(
        collab,
        new_user.session.user_id,
        &new_object_id,
        new_collab_w_txn,
      );
    }
  }
  object_ids.retain(|id| !database_object_ids.contains(id));
  // Second pass: migrate every row object recorded above, updating the row's own id
  // before writing it under its new object id.
  let database_row_object_ids = database_row_object_ids.read();
  for object_id in &*database_row_object_ids {
    if let Some(collab) = collab_by_oid.get(object_id) {
      let new_object_id = old_to_new_id_map.lock().get_new_id(object_id);
      tracing::info!(
        "migrate database row from: {}, to: {}",
        object_id,
        new_object_id,
      );
      mut_row_with_collab(collab, |row_update| {
        row_update.set_row_id(RowId::from(new_object_id.clone()));
      });
      migrate_collab_object(
        collab,
        new_user.session.user_id,
        &new_object_id,
        new_collab_w_txn,
      );
    }
  }
  object_ids.retain(|id| !database_row_object_ids.contains(id));
  Ok(())
}
/// Loads every object in `object_ids` from the old user's db and returns the opened
/// collabs keyed by object id.
///
/// Objects that fail to load are skipped with an error log, so the returned map may
/// contain fewer entries than `object_ids`.
fn make_collab_by_oid<'a, R>(
  old_user: &MigrationUser,
  old_collab_r_txn: &R,
  object_ids: &[String],
) -> HashMap<String, Collab>
where
  R: YrsDocAction<'a>,
  PersistenceError: From<R::Error>,
{
  let mut collab_by_oid = HashMap::new();
  for object_id in object_ids {
    let collab = Collab::new(old_user.session.user_id, object_id, "phantom", vec![]);
    match collab.with_origin_transact_mut(|txn| {
      // `object_id` is already a reference; the previous `&object_id` was a needless
      // double borrow (clippy::needless_borrow).
      old_collab_r_txn.load_doc(old_user.session.user_id, object_id, txn)
    }) {
      Ok(_) => {
        collab_by_oid.insert(object_id.clone(), collab);
      },
      Err(err) => tracing::error!("🔴Initialize migration collab failed: {:?} ", err),
    }
  }
  collab_by_oid
}

View File

@ -1,6 +1,7 @@
pub use define::*;
mod define;
pub mod historical_document;
pub mod local_user_to_cloud;
pub mod migrate_to_new_user;
pub mod migration;
pub use define::*;
pub mod sync_new_user;

View File

@ -0,0 +1,327 @@
use std::future::Future;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::Arc;
use anyhow::{anyhow, Error};
use appflowy_integrate::{CollabObject, CollabType, PersistenceError, RocksCollabDB, YrsDocAction};
use collab::core::collab::MutexCollab;
use collab::preclude::Collab;
use collab_database::database::get_database_row_ids;
use collab_database::rows::database_row_document_id_from_row_id;
use collab_database::user::{get_database_with_views, DatabaseWithViews};
use collab_folder::core::{Folder, View, ViewLayout};
use parking_lot::Mutex;
use flowy_error::FlowyResult;
use flowy_user_deps::cloud::UserCloudService;
use crate::migrations::MigrationUser;
/// Uploads all of the new user's local collab data to the cloud after sign-up.
///
/// Order: the workspace folder first, then the workspace-database (views aggregate)
/// object, then every view of the current workspace — recursively, via [`sync_views`].
/// Per-view failures are logged and skipped so one bad view does not stop the rest.
#[tracing::instrument(level = "info", skip_all, err)]
pub async fn sync_user_data_to_cloud(
  user_service: Arc<dyn UserCloudService>,
  new_user: &MigrationUser,
  collab_db: &Arc<RocksCollabDB>,
) -> FlowyResult<()> {
  let workspace_id = new_user.session.user_workspace.id.clone();
  let uid = new_user.session.user_id;
  let folder = Arc::new(sync_folder(uid, &workspace_id, collab_db, user_service.clone()).await?);
  // Database records map database ids to their linked views; needed to resolve each
  // database view's object id below.
  let database_records = sync_database_views(
    uid,
    &workspace_id,
    &new_user.session.user_workspace.database_views_aggregate_id,
    collab_db,
    user_service.clone(),
  )
  .await;
  let views = folder.lock().get_current_workspace_views();
  for view in views {
    let view_id = view.id.clone();
    if let Err(err) = sync_views(
      uid,
      folder.clone(),
      database_records.clone(),
      workspace_id.to_string(),
      view,
      collab_db.clone(),
      user_service.clone(),
    )
    .await
    {
      tracing::error!("🔴sync {} failed: {:?}", view_id, err);
    }
  }
  Ok(())
}
/// Recursively syncs `view` and all of its descendant views to the cloud.
///
/// Returns a boxed future because the function calls itself for child views, which a
/// plain `async fn` cannot express. Documents upload a single collab update; database
/// views additionally upload every row and, when present, the document embedded in each
/// row. Row-level upload errors are deliberately ignored (`let _ =`) so one failing row
/// does not abort the whole view; child-view errors are logged and skipped.
fn sync_views(
  uid: i64,
  folder: Arc<MutexFolder>,
  database_records: Vec<Arc<DatabaseWithViews>>,
  workspace_id: String,
  view: Arc<View>,
  collab_db: Arc<RocksCollabDB>,
  user_service: Arc<dyn UserCloudService>,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + Sync>> {
  Box::pin(async move {
    let collab_type = collab_type_from_view_layout(&view.layout);
    // Database views sync under their database's id, not the view id.
    let object_id = object_id_from_view(&view, &database_records)?;
    tracing::debug!(
      "sync view: {:?}:{} with object_id: {}",
      view.layout,
      view.id,
      object_id
    );
    let collab_object =
      CollabObject::new(uid, object_id, collab_type).with_workspace_id(workspace_id.to_string());
    match view.layout {
      ViewLayout::Document => {
        let update = get_collab_init_update(uid, &collab_object, &collab_db)?;
        tracing::info!(
          "sync object: {} with update: {}",
          collab_object,
          update.len()
        );
        user_service
          .create_collab_object(&collab_object, update)
          .await?;
      },
      ViewLayout::Grid | ViewLayout::Board | ViewLayout::Calendar => {
        let (database_update, row_ids) = get_database_init_update(uid, &collab_object, &collab_db)?;
        tracing::info!(
          "sync object: {} with update: {}",
          collab_object,
          database_update.len()
        );
        user_service
          .create_collab_object(&collab_object, database_update)
          .await?;
        // sync database's row
        for row_id in row_ids {
          tracing::debug!("sync row: {}", row_id);
          let document_id = database_row_document_id_from_row_id(&row_id);
          let database_row_collab_object = CollabObject::new(uid, row_id, CollabType::DatabaseRow)
            .with_workspace_id(workspace_id.to_string());
          let database_row_update =
            get_collab_init_update(uid, &database_row_collab_object, &collab_db)?;
          tracing::info!(
            "sync object: {} with update: {}",
            database_row_collab_object,
            database_row_update.len()
          );
          // Best-effort: a failed row upload is ignored.
          let _ = user_service
            .create_collab_object(&database_row_collab_object, database_row_update)
            .await;
          let database_row_document = CollabObject::new(uid, document_id, CollabType::Document)
            .with_workspace_id(workspace_id.to_string());
          // sync document in the row if exist
          if let Ok(document_update) =
            get_collab_init_update(uid, &database_row_document, &collab_db)
          {
            tracing::info!(
              "sync database row document: {} with update: {}",
              database_row_document,
              document_update.len()
            );
            let _ = user_service
              .create_collab_object(&database_row_document, document_update)
              .await;
          }
        }
      },
    }
    let child_views = folder.lock().views.get_views_belong_to(&view.id);
    for child_view in child_views {
      let cloned_child_view = child_view.clone();
      // NOTE(review): `sync_views` already returns a pinned boxed future, so the extra
      // `Box::pin` here double-boxes — harmless, but removable.
      if let Err(err) = Box::pin(sync_views(
        uid,
        folder.clone(),
        database_records.clone(),
        workspace_id.clone(),
        child_view,
        collab_db.clone(),
        user_service.clone(),
      ))
      .await
      {
        tracing::error!(
          "🔴sync {:?}:{} failed: {:?}",
          cloned_child_view.layout,
          cloned_child_view.id,
          err
        )
      }
    }
    Ok(())
  })
}
/// Loads the collab document identified by `collab_object` from the local db and encodes
/// its full state as a single v1 update.
///
/// Returns `PersistenceError::UnexpectedEmptyUpdates` when the loaded document carries
/// no data, so callers can skip syncing empty objects.
fn get_collab_init_update(
  uid: i64,
  collab_object: &CollabObject,
  collab_db: &Arc<RocksCollabDB>,
) -> Result<Vec<u8>, PersistenceError> {
  let collab = Collab::new(uid, &collab_object.object_id, "phantom", vec![]);
  let _ = collab.with_origin_transact_mut(|txn| {
    collab_db
      .read_txn()
      .load_doc(uid, &collab_object.object_id, txn)
  })?;
  match collab.encode_as_update_v1().0 {
    update if update.is_empty() => Err(PersistenceError::UnexpectedEmptyUpdates),
    update => Ok(update),
  }
}
/// Loads a database collab from the local db and returns its full state as a v1 update
/// together with the ids of all rows in the database (so callers can sync the rows
/// afterwards).
///
/// Returns `PersistenceError::UnexpectedEmptyUpdates` for an empty document.
fn get_database_init_update(
  uid: i64,
  collab_object: &CollabObject,
  collab_db: &Arc<RocksCollabDB>,
) -> Result<(Vec<u8>, Vec<String>), PersistenceError> {
  let collab = Collab::new(uid, &collab_object.object_id, "phantom", vec![]);
  let _ = collab.with_origin_transact_mut(|txn| {
    collab_db
      .read_txn()
      .load_doc(uid, &collab_object.object_id, txn)
  })?;
  // Best-effort: an unreadable row list becomes empty rather than an error.
  let row_ids = get_database_row_ids(&collab).unwrap_or_default();
  let update = collab.encode_as_update_v1().0;
  if update.is_empty() {
    return Err(PersistenceError::UnexpectedEmptyUpdates);
  }
  Ok((update, row_ids))
}
/// Loads the workspace folder from the local db, uploads its full state to the cloud,
/// and returns the opened folder so the caller can walk its views.
///
/// An upload failure is only logged — the folder is still returned so view syncing can
/// proceed.
async fn sync_folder(
  uid: i64,
  workspace_id: &str,
  collab_db: &Arc<RocksCollabDB>,
  user_service: Arc<dyn UserCloudService>,
) -> Result<MutexFolder, Error> {
  let (folder, update) = {
    let collab = Collab::new(uid, workspace_id, "phantom", vec![]);
    // Use the temporary result to short the lifetime of the TransactionMut
    collab.with_origin_transact_mut(|txn| collab_db.read_txn().load_doc(uid, workspace_id, txn))?;
    // Encode the state before handing the collab over to the Folder.
    let update = collab.encode_as_update_v1().0;
    (
      MutexFolder::new(Folder::open(
        Arc::new(MutexCollab::from_collab(collab)),
        None,
      )),
      update,
    )
  };
  let collab_object = CollabObject::new(uid, workspace_id.to_string(), CollabType::Folder)
    .with_workspace_id(workspace_id.to_string());
  tracing::info!(
    "sync object: {} with update: {}",
    collab_object,
    update.len()
  );
  if let Err(err) = user_service
    .create_collab_object(&collab_object, update)
    .await
  {
    tracing::error!("🔴sync folder failed: {:?}", err);
  }
  Ok(folder)
}
/// Uploads the workspace-database (views aggregate) object and returns the database
/// records it contains, which map each database id to its linked view ids.
///
/// Best-effort: if the object cannot be loaded an empty list is returned and nothing is
/// uploaded; the upload result itself is also ignored.
async fn sync_database_views(
  uid: i64,
  workspace_id: &str,
  database_views_aggregate_id: &str,
  collab_db: &Arc<RocksCollabDB>,
  user_service: Arc<dyn UserCloudService>,
) -> Vec<Arc<DatabaseWithViews>> {
  let collab_object = CollabObject::new(
    uid,
    database_views_aggregate_id.to_string(),
    CollabType::WorkspaceDatabase,
  )
  .with_workspace_id(workspace_id.to_string());
  // Use the temporary result to short the lifetime of the TransactionMut
  let result = {
    let collab = Collab::new(uid, database_views_aggregate_id, "phantom", vec![]);
    collab
      .with_origin_transact_mut(|txn| {
        collab_db
          .read_txn()
          .load_doc(uid, database_views_aggregate_id, txn)
      })
      .map(|_| {
        (
          get_database_with_views(&collab),
          collab.encode_as_update_v1().0,
        )
      })
  };
  if let Ok((records, update)) = result {
    let _ = user_service
      .create_collab_object(&collab_object, update)
      .await;
    records.into_iter().map(Arc::new).collect()
  } else {
    vec![]
  }
}
/// Wrapper that makes a [`Folder`] shareable across the boxed `Send + Sync` futures in
/// this module; derefs to the inner `Mutex<Folder>`.
struct MutexFolder(Mutex<Folder>);
impl MutexFolder {
  pub fn new(folder: Folder) -> Self {
    Self(Mutex::new(folder))
  }
}
impl Deref for MutexFolder {
  type Target = Mutex<Folder>;
  fn deref(&self) -> &Self::Target {
    &self.0
  }
}
// SAFETY(review): these impls assert that guarding `Folder` behind the mutex above makes
// it safe to share/send across threads. That only holds if `Folder` keeps no thread-bound
// internals that escape the lock — TODO confirm against collab-folder before relying on it.
unsafe impl Sync for MutexFolder {}
unsafe impl Send for MutexFolder {}
/// Maps a folder view layout onto the collab type used when syncing that view's data.
fn collab_type_from_view_layout(view_layout: &ViewLayout) -> CollabType {
  if let ViewLayout::Document = view_layout {
    CollabType::Document
  } else {
    // Grid, Board and Calendar are all database-backed layouts.
    CollabType::Database
  }
}
/// Resolves the collab object id under which `view` should be synced.
///
/// Non-database views sync under their own view id. Database views sync under the id of
/// the database that links them, looked up in `database_records`; a missing record is an
/// error because the database update cannot be located without it.
fn object_id_from_view(
  view: &Arc<View>,
  database_records: &[Arc<DatabaseWithViews>],
) -> Result<String, Error> {
  if !view.layout.is_database() {
    return Ok(view.id.clone());
  }
  database_records
    .iter()
    .find(|record| record.linked_views.contains(&view.id))
    .map(|record| record.database_id.clone())
    .ok_or_else(|| {
      anyhow!(
        "🔴sync view: {} failed: no database for this view",
        view.id
      )
    })
}

View File

@ -66,7 +66,7 @@ impl<'de> Visitor<'de> for SessionVisitor {
name: "My Workspace".to_string(),
created_at: Utc::now(),
// For historical reasons, the database_storage_id is constructed by the user_id.
database_storage_id: STANDARD.encode(format!("{}:user:database", user_id)),
database_views_aggregate_id: STANDARD.encode(format!("{}:user:database", user_id)),
})
}
}

View File

@ -23,7 +23,7 @@ impl TryFrom<(i64, &UserWorkspace)> for UserWorkspaceTable {
if value.1.id.is_empty() {
return Err(FlowyError::invalid_data().with_context("The id is empty"));
}
if value.1.database_storage_id.is_empty() {
if value.1.database_views_aggregate_id.is_empty() {
return Err(FlowyError::invalid_data().with_context("The database storage id is empty"));
}
@ -32,7 +32,7 @@ impl TryFrom<(i64, &UserWorkspace)> for UserWorkspaceTable {
name: value.1.name.clone(),
uid: value.0,
created_at: value.1.created_at.timestamp(),
database_storage_id: value.1.database_storage_id.clone(),
database_storage_id: value.1.database_views_aggregate_id.clone(),
})
}
}
@ -46,7 +46,7 @@ impl From<UserWorkspaceTable> for UserWorkspace {
.timestamp_opt(value.created_at, 0)
.single()
.unwrap_or_default(),
database_storage_id: value.database_storage_id,
database_views_aggregate_id: value.database_storage_id,
}
}
}