refactor: remove shared instance KV (#3123)

* refactor: remove shared instance KV

* test: enable document test
This commit is contained in:
Nathan.fooo 2023-08-06 11:51:03 +08:00 committed by GitHub
parent 9a72f31d60
commit 6f159e741b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 246 additions and 153 deletions

View File

@ -1,33 +1,56 @@
use std::sync::Weak;
use flowy_error::{FlowyError, FlowyResult}; use flowy_error::{FlowyError, FlowyResult};
use flowy_sqlite::kv::KV; use flowy_sqlite::kv::StorePreferences;
use lib_dispatch::prelude::{data_result_ok, AFPluginData, DataResult}; use lib_dispatch::prelude::{data_result_ok, AFPluginData, AFPluginState, DataResult};
use crate::entities::{KeyPB, KeyValuePB}; use crate::entities::{KeyPB, KeyValuePB};
pub(crate) async fn set_key_value_handler(data: AFPluginData<KeyValuePB>) -> FlowyResult<()> { pub(crate) async fn set_key_value_handler(
store_preferences: AFPluginState<Weak<StorePreferences>>,
data: AFPluginData<KeyValuePB>,
) -> FlowyResult<()> {
let data = data.into_inner(); let data = data.into_inner();
if let Some(store_preferences) = store_preferences.upgrade() {
match data.value { match data.value {
None => KV::remove(&data.key), None => store_preferences.remove(&data.key),
Some(value) => { Some(value) => {
KV::set_str(&data.key, value); store_preferences.set_str(&data.key, value);
}, },
} }
}
Ok(()) Ok(())
} }
pub(crate) async fn get_key_value_handler( pub(crate) async fn get_key_value_handler(
store_preferences: AFPluginState<Weak<StorePreferences>>,
data: AFPluginData<KeyPB>, data: AFPluginData<KeyPB>,
) -> DataResult<KeyValuePB, FlowyError> { ) -> DataResult<KeyValuePB, FlowyError> {
match store_preferences.upgrade() {
None => Err(FlowyError::internal().context("The store preferences is already drop"))?,
Some(store_preferences) => {
let data = data.into_inner(); let data = data.into_inner();
let value = KV::get_str(&data.key); let value = store_preferences.get_str(&data.key);
data_result_ok(KeyValuePB { data_result_ok(KeyValuePB {
key: data.key, key: data.key,
value, value,
}) })
},
}
} }
pub(crate) async fn remove_key_value_handler(data: AFPluginData<KeyPB>) -> FlowyResult<()> { pub(crate) async fn remove_key_value_handler(
store_preferences: AFPluginState<Weak<StorePreferences>>,
data: AFPluginData<KeyPB>,
) -> FlowyResult<()> {
match store_preferences.upgrade() {
None => Err(FlowyError::internal().context("The store preferences is already drop"))?,
Some(store_preferences) => {
let data = data.into_inner(); let data = data.into_inner();
KV::remove(&data.key); store_preferences.remove(&data.key);
Ok(()) Ok(())
},
}
} }

View File

@ -1,13 +1,17 @@
use std::sync::Weak;
use strum_macros::Display; use strum_macros::Display;
use flowy_derive::{Flowy_Event, ProtoBuf_Enum}; use flowy_derive::{Flowy_Event, ProtoBuf_Enum};
use flowy_sqlite::kv::StorePreferences;
use lib_dispatch::prelude::AFPlugin; use lib_dispatch::prelude::AFPlugin;
use crate::event_handler::*; use crate::event_handler::*;
pub fn init() -> AFPlugin { pub fn init(store_preferences: Weak<StorePreferences>) -> AFPlugin {
AFPlugin::new() AFPlugin::new()
.name(env!("CARGO_PKG_NAME")) .name(env!("CARGO_PKG_NAME"))
.state(store_preferences)
.event(ConfigEvent::SetKeyValue, set_key_value_handler) .event(ConfigEvent::SetKeyValue, set_key_value_handler)
.event(ConfigEvent::GetKeyValue, get_key_value_handler) .event(ConfigEvent::GetKeyValue, get_key_value_handler)
.event(ConfigEvent::RemoveKeyValue, remove_key_value_handler) .event(ConfigEvent::RemoveKeyValue, remove_key_value_handler)

View File

@ -1,5 +1,5 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::{Arc, Weak};
use appflowy_integrate::collab_builder::{CollabStorageProvider, CollabStorageType}; use appflowy_integrate::collab_builder::{CollabStorageProvider, CollabStorageType};
use appflowy_integrate::{CollabType, RemoteCollabStorage, YrsDocAction}; use appflowy_integrate::{CollabType, RemoteCollabStorage, YrsDocAction};
@ -17,7 +17,7 @@ use flowy_server::self_host::SelfHostServer;
use flowy_server::supabase::SupabaseServer; use flowy_server::supabase::SupabaseServer;
use flowy_server::AppFlowyServer; use flowy_server::AppFlowyServer;
use flowy_server_config::supabase_config::SupabaseConfiguration; use flowy_server_config::supabase_config::SupabaseConfiguration;
use flowy_sqlite::kv::KV; use flowy_sqlite::kv::StorePreferences;
use flowy_user::event_map::UserCloudServiceProvider; use flowy_user::event_map::UserCloudServiceProvider;
use flowy_user::services::database::{ use flowy_user::services::database::{
get_user_profile, get_user_workspace, open_collab_db, open_user_db, get_user_profile, get_user_workspace, open_collab_db, open_user_db,
@ -54,15 +54,22 @@ pub struct AppFlowyServerProvider {
provider_type: RwLock<ServerProviderType>, provider_type: RwLock<ServerProviderType>,
providers: RwLock<HashMap<ServerProviderType, Arc<dyn AppFlowyServer>>>, providers: RwLock<HashMap<ServerProviderType, Arc<dyn AppFlowyServer>>>,
supabase_config: RwLock<Option<SupabaseConfiguration>>, supabase_config: RwLock<Option<SupabaseConfiguration>>,
store_preferences: Weak<StorePreferences>,
} }
impl AppFlowyServerProvider { impl AppFlowyServerProvider {
pub fn new(config: AppFlowyCoreConfig, supabase_config: Option<SupabaseConfiguration>) -> Self { pub fn new(
config: AppFlowyCoreConfig,
provider_type: ServerProviderType,
supabase_config: Option<SupabaseConfiguration>,
store_preferences: Weak<StorePreferences>,
) -> Self {
Self { Self {
config, config,
provider_type: RwLock::new(current_server_provider()), provider_type: RwLock::new(provider_type),
providers: RwLock::new(HashMap::new()), providers: RwLock::new(HashMap::new()),
supabase_config: RwLock::new(supabase_config), supabase_config: RwLock::new(supabase_config),
store_preferences,
} }
} }
@ -141,12 +148,17 @@ impl UserCloudServiceProvider for AppFlowyServerProvider {
let provider_type: ServerProviderType = auth_type.into(); let provider_type: ServerProviderType = auth_type.into();
*self.provider_type.write() = provider_type.clone(); *self.provider_type.write() = provider_type.clone();
match KV::set_object(SERVER_PROVIDER_TYPE_KEY, provider_type.clone()) { match self.store_preferences.upgrade() {
None => tracing::error!("🔴Failed to update server provider type: store preferences is drop"),
Some(store_preferences) => {
match store_preferences.set_object(SERVER_PROVIDER_TYPE_KEY, provider_type.clone()) {
Ok(_) => tracing::trace!("Update server provider type to: {:?}", provider_type), Ok(_) => tracing::trace!("Update server provider type to: {:?}", provider_type),
Err(e) => { Err(e) => {
tracing::error!("🔴Failed to update server provider type: {:?}", e); tracing::error!("🔴Failed to update server provider type: {:?}", e);
}, },
} }
},
}
} }
/// Returns the [UserService] base on the current [ServerProviderType]. /// Returns the [UserService] base on the current [ServerProviderType].
@ -336,8 +348,8 @@ impl From<&AuthType> for ServerProviderType {
} }
} }
fn current_server_provider() -> ServerProviderType { pub fn current_server_provider(store_preferences: &Arc<StorePreferences>) -> ServerProviderType {
match KV::get_object::<ServerProviderType>(SERVER_PROVIDER_TYPE_KEY) { match store_preferences.get_object::<ServerProviderType>(SERVER_PROVIDER_TYPE_KEY) {
None => ServerProviderType::Local, None => ServerProviderType::Local,
Some(provider_type) => provider_type, Some(provider_type) => provider_type,
} }

View File

@ -16,7 +16,7 @@ use flowy_database2::DatabaseManager;
use flowy_document2::manager::DocumentManager; use flowy_document2::manager::DocumentManager;
use flowy_error::FlowyResult; use flowy_error::FlowyResult;
use flowy_folder2::manager::{FolderInitializeData, FolderManager}; use flowy_folder2::manager::{FolderInitializeData, FolderManager};
use flowy_sqlite::kv::KV; use flowy_sqlite::kv::StorePreferences;
use flowy_task::{TaskDispatcher, TaskRunner}; use flowy_task::{TaskDispatcher, TaskRunner};
use flowy_user::event_map::{SignUpContext, UserCloudServiceProvider, UserStatusCallback}; use flowy_user::event_map::{SignUpContext, UserCloudServiceProvider, UserStatusCallback};
use flowy_user::services::{get_supabase_config, UserSession, UserSessionConfig}; use flowy_user::services::{get_supabase_config, UserSession, UserSessionConfig};
@ -28,7 +28,9 @@ use module::make_plugins;
pub use module::*; pub use module::*;
use crate::deps_resolve::*; use crate::deps_resolve::*;
use crate::integrate::server::{AppFlowyServerProvider, ServerProviderType}; use crate::integrate::server::{
current_server_provider, AppFlowyServerProvider, ServerProviderType,
};
mod deps_resolve; mod deps_resolve;
mod integrate; mod integrate;
@ -118,6 +120,7 @@ pub struct AppFlowyCore {
pub event_dispatcher: Arc<AFPluginDispatcher>, pub event_dispatcher: Arc<AFPluginDispatcher>,
pub server_provider: Arc<AppFlowyServerProvider>, pub server_provider: Arc<AppFlowyServerProvider>,
pub task_dispatcher: Arc<RwLock<TaskDispatcher>>, pub task_dispatcher: Arc<RwLock<TaskDispatcher>>,
pub storage_preference: Arc<StorePreferences>,
} }
impl AppFlowyCore { impl AppFlowyCore {
@ -132,7 +135,7 @@ impl AppFlowyCore {
init_log(&config); init_log(&config);
// Init the key value database // Init the key value database
init_kv(&config.storage_path); let store_preference = Arc::new(StorePreferences::new(&config.storage_path).unwrap());
tracing::info!("🔥 {:?}", &config); tracing::info!("🔥 {:?}", &config);
let runtime = tokio_default_runtime().unwrap(); let runtime = tokio_default_runtime().unwrap();
@ -140,9 +143,12 @@ impl AppFlowyCore {
let task_dispatcher = Arc::new(RwLock::new(task_scheduler)); let task_dispatcher = Arc::new(RwLock::new(task_scheduler));
runtime.spawn(TaskRunner::run(task_dispatcher.clone())); runtime.spawn(TaskRunner::run(task_dispatcher.clone()));
let provider_type = current_server_provider(&store_preference);
let server_provider = Arc::new(AppFlowyServerProvider::new( let server_provider = Arc::new(AppFlowyServerProvider::new(
config.clone(), config.clone(),
get_supabase_config(), provider_type,
get_supabase_config(&store_preference),
Arc::downgrade(&store_preference),
)); ));
let ( let (
@ -153,7 +159,7 @@ impl AppFlowyCore {
document_manager, document_manager,
collab_builder, collab_builder,
) = runtime.block_on(async { ) = runtime.block_on(async {
let user_session = mk_user_session(&config, server_provider.clone()); let user_session = mk_user_session(&config, &store_preference, server_provider.clone());
/// The shared collab builder is used to build the [Collab] instance. The plugins will be loaded /// The shared collab builder is used to build the [Collab] instance. The plugins will be loaded
/// on demand based on the [CollabPluginConfig]. /// on demand based on the [CollabPluginConfig].
let collab_builder = Arc::new(AppFlowyCollabBuilder::new( let collab_builder = Arc::new(AppFlowyCollabBuilder::new(
@ -228,6 +234,7 @@ impl AppFlowyCore {
event_dispatcher, event_dispatcher,
server_provider, server_provider,
task_dispatcher, task_dispatcher,
storage_preference: store_preference,
} }
} }
@ -237,13 +244,6 @@ impl AppFlowyCore {
} }
} }
fn init_kv(root: &str) {
match KV::init(root) {
Ok(_) => {},
Err(e) => tracing::error!("Init kv store failed: {}", e),
}
}
fn init_log(config: &AppFlowyCoreConfig) { fn init_log(config: &AppFlowyCoreConfig) {
if !INIT_LOG.load(Ordering::SeqCst) { if !INIT_LOG.load(Ordering::SeqCst) {
INIT_LOG.store(true, Ordering::SeqCst); INIT_LOG.store(true, Ordering::SeqCst);
@ -256,10 +256,15 @@ fn init_log(config: &AppFlowyCoreConfig) {
fn mk_user_session( fn mk_user_session(
config: &AppFlowyCoreConfig, config: &AppFlowyCoreConfig,
storage_preference: &Arc<StorePreferences>,
user_cloud_service_provider: Arc<dyn UserCloudServiceProvider>, user_cloud_service_provider: Arc<dyn UserCloudServiceProvider>,
) -> Arc<UserSession> { ) -> Arc<UserSession> {
let user_config = UserSessionConfig::new(&config.name, &config.storage_path); let user_config = UserSessionConfig::new(&config.name, &config.storage_path);
Arc::new(UserSession::new(user_config, user_cloud_service_provider)) Arc::new(UserSession::new(
user_config,
user_cloud_service_provider,
storage_preference.clone(),
))
} }
struct UserStatusCallbackImpl { struct UserStatusCallbackImpl {

View File

@ -12,12 +12,16 @@ pub fn make_plugins(
user_session: Weak<UserSession>, user_session: Weak<UserSession>,
document_manager2: Weak<DocumentManager2>, document_manager2: Weak<DocumentManager2>,
) -> Vec<AFPlugin> { ) -> Vec<AFPlugin> {
let store_preferences = user_session
.upgrade()
.and_then(|session| Some(session.get_store_preferences()))
.unwrap();
let user_plugin = flowy_user::event_map::init(user_session); let user_plugin = flowy_user::event_map::init(user_session);
let folder_plugin = flowy_folder2::event_map::init(folder_manager); let folder_plugin = flowy_folder2::event_map::init(folder_manager);
let network_plugin = flowy_net::event_map::init(); let network_plugin = flowy_net::event_map::init();
let database_plugin = flowy_database2::event_map::init(database_manager); let database_plugin = flowy_database2::event_map::init(database_manager);
let document_plugin2 = flowy_document2::event_map::init(document_manager2); let document_plugin2 = flowy_document2::event_map::init(document_manager2);
let config_plugin = flowy_config::event_map::init(); let config_plugin = flowy_config::event_map::init(store_preferences);
vec![ vec![
user_plugin, user_plugin,
folder_plugin, folder_plugin,

View File

@ -3,34 +3,25 @@ use std::path::Path;
use ::diesel::{query_dsl::*, ExpressionMethods}; use ::diesel::{query_dsl::*, ExpressionMethods};
use anyhow::anyhow; use anyhow::anyhow;
use diesel::{Connection, SqliteConnection}; use diesel::{Connection, SqliteConnection};
use lazy_static::lazy_static;
use parking_lot::RwLock;
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::Serialize; use serde::Serialize;
use crate::kv::schema::{kv_table, kv_table::dsl, KV_SQL}; use crate::kv::schema::{kv_table, kv_table::dsl, KV_SQL};
use crate::sqlite::{DBConnection, Database, PoolConfig}; use crate::sqlite::{Database, PoolConfig};
const DB_NAME: &str = "cache.db"; const DB_NAME: &str = "cache.db";
lazy_static! {
static ref KV_HOLDER: RwLock<KV> = RwLock::new(KV::new());
}
/// [KV] uses a sqlite database to store key value pairs. /// [StorePreferences] uses a sqlite database to store key value pairs.
/// Most of the time, it used to storage AppFlowy configuration. /// Most of the time, it used to storage AppFlowy configuration.
pub struct KV { pub struct StorePreferences {
database: Option<Database>, database: Option<Database>,
} }
impl KV { impl StorePreferences {
fn new() -> Self {
KV { database: None }
}
#[tracing::instrument(level = "trace", err)] #[tracing::instrument(level = "trace", err)]
pub fn init(root: &str) -> Result<(), anyhow::Error> { pub fn new(root: &str) -> Result<Self, anyhow::Error> {
if !Path::new(root).exists() { if !Path::new(root).exists() {
return Err(anyhow!("Init KV failed. {} not exists", root)); return Err(anyhow!("Init StorePreferences failed. {} not exists", root));
} }
let pool_config = PoolConfig::default(); let pool_config = PoolConfig::default();
@ -38,69 +29,83 @@ impl KV {
let conn = database.get_connection().unwrap(); let conn = database.get_connection().unwrap();
SqliteConnection::execute(&*conn, KV_SQL).unwrap(); SqliteConnection::execute(&*conn, KV_SQL).unwrap();
tracing::trace!("Init kv with path: {}", root); tracing::trace!("Init StorePreferences with path: {}", root);
KV_HOLDER.write().database = Some(database); Ok(Self {
database: Some(database),
Ok(()) })
} }
/// Set a string value of a key /// Set a string value of a key
pub fn set_str<T: ToString>(key: &str, value: T) { pub fn set_str<T: ToString>(&self, key: &str, value: T) {
let _ = Self::set_key_value(key, Some(value.to_string())); let _ = self.set_key_value(key, Some(value.to_string()));
} }
/// Set a bool value of a key /// Set a bool value of a key
pub fn set_bool(key: &str, value: bool) -> Result<(), anyhow::Error> { pub fn set_bool(&self, key: &str, value: bool) -> Result<(), anyhow::Error> {
Self::set_key_value(key, Some(value.to_string())) self.set_key_value(key, Some(value.to_string()))
} }
/// Set a object that implements [Serialize] trait of a key /// Set a object that implements [Serialize] trait of a key
pub fn set_object<T: Serialize>(key: &str, value: T) -> Result<(), anyhow::Error> { pub fn set_object<T: Serialize>(&self, key: &str, value: T) -> Result<(), anyhow::Error> {
let value = serde_json::to_string(&value)?; let value = serde_json::to_string(&value)?;
Self::set_key_value(key, Some(value))?; self.set_key_value(key, Some(value))?;
Ok(()) Ok(())
} }
/// Set a i64 value of a key /// Set a i64 value of a key
pub fn set_i64(key: &str, value: i64) -> Result<(), anyhow::Error> { pub fn set_i64(&self, key: &str, value: i64) -> Result<(), anyhow::Error> {
Self::set_key_value(key, Some(value.to_string())) self.set_key_value(key, Some(value.to_string()))
} }
/// Get a string value of a key /// Get a string value of a key
pub fn get_str(key: &str) -> Option<String> { pub fn get_str(&self, key: &str) -> Option<String> {
Self::get_key_value(key).and_then(|kv| kv.value) self.get_key_value(key).and_then(|kv| kv.value)
} }
/// Get a bool value of a key /// Get a bool value of a key
pub fn get_bool(key: &str) -> bool { pub fn get_bool(&self, key: &str) -> bool {
Self::get_key_value(key) self
.get_key_value(key)
.and_then(|kv| kv.value) .and_then(|kv| kv.value)
.and_then(|v| v.parse::<bool>().ok()) .and_then(|v| v.parse::<bool>().ok())
.unwrap_or(false) .unwrap_or(false)
} }
/// Get a i64 value of a key /// Get a i64 value of a key
pub fn get_i64(key: &str) -> Option<i64> { pub fn get_i64(&self, key: &str) -> Option<i64> {
Self::get_key_value(key) self
.get_key_value(key)
.and_then(|kv| kv.value) .and_then(|kv| kv.value)
.and_then(|v| v.parse::<i64>().ok()) .and_then(|v| v.parse::<i64>().ok())
} }
/// Get a object that implements [DeserializeOwned] trait of a key /// Get a object that implements [DeserializeOwned] trait of a key
pub fn get_object<T: DeserializeOwned>(key: &str) -> Option<T> { pub fn get_object<T: DeserializeOwned>(&self, key: &str) -> Option<T> {
Self::get_str(key).and_then(|v| serde_json::from_str(&v).ok()) self
.get_str(key)
.and_then(|v| serde_json::from_str(&v).ok())
} }
#[allow(dead_code)] #[allow(dead_code)]
pub fn remove(key: &str) { pub fn remove(&self, key: &str) {
if let Ok(conn) = get_connection() { if let Some(conn) = self
.database
.as_ref()
.and_then(|database| database.get_connection().ok())
{
let sql = dsl::kv_table.filter(kv_table::key.eq(key)); let sql = dsl::kv_table.filter(kv_table::key.eq(key));
let _ = diesel::delete(sql).execute(&*conn); let _ = diesel::delete(sql).execute(&*conn);
} }
} }
fn set_key_value(key: &str, value: Option<String>) -> Result<(), anyhow::Error> { fn set_key_value(&self, key: &str, value: Option<String>) -> Result<(), anyhow::Error> {
let conn = get_connection()?; match self
.database
.as_ref()
.and_then(|database| database.get_connection().ok())
{
None => Err(anyhow!("StorePreferences is not initialized")),
Some(conn) => {
diesel::replace_into(kv_table::table) diesel::replace_into(kv_table::table)
.values(KeyValue { .values(KeyValue {
key: key.to_string(), key: key.to_string(),
@ -108,10 +113,12 @@ impl KV {
}) })
.execute(&*conn)?; .execute(&*conn)?;
Ok(()) Ok(())
},
}
} }
fn get_key_value(key: &str) -> Option<KeyValue> { fn get_key_value(&self, key: &str) -> Option<KeyValue> {
let conn = get_connection().ok()?; let conn = self.database.as_ref().unwrap().get_connection().ok()?;
dsl::kv_table dsl::kv_table
.filter(kv_table::key.eq(key)) .filter(kv_table::key.eq(key))
.first::<KeyValue>(&*conn) .first::<KeyValue>(&*conn)
@ -119,17 +126,6 @@ impl KV {
} }
} }
fn get_connection() -> Result<DBConnection, anyhow::Error> {
let conn = KV_HOLDER
.read()
.database
.as_ref()
.expect("KVStore is not init")
.get_connection()
.map_err(|_e| anyhow!("Get KV connection error"))?;
Ok(conn)
}
#[derive(Clone, Debug, Default, Queryable, Identifiable, Insertable, AsChangeset)] #[derive(Clone, Debug, Default, Queryable, Identifiable, Insertable, AsChangeset)]
#[table_name = "kv_table"] #[table_name = "kv_table"]
#[primary_key(key)] #[primary_key(key)]
@ -143,7 +139,7 @@ mod tests {
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tempfile::TempDir; use tempfile::TempDir;
use crate::kv::KV; use crate::kv::StorePreferences;
#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Debug)] #[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Debug)]
struct Person { struct Person {
@ -155,25 +151,25 @@ mod tests {
fn kv_store_test() { fn kv_store_test() {
let tempdir = TempDir::new().unwrap(); let tempdir = TempDir::new().unwrap();
let path = tempdir.into_path(); let path = tempdir.into_path();
KV::init(path.to_str().unwrap()).unwrap(); let store = StorePreferences::new(path.to_str().unwrap()).unwrap();
KV::set_str("1", "hello".to_string()); store.set_str("1", "hello".to_string());
assert_eq!(KV::get_str("1").unwrap(), "hello"); assert_eq!(store.get_str("1").unwrap(), "hello");
assert_eq!(KV::get_str("2"), None); assert_eq!(store.get_str("2"), None);
KV::set_bool("1", true).unwrap(); store.set_bool("1", true).unwrap();
assert!(KV::get_bool("1")); assert!(store.get_bool("1"));
assert!(!KV::get_bool("2")); assert!(!store.get_bool("2"));
KV::set_i64("1", 1).unwrap(); store.set_i64("1", 1).unwrap();
assert_eq!(KV::get_i64("1").unwrap(), 1); assert_eq!(store.get_i64("1").unwrap(), 1);
assert_eq!(KV::get_i64("2"), None); assert_eq!(store.get_i64("2"), None);
let person = Person { let person = Person {
name: "nathan".to_string(), name: "nathan".to_string(),
age: 30, age: 30,
}; };
KV::set_object("1", person.clone()).unwrap(); store.set_object("1", person.clone()).unwrap();
assert_eq!(KV::get_object::<Person>("1").unwrap(), person); assert_eq!(store.get_object::<Person>("1").unwrap(), person);
} }
} }

View File

@ -10,7 +10,6 @@ use parking_lot::RwLock;
use protobuf::ProtobufError; use protobuf::ProtobufError;
use tokio::sync::broadcast::{channel, Sender}; use tokio::sync::broadcast::{channel, Sender};
use crate::document::document_event::OpenDocumentData;
use flowy_core::{AppFlowyCore, AppFlowyCoreConfig}; use flowy_core::{AppFlowyCore, AppFlowyCoreConfig};
use flowy_database2::entities::*; use flowy_database2::entities::*;
use flowy_database2::event_map::DatabaseEvent; use flowy_database2::event_map::DatabaseEvent;
@ -26,6 +25,7 @@ use flowy_user::entities::{AuthTypePB, ThirdPartyAuthPB, UserProfilePB};
use flowy_user::errors::{FlowyError, FlowyResult}; use flowy_user::errors::{FlowyError, FlowyResult};
use flowy_user::event_map::UserEvent::*; use flowy_user::event_map::UserEvent::*;
use crate::document::document_event::OpenDocumentData;
use crate::event_builder::EventBuilder; use crate::event_builder::EventBuilder;
use crate::user_event::{async_sign_up, SignUpContext}; use crate::user_event::{async_sign_up, SignUpContext};
@ -38,14 +38,16 @@ pub mod user_event;
pub struct FlowyCoreTest { pub struct FlowyCoreTest {
auth_type: Arc<RwLock<AuthTypePB>>, auth_type: Arc<RwLock<AuthTypePB>>,
inner: AppFlowyCore, inner: AppFlowyCore,
cleaner: Arc<RwLock<Option<Cleaner>>>, #[allow(dead_code)]
cleaner: Arc<Cleaner>,
pub notification_sender: TestNotificationSender, pub notification_sender: TestNotificationSender,
} }
impl Default for FlowyCoreTest { impl Default for FlowyCoreTest {
fn default() -> Self { fn default() -> Self {
let temp_dir = temp_dir(); let temp_dir = PathBuf::from(temp_dir()).join(nanoid!(6));
Self::new_with_user_data_path(temp_dir.to_str().unwrap(), nanoid!(6)) std::fs::create_dir_all(&temp_dir).unwrap();
Self::new_with_user_data_path(temp_dir, nanoid!(6))
} }
} }
@ -54,8 +56,8 @@ impl FlowyCoreTest {
Self::default() Self::default()
} }
pub fn new_with_user_data_path(path: &str, name: String) -> Self { pub fn new_with_user_data_path(path: PathBuf, name: String) -> Self {
let config = AppFlowyCoreConfig::new(path, name).log_filter( let config = AppFlowyCoreConfig::new(path.clone().to_str().unwrap(), name).log_filter(
"info", "info",
vec!["flowy_test".to_string(), "lib_dispatch".to_string()], vec!["flowy_test".to_string(), "lib_dispatch".to_string()],
); );
@ -71,7 +73,7 @@ impl FlowyCoreTest {
inner, inner,
auth_type, auth_type,
notification_sender, notification_sender,
cleaner: Arc::new(RwLock::new(None)), cleaner: Arc::new(Cleaner(path)),
} }
} }
@ -139,8 +141,6 @@ impl FlowyCoreTest {
.await .await
.try_parse::<UserProfilePB>()?; .try_parse::<UserProfilePB>()?;
let user_path = PathBuf::from(&self.config.storage_path).join(user_profile.id.to_string());
*self.cleaner.write() = Some(Cleaner::new(user_path));
Ok(user_profile) Ok(user_profile)
} }
@ -807,8 +807,8 @@ impl Cleaner {
Cleaner(dir) Cleaner(dir)
} }
fn cleanup(_dir: &PathBuf) { fn cleanup(dir: &PathBuf) {
// let _ = std::fs::remove_dir_all(dir); let _ = std::fs::remove_dir_all(dir);
} }
} }

View File

@ -1,15 +1,13 @@
use crate::user::migration_test::util::unzip_history_user_db;
use flowy_core::DEFAULT_NAME; use flowy_core::DEFAULT_NAME;
use flowy_folder2::entities::ViewLayoutPB; use flowy_folder2::entities::ViewLayoutPB;
use flowy_test::FlowyCoreTest; use flowy_test::FlowyCoreTest;
use crate::user::migration_test::util::unzip_history_user_db;
#[tokio::test] #[tokio::test]
async fn migrate_historical_empty_document_test() { async fn migrate_historical_empty_document_test() {
let (cleaner, user_db_path) = unzip_history_user_db("historical_empty_document").unwrap(); let (cleaner, user_db_path) = unzip_history_user_db("historical_empty_document").unwrap();
let test = FlowyCoreTest::new_with_user_data_path( let test = FlowyCoreTest::new_with_user_data_path(user_db_path, DEFAULT_NAME.to_string());
user_db_path.to_str().unwrap(),
DEFAULT_NAME.to_string(),
);
let views = test.get_all_workspace_views().await; let views = test.get_all_workspace_views().await;
assert_eq!(views.len(), 3); assert_eq!(views.len(), 3);

View File

@ -1,2 +1,2 @@
// mod document_test; mod document_test;
mod util; mod util;

View File

@ -4,7 +4,7 @@ use std::{convert::TryInto, sync::Arc};
use flowy_error::{FlowyError, FlowyResult}; use flowy_error::{FlowyError, FlowyResult};
use flowy_server_config::supabase_config::SupabaseConfiguration; use flowy_server_config::supabase_config::SupabaseConfiguration;
use flowy_sqlite::kv::KV; use flowy_sqlite::kv::StorePreferences;
use flowy_user_deps::entities::*; use flowy_user_deps::entities::*;
use lib_dispatch::prelude::*; use lib_dispatch::prelude::*;
use lib_infra::box_any::BoxAny; use lib_infra::box_any::BoxAny;
@ -19,6 +19,15 @@ fn upgrade_session(session: AFPluginState<Weak<UserSession>>) -> FlowyResult<Arc
Ok(session) Ok(session)
} }
fn upgrade_store_preferences(
store: AFPluginState<Weak<StorePreferences>>,
) -> FlowyResult<Arc<StorePreferences>> {
let store = store
.upgrade()
.ok_or(FlowyError::internal().context("The store preferences is already drop"))?;
Ok(store)
}
#[tracing::instrument(level = "debug", name = "sign_in", skip(data, session), fields(email = %data.email), err)] #[tracing::instrument(level = "debug", name = "sign_in", skip(data, session), fields(email = %data.email), err)]
pub async fn sign_in( pub async fn sign_in(
data: AFPluginData<SignInPayloadPB>, data: AFPluginData<SignInPayloadPB>,
@ -107,22 +116,27 @@ pub async fn update_user_profile_handler(
const APPEARANCE_SETTING_CACHE_KEY: &str = "appearance_settings"; const APPEARANCE_SETTING_CACHE_KEY: &str = "appearance_settings";
#[tracing::instrument(level = "debug", skip(data), err)] #[tracing::instrument(level = "debug", skip_all, err)]
pub async fn set_appearance_setting( pub async fn set_appearance_setting(
store_preferences: AFPluginState<Weak<StorePreferences>>,
data: AFPluginData<AppearanceSettingsPB>, data: AFPluginData<AppearanceSettingsPB>,
) -> Result<(), FlowyError> { ) -> Result<(), FlowyError> {
let store_preferences = upgrade_store_preferences(store_preferences)?;
let mut setting = data.into_inner(); let mut setting = data.into_inner();
if setting.theme.is_empty() { if setting.theme.is_empty() {
setting.theme = APPEARANCE_DEFAULT_THEME.to_string(); setting.theme = APPEARANCE_DEFAULT_THEME.to_string();
} }
KV::set_object(APPEARANCE_SETTING_CACHE_KEY, setting)?; store_preferences.set_object(APPEARANCE_SETTING_CACHE_KEY, setting)?;
Ok(()) Ok(())
} }
#[tracing::instrument(level = "debug", err)] #[tracing::instrument(level = "debug", skip_all, err)]
pub async fn get_appearance_setting() -> DataResult<AppearanceSettingsPB, FlowyError> { pub async fn get_appearance_setting(
match KV::get_str(APPEARANCE_SETTING_CACHE_KEY) { store_preferences: AFPluginState<Weak<StorePreferences>>,
) -> DataResult<AppearanceSettingsPB, FlowyError> {
let store_preferences = upgrade_store_preferences(store_preferences)?;
match store_preferences.get_str(APPEARANCE_SETTING_CACHE_KEY) {
None => data_result_ok(AppearanceSettingsPB::default()), None => data_result_ok(AppearanceSettingsPB::default()),
Some(s) => { Some(s) => {
let setting = match serde_json::from_str(&s) { let setting = match serde_json::from_str(&s) {
@ -177,9 +191,11 @@ pub async fn set_supabase_config_handler(
#[tracing::instrument(level = "debug", skip_all, err)] #[tracing::instrument(level = "debug", skip_all, err)]
pub async fn get_supabase_config_handler( pub async fn get_supabase_config_handler(
store_preferences: AFPluginState<Weak<StorePreferences>>,
_session: AFPluginState<Weak<UserSession>>, _session: AFPluginState<Weak<UserSession>>,
) -> DataResult<SupabaseConfigPB, FlowyError> { ) -> DataResult<SupabaseConfigPB, FlowyError> {
let config = get_supabase_config().unwrap_or_default(); let store_preferences = upgrade_store_preferences(store_preferences)?;
let config = get_supabase_config(&store_preferences).unwrap_or_default();
data_result_ok(config.into()) data_result_ok(config.into())
} }

View File

@ -15,9 +15,14 @@ use crate::event_handler::*;
use crate::{errors::FlowyError, services::UserSession}; use crate::{errors::FlowyError, services::UserSession};
pub fn init(user_session: Weak<UserSession>) -> AFPlugin { pub fn init(user_session: Weak<UserSession>) -> AFPlugin {
let store_preferences = user_session
.upgrade()
.and_then(|session| Some(session.get_store_preferences()))
.unwrap();
AFPlugin::new() AFPlugin::new()
.name("Flowy-User") .name("Flowy-User")
.state(user_session) .state(user_session)
.state(store_preferences)
.event(UserEvent::SignIn, sign_in) .event(UserEvent::SignIn, sign_in)
.event(UserEvent::SignUp, sign_up) .event(UserEvent::SignUp, sign_up)
.event(UserEvent::InitUser, init_user_handler) .event(UserEvent::InitUser, init_user_handler)

View File

@ -1,13 +1,16 @@
use crate::migrations::migration::UserDataMigration; use std::sync::Arc;
use crate::services::session_serde::Session;
use appflowy_integrate::{RocksCollabDB, YrsDocAction}; use appflowy_integrate::{RocksCollabDB, YrsDocAction};
use collab::core::collab::MutexCollab; use collab::core::collab::MutexCollab;
use collab::core::origin::{CollabClient, CollabOrigin}; use collab::core::origin::{CollabClient, CollabOrigin};
use collab_document::document::Document; use collab_document::document::Document;
use collab_document::document_data::default_document_data; use collab_document::document_data::default_document_data;
use collab_folder::core::Folder; use collab_folder::core::Folder;
use flowy_error::{internal_error, FlowyResult}; use flowy_error::{internal_error, FlowyResult};
use std::sync::Arc;
use crate::migrations::migration::UserDataMigration;
use crate::services::session_serde::Session;
/// Migrate the first level documents of the workspace by inserting documents /// Migrate the first level documents of the workspace by inserting documents
pub struct HistoricalEmptyDocumentMigration; pub struct HistoricalEmptyDocumentMigration;
@ -30,7 +33,7 @@ impl UserDataMigration for HistoricalEmptyDocumentMigration {
for view in migration_views { for view in migration_views {
// Read all updates of the view // Read all updates of the view
if let Ok(view_updates) = write_txn.get_all_updates(session.user_id, &view.id) { if let Ok(view_updates) = write_txn.get_all_updates(session.user_id, &view.id) {
if let Err(_) = Document::from_updates(origin.clone(), view_updates, &view.id, vec![]) { if Document::from_updates(origin.clone(), view_updates, &view.id, vec![]).is_err() {
// Create a document with default data // Create a document with default data
let document_data = default_document_data(); let document_data = default_document_data();
let collab = Arc::new(MutexCollab::new(origin.clone(), &view.id, vec![])); let collab = Arc::new(MutexCollab::new(origin.clone(), &view.id, vec![]));

View File

@ -8,9 +8,10 @@ use uuid::Uuid;
use flowy_error::{internal_error, ErrorCode, FlowyResult}; use flowy_error::{internal_error, ErrorCode, FlowyResult};
use flowy_server_config::supabase_config::SupabaseConfiguration; use flowy_server_config::supabase_config::SupabaseConfiguration;
use flowy_sqlite::kv::StorePreferences;
use flowy_sqlite::schema::{user_table, user_workspace_table}; use flowy_sqlite::schema::{user_table, user_workspace_table};
use flowy_sqlite::ConnectionPool; use flowy_sqlite::ConnectionPool;
use flowy_sqlite::{kv::KV, query_dsl::*, DBConnection, ExpressionMethods}; use flowy_sqlite::{query_dsl::*, DBConnection, ExpressionMethods};
use flowy_user_deps::entities::*; use flowy_user_deps::entities::*;
use lib_infra::box_any::BoxAny; use lib_infra::box_any::BoxAny;
use lib_infra::util::timestamp; use lib_infra::util::timestamp;
@ -56,6 +57,7 @@ pub struct UserSession {
database: UserDB, database: UserDB,
session_config: UserSessionConfig, session_config: UserSessionConfig,
cloud_services: Arc<dyn UserCloudServiceProvider>, cloud_services: Arc<dyn UserCloudServiceProvider>,
store_preferences: Arc<StorePreferences>,
pub(crate) user_status_callback: RwLock<Arc<dyn UserStatusCallback>>, pub(crate) user_status_callback: RwLock<Arc<dyn UserStatusCallback>>,
} }
@ -63,6 +65,7 @@ impl UserSession {
pub fn new( pub fn new(
session_config: UserSessionConfig, session_config: UserSessionConfig,
cloud_services: Arc<dyn UserCloudServiceProvider>, cloud_services: Arc<dyn UserCloudServiceProvider>,
store_preferences: Arc<StorePreferences>,
) -> Self { ) -> Self {
let database = UserDB::new(&session_config.root_dir); let database = UserDB::new(&session_config.root_dir);
let user_status_callback: RwLock<Arc<dyn UserStatusCallback>> = let user_status_callback: RwLock<Arc<dyn UserStatusCallback>> =
@ -71,10 +74,15 @@ impl UserSession {
database, database,
session_config, session_config,
cloud_services, cloud_services,
store_preferences,
user_status_callback, user_status_callback,
} }
} }
pub fn get_store_preferences(&self) -> Weak<StorePreferences> {
Arc::downgrade(&self.store_preferences)
}
pub async fn init<C: UserStatusCallback + 'static>(&self, user_status_callback: C) { pub async fn init<C: UserStatusCallback + 'static>(&self, user_status_callback: C) {
if let Ok(session) = self.get_session() { if let Ok(session) = self.get_session() {
match ( match (
@ -443,7 +451,9 @@ impl UserSession {
pub fn save_supabase_config(&self, config: SupabaseConfiguration) { pub fn save_supabase_config(&self, config: SupabaseConfiguration) {
self.cloud_services.update_supabase_config(&config); self.cloud_services.update_supabase_config(&config);
let _ = KV::set_object(SUPABASE_CONFIG_CACHE_KEY, config); let _ = self
.store_preferences
.set_object(SUPABASE_CONFIG_CACHE_KEY, config);
} }
async fn update_user( async fn update_user(
@ -554,9 +564,13 @@ impl UserSession {
fn set_session(&self, session: Option<Session>) -> Result<(), FlowyError> { fn set_session(&self, session: Option<Session>) -> Result<(), FlowyError> {
tracing::debug!("Set user session: {:?}", session); tracing::debug!("Set user session: {:?}", session);
match &session { match &session {
None => KV::remove(&self.session_config.session_cache_key), None => self
.store_preferences
.remove(&self.session_config.session_cache_key),
Some(session) => { Some(session) => {
KV::set_object(&self.session_config.session_cache_key, session.clone()) self
.store_preferences
.set_object(&self.session_config.session_cache_key, session.clone())
.map_err(internal_error)?; .map_err(internal_error)?;
}, },
} }
@ -564,24 +578,34 @@ impl UserSession {
} }
fn log_user(&self, uid: i64, storage_path: String) { fn log_user(&self, uid: i64, storage_path: String) {
let mut logger_users = KV::get_object::<HistoricalUsers>(HISTORICAL_USER).unwrap_or_default(); let mut logger_users = self
.store_preferences
.get_object::<HistoricalUsers>(HISTORICAL_USER)
.unwrap_or_default();
logger_users.add_user(HistoricalUser { logger_users.add_user(HistoricalUser {
user_id: uid, user_id: uid,
sign_in_timestamp: timestamp(), sign_in_timestamp: timestamp(),
storage_path, storage_path,
}); });
let _ = KV::set_object(HISTORICAL_USER, logger_users); let _ = self
.store_preferences
.set_object(HISTORICAL_USER, logger_users);
} }
pub fn get_historical_users(&self) -> Vec<HistoricalUser> { pub fn get_historical_users(&self) -> Vec<HistoricalUser> {
KV::get_object::<HistoricalUsers>(HISTORICAL_USER) self
.store_preferences
.get_object::<HistoricalUsers>(HISTORICAL_USER)
.unwrap_or_default() .unwrap_or_default()
.users .users
} }
/// Returns the current user session. /// Returns the current user session.
pub fn get_session(&self) -> Result<Session, FlowyError> { pub fn get_session(&self) -> Result<Session, FlowyError> {
match KV::get_object::<Session>(&self.session_config.session_cache_key) { match self
.store_preferences
.get_object::<Session>(&self.session_config.session_cache_key)
{
None => Err(FlowyError::new( None => Err(FlowyError::new(
ErrorCode::RecordNotFound, ErrorCode::RecordNotFound,
"User is not logged in", "User is not logged in",
@ -591,8 +615,11 @@ impl UserSession {
} }
} }
pub fn get_supabase_config() -> Option<SupabaseConfiguration> { pub fn get_supabase_config(
KV::get_str(SUPABASE_CONFIG_CACHE_KEY) store_preference: &Arc<StorePreferences>,
) -> Option<SupabaseConfiguration> {
store_preference
.get_str(SUPABASE_CONFIG_CACHE_KEY)
.and_then(|s| serde_json::from_str(&s).ok()) .and_then(|s| serde_json::from_str(&s).ok())
.unwrap_or_else(|| SupabaseConfiguration::from_env().ok()) .unwrap_or_else(|| SupabaseConfiguration::from_env().ok())
} }