replace std::sync::RwLock with parking_lot::RwLock

This commit is contained in:
appflowy 2021-09-01 11:21:42 +08:00
parent 33138382bd
commit 2088595f3b
3 changed files with 88 additions and 40 deletions

View File

@ -13,7 +13,7 @@ pub struct WorkspaceUserImpl {
impl WorkspaceUser for WorkspaceUserImpl {
fn user_id(&self) -> Result<String, WorkspaceError> {
self.user_session.user_id().map_err(|e| {
self.user_session.current_session().map_err(|e| {
ErrorBuilder::new(ErrorCode::UserInternalError)
.error(e)
.build()

View File

@ -18,6 +18,7 @@ flowy-net = { path = "../flowy-net" }
tracing = { version = "0.1", features = ["log"] }
bytes = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = {version = "1.0"}
validator = "0.12.0"
rand = { version = "0.8", features=["std_rng"] }
unicode-segmentation = "1.7.1"

View File

@ -16,7 +16,10 @@ use flowy_database::{
use crate::entities::UserToken;
use flowy_infra::kv::KVStore;
use flowy_sqlite::ConnectionPool;
use std::sync::{Arc, RwLock};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use serde_json::Error;
use std::sync::Arc;
pub struct UserSessionConfig {
root_dir: String,
@ -37,7 +40,7 @@ pub struct UserSession {
config: UserSessionConfig,
#[allow(dead_code)]
pub(crate) server: Server,
user_id: RwLock<Option<String>>,
session: RwLock<Option<Session>>,
}
impl UserSession {
@ -48,23 +51,30 @@ impl UserSession {
database: db,
config,
server,
user_id: RwLock::new(None),
session: RwLock::new(None),
}
}
pub fn get_db_connection(&self) -> Result<DBConnection, UserError> {
let user_id = self.user_id()?;
let user_id = self.get_session()?.user_id;
self.database.get_connection(&user_id)
}
// The caller will be not 'Sync' before of the return value,
// PooledConnection<ConnectionManager> is not sync. You can use
// db_connection_pool function to require the ConnectionPool that is 'Sync'.
//
// let pool = self.db_connection_pool()?;
// let conn: PooledConnection<ConnectionManager> = pool.get()?;
pub fn db_connection_pool(&self) -> Result<Arc<ConnectionPool>, UserError> {
let user_id = self.user_id()?;
let user_id = self.get_session()?.user_id;
self.database.get_pool(&user_id)
}
pub async fn sign_in(&self, params: SignInParams) -> Result<UserTable, UserError> {
let resp = self.server.sign_in(params).await?;
let _ = self.set_user_id(Some(resp.uid.clone()))?;
let session = Session::new(&resp.uid, &resp.token);
let _ = self.set_session(Some(session))?;
let user_table = self.save_user(resp.into()).await?;
Ok(user_table)
@ -72,7 +82,8 @@ impl UserSession {
pub async fn sign_up(&self, params: SignUpParams) -> Result<UserTable, UserError> {
let resp = self.server.sign_up(params).await?;
let _ = self.set_user_id(Some(resp.uid.clone()))?;
let session = Session::new(&resp.uid, &resp.token);
let _ = self.set_session(Some(session))?;
let user_table = self.save_user(resp.into()).await?;
Ok(user_table)
@ -91,7 +102,7 @@ impl UserSession {
diesel::delete(dsl::user_table.filter(dsl::id.eq(&user_detail.id))).execute(&*conn)?;
let _ = self.server.sign_out(&user_detail.id);
let _ = self.database.close_user_db(&user_detail.id)?;
let _ = self.set_user_id(None)?;
let _ = self.set_session(None)?;
Ok(())
}
@ -113,7 +124,7 @@ impl UserSession {
}
pub async fn user_detail(&self) -> Result<UserDetail, UserError> {
let user_id = self.user_id()?;
let user_id = self.get_session()?.user_id;
let user = dsl::user_table
.filter(user_table::id.eq(&user_id))
.first::<UserTable>(&*(self.get_db_connection()?))?;
@ -137,44 +148,41 @@ impl UserSession {
Ok(UserDetail::from(user))
}
pub fn set_user_id(&self, user_id: Option<String>) -> Result<(), UserError> {
log::trace!("Set user id: {:?}", user_id);
KVStore::set_str(USER_ID_CACHE_KEY, user_id.clone().unwrap_or("".to_owned()));
match self.user_id.write() {
Ok(mut write_guard) => {
*write_guard = user_id;
Ok(())
},
Err(e) => Err(ErrorBuilder::new(ErrorCode::WriteCurrentIdFailed)
.error(e)
.build()),
}
}
pub fn user_dir(&self) -> Result<String, UserError> {
let user_id = self.user_id()?;
Ok(format!("{}/{}", self.config.root_dir, user_id))
let session = self.get_session()?;
Ok(format!("{}/{}", self.config.root_dir, session.user_id))
}
pub fn user_id(&self) -> Result<String, UserError> {
let mut user_id = {
let read_guard = self.user_id.read().map_err(|e| {
ErrorBuilder::new(ErrorCode::ReadCurrentIdFailed)
.error(e)
.build()
})?;
fn set_session(&self, session: Option<Session>) -> Result<(), UserError> {
log::trace!("Update user session: {:?}", session);
match &session {
None => KVStore::set_str(SESSION_CACHE_KEY, "".to_string()),
Some(session) => KVStore::set_str(SESSION_CACHE_KEY, session.clone().into()),
}
let mut write_guard = self.session.write();
*write_guard = session;
Ok(())
}
fn get_session(&self) -> Result<Session, UserError> {
let mut session = {
let read_guard = self.session.read();
(*read_guard).clone()
};
if user_id.is_none() {
user_id = KVStore::get_str(USER_ID_CACHE_KEY);
let _ = self.set_user_id(user_id.clone())?;
if session.is_none() {
match KVStore::get_str(SESSION_CACHE_KEY) {
None => {},
Some(s) => {
session = Some(Session::from(s));
let _ = self.set_session(session.clone())?;
},
}
}
match user_id {
match session {
None => Err(ErrorBuilder::new(ErrorCode::UserNotLoginYet).build()),
Some(user_id) => Ok(user_id),
Some(session) => Ok(session),
}
}
@ -196,7 +204,7 @@ pub async fn update_user(
}
pub fn current_user_id() -> Result<String, UserError> {
match KVStore::get_str(USER_ID_CACHE_KEY) {
match KVStore::get_str(SESSION_CACHE_KEY) {
None => Err(ErrorBuilder::new(ErrorCode::UserNotLoginYet).build()),
Some(user_id) => Ok(user_id),
}
@ -208,4 +216,43 @@ impl UserDatabaseConnection for UserSession {
}
}
const USER_ID_CACHE_KEY: &str = "user_id";
const SESSION_CACHE_KEY: &str = "session_cache_key";
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
struct Session {
user_id: String,
token: String,
}
impl Session {
pub fn new(user_id: &str, token: &str) -> Self {
Self {
user_id: user_id.to_owned(),
token: token.to_owned(),
}
}
}
impl std::convert::From<String> for Session {
fn from(s: String) -> Self {
match serde_json::from_str(&s) {
Ok(s) => s,
Err(e) => {
log::error!("{Deserialize string to Session failed: {:?}", e"}");
Session::default()
},
}
}
}
impl std::convert::Into<String> for Session {
fn into(self) -> String {
match serde_json::to_string(&self) {
Ok(s) => s,
Err(e) => {
log::error!("{Serialize session to string failed: {:?}", e"}");
"".to_string()
},
}
}
}