feat: reload UI (#2999)

* chore: reload folder

* chore: reload folder

* chore: init sync

* chore: update tables

* chore: update database

* chore: load row

* chore: update

* chore: reload row

* test: fit test

* chore: retry

* chore: support batch fetch

* chore: enable sync

* chore: sync switch

* chore: sync switch

* chore: migration user data

* chore: migrate data

* chore: migrate folder

* chore: save user email

* chore: refresh user profile

* chore: fix test

* chore: delete translation files

* test: clippy format
Author: Nathan.fooo
Date: 2023-07-14 13:37:13 +08:00
Committed by: GitHub
Parent commit: 5085ea115f
Commit: f9e7b5ffa4
170 changed files with 3380 additions and 1482 deletions

View File

@ -39,6 +39,9 @@ flowy-folder2 = { path = "../flowy-folder2" }
flowy-database2 = { path = "../flowy-database2" }
flowy-document2 = { path = "../flowy-document2" }
flowy-error = { path = "../flowy-error" }
flowy-server-config = { path = "../flowy-server-config" }
collab-folder = { version = "0.1.0" }
collab-document = { version = "0.1.0" }
[dev-dependencies]
uuid = { version = "1.3.3", features = ["v4"] }

View File

@ -29,6 +29,7 @@ pub mod util;
///
pub trait AppFlowyServer: Send + Sync + 'static {
fn enable_sync(&self, _enable: bool) {}
fn user_service(&self) -> Arc<dyn UserAuthService>;
fn folder_service(&self) -> Arc<dyn FolderCloudService>;
fn database_service(&self) -> Arc<dyn DatabaseCloudService>;
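
The new enable_sync hook ships with a no-op default body, so existing AppFlowyServer implementations keep compiling; only servers that actually support cloud sync (the Supabase server later in this commit) override it. A minimal caller-side sketch, assuming the caller holds an Arc<dyn AppFlowyServer> and that the sketch lives in the same crate as the trait; the handler name is illustrative:

use std::sync::Arc;

// Illustrative settings handler: forwards the sync toggle to whichever server is active.
// Servers without sync support fall through to the default no-op implementation above.
fn on_sync_setting_changed(server: &Arc<dyn AppFlowyServer>, enabled: bool) {
  server.enable_sync(enabled);
}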

View File

@ -1,17 +1,26 @@
use flowy_database2::deps::{DatabaseCloudService, DatabaseSnapshot};
use flowy_database2::deps::{
CollabObjectUpdate, CollabObjectUpdateByOid, DatabaseCloudService, DatabaseSnapshot,
};
use flowy_error::FlowyError;
use lib_infra::future::FutureResult;
pub(crate) struct LocalServerDatabaseCloudServiceImpl();
impl DatabaseCloudService for LocalServerDatabaseCloudServiceImpl {
fn get_database_updates(&self, _database_id: &str) -> FutureResult<Vec<Vec<u8>>, FlowyError> {
fn get_collab_update(&self, _object_id: &str) -> FutureResult<CollabObjectUpdate, FlowyError> {
FutureResult::new(async move { Ok(vec![]) })
}
fn get_database_latest_snapshot(
fn batch_get_collab_updates(
&self,
_database_id: &str,
_object_ids: Vec<String>,
) -> FutureResult<CollabObjectUpdateByOid, FlowyError> {
FutureResult::new(async move { Ok(CollabObjectUpdateByOid::default()) })
}
fn get_collab_latest_snapshot(
&self,
_object_id: &str,
) -> FutureResult<Option<DatabaseSnapshot>, FlowyError> {
FutureResult::new(async move { Ok(None) })
}

View File

@ -1,4 +1,4 @@
use flowy_document2::deps::{DocumentCloudService, DocumentSnapshot};
use flowy_document2::deps::{DocumentCloudService, DocumentData, DocumentSnapshot};
use flowy_error::FlowyError;
use lib_infra::future::FutureResult;
@ -15,4 +15,11 @@ impl DocumentCloudService for LocalServerDocumentCloudServiceImpl {
) -> FutureResult<Option<DocumentSnapshot>, FlowyError> {
FutureResult::new(async move { Ok(None) })
}
fn get_document_data(
&self,
_document_id: &str,
) -> FutureResult<Option<DocumentData>, FlowyError> {
FutureResult::new(async move { Ok(None) })
}
}

View File

@ -1,10 +1,16 @@
use std::sync::Arc;
use flowy_error::FlowyError;
use flowy_folder2::deps::{FolderCloudService, FolderSnapshot, Workspace};
use flowy_folder2::deps::{FolderCloudService, FolderData, FolderSnapshot, Workspace};
use flowy_folder2::gen_workspace_id;
use lib_infra::future::FutureResult;
use lib_infra::util::timestamp;
pub(crate) struct LocalServerFolderCloudServiceImpl();
use crate::local_server::LocalServerDB;
pub(crate) struct LocalServerFolderCloudServiceImpl {
pub db: Arc<dyn LocalServerDB>,
}
impl FolderCloudService for LocalServerFolderCloudServiceImpl {
fn create_workspace(&self, _uid: i64, name: &str) -> FutureResult<Workspace, FlowyError> {
@ -19,6 +25,10 @@ impl FolderCloudService for LocalServerFolderCloudServiceImpl {
})
}
fn get_folder_data(&self, _workspace_id: &str) -> FutureResult<Option<FolderData>, FlowyError> {
FutureResult::new(async move { Ok(None) })
}
fn get_folder_latest_snapshot(
&self,
_workspace_id: &str,
@ -26,7 +36,25 @@ impl FolderCloudService for LocalServerFolderCloudServiceImpl {
FutureResult::new(async move { Ok(None) })
}
fn get_folder_updates(&self, _workspace_id: &str) -> FutureResult<Vec<Vec<u8>>, FlowyError> {
FutureResult::new(async move { Ok(vec![]) })
fn get_folder_updates(
&self,
workspace_id: &str,
uid: i64,
) -> FutureResult<Vec<Vec<u8>>, FlowyError> {
let weak_db = Arc::downgrade(&self.db);
let workspace_id = workspace_id.to_string();
FutureResult::new(async move {
match weak_db.upgrade() {
None => Ok(vec![]),
Some(db) => {
let updates = db.get_collab_updates(uid, &workspace_id)?;
Ok(updates)
},
}
})
}
fn service_name(&self) -> String {
"Local".to_string()
}
}
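
Note the weak-handle pattern in get_folder_updates above: the service downgrades its Arc<dyn LocalServerDB> before moving into the returned future, so a pending fetch cannot keep the database alive once the server is dropped. A reduced, hypothetical sketch of that shape (the free function is illustrative, not part of the commit; module paths follow the imports shown in the diff):

use std::sync::Arc;

use flowy_error::FlowyError;
use lib_infra::future::FutureResult;

use crate::local_server::LocalServerDB;

// Illustrative: the weak-handle shape used by get_folder_updates, in isolation.
fn load_updates(
  db: &Arc<dyn LocalServerDB>,
  uid: i64,
  oid: &str,
) -> FutureResult<Vec<Vec<u8>>, FlowyError> {
  let weak_db = Arc::downgrade(db);
  let oid = oid.to_string();
  FutureResult::new(async move {
    match weak_db.upgrade() {
      // The server (and its database) was dropped; return nothing rather than an error.
      None => Ok(vec![]),
      Some(db) => db.get_collab_updates(uid, &oid),
    }
  })
}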

View File

@ -1,3 +1,5 @@
use std::sync::Arc;
use lazy_static::lazy_static;
use parking_lot::Mutex;
@ -10,12 +12,15 @@ use lib_infra::box_any::BoxAny;
use lib_infra::future::FutureResult;
use crate::local_server::uid::UserIDGenerator;
use crate::local_server::LocalServerDB;
lazy_static! {
static ref ID_GEN: Mutex<UserIDGenerator> = Mutex::new(UserIDGenerator::new(1));
}
pub(crate) struct LocalServerUserAuthServiceImpl();
pub(crate) struct LocalServerUserAuthServiceImpl {
pub db: Arc<dyn LocalServerDB>,
}
impl UserAuthService for LocalServerUserAuthServiceImpl {
fn sign_up(&self, params: BoxAny) -> FutureResult<SignUpResponse, FlowyError> {
@ -35,10 +40,21 @@ impl UserAuthService for LocalServerUserAuthServiceImpl {
}
fn sign_in(&self, params: BoxAny) -> FutureResult<SignInResponse, FlowyError> {
let weak_db = Arc::downgrade(&self.db);
FutureResult::new(async move {
let uid = ID_GEN.lock().next_id();
let params = params.unbox_or_error::<SignInParams>()?;
let workspace_id = uuid::Uuid::new_v4().to_string();
let params: SignInParams = params.unbox_or_error::<SignInParams>()?;
let uid = match params.uid {
None => ID_GEN.lock().next_id(),
Some(uid) => uid,
};
// Get the workspace id from the database if it exists, otherwise generate a new one.
let workspace_id = weak_db
.upgrade()
.and_then(|db| db.get_user_profile(uid).ok())
.and_then(|user_profile| user_profile.map(|user_profile| user_profile.workspace_id))
.unwrap_or(uuid::Uuid::new_v4().to_string());
Ok(SignInResponse {
user_id: uid,
name: params.name,

View File

@ -1,13 +1,17 @@
use std::sync::Arc;
use appflowy_integrate::RemoteCollabStorage;
use collab_document::YrsDocAction;
use parking_lot::RwLock;
use tokio::sync::mpsc;
use flowy_database2::deps::DatabaseCloudService;
use flowy_document2::deps::DocumentCloudService;
use flowy_error::FlowyError;
use flowy_folder2::deps::FolderCloudService;
use flowy_user::entities::UserProfile;
use flowy_user::event_map::UserAuthService;
use flowy_user::services::database::{get_user_profile, open_collab_db, open_user_db};
use crate::local_server::impls::{
LocalServerDatabaseCloudServiceImpl, LocalServerDocumentCloudServiceImpl,
@ -15,15 +19,22 @@ use crate::local_server::impls::{
};
use crate::AppFlowyServer;
#[derive(Default)]
pub trait LocalServerDB: Send + Sync + 'static {
fn get_user_profile(&self, uid: i64) -> Result<Option<UserProfile>, FlowyError>;
fn get_collab_updates(&self, uid: i64, object_id: &str) -> Result<Vec<Vec<u8>>, FlowyError>;
}
pub struct LocalServer {
storage_path: String,
stop_tx: RwLock<Option<mpsc::Sender<()>>>,
}
impl LocalServer {
pub fn new() -> Self {
// let _config = self_host_server_configuration().unwrap();
Self::default()
pub fn new(storage_path: &str) -> Self {
Self {
storage_path: storage_path.to_string(),
stop_tx: Default::default(),
}
}
pub async fn stop(&self) {
@ -36,11 +47,17 @@ impl LocalServer {
impl AppFlowyServer for LocalServer {
fn user_service(&self) -> Arc<dyn UserAuthService> {
Arc::new(LocalServerUserAuthServiceImpl())
let db = LocalServerDBImpl {
storage_path: self.storage_path.clone(),
};
Arc::new(LocalServerUserAuthServiceImpl { db: Arc::new(db) })
}
fn folder_service(&self) -> Arc<dyn FolderCloudService> {
Arc::new(LocalServerFolderCloudServiceImpl())
let db = LocalServerDBImpl {
storage_path: self.storage_path.clone(),
};
Arc::new(LocalServerFolderCloudServiceImpl { db: Arc::new(db) })
}
fn database_service(&self) -> Arc<dyn DatabaseCloudService> {
@ -55,3 +72,25 @@ impl AppFlowyServer for LocalServer {
None
}
}
struct LocalServerDBImpl {
storage_path: String,
}
impl LocalServerDB for LocalServerDBImpl {
fn get_user_profile(&self, uid: i64) -> Result<Option<UserProfile>, FlowyError> {
let sqlite_db = open_user_db(&self.storage_path, uid)?;
let user_profile = get_user_profile(&sqlite_db, uid).ok();
Ok(user_profile)
}
fn get_collab_updates(&self, uid: i64, object_id: &str) -> Result<Vec<Vec<u8>>, FlowyError> {
let collab_db = open_collab_db(&self.storage_path, uid)?;
let read_txn = collab_db.read_txn();
let updates = read_txn
.get_all_updates(uid, object_id)
.map_err(|e| FlowyError::internal().context(format!("Failed to open collab db: {:?}", e)))?;
Ok(updates)
}
}
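
With this change, LocalServer needs a storage path at construction time so that LocalServerDBImpl can open the user and collab databases on demand. A hypothetical wiring sketch; data_dir is a placeholder for the app's data directory, the same one that open_user_db and open_collab_db read from, and the crate-internal paths are assumptions based on the imports in this diff:

use std::sync::Arc;

use flowy_folder2::deps::FolderCloudService;
use flowy_user::event_map::UserAuthService;

// Assumed paths: the sketch is written as if it lived inside this crate, so the
// AppFlowyServer trait (which provides user_service/folder_service) is in scope.
use crate::local_server::LocalServer;
use crate::AppFlowyServer;

// Hypothetical helper showing how the local services are obtained from a LocalServer.
fn build_local_services(data_dir: &str) -> (Arc<dyn UserAuthService>, Arc<dyn FolderCloudService>) {
  let server = LocalServer::new(data_dir);
  (server.user_service(), server.folder_service())
}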

View File

@ -1,17 +1,26 @@
use flowy_database2::deps::{DatabaseCloudService, DatabaseSnapshot};
use flowy_database2::deps::{
CollabObjectUpdate, CollabObjectUpdateByOid, DatabaseCloudService, DatabaseSnapshot,
};
use flowy_error::FlowyError;
use lib_infra::future::FutureResult;
pub(crate) struct SelfHostedDatabaseCloudServiceImpl();
impl DatabaseCloudService for SelfHostedDatabaseCloudServiceImpl {
fn get_database_updates(&self, _database_id: &str) -> FutureResult<Vec<Vec<u8>>, FlowyError> {
fn get_collab_update(&self, _object_id: &str) -> FutureResult<CollabObjectUpdate, FlowyError> {
FutureResult::new(async move { Ok(vec![]) })
}
fn get_database_latest_snapshot(
fn batch_get_collab_updates(
&self,
_database_id: &str,
_object_ids: Vec<String>,
) -> FutureResult<CollabObjectUpdateByOid, FlowyError> {
FutureResult::new(async move { Ok(CollabObjectUpdateByOid::default()) })
}
fn get_collab_latest_snapshot(
&self,
_object_id: &str,
) -> FutureResult<Option<DatabaseSnapshot>, FlowyError> {
FutureResult::new(async move { Ok(None) })
}

View File

@ -1,4 +1,4 @@
use flowy_document2::deps::{DocumentCloudService, DocumentSnapshot};
use flowy_document2::deps::{DocumentCloudService, DocumentData, DocumentSnapshot};
use flowy_error::FlowyError;
use lib_infra::future::FutureResult;
@ -15,4 +15,11 @@ impl DocumentCloudService for SelfHostedDocumentCloudServiceImpl {
) -> FutureResult<Option<DocumentSnapshot>, FlowyError> {
FutureResult::new(async move { Ok(None) })
}
fn get_document_data(
&self,
_document_id: &str,
) -> FutureResult<Option<DocumentData>, FlowyError> {
FutureResult::new(async move { Ok(None) })
}
}

View File

@ -1,5 +1,5 @@
use flowy_error::FlowyError;
use flowy_folder2::deps::{FolderCloudService, FolderSnapshot, Workspace};
use flowy_folder2::deps::{FolderCloudService, FolderData, FolderSnapshot, Workspace};
use flowy_folder2::gen_workspace_id;
use lib_infra::future::FutureResult;
use lib_infra::util::timestamp;
@ -19,6 +19,10 @@ impl FolderCloudService for SelfHostedServerFolderCloudServiceImpl {
})
}
fn get_folder_data(&self, _workspace_id: &str) -> FutureResult<Option<FolderData>, FlowyError> {
FutureResult::new(async move { Ok(None) })
}
fn get_folder_latest_snapshot(
&self,
_workspace_id: &str,
@ -26,7 +30,15 @@ impl FolderCloudService for SelfHostedServerFolderCloudServiceImpl {
FutureResult::new(async move { Ok(None) })
}
fn get_folder_updates(&self, _workspace_id: &str) -> FutureResult<Vec<Vec<u8>>, FlowyError> {
fn get_folder_updates(
&self,
_workspace_id: &str,
_uid: i64,
) -> FutureResult<Vec<Vec<u8>>, FlowyError> {
FutureResult::new(async move { Ok(vec![]) })
}
fn service_name(&self) -> String {
"SelfHosted".to_string()
}
}

View File

@ -1,90 +0,0 @@
use serde::Deserialize;
use flowy_error::{ErrorCode, FlowyError};
pub const SUPABASE_URL: &str = "SUPABASE_URL";
pub const SUPABASE_ANON_KEY: &str = "SUPABASE_ANON_KEY";
pub const SUPABASE_KEY: &str = "SUPABASE_KEY";
pub const SUPABASE_JWT_SECRET: &str = "SUPABASE_JWT_SECRET";
pub const SUPABASE_DB: &str = "SUPABASE_DB";
pub const SUPABASE_DB_USER: &str = "SUPABASE_DB_USER";
pub const SUPABASE_DB_PASSWORD: &str = "SUPABASE_DB_PASSWORD";
pub const SUPABASE_DB_PORT: &str = "SUPABASE_DB_PORT";
#[derive(Debug, Deserialize)]
pub struct SupabaseConfiguration {
/// The url of the supabase server.
pub url: String,
/// The key of the supabase server.
pub key: String,
/// The secret used to sign the JWT tokens.
pub jwt_secret: String,
pub postgres_config: PostgresConfiguration,
}
impl SupabaseConfiguration {
/// Load the configuration from the environment variables.
/// SUPABASE_URL=https://<your-supabase-url>.supabase.co
/// SUPABASE_KEY=<your-supabase-key>
/// SUPABASE_JWT_SECRET=<your-supabase-jwt-secret>
///
pub fn from_env() -> Result<Self, FlowyError> {
let postgres_config = PostgresConfiguration::from_env()?;
Ok(Self {
url: std::env::var(SUPABASE_URL)
.map_err(|_| FlowyError::new(ErrorCode::InvalidAuthConfig, "Missing SUPABASE_URL"))?,
key: std::env::var(SUPABASE_KEY)
.map_err(|_| FlowyError::new(ErrorCode::InvalidAuthConfig, "Missing SUPABASE_KEY"))?,
jwt_secret: std::env::var(SUPABASE_JWT_SECRET).map_err(|_| {
FlowyError::new(ErrorCode::InvalidAuthConfig, "Missing SUPABASE_JWT_SECRET")
})?,
postgres_config,
})
}
pub fn write_env(&self) {
std::env::set_var(SUPABASE_URL, &self.url);
std::env::set_var(SUPABASE_KEY, &self.key);
std::env::set_var(SUPABASE_JWT_SECRET, &self.jwt_secret);
self.postgres_config.write_env();
}
}
#[derive(Debug, Clone, Deserialize)]
pub struct PostgresConfiguration {
pub url: String,
pub user_name: String,
pub password: String,
pub port: u16,
}
impl PostgresConfiguration {
pub fn from_env() -> Result<Self, FlowyError> {
let url = std::env::var(SUPABASE_DB)
.map_err(|_| FlowyError::new(ErrorCode::InvalidAuthConfig, "Missing SUPABASE_DB"))?;
let user_name = std::env::var(SUPABASE_DB_USER)
.map_err(|_| FlowyError::new(ErrorCode::InvalidAuthConfig, "Missing SUPABASE_DB_USER"))?;
let password = std::env::var(SUPABASE_DB_PASSWORD)
.map_err(|_| FlowyError::new(ErrorCode::InvalidAuthConfig, "Missing SUPABASE_DB_PASSWORD"))?;
let port = std::env::var(SUPABASE_DB_PORT)
.map_err(|_| FlowyError::new(ErrorCode::InvalidAuthConfig, "Missing SUPABASE_DB_PORT"))?
.parse::<u16>()
.map_err(|_e| FlowyError::new(ErrorCode::InvalidAuthConfig, "Missing SUPABASE_DB_PORT"))?;
Ok(Self {
url,
user_name,
password,
port,
})
}
pub fn write_env(&self) {
std::env::set_var(SUPABASE_DB, &self.url);
std::env::set_var(SUPABASE_DB_USER, &self.user_name);
std::env::set_var(SUPABASE_DB_PASSWORD, &self.password);
std::env::set_var(SUPABASE_DB_PORT, self.port.to_string());
}
}

View File

@ -1,27 +1,36 @@
use std::sync::{Arc, Weak};
use std::future::Future;
use std::iter::Take;
use std::pin::Pin;
use std::sync::Weak;
use std::time::Duration;
use anyhow::Error;
use appflowy_integrate::{
merge_updates_v1, CollabObject, Decode, MsgId, RemoteCollabSnapshot, RemoteCollabState,
RemoteCollabStorage, YrsUpdate,
merge_updates_v1, CollabObject, MsgId, RemoteCollabSnapshot, RemoteCollabState,
RemoteCollabStorage, RemoteUpdateReceiver,
};
use chrono::{DateTime, Utc};
use deadpool_postgres::GenericClient;
use futures_util::TryStreamExt;
use futures::pin_mut;
use futures_util::{StreamExt, TryStreamExt};
use tokio::task::spawn_blocking;
use tokio_postgres::types::ToSql;
use tokio_postgres::Row;
use tokio_retry::strategy::FixedInterval;
use tokio_retry::{Action, Retry};
use flowy_error::FlowyError;
use flowy_database2::deps::{CollabObjectUpdate, CollabObjectUpdateByOid};
use lib_infra::async_trait::async_trait;
use lib_infra::util::md5;
use crate::supabase::postgres_db::PostgresObject;
use crate::supabase::sql_builder::{
DeleteSqlBuilder, InsertSqlBuilder, SelectSqlBuilder, WhereCondition,
};
use crate::supabase::PostgresServer;
use crate::supabase::{PostgresServer, SupabaseServerService};
pub struct PgCollabStorageImpl {
server: Arc<PostgresServer>,
pub struct PgCollabStorageImpl<T> {
server: T,
}
const AF_COLLAB_KEY_COLUMN: &str = "key";
@ -32,27 +41,64 @@ const AF_COLLAB_SNAPSHOT_BLOB_SIZE_COLUMN: &str = "blob_size";
const AF_COLLAB_SNAPSHOT_CREATED_AT_COLUMN: &str = "created_at";
const AF_COLLAB_SNAPSHOT_TABLE: &str = "af_collab_snapshot";
impl PgCollabStorageImpl {
pub fn new(server: Arc<PostgresServer>) -> Self {
impl<T> PgCollabStorageImpl<T>
where
T: SupabaseServerService,
{
pub fn new(server: T) -> Self {
Self { server }
}
pub async fn get_client(&self) -> Option<PostgresObject> {
self
.server
.get_pg_server()?
.upgrade()?
.get_pg_client()
.await
.recv()
.await
.ok()
}
}
#[async_trait]
impl RemoteCollabStorage for PgCollabStorageImpl {
impl<T> RemoteCollabStorage for PgCollabStorageImpl<T>
where
T: SupabaseServerService,
{
fn is_enable(&self) -> bool {
self
.server
.get_pg_server()
.and_then(|server| server.upgrade())
.is_some()
}
async fn get_all_updates(&self, object_id: &str) -> Result<Vec<Vec<u8>>, Error> {
get_updates_from_server(object_id, Arc::downgrade(&self.server)).await
let pg_server = self.server.try_get_pg_server()?;
let action = FetchObjectUpdateAction::new(object_id, pg_server);
let updates = action.run().await?;
Ok(updates)
}
async fn get_latest_snapshot(
&self,
object_id: &str,
) -> Result<Option<RemoteCollabSnapshot>, Error> {
get_latest_snapshot_from_server(object_id, Arc::downgrade(&self.server)).await
match self.server.get_pg_server() {
None => Ok(None),
Some(weak_server) => get_latest_snapshot_from_server(object_id, weak_server).await,
}
}
async fn get_collab_state(&self, object_id: &str) -> Result<Option<RemoteCollabState>, Error> {
let client = self.server.get_pg_client().await.recv().await?;
let client = self.get_client().await;
if client.is_none() {
return Ok(None);
}
let client = client.unwrap();
let (sql, params) = SelectSqlBuilder::new("af_collab_state")
.column("*")
.where_clause("oid", object_id.to_string())
@ -83,7 +129,10 @@ impl RemoteCollabStorage for PgCollabStorageImpl {
}
async fn create_snapshot(&self, object: &CollabObject, snapshot: Vec<u8>) -> Result<i64, Error> {
let client = self.server.get_pg_client().await.recv().await?;
let client = self
.get_client()
.await
.ok_or_else(|| anyhow::anyhow!("Create snapshot failed. No client available"))?;
let value_size = snapshot.len() as i32;
let (sql, params) = InsertSqlBuilder::new("af_collab_snapshot")
.value(AF_COLLAB_SNAPSHOT_OID_COLUMN, object.id.clone())
@ -112,17 +161,21 @@ impl RemoteCollabStorage for PgCollabStorageImpl {
_id: MsgId,
update: Vec<u8>,
) -> Result<(), Error> {
let client = self.server.get_pg_client().await.recv().await?;
let value_size = update.len() as i32;
let (sql, params) = InsertSqlBuilder::new("af_collab")
.value("oid", object.id.clone())
.value("name", object.name.clone())
.value("value", update)
.value("value_size", value_size)
.build();
if let Some(client) = self.get_client().await {
let value_size = update.len() as i32;
let md5 = md5(&update);
let (sql, params) = InsertSqlBuilder::new("af_collab")
.value("oid", object.id.clone())
.value("name", object.name.clone())
.value("value", update)
.value("uid", object.uid)
.value("md5", md5)
.value("value_size", value_size)
.build();
let stmt = client.prepare_cached(&sql).await?;
client.execute_raw(&stmt, params).await?;
let stmt = client.prepare_cached(&sql).await?;
client.execute_raw(&stmt, params).await?;
}
Ok(())
}
@ -132,36 +185,46 @@ impl RemoteCollabStorage for PgCollabStorageImpl {
_id: MsgId,
init_update: Vec<u8>,
) -> Result<(), Error> {
let mut client = self.server.get_pg_client().await.recv().await?;
let client = self.get_client().await;
if client.is_none() {
return Ok(());
}
let mut client = client.unwrap();
let txn = client.transaction().await?;
// 1.Get all updates
// 1. Get all updates and lock the selected rows. A subsequent UPDATE, DELETE, or SELECT
// FOR UPDATE by this transaction will not result in a lock wait; other transactions that try
// to update or lock these specific rows will be blocked until the current transaction ends.
let (sql, params) = SelectSqlBuilder::new("af_collab")
.column(AF_COLLAB_KEY_COLUMN)
.column("value")
.order_by(AF_COLLAB_KEY_COLUMN, true)
.where_clause("oid", object.id.clone())
.lock()
.build();
let get_all_update_stmt = txn.prepare_cached(&sql).await?;
let row_stream = txn.query_raw(&get_all_update_stmt, params).await?;
let remote_updates = row_stream.try_collect::<Vec<_>>().await?;
let pg_rows = row_stream.try_collect::<Vec<_>>().await?;
let insert_builder = InsertSqlBuilder::new("af_collab")
.value("oid", object.id.clone())
.value("uid", object.uid)
.value("name", object.name.clone());
let (sql, params) = if !remote_updates.is_empty() {
let remoted_keys = remote_updates
.iter()
let (sql, params) = if !pg_rows.is_empty() {
let last_row_key = pg_rows
.last()
.map(|row| row.get::<_, i64>(AF_COLLAB_KEY_COLUMN))
.collect::<Vec<_>>();
let last_row_key = remoted_keys.last().cloned().unwrap();
.unwrap();
// 2.Merge all updates
let merged_update =
spawn_blocking(move || merge_update_from_rows(remote_updates, init_update)).await??;
// 2. Merge the updates into one and then delete the merged updates
let merge_result =
spawn_blocking(move || merge_update_from_rows(pg_rows, init_update)).await??;
tracing::trace!("Merged updates count: {}", merge_result.merged_keys.len());
// 3. Delete all updates
// 3. Delete merged updates
let (sql, params) = DeleteSqlBuilder::new("af_collab")
.where_condition(WhereCondition::Equals(
"oid".to_string(),
@ -169,7 +232,8 @@ impl RemoteCollabStorage for PgCollabStorageImpl {
))
.where_condition(WhereCondition::In(
AF_COLLAB_KEY_COLUMN.to_string(),
remoted_keys
merge_result
.merged_keys
.into_iter()
.map(|key| Box::new(key) as Box<dyn ToSql + Send + Sync>)
.collect::<Vec<_>>(),
@ -178,18 +242,25 @@ impl RemoteCollabStorage for PgCollabStorageImpl {
let delete_stmt = txn.prepare_cached(&sql).await?;
txn.execute_raw(&delete_stmt, params).await?;
let value_size = merged_update.len() as i32;
// Override the key with the last row key in case of concurrent init sync
// 4. Insert the merged update. The new_update contains the merged update and the
// init_update.
let new_update = merge_result.new_update;
let value_size = new_update.len() as i32;
let md5 = md5(&new_update);
insert_builder
.value("value", merged_update)
.value("value", new_update)
.value("value_size", value_size)
.value("md5", md5)
.value(AF_COLLAB_KEY_COLUMN, last_row_key)
.overriding_system_value()
.build()
} else {
let value_size = init_update.len() as i32;
let md5 = md5(&init_update);
insert_builder
.value("value", init_update)
.value("md5", md5)
.value("value_size", value_size)
.build()
};
@ -203,6 +274,11 @@ impl RemoteCollabStorage for PgCollabStorageImpl {
tracing::trace!("{} init sync done", object.id);
Ok(())
}
async fn subscribe_remote_updates(&self, _object: &CollabObject) -> Option<RemoteUpdateReceiver> {
// using pg_notify to subscribe to updates
None
}
}
pub async fn get_updates_from_server(
@ -225,7 +301,7 @@ pub async fn get_updates_from_server(
.try_collect::<Vec<_>>()
.await?
.into_iter()
.flat_map(|row| update_from_row(row).ok())
.flat_map(|row| update_from_row(&row).ok())
.collect(),
)
},
@ -275,30 +351,156 @@ pub async fn get_latest_snapshot_from_server(
}
}
fn update_from_row(row: Row) -> Result<Vec<u8>, FlowyError> {
row
.try_get::<_, Vec<u8>>("value")
.map_err(|e| FlowyError::internal().context(format!("Failed to get value from row: {}", e)))
fn update_from_row(row: &Row) -> Result<Vec<u8>, anyhow::Error> {
let update = row.try_get::<_, Vec<u8>>("value")?;
Ok(update)
}
#[allow(dead_code)]
fn decode_update_from_row(row: Row) -> Result<YrsUpdate, FlowyError> {
let update = update_from_row(row)?;
YrsUpdate::decode_v1(&update).map_err(|_| FlowyError::internal().context("Invalid yrs update"))
}
fn merge_update_from_rows(rows: Vec<Row>, new_update: Vec<u8>) -> Result<Vec<u8>, FlowyError> {
fn merge_update_from_rows(
rows: Vec<Row>,
new_update: Vec<u8>,
) -> Result<MergeResult, anyhow::Error> {
let mut updates = vec![];
let mut merged_keys = vec![];
for row in rows {
let update = update_from_row(row)?;
merged_keys.push(row.try_get::<_, i64>(AF_COLLAB_KEY_COLUMN)?);
let update = update_from_row(&row)?;
updates.push(update);
}
updates.push(new_update);
let updates = updates
.iter()
.map(|update| update.as_ref())
.collect::<Vec<&[u8]>>();
merge_updates_v1(&updates).map_err(|_| FlowyError::internal().context("Failed to merge updates"))
let new_update = merge_updates_v1(&updates)?;
Ok(MergeResult {
merged_keys,
new_update,
})
}
struct MergeResult {
merged_keys: Vec<i64>,
new_update: Vec<u8>,
}
pub struct FetchObjectUpdateAction {
object_id: String,
pg_server: Weak<PostgresServer>,
}
impl FetchObjectUpdateAction {
pub fn new(object_id: &str, pg_server: Weak<PostgresServer>) -> Self {
Self {
pg_server,
object_id: object_id.to_string(),
}
}
pub fn run(self) -> Retry<Take<FixedInterval>, FetchObjectUpdateAction> {
let retry_strategy = FixedInterval::new(Duration::from_secs(5)).take(3);
Retry::spawn(retry_strategy, self)
}
pub fn run_with_fix_interval(
self,
secs: u64,
times: usize,
) -> Retry<Take<FixedInterval>, FetchObjectUpdateAction> {
let retry_strategy = FixedInterval::new(Duration::from_secs(secs)).take(times);
Retry::spawn(retry_strategy, self)
}
}
impl Action for FetchObjectUpdateAction {
type Future = Pin<Box<dyn Future<Output = Result<Self::Item, Self::Error>> + Send>>;
type Item = CollabObjectUpdate;
type Error = anyhow::Error;
fn run(&mut self) -> Self::Future {
let weak_pb_server = self.pg_server.clone();
let object_id = self.object_id.clone();
Box::pin(async move {
match weak_pb_server.upgrade() {
None => Ok(vec![]),
Some(server) => {
let client = server.get_pg_client().await.recv().await?;
let (sql, params) = SelectSqlBuilder::new("af_collab")
.column("value")
.order_by(AF_COLLAB_KEY_COLUMN, true)
.where_clause("oid", object_id)
.build();
let stmt = client.prepare_cached(&sql).await?;
let row_stream = client.query_raw(&stmt, params).await?;
Ok(
row_stream
.try_collect::<Vec<_>>()
.await?
.into_iter()
.flat_map(|row| update_from_row(&row).ok())
.collect(),
)
},
}
})
}
}
pub struct BatchFetchObjectUpdateAction {
object_ids: Vec<String>,
pg_server: Weak<PostgresServer>,
}
impl BatchFetchObjectUpdateAction {
pub fn new(object_ids: Vec<String>, pg_server: Weak<PostgresServer>) -> Self {
Self {
pg_server,
object_ids,
}
}
pub fn run(self) -> Retry<Take<FixedInterval>, BatchFetchObjectUpdateAction> {
let retry_strategy = FixedInterval::new(Duration::from_secs(5)).take(3);
Retry::spawn(retry_strategy, self)
}
}
impl Action for BatchFetchObjectUpdateAction {
type Future = Pin<Box<dyn Future<Output = Result<Self::Item, Self::Error>> + Send>>;
type Item = CollabObjectUpdateByOid;
type Error = anyhow::Error;
fn run(&mut self) -> Self::Future {
let weak_pb_server = self.pg_server.clone();
let object_ids = self.object_ids.clone();
Box::pin(async move {
match weak_pb_server.upgrade() {
None => Ok(CollabObjectUpdateByOid::default()),
Some(server) => {
let client = server.get_pg_client().await.recv().await?;
let mut updates_by_oid = CollabObjectUpdateByOid::new();
// Group the updates by oid
let (sql, params) = SelectSqlBuilder::new("af_collab")
.column("oid")
.array_agg("value")
.group_by("oid")
.where_clause_in("oid", object_ids)
.build();
let stmt = client.prepare_cached(&sql).await?;
// Poll the rows
let rows = Box::pin(client.query_raw(&stmt, params).await?);
pin_mut!(rows);
while let Some(Ok(row)) = rows.next().await {
let oid = row.try_get::<_, String>("oid")?;
let updates = row.try_get::<_, Vec<Vec<u8>>>("value")?;
updates_by_oid.insert(oid, updates);
}
Ok(updates_by_oid)
},
}
})
}
}
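
FetchObjectUpdateAction and BatchFetchObjectUpdateAction are plain tokio_retry Actions, so callers drive them the same way get_all_updates does above: build the action from a Weak<PostgresServer> and await the returned Retry future. A short sketch under those assumptions; the helper names are illustrative and the sketch is written as if it sat next to the actions in this module:

use std::sync::Weak;

use anyhow::Error;
use flowy_database2::deps::{CollabObjectUpdate, CollabObjectUpdateByOid};

use crate::supabase::PostgresServer;

// Illustrative: fetch the updates for one object, retrying 3 times at 5-second intervals
// (the defaults baked into FetchObjectUpdateAction::run).
async fn fetch_one(weak_server: Weak<PostgresServer>, oid: &str) -> Result<CollabObjectUpdate, Error> {
  FetchObjectUpdateAction::new(oid, weak_server).run().await
}

// Illustrative: fetch updates for several objects in one query, grouped by oid.
async fn fetch_many(
  weak_server: Weak<PostgresServer>,
  oids: Vec<String>,
) -> Result<CollabObjectUpdateByOid, Error> {
  BatchFetchObjectUpdateAction::new(oids, weak_server).run().await
}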

View File

@ -1,48 +1,100 @@
use std::sync::Arc;
use tokio::sync::oneshot::channel;
use flowy_database2::deps::{DatabaseCloudService, DatabaseSnapshot};
use flowy_database2::deps::{
CollabObjectUpdate, CollabObjectUpdateByOid, DatabaseCloudService, DatabaseSnapshot,
};
use flowy_error::{internal_error, FlowyError};
use lib_infra::future::FutureResult;
use crate::supabase::impls::{get_latest_snapshot_from_server, get_updates_from_server};
use crate::supabase::PostgresServer;
use crate::supabase::impls::{
get_latest_snapshot_from_server, BatchFetchObjectUpdateAction, FetchObjectUpdateAction,
};
use crate::supabase::SupabaseServerService;
pub(crate) struct SupabaseDatabaseCloudServiceImpl {
server: Arc<PostgresServer>,
pub struct SupabaseDatabaseCloudServiceImpl<T> {
server: T,
}
impl SupabaseDatabaseCloudServiceImpl {
pub fn new(server: Arc<PostgresServer>) -> Self {
impl<T> SupabaseDatabaseCloudServiceImpl<T> {
pub fn new(server: T) -> Self {
Self { server }
}
}
impl DatabaseCloudService for SupabaseDatabaseCloudServiceImpl {
fn get_database_updates(&self, database_id: &str) -> FutureResult<Vec<Vec<u8>>, FlowyError> {
let server = Arc::downgrade(&self.server);
impl<T> DatabaseCloudService for SupabaseDatabaseCloudServiceImpl<T>
where
T: SupabaseServerService,
{
fn get_collab_update(&self, object_id: &str) -> FutureResult<CollabObjectUpdate, FlowyError> {
let weak_server = self.server.get_pg_server();
let (tx, rx) = channel();
let database_id = database_id.to_string();
tokio::spawn(async move { tx.send(get_updates_from_server(&database_id, server).await) });
let database_id = object_id.to_string();
tokio::spawn(async move {
tx.send(
async move {
match weak_server {
None => Ok(CollabObjectUpdate::default()),
Some(weak_server) => {
FetchObjectUpdateAction::new(&database_id, weak_server)
.run()
.await
},
}
}
.await,
)
});
FutureResult::new(async { rx.await.map_err(internal_error)?.map_err(internal_error) })
}
fn get_database_latest_snapshot(
fn batch_get_collab_updates(
&self,
database_id: &str,
) -> FutureResult<Option<DatabaseSnapshot>, FlowyError> {
let server = Arc::downgrade(&self.server);
object_ids: Vec<String>,
) -> FutureResult<CollabObjectUpdateByOid, FlowyError> {
let weak_server = self.server.get_pg_server();
let (tx, rx) = channel();
let database_id = database_id.to_string();
tokio::spawn(
async move { tx.send(get_latest_snapshot_from_server(&database_id, server).await) },
);
tokio::spawn(async move {
tx.send(
async move {
match weak_server {
None => Ok(CollabObjectUpdateByOid::default()),
Some(weak_server) => {
BatchFetchObjectUpdateAction::new(object_ids, weak_server)
.run()
.await
},
}
}
.await,
)
});
FutureResult::new(async { rx.await.map_err(internal_error)?.map_err(internal_error) })
}
fn get_collab_latest_snapshot(
&self,
object_id: &str,
) -> FutureResult<Option<DatabaseSnapshot>, FlowyError> {
let weak_server = self.server.get_pg_server();
let (tx, rx) = channel();
let database_id = object_id.to_string();
tokio::spawn(async move {
tx.send(
async move {
match weak_server {
None => Ok(None),
Some(weak_server) => get_latest_snapshot_from_server(&database_id, weak_server)
.await
.map_err(internal_error),
}
}
.await,
)
});
FutureResult::new(async {
Ok(
rx.await
.map_err(internal_error)?
.map_err(internal_error)?
.map_err(internal_error)??
.map(|snapshot| DatabaseSnapshot {
snapshot_id: snapshot.snapshot_id,
database_id: snapshot.oid,

View File

@ -1,50 +1,75 @@
use std::sync::Arc;
use collab_document::document::Document;
use collab_folder::core::CollabOrigin;
use tokio::sync::oneshot::channel;
use flowy_document2::deps::{DocumentCloudService, DocumentSnapshot};
use flowy_document2::deps::{DocumentCloudService, DocumentData, DocumentSnapshot};
use flowy_error::{internal_error, FlowyError};
use lib_infra::future::FutureResult;
use crate::supabase::impls::{get_latest_snapshot_from_server, get_updates_from_server};
use crate::supabase::PostgresServer;
use crate::supabase::impls::{get_latest_snapshot_from_server, FetchObjectUpdateAction};
use crate::supabase::SupabaseServerService;
pub(crate) struct SupabaseDocumentCloudServiceImpl {
server: Arc<PostgresServer>,
pub struct SupabaseDocumentCloudServiceImpl<T> {
server: T,
}
impl SupabaseDocumentCloudServiceImpl {
pub fn new(server: Arc<PostgresServer>) -> Self {
impl<T> SupabaseDocumentCloudServiceImpl<T> {
pub fn new(server: T) -> Self {
Self { server }
}
}
impl DocumentCloudService for SupabaseDocumentCloudServiceImpl {
impl<T> DocumentCloudService for SupabaseDocumentCloudServiceImpl<T>
where
T: SupabaseServerService,
{
fn get_document_updates(&self, document_id: &str) -> FutureResult<Vec<Vec<u8>>, FlowyError> {
let server = Arc::downgrade(&self.server);
let weak_server = self.server.get_pg_server();
let (tx, rx) = channel();
let document_id = document_id.to_string();
tokio::spawn(async move { tx.send(get_updates_from_server(&document_id, server).await) });
FutureResult::new(async { rx.await.map_err(internal_error)?.map_err(internal_error) })
tokio::spawn(async move {
tx.send(
async move {
match weak_server {
None => Ok(vec![]),
Some(weak_server) => FetchObjectUpdateAction::new(&document_id, weak_server)
.run_with_fix_interval(5, 5)
.await
.map_err(internal_error),
}
}
.await,
)
});
FutureResult::new(async { rx.await.map_err(internal_error)? })
}
fn get_document_latest_snapshot(
&self,
document_id: &str,
) -> FutureResult<Option<DocumentSnapshot>, FlowyError> {
let server = Arc::downgrade(&self.server);
let weak_server = self.server.get_pg_server();
let (tx, rx) = channel();
let document_id = document_id.to_string();
tokio::spawn(
async move { tx.send(get_latest_snapshot_from_server(&document_id, server).await) },
);
tokio::spawn(async move {
tx.send(
async move {
match weak_server {
None => Ok(None),
Some(weak_server) => get_latest_snapshot_from_server(&document_id, weak_server)
.await
.map_err(internal_error),
}
}
.await,
)
});
FutureResult::new(async {
{
Ok(
rx.await
.map_err(internal_error)?
.map_err(internal_error)?
.map_err(internal_error)??
.map(|snapshot| DocumentSnapshot {
snapshot_id: snapshot.snapshot_id,
document_id: snapshot.oid,
@ -55,4 +80,29 @@ impl DocumentCloudService for SupabaseDocumentCloudServiceImpl {
}
})
}
fn get_document_data(&self, document_id: &str) -> FutureResult<Option<DocumentData>, FlowyError> {
let weak_server = self.server.get_pg_server();
let (tx, rx) = channel();
let document_id = document_id.to_string();
tokio::spawn(async move {
tx.send(
async move {
match weak_server {
None => Ok(Ok(None)),
Some(weak_server) => {
let action = FetchObjectUpdateAction::new(&document_id, weak_server);
action.run().await.map(|updates| {
let document =
Document::from_updates(CollabOrigin::Empty, updates, &document_id, vec![])?;
Ok(document.get_document_data().ok())
})
},
}
}
.await,
)
});
FutureResult::new(async { rx.await.map_err(internal_error)?.map_err(internal_error)? })
}
}
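
The Supabase cloud-service methods above (database and document alike) all follow the same bridging shape: resolve the weak Postgres handle, run the fetch on a spawned Tokio task, send the result back through a oneshot channel, and wrap the receiver in a FutureResult. A stripped-down, hypothetical sketch of that shape; the generic helper is illustrative and assumes FutureResult::new accepts any Send future resolving to a Result, as the calls above suggest:

use flowy_error::{internal_error, FlowyError};
use lib_infra::future::FutureResult;
use tokio::sync::oneshot::channel;

// Hypothetical helper: `work` stands in for any of the async Postgres fetches above.
fn bridge<T, F>(work: F) -> FutureResult<T, FlowyError>
where
  T: Send + 'static,
  F: std::future::Future<Output = Result<T, FlowyError>> + Send + 'static,
{
  let (tx, rx) = channel();
  tokio::spawn(async move {
    let _ = tx.send(work.await);
  });
  // As in the code above, a RecvError (the task was dropped) becomes an internal error.
  FutureResult::new(async { rx.await.map_err(internal_error)? })
}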

View File

@ -1,44 +1,56 @@
use std::sync::Arc;
use chrono::{DateTime, Utc};
use collab_folder::core::{CollabOrigin, Folder};
use futures_util::{pin_mut, StreamExt};
use tokio::sync::oneshot::channel;
use uuid::Uuid;
use crate::supabase::impls::{get_latest_snapshot_from_server, get_updates_from_server};
use flowy_error::{internal_error, ErrorCode, FlowyError};
use flowy_folder2::deps::{FolderCloudService, FolderSnapshot, Workspace};
use flowy_folder2::deps::{FolderCloudService, FolderData, FolderSnapshot, Workspace};
use lib_infra::future::FutureResult;
use crate::supabase::pg_db::PostgresObject;
use crate::supabase::impls::{
get_latest_snapshot_from_server, get_updates_from_server, FetchObjectUpdateAction,
};
use crate::supabase::postgres_db::PostgresObject;
use crate::supabase::sql_builder::{InsertSqlBuilder, SelectSqlBuilder};
use crate::supabase::PostgresServer;
use crate::supabase::SupabaseServerService;
pub(crate) const WORKSPACE_TABLE: &str = "af_workspace";
pub(crate) const WORKSPACE_ID: &str = "workspace_id";
const WORKSPACE_NAME: &str = "workspace_name";
const CREATED_AT: &str = "created_at";
pub(crate) struct SupabaseFolderCloudServiceImpl {
server: Arc<PostgresServer>,
pub struct SupabaseFolderCloudServiceImpl<T> {
server: T,
}
impl SupabaseFolderCloudServiceImpl {
pub fn new(server: Arc<PostgresServer>) -> Self {
impl<T> SupabaseFolderCloudServiceImpl<T> {
pub fn new(server: T) -> Self {
Self { server }
}
}
impl FolderCloudService for SupabaseFolderCloudServiceImpl {
impl<T> FolderCloudService for SupabaseFolderCloudServiceImpl<T>
where
T: SupabaseServerService,
{
fn create_workspace(&self, uid: i64, name: &str) -> FutureResult<Workspace, FlowyError> {
let server = self.server.clone();
let weak_server = self.server.try_get_pg_server();
let (tx, rx) = channel();
let name = name.to_string();
tokio::spawn(async move {
tx.send(
async move {
let client = server.get_pg_client().await.recv().await?;
create_workspace(&client, uid, &name).await
match weak_server?.upgrade() {
None => Err(FlowyError::new(
ErrorCode::PgDatabaseError,
"Server is closed",
)),
Some(server) => {
let client = server.get_pg_client().await.recv().await?;
create_workspace(&client, uid, &name).await
},
}
}
.await,
)
@ -46,21 +58,58 @@ impl FolderCloudService for SupabaseFolderCloudServiceImpl {
FutureResult::new(async { rx.await.map_err(internal_error)? })
}
fn get_folder_data(&self, workspace_id: &str) -> FutureResult<Option<FolderData>, FlowyError> {
let weak_server = self.server.get_pg_server();
let (tx, rx) = channel();
let workspace_id = workspace_id.to_string();
tokio::spawn(async move {
tx.send(
async move {
match weak_server {
None => Ok(Ok(None)),
Some(weak_server) => get_updates_from_server(&workspace_id, weak_server)
.await
.map(|updates| {
let folder = Folder::from_collab_raw_data(
CollabOrigin::Empty,
updates,
&workspace_id,
vec![],
)?;
Ok(folder.get_folder_data())
}),
}
}
.await,
)
});
FutureResult::new(async { rx.await.map_err(internal_error)?.map_err(internal_error)? })
}
fn get_folder_latest_snapshot(
&self,
workspace_id: &str,
) -> FutureResult<Option<FolderSnapshot>, FlowyError> {
let server = Arc::downgrade(&self.server);
let weak_server = self.server.get_pg_server();
let workspace_id = workspace_id.to_string();
let (tx, rx) = channel();
tokio::spawn(
async move { tx.send(get_latest_snapshot_from_server(&workspace_id, server).await) },
);
tokio::spawn(async move {
tx.send(
async {
match weak_server {
None => Ok(None),
Some(weak_server) => get_latest_snapshot_from_server(&workspace_id, weak_server)
.await
.map_err(internal_error),
}
}
.await,
)
});
FutureResult::new(async {
Ok(
rx.await
.map_err(internal_error)?
.map_err(internal_error)?
.map_err(internal_error)??
.map(|snapshot| FolderSnapshot {
snapshot_id: snapshot.snapshot_id,
database_id: snapshot.oid,
@ -71,13 +120,34 @@ impl FolderCloudService for SupabaseFolderCloudServiceImpl {
})
}
fn get_folder_updates(&self, workspace_id: &str) -> FutureResult<Vec<Vec<u8>>, FlowyError> {
let server = Arc::downgrade(&self.server);
fn get_folder_updates(
&self,
workspace_id: &str,
_uid: i64,
) -> FutureResult<Vec<Vec<u8>>, FlowyError> {
let weak_server = self.server.get_pg_server();
let (tx, rx) = channel();
let workspace_id = workspace_id.to_string();
tokio::spawn(async move { tx.send(get_updates_from_server(&workspace_id, server).await) });
tokio::spawn(async move {
tx.send(
async move {
match weak_server {
None => Ok(vec![]),
Some(weak_server) => {
let action = FetchObjectUpdateAction::new(&workspace_id, weak_server);
action.run_with_fix_interval(5, 10).await
},
}
}
.await,
)
});
FutureResult::new(async { rx.await.map_err(internal_error)?.map_err(internal_error) })
}
fn service_name(&self) -> String {
"Supabase".to_string()
}
}
async fn create_workspace(
@ -147,15 +217,17 @@ mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use parking_lot::RwLock;
use uuid::Uuid;
use flowy_folder2::deps::FolderCloudService;
use flowy_server_config::supabase_config::PostgresConfiguration;
use flowy_user::event_map::UserAuthService;
use lib_infra::box_any::BoxAny;
use crate::supabase::impls::folder::SupabaseFolderCloudServiceImpl;
use crate::supabase::impls::SupabaseUserAuthServiceImpl;
use crate::supabase::{PostgresConfiguration, PostgresServer};
use crate::supabase::{PostgresServer, SupabaseServerServiceImpl};
#[tokio::test]
async fn create_user_workspace() {
@ -165,7 +237,8 @@ mod tests {
let server = Arc::new(PostgresServer::new(
PostgresConfiguration::from_env().unwrap(),
));
let user_service = SupabaseUserAuthServiceImpl::new(server.clone());
let weak_server = SupabaseServerServiceImpl(Arc::new(RwLock::new(Some(server.clone()))));
let user_service = SupabaseUserAuthServiceImpl::new(weak_server.clone());
// create user
let mut params = HashMap::new();
@ -173,7 +246,7 @@ mod tests {
let user = user_service.sign_up(BoxAny::new(params)).await.unwrap();
// create workspace
let folder_service = SupabaseFolderCloudServiceImpl::new(server);
let folder_service = SupabaseFolderCloudServiceImpl::new(weak_server);
let workspace = folder_service
.create_workspace(user.user_id, "my test workspace")
.await

View File

@ -1,7 +1,7 @@
pub use collab_storage::*;
pub(crate) use database::*;
pub(crate) use document::*;
pub(crate) use folder::*;
pub use database::*;
pub use document::*;
pub use folder::*;
pub use user::*;
mod collab_storage;

View File

@ -1,5 +1,4 @@
use std::str::FromStr;
use std::sync::Arc;
use deadpool_postgres::GenericClient;
use futures::pin_mut;
@ -11,39 +10,50 @@ use uuid::Uuid;
use flowy_error::{internal_error, ErrorCode, FlowyError};
use flowy_user::entities::{SignInResponse, SignUpResponse, UpdateUserProfileParams, UserProfile};
use flowy_user::event_map::{UserAuthService, UserCredentials};
use flowy_user::services::{uuid_from_box_any, AuthType};
use lib_infra::box_any::BoxAny;
use lib_infra::future::FutureResult;
use crate::supabase::entities::{GetUserProfileParams, UserProfileResponse};
use crate::supabase::pg_db::PostgresObject;
use crate::supabase::postgres_db::PostgresObject;
use crate::supabase::sql_builder::{SelectSqlBuilder, UpdateSqlBuilder};
use crate::supabase::PostgresServer;
use crate::util::uuid_from_box_any;
use crate::supabase::SupabaseServerService;
pub(crate) const USER_TABLE: &str = "af_user";
pub(crate) const USER_PROFILE_TABLE: &str = "af_user_profile";
pub const USER_UUID: &str = "uuid";
pub struct SupabaseUserAuthServiceImpl {
server: Arc<PostgresServer>,
pub struct SupabaseUserAuthServiceImpl<T> {
server: T,
}
impl SupabaseUserAuthServiceImpl {
pub fn new(server: Arc<PostgresServer>) -> Self {
impl<T> SupabaseUserAuthServiceImpl<T> {
pub fn new(server: T) -> Self {
Self { server }
}
}
impl UserAuthService for SupabaseUserAuthServiceImpl {
impl<T> UserAuthService for SupabaseUserAuthServiceImpl<T>
where
T: SupabaseServerService,
{
fn sign_up(&self, params: BoxAny) -> FutureResult<SignUpResponse, FlowyError> {
let server = self.server.clone();
let weak_server = self.server.try_get_pg_server();
let (tx, rx) = channel();
tokio::spawn(async move {
tx.send(
async {
let client = server.get_pg_client().await.recv().await?;
let uuid = uuid_from_box_any(params)?;
create_user_with_uuid(&client, uuid).await
async move {
match weak_server?.upgrade() {
Some(server) => {
let client = server.get_pg_client().await.recv().await?;
let params = uuid_from_box_any(params)?;
create_user_with_uuid(&client, params.uuid, params.email).await
},
None => Err(FlowyError::new(
ErrorCode::PgDatabaseError,
"Server is closed",
)),
}
}
.await,
)
@ -52,19 +62,28 @@ impl UserAuthService for SupabaseUserAuthServiceImpl {
}
fn sign_in(&self, params: BoxAny) -> FutureResult<SignInResponse, FlowyError> {
let server = self.server.clone();
let server = self.server.try_get_pg_server();
let (tx, rx) = channel();
tokio::spawn(async move {
tx.send(
async {
let client = server.get_pg_client().await.recv().await?;
let uuid = uuid_from_box_any(params)?;
let user_profile = get_user_profile(&client, GetUserProfileParams::Uuid(uuid)).await?;
Ok(SignInResponse {
user_id: user_profile.uid,
workspace_id: user_profile.workspace_id,
..Default::default()
})
match server?.upgrade() {
None => Err(FlowyError::new(
ErrorCode::PgDatabaseError,
"Server is closed",
)),
Some(server) => {
let client = server.get_pg_client().await.recv().await?;
let uuid = uuid_from_box_any(params)?.uuid;
let user_profile =
get_user_profile(&client, GetUserProfileParams::Uuid(uuid)).await?;
Ok(SignInResponse {
user_id: user_profile.uid,
workspace_id: user_profile.workspace_id,
..Default::default()
})
},
}
}
.await,
)
@ -81,13 +100,17 @@ impl UserAuthService for SupabaseUserAuthServiceImpl {
_credential: UserCredentials,
params: UpdateUserProfileParams,
) -> FutureResult<(), FlowyError> {
let server = self.server.clone();
let weak_server = self.server.try_get_pg_server();
let (tx, rx) = channel();
tokio::spawn(async move {
tx.send(
async move {
let client = server.get_pg_client().await.recv().await?;
update_user_profile(&client, params).await
if let Some(server) = weak_server?.upgrade() {
let client = server.get_pg_client().await.recv().await?;
update_user_profile(&client, params).await
} else {
Ok(())
}
}
.await,
)
@ -99,28 +122,33 @@ impl UserAuthService for SupabaseUserAuthServiceImpl {
&self,
credential: UserCredentials,
) -> FutureResult<Option<UserProfile>, FlowyError> {
let server = self.server.clone();
let weak_server = self.server.try_get_pg_server();
let (tx, rx) = channel();
tokio::spawn(async move {
tx.send(
async move {
let client = server.get_pg_client().await.recv().await?;
let uid = credential
.uid
.ok_or(FlowyError::new(ErrorCode::InvalidParams, "uid is required"))?;
let user_profile = get_user_profile(&client, GetUserProfileParams::Uid(uid))
.await
.ok()
.map(|user_profile| UserProfile {
id: user_profile.uid,
email: user_profile.email,
name: user_profile.name,
token: "".to_string(),
icon_url: "".to_string(),
openai_key: "".to_string(),
workspace_id: user_profile.workspace_id,
});
Ok(user_profile)
if let Some(server) = weak_server?.upgrade() {
let client = server.get_pg_client().await.recv().await?;
let uid = credential
.uid
.ok_or(FlowyError::new(ErrorCode::InvalidParams, "uid is required"))?;
let user_profile = get_user_profile(&client, GetUserProfileParams::Uid(uid))
.await
.ok()
.map(|user_profile| UserProfile {
id: user_profile.uid,
email: user_profile.email,
name: user_profile.name,
token: "".to_string(),
icon_url: "".to_string(),
openai_key: "".to_string(),
workspace_id: user_profile.workspace_id,
auth_type: AuthType::Supabase,
});
Ok(user_profile)
} else {
Ok(None)
}
}
.await,
)
@ -130,13 +158,21 @@ impl UserAuthService for SupabaseUserAuthServiceImpl {
fn check_user(&self, credential: UserCredentials) -> FutureResult<(), FlowyError> {
let uuid = credential.uuid.and_then(|uuid| Uuid::from_str(&uuid).ok());
let server = self.server.clone();
let weak_server = self.server.try_get_pg_server();
let (tx, rx) = channel();
tokio::spawn(async move {
tx.send(
async move {
let client = server.get_pg_client().await.recv().await?;
check_user(&client, credential.uid, uuid).await
match weak_server?.upgrade() {
None => Err(FlowyError::new(
ErrorCode::PgDatabaseError,
"Server is closed",
)),
Some(server) => {
let client = server.get_pg_client().await.recv().await?;
check_user(&client, credential.uid, uuid).await
},
}
}
.await,
)
@ -148,12 +184,13 @@ impl UserAuthService for SupabaseUserAuthServiceImpl {
async fn create_user_with_uuid(
client: &PostgresObject,
uuid: Uuid,
email: String,
) -> Result<SignUpResponse, FlowyError> {
let mut is_new = true;
if let Err(e) = client
.execute(
&format!("INSERT INTO {} (uuid) VALUES ($1);", USER_TABLE),
&[&uuid],
&format!("INSERT INTO {} (uuid, email) VALUES ($1,$2);", USER_TABLE),
&[&uuid, &email],
)
.await
{

View File

@ -65,13 +65,14 @@ DROP FUNCTION IF EXISTS check_and_delete_snapshots;
mod tests {
use tokio_postgres::NoTls;
use flowy_server_config::supabase_config::PostgresConfiguration;
use crate::supabase::migration::run_initial_drop;
use crate::supabase::*;
// ‼️‼️‼️ Warning: this test will create a table in the database
#[tokio::test]
async fn test_postgres_db() -> Result<(), anyhow::Error> {
if dotenv::from_filename(".env.test.danger").is_err() {
if dotenv::from_filename(".env.test").is_err() {
return Ok(());
}

View File

@ -21,3 +21,6 @@ DROP FUNCTION IF EXISTS af_collab_snapshot_update_edit_count;
DROP TRIGGER IF EXISTS check_and_delete_snapshots_trigger ON af_collab_snapshot CASCADE;
DROP FUNCTION IF EXISTS check_and_delete_snapshots;
DROP TRIGGER IF EXISTS new_af_collab_row_trigger ON af_collab CASCADE;
DROP FUNCTION IF EXISTS notify_on_insert_af_collab;

View File

@ -1,6 +1,7 @@
-- user table
CREATE TABLE IF NOT EXISTS af_user (
uuid UUID PRIMARY KEY,
email TEXT DEFAULT '',
uid BIGINT GENERATED ALWAYS AS IDENTITY,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
@ -14,8 +15,8 @@ CREATE TABLE IF NOT EXISTS af_user_profile (
);
-- user_profile trigger
CREATE OR REPLACE FUNCTION create_af_user_profile_trigger_func() RETURNS TRIGGER AS $$ BEGIN
INSERT INTO af_user_profile (uid, uuid)
VALUES (NEW.uid, NEW.uuid);
INSERT INTO af_user_profile (uid, uuid, email)
VALUES (NEW.uid, NEW.uuid, NEW.email);
RETURN NEW;
END $$ LANGUAGE plpgsql;
CREATE TRIGGER create_af_user_profile_trigger BEFORE
@ -42,13 +43,26 @@ CREATE TABLE IF NOT EXISTS af_collab (
key BIGINT GENERATED ALWAYS AS IDENTITY,
value BYTEA NOT NULL,
value_size INTEGER,
uid BIGINT NOT NULL,
md5 TEXT DEFAULT '',
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (oid, key)
);
-- collab pg notify trigger. It will notify the frontend when a new row is inserted in the af_collab table.
CREATE OR REPLACE FUNCTION notify_on_insert_af_collab() RETURNS trigger AS $$
BEGIN
-- use pg_notify to send a notification
PERFORM pg_notify('new_row_in_af_collab', NEW.oid::text);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER new_af_collab_row_trigger
AFTER INSERT ON af_collab
FOR EACH ROW EXECUTE PROCEDURE notify_on_insert_af_collab();
-- collab statistics. It will be used to store the edit_count of the collab.
CREATE TABLE IF NOT EXISTS af_collab_statistics (
oid TEXT PRIMARY KEY,
edit_count BIGINT DEFAULT 0
edit_count BIGINT NOT NULL DEFAULT 0
);
-- collab statistics trigger. It will increment the edit_count of the collab when a new row is inserted in the af_collab table.
CREATE OR REPLACE FUNCTION increment_af_collab_edit_count() RETURNS TRIGGER AS $$ BEGIN IF EXISTS(
@ -76,19 +90,19 @@ CREATE TABLE IF NOT EXISTS af_collab_snapshot (
name TEXT DEFAULT '',
blob BYTEA NOT NULL,
blob_size INTEGER NOT NULL,
edit_count BIGINT DEFAULT 0,
edit_count BIGINT NOT NULL DEFAULT 0,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
-- auto insert edit_count in the snapshot table.
CREATE OR REPLACE FUNCTION af_collab_snapshot_update_edit_count() RETURNS TRIGGER AS $$ BEGIN NEW.edit_count := (
SELECT edit_count
SELECT COALESCE(edit_count, 0)
FROM af_collab_statistics
WHERE oid = NEW.oid
);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER af_collab_snapshot_update_edit_count_trigger BEFORE
CREATE TRIGGER af_collab_snapshot_update_edit_count_trigger AFTER
INSERT ON af_collab_snapshot FOR EACH ROW EXECUTE FUNCTION af_collab_snapshot_update_edit_count();
-- collab snapshot trigger. It will delete the oldest snapshot if the number of snapshots is greater than 20.
-- Alternatively, the cleanup could be scheduled periodically with the PG_CRON extension.
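
The new_af_collab_row_trigger above is the backend half of the pg_notify plan referenced by subscribe_remote_updates earlier in this commit (which still returns None). As a hedged illustration only, not part of the commit, this is roughly how a client could listen on that channel with tokio_postgres; the connection string is a placeholder:

use futures::{stream, StreamExt};
use tokio_postgres::{AsyncMessage, NoTls};

// Hypothetical listener for the channel used by notify_on_insert_af_collab.
async fn listen_for_collab_inserts() -> Result<(), tokio_postgres::Error> {
  let (client, mut connection) =
    tokio_postgres::connect("host=localhost user=postgres", NoTls).await?;
  // Polling the connection for messages also drives it, so no separate spawn is needed.
  let mut messages = stream::poll_fn(move |cx| connection.poll_message(cx));
  client.batch_execute("LISTEN new_row_in_af_collab;").await?;
  while let Some(message) = messages.next().await {
    if let AsyncMessage::Notification(n) = message? {
      // The payload is NEW.oid::text, i.e. the collab object that just received a new row.
      println!("af_collab insert for oid: {}", n.payload());
    }
  }
  Ok(())
}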

View File

@ -1,12 +1,10 @@
pub use configuration::*;
pub use server::*;
mod entities;
pub mod impls;
mod pg_db;
mod postgres_db;
mod sql_builder;
// mod postgres_http;
mod configuration;
mod migration;
mod queue;
mod server;

View File

@ -6,10 +6,10 @@ use deadpool_postgres::{Manager, ManagerConfig, Object, Pool, RecyclingMethod};
use tokio_postgres::NoTls;
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
use flowy_server_config::supabase_config::PostgresConfiguration;
use crate::supabase::migration::run_migrations;
use crate::supabase::queue::RequestPayload;
use crate::supabase::PostgresConfiguration;
pub type PostgresObject = Object;
pub struct PostgresDB {
@ -25,6 +25,9 @@ impl PostgresDB {
}
pub async fn new(configuration: PostgresConfiguration) -> Result<Self, anyhow::Error> {
// TODO(nathan): Handling connection surges using
// https://supabase.com/blog/supabase-pgbouncer
// https://supabase.com/docs/guides/database/connecting-to-postgres
let mut pg_config = tokio_postgres::Config::new();
pg_config
.host(&configuration.url)

View File

@ -164,8 +164,13 @@ impl<Payload> RequestRunner<Payload>
where
Payload: 'static + Send + Sync,
{
pub async fn run(mut notifier: watch::Receiver<bool>, server: Weak<dyn RequestHandler<Payload>>) {
server.upgrade().unwrap().notify();
pub async fn run(
mut notifier: watch::Receiver<bool>,
handler: Weak<dyn RequestHandler<Payload>>,
) {
if let Some(handler) = handler.upgrade() {
handler.notify();
}
loop {
// stops the runner if the notifier was closed.
if notifier.changed().await.is_err() {
@ -177,10 +182,10 @@ where
break;
}
if let Some(server) = server.upgrade() {
if let Some(request) = server.prepare_request().await {
if let Some(handler) = handler.upgrade() {
if let Some(request) = handler.prepare_request().await {
if request.is_done() {
server.notify();
handler.notify();
continue;
}
@ -188,8 +193,8 @@ where
continue;
}
let _ = server.handle_request(request).await;
server.notify();
let _ = handler.handle_request(request).await;
handler.notify();
}
} else {
break;

View File

@ -4,13 +4,16 @@ use std::sync::{Arc, Weak};
use std::time::Duration;
use appflowy_integrate::RemoteCollabStorage;
use parking_lot::RwLock;
use tokio::spawn;
use tokio::sync::{watch, Mutex};
use tokio::time::interval;
use flowy_database2::deps::DatabaseCloudService;
use flowy_document2::deps::DocumentCloudService;
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
use flowy_folder2::deps::FolderCloudService;
use flowy_server_config::supabase_config::{PostgresConfiguration, SupabaseConfiguration};
use flowy_user::event_map::UserAuthService;
use lib_infra::async_trait::async_trait;
@ -18,11 +21,10 @@ use crate::supabase::impls::{
PgCollabStorageImpl, SupabaseDatabaseCloudServiceImpl, SupabaseDocumentCloudServiceImpl,
SupabaseFolderCloudServiceImpl, SupabaseUserAuthServiceImpl,
};
use crate::supabase::pg_db::{PgClientReceiver, PostgresDB, PostgresEvent};
use crate::supabase::postgres_db::{PgClientReceiver, PostgresDB, PostgresEvent};
use crate::supabase::queue::{
PendingRequest, RequestHandler, RequestQueue, RequestRunner, RequestState,
};
use crate::supabase::{PostgresConfiguration, SupabaseConfiguration};
use crate::AppFlowyServer;
/// Supabase server is used to provide the implementation of the [AppFlowyServer] trait.
@ -30,70 +32,148 @@ use crate::AppFlowyServer;
pub struct SupabaseServer {
#[allow(dead_code)]
config: SupabaseConfiguration,
postgres: Arc<PostgresServer>,
postgres: Arc<RwLock<Option<Arc<PostgresServer>>>>,
}
impl SupabaseServer {
pub fn new(config: SupabaseConfiguration) -> Self {
let postgres = PostgresServer::new(config.postgres_config.clone());
let postgres = if config.enable_sync {
Some(Arc::new(PostgresServer::new(
config.postgres_config.clone(),
)))
} else {
None
};
Self {
config,
postgres: Arc::new(postgres),
postgres: Arc::new(RwLock::new(postgres)),
}
}
pub fn set_enable_sync(&self, enable: bool) {
if enable {
if self.postgres.read().is_some() {
return;
}
*self.postgres.write() = Some(Arc::new(PostgresServer::new(
self.config.postgres_config.clone(),
)));
} else {
*self.postgres.write() = None;
}
}
}
impl AppFlowyServer for SupabaseServer {
fn enable_sync(&self, enable: bool) {
tracing::info!("supabase sync: {}", enable);
self.set_enable_sync(enable);
}
fn user_service(&self) -> Arc<dyn UserAuthService> {
Arc::new(SupabaseUserAuthServiceImpl::new(self.postgres.clone()))
Arc::new(SupabaseUserAuthServiceImpl::new(SupabaseServerServiceImpl(
self.postgres.clone(),
)))
}
fn folder_service(&self) -> Arc<dyn FolderCloudService> {
Arc::new(SupabaseFolderCloudServiceImpl::new(self.postgres.clone()))
Arc::new(SupabaseFolderCloudServiceImpl::new(
SupabaseServerServiceImpl(self.postgres.clone()),
))
}
fn database_service(&self) -> Arc<dyn DatabaseCloudService> {
Arc::new(SupabaseDatabaseCloudServiceImpl::new(self.postgres.clone()))
Arc::new(SupabaseDatabaseCloudServiceImpl::new(
SupabaseServerServiceImpl(self.postgres.clone()),
))
}
fn document_service(&self) -> Arc<dyn DocumentCloudService> {
Arc::new(SupabaseDocumentCloudServiceImpl::new(self.postgres.clone()))
Arc::new(SupabaseDocumentCloudServiceImpl::new(
SupabaseServerServiceImpl(self.postgres.clone()),
))
}
fn collab_storage(&self) -> Option<Arc<dyn RemoteCollabStorage>> {
Some(Arc::new(PgCollabStorageImpl::new(self.postgres.clone())))
Some(Arc::new(PgCollabStorageImpl::new(
SupabaseServerServiceImpl(self.postgres.clone()),
)))
}
}
/// [SupabaseServerService] is used to provide the Supabase services. The caller uses this trait
/// to obtain the services and may need to handle the case where a service is unavailable,
/// for example when the user stops syncing or is logged out.
pub trait SupabaseServerService: Send + Sync + 'static {
fn get_pg_server(&self) -> Option<Weak<PostgresServer>>;
fn try_get_pg_server(&self) -> FlowyResult<Weak<PostgresServer>>;
}
#[derive(Clone)]
pub struct SupabaseServerServiceImpl(pub Arc<RwLock<Option<Arc<PostgresServer>>>>);
impl SupabaseServerService for SupabaseServerServiceImpl {
/// Get the postgres server, returning `None` if it is not available.
fn get_pg_server(&self) -> Option<Weak<PostgresServer>> {
self.0.read().as_ref().map(Arc::downgrade)
}
/// Try to get the postgres server, returning an error if it is not available.
fn try_get_pg_server(&self) -> FlowyResult<Weak<PostgresServer>> {
self.0.read().as_ref().map(Arc::downgrade).ok_or_else(|| {
FlowyError::new(
ErrorCode::SupabaseSyncRequired,
"Supabase sync is disabled, please enable it first",
)
})
}
}
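The service implementations built on top of this trait typically upgrade the weak pointer right before issuing a request. A hedged sketch of that pattern (the helper name and the reuse of ErrorCode::SupabaseSyncRequired for the dropped-server case are illustrative, not part of this commit):

// Resolve a live PostgresServer or surface an error to the caller.
fn resolve_pg_server<S: SupabaseServerService>(server: &S) -> FlowyResult<Arc<PostgresServer>> {
  let weak_server = server.try_get_pg_server()?;
  weak_server.upgrade().ok_or_else(|| {
    // Reusing the same error code here is a simplification.
    FlowyError::new(
      ErrorCode::SupabaseSyncRequired,
      "Postgres server was dropped",
    )
  })
}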
pub struct PostgresServer {
inner: Arc<PostgresServerInner>,
request_handler: Arc<PostgresRequestHandler>,
}
impl Deref for PostgresServer {
type Target = Arc<PostgresServerInner>;
type Target = Arc<PostgresRequestHandler>;
fn deref(&self) -> &Self::Target {
&self.inner
&self.request_handler
}
}
pub struct PostgresServerInner {
impl PostgresServer {
pub fn new(config: PostgresConfiguration) -> Self {
let (runner_notifier_tx, runner_notifier) = watch::channel(false);
let request_handler = Arc::new(PostgresRequestHandler::new(runner_notifier_tx, config));
// Initialize the connection to the database
let conn = PendingRequest::new(PostgresEvent::ConnectDB);
request_handler.queue.lock().push(conn);
let handler = Arc::downgrade(&request_handler) as Weak<dyn RequestHandler<PostgresEvent>>;
spawn(RequestRunner::run(runner_notifier, handler));
Self { request_handler }
}
}
pub struct PostgresRequestHandler {
config: PostgresConfiguration,
db: Arc<Mutex<Option<Arc<PostgresDB>>>>,
queue: parking_lot::Mutex<RequestQueue<PostgresEvent>>,
notifier: Arc<watch::Sender<bool>>,
runner_notifier: Arc<watch::Sender<bool>>,
sequence: AtomicU32,
}
impl PostgresServerInner {
pub fn new(notifier: watch::Sender<bool>, config: PostgresConfiguration) -> Self {
impl PostgresRequestHandler {
pub fn new(runner_notifier: watch::Sender<bool>, config: PostgresConfiguration) -> Self {
let db = Arc::new(Default::default());
let queue = parking_lot::Mutex::new(RequestQueue::new());
let notifier = Arc::new(notifier);
let runner_notifier = Arc::new(runner_notifier);
Self {
db,
queue,
notifier,
runner_notifier,
config,
sequence: Default::default(),
}
@ -114,28 +194,13 @@ impl PostgresServerInner {
}
}
impl PostgresServer {
pub fn new(config: PostgresConfiguration) -> Self {
let (notifier, notifier_rx) = watch::channel(false);
let inner = Arc::new(PostgresServerInner::new(notifier, config));
// Initialize the connection to the database
let conn = PendingRequest::new(PostgresEvent::ConnectDB);
inner.queue.lock().push(conn);
let handler = Arc::downgrade(&inner) as Weak<dyn RequestHandler<PostgresEvent>>;
spawn(RequestRunner::run(notifier_rx, handler));
Self { inner }
}
}
#[async_trait]
impl RequestHandler<PostgresEvent> for PostgresServerInner {
impl RequestHandler<PostgresEvent> for PostgresRequestHandler {
async fn prepare_request(&self) -> Option<PendingRequest<PostgresEvent>> {
match self.queue.try_lock() {
None => {
// If acquiring the lock fails, retry after 300ms.
let weak_notifier = Arc::downgrade(&self.notifier);
let weak_notifier = Arc::downgrade(&self.runner_notifier);
spawn(async move {
interval(Duration::from_millis(300)).tick().await;
if let Some(notifier) = weak_notifier.upgrade() {
@ -193,6 +258,6 @@ impl RequestHandler<PostgresEvent> for PostgresServerInner {
}
fn notify(&self) {
let _ = self.notifier.send(false);
let _ = self.runner_notifier.send(false);
}
}
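The queue/runner split above follows a simple wake-up protocol: producers push a PendingRequest and send a value on the watch channel, and the runner wakes on every change and asks the handler for the next request. A simplified, self-contained sketch of that protocol (this illustrates the idea; it is not the crate's RequestRunner):

use tokio::sync::watch;

// Wakes whenever the sender writes a value and drains whatever work is pending.
async fn run_loop(mut wake: watch::Receiver<bool>, mut next: impl FnMut() -> Option<String>) {
  while wake.changed().await.is_ok() {
    while let Some(request) = next() {
      println!("handling {}", request);
    }
  }
}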

View File

@ -52,8 +52,12 @@ pub struct SelectSqlBuilder {
table: String,
columns: Vec<String>,
where_clause: Option<(String, Box<dyn ToSql + Sync + Send>)>,
where_clause_in: Option<(String, Vec<Box<dyn ToSql + Sync + Send>>)>,
group_by_column: Option<String>,
order_by: Option<(String, bool)>,
limit: Option<i64>,
lock: bool,
array_agg_columns: Vec<String>,
}
impl SelectSqlBuilder {
@ -62,16 +66,35 @@ impl SelectSqlBuilder {
table: table.to_string(),
columns: Vec::new(),
where_clause: None,
where_clause_in: None,
group_by_column: None,
order_by: None,
limit: None,
lock: false,
array_agg_columns: vec![],
}
}
pub fn lock(mut self) -> Self {
self.lock = true;
self
}
pub fn column(mut self, column: &str) -> Self {
self.columns.push(column.to_string());
self
}
pub fn group_by(mut self, column: &str) -> Self {
self.group_by_column = Some(column.to_string());
self
}
pub fn array_agg(mut self, column: &str) -> Self {
self.array_agg_columns.push(column.to_string());
self
}
pub fn order_by(mut self, column: &str, asc: bool) -> Self {
self.order_by = Some((column.to_string(), asc));
self
@ -82,13 +105,33 @@ impl SelectSqlBuilder {
self
}
pub fn where_clause_in<T: 'static + ToSql + Sync + Send>(
mut self,
clause: &str,
values: Vec<T>,
) -> Self {
let boxed_values: Vec<_> = values
.into_iter()
.map(|value| Box::new(value) as Box<dyn ToSql + Send + Sync>)
.collect();
self.where_clause_in = Some((clause.to_string(), boxed_values));
self
}
pub fn limit(mut self, limit: i64) -> Self {
self.limit = Some(limit);
self
}
pub fn build(self) -> (String, Vec<Box<dyn ToSql + Sync + Send>>) {
let mut sql = format!("SELECT {} FROM {}", self.columns.join(", "), self.table);
let all_columns = self
.columns
.iter()
.chain(self.array_agg_columns.iter())
.cloned()
.collect::<Vec<_>>()
.join(", ");
let mut sql = format!("SELECT {} FROM {}", all_columns, self.table);
let mut params: Vec<_> = Vec::new();
if let Some((clause, value)) = self.where_clause {
@ -96,15 +139,46 @@ impl SelectSqlBuilder {
params.push(value);
}
    if let Some((clause, values)) = self.where_clause_in {
      // Offset the placeholders by the params already pushed (e.g. by where_clause),
      // and only start a new WHERE clause if none has been written yet.
      let offset = params.len();
      let placeholders: Vec<String> = values
        .iter()
        .enumerate()
        .map(|(i, _)| format!("${}", offset + i + 1))
        .collect();
      let keyword = if offset == 0 { " WHERE" } else { " AND" };
      sql.push_str(&format!(
        "{} {} IN ({})",
        keyword,
        clause,
        placeholders.join(",")
      ));
      params.extend(values);
    }
if let Some(group_by_column) = self.group_by_column {
sql.push_str(&format!(" GROUP BY {}", group_by_column));
}
if let Some((order_by_column, asc)) = self.order_by {
let order = if asc { "ASC" } else { "DESC" };
sql.push_str(&format!(" ORDER BY {} {}", order_by_column, order));
}
    // ARRAY_AGG is an aggregate function that collects the values of the given column
    // into an array. Note that this rewrites every occurrence of the column name in the
    // generated SQL, so aggregated columns should not also appear in WHERE or GROUP BY.
for array_agg_column in self.array_agg_columns {
sql = sql.replace(
&array_agg_column,
&format!("ARRAY_AGG({}) as {}", array_agg_column, array_agg_column),
);
}
if let Some(limit) = self.limit {
sql.push_str(&format!(" LIMIT {}", limit));
}
if self.lock {
sql.push_str(" FOR UPDATE");
}
(sql, params)
}
}
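For reference, a usage sketch of the extended builder; the table and column names are placeholders, and the quoted SQL assumes the builder behaves exactly as written above (one row per oid with its updates aggregated into an array, matching the batch-fetch path added in this commit).

#[test]
fn select_sql_builder_batch_fetch_shape() {
  // Hypothetical table/column names.
  let (sql, params) = SelectSqlBuilder::new("af_collab_update")
    .column("oid")
    .array_agg("value")
    .group_by("oid")
    .where_clause_in("oid", vec!["o1".to_string(), "o2".to_string()])
    .build();

  // Roughly: SELECT oid, ARRAY_AGG(value) as value FROM af_collab_update
  //          WHERE oid IN ($1,$2) GROUP BY oid
  assert!(sql.contains("ARRAY_AGG(value)"));
  assert_eq!(params.len(), 2);
}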

View File

@ -1,11 +1,4 @@
use std::collections::HashMap;
use std::str::FromStr;
use serde::{Deserialize, Deserializer};
use uuid::Uuid;
use flowy_error::{internal_error, ErrorCode, FlowyError};
use lib_infra::box_any::BoxAny;
/// Deserializes a value that may be null: if it is null, return the default value of the
/// type; otherwise, deserialize the value as usual.
@ -17,11 +10,3 @@ where
let opt = Option::deserialize(deserializer)?;
Ok(opt.unwrap_or_default())
}
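The helper above (its name is hidden by the hunk; it is assumed here to be called deserialize_null_default) is meant to be wired into structs through serde's deserialize_with attribute. A small illustrative example:

use serde::Deserialize;

// Illustrative struct: the field falls back to Default::default() when the JSON value is null.
#[derive(Deserialize)]
struct UserRow {
  #[serde(deserialize_with = "deserialize_null_default")]
  email: String,
}

// serde_json::from_str::<UserRow>(r#"{"email": null}"#) yields an empty email string.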
pub(crate) fn uuid_from_box_any(any: BoxAny) -> Result<Uuid, FlowyError> {
let map: HashMap<String, String> = any.unbox_or_error()?;
let uuid = map
.get("uuid")
.ok_or_else(|| FlowyError::new(ErrorCode::MissingAuthField, "Missing uuid field"))?;
Uuid::from_str(uuid).map_err(internal_error)
}

View File

@ -1,10 +1,12 @@
use std::collections::HashMap;
use std::sync::Arc;
use parking_lot::RwLock;
use uuid::Uuid;
use flowy_server::supabase::impls::{SupabaseUserAuthServiceImpl, USER_UUID};
use flowy_server::supabase::{PostgresConfiguration, PostgresServer};
use flowy_server::supabase::{PostgresServer, SupabaseServerServiceImpl};
use flowy_server_config::supabase_config::PostgresConfiguration;
use flowy_user::entities::{SignUpResponse, UpdateUserProfileParams};
use flowy_user::event_map::{UserAuthService, UserCredentials};
use lib_infra::box_any::BoxAny;
@ -17,26 +19,27 @@ async fn user_sign_up_test() {
if dotenv::from_filename("./.env.test").is_err() {
return;
}
let server = Arc::new(PostgresServer::new(
PostgresConfiguration::from_env().unwrap(),
));
let user_service = SupabaseUserAuthServiceImpl::new(server);
let user_service = user_auth_service_impl();
let mut params = HashMap::new();
params.insert(USER_UUID.to_string(), Uuid::new_v4().to_string());
let user: SignUpResponse = user_service.sign_up(BoxAny::new(params)).await.unwrap();
assert!(!user.workspace_id.is_empty());
}
fn user_auth_service_impl() -> SupabaseUserAuthServiceImpl<SupabaseServerServiceImpl> {
let server = Arc::new(PostgresServer::new(
PostgresConfiguration::from_env().unwrap(),
));
let weak_server = SupabaseServerServiceImpl(Arc::new(RwLock::new(Some(server))));
SupabaseUserAuthServiceImpl::new(weak_server)
}
#[tokio::test]
async fn user_sign_up_with_existing_uuid_test() {
if dotenv::from_filename("./.env.test").is_err() {
return;
}
let server = Arc::new(PostgresServer::new(
PostgresConfiguration::from_env().unwrap(),
));
let user_service = SupabaseUserAuthServiceImpl::new(server);
let user_service = user_auth_service_impl();
let uuid = Uuid::new_v4();
let mut params = HashMap::new();
@ -54,10 +57,7 @@ async fn update_user_profile_test() {
if dotenv::from_filename("./.env.test").is_err() {
return;
}
let server = Arc::new(PostgresServer::new(
PostgresConfiguration::from_env().unwrap(),
));
let user_service = SupabaseUserAuthServiceImpl::new(server);
let user_service = user_auth_service_impl();
let uuid = Uuid::new_v4();
let mut params = HashMap::new();
@ -98,10 +98,7 @@ async fn get_user_profile_test() {
return;
}
setup_log();
let server = Arc::new(PostgresServer::new(
PostgresConfiguration::from_env().unwrap(),
));
let user_service = SupabaseUserAuthServiceImpl::new(server);
let user_service = user_auth_service_impl();
let uuid = Uuid::new_v4();
let mut params = HashMap::new();
@ -146,10 +143,7 @@ async fn get_not_exist_user_profile_test() {
return;
}
setup_log();
let server = Arc::new(PostgresServer::new(
PostgresConfiguration::from_env().unwrap(),
));
let user_service = SupabaseUserAuthServiceImpl::new(server);
let user_service = user_auth_service_impl();
let result = user_service
.get_user_profile(UserCredentials::from_uid(i64::MAX))
.await