feat: encrypt collab update (#3215)

* feat: implement encrypt and decrypt

* feat: encrypt and decrypt

* feat: update user profile with encrypt

* chore: store encryption sign

* fix: login in setting menu

* chore: show encryption account name

* chore: fix test

* ci: fix warnings

* test: enable supabase test

* chore: fix test and rename column

* fix: update user profile after set the secret

* fix: encryption with wrong secret

* fix: don't save user data if the return value of did_sign_up is err

* chore: encrypt snapshot data

* chore: refactor snapshots interface

* ci: add tests

* chore: update collab rev
This commit is contained in:
Nathan.fooo
2023-08-17 23:46:39 +08:00
committed by GitHub
parent 103f56922f
commit 649b0a135a
103 changed files with 2825 additions and 905 deletions

View File

@ -36,6 +36,7 @@ flowy-database-deps = { path = "../flowy-database-deps" }
flowy-document-deps = { path = "../flowy-document-deps" }
flowy-error = { path = "../flowy-error", features = ["impl_from_postgres", "impl_from_serde", "impl_from_reqwest"] }
flowy-server-config = { path = "../flowy-server-config" }
flowy-encrypt = { path = "../flowy-encrypt" }
[dev-dependencies]
uuid = { version = "1.3.3", features = ["v4"] }

View File

@ -1,6 +1,7 @@
use std::sync::Arc;
use collab_plugins::cloud_storage::{CollabObject, RemoteCollabStorage};
use parking_lot::RwLock;
use serde_json::Value;
use flowy_database_deps::cloud::DatabaseCloudService;
@ -15,8 +16,26 @@ pub mod self_host;
pub mod supabase;
pub mod util;
/// Abstraction over storage of the user's encryption secret.
///
/// Implementors hold an optional secret string that is later used when
/// encoding/decoding collab payloads; `None` means no secret is configured.
pub trait AppFlowyEncryption: Send + Sync + 'static {
/// Returns the current secret, or `None` when no secret has been set.
fn get_secret(&self) -> Option<String>;
/// Stores `secret` as the active encryption secret.
fn set_secret(&self, secret: String);
}
/// Blanket impl so an `Arc<T>` can be passed anywhere an
/// `AppFlowyEncryption` is expected; both calls forward to the inner `T`.
impl<T> AppFlowyEncryption for Arc<T>
where
T: AppFlowyEncryption,
{
fn get_secret(&self) -> Option<String> {
(**self).get_secret()
}
fn set_secret(&self, secret: String) {
(**self).set_secret(secret)
}
}
pub trait AppFlowyServer: Send + Sync + 'static {
fn enable_sync(&self, _enable: bool) {}
fn set_enable_sync(&self, _enable: bool) {}
fn set_sync_device_id(&self, _device_id: &str) {}
fn user_service(&self) -> Arc<dyn UserService>;
fn folder_service(&self) -> Arc<dyn FolderCloudService>;
@ -25,3 +44,25 @@ pub trait AppFlowyServer: Send + Sync + 'static {
fn collab_storage(&self, collab_object: &CollabObject) -> Option<Arc<dyn RemoteCollabStorage>>;
fn handle_realtime_event(&self, _json: Value) {}
}
/// Default [`AppFlowyEncryption`] implementation backed by an in-memory
/// `parking_lot::RwLock<Option<String>>`.
pub struct EncryptionImpl {
// `None` until a secret is set (unless seeded through `new`).
secret: RwLock<Option<String>>,
}
impl EncryptionImpl {
/// Creates a new instance, optionally seeded with an existing secret.
pub fn new(secret: Option<String>) -> Self {
Self {
secret: RwLock::new(secret),
}
}
}
impl AppFlowyEncryption for EncryptionImpl {
fn get_secret(&self) -> Option<String> {
// Clone under the read guard so the lock is released immediately.
self.secret.read().clone()
}
fn set_secret(&self, secret: String) {
// Overwrites any previously stored secret.
*self.secret.write() = Some(secret);
}
}

View File

@ -4,7 +4,6 @@ use collab_plugins::cloud_storage::CollabType;
use flowy_database_deps::cloud::{
CollabObjectUpdate, CollabObjectUpdateByOid, DatabaseCloudService, DatabaseSnapshot,
};
use lib_infra::future::FutureResult;
pub(crate) struct LocalServerDatabaseCloudServiceImpl();
@ -26,10 +25,11 @@ impl DatabaseCloudService for LocalServerDatabaseCloudServiceImpl {
FutureResult::new(async move { Ok(CollabObjectUpdateByOid::default()) })
}
fn get_collab_latest_snapshot(
fn get_collab_snapshots(
&self,
_object_id: &str,
) -> FutureResult<Option<DatabaseSnapshot>, Error> {
FutureResult::new(async move { Ok(None) })
_limit: usize,
) -> FutureResult<Vec<DatabaseSnapshot>, Error> {
FutureResult::new(async move { Ok(vec![]) })
}
}

View File

@ -1,6 +1,6 @@
use anyhow::Error;
use flowy_document_deps::cloud::*;
use flowy_document_deps::cloud::*;
use lib_infra::future::FutureResult;
pub(crate) struct LocalServerDocumentCloudServiceImpl();
@ -10,11 +10,12 @@ impl DocumentCloudService for LocalServerDocumentCloudServiceImpl {
FutureResult::new(async move { Ok(vec![]) })
}
fn get_document_latest_snapshot(
fn get_document_snapshots(
&self,
_document_id: &str,
) -> FutureResult<Option<DocumentSnapshot>, Error> {
FutureResult::new(async move { Ok(None) })
_limit: usize,
) -> FutureResult<Vec<DocumentSnapshot>, Error> {
FutureResult::new(async move { Ok(vec![]) })
}
fn get_document_data(&self, _document_id: &str) -> FutureResult<Option<DocumentData>, Error> {

View File

@ -1,6 +1,7 @@
use anyhow::Error;
use std::sync::Arc;
use anyhow::Error;
use flowy_folder_deps::cloud::{
gen_workspace_id, FolderCloudService, FolderData, FolderSnapshot, Workspace,
};
@ -30,11 +31,12 @@ impl FolderCloudService for LocalServerFolderCloudServiceImpl {
FutureResult::new(async move { Ok(None) })
}
fn get_folder_latest_snapshot(
fn get_folder_snapshots(
&self,
_workspace_id: &str,
) -> FutureResult<Option<FolderSnapshot>, Error> {
FutureResult::new(async move { Ok(None) })
_limit: usize,
) -> FutureResult<Vec<FolderSnapshot>, Error> {
FutureResult::new(async move { Ok(vec![]) })
}
fn get_folder_updates(&self, workspace_id: &str, uid: i64) -> FutureResult<Vec<Vec<u8>>, Error> {

View File

@ -39,10 +39,11 @@ impl UserService for LocalServerUserAuthServiceImpl {
name: user_name,
latest_workspace: user_workspace.clone(),
user_workspaces: vec![user_workspace],
is_new: true,
is_new_user: true,
email: Some(params.email),
token: None,
device_id: params.device_id,
encryption_type: EncryptionType::NoEncryption,
})
})
}
@ -64,6 +65,7 @@ impl UserService for LocalServerUserAuthServiceImpl {
email: Some(params.email),
token: None,
device_id: params.device_id,
encryption_type: EncryptionType::NoEncryption,
})
})
}

View File

@ -4,7 +4,6 @@ use collab_plugins::cloud_storage::CollabType;
use flowy_database_deps::cloud::{
CollabObjectUpdate, CollabObjectUpdateByOid, DatabaseCloudService, DatabaseSnapshot,
};
use lib_infra::future::FutureResult;
pub(crate) struct SelfHostedDatabaseCloudServiceImpl();
@ -26,10 +25,11 @@ impl DatabaseCloudService for SelfHostedDatabaseCloudServiceImpl {
FutureResult::new(async move { Ok(CollabObjectUpdateByOid::default()) })
}
fn get_collab_latest_snapshot(
fn get_collab_snapshots(
&self,
_object_id: &str,
) -> FutureResult<Option<DatabaseSnapshot>, Error> {
FutureResult::new(async move { Ok(None) })
_limit: usize,
) -> FutureResult<Vec<DatabaseSnapshot>, Error> {
FutureResult::new(async move { Ok(vec![]) })
}
}

View File

@ -1,6 +1,6 @@
use anyhow::Error;
use flowy_document_deps::cloud::*;
use flowy_document_deps::cloud::*;
use lib_infra::future::FutureResult;
pub(crate) struct SelfHostedDocumentCloudServiceImpl();
@ -10,11 +10,12 @@ impl DocumentCloudService for SelfHostedDocumentCloudServiceImpl {
FutureResult::new(async move { Ok(vec![]) })
}
fn get_document_latest_snapshot(
fn get_document_snapshots(
&self,
_document_id: &str,
) -> FutureResult<Option<DocumentSnapshot>, Error> {
FutureResult::new(async move { Ok(None) })
_limit: usize,
) -> FutureResult<Vec<DocumentSnapshot>, Error> {
FutureResult::new(async move { Ok(vec![]) })
}
fn get_document_data(&self, _document_id: &str) -> FutureResult<Option<DocumentData>, Error> {

View File

@ -1,4 +1,5 @@
use anyhow::Error;
use flowy_folder_deps::cloud::{
gen_workspace_id, FolderCloudService, FolderData, FolderSnapshot, Workspace,
};
@ -24,11 +25,12 @@ impl FolderCloudService for SelfHostedServerFolderCloudServiceImpl {
FutureResult::new(async move { Ok(None) })
}
fn get_folder_latest_snapshot(
fn get_folder_snapshots(
&self,
_workspace_id: &str,
) -> FutureResult<Option<FolderSnapshot>, Error> {
FutureResult::new(async move { Ok(None) })
_limit: usize,
) -> FutureResult<Vec<FolderSnapshot>, Error> {
FutureResult::new(async move { Ok(vec![]) })
}
fn get_folder_updates(

View File

@ -1,5 +1,5 @@
use std::str::FromStr;
use std::sync::Arc;
use std::sync::{Arc, Weak};
use anyhow::Error;
use chrono::{DateTime, Utc};
@ -15,25 +15,39 @@ use lib_infra::async_trait::async_trait;
use lib_infra::util::md5;
use crate::supabase::api::request::{
create_snapshot, get_latest_snapshot_from_server, get_updates_from_server,
FetchObjectUpdateAction, UpdateItem,
create_snapshot, get_snapshots_from_server, get_updates_from_server, FetchObjectUpdateAction,
UpdateItem,
};
use crate::supabase::api::util::{
ExtendedResponse, InsertParamsBuilder, SupabaseBinaryColumnEncoder,
};
use crate::supabase::api::{PostgresWrapper, SupabaseServerService};
use crate::supabase::define::*;
use crate::AppFlowyEncryption;
pub struct SupabaseCollabStorageImpl<T> {
server: T,
rx: Mutex<Option<RemoteUpdateReceiver>>,
encryption: Weak<dyn AppFlowyEncryption>,
}
impl<T> SupabaseCollabStorageImpl<T> {
pub fn new(server: T, rx: Option<RemoteUpdateReceiver>) -> Self {
pub fn new(
server: T,
rx: Option<RemoteUpdateReceiver>,
encryption: Weak<dyn AppFlowyEncryption>,
) -> Self {
Self {
server,
rx: Mutex::new(rx),
encryption,
}
}
/// Resolves the current encryption secret, if any.
///
/// Returns `None` when the `Weak<dyn AppFlowyEncryption>` handle can no
/// longer be upgraded (the owner was dropped) or when no secret is set.
pub fn secret(&self) -> Option<String> {
match self.encryption.upgrade() {
None => None,
Some(encryption) => encryption.get_secret(),
}
}
}
@ -55,11 +69,25 @@ where
Ok(updates)
}
async fn get_latest_snapshot(&self, object_id: &str) -> Option<RemoteCollabSnapshot> {
let postgrest = self.server.try_get_postgrest().ok()?;
get_latest_snapshot_from_server(object_id, postgrest)
.await
.ok()?
async fn get_snapshots(&self, object_id: &str, limit: usize) -> Vec<RemoteCollabSnapshot> {
match self.server.try_get_postgrest() {
Ok(postgrest) => match get_snapshots_from_server(object_id, postgrest, limit).await {
Ok(snapshots) => snapshots,
Err(err) => {
tracing::error!(
"🔴fetch snapshots by oid:{} with limit: {} failed: {:?}",
object_id,
limit,
err
);
vec![]
},
},
Err(err) => {
tracing::error!("🔴get postgrest failed: {:?}", err);
vec![]
},
}
}
async fn get_collab_state(&self, object_id: &str) -> Result<Option<RemoteCollabState>, Error> {
@ -116,7 +144,7 @@ where
let workspace_id = object
.get_workspace_id()
.ok_or(anyhow::anyhow!("Invalid workspace id"))?;
send_update(workspace_id, object, update, &postgrest).await?;
send_update(workspace_id, object, update, &postgrest, &self.secret()).await?;
}
Ok(())
@ -138,7 +166,14 @@ where
// If the update_items is empty, we can send the init_update directly
if update_items.is_empty() {
send_update(workspace_id, object, init_update, &postgrest).await?;
send_update(
workspace_id,
object,
init_update,
&postgrest,
&self.secret(),
)
.await?;
} else {
// 2.Merge the updates into one and then delete the merged updates
let merge_result = spawn_blocking(move || merge_updates(update_items, init_update)).await??;
@ -146,10 +181,12 @@ where
let value_size = merge_result.new_update.len() as i32;
let md5 = md5(&merge_result.new_update);
let new_update = format!("\\x{}", hex::encode(merge_result.new_update));
let (new_update, encrypt) =
SupabaseBinaryColumnEncoder::encode(merge_result.new_update, &self.secret())?;
let params = InsertParamsBuilder::new()
.insert("oid", object.object_id.clone())
.insert("new_value", new_update)
.insert("encrypt", encrypt)
.insert("md5", md5)
.insert("value_size", value_size)
.insert("partition_key", partition_key(&object.ty))
@ -160,7 +197,7 @@ where
.build();
postgrest
.rpc("flush_collab_updates_v2", params)
.rpc("flush_collab_updates_v3", params)
.execute()
.await?
.success()
@ -183,14 +220,16 @@ async fn send_update(
object: &CollabObject,
update: Vec<u8>,
postgrest: &Arc<PostgresWrapper>,
encryption_secret: &Option<String>,
) -> Result<(), Error> {
let value_size = update.len() as i32;
let md5 = md5(&update);
let update = SupabaseBinaryColumnEncoder::encode(update);
let (update, encrypt) = SupabaseBinaryColumnEncoder::encode(update, encryption_secret)?;
let builder = InsertParamsBuilder::new()
.insert("oid", object.object_id.clone())
.insert("partition_key", partition_key(&object.ty))
.insert("value", update)
.insert("encrypt", encrypt)
.insert("uid", object.uid)
.insert("md5", md5)
.insert("workspace_id", workspace_id)

View File

@ -8,7 +8,7 @@ use flowy_database_deps::cloud::{
use lib_infra::future::FutureResult;
use crate::supabase::api::request::{
get_latest_snapshot_from_server, BatchFetchObjectUpdateAction, FetchObjectUpdateAction,
get_snapshots_from_server, BatchFetchObjectUpdateAction, FetchObjectUpdateAction,
};
use crate::supabase::api::SupabaseServerService;
@ -69,23 +69,27 @@ where
FutureResult::new(async { rx.await? })
}
fn get_collab_latest_snapshot(
fn get_collab_snapshots(
&self,
object_id: &str,
) -> FutureResult<Option<DatabaseSnapshot>, Error> {
limit: usize,
) -> FutureResult<Vec<DatabaseSnapshot>, Error> {
let try_get_postgrest = self.server.try_get_postgrest();
let object_id = object_id.to_string();
FutureResult::new(async move {
let postgrest = try_get_postgrest?;
let snapshot = get_latest_snapshot_from_server(&object_id, postgrest)
let snapshots = get_snapshots_from_server(&object_id, postgrest, limit)
.await?
.into_iter()
.map(|snapshot| DatabaseSnapshot {
snapshot_id: snapshot.sid,
database_id: snapshot.oid,
data: snapshot.blob,
created_at: snapshot.created_at,
});
Ok(snapshot)
})
.collect::<Vec<_>>();
Ok(snapshots)
})
}
}

View File

@ -8,13 +8,16 @@ use tokio::sync::oneshot::channel;
use flowy_document_deps::cloud::{DocumentCloudService, DocumentSnapshot};
use lib_infra::future::FutureResult;
use crate::supabase::api::request::{get_latest_snapshot_from_server, FetchObjectUpdateAction};
use crate::supabase::api::request::{get_snapshots_from_server, FetchObjectUpdateAction};
use crate::supabase::api::SupabaseServerService;
pub struct SupabaseDocumentServiceImpl<T>(T);
pub struct SupabaseDocumentServiceImpl<T> {
server: T,
}
impl<T> SupabaseDocumentServiceImpl<T> {
pub fn new(server: T) -> Self {
Self(server)
Self { server }
}
}
@ -23,7 +26,7 @@ where
T: SupabaseServerService,
{
fn get_document_updates(&self, document_id: &str) -> FutureResult<Vec<Vec<u8>>, Error> {
let try_get_postgrest = self.0.try_get_weak_postgrest();
let try_get_postgrest = self.server.try_get_weak_postgrest();
let document_id = document_id.to_string();
let (tx, rx) = channel();
tokio::spawn(async move {
@ -39,28 +42,31 @@ where
FutureResult::new(async { rx.await? })
}
fn get_document_latest_snapshot(
fn get_document_snapshots(
&self,
document_id: &str,
) -> FutureResult<Option<DocumentSnapshot>, Error> {
let try_get_postgrest = self.0.try_get_postgrest();
limit: usize,
) -> FutureResult<Vec<DocumentSnapshot>, Error> {
let try_get_postgrest = self.server.try_get_postgrest();
let document_id = document_id.to_string();
FutureResult::new(async move {
let postgrest = try_get_postgrest?;
let snapshot = get_latest_snapshot_from_server(&document_id, postgrest)
let snapshots = get_snapshots_from_server(&document_id, postgrest, limit)
.await?
.into_iter()
.map(|snapshot| DocumentSnapshot {
snapshot_id: snapshot.sid,
document_id: snapshot.oid,
data: snapshot.blob,
created_at: snapshot.created_at,
});
Ok(snapshot)
})
.collect::<Vec<_>>();
Ok(snapshots)
})
}
fn get_document_data(&self, document_id: &str) -> FutureResult<Option<DocumentData>, Error> {
let try_get_postgrest = self.0.try_get_weak_postgrest();
let try_get_postgrest = self.server.try_get_weak_postgrest();
let document_id = document_id.to_string();
let (tx, rx) = channel();
tokio::spawn(async move {

View File

@ -13,17 +13,19 @@ use flowy_folder_deps::cloud::{
use lib_infra::future::FutureResult;
use crate::supabase::api::request::{
get_latest_snapshot_from_server, get_updates_from_server, FetchObjectUpdateAction,
get_snapshots_from_server, get_updates_from_server, FetchObjectUpdateAction,
};
use crate::supabase::api::util::{ExtendedResponse, InsertParamsBuilder};
use crate::supabase::api::SupabaseServerService;
use crate::supabase::define::*;
pub struct SupabaseFolderServiceImpl<T>(T);
pub struct SupabaseFolderServiceImpl<T> {
server: T,
}
impl<T> SupabaseFolderServiceImpl<T> {
pub fn new(server: T) -> Self {
Self(server)
Self { server }
}
}
@ -32,7 +34,7 @@ where
T: SupabaseServerService,
{
fn create_workspace(&self, uid: i64, name: &str) -> FutureResult<Workspace, Error> {
let try_get_postgrest = self.0.try_get_postgrest();
let try_get_postgrest = self.server.try_get_postgrest();
let name = name.to_string();
let new_workspace_id = gen_workspace_id().to_string();
FutureResult::new(async move {
@ -66,44 +68,51 @@ where
}
fn get_folder_data(&self, workspace_id: &str) -> FutureResult<Option<FolderData>, Error> {
let try_get_postgrest = self.0.try_get_postgrest();
let try_get_postgrest = self.server.try_get_postgrest();
let workspace_id = workspace_id.to_string();
FutureResult::new(async move {
let postgrest = try_get_postgrest?;
get_updates_from_server(&workspace_id, &CollabType::Folder, postgrest)
.await
.map(|updates| {
let updates = updates.into_iter().map(|item| item.value).collect();
let folder =
Folder::from_collab_raw_data(CollabOrigin::Empty, updates, &workspace_id, vec![])
.ok()?;
folder.get_folder_data()
})
let updates = get_updates_from_server(&workspace_id, &CollabType::Folder, postgrest).await?;
let updates = updates
.into_iter()
.map(|item| item.value)
.collect::<Vec<_>>();
if updates.is_empty() {
return Ok(None);
}
let folder =
Folder::from_collab_raw_data(CollabOrigin::Empty, updates, &workspace_id, vec![])?;
Ok(folder.get_folder_data())
})
}
fn get_folder_latest_snapshot(
fn get_folder_snapshots(
&self,
workspace_id: &str,
) -> FutureResult<Option<FolderSnapshot>, Error> {
let try_get_postgrest = self.0.try_get_postgrest();
limit: usize,
) -> FutureResult<Vec<FolderSnapshot>, Error> {
let try_get_postgrest = self.server.try_get_postgrest();
let workspace_id = workspace_id.to_string();
FutureResult::new(async move {
let postgrest = try_get_postgrest?;
let snapshot = get_latest_snapshot_from_server(&workspace_id, postgrest)
let snapshots = get_snapshots_from_server(&workspace_id, postgrest, limit)
.await?
.into_iter()
.map(|snapshot| FolderSnapshot {
snapshot_id: snapshot.sid,
database_id: snapshot.oid,
data: snapshot.blob,
created_at: snapshot.created_at,
});
Ok(snapshot)
})
.collect::<Vec<_>>();
Ok(snapshots)
})
}
fn get_folder_updates(&self, workspace_id: &str, _uid: i64) -> FutureResult<Vec<Vec<u8>>, Error> {
let try_get_postgrest = self.0.try_get_weak_postgrest();
let try_get_postgrest = self.server.try_get_weak_postgrest();
let workspace_id = workspace_id.to_string();
let (tx, rx) = channel();
tokio::spawn(async move {

View File

@ -1,21 +1,35 @@
use anyhow::Error;
use parking_lot::RwLock;
use std::ops::Deref;
use std::sync::{Arc, Weak};
use flowy_error::{ErrorCode, FlowyError};
use anyhow::Error;
use parking_lot::RwLock;
use postgrest::Postgrest;
use flowy_error::{ErrorCode, FlowyError};
use flowy_server_config::supabase_config::SupabaseConfiguration;
use crate::AppFlowyEncryption;
/// Creates a wrapper for Postgrest, which allows us to extend the functionality of Postgrest.
pub struct PostgresWrapper(Postgrest);
pub struct PostgresWrapper {
inner: Postgrest,
pub encryption: Weak<dyn AppFlowyEncryption>,
}
impl PostgresWrapper {
/// Returns the active encryption secret, or `None` when the encryption
/// handle was dropped (`Weak` upgrade failed) or no secret has been set.
pub fn secret(&self) -> Option<String> {
match self.encryption.upgrade() {
None => None,
Some(encryption) => encryption.get_secret(),
}
}
}
impl Deref for PostgresWrapper {
type Target = Postgrest;
fn deref(&self) -> &Self::Target {
&self.0
&self.inner
}
}
@ -24,14 +38,17 @@ pub struct RESTfulPostgresServer {
}
impl RESTfulPostgresServer {
pub fn new(config: SupabaseConfiguration) -> Self {
pub fn new(config: SupabaseConfiguration, encryption: Weak<dyn AppFlowyEncryption>) -> Self {
let url = format!("{}/rest/v1", config.url);
let auth = format!("Bearer {}", config.anon_key);
let postgrest = Postgrest::new(url)
.insert_header("apikey", config.anon_key)
.insert_header("Authorization", auth);
Self {
postgrest: Arc::new(PostgresWrapper(postgrest)),
postgrest: Arc::new(PostgresWrapper {
inner: postgrest,
encryption,
}),
}
}
}
@ -42,6 +59,23 @@ pub trait SupabaseServerService: Send + Sync + 'static {
fn try_get_weak_postgrest(&self) -> Result<Weak<PostgresWrapper>, Error>;
}
/// Blanket impl forwarding `SupabaseServerService` through an `Arc`,
/// so `Arc<T>` can be used wherever the trait bound is required.
impl<T> SupabaseServerService for Arc<T>
where
T: SupabaseServerService,
{
fn get_postgrest(&self) -> Option<Arc<PostgresWrapper>> {
(**self).get_postgrest()
}
fn try_get_postgrest(&self) -> Result<Arc<PostgresWrapper>, Error> {
(**self).try_get_postgrest()
}
fn try_get_weak_postgrest(&self) -> Result<Weak<PostgresWrapper>, Error> {
(**self).try_get_weak_postgrest()
}
}
#[derive(Clone)]
pub struct SupabaseServerServiceImpl(pub Arc<RwLock<Option<Arc<RESTfulPostgresServer>>>>);

View File

@ -16,7 +16,7 @@ use flowy_database_deps::cloud::{CollabObjectUpdate, CollabObjectUpdateByOid};
use lib_infra::util::md5;
use crate::supabase::api::util::{
ExtendedResponse, InsertParamsBuilder, SupabaseBinaryColumnDecoder,
ExtendedResponse, InsertParamsBuilder, SupabaseBinaryColumnDecoder, SupabaseBinaryColumnEncoder,
};
use crate::supabase::api::PostgresWrapper;
use crate::supabase::define::*;
@ -124,69 +124,106 @@ pub async fn create_snapshot(
snapshot: Vec<u8>,
) -> Result<i64, Error> {
let value_size = snapshot.len() as i32;
let snapshot = format!("\\x{}", hex::encode(snapshot));
postgrest
let (snapshot, encrypt) = SupabaseBinaryColumnEncoder::encode(&snapshot, &postgrest.secret())?;
let ret: Value = postgrest
.from(AF_COLLAB_SNAPSHOT_TABLE)
.insert(
InsertParamsBuilder::new()
.insert(AF_COLLAB_SNAPSHOT_OID_COLUMN, object.object_id.clone())
.insert("name", object.ty.to_string())
.insert(AF_COLLAB_SNAPSHOT_ENCRYPT_COLUMN, encrypt)
.insert(AF_COLLAB_SNAPSHOT_BLOB_COLUMN, snapshot)
.insert(AF_COLLAB_SNAPSHOT_BLOB_SIZE_COLUMN, value_size)
.build(),
)
.execute()
.await?
.success()
.get_json()
.await?;
Ok(1)
let snapshot_id = ret
.as_array()
.and_then(|array| array.first())
.and_then(|value| value.get("sid"))
.and_then(|value| value.as_i64())
.unwrap_or(0);
Ok(snapshot_id)
}
pub async fn get_latest_snapshot_from_server(
pub async fn get_snapshots_from_server(
object_id: &str,
postgrest: Arc<PostgresWrapper>,
) -> Result<Option<RemoteCollabSnapshot>, Error> {
let json = postgrest
limit: usize,
) -> Result<Vec<RemoteCollabSnapshot>, Error> {
let json: Value = postgrest
.from(AF_COLLAB_SNAPSHOT_TABLE)
.select(format!(
"{},{},{}",
"{},{},{},{}",
AF_COLLAB_SNAPSHOT_ID_COLUMN,
AF_COLLAB_SNAPSHOT_BLOB_COLUMN,
AF_COLLAB_SNAPSHOT_CREATED_AT_COLUMN
AF_COLLAB_SNAPSHOT_CREATED_AT_COLUMN,
AF_COLLAB_SNAPSHOT_ENCRYPT_COLUMN
))
.order(format!("{}.desc", AF_COLLAB_SNAPSHOT_ID_COLUMN))
.limit(1)
.limit(limit)
.eq(AF_COLLAB_SNAPSHOT_OID_COLUMN, object_id)
.execute()
.await?
.get_json()
.await?;
let snapshot = json
.as_array()
.and_then(|array| array.first())
.and_then(|value| {
let blob = value
.get("blob")
.and_then(|blob| blob.as_str())
.and_then(SupabaseBinaryColumnDecoder::decode)?;
let sid = value.get("sid").and_then(|id| id.as_i64())?;
let created_at = value.get("created_at").and_then(|created_at| {
created_at
.as_str()
.map(|id| DateTime::<Utc>::from_str(id).ok())
.and_then(|date| date)
})?;
let mut snapshots = vec![];
let secret = postgrest.secret();
match json.as_array() {
None => {
if let Some(snapshot) = parser_snapshot(object_id, &json, &secret) {
snapshots.push(snapshot);
}
},
Some(snapshot_values) => {
for snapshot_value in snapshot_values {
if let Some(snapshot) = parser_snapshot(object_id, snapshot_value, &secret) {
snapshots.push(snapshot);
}
}
},
}
Ok(snapshots)
}
Some(RemoteCollabSnapshot {
sid,
oid: object_id.to_string(),
blob,
created_at: created_at.timestamp(),
})
});
Ok(snapshot)
/// Parses one snapshot row (a JSON object) into a [`RemoteCollabSnapshot`].
///
/// The row must contain both the encrypt-flag column and the blob column;
/// the blob string is decoded by `SupabaseBinaryColumnDecoder` (which uses
/// `secret` when the flag marks the payload as encrypted). Returns `None`
/// when any required field is missing or decoding fails.
fn parser_snapshot(
object_id: &str,
snapshot: &Value,
secret: &Option<String>,
) -> Option<RemoteCollabSnapshot> {
// Only attempt decoding when BOTH the encrypt flag and the blob exist.
let blob = match (
snapshot
.get(AF_COLLAB_SNAPSHOT_ENCRYPT_COLUMN)
.and_then(|encrypt| encrypt.as_i64()),
snapshot
.get(AF_COLLAB_SNAPSHOT_BLOB_COLUMN)
.and_then(|value| value.as_str()),
) {
(Some(encrypt), Some(value)) => {
SupabaseBinaryColumnDecoder::decode(value, encrypt as i32, secret).ok()
},
_ => None,
}?;
let sid = snapshot.get("sid").and_then(|id| id.as_i64())?;
// `created_at` arrives as a string and is parsed into a `DateTime<Utc>`;
// the row is discarded when parsing fails.
let created_at = snapshot.get("created_at").and_then(|created_at| {
created_at
.as_str()
.map(|id| DateTime::<Utc>::from_str(id).ok())
.and_then(|date| date)
})?;
Some(RemoteCollabSnapshot {
sid,
oid: object_id.to_string(),
blob,
created_at: created_at.timestamp(),
})
}
pub async fn batch_get_updates_from_server(
@ -196,7 +233,7 @@ pub async fn batch_get_updates_from_server(
) -> Result<CollabObjectUpdateByOid, Error> {
let json = postgrest
.from(table_name(object_ty))
.select("oid, key, value, md5")
.select("oid, key, value, encrypt, md5")
.order(format!("{}.asc", AF_COLLAB_KEY_COLUMN))
.in_("oid", object_ids)
.execute()
@ -207,15 +244,20 @@ pub async fn batch_get_updates_from_server(
let mut updates_by_oid = CollabObjectUpdateByOid::new();
if let Some(records) = json.as_array() {
for record in records {
tracing::debug!("get updates from server: {:?}", record);
if let Some(oid) = record.get("oid").and_then(|value| value.as_str()) {
if let Ok(updates) = parser_updates_form_json(record.clone()) {
let object_updates = updates_by_oid
.entry(oid.to_string())
.or_insert_with(Vec::new);
tracing::debug!("get updates from server: {:?}", record);
for update in updates {
object_updates.push(update.value);
}
match parser_updates_form_json(record.clone(), &postgrest.secret()) {
Ok(updates) => {
let object_updates = updates_by_oid
.entry(oid.to_string())
.or_insert_with(Vec::new);
for update in updates {
object_updates.push(update.value);
}
},
Err(e) => {
tracing::error!("parser_updates_form_json error: {:?}", e);
},
}
}
}
@ -230,14 +272,14 @@ pub async fn get_updates_from_server(
) -> Result<Vec<UpdateItem>, Error> {
let json = postgrest
.from(table_name(object_ty))
.select("key, value, md5")
.select("key, value, encrypt, md5")
.order(format!("{}.asc", AF_COLLAB_KEY_COLUMN))
.eq("oid", object_id)
.execute()
.await?
.get_json()
.await?;
parser_updates_form_json(json)
parser_updates_form_json(json, &postgrest.secret())
}
/// json format:
@ -245,24 +287,35 @@ pub async fn get_updates_from_server(
/// [
/// {
/// "value": "\\x...",
/// "encrypt": 1,
/// "md5": "..."
/// },
/// {
/// "value": "\\x...",
/// "encrypt": 1,
/// "md5": "..."
/// },
/// ...
/// ]
/// ```
fn parser_updates_form_json(json: Value) -> Result<Vec<UpdateItem>, Error> {
fn parser_updates_form_json(
json: Value,
encryption_secret: &Option<String>,
) -> Result<Vec<UpdateItem>, Error> {
let mut updates = vec![];
match json.as_array() {
None => {
updates.push(parser_update_from_json(&json)?);
updates.push(parser_update_from_json(&json, encryption_secret)?);
},
Some(values) => {
let expected_update_len = values.len();
for value in values {
updates.push(parser_update_from_json(value)?);
updates.push(parser_update_from_json(value, encryption_secret)?);
}
if updates.len() != expected_update_len {
return Err(anyhow::anyhow!(
"The length of the updates does not match the length of the expected updates, indicating that some updates failed to parse."
));
}
},
}
@ -270,11 +323,36 @@ fn parser_updates_form_json(json: Value) -> Result<Vec<UpdateItem>, Error> {
Ok(updates)
}
fn parser_update_from_json(json: &Value) -> Result<UpdateItem, Error> {
let some_record = json
.get("value")
.and_then(|value| value.as_str())
.and_then(SupabaseBinaryColumnDecoder::decode);
/// Parses update from a JSON representation.
///
/// This function attempts to decode an encrypted value from a JSON object
/// and verify its integrity against a provided MD5 hash.
///
/// # Parameters
/// - `json`: The JSON value representing the update information.
/// - `encryption_secret`: An optional encryption secret used for decrypting the value.
///
/// json format:
/// ```json
/// {
/// "value": "\\x...",
/// "encrypt": 1,
/// "md5": "..."
/// },
/// ```
fn parser_update_from_json(
json: &Value,
encryption_secret: &Option<String>,
) -> Result<UpdateItem, Error> {
let some_record = match (
json.get("encrypt").and_then(|encrypt| encrypt.as_i64()),
json.get("value").and_then(|value| value.as_str()),
) {
(Some(encrypt), Some(value)) => {
SupabaseBinaryColumnDecoder::decode(value, encrypt as i32, encryption_secret).ok()
},
_ => None,
};
let some_key = json.get("key").and_then(|value| value.as_i64());
if let (Some(value), Some(key)) = (some_record, some_key) {
@ -282,12 +360,14 @@ fn parser_update_from_json(json: &Value) -> Result<UpdateItem, Error> {
// that we calculated locally.
if let Some(expected_md5) = json.get("md5").and_then(|v| v.as_str()) {
let value_md5 = md5(&value);
debug_assert!(
value_md5 == expected_md5,
"md5 not match: {} != {}",
value_md5,
expected_md5
);
if value_md5 != expected_md5 {
let msg = format!(
"md5 not match: key:{} {} != {}",
key, value_md5, expected_md5
);
tracing::error!("{}", msg);
return Err(anyhow::anyhow!(msg));
}
}
Ok(UpdateItem { key, value })
} else {

View File

@ -88,10 +88,11 @@ where
name: user_name,
latest_workspace: latest_workspace.unwrap(),
user_workspaces,
is_new: is_new_user,
is_new_user,
email: Some(user_profile.email),
token: None,
device_id: params.device_id,
encryption_type: EncryptionType::from_sign(&user_profile.encryption_sign),
})
})
}
@ -102,23 +103,24 @@ where
let postgrest = try_get_postgrest?;
let params = third_party_params_from_box_any(params)?;
let uuid = params.uuid;
let user_profile = get_user_profile(postgrest.clone(), GetUserProfileParams::Uuid(uuid))
let response = get_user_profile(postgrest.clone(), GetUserProfileParams::Uuid(uuid))
.await?
.unwrap();
let user_workspaces = get_user_workspaces(postgrest.clone(), user_profile.uid).await?;
let user_workspaces = get_user_workspaces(postgrest.clone(), response.uid).await?;
let latest_workspace = user_workspaces
.iter()
.find(|user_workspace| user_workspace.id == user_profile.latest_workspace_id)
.find(|user_workspace| user_workspace.id == response.latest_workspace_id)
.cloned();
Ok(SignInResponse {
user_id: user_profile.uid,
user_id: response.uid,
name: DEFAULT_USER_NAME(),
latest_workspace: latest_workspace.unwrap(),
user_workspaces,
email: None,
token: None,
device_id: params.device_id,
encryption_type: EncryptionType::from_sign(&response.encryption_sign),
})
})
}
@ -154,15 +156,16 @@ where
let user_profile_resp = get_user_profile(postgrest, GetUserProfileParams::Uid(uid)).await?;
match user_profile_resp {
None => Ok(None),
Some(user_profile_resp) => Ok(Some(UserProfile {
id: user_profile_resp.uid,
email: user_profile_resp.email,
name: user_profile_resp.name,
Some(response) => Ok(Some(UserProfile {
uid: response.uid,
email: response.email,
name: response.name,
token: "".to_string(),
icon_url: "".to_string(),
openai_key: "".to_string(),
workspace_id: user_profile_resp.latest_workspace_id,
workspace_id: response.latest_workspace_id,
auth_type: AuthType::Supabase,
encryption_type: EncryptionType::from_sign(&response.encryption_sign),
})),
}
})
@ -214,7 +217,7 @@ where
let postgrest = try_get_postgrest?;
let action =
FetchObjectUpdateAction::new(awareness_id, CollabType::UserAwareness, postgrest);
action.run_with_fix_interval(5, 10).await
action.run_with_fix_interval(3, 3).await
}
.await,
)
@ -229,7 +232,7 @@ async fn get_user_profile(
) -> Result<Option<UserProfileResponse>, Error> {
let mut builder = postgrest
.from(USER_PROFILE_VIEW)
.select("uid, email, name, latest_workspace_id");
.select("uid, email, name, encryption_sign, latest_workspace_id");
match params {
GetUserProfileParams::Uid(uid) => builder = builder.eq("uid", uid.to_string()),
@ -245,7 +248,10 @@ async fn get_user_profile(
match profiles.len() {
0 => Ok(None),
1 => Ok(Some(profiles.swap_remove(0))),
_ => unreachable!(),
_ => {
tracing::error!("multiple user profile found");
Ok(None)
},
}
}
@ -276,7 +282,7 @@ async fn update_user_profile(
let exists = !postgrest
.from(USER_TABLE)
.select("uid")
.eq("uid", params.id.to_string())
.eq("uid", params.uid.to_string())
.execute()
.await?
.error_for_status()?
@ -284,9 +290,8 @@ async fn update_user_profile(
.await?
.is_empty();
if !exists {
anyhow::bail!("user uid {} does not exist", params.id);
anyhow::bail!("user uid {} does not exist", params.uid);
}
let mut update_params = serde_json::Map::new();
if let Some(name) = params.name {
update_params.insert("name".to_string(), serde_json::json!(name));
@ -294,18 +299,24 @@ async fn update_user_profile(
if let Some(email) = params.email {
update_params.insert("email".to_string(), serde_json::json!(email));
}
let update_payload = serde_json::to_string(&update_params).unwrap();
if let Some(encrypt_sign) = params.encryption_sign {
update_params.insert(
"encryption_sign".to_string(),
serde_json::json!(encrypt_sign),
);
}
let update_payload = serde_json::to_string(&update_params).unwrap();
let resp = postgrest
.from(USER_TABLE)
.update(update_payload)
.eq("uid", params.id.to_string())
.eq("uid", params.uid.to_string())
.execute()
.await?
.success_with_body()
.await?;
tracing::debug!("update user profile resp: {:?}", resp);
tracing::trace!("update user profile resp: {:?}", resp);
Ok(())
}

View File

@ -1,7 +1,9 @@
use anyhow::Error;
use anyhow::Result;
use reqwest::{Response, StatusCode};
use serde_json::Value;
use flowy_encrypt::{decrypt_bytes, encrypt_bytes};
use flowy_error::{ErrorCode, FlowyError};
use lib_infra::future::{to_fut, Fut};
@ -138,8 +140,20 @@ impl SupabaseBinaryColumnEncoder {
///
/// # Returns
/// Returns the encoded string in the format: `\\xHEX_ENCODED_STRING`
pub fn encode<T: AsRef<[u8]>>(value: T) -> String {
format!("\\x{}", hex::encode(value))
pub fn encode<T: AsRef<[u8]>>(
value: T,
encryption_secret: &Option<String>,
) -> Result<(String, i32)> {
let encrypt = if encryption_secret.is_some() { 1 } else { 0 };
let value = match encryption_secret {
None => hex::encode(value),
Some(encryption_secret) => {
let encrypt_data = encrypt_bytes(value, encryption_secret)?;
hex::encode(encrypt_data)
},
};
Ok((format!("\\x{}", value), encrypt))
}
}
@ -157,9 +171,30 @@ impl SupabaseBinaryColumnDecoder {
/// # Returns
/// Returns an `Option` containing the decoded binary data if decoding is successful.
/// Otherwise, returns `None`.
pub fn decode<T: AsRef<str>>(value: T) -> Option<Vec<u8>> {
let s = value.as_ref().strip_prefix("\\x")?;
hex::decode(s).ok()
pub fn decode<T: AsRef<str>>(
value: T,
encrypt: i32,
encryption_secret: &Option<String>,
) -> Result<Vec<u8>> {
let s = value
.as_ref()
.strip_prefix("\\x")
.ok_or(anyhow::anyhow!("Value is not start with: \\x",))?;
if encrypt == 0 {
let bytes = hex::decode(s)?;
Ok(bytes)
} else {
match encryption_secret {
None => Err(anyhow::anyhow!(
"encryption_secret is None, but encrypt is 1"
)),
Some(encryption_secret) => {
let encrypt_data = hex::decode(s)?;
decrypt_bytes(encrypt_data, encryption_secret)
},
}
}
}
}
@ -178,7 +213,8 @@ impl SupabaseRealtimeEventBinaryColumnDecoder {
/// Returns an `Option` containing the decoded binary data if decoding is successful.
/// Otherwise, returns `None`.
pub fn decode<T: AsRef<str>>(value: T) -> Option<Vec<u8>> {
let bytes = SupabaseBinaryColumnDecoder::decode(value)?;
let s = value.as_ref().strip_prefix("\\x")?;
let bytes = hex::decode(s).ok()?;
hex::decode(bytes).ok()
}
}

View File

@ -5,6 +5,7 @@ pub const AF_COLLAB_KEY_COLUMN: &str = "key";
pub const AF_COLLAB_SNAPSHOT_OID_COLUMN: &str = "oid";
pub const AF_COLLAB_SNAPSHOT_ID_COLUMN: &str = "sid";
pub const AF_COLLAB_SNAPSHOT_BLOB_COLUMN: &str = "blob";
pub const AF_COLLAB_SNAPSHOT_ENCRYPT_COLUMN: &str = "encrypt";
pub const AF_COLLAB_SNAPSHOT_BLOB_SIZE_COLUMN: &str = "blob_size";
pub const AF_COLLAB_SNAPSHOT_CREATED_AT_COLUMN: &str = "created_at";
pub const AF_COLLAB_SNAPSHOT_TABLE: &str = "af_collab_snapshot";
@ -16,6 +17,7 @@ pub const USER_EMAIL: &str = "email";
pub const USER_TABLE: &str = "af_user";
pub const WORKSPACE_TABLE: &str = "af_workspace";
pub const USER_PROFILE_VIEW: &str = "af_user_profile_view";
pub const USER_DEVICE_ID: &str = "device_id";
pub(crate) const WORKSPACE_ID: &str = "workspace_id";
pub(crate) const WORKSPACE_NAME: &str = "workspace_name";

View File

@ -25,6 +25,9 @@ pub(crate) struct UserProfileResponse {
#[serde(deserialize_with = "deserialize_null_or_default")]
pub latest_workspace_id: String,
#[serde(deserialize_with = "deserialize_null_or_default")]
pub encryption_sign: String,
}
#[derive(Debug, Deserialize)]
@ -64,6 +67,8 @@ pub struct RealtimeCollabUpdate {
pub did: String,
#[serde(deserialize_with = "deserialize_value")]
pub value: Vec<u8>,
#[serde(default)]
pub encrypt: i32,
}
pub fn deserialize_value<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>

View File

@ -1,5 +1,5 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::{Arc, Weak};
use collab_plugins::cloud_storage::{CollabObject, RemoteCollabStorage, RemoteUpdateSender};
use parking_lot::{Mutex, RwLock};
@ -17,7 +17,7 @@ use crate::supabase::api::{
SupabaseUserServiceImpl,
};
use crate::supabase::entities::RealtimeCollabUpdateEvent;
use crate::AppFlowyServer;
use crate::{AppFlowyEncryption, AppFlowyServer};
/// https://www.pgbouncer.org/features.html
/// Only support session mode.
@ -60,13 +60,21 @@ pub struct SupabaseServer {
device_id: Mutex<String>,
update_tx: RwLock<HashMap<String, RemoteUpdateSender>>,
restful_postgres: Arc<RwLock<Option<Arc<RESTfulPostgresServer>>>>,
encryption: Weak<dyn AppFlowyEncryption>,
}
impl SupabaseServer {
pub fn new(config: SupabaseConfiguration) -> Self {
pub fn new(
config: SupabaseConfiguration,
enable_sync: bool,
encryption: Weak<dyn AppFlowyEncryption>,
) -> Self {
let update_tx = RwLock::new(HashMap::new());
let restful_postgres = if config.enable_sync {
Some(Arc::new(RESTfulPostgresServer::new(config.clone())))
let restful_postgres = if enable_sync {
Some(Arc::new(RESTfulPostgresServer::new(
config.clone(),
encryption.clone(),
)))
} else {
None
};
@ -75,6 +83,7 @@ impl SupabaseServer {
device_id: Default::default(),
update_tx,
restful_postgres: Arc::new(RwLock::new(restful_postgres)),
encryption,
}
}
@ -83,8 +92,8 @@ impl SupabaseServer {
if self.restful_postgres.read().is_some() {
return;
}
*self.restful_postgres.write() =
Some(Arc::new(RESTfulPostgresServer::new(self.config.clone())));
let postgres = RESTfulPostgresServer::new(self.config.clone(), self.encryption.clone());
*self.restful_postgres.write() = Some(Arc::new(postgres));
} else {
*self.restful_postgres.write() = None;
}
@ -92,7 +101,7 @@ impl SupabaseServer {
}
impl AppFlowyServer for SupabaseServer {
fn enable_sync(&self, enable: bool) {
fn set_enable_sync(&self, enable: bool) {
tracing::info!("supabase sync: {}", enable);
self.set_enable_sync(enable);
}
@ -134,6 +143,7 @@ impl AppFlowyServer for SupabaseServer {
Some(Arc::new(SupabaseCollabStorageImpl::new(
SupabaseServerServiceImpl(self.restful_postgres.clone()),
Some(rx),
self.encryption.clone(),
)))
}

View File

@ -5,18 +5,19 @@ use flowy_user_deps::entities::SignUpResponse;
use lib_infra::box_any::BoxAny;
use crate::supabase_test::util::{
collab_service, database_service, get_supabase_config, sign_up_param, user_auth_service,
collab_service, database_service, get_supabase_ci_config, third_party_sign_up_param,
user_auth_service,
};
#[tokio::test]
async fn supabase_create_workspace_test() {
if get_supabase_config().is_none() {
if get_supabase_ci_config().is_none() {
return;
}
let user_service = user_auth_service();
let uuid = Uuid::new_v4().to_string();
let params = sign_up_param(uuid);
let params = third_party_sign_up_param(uuid);
let user: SignUpResponse = user_service.sign_up(BoxAny::new(params)).await.unwrap();
let collab_service = collab_service();

View File

@ -1,8 +1,6 @@
use assert_json_diff::assert_json_eq;
use collab_plugins::cloud_storage::{CollabObject, CollabType};
use futures::future::join_all;
use serde_json::json;
use tokio::task;
use uuid::Uuid;
use yrs::types::ToJson;
use yrs::updates::decoder::Decode;
@ -12,12 +10,13 @@ use flowy_user_deps::entities::SignUpResponse;
use lib_infra::box_any::BoxAny;
use crate::supabase_test::util::{
collab_service, folder_service, get_supabase_config, sign_up_param, user_auth_service,
collab_service, folder_service, get_supabase_ci_config, third_party_sign_up_param,
user_auth_service,
};
#[tokio::test]
async fn supabase_create_workspace_test() {
if get_supabase_config().is_none() {
if get_supabase_ci_config().is_none() {
return;
}
@ -29,7 +28,7 @@ async fn supabase_create_workspace_test() {
#[tokio::test]
async fn supabase_get_folder_test() {
if get_supabase_config().is_none() {
if get_supabase_ci_config().is_none() {
return;
}
@ -37,7 +36,7 @@ async fn supabase_get_folder_test() {
let user_service = user_auth_service();
let collab_service = collab_service();
let uuid = Uuid::new_v4().to_string();
let params = sign_up_param(uuid);
let params = third_party_sign_up_param(uuid);
let user: SignUpResponse = user_service.sign_up(BoxAny::new(params)).await.unwrap();
let collab_object = CollabObject {
@ -75,26 +74,17 @@ async fn supabase_get_folder_test() {
.unwrap();
assert_eq!(updates.len(), 2);
// The init sync will try to merge the updates into one. Spawn 5 tasks to simulate
// multiple clients trying to init sync at the same time.
let mut handles = Vec::new();
for _ in 0..5 {
let cloned_collab_service = collab_service.clone();
let cloned_collab_object = collab_object.clone();
let handle = task::spawn(async move {
cloned_collab_service
.send_init_sync(&cloned_collab_object, 3, vec![])
.await
.unwrap();
});
handles.push(handle);
collab_service
.send_init_sync(&collab_object, 3, vec![])
.await
.unwrap();
}
let _results: Vec<_> = join_all(handles).await;
// after the init sync, the updates should be merged into one.
let updates: Vec<Vec<u8>> = folder_service
.get_folder_updates(&user.latest_workspace.id, user.user_id)
.await
.unwrap();
assert_eq!(updates.len(), 1);
// Other the init sync, try to get the updates from the server.
let remote_update = updates.first().unwrap().clone();
@ -112,7 +102,7 @@ async fn supabase_get_folder_test() {
/// Finally, it asserts that the duplicated updates don't affect the overall data consistency in Supabase.
#[tokio::test]
async fn supabase_duplicate_updates_test() {
if get_supabase_config().is_none() {
if get_supabase_ci_config().is_none() {
return;
}
@ -120,7 +110,7 @@ async fn supabase_duplicate_updates_test() {
let user_service = user_auth_service();
let collab_service = collab_service();
let uuid = Uuid::new_v4().to_string();
let params = sign_up_param(uuid);
let params = third_party_sign_up_param(uuid);
let user: SignUpResponse = user_service.sign_up(BoxAny::new(params)).await.unwrap();
let collab_object = CollabObject {
@ -206,9 +196,20 @@ async fn supabase_duplicate_updates_test() {
}
}
/// The state vector of doc;
/// ```json
/// "map": {},
/// "array": []
/// ```
/// The old version of doc:
/// ```json
/// "map": {}
/// ```
///
/// Try to apply the updates from doc to old version doc and check the result.
#[tokio::test]
async fn supabase_diff_state_vec_test() {
if get_supabase_config().is_none() {
async fn supabase_diff_state_vector_test() {
if get_supabase_ci_config().is_none() {
return;
}
@ -216,7 +217,7 @@ async fn supabase_diff_state_vec_test() {
let user_service = user_auth_service();
let collab_service = collab_service();
let uuid = Uuid::new_v4().to_string();
let params = sign_up_param(uuid);
let params = third_party_sign_up_param(uuid);
let user: SignUpResponse = user_service.sign_up(BoxAny::new(params)).await.unwrap();
let collab_object = CollabObject {
@ -278,3 +279,22 @@ async fn supabase_diff_state_vec_test() {
})
);
}
// #[tokio::test]
// async fn print_folder_object_test() {
// if get_supabase_dev_config().is_none() {
// return;
// }
// let secret = Some("43bSxEPHeNkk5ZxxEYOfAjjd7sK2DJ$vVnxwuNc5ru0iKFvhs8wLg==".to_string());
// print_encryption_folder("f8b14b84-e8ec-4cf4-a318-c1e008ecfdfa", secret).await;
// }
//
// #[tokio::test]
// async fn print_folder_snapshot_object_test() {
// if get_supabase_dev_config().is_none() {
// return;
// }
// let secret = Some("NTXRXrDSybqFEm32jwMBDzbxvCtgjU$8np3TGywbBdJAzHtu1QIyQ==".to_string());
// // let secret = None;
// print_encryption_folder_snapshot("12533251-bdd4-41f4-995f-ff12fceeaa42", secret).await;
// }

View File

@ -1,19 +1,22 @@
use uuid::Uuid;
use flowy_encrypt::{encrypt_string, generate_encrypt_secret};
use flowy_user_deps::entities::*;
use lib_infra::box_any::BoxAny;
use crate::supabase_test::util::{get_supabase_config, sign_up_param, user_auth_service};
use crate::supabase_test::util::{
get_supabase_ci_config, third_party_sign_up_param, user_auth_service,
};
// ‼️‼️‼️ Warning: this test will create a table in the database
#[tokio::test]
async fn supabase_user_sign_up_test() {
if get_supabase_config().is_none() {
if get_supabase_ci_config().is_none() {
return;
}
let user_service = user_auth_service();
let uuid = Uuid::new_v4().to_string();
let params = sign_up_param(uuid);
let params = third_party_sign_up_param(uuid);
let user: SignUpResponse = user_service.sign_up(BoxAny::new(params)).await.unwrap();
assert!(!user.latest_workspace.id.is_empty());
assert!(!user.user_workspaces.is_empty());
@ -22,12 +25,12 @@ async fn supabase_user_sign_up_test() {
#[tokio::test]
async fn supabase_user_sign_up_with_existing_uuid_test() {
if get_supabase_config().is_none() {
if get_supabase_ci_config().is_none() {
return;
}
let user_service = user_auth_service();
let uuid = Uuid::new_v4().to_string();
let params = sign_up_param(uuid);
let params = third_party_sign_up_param(uuid);
let _user: SignUpResponse = user_service
.sign_up(BoxAny::new(params.clone()))
.await
@ -40,12 +43,12 @@ async fn supabase_user_sign_up_with_existing_uuid_test() {
#[tokio::test]
async fn supabase_update_user_profile_test() {
if get_supabase_config().is_none() {
if get_supabase_ci_config().is_none() {
return;
}
let user_service = user_auth_service();
let uuid = Uuid::new_v4().to_string();
let params = sign_up_param(uuid);
let params = third_party_sign_up_param(uuid);
let user: SignUpResponse = user_service
.sign_up(BoxAny::new(params.clone()))
.await
@ -55,13 +58,13 @@ async fn supabase_update_user_profile_test() {
.update_user(
UserCredentials::from_uid(user.user_id),
UpdateUserProfileParams {
id: user.user_id,
auth_type: Default::default(),
uid: user.user_id,
name: Some("123".to_string()),
email: Some(format!("{}@test.com", Uuid::new_v4())),
password: None,
icon_url: None,
openai_key: None,
encryption_sign: None,
},
)
.await
@ -78,12 +81,12 @@ async fn supabase_update_user_profile_test() {
#[tokio::test]
async fn supabase_get_user_profile_test() {
if get_supabase_config().is_none() {
if get_supabase_ci_config().is_none() {
return;
}
let user_service = user_auth_service();
let uuid = Uuid::new_v4().to_string();
let params = sign_up_param(uuid);
let params = third_party_sign_up_param(uuid);
let user: SignUpResponse = user_service
.sign_up(BoxAny::new(params.clone()))
.await
@ -99,7 +102,7 @@ async fn supabase_get_user_profile_test() {
#[tokio::test]
async fn supabase_get_not_exist_user_profile_test() {
if get_supabase_config().is_none() {
if get_supabase_ci_config().is_none() {
return;
}
@ -111,3 +114,37 @@ async fn supabase_get_not_exist_user_profile_test() {
// user not found
assert!(result.is_none());
}
#[tokio::test]
async fn user_encryption_sign_test() {
if get_supabase_ci_config().is_none() {
return;
}
let user_service = user_auth_service();
let uuid = Uuid::new_v4().to_string();
let params = third_party_sign_up_param(uuid);
let user: SignUpResponse = user_service.sign_up(BoxAny::new(params)).await.unwrap();
// generate encryption sign
let secret = generate_encrypt_secret();
let sign = encrypt_string(user.user_id.to_string(), &secret).unwrap();
user_service
.update_user(
UserCredentials::from_uid(user.user_id),
UpdateUserProfileParams::new(user.user_id)
.with_encryption_type(EncryptionType::SelfEncryption(sign.clone())),
)
.await
.unwrap();
let user_profile: UserProfile = user_service
.get_user_profile(UserCredentials::from_uid(user.user_id))
.await
.unwrap()
.unwrap();
assert_eq!(
user_profile.encryption_type,
EncryptionType::SelfEncryption(sign)
);
}

View File

@ -1,66 +1,123 @@
use std::collections::HashMap;
use std::sync::Arc;
use collab::core::collab::MutexCollab;
use collab::core::origin::CollabOrigin;
use collab_plugins::cloud_storage::RemoteCollabStorage;
use uuid::Uuid;
use flowy_database_deps::cloud::DatabaseCloudService;
use flowy_folder_deps::cloud::FolderCloudService;
use flowy_folder_deps::cloud::{Folder, FolderCloudService};
use flowy_server::supabase::api::{
RESTfulPostgresServer, SupabaseCollabStorageImpl, SupabaseDatabaseServiceImpl,
SupabaseFolderServiceImpl, SupabaseServerServiceImpl, SupabaseUserServiceImpl,
};
use flowy_server::supabase::define::{USER_EMAIL, USER_UUID};
use flowy_server::supabase::define::{USER_DEVICE_ID, USER_EMAIL, USER_UUID};
use flowy_server::{AppFlowyEncryption, EncryptionImpl};
use flowy_server_config::supabase_config::SupabaseConfiguration;
use flowy_user_deps::cloud::UserService;
use crate::setup_log;
pub fn get_supabase_config() -> Option<SupabaseConfiguration> {
dotenv::from_filename("./.env.test").ok()?;
pub fn get_supabase_ci_config() -> Option<SupabaseConfiguration> {
dotenv::from_filename("./.env.ci").ok()?;
setup_log();
SupabaseConfiguration::from_env().ok()
}
#[allow(dead_code)]
pub fn get_supabase_dev_config() -> Option<SupabaseConfiguration> {
dotenv::from_filename("./.env.dev").ok()?;
setup_log();
SupabaseConfiguration::from_env().ok()
}
pub fn collab_service() -> Arc<dyn RemoteCollabStorage> {
let config = SupabaseConfiguration::from_env().unwrap();
let server = Arc::new(RESTfulPostgresServer::new(config));
let (server, encryption_impl) = appflowy_server(None);
Arc::new(SupabaseCollabStorageImpl::new(
SupabaseServerServiceImpl::new(server),
server,
None,
Arc::downgrade(&encryption_impl),
))
}
pub fn database_service() -> Arc<dyn DatabaseCloudService> {
let config = SupabaseConfiguration::from_env().unwrap();
let server = Arc::new(RESTfulPostgresServer::new(config));
Arc::new(SupabaseDatabaseServiceImpl::new(
SupabaseServerServiceImpl::new(server),
))
let (server, _encryption_impl) = appflowy_server(None);
Arc::new(SupabaseDatabaseServiceImpl::new(server))
}
pub fn user_auth_service() -> Arc<dyn UserService> {
let config = SupabaseConfiguration::from_env().unwrap();
let server = Arc::new(RESTfulPostgresServer::new(config));
Arc::new(SupabaseUserServiceImpl::new(
SupabaseServerServiceImpl::new(server),
))
let (server, _encryption_impl) = appflowy_server(None);
Arc::new(SupabaseUserServiceImpl::new(server))
}
pub fn folder_service() -> Arc<dyn FolderCloudService> {
let config = SupabaseConfiguration::from_env().unwrap();
let server = Arc::new(RESTfulPostgresServer::new(config));
Arc::new(SupabaseFolderServiceImpl::new(
SupabaseServerServiceImpl::new(server),
))
let (server, _encryption_impl) = appflowy_server(None);
Arc::new(SupabaseFolderServiceImpl::new(server))
}
pub fn sign_up_param(uuid: String) -> HashMap<String, String> {
#[allow(dead_code)]
pub fn encryption_folder_service(
secret: Option<String>,
) -> (Arc<dyn FolderCloudService>, Arc<dyn AppFlowyEncryption>) {
let (server, encryption_impl) = appflowy_server(secret);
let service = Arc::new(SupabaseFolderServiceImpl::new(server));
(service, encryption_impl)
}
pub fn encryption_collab_service(
secret: Option<String>,
) -> (Arc<dyn RemoteCollabStorage>, Arc<dyn AppFlowyEncryption>) {
let (server, encryption_impl) = appflowy_server(secret);
let service = Arc::new(SupabaseCollabStorageImpl::new(
server,
None,
Arc::downgrade(&encryption_impl),
));
(service, encryption_impl)
}
pub async fn print_encryption_folder(folder_id: &str, encryption_secret: Option<String>) {
let (cloud_service, _encryption) = encryption_folder_service(encryption_secret);
let folder_data = cloud_service.get_folder_data(folder_id).await.unwrap();
let json = serde_json::to_value(folder_data).unwrap();
println!("{}", serde_json::to_string_pretty(&json).unwrap());
}
pub async fn print_encryption_folder_snapshot(folder_id: &str, encryption_secret: Option<String>) {
let (cloud_service, _encryption) = encryption_collab_service(encryption_secret);
let snapshot = cloud_service
.get_snapshots(folder_id, 1)
.await
.pop()
.unwrap();
let collab = Arc::new(
MutexCollab::new_with_raw_data(CollabOrigin::Empty, folder_id, vec![snapshot.blob], vec![])
.unwrap(),
);
let folder_data = Folder::open(collab, None).get_folder_data().unwrap();
let json = serde_json::to_value(folder_data).unwrap();
println!("{}", serde_json::to_string_pretty(&json).unwrap());
}
pub fn appflowy_server(
encryption_secret: Option<String>,
) -> (SupabaseServerServiceImpl, Arc<dyn AppFlowyEncryption>) {
let config = SupabaseConfiguration::from_env().unwrap();
let encryption_impl: Arc<dyn AppFlowyEncryption> =
Arc::new(EncryptionImpl::new(encryption_secret));
let encryption = Arc::downgrade(&encryption_impl);
let server = Arc::new(RESTfulPostgresServer::new(config, encryption));
(SupabaseServerServiceImpl::new(server), encryption_impl)
}
pub fn third_party_sign_up_param(uuid: String) -> HashMap<String, String> {
let mut params = HashMap::new();
params.insert(USER_UUID.to_string(), uuid);
params.insert(
USER_EMAIL.to_string(),
format!("{}@test.com", Uuid::new_v4()),
);
params.insert(USER_DEVICE_ID.to_string(), Uuid::new_v4().to_string());
params
}