feat: integrate postgres storage (#2604)

* chore: env config

* chore: get user workspace

* feat: enable postgres storage

* chore: add new env

* chore: add set env ffi

* chore: pass env before backend init

* chore: update

* fix: ci tests

* chore: commit the generate env file

* chore: remove unused import
This commit is contained in:
Nathan.fooo
2023-05-23 23:55:21 +08:00
committed by GitHub
parent 51a7954af7
commit 056e2d49d0
87 changed files with 1421 additions and 1131 deletions

View File

@ -24,11 +24,12 @@ postgrest = "1.0"
tokio-retry = "0.3"
anyhow = "1.0"
uuid = { version = "1.3.3", features = ["v4"] }
chrono = "0.4.24"
lib-infra = { path = "../../../shared-lib/lib-infra" }
flowy-user = { path = "../flowy-user" }
flowy-folder2 = { path = "../flowy-folder2" }
flowy-error = { path = "../flowy-error" }
flowy-config = { path = "../flowy-config" }
[dev-dependencies]
uuid = { version = "1.3.3", features = ["v4"] }

View File

@ -1,3 +1,4 @@
use flowy_folder2::deps::FolderCloudService;
use std::sync::Arc;
use flowy_user::event_map::UserAuthService;
@ -8,6 +9,21 @@ mod response;
pub mod self_host;
pub mod supabase;
/// In order to run this the supabase test, you need to create a .env file in the root directory of this project
/// and add the following environment variables:
/// - SUPABASE_URL
/// - SUPABASE_ANON_KEY
/// - SUPABASE_KEY
/// - SUPABASE_JWT_SECRET
///
/// the .env file should look like this:
/// SUPABASE_URL=https://<your-supabase-url>.supabase.co
/// SUPABASE_ANON_KEY=<your-supabase-anon-key>
/// SUPABASE_KEY=<your-supabase-key>
/// SUPABASE_JWT_SECRET=<your-supabase-jwt-secret>
///
pub trait AppFlowyServer: Send + Sync + 'static {
fn user_service(&self) -> Arc<dyn UserAuthService>;
fn folder_service(&self) -> Arc<dyn FolderCloudService>;
}

View File

@ -0,0 +1,21 @@
use flowy_error::FlowyError;
use flowy_folder2::deps::{FolderCloudService, Workspace};
use flowy_folder2::gen_workspace_id;
use lib_infra::future::FutureResult;
use lib_infra::util::timestamp;
pub(crate) struct LocalServerFolderCloudServiceImpl();
impl FolderCloudService for LocalServerFolderCloudServiceImpl {
fn create_workspace(&self, _uid: i64, name: &str) -> FutureResult<Workspace, FlowyError> {
let name = name.to_string();
FutureResult::new(async move {
Ok(Workspace {
id: gen_workspace_id(),
name: name.to_string(),
belongings: Default::default(),
created_at: timestamp(),
})
})
}
}

View File

@ -0,0 +1,5 @@
mod folder;
mod user;
pub(crate) use folder::*;
pub(crate) use user::*;

View File

@ -1,5 +1,5 @@
pub use server::*;
pub mod impls;
mod server;
pub(crate) mod uid;
mod user;

View File

@ -1,11 +1,14 @@
use std::sync::Arc;
use flowy_folder2::deps::FolderCloudService;
use parking_lot::RwLock;
use tokio::sync::mpsc;
use flowy_user::event_map::UserAuthService;
use crate::local_server::user::LocalServerUserAuthServiceImpl;
use crate::local_server::impls::{
LocalServerFolderCloudServiceImpl, LocalServerUserAuthServiceImpl,
};
use crate::AppFlowyServer;
#[derive(Default)]
@ -31,4 +34,8 @@ impl AppFlowyServer for LocalServer {
fn user_service(&self) -> Arc<dyn UserAuthService> {
Arc::new(LocalServerUserAuthServiceImpl())
}
fn folder_service(&self) -> Arc<dyn FolderCloudService> {
Arc::new(LocalServerFolderCloudServiceImpl())
}
}

View File

@ -0,0 +1,21 @@
use flowy_error::FlowyError;
use flowy_folder2::deps::{FolderCloudService, Workspace};
use flowy_folder2::gen_workspace_id;
use lib_infra::future::FutureResult;
use lib_infra::util::timestamp;
pub(crate) struct SelfHostedServerFolderCloudServiceImpl();
impl FolderCloudService for SelfHostedServerFolderCloudServiceImpl {
fn create_workspace(&self, _uid: i64, name: &str) -> FutureResult<Workspace, FlowyError> {
let name = name.to_string();
FutureResult::new(async move {
Ok(Workspace {
id: gen_workspace_id(),
name: name.to_string(),
belongings: Default::default(),
created_at: timestamp(),
})
})
}
}

View File

@ -0,0 +1,5 @@
mod folder;
mod user;
pub(crate) use folder::*;
pub(crate) use user::*;

View File

@ -1,6 +1,5 @@
pub use server::*;
pub use user::*;
pub mod configuration;
pub mod impls;
mod server;
mod user;

View File

@ -1,9 +1,12 @@
use flowy_folder2::deps::FolderCloudService;
use std::sync::Arc;
use flowy_user::event_map::UserAuthService;
use crate::self_host::configuration::SelfHostedConfiguration;
use crate::self_host::SelfHostedUserAuthServiceImpl;
use crate::self_host::impls::{
SelfHostedServerFolderCloudServiceImpl, SelfHostedUserAuthServiceImpl,
};
use crate::AppFlowyServer;
pub struct SelfHostServer {
@ -20,4 +23,8 @@ impl AppFlowyServer for SelfHostServer {
fn user_service(&self) -> Arc<dyn UserAuthService> {
Arc::new(SelfHostedUserAuthServiceImpl::new(self.config.clone()))
}
fn folder_service(&self) -> Arc<dyn FolderCloudService> {
Arc::new(SelfHostedServerFolderCloudServiceImpl())
}
}

View File

@ -0,0 +1,54 @@
use crate::supabase::request::create_workspace_with_uid;
use flowy_error::FlowyError;
use flowy_folder2::deps::{FolderCloudService, Workspace};
use lib_infra::future::FutureResult;
use postgrest::Postgrest;
use std::sync::Arc;
pub(crate) const WORKSPACE_TABLE: &str = "af_workspace";
pub(crate) const WORKSPACE_NAME_COLUMN: &str = "workspace_name";
pub(crate) struct SupabaseFolderCloudServiceImpl {
postgrest: Arc<Postgrest>,
}
impl FolderCloudService for SupabaseFolderCloudServiceImpl {
fn create_workspace(&self, uid: i64, name: &str) -> FutureResult<Workspace, FlowyError> {
let name = name.to_string();
let postgrest = self.postgrest.clone();
FutureResult::new(async move { create_workspace_with_uid(postgrest, uid, &name).await })
}
}
#[cfg(test)]
mod tests {
use crate::supabase::request::{
create_user_with_uuid, create_workspace_with_uid, get_user_workspace_with_uid,
};
use crate::supabase::{SupabaseConfiguration, SupabaseServer};
use dotenv::dotenv;
use std::sync::Arc;
#[tokio::test]
async fn create_user_workspace() {
dotenv().ok();
if let Ok(config) = SupabaseConfiguration::from_env() {
let server = Arc::new(SupabaseServer::new(config));
let uuid = uuid::Uuid::new_v4();
let uid = create_user_with_uuid(server.postgres.clone(), uuid.to_string())
.await
.unwrap()
.uid;
create_workspace_with_uid(server.postgres.clone(), uid, "test")
.await
.unwrap();
let workspaces = get_user_workspace_with_uid(server.postgres.clone(), uid)
.await
.unwrap();
assert_eq!(workspaces.len(), 2);
assert_eq!(workspaces[0].name, "My workspace");
assert_eq!(workspaces[1].name, "test");
}
}
}

View File

@ -0,0 +1,5 @@
mod folder;
mod user;
pub(crate) use folder::*;
pub(crate) use user::*;

View File

@ -12,7 +12,8 @@ use crate::supabase::request::*;
pub(crate) const USER_TABLE: &str = "af_user";
pub(crate) const USER_PROFILE_TABLE: &str = "af_user_profile";
pub(crate) const USER_WORKSPACE_TABLE: &str = "af_user_workspace_view";
#[allow(dead_code)]
pub(crate) const USER_WORKSPACE_TABLE: &str = "af_workspace";
pub(crate) struct PostgrestUserAuthServiceImpl {
postgrest: Arc<Postgrest>,
}
@ -41,14 +42,12 @@ impl UserAuthService for PostgrestUserAuthServiceImpl {
let postgrest = self.postgrest.clone();
FutureResult::new(async move {
let uuid = uuid_from_box_any(params)?;
match get_user_workspace_with_uuid(postgrest, uuid).await? {
None => Err(FlowyError::user_not_exist()),
Some(user) => Ok(SignInResponse {
user_id: user.uid,
workspace_id: user.workspace_id,
..Default::default()
}),
}
let user_profile = get_user_profile(postgrest, GetUserProfileParams::Uuid(uuid)).await?;
Ok(SignInResponse {
user_id: user_profile.uid,
workspace_id: user_profile.workspace_id,
..Default::default()
})
})
}
@ -76,18 +75,19 @@ impl UserAuthService for PostgrestUserAuthServiceImpl {
) -> FutureResult<Option<UserProfile>, FlowyError> {
let postgrest = self.postgrest.clone();
FutureResult::new(async move {
let profile = get_user_workspace_with_uid(postgrest, uid)
.await?
.map(|user_workspace| UserProfile {
id: user_workspace.uid,
email: "".to_string(),
name: user_workspace.name,
token: "".to_string(),
icon_url: "".to_string(),
openai_key: "".to_string(),
workspace_id: user_workspace.workspace_id,
});
Ok(profile)
let user_profile_resp = get_user_profile(postgrest, GetUserProfileParams::Uid(uid)).await?;
let profile = UserProfile {
id: user_profile_resp.uid,
email: user_profile_resp.email,
name: user_profile_resp.name,
token: "".to_string(),
icon_url: "".to_string(),
openai_key: "".to_string(),
workspace_id: user_profile_resp.workspace_id,
};
Ok(Some(profile))
})
}
}
@ -100,8 +100,10 @@ mod tests {
use flowy_user::entities::UpdateUserProfileParams;
use crate::supabase::request::{get_user_profile, get_user_workspace_with_uid};
use crate::supabase::user::{create_user_with_uuid, get_user_id_with_uuid, update_user_profile};
use crate::supabase::request::{
create_user_with_uuid, get_user_id_with_uuid, get_user_profile, get_user_workspace_with_uid,
update_user_profile, GetUserProfileParams,
};
use crate::supabase::{SupabaseConfiguration, SupabaseServer};
#[tokio::test]
@ -151,17 +153,15 @@ mod tests {
.unwrap();
println!("result: {:?}", result);
let result = get_user_profile(server.postgres.clone(), uid)
let result = get_user_profile(server.postgres.clone(), GetUserProfileParams::Uid(uid))
.await
.unwrap()
.unwrap();
assert_eq!(result.name, "nathan".to_string());
let result = get_user_workspace_with_uid(server.postgres.clone(), uid)
.await
.unwrap()
.unwrap();
assert!(!result.workspace_id.is_empty());
assert!(!result.is_empty());
}
}
}

View File

@ -1,7 +1,7 @@
pub use server::*;
pub mod impls;
mod request;
mod response;
mod retry;
mod server;
pub mod user;

View File

@ -5,13 +5,16 @@ use postgrest::Postgrest;
use serde_json::json;
use flowy_error::{ErrorCode, FlowyError};
use flowy_folder2::deps::Workspace;
use flowy_user::entities::UpdateUserProfileParams;
use lib_infra::box_any::BoxAny;
use crate::supabase::response::{
InsertResponse, PostgrestError, UserProfile, UserProfileList, UserWorkspace, UserWorkspaceList,
use crate::supabase::impls::{
USER_PROFILE_TABLE, USER_TABLE, USER_WORKSPACE_TABLE, WORKSPACE_NAME_COLUMN, WORKSPACE_TABLE,
};
use crate::supabase::response::{
InsertResponse, PostgrestError, UserProfileResponse, UserProfileResponseList, UserWorkspaceList,
};
use crate::supabase::user::{USER_PROFILE_TABLE, USER_TABLE, USER_WORKSPACE_TABLE};
const USER_ID: &str = "uid";
const USER_UUID: &str = "uuid";
@ -19,13 +22,15 @@ const USER_UUID: &str = "uuid";
pub(crate) async fn create_user_with_uuid(
postgrest: Arc<Postgrest>,
uuid: String,
) -> Result<UserWorkspace, FlowyError> {
let insert = format!("{{\"{}\": \"{}\"}}", USER_UUID, &uuid);
) -> Result<UserProfileResponse, FlowyError> {
let mut insert = serde_json::Map::new();
insert.insert(USER_UUID.to_string(), json!(&uuid));
let insert_query = serde_json::to_string(&insert).unwrap();
// Create a new user with uuid.
let resp = postgrest
.from(USER_TABLE)
.insert(insert)
.insert(insert_query)
.execute()
.await
.map_err(|e| FlowyError::new(ErrorCode::HttpError, e))?;
@ -44,13 +49,7 @@ pub(crate) async fn create_user_with_uuid(
.map_err(|e| FlowyError::serde().context(e))?
.first_or_error()?;
match get_user_workspace_with_uid(postgrest, record.uid).await {
Ok(Some(user)) => Ok(user),
_ => Err(FlowyError::new(
ErrorCode::Internal,
"Failed to get user workspace",
)),
}
get_user_profile(postgrest, GetUserProfileParams::Uid(record.uid)).await
} else {
let err = serde_json::from_str::<PostgrestError>(&content)
.map_err(|e| FlowyError::serde().context(e))?;
@ -58,8 +57,8 @@ pub(crate) async fn create_user_with_uuid(
// If there is a unique violation, try to get the user id with uuid. At this point, the user
// should exist.
if err.is_unique_violation() {
match get_user_workspace_with_uuid(postgrest, uuid).await {
Ok(Some(user)) => Ok(user),
match get_user_profile(postgrest, GetUserProfileParams::Uuid(uuid)).await {
Ok(user) => Ok(user),
_ => Err(FlowyError::new(
ErrorCode::Internal,
"Failed to get user workspace",
@ -112,14 +111,21 @@ pub(crate) fn uuid_from_box_any(any: BoxAny) -> Result<String, FlowyError> {
Ok(uuid.to_string())
}
#[allow(dead_code)]
pub enum GetUserProfileParams {
Uid(i64),
Uuid(String),
}
pub(crate) async fn get_user_profile(
postgrest: Arc<Postgrest>,
uid: i64,
) -> Result<Option<UserProfile>, FlowyError> {
let resp = postgrest
.from(USER_PROFILE_TABLE)
.eq(USER_ID, uid.to_string())
params: GetUserProfileParams,
) -> Result<UserProfileResponse, FlowyError> {
let mut builder = postgrest.from(USER_PROFILE_TABLE);
match params {
GetUserProfileParams::Uid(uid) => builder = builder.eq(USER_ID, uid.to_string()),
GetUserProfileParams::Uuid(uuid) => builder = builder.eq(USER_UUID, uuid),
}
let resp = builder
.select("*")
.execute()
.await
@ -129,19 +135,35 @@ pub(crate) async fn get_user_profile(
.text()
.await
.map_err(|e| FlowyError::new(ErrorCode::UnexpectedEmpty, e))?;
let resp = serde_json::from_str::<UserProfileList>(&content)
.map_err(|_e| FlowyError::new(ErrorCode::Serde, "Deserialize UserProfileList failed"))?;
Ok(resp.0.first().cloned())
let mut user_profiles =
serde_json::from_str::<UserProfileResponseList>(&content).map_err(|_e| {
FlowyError::new(
ErrorCode::Serde,
"Deserialize UserProfileResponseList failed",
)
})?;
if user_profiles.0.is_empty() {
return Err(FlowyError::new(
ErrorCode::Internal,
"Failed to get user profile",
));
}
Ok(user_profiles.0.remove(0))
}
pub(crate) async fn get_user_workspace_with_uuid(
pub(crate) async fn create_workspace_with_uid(
postgrest: Arc<Postgrest>,
uuid: String,
) -> Result<Option<UserWorkspace>, FlowyError> {
uid: i64,
name: &str,
) -> Result<Workspace, FlowyError> {
let mut insert = serde_json::Map::new();
insert.insert(USER_ID.to_string(), json!(uid));
insert.insert(WORKSPACE_NAME_COLUMN.to_string(), json!(name));
let insert_query = serde_json::to_string(&insert).unwrap();
let resp = postgrest
.from(USER_WORKSPACE_TABLE)
.eq(USER_UUID, uuid)
.select("*")
.from(WORKSPACE_TABLE)
.insert(insert_query)
.execute()
.await
.map_err(|e| FlowyError::new(ErrorCode::HttpError, e))?;
@ -150,15 +172,31 @@ pub(crate) async fn get_user_workspace_with_uuid(
.text()
.await
.map_err(|e| FlowyError::new(ErrorCode::UnexpectedEmpty, e))?;
let resp = serde_json::from_str::<UserWorkspaceList>(&content)
.map_err(|_e| FlowyError::new(ErrorCode::Serde, "Deserialize UserWorkspaceList failed"))?;
Ok(resp.0.first().cloned())
let mut workspace_list = serde_json::from_str::<UserWorkspaceList>(&content)
.map_err(|_e| FlowyError::new(ErrorCode::Serde, "Deserialize UserWorkspaceList failed"))?
.into_inner();
debug_assert!(workspace_list.len() == 1);
if workspace_list.is_empty() {
return Err(FlowyError::new(
ErrorCode::Internal,
"Failed to create workspace",
));
}
let user_workspace = workspace_list.remove(0);
Ok(Workspace {
id: user_workspace.workspace_id,
name: user_workspace.workspace_name,
belongings: Default::default(),
created_at: user_workspace.created_at.timestamp(),
})
}
#[allow(dead_code)]
pub(crate) async fn get_user_workspace_with_uid(
postgrest: Arc<Postgrest>,
uid: i64,
) -> Result<Option<UserWorkspace>, FlowyError> {
) -> Result<Vec<Workspace>, FlowyError> {
let resp = postgrest
.from(USER_WORKSPACE_TABLE)
.eq(USER_ID, uid.to_string())
@ -171,16 +209,27 @@ pub(crate) async fn get_user_workspace_with_uid(
.text()
.await
.map_err(|e| FlowyError::new(ErrorCode::UnexpectedEmpty, e))?;
let resp = serde_json::from_str::<UserWorkspaceList>(&content)
.map_err(|_e| FlowyError::new(ErrorCode::Serde, "Deserialize UserWorkspaceList failed"))?;
Ok(resp.0.first().cloned())
let user_workspaces = serde_json::from_str::<UserWorkspaceList>(&content)
.map_err(|_e| FlowyError::new(ErrorCode::Serde, "Deserialize UserWorkspaceList failed"))?
.0;
Ok(
user_workspaces
.into_iter()
.map(|user_workspace| Workspace {
id: user_workspace.workspace_id,
name: user_workspace.workspace_name,
belongings: Default::default(),
created_at: user_workspace.created_at.timestamp(),
})
.collect(),
)
}
#[allow(dead_code)]
pub(crate) async fn update_user_profile(
postgrest: Arc<Postgrest>,
params: UpdateUserProfileParams,
) -> Result<Option<UserProfile>, FlowyError> {
) -> Result<Option<UserProfileResponse>, FlowyError> {
if params.is_empty() {
return Err(FlowyError::new(
ErrorCode::UnexpectedEmpty,
@ -206,7 +255,7 @@ pub(crate) async fn update_user_profile(
.await
.map_err(|e| FlowyError::new(ErrorCode::UnexpectedEmpty, e))?;
let resp = serde_json::from_str::<UserProfileList>(&content)
let resp = serde_json::from_str::<UserProfileResponseList>(&content)
.map_err(|_e| FlowyError::new(ErrorCode::Serde, "Deserialize UserProfileList failed"))?;
Ok(resp.0.first().cloned())
}

View File

@ -1,3 +1,4 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::Value;
use thiserror::Error;
@ -56,27 +57,39 @@ pub(crate) struct InsertRecord {
#[allow(dead_code)]
#[derive(Debug, Deserialize, Clone)]
pub(crate) struct UserProfile {
pub(crate) struct UserProfileResponse {
pub uid: i64,
#[serde(deserialize_with = "deserialize_null_or_default")]
pub name: String,
#[serde(deserialize_with = "deserialize_null_or_default")]
pub email: String,
}
#[derive(Debug, Deserialize)]
pub(crate) struct UserProfileList(pub Vec<UserProfile>);
#[derive(Debug, Deserialize, Clone)]
pub(crate) struct UserWorkspace {
pub uid: i64,
#[serde(deserialize_with = "deserialize_null_or_default")]
pub name: String,
pub workspace_id: String,
}
#[derive(Debug, Deserialize)]
pub(crate) struct UserWorkspaceList(pub Vec<UserWorkspace>);
pub(crate) struct UserProfileResponseList(pub Vec<UserProfileResponse>);
#[derive(Debug, Deserialize, Clone)]
pub(crate) struct UserWorkspace {
#[allow(dead_code)]
pub uid: i64,
#[serde(deserialize_with = "deserialize_null_or_default")]
pub workspace_name: String,
pub created_at: DateTime<Utc>,
pub workspace_id: String,
}
#[derive(Debug, Deserialize)]
pub(crate) struct UserWorkspaceList(pub(crate) Vec<UserWorkspace>);
impl UserWorkspaceList {
pub(crate) fn into_inner(self) -> Vec<UserWorkspace> {
self.0
}
}
/// Handles the case where the value is null. If the value is null, return the default value of the
/// type. Otherwise, deserialize the value.

View File

@ -1,15 +1,21 @@
use std::sync::Arc;
use postgrest::Postgrest;
use serde::Deserialize;
use flowy_config::entities::{SUPABASE_JWT_SECRET, SUPABASE_KEY, SUPABASE_URL};
use flowy_error::{ErrorCode, FlowyError};
use flowy_folder2::deps::FolderCloudService;
use flowy_user::event_map::UserAuthService;
use crate::supabase::user::PostgrestUserAuthServiceImpl;
use crate::supabase::impls::PostgrestUserAuthServiceImpl;
use crate::AppFlowyServer;
#[derive(Debug)]
pub const SUPABASE_URL: &str = "SUPABASE_URL";
pub const SUPABASE_ANON_KEY: &str = "SUPABASE_ANON_KEY";
pub const SUPABASE_KEY: &str = "SUPABASE_KEY";
pub const SUPABASE_JWT_SECRET: &str = "SUPABASE_JWT_SECRET";
#[derive(Debug, Deserialize)]
pub struct SupabaseConfiguration {
/// The url of the supabase server.
pub url: String,
@ -20,6 +26,11 @@ pub struct SupabaseConfiguration {
}
impl SupabaseConfiguration {
/// Load the configuration from the environment variables.
/// SUPABASE_URL=https://<your-supabase-url>.supabase.co
/// SUPABASE_KEY=<your-supabase-key>
/// SUPABASE_JWT_SECRET=<your-supabase-jwt-secret>
///
pub fn from_env() -> Result<Self, FlowyError> {
Ok(Self {
url: std::env::var(SUPABASE_URL)
@ -31,6 +42,12 @@ impl SupabaseConfiguration {
})?,
})
}
pub fn write_env(&self) {
std::env::set_var(SUPABASE_URL, &self.url);
std::env::set_var(SUPABASE_KEY, &self.key);
std::env::set_var(SUPABASE_JWT_SECRET, &self.jwt_secret);
}
}
pub struct SupabaseServer {
@ -53,4 +70,8 @@ impl AppFlowyServer for SupabaseServer {
fn user_service(&self) -> Arc<dyn UserAuthService> {
Arc::new(PostgrestUserAuthServiceImpl::new(self.postgres.clone()))
}
fn folder_service(&self) -> Arc<dyn FolderCloudService> {
todo!()
}
}