From 2088595f3ba3c4e5a3ddb0dba1352d5688119015 Mon Sep 17 00:00:00 2001 From: appflowy Date: Wed, 1 Sep 2021 11:21:42 +0800 Subject: [PATCH] replace std::sync::RwLock with parking_lot::RwLock --- .../src/deps_resolve/workspace_deps_impl.rs | 2 +- rust-lib/flowy-user/Cargo.toml | 1 + .../src/services/user/user_session.rs | 125 ++++++++++++------ 3 files changed, 88 insertions(+), 40 deletions(-) diff --git a/rust-lib/flowy-sdk/src/deps_resolve/workspace_deps_impl.rs b/rust-lib/flowy-sdk/src/deps_resolve/workspace_deps_impl.rs index ba41e38c7b..b0ddb2dc0c 100644 --- a/rust-lib/flowy-sdk/src/deps_resolve/workspace_deps_impl.rs +++ b/rust-lib/flowy-sdk/src/deps_resolve/workspace_deps_impl.rs @@ -13,7 +13,7 @@ pub struct WorkspaceUserImpl { impl WorkspaceUser for WorkspaceUserImpl { fn user_id(&self) -> Result { - self.user_session.user_id().map_err(|e| { + self.user_session.current_session().map_err(|e| { ErrorBuilder::new(ErrorCode::UserInternalError) .error(e) .build() diff --git a/rust-lib/flowy-user/Cargo.toml b/rust-lib/flowy-user/Cargo.toml index c81ef8d0d0..a8beb05429 100644 --- a/rust-lib/flowy-user/Cargo.toml +++ b/rust-lib/flowy-user/Cargo.toml @@ -18,6 +18,7 @@ flowy-net = { path = "../flowy-net" } tracing = { version = "0.1", features = ["log"] } bytes = "1.0" serde = { version = "1.0", features = ["derive"] } +serde_json = {version = "1.0"} validator = "0.12.0" rand = { version = "0.8", features=["std_rng"] } unicode-segmentation = "1.7.1" diff --git a/rust-lib/flowy-user/src/services/user/user_session.rs b/rust-lib/flowy-user/src/services/user/user_session.rs index 92670d3b7c..e76544e762 100644 --- a/rust-lib/flowy-user/src/services/user/user_session.rs +++ b/rust-lib/flowy-user/src/services/user/user_session.rs @@ -16,7 +16,10 @@ use flowy_database::{ use crate::entities::UserToken; use flowy_infra::kv::KVStore; use flowy_sqlite::ConnectionPool; -use std::sync::{Arc, RwLock}; +use parking_lot::RwLock; +use serde::{Deserialize, Serialize}; +use serde_json::Error; +use std::sync::Arc; pub struct UserSessionConfig { root_dir: String, @@ -37,7 +40,7 @@ pub struct UserSession { config: UserSessionConfig, #[allow(dead_code)] pub(crate) server: Server, - user_id: RwLock>, + session: RwLock>, } impl UserSession { @@ -48,23 +51,30 @@ impl UserSession { database: db, config, server, - user_id: RwLock::new(None), + session: RwLock::new(None), } } pub fn get_db_connection(&self) -> Result { - let user_id = self.user_id()?; + let user_id = self.get_session()?.user_id; self.database.get_connection(&user_id) } + // The caller will be not 'Sync' before of the return value, + // PooledConnection is not sync. You can use + // db_connection_pool function to require the ConnectionPool that is 'Sync'. + // + // let pool = self.db_connection_pool()?; + // let conn: PooledConnection = pool.get()?; pub fn db_connection_pool(&self) -> Result, UserError> { - let user_id = self.user_id()?; + let user_id = self.get_session()?.user_id; self.database.get_pool(&user_id) } pub async fn sign_in(&self, params: SignInParams) -> Result { let resp = self.server.sign_in(params).await?; - let _ = self.set_user_id(Some(resp.uid.clone()))?; + let session = Session::new(&resp.uid, &resp.token); + let _ = self.set_session(Some(session))?; let user_table = self.save_user(resp.into()).await?; Ok(user_table) @@ -72,7 +82,8 @@ impl UserSession { pub async fn sign_up(&self, params: SignUpParams) -> Result { let resp = self.server.sign_up(params).await?; - let _ = self.set_user_id(Some(resp.uid.clone()))?; + let session = Session::new(&resp.uid, &resp.token); + let _ = self.set_session(Some(session))?; let user_table = self.save_user(resp.into()).await?; Ok(user_table) @@ -91,7 +102,7 @@ impl UserSession { diesel::delete(dsl::user_table.filter(dsl::id.eq(&user_detail.id))).execute(&*conn)?; let _ = self.server.sign_out(&user_detail.id); let _ = self.database.close_user_db(&user_detail.id)?; - let _ = self.set_user_id(None)?; + let _ = self.set_session(None)?; Ok(()) } @@ -113,7 +124,7 @@ impl UserSession { } pub async fn user_detail(&self) -> Result { - let user_id = self.user_id()?; + let user_id = self.get_session()?.user_id; let user = dsl::user_table .filter(user_table::id.eq(&user_id)) .first::(&*(self.get_db_connection()?))?; @@ -137,44 +148,41 @@ impl UserSession { Ok(UserDetail::from(user)) } - pub fn set_user_id(&self, user_id: Option) -> Result<(), UserError> { - log::trace!("Set user id: {:?}", user_id); - KVStore::set_str(USER_ID_CACHE_KEY, user_id.clone().unwrap_or("".to_owned())); - match self.user_id.write() { - Ok(mut write_guard) => { - *write_guard = user_id; - Ok(()) - }, - Err(e) => Err(ErrorBuilder::new(ErrorCode::WriteCurrentIdFailed) - .error(e) - .build()), - } - } - pub fn user_dir(&self) -> Result { - let user_id = self.user_id()?; - Ok(format!("{}/{}", self.config.root_dir, user_id)) + let session = self.get_session()?; + Ok(format!("{}/{}", self.config.root_dir, session.user_id)) } - pub fn user_id(&self) -> Result { - let mut user_id = { - let read_guard = self.user_id.read().map_err(|e| { - ErrorBuilder::new(ErrorCode::ReadCurrentIdFailed) - .error(e) - .build() - })?; + fn set_session(&self, session: Option) -> Result<(), UserError> { + log::trace!("Update user session: {:?}", session); + match &session { + None => KVStore::set_str(SESSION_CACHE_KEY, "".to_string()), + Some(session) => KVStore::set_str(SESSION_CACHE_KEY, session.clone().into()), + } + let mut write_guard = self.session.write(); + *write_guard = session; + Ok(()) + } + fn get_session(&self) -> Result { + let mut session = { + let read_guard = self.session.read(); (*read_guard).clone() }; - if user_id.is_none() { - user_id = KVStore::get_str(USER_ID_CACHE_KEY); - let _ = self.set_user_id(user_id.clone())?; + if session.is_none() { + match KVStore::get_str(SESSION_CACHE_KEY) { + None => {}, + Some(s) => { + session = Some(Session::from(s)); + let _ = self.set_session(session.clone())?; + }, + } } - match user_id { + match session { None => Err(ErrorBuilder::new(ErrorCode::UserNotLoginYet).build()), - Some(user_id) => Ok(user_id), + Some(session) => Ok(session), } } @@ -196,7 +204,7 @@ pub async fn update_user( } pub fn current_user_id() -> Result { - match KVStore::get_str(USER_ID_CACHE_KEY) { + match KVStore::get_str(SESSION_CACHE_KEY) { None => Err(ErrorBuilder::new(ErrorCode::UserNotLoginYet).build()), Some(user_id) => Ok(user_id), } @@ -208,4 +216,43 @@ impl UserDatabaseConnection for UserSession { } } -const USER_ID_CACHE_KEY: &str = "user_id"; +const SESSION_CACHE_KEY: &str = "session_cache_key"; + +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +struct Session { + user_id: String, + token: String, +} + +impl Session { + pub fn new(user_id: &str, token: &str) -> Self { + Self { + user_id: user_id.to_owned(), + token: token.to_owned(), + } + } +} + +impl std::convert::From for Session { + fn from(s: String) -> Self { + match serde_json::from_str(&s) { + Ok(s) => s, + Err(e) => { + log::error!("{Deserialize string to Session failed: {:?}", e"}"); + Session::default() + }, + } + } +} + +impl std::convert::Into for Session { + fn into(self) -> String { + match serde_json::to_string(&self) { + Ok(s) => s, + Err(e) => { + log::error!("{Serialize session to string failed: {:?}", e"}"); + "".to_string() + }, + } + } +}