mirror of
https://github.com/AppFlowy-IO/AppFlowy.git
synced 2024-08-30 18:12:39 +00:00
feat: migration database (#2009)
* feat: migration database * ci: fix tauri ci * feat: migrate database view * ci: fix ci
This commit is contained in:
parent
1dbfd838ef
commit
b21ee5d2de
@ -27,6 +27,7 @@
|
||||
"is-hotkey": "^0.2.0",
|
||||
"jest": "^29.5.0",
|
||||
"nanoid": "^4.0.0",
|
||||
"protoc-gen-ts": "^0.8.5",
|
||||
"react": "^18.2.0",
|
||||
"react-dom": "^18.2.0",
|
||||
"react-error-boundary": "^3.1.4",
|
||||
|
@ -20,11 +20,15 @@ pub struct FlowyConfig {
|
||||
}
|
||||
|
||||
fn default_proto_output() -> String {
|
||||
"resources/proto".to_owned()
|
||||
let mut path = PathBuf::from("resources");
|
||||
path.push("proto");
|
||||
path.to_str().unwrap().to_owned()
|
||||
}
|
||||
|
||||
fn default_protobuf_crate() -> String {
|
||||
"src/protobuf".to_owned()
|
||||
let mut path = PathBuf::from("src");
|
||||
path.push("protobuf");
|
||||
path.to_str().unwrap().to_owned()
|
||||
}
|
||||
|
||||
impl FlowyConfig {
|
||||
|
@ -127,6 +127,15 @@ fn generate_ts_protobuf_files(
|
||||
}
|
||||
let protoc_bin_path = protoc_bin_path.to_str().unwrap().to_owned();
|
||||
paths.iter().for_each(|path| {
|
||||
// if let Err(err) = Command::new(protoc_bin_path.clone())
|
||||
// .arg(format!("--ts_out={}", output.to_str().unwrap()))
|
||||
// .arg(format!("--proto_path={}", proto_file_output_path))
|
||||
// .arg(path)
|
||||
// .spawn()
|
||||
// {
|
||||
// panic!("Generate ts pb file failed: {}, {:?}", path, err);
|
||||
// }
|
||||
|
||||
let result = cmd_lib::run_cmd! {
|
||||
${protoc_bin_path} --ts_out=${output} --proto_path=${proto_file_output_path} ${path}
|
||||
};
|
||||
|
@ -60,12 +60,12 @@ impl FolderDepsResolver {
|
||||
.await,
|
||||
);
|
||||
|
||||
if let (Ok(user_id), Ok(token)) = (user.user_id(), user.token()) {
|
||||
match folder_manager.initialize(&user_id, &token).await {
|
||||
Ok(_) => {},
|
||||
Err(e) => tracing::error!("Initialize folder manager failed: {}", e),
|
||||
}
|
||||
}
|
||||
// if let (Ok(user_id), Ok(token)) = (user.user_id(), user.token()) {
|
||||
// match folder_manager.initialize(&user_id, &token).await {
|
||||
// Ok(_) => {},
|
||||
// Err(e) => tracing::error!("Initialize folder manager failed: {}", e),
|
||||
// }
|
||||
// }
|
||||
|
||||
let receiver = Arc::new(FolderWSMessageReceiverImpl(folder_manager.clone()));
|
||||
ws_conn.add_ws_message_receiver(receiver).unwrap();
|
||||
@ -339,7 +339,7 @@ impl ViewDataProcessor for DatabaseViewDataProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
fn layout_type_from_view_layout(layout: ViewLayoutTypePB) -> LayoutTypePB {
|
||||
pub fn layout_type_from_view_layout(layout: ViewLayoutTypePB) -> LayoutTypePB {
|
||||
match layout {
|
||||
ViewLayoutTypePB::Grid => LayoutTypePB::Grid,
|
||||
ViewLayoutTypePB::Board => LayoutTypePB::Board,
|
||||
|
@ -25,21 +25,12 @@ impl DatabaseDepsResolver {
|
||||
) -> Arc<DatabaseManager> {
|
||||
let user = Arc::new(GridUserImpl(user_session.clone()));
|
||||
let rev_web_socket = Arc::new(GridRevisionWebSocket(ws_conn));
|
||||
let database_manager = Arc::new(DatabaseManager::new(
|
||||
user.clone(),
|
||||
Arc::new(DatabaseManager::new(
|
||||
user,
|
||||
rev_web_socket,
|
||||
task_scheduler,
|
||||
Arc::new(DatabaseDBConnectionImpl(user_session)),
|
||||
));
|
||||
|
||||
if let (Ok(user_id), Ok(token)) = (user.user_id(), user.token()) {
|
||||
match database_manager.initialize(&user_id, &token).await {
|
||||
Ok(_) => {},
|
||||
Err(e) => tracing::error!("Initialize grid manager failed: {}", e),
|
||||
}
|
||||
}
|
||||
|
||||
database_manager
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -6,7 +6,7 @@ use flowy_database::manager::DatabaseManager;
|
||||
use flowy_document::entities::DocumentVersionPB;
|
||||
use flowy_document::{DocumentConfig, DocumentManager};
|
||||
use flowy_error::FlowyResult;
|
||||
use flowy_folder::entities::ViewDataFormatPB;
|
||||
use flowy_folder::entities::{ViewDataFormatPB, ViewLayoutTypePB};
|
||||
use flowy_folder::{errors::FlowyError, manager::FolderManager};
|
||||
pub use flowy_net::get_client_server_configuration;
|
||||
use flowy_net::local_server::LocalServer;
|
||||
@ -17,6 +17,7 @@ use flowy_user::services::{UserSession, UserSessionConfig};
|
||||
use lib_dispatch::prelude::*;
|
||||
use lib_dispatch::runtime::tokio_default_runtime;
|
||||
|
||||
use flowy_database::entities::LayoutTypePB;
|
||||
use lib_infra::future::{to_fut, Fut};
|
||||
use module::make_plugins;
|
||||
pub use module::*;
|
||||
@ -310,7 +311,36 @@ impl UserStatusListener {
|
||||
async fn did_sign_in(&self, token: &str, user_id: &str) -> FlowyResult<()> {
|
||||
self.folder_manager.initialize(user_id, token).await?;
|
||||
self.document_manager.initialize(user_id).await?;
|
||||
self.database_manager.initialize(user_id, token).await?;
|
||||
|
||||
let cloned_folder_manager = self.folder_manager.clone();
|
||||
let get_views_fn = to_fut(async move {
|
||||
cloned_folder_manager
|
||||
.get_current_workspace()
|
||||
.await
|
||||
.map(|workspace| {
|
||||
workspace
|
||||
.apps
|
||||
.items
|
||||
.into_iter()
|
||||
.flat_map(|app| app.belongings.items)
|
||||
.flat_map(|view| match view.layout {
|
||||
ViewLayoutTypePB::Grid | ViewLayoutTypePB::Board | ViewLayoutTypePB::Calendar => {
|
||||
Some((
|
||||
view.id,
|
||||
view.name,
|
||||
layout_type_from_view_layout(view.layout),
|
||||
))
|
||||
},
|
||||
_ => None,
|
||||
})
|
||||
.collect::<Vec<(String, String, LayoutTypePB)>>()
|
||||
})
|
||||
.unwrap_or_default()
|
||||
});
|
||||
self
|
||||
.database_manager
|
||||
.initialize(user_id, token, get_views_fn)
|
||||
.await?;
|
||||
self
|
||||
.ws_conn
|
||||
.start(token.to_owned(), user_id.to_owned())
|
||||
|
@ -7,9 +7,7 @@ use crate::services::database_view::{
|
||||
make_database_view_rev_manager, make_database_view_revision_pad, DatabaseViewEditor,
|
||||
};
|
||||
use crate::services::persistence::block_index::BlockRowIndexer;
|
||||
use crate::services::persistence::database_ref::{
|
||||
DatabaseInfo, DatabaseRefIndexer, DatabaseViewRef,
|
||||
};
|
||||
use crate::services::persistence::database_ref::{DatabaseInfo, DatabaseRefs, DatabaseViewRef};
|
||||
use crate::services::persistence::kv::DatabaseKVPersistence;
|
||||
use crate::services::persistence::migration::DatabaseMigration;
|
||||
use crate::services::persistence::rev_sqlite::{
|
||||
@ -31,6 +29,7 @@ use flowy_revision::{
|
||||
use flowy_sqlite::ConnectionPool;
|
||||
use flowy_task::TaskDispatcher;
|
||||
|
||||
use lib_infra::future::Fut;
|
||||
use revision_model::Revision;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
@ -45,7 +44,7 @@ pub struct DatabaseManager {
|
||||
editors_by_database_id: RwLock<HashMap<String, Arc<DatabaseEditor>>>,
|
||||
database_user: Arc<dyn DatabaseUser>,
|
||||
block_indexer: Arc<BlockRowIndexer>,
|
||||
database_ref_indexer: Arc<DatabaseRefIndexer>,
|
||||
database_refs: Arc<DatabaseRefs>,
|
||||
#[allow(dead_code)]
|
||||
kv_persistence: Arc<DatabaseKVPersistence>,
|
||||
task_scheduler: Arc<RwLock<TaskDispatcher>>,
|
||||
@ -63,30 +62,30 @@ impl DatabaseManager {
|
||||
let editors_by_database_id = RwLock::new(HashMap::new());
|
||||
let kv_persistence = Arc::new(DatabaseKVPersistence::new(database_db.clone()));
|
||||
let block_indexer = Arc::new(BlockRowIndexer::new(database_db.clone()));
|
||||
let database_ref_indexer = Arc::new(DatabaseRefIndexer::new(database_db.clone()));
|
||||
let migration = DatabaseMigration::new(
|
||||
database_user.clone(),
|
||||
database_db,
|
||||
database_ref_indexer.clone(),
|
||||
);
|
||||
let database_refs = Arc::new(DatabaseRefs::new(database_db));
|
||||
let migration = DatabaseMigration::new(database_user.clone(), database_refs.clone());
|
||||
Self {
|
||||
editors_by_database_id,
|
||||
database_user,
|
||||
kv_persistence,
|
||||
block_indexer,
|
||||
database_ref_indexer,
|
||||
database_refs,
|
||||
task_scheduler,
|
||||
migration,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn initialize_with_new_user(&self, user_id: &str, _token: &str) -> FlowyResult<()> {
|
||||
self.migration.run(user_id).await?;
|
||||
pub async fn initialize_with_new_user(&self, _user_id: &str, _token: &str) -> FlowyResult<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn initialize(&self, user_id: &str, _token: &str) -> FlowyResult<()> {
|
||||
self.migration.run(user_id).await?;
|
||||
pub async fn initialize(
|
||||
&self,
|
||||
user_id: &str,
|
||||
_token: &str,
|
||||
get_views_fn: Fut<Vec<(String, String, LayoutTypePB)>>,
|
||||
) -> FlowyResult<()> {
|
||||
self.migration.run(user_id, get_views_fn).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -100,7 +99,7 @@ impl DatabaseManager {
|
||||
) -> FlowyResult<()> {
|
||||
let db_pool = self.database_user.db_pool()?;
|
||||
let _ = self
|
||||
.database_ref_indexer
|
||||
.database_refs
|
||||
.bind(database_id, view_id.as_ref(), true, name);
|
||||
let rev_manager = self.make_database_rev_manager(database_id, db_pool)?;
|
||||
rev_manager.reset_object(revisions).await?;
|
||||
@ -115,7 +114,9 @@ impl DatabaseManager {
|
||||
revisions: Vec<Revision>,
|
||||
) -> FlowyResult<()> {
|
||||
let view_id = view_id.as_ref();
|
||||
let rev_manager = make_database_view_rev_manager(&self.database_user, view_id).await?;
|
||||
let user_id = self.database_user.user_id()?;
|
||||
let pool = self.database_user.db_pool()?;
|
||||
let rev_manager = make_database_view_rev_manager(&user_id, pool, view_id).await?;
|
||||
rev_manager.reset_object(revisions).await?;
|
||||
Ok(())
|
||||
}
|
||||
@ -131,12 +132,13 @@ impl DatabaseManager {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "trace", skip_all, err)]
|
||||
pub async fn open_database_view<T: AsRef<str>>(
|
||||
&self,
|
||||
view_id: T,
|
||||
) -> FlowyResult<Arc<DatabaseEditor>> {
|
||||
let view_id = view_id.as_ref();
|
||||
let database_info = self.database_ref_indexer.get_database_with_view(view_id)?;
|
||||
let database_info = self.database_refs.get_database_with_view(view_id)?;
|
||||
self
|
||||
.get_or_create_database_editor(&database_info.database_id, view_id)
|
||||
.await
|
||||
@ -145,7 +147,7 @@ impl DatabaseManager {
|
||||
#[tracing::instrument(level = "debug", skip_all)]
|
||||
pub async fn close_database_view<T: AsRef<str>>(&self, view_id: T) -> FlowyResult<()> {
|
||||
let view_id = view_id.as_ref();
|
||||
let database_info = self.database_ref_indexer.get_database_with_view(view_id)?;
|
||||
let database_info = self.database_refs.get_database_with_view(view_id)?;
|
||||
tracing::Span::current().record("database_id", &database_info.database_id);
|
||||
|
||||
// Create a temporary reference database_editor in case of holding the write lock
|
||||
@ -174,7 +176,7 @@ impl DatabaseManager {
|
||||
|
||||
// #[tracing::instrument(level = "debug", skip(self), err)]
|
||||
pub async fn get_database_editor(&self, view_id: &str) -> FlowyResult<Arc<DatabaseEditor>> {
|
||||
let database_info = self.database_ref_indexer.get_database_with_view(view_id)?;
|
||||
let database_info = self.database_refs.get_database_with_view(view_id)?;
|
||||
let database_editor = self
|
||||
.editors_by_database_id
|
||||
.read()
|
||||
@ -191,16 +193,14 @@ impl DatabaseManager {
|
||||
}
|
||||
|
||||
pub async fn get_databases(&self) -> FlowyResult<Vec<DatabaseInfo>> {
|
||||
self.database_ref_indexer.get_all_databases()
|
||||
self.database_refs.get_all_databases()
|
||||
}
|
||||
|
||||
pub async fn get_database_ref_views(
|
||||
&self,
|
||||
database_id: &str,
|
||||
) -> FlowyResult<Vec<DatabaseViewRef>> {
|
||||
self
|
||||
.database_ref_indexer
|
||||
.get_ref_views_with_database(database_id)
|
||||
self.database_refs.get_ref_views_with_database(database_id)
|
||||
}
|
||||
|
||||
async fn get_or_create_database_editor(
|
||||
@ -282,7 +282,7 @@ impl DatabaseManager {
|
||||
database_pad,
|
||||
rev_manager,
|
||||
self.block_indexer.clone(),
|
||||
self.database_ref_indexer.clone(),
|
||||
self.database_refs.clone(),
|
||||
self.task_scheduler.clone(),
|
||||
)
|
||||
.await?;
|
||||
@ -359,7 +359,7 @@ pub async fn link_existing_database(
|
||||
.await?;
|
||||
|
||||
let _ = database_manager
|
||||
.database_ref_indexer
|
||||
.database_refs
|
||||
.bind(database_id, view_id, false, &name);
|
||||
Ok(())
|
||||
}
|
||||
@ -429,13 +429,13 @@ pub async fn create_new_database(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
impl DatabaseRefIndexerQuery for DatabaseRefIndexer {
|
||||
impl DatabaseRefIndexerQuery for DatabaseRefs {
|
||||
fn get_ref_views(&self, database_id: &str) -> FlowyResult<Vec<DatabaseViewRef>> {
|
||||
self.get_ref_views_with_database(database_id)
|
||||
}
|
||||
}
|
||||
|
||||
impl DatabaseRefIndexerQuery for Arc<DatabaseRefIndexer> {
|
||||
impl DatabaseRefIndexerQuery for Arc<DatabaseRefs> {
|
||||
fn get_ref_views(&self, database_id: &str) -> FlowyResult<Vec<DatabaseViewRef>> {
|
||||
(**self).get_ref_views(database_id)
|
||||
}
|
||||
|
@ -695,6 +695,7 @@ impl DatabaseEditor {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "trace", skip(self), err)]
|
||||
pub async fn get_database(&self, view_id: &str) -> FlowyResult<DatabasePB> {
|
||||
let pad = self.database_pad.read().await;
|
||||
let fields = pad
|
||||
|
@ -313,7 +313,9 @@ impl DatabaseViews {
|
||||
}
|
||||
|
||||
async fn make_view_editor(&self, view_id: &str) -> FlowyResult<DatabaseViewEditor> {
|
||||
let rev_manager = make_database_view_rev_manager(&self.user, view_id).await?;
|
||||
let user_id = self.user.user_id()?;
|
||||
let pool = self.user.db_pool()?;
|
||||
let rev_manager = make_database_view_rev_manager(&user_id, pool, view_id).await?;
|
||||
let user_id = self.user.user_id()?;
|
||||
let token = self.user.token()?;
|
||||
let view_id = view_id.to_owned();
|
||||
@ -338,7 +340,9 @@ pub async fn make_database_view_revision_pad(
|
||||
DatabaseViewRevisionPad,
|
||||
RevisionManager<Arc<ConnectionPool>>,
|
||||
)> {
|
||||
let mut rev_manager = make_database_view_rev_manager(&user, view_id).await?;
|
||||
let user_id = user.user_id()?;
|
||||
let pool = user.db_pool()?;
|
||||
let mut rev_manager = make_database_view_rev_manager(&user_id, pool, view_id).await?;
|
||||
let view_rev_pad = rev_manager
|
||||
.initialize::<DatabaseViewRevisionSerde>(None)
|
||||
.await?;
|
||||
@ -346,16 +350,14 @@ pub async fn make_database_view_revision_pad(
|
||||
}
|
||||
|
||||
pub async fn make_database_view_rev_manager(
|
||||
user: &Arc<dyn DatabaseUser>,
|
||||
user_id: &str,
|
||||
pool: Arc<ConnectionPool>,
|
||||
view_id: &str,
|
||||
) -> FlowyResult<RevisionManager<Arc<ConnectionPool>>> {
|
||||
let user_id = user.user_id()?;
|
||||
|
||||
// Create revision persistence
|
||||
let pool = user.db_pool()?;
|
||||
let disk_cache = SQLiteDatabaseViewRevisionPersistence::new(&user_id, pool.clone());
|
||||
let disk_cache = SQLiteDatabaseViewRevisionPersistence::new(user_id, pool.clone());
|
||||
let configuration = RevisionPersistenceConfiguration::new(2, false);
|
||||
let rev_persistence = RevisionPersistence::new(&user_id, view_id, disk_cache, configuration);
|
||||
let rev_persistence = RevisionPersistence::new(user_id, view_id, disk_cache, configuration);
|
||||
|
||||
// Create snapshot persistence
|
||||
const DATABASE_VIEW_SP_PREFIX: &str = "grid_view";
|
||||
@ -365,7 +367,7 @@ pub async fn make_database_view_rev_manager(
|
||||
|
||||
let rev_compress = DatabaseViewRevisionMergeable();
|
||||
Ok(RevisionManager::new(
|
||||
&user_id,
|
||||
user_id,
|
||||
view_id,
|
||||
rev_persistence,
|
||||
rev_compress,
|
||||
|
@ -75,7 +75,7 @@ impl DateTypeOptionPB {
|
||||
|
||||
let time = if include_time {
|
||||
let fmt = self.time_format.format_str();
|
||||
format!("{}", naive.format_with_items(StrftimeItems::new(&fmt)))
|
||||
format!("{}", naive.format_with_items(StrftimeItems::new(fmt)))
|
||||
} else {
|
||||
"".to_string()
|
||||
};
|
||||
@ -95,8 +95,7 @@ impl DateTypeOptionPB {
|
||||
) -> FlowyResult<i64> {
|
||||
if let Some(time_str) = time_str.as_ref() {
|
||||
if !time_str.is_empty() {
|
||||
let naive_time =
|
||||
chrono::NaiveTime::parse_from_str(&time_str, self.time_format.format_str());
|
||||
let naive_time = chrono::NaiveTime::parse_from_str(time_str, self.time_format.format_str());
|
||||
|
||||
match naive_time {
|
||||
Ok(naive_time) => {
|
||||
|
@ -7,11 +7,11 @@ use flowy_sqlite::{
|
||||
};
|
||||
use std::sync::Arc;
|
||||
|
||||
pub struct DatabaseRefIndexer {
|
||||
pub struct DatabaseRefs {
|
||||
database: Arc<dyn DatabaseDBConnection>,
|
||||
}
|
||||
|
||||
impl DatabaseRefIndexer {
|
||||
impl DatabaseRefs {
|
||||
pub fn new(database: Arc<dyn DatabaseDBConnection>) -> Self {
|
||||
Self { database }
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
#![allow(clippy::all)]
|
||||
#![allow(dead_code)]
|
||||
#![allow(unused_variables)]
|
||||
use crate::services::persistence::migration::MigratedDatabase;
|
||||
use crate::services::persistence::rev_sqlite::SQLiteDatabaseRevisionPersistence;
|
||||
use bytes::Bytes;
|
||||
use database_model::DatabaseRevision;
|
||||
@ -15,31 +16,36 @@ use lib_infra::util::md5;
|
||||
use revision_model::Revision;
|
||||
use std::sync::Arc;
|
||||
|
||||
const V1_MIGRATION: &str = "GRID_V1_MIGRATION";
|
||||
const V1_MIGRATION: &str = "DATABASE_V1_MIGRATION";
|
||||
pub fn is_database_rev_migrated(user_id: &str) -> bool {
|
||||
let key = migration_flag_key(&user_id, V1_MIGRATION);
|
||||
KV::get_bool(&key)
|
||||
}
|
||||
|
||||
pub async fn migration_database_rev_struct(
|
||||
pub(crate) async fn migration_database_rev_struct(
|
||||
user_id: &str,
|
||||
database_id: &str,
|
||||
databases: &Vec<MigratedDatabase>,
|
||||
pool: Arc<ConnectionPool>,
|
||||
) -> FlowyResult<()> {
|
||||
let key = migration_flag_key(&user_id, V1_MIGRATION, database_id);
|
||||
if KV::get_bool(&key) {
|
||||
if is_database_rev_migrated(user_id) || databases.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
let object = DatabaseRevisionResettable {
|
||||
database_id: database_id.to_owned(),
|
||||
};
|
||||
let disk_cache = SQLiteDatabaseRevisionPersistence::new(&user_id, pool);
|
||||
let reset = RevisionStructReset::new(&user_id, object, Arc::new(disk_cache));
|
||||
reset.run().await?;
|
||||
|
||||
tracing::trace!("Run database:{} v1 migration", database_id);
|
||||
tracing::debug!("Migrate databases");
|
||||
for database in databases {
|
||||
let object = DatabaseRevisionResettable {
|
||||
database_id: database.view_id.clone(),
|
||||
};
|
||||
let disk_cache = SQLiteDatabaseRevisionPersistence::new(&user_id, pool.clone());
|
||||
let reset = RevisionStructReset::new(&user_id, object, Arc::new(disk_cache));
|
||||
reset.run().await?;
|
||||
}
|
||||
let key = migration_flag_key(&user_id, V1_MIGRATION);
|
||||
KV::set_bool(&key, true);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn migration_flag_key(user_id: &str, version: &str, grid_id: &str) -> String {
|
||||
md5(format!("{}{}{}", user_id, version, grid_id,))
|
||||
fn migration_flag_key(user_id: &str, version: &str) -> String {
|
||||
md5(format!("{}{}", user_id, version,))
|
||||
}
|
||||
|
||||
struct DatabaseRevisionResettable {
|
@ -1,47 +0,0 @@
|
||||
use crate::manager::DatabaseUser;
|
||||
use crate::services::database_view::make_database_view_revision_pad;
|
||||
use crate::services::persistence::database_ref::DatabaseRefIndexer;
|
||||
|
||||
use flowy_error::{FlowyError, FlowyResult};
|
||||
use flowy_sqlite::kv::KV;
|
||||
|
||||
use flowy_sqlite::{
|
||||
prelude::*,
|
||||
schema::{grid_view_rev_table, grid_view_rev_table::dsl},
|
||||
};
|
||||
use lib_infra::util::md5;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
const DATABASE_REF_INDEXING: &str = "database_ref_indexing";
|
||||
|
||||
pub async fn indexing_database_view_refs(
|
||||
user_id: &str,
|
||||
user: Arc<dyn DatabaseUser>,
|
||||
database_ref_indexer: Arc<DatabaseRefIndexer>,
|
||||
) -> FlowyResult<()> {
|
||||
let key = md5(format!("{}{}", user_id, DATABASE_REF_INDEXING));
|
||||
if KV::get_bool(&key) {
|
||||
return Ok(());
|
||||
}
|
||||
tracing::trace!("Indexing database view refs");
|
||||
let pool = user.db_pool()?;
|
||||
let view_ids = dsl::grid_view_rev_table
|
||||
.select(grid_view_rev_table::object_id)
|
||||
.distinct()
|
||||
.load::<String>(&*pool.get().map_err(|e| FlowyError::internal().context(e))?)?;
|
||||
|
||||
for view_id in view_ids {
|
||||
if let Ok((pad, _)) = make_database_view_revision_pad(&view_id, user.clone()).await {
|
||||
tracing::trace!(
|
||||
"Indexing database:{} with view:{}",
|
||||
pad.database_id,
|
||||
pad.view_id
|
||||
);
|
||||
let _ = database_ref_indexer.bind(&pad.database_id, &pad.view_id, true, &pad.name);
|
||||
}
|
||||
}
|
||||
|
||||
KV::set_bool(&key, true);
|
||||
Ok(())
|
||||
}
|
@ -0,0 +1,147 @@
|
||||
use crate::services::database_view::make_database_view_rev_manager;
|
||||
use crate::services::persistence::database_ref::DatabaseRefs;
|
||||
use flowy_error::FlowyResult;
|
||||
use flowy_sqlite::kv::KV;
|
||||
|
||||
use crate::services::persistence::migration::MigratedDatabase;
|
||||
use crate::services::persistence::rev_sqlite::SQLiteDatabaseViewRevisionPersistence;
|
||||
use bytes::Bytes;
|
||||
use database_model::DatabaseViewRevision;
|
||||
use flowy_client_sync::client_database::{
|
||||
make_database_view_operations, make_database_view_rev_json_str, DatabaseViewOperationsBuilder,
|
||||
DatabaseViewRevisionPad,
|
||||
};
|
||||
use flowy_revision::reset::{RevisionResettable, RevisionStructReset};
|
||||
use flowy_sqlite::{
|
||||
prelude::*,
|
||||
schema::{grid_view_rev_table, grid_view_rev_table::dsl},
|
||||
};
|
||||
use lib_infra::util::md5;
|
||||
use revision_model::Revision;
|
||||
use std::sync::Arc;
|
||||
|
||||
const DATABASE_VIEW_MIGRATE: &str = "database_view_migrate";
|
||||
|
||||
pub fn is_database_view_migrated(user_id: &str) -> bool {
|
||||
let key = md5(format!("{}{}", user_id, DATABASE_VIEW_MIGRATE));
|
||||
KV::get_bool(&key)
|
||||
}
|
||||
|
||||
pub(crate) async fn migrate_database_view(
|
||||
user_id: &str,
|
||||
database_refs: Arc<DatabaseRefs>,
|
||||
migrated_databases: &Vec<MigratedDatabase>,
|
||||
pool: Arc<ConnectionPool>,
|
||||
) -> FlowyResult<()> {
|
||||
if is_database_view_migrated(user_id) || migrated_databases.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut database_with_view = vec![];
|
||||
|
||||
let database_without_view = {
|
||||
let conn = pool.get()?;
|
||||
let databases = migrated_databases
|
||||
.iter()
|
||||
.filter(|database| {
|
||||
let predicate = grid_view_rev_table::object_id.eq(&database.view_id);
|
||||
let exist = diesel::dsl::exists(dsl::grid_view_rev_table.filter(predicate));
|
||||
match select(exist).get_result::<bool>(&*conn) {
|
||||
Ok(is_exist) => {
|
||||
if is_exist {
|
||||
database_with_view.push((**database).clone())
|
||||
}
|
||||
!is_exist
|
||||
},
|
||||
Err(_) => true,
|
||||
}
|
||||
})
|
||||
.collect::<Vec<&MigratedDatabase>>();
|
||||
drop(conn);
|
||||
databases
|
||||
};
|
||||
|
||||
// Create database view if it's not exist.
|
||||
for database in database_without_view {
|
||||
tracing::debug!("[Migration]: create database view: {}", database.view_id);
|
||||
let database_id = database.view_id.clone();
|
||||
let database_view_id = database.view_id.clone();
|
||||
//
|
||||
let database_view_rev = DatabaseViewRevision::new(
|
||||
database_id,
|
||||
database_view_id.clone(),
|
||||
true,
|
||||
database.name.clone(),
|
||||
database.layout.clone(),
|
||||
);
|
||||
let database_view_ops = make_database_view_operations(&database_view_rev);
|
||||
let database_view_bytes = database_view_ops.json_bytes();
|
||||
let revision = Revision::initial_revision(&database_view_id, database_view_bytes);
|
||||
let rev_manager =
|
||||
make_database_view_rev_manager(user_id, pool.clone(), &database_view_id).await?;
|
||||
rev_manager.reset_object(vec![revision]).await?;
|
||||
}
|
||||
|
||||
// Reset existing database view
|
||||
for database in database_with_view {
|
||||
let object = DatabaseViewRevisionResettable {
|
||||
database_view_id: database.view_id.clone(),
|
||||
};
|
||||
let disk_cache = SQLiteDatabaseViewRevisionPersistence::new(user_id, pool.clone());
|
||||
let reset = RevisionStructReset::new(user_id, object, Arc::new(disk_cache));
|
||||
reset.run().await?;
|
||||
}
|
||||
|
||||
tracing::debug!("[Migration]: Add database view refs");
|
||||
for database in migrated_databases {
|
||||
// Bind the database with database view id. For historical reasons,
|
||||
// the default database_id is empty, so the view_id will be used
|
||||
// as the database_id.
|
||||
let database_id = database.view_id.clone();
|
||||
let database_view_id = database.view_id.clone();
|
||||
tracing::debug!(
|
||||
"Bind database:{} with view:{}",
|
||||
database_id,
|
||||
database_view_id
|
||||
);
|
||||
let _ = database_refs.bind(&database_id, &database_view_id, true, &database.name);
|
||||
}
|
||||
|
||||
let key = md5(format!("{}{}", user_id, DATABASE_VIEW_MIGRATE));
|
||||
KV::set_bool(&key, true);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct DatabaseViewRevisionResettable {
|
||||
database_view_id: String,
|
||||
}
|
||||
|
||||
impl RevisionResettable for DatabaseViewRevisionResettable {
|
||||
fn target_id(&self) -> &str {
|
||||
&self.database_view_id
|
||||
}
|
||||
|
||||
fn reset_data(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
|
||||
let pad = DatabaseViewRevisionPad::from_revisions(revisions)?;
|
||||
let json = pad.json_str()?;
|
||||
let bytes = DatabaseViewOperationsBuilder::new()
|
||||
.insert(&json)
|
||||
.build()
|
||||
.json_bytes();
|
||||
Ok(bytes)
|
||||
}
|
||||
|
||||
fn default_target_rev_str(&self) -> FlowyResult<String> {
|
||||
let database_view_rev = DatabaseViewRevision::default();
|
||||
let json = make_database_view_rev_json_str(&database_view_rev)?;
|
||||
Ok(json)
|
||||
}
|
||||
|
||||
fn read_record(&self) -> Option<String> {
|
||||
KV::get_str(self.target_id())
|
||||
}
|
||||
|
||||
fn set_record(&self, record: String) {
|
||||
KV::set_str(self.target_id(), record);
|
||||
}
|
||||
}
|
@ -1,49 +1,69 @@
|
||||
mod database_ref_indexing;
|
||||
mod database_rev_struct_migration;
|
||||
|
||||
mod database_migration;
|
||||
mod database_view_migration;
|
||||
use crate::entities::LayoutTypePB;
|
||||
use crate::manager::DatabaseUser;
|
||||
use crate::services::persistence::database_ref::DatabaseRefIndexer;
|
||||
use crate::services::persistence::migration::database_ref_indexing::indexing_database_view_refs;
|
||||
use crate::services::persistence::migration::database_rev_struct_migration::migration_database_rev_struct;
|
||||
use crate::services::persistence::DatabaseDBConnection;
|
||||
use crate::services::persistence::database_ref::DatabaseRefs;
|
||||
use crate::services::persistence::migration::database_migration::{
|
||||
is_database_rev_migrated, migration_database_rev_struct,
|
||||
};
|
||||
use crate::services::persistence::migration::database_view_migration::{
|
||||
is_database_view_migrated, migrate_database_view,
|
||||
};
|
||||
use database_model::LayoutRevision;
|
||||
use flowy_error::FlowyResult;
|
||||
use lib_infra::future::Fut;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub(crate) struct DatabaseMigration {
|
||||
#[allow(dead_code)]
|
||||
user: Arc<dyn DatabaseUser>,
|
||||
database: Arc<dyn DatabaseDBConnection>,
|
||||
database_ref_indexer: Arc<DatabaseRefIndexer>,
|
||||
database_refs: Arc<DatabaseRefs>,
|
||||
}
|
||||
|
||||
impl DatabaseMigration {
|
||||
pub fn new(
|
||||
user: Arc<dyn DatabaseUser>,
|
||||
database: Arc<dyn DatabaseDBConnection>,
|
||||
database_ref_indexer: Arc<DatabaseRefIndexer>,
|
||||
) -> Self {
|
||||
pub fn new(user: Arc<dyn DatabaseUser>, database_refs: Arc<DatabaseRefs>) -> Self {
|
||||
Self {
|
||||
user,
|
||||
database,
|
||||
database_ref_indexer,
|
||||
database_refs,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run(&self, user_id: &str) -> FlowyResult<()> {
|
||||
let _ = indexing_database_view_refs(
|
||||
user_id,
|
||||
self.user.clone(),
|
||||
self.database_ref_indexer.clone(),
|
||||
)
|
||||
.await;
|
||||
Ok(())
|
||||
}
|
||||
pub async fn run(
|
||||
&self,
|
||||
user_id: &str,
|
||||
get_views_fn: Fut<Vec<(String, String, LayoutTypePB)>>,
|
||||
) -> FlowyResult<()> {
|
||||
let pool = self.user.db_pool()?;
|
||||
|
||||
if !is_database_view_migrated(user_id) || !is_database_rev_migrated(user_id) {
|
||||
let migrated_databases = get_views_fn
|
||||
.await
|
||||
.into_iter()
|
||||
.map(|(view_id, name, layout)| MigratedDatabase {
|
||||
view_id,
|
||||
name,
|
||||
layout: layout.into(),
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
migration_database_rev_struct(user_id, &migrated_databases, pool.clone()).await?;
|
||||
|
||||
let _ = migrate_database_view(
|
||||
user_id,
|
||||
self.database_refs.clone(),
|
||||
&migrated_databases,
|
||||
pool.clone(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub async fn database_rev_struct_migration(&self, grid_id: &str) -> FlowyResult<()> {
|
||||
let user_id = self.user.user_id()?;
|
||||
let pool = self.database.get_db_pool()?;
|
||||
migration_database_rev_struct(&user_id, grid_id, pool).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct MigratedDatabase {
|
||||
pub(crate) view_id: String,
|
||||
pub(crate) name: String,
|
||||
pub(crate) layout: LayoutRevision,
|
||||
}
|
||||
|
@ -32,10 +32,7 @@ impl DocumentMigration {
|
||||
let conn = &*pool.get()?;
|
||||
let disk_cache = SQLiteDocumentRevisionPersistence::new(&self.user_id, pool);
|
||||
let documents = DeltaRevisionSql::read_all_documents(&self.user_id, conn)?;
|
||||
tracing::debug!(
|
||||
"[Document Migration]: try migrate {} documents",
|
||||
documents.len()
|
||||
);
|
||||
tracing::debug!("[Migration]: try migrate {} documents", documents.len());
|
||||
for revisions in documents {
|
||||
if revisions.is_empty() {
|
||||
continue;
|
||||
|
@ -1,5 +1,5 @@
|
||||
use crate::entities::view::ViewDataFormatPB;
|
||||
use crate::entities::{ViewLayoutTypePB, ViewPB};
|
||||
use crate::entities::{ViewLayoutTypePB, ViewPB, WorkspacePB};
|
||||
use crate::services::folder_editor::FolderRevisionMergeable;
|
||||
use crate::{
|
||||
entities::workspace::RepeatedWorkspacePB,
|
||||
@ -21,10 +21,10 @@ use folder_model::user_default;
|
||||
use lazy_static::lazy_static;
|
||||
use lib_infra::future::FutureResult;
|
||||
|
||||
use crate::services::clear_current_workspace;
|
||||
use crate::services::persistence::rev_sqlite::{
|
||||
SQLiteFolderRevisionPersistence, SQLiteFolderRevisionSnapshotPersistence,
|
||||
};
|
||||
use crate::services::{clear_current_workspace, get_current_workspace};
|
||||
use flowy_client_sync::client_folder::FolderPad;
|
||||
use std::convert::TryFrom;
|
||||
use std::{collections::HashMap, fmt::Formatter, sync::Arc};
|
||||
@ -209,6 +209,20 @@ impl FolderManager {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_current_workspace(&self) -> FlowyResult<WorkspacePB> {
|
||||
let user_id = self.user.user_id()?;
|
||||
let workspace_id = get_current_workspace(&user_id)?;
|
||||
let workspace = self
|
||||
.persistence
|
||||
.begin_transaction(|transaction| {
|
||||
self
|
||||
.workspace_controller
|
||||
.read_workspace(workspace_id, &user_id, &transaction)
|
||||
})
|
||||
.await?;
|
||||
Ok(workspace)
|
||||
}
|
||||
|
||||
pub async fn initialize_with_new_user(
|
||||
&self,
|
||||
user_id: &str,
|
||||
|
Loading…
Reference in New Issue
Block a user