refactor: kv (#2548)

* refactor: kv

* Update frontend/rust-lib/flowy-sqlite/src/kv/kv.rs

Co-authored-by: Lucas.Xu <lucas.xu@appflowy.io>

---------

Co-authored-by: Lucas.Xu <lucas.xu@appflowy.io>
This commit is contained in:
Nathan.fooo
2023-05-17 12:46:48 +08:00
committed by GitHub
parent 623f182bba
commit 19ee0ea44d
5 changed files with 140 additions and 146 deletions

View File

@ -1747,6 +1747,7 @@ dependencies = [
name = "flowy-sqlite" name = "flowy-sqlite"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow",
"diesel", "diesel",
"diesel_derives", "diesel_derives",
"diesel_migrations", "diesel_migrations",
@ -1755,8 +1756,12 @@ dependencies = [
"libsqlite3-sys", "libsqlite3-sys",
"openssl", "openssl",
"openssl-sys", "openssl-sys",
"parking_lot 0.12.1",
"r2d2", "r2d2",
"scheduled-thread-pool", "scheduled-thread-pool",
"serde",
"serde_json",
"tempfile",
"tracing", "tracing",
] ]

View File

@ -11,6 +11,10 @@ diesel_derives = { version = "1.4.1", features = ["sqlite"] }
diesel_migrations = { version = "1.4.0", features = ["sqlite"] } diesel_migrations = { version = "1.4.0", features = ["sqlite"] }
tracing = { version = "0.1", features = ["log"] } tracing = { version = "0.1", features = ["log"] }
lazy_static = "1.4.0" lazy_static = "1.4.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
anyhow = "1.0"
parking_lot = "0.12.1"
r2d2 = "0.8.10" r2d2 = "0.8.10"
libsqlite3-sys = { version = ">=0.8.0, <0.24.0", features = ["bundled"] } libsqlite3-sys = { version = ">=0.8.0, <0.24.0", features = ["bundled"] }
@ -19,5 +23,9 @@ error-chain = "=0.12.0"
openssl = { version = "0.10.45", optional = true, features = ["vendored"] } openssl = { version = "0.10.45", optional = true, features = ["vendored"] }
openssl-sys = { version = "0.9.80", optional = true, features = ["vendored"] } openssl-sys = { version = "0.9.80", optional = true, features = ["vendored"] }
[dev-dependencies]
tempfile = "3.5.0"
[features] [features]
openssl_vendored = ["openssl", "openssl-sys"] openssl_vendored = ["openssl", "openssl-sys"]

View File

@ -1,45 +1,23 @@
use crate::kv::schema::{kv_table, kv_table::dsl, KV_SQL}; use std::path::Path;
use crate::sqlite::{DBConnection, Database, PoolConfig};
use ::diesel::{query_dsl::*, ExpressionMethods}; use ::diesel::{query_dsl::*, ExpressionMethods};
use anyhow::anyhow;
use diesel::{Connection, SqliteConnection}; use diesel::{Connection, SqliteConnection};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use std::{path::Path, sync::RwLock}; use parking_lot::RwLock;
use serde::de::DeserializeOwned;
use serde::Serialize;
macro_rules! impl_get_func { use crate::kv::schema::{kv_table, kv_table::dsl, KV_SQL};
( use crate::sqlite::{DBConnection, Database, PoolConfig};
$func_name:ident,
$get_method:ident=>$target:ident
) => {
#[allow(dead_code)]
pub fn $func_name(k: &str) -> Option<$target> {
match KV::get(k) {
Ok(item) => item.$get_method,
Err(_) => None,
}
}
};
}
macro_rules! impl_set_func {
($func_name:ident,$set_method:ident,$key_type:ident) => {
#[allow(dead_code)]
pub fn $func_name(key: &str, value: $key_type) {
let mut item = KeyValue::new(key);
item.$set_method = Some(value);
match KV::set(item) {
Ok(_) => {},
Err(e) => {
tracing::error!("{:?}", e)
},
};
}
};
}
const DB_NAME: &str = "kv.db"; const DB_NAME: &str = "kv.db";
lazy_static! { lazy_static! {
static ref KV_HOLDER: RwLock<KV> = RwLock::new(KV::new()); static ref KV_HOLDER: RwLock<KV> = RwLock::new(KV::new());
} }
/// [KV] uses a sqlite database to store key value pairs.
/// Most of the time, it used to storage AppFlowy configuration.
pub struct KV { pub struct KV {
database: Option<Database>, database: Option<Database>,
} }
@ -49,41 +27,10 @@ impl KV {
KV { database: None } KV { database: None }
} }
fn set(value: KeyValue) -> Result<(), String> {
// tracing::trace!("[KV]: set value: {:?}", value);
let _ = diesel::replace_into(kv_table::table)
.values(&value)
.execute(&*(get_connection()?))
.map_err(|e| format!("KV set error: {:?}", e))?;
Ok(())
}
fn get(key: &str) -> Result<KeyValue, String> {
let conn = get_connection()?;
let value = dsl::kv_table
.filter(kv_table::key.eq(key))
.first::<KeyValue>(&*conn)
.map_err(|e| format!("KV get error: {:?}", e))?;
Ok(value)
}
#[allow(dead_code)]
pub fn remove(key: &str) -> Result<(), String> {
// tracing::debug!("remove key: {}", key);
let conn = get_connection()?;
let sql = dsl::kv_table.filter(kv_table::key.eq(key));
let _ = diesel::delete(sql)
.execute(&*conn)
.map_err(|e| format!("KV remove error: {:?}", e))?;
Ok(())
}
#[tracing::instrument(level = "trace", err)] #[tracing::instrument(level = "trace", err)]
pub fn init(root: &str) -> Result<(), String> { pub fn init(root: &str) -> Result<(), anyhow::Error> {
if !Path::new(root).exists() { if !Path::new(root).exists() {
return Err(format!("Init KVStore failed. {} not exists", root)); return Err(anyhow!("Init KV failed. {} not exists", root));
} }
let pool_config = PoolConfig::default(); let pool_config = PoolConfig::default();
@ -91,54 +38,96 @@ impl KV {
let conn = database.get_connection().unwrap(); let conn = database.get_connection().unwrap();
SqliteConnection::execute(&*conn, KV_SQL).unwrap(); SqliteConnection::execute(&*conn, KV_SQL).unwrap();
let mut store = KV_HOLDER
.write()
.map_err(|e| format!("KVStore write failed: {:?}", e))?;
tracing::trace!("Init kv with path: {}", root); tracing::trace!("Init kv with path: {}", root);
store.database = Some(database); KV_HOLDER.write().database = Some(database);
Ok(()) Ok(())
} }
/// Set a string value of a key
pub fn set_str<T: ToString>(key: &str, value: T) {
let _ = Self::set_key_value(key, Some(value.to_string()));
}
/// Set a bool value of a key
pub fn set_bool(key: &str, value: bool) -> Result<(), anyhow::Error> {
Self::set_key_value(key, Some(value.to_string()))
}
/// Set a object that implements [Serialize] trait of a key
pub fn set_object<T: Serialize>(key: &str, value: T) -> Result<(), anyhow::Error> {
let value = serde_json::to_string(&value)?;
Self::set_key_value(key, Some(value))?;
Ok(())
}
/// Set a i64 value of a key
pub fn set_i64(key: &str, value: i64) -> Result<(), anyhow::Error> {
Self::set_key_value(key, Some(value.to_string()))
}
/// Get a string value of a key
pub fn get_str(key: &str) -> Option<String> {
Self::get_key_value(key).and_then(|kv| kv.value)
}
/// Get a bool value of a key
pub fn get_bool(key: &str) -> bool { pub fn get_bool(key: &str) -> bool {
match KV::get(key) { Self::get_key_value(key)
Ok(item) => item.bool_value.unwrap_or(false), .and_then(|kv| kv.value)
Err(_) => false, .and_then(|v| v.parse::<bool>().ok())
.unwrap_or(false)
}
/// Get a i64 value of a key
pub fn get_i64(key: &str) -> Option<i64> {
Self::get_key_value(key)
.and_then(|kv| kv.value)
.and_then(|v| v.parse::<i64>().ok())
}
/// Get a object that implements [DeserializeOwned] trait of a key
pub fn get_object<T: DeserializeOwned>(key: &str) -> Option<T> {
Self::get_str(key).and_then(|v| serde_json::from_str(&v).ok())
}
#[allow(dead_code)]
pub fn remove(key: &str) {
if let Ok(conn) = get_connection() {
let sql = dsl::kv_table.filter(kv_table::key.eq(key));
let _ = diesel::delete(sql).execute(&*conn);
} }
} }
impl_set_func!(set_str, str_value, String); fn set_key_value(key: &str, value: Option<String>) -> Result<(), anyhow::Error> {
let conn = get_connection()?;
diesel::replace_into(kv_table::table)
.values(KeyValue {
key: key.to_string(),
value,
})
.execute(&*conn)?;
Ok(())
}
impl_set_func!(set_bool, bool_value, bool); fn get_key_value(key: &str) -> Option<KeyValue> {
let conn = get_connection().ok()?;
impl_set_func!(set_int, int_value, i64); dsl::kv_table
.filter(kv_table::key.eq(key))
impl_set_func!(set_float, float_value, f64); .first::<KeyValue>(&*conn)
.ok()
impl_get_func!(get_str,str_value=>String); }
impl_get_func!(get_int,int_value=>i64);
impl_get_func!(get_float,float_value=>f64);
} }
fn get_connection() -> Result<DBConnection, String> { fn get_connection() -> Result<DBConnection, anyhow::Error> {
match KV_HOLDER.read() { let conn = KV_HOLDER
Ok(store) => { .read()
let conn = store .database
.database .as_ref()
.as_ref() .expect("KVStore is not init")
.expect("KVStore is not init") .get_connection()
.get_connection() .map_err(|_e| anyhow!("Get KV connection error"))?;
.map_err(|e| format!("KVStore error: {:?}", e))?; Ok(conn)
Ok(conn)
},
Err(e) => {
let msg = format!("KVStore get connection failed: {:?}", e);
tracing::error!("{:?}", msg);
Err(msg)
},
}
} }
#[derive(Clone, Debug, Default, Queryable, Identifiable, Insertable, AsChangeset)] #[derive(Clone, Debug, Default, Queryable, Identifiable, Insertable, AsChangeset)]
@ -146,42 +135,45 @@ fn get_connection() -> Result<DBConnection, String> {
#[primary_key(key)] #[primary_key(key)]
pub struct KeyValue { pub struct KeyValue {
pub key: String, pub key: String,
pub str_value: Option<String>, pub value: Option<String>,
pub int_value: Option<i64>,
pub float_value: Option<f64>,
pub bool_value: Option<bool>,
}
impl KeyValue {
pub fn new(key: &str) -> Self {
KeyValue {
key: key.to_string(),
..Default::default()
}
}
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use serde::{Deserialize, Serialize};
use tempfile::TempDir;
use crate::kv::KV; use crate::kv::KV;
#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Debug)]
struct Person {
name: String,
age: i32,
}
#[test] #[test]
fn kv_store_test() { fn kv_store_test() {
let dir = "./temp/"; let tempdir = TempDir::new().unwrap();
if !std::path::Path::new(dir).exists() { let path = tempdir.into_path();
std::fs::create_dir_all(dir).unwrap(); KV::init(path.to_str().unwrap()).unwrap();
}
KV::init(dir).unwrap();
KV::set_str("1", "hello".to_string()); KV::set_str("1", "hello".to_string());
assert_eq!(KV::get_str("1").unwrap(), "hello"); assert_eq!(KV::get_str("1").unwrap(), "hello");
assert_eq!(KV::get_str("2"), None); assert_eq!(KV::get_str("2"), None);
KV::set_bool("1", true); KV::set_bool("1", true).unwrap();
assert!(KV::get_bool("1")); assert!(KV::get_bool("1"));
assert!(!KV::get_bool("2")); assert!(!KV::get_bool("2"));
KV::set_i64("1", 1).unwrap();
assert_eq!(KV::get_i64("1").unwrap(), 1);
assert_eq!(KV::get_i64("2"), None);
let person = Person {
name: "nathan".to_string(),
age: 30,
};
KV::set_object("1", person.clone()).unwrap();
assert_eq!(KV::get_object::<Person>("1").unwrap(), person);
} }
} }

View File

@ -1,20 +1,14 @@
#[allow(dead_code)] #[allow(dead_code)]
pub const KV_SQL: &str = r#" pub const KV_SQL: &str = r#"
CREATE TABLE IF NOT EXISTS kv_table ( CREATE TABLE IF NOT EXISTS kv_table (
key TEXT NOT NULL PRIMARY KEY, key TEXT NOT NULL PRIMARY KEY,
str_value TEXT, value TEXT
int_value BIGINT,
float_value DOUBLE,
bool_value BOOLEAN
); );
"#; "#;
table! { table! {
kv_table (key) { kv_table (key) {
key -> Text, key -> Text,
str_value -> Nullable<Text>, value -> Nullable<Text>,
int_value -> Nullable<BigInt>,
float_value -> Nullable<Double>,
bool_value -> Nullable<Bool>,
} }
} }

View File

@ -4,6 +4,7 @@ use appflowy_integrate::RocksCollabDB;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::sync::RwLock; use tokio::sync::RwLock;
use flowy_error::internal_error;
use flowy_sqlite::ConnectionPool; use flowy_sqlite::ConnectionPool;
use flowy_sqlite::{ use flowy_sqlite::{
kv::KV, kv::KV,
@ -18,7 +19,7 @@ use crate::entities::{
use crate::entities::{UserProfilePB, UserSettingPB}; use crate::entities::{UserProfilePB, UserSettingPB};
use crate::event_map::UserStatusCallback; use crate::event_map::UserStatusCallback;
use crate::{ use crate::{
errors::{ErrorCode, FlowyError}, errors::FlowyError,
event_map::UserCloudService, event_map::UserCloudService,
notification::*, notification::*,
services::database::{UserDB, UserTable, UserTableChangeset}, services::database::{UserDB, UserTable, UserTableChangeset},
@ -288,28 +289,22 @@ impl UserSession {
fn set_session(&self, session: Option<Session>) -> Result<(), FlowyError> { fn set_session(&self, session: Option<Session>) -> Result<(), FlowyError> {
tracing::debug!("Set user session: {:?}", session); tracing::debug!("Set user session: {:?}", session);
match &session { match &session {
None => KV::remove(&self.session_config.session_cache_key) None => KV::remove(&self.session_config.session_cache_key),
.map_err(|e| FlowyError::new(ErrorCode::Internal, &e))?, Some(session) => {
Some(session) => KV::set_str( KV::set_object(&self.session_config.session_cache_key, session.clone())
&self.session_config.session_cache_key, .map_err(internal_error)?;
session.clone().into(), },
),
} }
Ok(()) Ok(())
} }
fn get_session(&self) -> Result<Session, FlowyError> { fn get_session(&self) -> Result<Session, FlowyError> {
match KV::get_str(&self.session_config.session_cache_key) { match KV::get_object::<Session>(&self.session_config.session_cache_key) {
None => Err(FlowyError::unauthorized()), None => Err(FlowyError::unauthorized()),
Some(s) => Ok(Session::from(s)), Some(session) => Ok(session),
} }
} }
// fn get_old_session(&self) -> Option<OldSession> {
// let s = KV::get_str(&self.config.session_cache_key)?;
// serde_json::from_str::<OldSession>(&s).ok()
// }
fn is_user_login(&self, email: &str) -> bool { fn is_user_login(&self, email: &str) -> bool {
match self.get_session() { match self.get_session() {
Ok(session) => session.email == email, Ok(session) => session.email == email,