use crate::entities::{ SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserProfileParams, UserProfilePB, UserSettingPB, }; use crate::{ dart_notification::*, errors::{ErrorCode, FlowyError}, event_map::UserCloudService, services::{ database::{UserDB, UserTable, UserTableChangeset}, notifier::UserNotifier, }, }; use flowy_database::ConnectionPool; use flowy_database::{ kv::KV, query_dsl::*, schema::{user_table, user_table::dsl}, DBConnection, ExpressionMethods, UserDatabaseConnection, }; use serde::{Deserialize, Serialize}; use std::sync::Arc; use tokio::sync::mpsc; pub struct UserSessionConfig { root_dir: String, /// Used as the key of `Session` when saving session information to KV. session_cache_key: String, } impl UserSessionConfig { /// The `root_dir` represents as the root of the user folders. It must be unique for each /// users. pub fn new(name: &str, root_dir: &str) -> Self { let session_cache_key = format!("{}_session_cache", name); Self { root_dir: root_dir.to_owned(), session_cache_key, } } } pub struct UserSession { database: UserDB, config: UserSessionConfig, cloud_service: Arc, pub notifier: UserNotifier, } impl UserSession { pub fn new(config: UserSessionConfig, cloud_service: Arc) -> Self { let db = UserDB::new(&config.root_dir); let notifier = UserNotifier::new(); Self { database: db, config, cloud_service, notifier, } } pub fn init(&self) { if let Ok(session) = self.get_session() { self.notifier.notify_login(&session.token, &session.user_id); } } pub fn db_connection(&self) -> Result { let user_id = self.get_session()?.user_id; self.database.get_connection(&user_id) } // The caller will be not 'Sync' before of the return value, // PooledConnection is not sync. You can use // db_connection_pool function to require the ConnectionPool that is 'Sync'. // // let pool = self.db_connection_pool()?; // let conn: PooledConnection = pool.get()?; pub fn db_pool(&self) -> Result, FlowyError> { let user_id = self.get_session()?.user_id; self.database.get_pool(&user_id) } #[tracing::instrument(level = "debug", skip(self))] pub async fn sign_in(&self, params: SignInParams) -> Result { if self.is_user_login(¶ms.email) { self.get_user_profile().await } else { let resp = self.cloud_service.sign_in(params).await?; let session: Session = resp.clone().into(); self.set_session(Some(session))?; let user_table = self.save_user(resp.into()).await?; let user_profile: UserProfilePB = user_table.into(); self.notifier.notify_login(&user_profile.token, &user_profile.id); Ok(user_profile) } } #[tracing::instrument(level = "debug", skip(self))] pub async fn sign_up(&self, params: SignUpParams) -> Result { if self.is_user_login(¶ms.email) { self.get_user_profile().await } else { let resp = self.cloud_service.sign_up(params).await?; let session: Session = resp.clone().into(); self.set_session(Some(session))?; let user_table = self.save_user(resp.into()).await?; let user_profile: UserProfilePB = user_table.into(); let (ret, mut tx) = mpsc::channel(1); self.notifier.notify_sign_up(ret, &user_profile); let _ = tx.recv().await; Ok(user_profile) } } #[tracing::instrument(level = "debug", skip(self))] pub async fn sign_out(&self) -> Result<(), FlowyError> { let session = self.get_session()?; let _ = diesel::delete(dsl::user_table.filter(dsl::id.eq(&session.user_id))).execute(&*(self.db_connection()?))?; self.database.close_user_db(&session.user_id)?; self.set_session(None)?; self.notifier.notify_logout(&session.token, &session.user_id); self.sign_out_on_server(&session.token).await?; Ok(()) } #[tracing::instrument(level = "debug", skip(self))] pub async fn update_user_profile(&self, params: UpdateUserProfileParams) -> Result<(), FlowyError> { let session = self.get_session()?; let changeset = UserTableChangeset::new(params.clone()); diesel_update_table!(user_table, changeset, &*self.db_connection()?); let user_profile = self.get_user_profile().await?; dart_notify(&session.token, UserNotification::UserProfileUpdated) .payload(user_profile) .send(); self.update_user_on_server(&session.token, params).await?; Ok(()) } pub async fn init_user(&self) -> Result<(), FlowyError> { Ok(()) } pub async fn check_user(&self) -> Result { let (user_id, token) = self.get_session()?.into_part(); let user = dsl::user_table .filter(user_table::id.eq(&user_id)) .first::(&*(self.db_connection()?))?; self.read_user_profile_on_server(&token)?; Ok(user.into()) } pub async fn get_user_profile(&self) -> Result { let (user_id, token) = self.get_session()?.into_part(); let user = dsl::user_table .filter(user_table::id.eq(&user_id)) .first::(&*(self.db_connection()?))?; self.read_user_profile_on_server(&token)?; Ok(user.into()) } pub fn user_dir(&self) -> Result { let session = self.get_session()?; Ok(format!("{}/{}", self.config.root_dir, session.user_id)) } pub fn user_setting(&self) -> Result { let user_setting = UserSettingPB { user_folder: self.user_dir()?, }; Ok(user_setting) } pub fn user_id(&self) -> Result { Ok(self.get_session()?.user_id) } pub fn user_name(&self) -> Result { Ok(self.get_session()?.name) } pub fn token(&self) -> Result { Ok(self.get_session()?.token) } } impl UserSession { fn read_user_profile_on_server(&self, _token: &str) -> Result<(), FlowyError> { // let server = self.cloud_service.clone(); // let token = token.to_owned(); // tokio::spawn(async move { // match server.get_user(&token).await { // Ok(profile) => { // dart_notify(&token, UserNotification::UserProfileUpdated) // .payload(profile) // .send(); // } // Err(e) => { // dart_notify(&token, UserNotification::UserProfileUpdated) // .error(e) // .send(); // } // } // }); Ok(()) } async fn update_user_on_server(&self, token: &str, params: UpdateUserProfileParams) -> Result<(), FlowyError> { let server = self.cloud_service.clone(); let token = token.to_owned(); let _ = tokio::spawn(async move { match server.update_user(&token, params).await { Ok(_) => {} Err(e) => { // TODO: retry? log::error!("update user profile failed: {:?}", e); } } }) .await; Ok(()) } async fn sign_out_on_server(&self, token: &str) -> Result<(), FlowyError> { let server = self.cloud_service.clone(); let token = token.to_owned(); let _ = tokio::spawn(async move { match server.sign_out(&token).await { Ok(_) => {} Err(e) => log::error!("Sign out failed: {:?}", e), } }) .await; Ok(()) } async fn save_user(&self, user: UserTable) -> Result { let conn = self.db_connection()?; let _ = diesel::insert_into(user_table::table) .values(user.clone()) .execute(&*conn)?; Ok(user) } fn set_session(&self, session: Option) -> Result<(), FlowyError> { tracing::debug!("Set user session: {:?}", session); match &session { None => KV::remove(&self.config.session_cache_key).map_err(|e| FlowyError::new(ErrorCode::Internal, &e))?, Some(session) => KV::set_str(&self.config.session_cache_key, session.clone().into()), } Ok(()) } fn get_session(&self) -> Result { match KV::get_str(&self.config.session_cache_key) { None => Err(FlowyError::unauthorized()), Some(s) => Ok(Session::from(s)), } } fn is_user_login(&self, email: &str) -> bool { match self.get_session() { Ok(session) => session.email == email, Err(_) => false, } } } pub async fn update_user( _cloud_service: Arc, pool: Arc, params: UpdateUserProfileParams, ) -> Result<(), FlowyError> { let changeset = UserTableChangeset::new(params); let conn = pool.get()?; diesel_update_table!(user_table, changeset, &*conn); Ok(()) } impl UserDatabaseConnection for UserSession { fn get_connection(&self) -> Result { self.db_connection().map_err(|e| format!("{:?}", e)) } } #[derive(Debug, Clone, Default, Serialize, Deserialize)] struct Session { user_id: String, token: String, email: String, #[serde(default)] name: String, } impl std::convert::From for Session { fn from(resp: SignInResponse) -> Self { Session { user_id: resp.user_id, token: resp.token, email: resp.email, name: resp.name, } } } impl std::convert::From for Session { fn from(resp: SignUpResponse) -> Self { Session { user_id: resp.user_id, token: resp.token, email: resp.email, name: resp.name, } } } impl Session { pub fn into_part(self) -> (String, String) { (self.user_id, self.token) } } impl std::convert::From for Session { fn from(s: String) -> Self { match serde_json::from_str(&s) { Ok(s) => s, Err(e) => { log::error!("Deserialize string to Session failed: {:?}", e); Session::default() } } } } impl std::convert::From for String { fn from(session: Session) -> Self { match serde_json::to_string(&session) { Ok(s) => s, Err(e) => { log::error!("Serialize session to string failed: {:?}", e); "".to_string() } } } }