chore: versionize migration (#5060)

This commit is contained in:
Nathan.fooo 2024-04-04 13:03:40 +08:00 committed by GitHub
parent 8ade3b5f73
commit 65e7e2347a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 98 additions and 16 deletions

View File

@ -1826,6 +1826,7 @@ dependencies = [
"lib-infra", "lib-infra",
"lib-log", "lib-log",
"parking_lot 0.12.1", "parking_lot 0.12.1",
"semver",
"serde", "serde",
"serde_json", "serde_json",
"serde_repr", "serde_repr",
@ -2199,6 +2200,7 @@ dependencies = [
"once_cell", "once_cell",
"parking_lot 0.12.1", "parking_lot 0.12.1",
"protobuf", "protobuf",
"semver",
"serde", "serde",
"serde_json", "serde_json",
"serde_repr", "serde_repr",

View File

@ -1784,6 +1784,7 @@ dependencies = [
"lib-infra", "lib-infra",
"lib-log", "lib-log",
"parking_lot 0.12.1", "parking_lot 0.12.1",
"semver",
"serde", "serde",
"serde_json", "serde_json",
"serde_repr", "serde_repr",
@ -2173,6 +2174,7 @@ dependencies = [
"quickcheck_macros", "quickcheck_macros",
"rand 0.8.5", "rand 0.8.5",
"rand_core 0.6.4", "rand_core 0.6.4",
"semver",
"serde", "serde",
"serde_json", "serde_json",
"serde_repr", "serde_repr",

View File

@ -48,6 +48,7 @@ serde_repr.workspace = true
futures.workspace = true futures.workspace = true
walkdir = "2.4.0" walkdir = "2.4.0"
sysinfo = "0.30.5" sysinfo = "0.30.5"
semver = "1.0.22"
[features] [features]
default = ["rev-sqlite"] default = ["rev-sqlite"]

View File

@ -1,6 +1,7 @@
#![allow(unused_doc_comments)] #![allow(unused_doc_comments)]
use flowy_storage::ObjectStorageService; use flowy_storage::ObjectStorageService;
use semver::Version;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use sysinfo::System; use sysinfo::System;
@ -93,6 +94,7 @@ impl AppFlowyCore {
server_type, server_type,
Arc::downgrade(&store_preference), Arc::downgrade(&store_preference),
)); ));
let app_version = Version::parse(&config.app_version).unwrap_or_else(|_| Version::new(0, 5, 4));
event!(tracing::Level::DEBUG, "Init managers",); event!(tracing::Level::DEBUG, "Init managers",);
let ( let (
@ -115,6 +117,7 @@ impl AppFlowyCore {
&config.storage_path, &config.storage_path,
&config.application_path, &config.application_path,
&config.device_id, &config.device_id,
app_version,
); );
let authenticate_user = Arc::new(AuthenticateUser::new( let authenticate_user = Arc::new(AuthenticateUser::new(

View File

@ -46,6 +46,7 @@ uuid.workspace = true
chrono = { workspace = true, default-features = false, features = ["clock"] } chrono = { workspace = true, default-features = false, features = ["clock"] }
base64 = "^0.21" base64 = "^0.21"
tokio-stream = "0.1.14" tokio-stream = "0.1.14"
semver = "1.0.22"
[dev-dependencies] [dev-dependencies]
nanoid = "0.4.0" nanoid = "0.4.0"

View File

@ -6,6 +6,7 @@ use collab_document::document::Document;
use collab_document::document_data::default_document_data; use collab_document::document_data::default_document_data;
use collab_folder::{Folder, View}; use collab_folder::{Folder, View};
use collab_plugins::local_storage::kv::KVTransactionDB; use collab_plugins::local_storage::kv::KVTransactionDB;
use semver::Version;
use tracing::{event, instrument}; use tracing::{event, instrument};
use collab_integrate::{CollabKVAction, CollabKVDB, PersistenceError}; use collab_integrate::{CollabKVAction, CollabKVDB, PersistenceError};
@ -24,6 +25,10 @@ impl UserDataMigration for HistoricalEmptyDocumentMigration {
"historical_empty_document" "historical_empty_document"
} }
fn applies_to_version(&self, _version: &Version) -> bool {
true
}
#[instrument(name = "HistoricalEmptyDocumentMigration", skip_all, err)] #[instrument(name = "HistoricalEmptyDocumentMigration", skip_all, err)]
fn run( fn run(
&self, &self,

View File

@ -2,6 +2,7 @@ use std::sync::Arc;
use chrono::NaiveDateTime; use chrono::NaiveDateTime;
use diesel::{RunQueryDsl, SqliteConnection}; use diesel::{RunQueryDsl, SqliteConnection};
use semver::Version;
use collab_integrate::CollabKVDB; use collab_integrate::CollabKVDB;
use flowy_error::FlowyResult; use flowy_error::FlowyResult;
@ -47,6 +48,7 @@ impl UserLocalDataMigration {
self, self,
migrations: Vec<Box<dyn UserDataMigration>>, migrations: Vec<Box<dyn UserDataMigration>>,
authenticator: &Authenticator, authenticator: &Authenticator,
app_version: Option<Version>,
) -> FlowyResult<Vec<String>> { ) -> FlowyResult<Vec<String>> {
let mut applied_migrations = vec![]; let mut applied_migrations = vec![];
let mut conn = self.sqlite_pool.get()?; let mut conn = self.sqlite_pool.get()?;
@ -57,11 +59,17 @@ impl UserLocalDataMigration {
.iter() .iter()
.any(|record| record.migration_name == migration.name()) .any(|record| record.migration_name == migration.name())
{ {
if let Some(app_version) = app_version.as_ref() {
if !migration.applies_to_version(app_version) {
continue;
}
}
let migration_name = migration.name().to_string(); let migration_name = migration.name().to_string();
if !duplicated_names.contains(&migration_name) { if !duplicated_names.contains(&migration_name) {
migration.run(&self.session, &self.collab_db, authenticator)?; migration.run(&self.session, &self.collab_db, authenticator)?;
applied_migrations.push(migration.name().to_string()); applied_migrations.push(migration.name().to_string());
save_record(&mut conn, &migration_name); save_migration_record(&mut conn, &migration_name);
duplicated_names.push(migration_name); duplicated_names.push(migration_name);
} else { } else {
tracing::error!("Duplicated migration name: {}", migration_name); tracing::error!("Duplicated migration name: {}", migration_name);
@ -75,6 +83,9 @@ impl UserLocalDataMigration {
pub trait UserDataMigration { pub trait UserDataMigration {
/// Migration with the same name will be skipped /// Migration with the same name will be skipped
fn name(&self) -> &str; fn name(&self) -> &str;
/// Returns bool value whether the migration should be applied to the current app version
/// true if the migration should be applied, false otherwise
fn applies_to_version(&self, app_version: &Version) -> bool;
fn run( fn run(
&self, &self,
user: &Session, user: &Session,
@ -83,7 +94,7 @@ pub trait UserDataMigration {
) -> FlowyResult<()>; ) -> FlowyResult<()>;
} }
fn save_record(conn: &mut SqliteConnection, migration_name: &str) { pub(crate) fn save_migration_record(conn: &mut SqliteConnection, migration_name: &str) {
let new_record = NewUserDataMigrationRecord { let new_record = NewUserDataMigrationRecord {
migration_name: migration_name.to_string(), migration_name: migration_name.to_string(),
}; };

View File

@ -2,6 +2,7 @@ use std::sync::Arc;
use collab_folder::Folder; use collab_folder::Folder;
use collab_plugins::local_storage::kv::{KVTransactionDB, PersistenceError}; use collab_plugins::local_storage::kv::{KVTransactionDB, PersistenceError};
use semver::Version;
use tracing::instrument; use tracing::instrument;
use collab_integrate::{CollabKVAction, CollabKVDB}; use collab_integrate::{CollabKVAction, CollabKVDB};
@ -22,6 +23,10 @@ impl UserDataMigration for FavoriteV1AndWorkspaceArrayMigration {
"workspace_favorite_v1_and_workspace_array_migration" "workspace_favorite_v1_and_workspace_array_migration"
} }
fn applies_to_version(&self, _app_version: &Version) -> bool {
true
}
#[instrument(name = "FavoriteV1AndWorkspaceArrayMigration", skip_all, err)] #[instrument(name = "FavoriteV1AndWorkspaceArrayMigration", skip_all, err)]
fn run( fn run(
&self, &self,

View File

@ -2,6 +2,7 @@ use std::sync::Arc;
use collab_folder::Folder; use collab_folder::Folder;
use collab_plugins::local_storage::kv::{KVTransactionDB, PersistenceError}; use collab_plugins::local_storage::kv::{KVTransactionDB, PersistenceError};
use semver::Version;
use tracing::instrument; use tracing::instrument;
use collab_integrate::{CollabKVAction, CollabKVDB}; use collab_integrate::{CollabKVAction, CollabKVDB};
@ -20,6 +21,10 @@ impl UserDataMigration for WorkspaceTrashMapToSectionMigration {
"workspace_trash_map_to_section_migration" "workspace_trash_map_to_section_migration"
} }
fn applies_to_version(&self, _app_version: &Version) -> bool {
true
}
#[instrument(name = "WorkspaceTrashMapToSectionMigration", skip_all, err)] #[instrument(name = "WorkspaceTrashMapToSectionMigration", skip_all, err)]
fn run( fn run(
&self, &self,

View File

@ -80,6 +80,7 @@ pub(crate) fn get_appflowy_data_folder_import_context(path: &str) -> anyhow::Res
&imported_user, &imported_user,
imported_collab_db.clone(), imported_collab_db.clone(),
imported_sqlite_db.get_pool(), imported_sqlite_db.get_pool(),
None,
); );
Ok(ImportContext { Ok(ImportContext {

View File

@ -5,6 +5,7 @@ use std::path::PathBuf;
use crate::services::db::UserDBPath; use crate::services::db::UserDBPath;
use base64::engine::general_purpose::PAD; use base64::engine::general_purpose::PAD;
use base64::engine::GeneralPurpose; use base64::engine::GeneralPurpose;
use semver::Version;
pub const URL_SAFE_ENGINE: GeneralPurpose = GeneralPurpose::new(&URL_SAFE, PAD); pub const URL_SAFE_ENGINE: GeneralPurpose = GeneralPurpose::new(&URL_SAFE, PAD);
#[derive(Clone)] #[derive(Clone)]
@ -19,18 +20,26 @@ pub struct UserConfig {
pub device_id: String, pub device_id: String,
/// Used as the key of `Session` when saving session information to KV. /// Used as the key of `Session` when saving session information to KV.
pub(crate) session_cache_key: String, pub(crate) session_cache_key: String,
pub app_version: Version,
} }
impl UserConfig { impl UserConfig {
/// The `root_dir` represents as the root of the user folders. It must be unique for each /// The `root_dir` represents as the root of the user folders. It must be unique for each
/// users. /// users.
pub fn new(name: &str, storage_path: &str, application_path: &str, device_id: &str) -> Self { pub fn new(
name: &str,
storage_path: &str,
application_path: &str,
device_id: &str,
app_version: Version,
) -> Self {
let session_cache_key = format!("{}_session_cache", name); let session_cache_key = format!("{}_session_cache", name);
Self { Self {
storage_path: storage_path.to_owned(), storage_path: storage_path.to_owned(),
application_path: application_path.to_owned(), application_path: application_path.to_owned(),
session_cache_key, session_cache_key,
device_id: device_id.to_owned(), device_id: device_id.to_owned(),
app_version,
} }
} }

View File

@ -11,13 +11,14 @@ use flowy_sqlite::{query_dsl::*, DBConnection, ExpressionMethods};
use flowy_user_pub::cloud::{UserCloudServiceProvider, UserUpdate}; use flowy_user_pub::cloud::{UserCloudServiceProvider, UserUpdate};
use flowy_user_pub::entities::*; use flowy_user_pub::entities::*;
use flowy_user_pub::workspace_service::UserWorkspaceService; use flowy_user_pub::workspace_service::UserWorkspaceService;
use semver::Version;
use serde_json::Value; use serde_json::Value;
use std::string::ToString; use std::string::ToString;
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering}; use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use tokio::sync::{Mutex, RwLock}; use tokio::sync::{Mutex, RwLock};
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use tracing::{debug, error, event, info, instrument, warn}; use tracing::{debug, error, event, info, instrument, trace, warn};
use lib_dispatch::prelude::af_spawn; use lib_dispatch::prelude::af_spawn;
use lib_infra::box_any::BoxAny; use lib_infra::box_any::BoxAny;
@ -26,7 +27,9 @@ use crate::anon_user::{migration_anon_user_on_sign_up, sync_supabase_user_data_t
use crate::entities::{AuthStateChangedPB, AuthStatePB, UserProfilePB, UserSettingPB}; use crate::entities::{AuthStateChangedPB, AuthStatePB, UserProfilePB, UserSettingPB};
use crate::event_map::{DefaultUserStatusCallback, UserStatusCallback}; use crate::event_map::{DefaultUserStatusCallback, UserStatusCallback};
use crate::migrations::document_empty_content::HistoricalEmptyDocumentMigration; use crate::migrations::document_empty_content::HistoricalEmptyDocumentMigration;
use crate::migrations::migration::{UserDataMigration, UserLocalDataMigration}; use crate::migrations::migration::{
save_migration_record, UserDataMigration, UserLocalDataMigration,
};
use crate::migrations::workspace_and_favorite_v1::FavoriteV1AndWorkspaceArrayMigration; use crate::migrations::workspace_and_favorite_v1::FavoriteV1AndWorkspaceArrayMigration;
use crate::migrations::workspace_trash_v1::WorkspaceTrashMapToSectionMigration; use crate::migrations::workspace_trash_v1::WorkspaceTrashMapToSectionMigration;
use crate::migrations::AnonUser; use crate::migrations::AnonUser;
@ -246,7 +249,13 @@ impl UserManager {
self.authenticate_user.database.get_pool(session.user_id), self.authenticate_user.database.get_pool(session.user_id),
) { ) {
(Ok(collab_db), Ok(sqlite_pool)) => { (Ok(collab_db), Ok(sqlite_pool)) => {
run_collab_data_migration(&session, &user, collab_db, sqlite_pool); run_collab_data_migration(
&session,
&user,
collab_db,
sqlite_pool,
Some(self.authenticate_user.user_config.app_version.clone()),
);
}, },
_ => error!("Failed to get collab db or sqlite pool"), _ => error!("Failed to get collab db or sqlite pool"),
} }
@ -425,6 +434,17 @@ impl UserManager {
.await?; .await?;
if response.is_new_user { if response.is_new_user {
// For new user, we don't need to run the migrations
if let Ok(pool) = self
.authenticate_user
.database
.get_pool(new_session.user_id)
{
mark_all_migrations_as_applied(&pool);
} else {
error!("Failed to get pool for user {}", new_session.user_id);
}
if let Some(old_user) = migration_user { if let Some(old_user) = migration_user {
event!( event!(
tracing::Level::INFO, tracing::Level::INFO,
@ -827,22 +847,39 @@ fn remove_user_token(uid: i64, mut conn: DBConnection) -> FlowyResult<()> {
Ok(()) Ok(())
} }
fn collab_migration_list() -> Vec<Box<dyn UserDataMigration>> {
// ⚠The order of migrations is crucial. If you're adding a new migration, please ensure
// it's appended to the end of the list.
vec![
Box::new(HistoricalEmptyDocumentMigration),
Box::new(FavoriteV1AndWorkspaceArrayMigration),
Box::new(WorkspaceTrashMapToSectionMigration),
]
}
fn mark_all_migrations_as_applied(sqlite_pool: &Arc<ConnectionPool>) {
if let Ok(mut conn) = sqlite_pool.get() {
for migration in collab_migration_list() {
save_migration_record(&mut conn, migration.name());
}
info!("Mark all migrations as applied");
}
}
pub(crate) fn run_collab_data_migration( pub(crate) fn run_collab_data_migration(
session: &Session, session: &Session,
user: &UserProfile, user: &UserProfile,
collab_db: Arc<CollabKVDB>, collab_db: Arc<CollabKVDB>,
sqlite_pool: Arc<ConnectionPool>, sqlite_pool: Arc<ConnectionPool>,
version: Option<Version>,
) { ) {
// ⚠The order of migrations is crucial. If you're adding a new migration, please ensure trace!("Run collab data migration: {:?}", version);
// it's appended to the end of the list. let migrations = collab_migration_list();
let migrations: Vec<Box<dyn UserDataMigration>> = vec![ match UserLocalDataMigration::new(session.clone(), collab_db, sqlite_pool).run(
Box::new(HistoricalEmptyDocumentMigration), migrations,
Box::new(FavoriteV1AndWorkspaceArrayMigration), &user.authenticator,
Box::new(WorkspaceTrashMapToSectionMigration), version,
]; ) {
match UserLocalDataMigration::new(session.clone(), collab_db, sqlite_pool)
.run(migrations, &user.authenticator)
{
Ok(applied_migrations) => { Ok(applied_migrations) => {
if !applied_migrations.is_empty() { if !applied_migrations.is_empty() {
info!("Did apply migrations: {:?}", applied_migrations); info!("Did apply migrations: {:?}", applied_migrations);