feat: Get started doc migration (#3102)

* feat: migrate empty document

* chore: update collab rev

* chore: fmt
This commit is contained in:
Nathan.fooo
2023-08-03 09:14:52 +08:00
committed by GitHub
parent a40c639a96
commit 03b8f2ccb2
31 changed files with 546 additions and 170 deletions

View File

@ -16,6 +16,7 @@ lib-dispatch = { path = "../lib-dispatch" }
appflowy-integrate = { version = "0.1.0" }
collab = { version = "0.1.0" }
collab-folder = { version = "0.1.0" }
collab-document = { version = "0.1.0" }
flowy-user-deps = { path = "../flowy-user-deps" }
tracing = { version = "0.1", features = ["log"] }

View File

@ -4,6 +4,7 @@ extern crate flowy_sqlite;
pub mod entities;
mod event_handler;
pub mod event_map;
mod migrations;
mod notification;
pub mod protobuf;
pub mod services;

View File

@ -0,0 +1,7 @@
use crate::services::session_serde::Session;
use flowy_user_deps::entities::UserProfile;
/// Identity of one side (old or new user) of a user-data migration:
/// the user's profile together with the session that owns the collab data.
pub struct UserMigrationContext {
  /// Profile of the user whose data is being migrated.
  pub user_profile: UserProfile,
  /// Session identifying the user id and workspace that own the data.
  pub session: Session,
}

View File

@ -0,0 +1,51 @@
use crate::migrations::migration::UserDataMigration;
use crate::services::session_serde::Session;
use appflowy_integrate::{RocksCollabDB, YrsDocAction};
use collab::core::collab::MutexCollab;
use collab::core::origin::{CollabClient, CollabOrigin};
use collab_document::document::Document;
use collab_document::document_data::default_document_data;
use collab_folder::core::Folder;
use flowy_error::{internal_error, FlowyResult};
use std::sync::Arc;
/// Migrates the first-level documents of the workspace: any workspace view whose
/// stored updates cannot be decoded into a `Document` is re-initialized with the
/// default (empty) document data.
pub struct HistoricalEmptyDocumentMigration;

impl UserDataMigration for HistoricalEmptyDocumentMigration {
  fn name(&self) -> &str {
    "historical_empty_document"
  }

  /// Runs the migration inside a single write transaction on the user's collab db.
  ///
  /// # Errors
  /// Returns an error if the workspace folder cannot be deserialized, if flushing
  /// a rebuilt document fails, or if committing the transaction fails.
  fn run(&self, session: &Session, collab_db: &Arc<RocksCollabDB>) -> FlowyResult<()> {
    let write_txn = collab_db.write_txn();
    if let Ok(updates) = write_txn.get_all_updates(session.user_id, &session.user_workspace.id) {
      let origin = CollabOrigin::Client(CollabClient::new(session.user_id, ""));
      // Deserialize the folder from the raw collab updates.
      let folder =
        Folder::from_collab_raw_data(origin.clone(), updates, &session.user_workspace.id, vec![])?;
      // Migrate the first-level documents (views directly under the workspace).
      let migration_views = folder.get_workspace_views(&session.user_workspace.id);
      for view in migration_views {
        // Read all stored updates of the view.
        if let Ok(view_updates) = write_txn.get_all_updates(session.user_id, &view.id) {
          // If the updates don't decode into a document, the view has no valid
          // document yet — create one with the default data.
          // (was `if let Err(_) = ...` — clippy::redundant_pattern_matching)
          if Document::from_updates(origin.clone(), view_updates, &view.id, vec![]).is_err() {
            let document_data = default_document_data();
            let collab = Arc::new(MutexCollab::new(origin.clone(), &view.id, vec![]));
            // `collab` is not used after this call, so no clone is needed.
            if let Ok(document) = Document::create_with_data(collab, document_data) {
              // Remove all old updates and then insert the new update.
              let (doc_state, sv) = document.get_collab().encode_as_update_v1();
              write_txn
                .flush_doc_with(session.user_id, &view.id, &doc_state, &sv)
                .map_err(internal_error)?;
            }
          }
        }
      }
    }
    write_txn.commit_transaction().map_err(internal_error)?;
    Ok(())
  }
}

View File

@ -6,68 +6,59 @@ use collab::core::origin::{CollabClient, CollabOrigin};
use collab::preclude::Collab;
use collab_folder::core::{Folder, FolderData};
use crate::migrations::UserMigrationContext;
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
use flowy_user_deps::entities::UserProfile;
use crate::services::session_serde::Session;
pub struct UserDataMigration();
pub struct UserMigrationContext {
pub user_profile: UserProfile,
pub session: Session,
}
impl UserDataMigration {
pub fn migration(
old_user: &UserMigrationContext,
old_collab_db: &Arc<RocksCollabDB>,
new_user: &UserMigrationContext,
new_collab_db: &Arc<RocksCollabDB>,
) -> FlowyResult<Option<FolderData>> {
let mut folder_data = None;
new_collab_db
.with_write_txn(|w_txn| {
let read_txn = old_collab_db.read_txn();
if let Ok(object_ids) = read_txn.get_all_docs() {
// Migration of all objects
for object_id in object_ids {
tracing::debug!("migrate object: {:?}", object_id);
if let Ok(updates) = read_txn.get_all_updates(old_user.session.user_id, &object_id) {
// If the object is a folder, migrate the folder data
if object_id == old_user.session.user_workspace.id {
folder_data = migrate_folder(
old_user.session.user_id,
&object_id,
&new_user.session.user_workspace.id,
updates,
);
} else if object_id == old_user.session.user_workspace.database_storage_id {
migrate_database_storage(
old_user.session.user_id,
&object_id,
new_user.session.user_id,
&new_user.session.user_workspace.database_storage_id,
updates,
w_txn,
);
} else {
migrate_object(
old_user.session.user_id,
new_user.session.user_id,
&object_id,
updates,
w_txn,
);
}
/// Migrates the collab objects of the old user to the new user. Currently this only
/// happens when a local user starts using the AppFlowy cloud service.
///
/// Returns the old user's [`FolderData`] (if the folder object was found) so the
/// caller can rebuild the folder under the new workspace.
pub fn migration_user_to_cloud(
old_user: &UserMigrationContext,
old_collab_db: &Arc<RocksCollabDB>,
new_user: &UserMigrationContext,
new_collab_db: &Arc<RocksCollabDB>,
) -> FlowyResult<Option<FolderData>> {
let mut folder_data = None;
new_collab_db
.with_write_txn(|w_txn| {
let read_txn = old_collab_db.read_txn();
if let Ok(object_ids) = read_txn.get_all_docs() {
// Migration of all objects stored for the old user.
for object_id in object_ids {
tracing::debug!("migrate object: {:?}", object_id);
if let Ok(updates) = read_txn.get_all_updates(old_user.session.user_id, &object_id) {
// If the object is the folder, capture its data for the caller.
if object_id == old_user.session.user_workspace.id {
folder_data = migrate_folder(
old_user.session.user_id,
&object_id,
&new_user.session.user_workspace.id,
updates,
);
// Database-storage object: re-key it under the new user's storage id.
} else if object_id == old_user.session.user_workspace.database_storage_id {
migrate_database_storage(
old_user.session.user_id,
&object_id,
new_user.session.user_id,
&new_user.session.user_workspace.database_storage_id,
updates,
w_txn,
);
// Any other object: copy it over keyed by the new user id.
} else {
migrate_object(
old_user.session.user_id,
new_user.session.user_id,
&object_id,
updates,
w_txn,
);
}
}
}
Ok(())
})
.map_err(|err| FlowyError::new(ErrorCode::Internal, err))?;
Ok(folder_data)
}
// NOTE(review): the lines below appear to be duplicated/misplaced by the diff
// extraction (the braces above and below do not balance) — verify against the
// original `local_user_to_cloud.rs` before relying on this hunk's exact text.
}
Ok(())
})
.map_err(|err| FlowyError::new(ErrorCode::Internal, err))?;
Ok(folder_data)
}
fn migrate_database_storage<'a, W>(

View File

@ -0,0 +1,104 @@
use crate::services::session_serde::Session;
use appflowy_integrate::RocksCollabDB;
use chrono::NaiveDateTime;
use diesel::{RunQueryDsl, SqliteConnection};
use flowy_error::FlowyResult;
use flowy_sqlite::schema::user_data_migration_records;
use flowy_sqlite::ConnectionPool;
use std::sync::Arc;
/// Runs pending [`UserDataMigration`]s for a single user, recording each applied
/// migration in the `user_data_migration_records` table so it is not run twice.
pub struct UserLocalDataMigration {
  // Session of the user whose data is being migrated.
  session: Session,
  // Collab database the migrations operate on.
  collab_db: Arc<RocksCollabDB>,
  // Sqlite pool used to read/write the migration-record table.
  sqlite_pool: Arc<ConnectionPool>,
}
impl UserLocalDataMigration {
  /// Creates a migration runner bound to a user session and its databases.
  pub fn new(
    session: Session,
    collab_db: Arc<RocksCollabDB>,
    sqlite_pool: Arc<ConnectionPool>,
  ) -> Self {
    Self {
      session,
      collab_db,
      sqlite_pool,
    }
  }

  /// Executes a series of migrations.
  ///
  /// This function applies each migration in the `migrations` vector that hasn't already been executed.
  /// It retrieves the current migration records from the database, and for each migration in the `migrations` vector,
  /// checks whether it has already been run. If it hasn't, the function runs the migration and adds it to the list of applied migrations.
  ///
  /// The function does not apply a migration if its name is already in the list of applied migrations.
  /// If a migration name is duplicated, the function logs an error message and continues with the next migration.
  ///
  /// # Arguments
  ///
  /// * `migrations` - A vector of boxed dynamic `UserDataMigration` objects representing the migrations to be applied.
  ///
  /// # Errors
  ///
  /// Returns an error if the sqlite pool yields no connection, reading the
  /// migration records fails, or any individual migration's `run` fails.
  pub fn run(self, migrations: Vec<Box<dyn UserDataMigration>>) -> FlowyResult<Vec<String>> {
    let mut applied_migrations = vec![];
    let conn = self.sqlite_pool.get()?;
    let record = get_all_records(&*conn)?;
    let mut duplicated_names = vec![];
    for migration in migrations {
      // Skip migrations already recorded in the database.
      // (was `.find(..).is_none()` — clippy::search_is_some)
      if !record
        .iter()
        .any(|record| record.migration_name == migration.name())
      {
        let migration_name = migration.name().to_string();
        if !duplicated_names.contains(&migration_name) {
          migration.run(&self.session, &self.collab_db)?;
          applied_migrations.push(migration_name.clone());
          save_record(&*conn, &migration_name);
          duplicated_names.push(migration_name);
        } else {
          tracing::error!("Duplicated migration name: {}", migration_name);
        }
      }
    }
    Ok(applied_migrations)
  }
}
/// A single, named user-data migration step.
pub trait UserDataMigration {
  /// Unique name of this migration. Migration with the same name will be skipped
  /// (both when already recorded in the database and when duplicated in one run).
  fn name(&self) -> &str;
  /// Applies the migration to the given user's collab database.
  fn run(&self, user: &Session, collab_db: &Arc<RocksCollabDB>) -> FlowyResult<()>;
}
/// Persists a migration name into `user_data_migration_records` so the
/// migration is never applied again for this user.
fn save_record(conn: &SqliteConnection, migration_name: &str) {
  diesel::insert_into(user_data_migration_records::table)
    .values(&NewUserDataMigrationRecord {
      migration_name: migration_name.to_string(),
    })
    .execute(conn)
    .expect("Error inserting new migration record");
}
/// Loads every previously-applied migration record; a failed query is treated
/// as "no records" rather than an error.
fn get_all_records(conn: &SqliteConnection) -> FlowyResult<Vec<UserDataMigrationRecord>> {
  let records = user_data_migration_records::table
    .load::<UserDataMigrationRecord>(conn)
    .unwrap_or_default();
  Ok(records)
}
/// Row of the `user_data_migration_records` table: a migration that has
/// already been executed for this user.
#[derive(Clone, Default, Queryable, Identifiable)]
#[table_name = "user_data_migration_records"]
pub struct UserDataMigrationRecord {
  pub id: i32,
  // Unique name of the applied migration (matches `UserDataMigration::name`).
  pub migration_name: String,
  // When the migration was executed — presumably filled by a DB default,
  // since `NewUserDataMigrationRecord` does not set it; verify the schema.
  pub executed_at: NaiveDateTime,
}

/// Insertable row used to record a newly-applied migration.
#[derive(Insertable)]
#[table_name = "user_data_migration_records"]
pub struct NewUserDataMigrationRecord {
  pub migration_name: String,
}

View File

@ -0,0 +1,6 @@
mod define;
pub mod historical_document;
pub mod local_user_to_cloud;
pub mod migration;
pub use define::*;

View File

@ -1,8 +1,7 @@
pub use user_session::*;
pub mod database;
mod session_serde;
mod user_data_migration;
pub mod session_serde;
mod user_session;
mod user_sql;
mod user_workspace_sql;

View File

@ -20,9 +20,12 @@ use crate::entities::{UserProfilePB, UserSettingPB};
use crate::event_map::{
DefaultUserStatusCallback, SignUpContext, UserCloudServiceProvider, UserStatusCallback,
};
use crate::migrations::historical_document::HistoricalEmptyDocumentMigration;
use crate::migrations::local_user_to_cloud::migration_user_to_cloud;
use crate::migrations::migration::UserLocalDataMigration;
use crate::migrations::UserMigrationContext;
use crate::services::database::UserDB;
use crate::services::session_serde::Session;
use crate::services::user_data_migration::{UserDataMigration, UserMigrationContext};
use crate::services::user_sql::{UserTable, UserTableChangeset};
use crate::services::user_workspace_sql::UserWorkspaceTable;
use crate::{errors::FlowyError, notification::*};
@ -74,6 +77,25 @@ impl UserSession {
pub async fn init<C: UserStatusCallback + 'static>(&self, user_status_callback: C) {
if let Ok(session) = self.get_session() {
match (
self.database.get_collab_db(session.user_id),
self.database.get_pool(session.user_id),
) {
(Ok(collab_db), Ok(sqlite_pool)) => {
match UserLocalDataMigration::new(session.clone(), collab_db, sqlite_pool)
.run(vec![Box::new(HistoricalEmptyDocumentMigration)])
{
Ok(applied_migrations) => {
if applied_migrations.len() > 0 {
tracing::info!("Did apply migrations: {:?}", applied_migrations);
}
},
Err(e) => tracing::error!("User data migration failed: {:?}", e),
}
},
_ => tracing::error!("Failed to get collab db or sqlite pool"),
}
if let Err(e) = user_status_callback
.did_init(session.user_id, &session.user_workspace)
.await
@ -105,15 +127,14 @@ impl UserSession {
.map(|collab_db| Arc::downgrade(&collab_db))
}
pub async fn migrate_old_user_data(
async fn migrate_local_user_to_cloud(
&self,
old_user: &UserMigrationContext,
new_user: &UserMigrationContext,
) -> Result<Option<FolderData>, FlowyError> {
let old_collab_db = self.database.get_collab_db(old_user.session.user_id)?;
let new_collab_db = self.database.get_collab_db(new_user.session.user_id)?;
let folder_data =
UserDataMigration::migration(old_user, &old_collab_db, new_user, &new_collab_db)?;
let folder_data = migration_user_to_cloud(old_user, &old_collab_db, new_user, &new_collab_db)?;
Ok(folder_data)
}
@ -232,7 +253,7 @@ impl UserSession {
old_user.user_profile.id,
new_user.user_profile.id
);
match self.migrate_old_user_data(&old_user, &new_user).await {
match self.migrate_local_user_to_cloud(&old_user, &new_user).await {
Ok(folder_data) => sign_up_context.local_folder = folder_data,
Err(e) => tracing::error!("{:?}", e),
}