feat File storage (#3306)

* refactor: file upload

* refactor: support upload plan

* test: add tests
This commit is contained in:
Nathan.fooo 2023-09-01 22:27:29 +08:00 committed by GitHub
parent df8642d446
commit c652c32575
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 844 additions and 306 deletions

View File

@ -1376,6 +1376,7 @@ dependencies = [
"flowy-server",
"flowy-server-config",
"flowy-sqlite",
"flowy-storage",
"flowy-task",
"flowy-user",
"flowy-user-deps",
@ -1485,6 +1486,7 @@ dependencies = [
"flowy-document-deps",
"flowy-error",
"flowy-notification",
"flowy-storage",
"futures",
"indexmap",
"lib-dispatch",
@ -1538,6 +1540,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-postgres",
"url",
]
[[package]]
@ -1685,12 +1688,15 @@ dependencies = [
name = "flowy-storage"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"bytes",
"flowy-error",
"lib-infra",
"mime_guess",
"reqwest",
"serde",
"serde_json",
"url",
]
[[package]]
@ -1731,6 +1737,7 @@ dependencies = [
"flowy-notification",
"flowy-server",
"flowy-server-config",
"flowy-storage",
"flowy-user",
"flowy-user-deps",
"futures-util",
@ -2230,10 +2237,11 @@ dependencies = [
[[package]]
name = "hyper-rustls"
version = "0.23.2"
version = "0.24.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1788965e61b367cd03a62950836d5cd41560c3577d90e40e0819373194d1661c"
checksum = "8d78e1e73ec14cf7375674f74d7dde185c8206fd9dea6fb6295e8a98098aaa97"
dependencies = [
"futures-util",
"http",
"hyper",
"rustls",
@ -3730,9 +3738,9 @@ dependencies = [
[[package]]
name = "reqwest"
version = "0.11.16"
version = "0.11.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "27b71749df584b7f4cac2c426c127a7c785a5106cc98f7a8feb044115f0fa254"
checksum = "3e9ad3fe7488d7e34558a2033d45a0c90b72d97b4f80705666fea71472e2e6a1"
dependencies = [
"base64 0.21.2",
"bytes",
@ -3897,14 +3905,14 @@ dependencies = [
[[package]]
name = "rustls"
version = "0.20.8"
version = "0.21.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f"
checksum = "cd8d6c9f025a446bc4d18ad9632e69aec8f287aa84499ee335599fabd20c3fd8"
dependencies = [
"log",
"ring",
"rustls-webpki",
"sct",
"webpki",
]
[[package]]
@ -3916,6 +3924,16 @@ dependencies = [
"base64 0.21.2",
]
[[package]]
name = "rustls-webpki"
version = "0.101.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d93931baf2d282fff8d3a532bbfd7653f734643161b87e3e01e59a04439bf0d"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "rustversion"
version = "1.0.12"
@ -4548,13 +4566,12 @@ dependencies = [
[[package]]
name = "tokio-rustls"
version = "0.23.4"
version = "0.24.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59"
checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081"
dependencies = [
"rustls",
"tokio",
"webpki",
]
[[package]]
@ -5122,9 +5139,9 @@ checksum = "0046fef7e28c3804e5e38bfa31ea2a0f73905319b677e57ebe37e49358989b5d"
[[package]]
name = "wasm-streams"
version = "0.2.3"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bbae3363c08332cadccd13b67db371814cd214c2524020932f0804b8cf7c078"
checksum = "b4609d447824375f43e1ffbc051b50ad8f4b3ae8219680c94452ea05eb240ac7"
dependencies = [
"futures-util",
"js-sys",
@ -5143,24 +5160,11 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "webpki"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "webpki-roots"
version = "0.22.6"
version = "0.25.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6c71e40d7d2c34a5106301fb632274ca37242cd0c9d3e64dbece371a40a2d87"
dependencies = [
"webpki",
]
checksum = "14247bb57be4f377dfb94c72830b8ce8fc6beac03cf4bf7b9732eadd414123fc"
[[package]]
name = "which"
@ -5362,11 +5366,12 @@ checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a"
[[package]]
name = "winreg"
version = "0.10.1"
version = "0.50.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d"
checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1"
dependencies = [
"winapi",
"cfg-if",
"windows-sys 0.48.0",
]
[[package]]

View File

@ -26,6 +26,7 @@ flowy-config = { path = "../flowy-config" }
appflowy-integrate = { version = "0.1.0", features = ["postgres_storage_plugin", "snapshot_plugin"] }
diesel = { version = "1.4.8", features = ["sqlite"] }
uuid = { version = "1.3.3", features = ["v4"] }
flowy-storage = { path = "../flowy-storage" }
tracing = { version = "0.1", features = ["log"] }
futures-core = { version = "0.3", default-features = false }

View File

@ -7,6 +7,7 @@ use flowy_database2::DatabaseManager;
use flowy_document2::manager::{DocumentManager, DocumentUser};
use flowy_document_deps::cloud::DocumentCloudService;
use flowy_error::FlowyError;
use flowy_storage::FileStorageService;
use flowy_user::manager::UserManager;
pub struct DocumentDepsResolver();
@ -16,12 +17,14 @@ impl DocumentDepsResolver {
_database_manager: &Arc<DatabaseManager>,
collab_builder: Arc<AppFlowyCollabBuilder>,
cloud_service: Arc<dyn DocumentCloudService>,
storage_service: Weak<dyn FileStorageService>,
) -> Arc<DocumentManager> {
let user: Arc<dyn DocumentUser> = Arc::new(DocumentUserImpl(user_manager));
Arc::new(DocumentManager::new(
user.clone(),
collab_builder,
cloud_service,
storage_service,
))
}
}

View File

@ -4,6 +4,7 @@ use std::sync::{Arc, Weak};
use appflowy_integrate::collab_builder::{CollabStorageProvider, CollabStorageType};
use appflowy_integrate::{CollabObject, CollabType, RemoteCollabStorage, YrsDocAction};
use bytes::Bytes;
use parking_lot::RwLock;
use serde_repr::*;
@ -19,6 +20,7 @@ use flowy_server::supabase::SupabaseServer;
use flowy_server::{AppFlowyEncryption, AppFlowyServer, EncryptionImpl};
use flowy_server_config::supabase_config::SupabaseConfiguration;
use flowy_sqlite::kv::StorePreferences;
use flowy_storage::{FileStorageService, StorageObject};
use flowy_user::event_map::UserCloudServiceProvider;
use flowy_user::services::database::{
get_user_profile, get_user_workspace, open_collab_db, open_user_db,
@ -63,12 +65,14 @@ impl Display for ServerProviderType {
pub struct AppFlowyServerProvider {
config: AppFlowyCoreConfig,
provider_type: RwLock<ServerProviderType>,
device_id: Arc<RwLock<String>>,
providers: RwLock<HashMap<ServerProviderType, Arc<dyn AppFlowyServer>>>,
enable_sync: RwLock<bool>,
encryption: RwLock<Arc<dyn AppFlowyEncryption>>,
store_preferences: Weak<StorePreferences>,
cache_user_service: RwLock<HashMap<ServerProviderType, Arc<dyn UserCloudService>>>,
device_id: Arc<RwLock<String>>,
enable_sync: RwLock<bool>,
uid: Arc<RwLock<Option<i64>>>,
}
impl AppFlowyServerProvider {
@ -87,13 +91,10 @@ impl AppFlowyServerProvider {
encryption: RwLock::new(Arc::new(encryption)),
store_preferences,
cache_user_service: Default::default(),
uid: Default::default(),
}
}
pub fn set_sync_device(&self, device_id: &str) {
*self.device_id.write() = device_id.to_string();
}
pub fn provider_type(&self) -> ServerProviderType {
self.provider_type.read().clone()
}
@ -137,10 +138,11 @@ impl AppFlowyServerProvider {
return Err(e);
},
};
let uid = self.uid.clone();
tracing::trace!("🔑Supabase config: {:?}", config);
let encryption = Arc::downgrade(&*self.encryption.read());
Ok::<Arc<dyn AppFlowyServer>, FlowyError>(Arc::new(SupabaseServer::new(
uid,
config,
*self.enable_sync.read(),
self.device_id.clone(),
@ -157,12 +159,39 @@ impl AppFlowyServerProvider {
}
}
impl FileStorageService for AppFlowyServerProvider {
fn create_object(&self, object: StorageObject) -> FutureResult<String, FlowyError> {
let server = self.get_provider(&self.provider_type.read());
FutureResult::new(async move {
let storage = server?.file_storage().ok_or(FlowyError::internal())?;
storage.create_object(object).await
})
}
fn delete_object_by_url(&self, object_url: String) -> FutureResult<(), FlowyError> {
let server = self.get_provider(&self.provider_type.read());
FutureResult::new(async move {
let storage = server?.file_storage().ok_or(FlowyError::internal())?;
storage.delete_object_by_url(object_url).await
})
}
fn get_object_by_url(&self, object_url: String) -> FutureResult<Bytes, FlowyError> {
let server = self.get_provider(&self.provider_type.read());
FutureResult::new(async move {
let storage = server?.file_storage().ok_or(FlowyError::internal())?;
storage.get_object_by_url(object_url).await
})
}
}
impl UserCloudServiceProvider for AppFlowyServerProvider {
fn set_enable_sync(&self, enable_sync: bool) {
fn set_enable_sync(&self, uid: i64, enable_sync: bool) {
match self.get_provider(&self.provider_type.read()) {
Ok(server) => {
server.set_enable_sync(enable_sync);
server.set_enable_sync(uid, enable_sync);
*self.enable_sync.write() = enable_sync;
*self.uid.write() = Some(uid);
},
Err(e) => tracing::error!("🔴Failed to enable sync: {:?}", e),
}

View File

@ -18,6 +18,7 @@ use flowy_document2::manager::DocumentManager;
use flowy_error::FlowyResult;
use flowy_folder2::manager::{FolderInitializeDataSource, FolderManager};
use flowy_sqlite::kv::StorePreferences;
use flowy_storage::FileStorageService;
use flowy_task::{TaskDispatcher, TaskRunner};
use flowy_user::event_map::{UserCloudServiceProvider, UserStatusCallback};
use flowy_user::manager::{UserManager, UserSessionConfig};
@ -185,6 +186,7 @@ impl AppFlowyCore {
&database_manager,
collab_builder.clone(),
server_provider.clone(),
Arc::downgrade(&(server_provider.clone() as Arc<dyn FileStorageService>)),
);
let folder_manager = FolderDepsResolver::resolve(
@ -295,7 +297,6 @@ impl UserStatusCallback for UserStatusCallbackImpl {
user_workspace: &UserWorkspace,
_device_id: &str,
) -> Fut<FlowyResult<()>> {
let user_id = user_id.to_owned();
let user_workspace = user_workspace.clone();
let collab_builder = self.collab_builder.clone();
let folder_manager = self.folder_manager.clone();
@ -305,7 +306,7 @@ impl UserStatusCallback for UserStatusCallbackImpl {
if let Some(cloud_config) = cloud_config {
self
.server_provider
.set_enable_sync(cloud_config.enable_sync);
.set_enable_sync(user_id, cloud_config.enable_sync);
if cloud_config.enable_encrypt() {
self
.server_provider

View File

@ -10,6 +10,7 @@ collab = { version = "0.1.0" }
collab-document = { version = "0.1.0" }
appflowy-integrate = {version = "0.1.0" }
flowy-document-deps = { path = "../flowy-document-deps" }
flowy-storage = { path = "../flowy-storage" }
flowy-derive = { path = "../../../shared-lib/flowy-derive" }
flowy-notification = { path = "../flowy-notification" }

View File

@ -12,6 +12,7 @@ use parking_lot::RwLock;
use flowy_document_deps::cloud::DocumentCloudService;
use flowy_error::{internal_error, FlowyError, FlowyResult};
use flowy_storage::FileStorageService;
use crate::document::MutexDocument;
use crate::entities::DocumentSnapshotPB;
@ -28,6 +29,7 @@ pub struct DocumentManager {
documents: Arc<RwLock<HashMap<String, Arc<MutexDocument>>>>,
#[allow(dead_code)]
cloud_service: Arc<dyn DocumentCloudService>,
storage_service: Weak<dyn FileStorageService>,
}
impl DocumentManager {
@ -35,12 +37,14 @@ impl DocumentManager {
user: Arc<dyn DocumentUser>,
collab_builder: Arc<AppFlowyCollabBuilder>,
cloud_service: Arc<dyn DocumentCloudService>,
storage_service: Weak<dyn FileStorageService>,
) -> Self {
Self {
user,
collab_builder,
documents: Default::default(),
cloud_service,
storage_service,
}
}
@ -179,4 +183,9 @@ impl DocumentManager {
pub fn get_cloud_service(&self) -> &Arc<dyn DocumentCloudService> {
&self.cloud_service
}
/// Only expose this method for testing
#[cfg(debug_assertions)]
pub fn get_file_storage_service(&self) -> &Weak<dyn FileStorageService> {
&self.storage_service
}
}

View File

@ -4,6 +4,7 @@ use std::sync::Arc;
use anyhow::Error;
use appflowy_integrate::collab_builder::{AppFlowyCollabBuilder, DefaultCollabStorageProvider};
use appflowy_integrate::RocksCollabDB;
use bytes::Bytes;
use collab_document::blocks::DocumentData;
use collab_document::document_data::default_document_data;
use nanoid::nanoid;
@ -14,6 +15,8 @@ use tracing_subscriber::{fmt::Subscriber, util::SubscriberInitExt, EnvFilter};
use flowy_document2::document::MutexDocument;
use flowy_document2::manager::{DocumentManager, DocumentUser};
use flowy_document_deps::cloud::*;
use flowy_error::FlowyError;
use flowy_storage::{FileStorageService, StorageObject};
use lib_infra::future::FutureResult;
pub struct DocumentTest {
@ -24,7 +27,13 @@ impl DocumentTest {
pub fn new() -> Self {
let user = FakeUser::new();
let cloud_service = Arc::new(LocalTestDocumentCloudServiceImpl());
let manager = DocumentManager::new(Arc::new(user), default_collab_builder(), cloud_service);
let file_storage = Arc::new(DocumentTestFileStorageService) as Arc<dyn FileStorageService>;
let manager = DocumentManager::new(
Arc::new(user),
default_collab_builder(),
cloud_service,
Arc::downgrade(&file_storage),
);
Self { inner: manager }
}
}
@ -129,3 +138,18 @@ impl DocumentCloudService for LocalTestDocumentCloudServiceImpl {
FutureResult::new(async move { Ok(None) })
}
}
pub struct DocumentTestFileStorageService;
impl FileStorageService for DocumentTestFileStorageService {
fn create_object(&self, _object: StorageObject) -> FutureResult<String, FlowyError> {
todo!()
}
fn delete_object_by_url(&self, _object_url: String) -> FutureResult<(), FlowyError> {
todo!()
}
fn get_object_by_url(&self, _object_url: String) -> FutureResult<Bytes, FlowyError> {
todo!()
}
}

View File

@ -20,6 +20,7 @@ reqwest = { version = "0.11.14", optional = true, features = ["native-tls-vendor
http-error-code = { git = "https://github.com/AppFlowy-IO/AppFlowy-Server", branch = "refactor/appflowy_server", optional = true }
flowy-sqlite = { path = "../flowy-sqlite", optional = true}
r2d2 = { version = "0.8", optional = true}
url = { version = "2.2", optional = true }
collab-database = { version = "0.1.0", optional = true }
collab-document = { version = "0.1.0", optional = true }
tokio-postgres = { version = "0.7.8", optional = true }
@ -34,6 +35,7 @@ impl_from_appflowy_cloud = ["http-error-code"]
impl_from_collab = ["collab-database", "collab-document", "impl_from_reqwest"]
impl_from_postgres = ["tokio-postgres"]
impl_from_tokio= ["tokio"]
impl_from_url= ["url"]
dart = ["flowy-codegen/dart"]
ts = ["flowy-codegen/ts"]

View File

@ -232,6 +232,12 @@ pub enum ErrorCode {
#[error("It appears that the workspace data has not been fully synchronized")]
WorkspaceDataNotSync = 76,
#[error("Excess storage limited")]
ExcessStorageLimited = 77,
#[error("Parse url failed")]
InvalidURL = 78,
}
impl ErrorCode {

View File

@ -24,3 +24,6 @@ mod postgres;
#[cfg(feature = "impl_from_tokio")]
mod tokio;
#[cfg(feature = "impl_from_url")]
mod url;

View File

@ -0,0 +1,7 @@
use crate::{ErrorCode, FlowyError};
impl std::convert::From<url::ParseError> for FlowyError {
fn from(error: url::ParseError) -> Self {
FlowyError::new(ErrorCode::InvalidURL, "").with_context(error)
}
}

View File

@ -9,7 +9,7 @@ edition = "2021"
tracing = { version = "0.1" }
futures = "0.3.26"
futures-util = "0.3.26"
reqwest = { version = "0.11.14", features = ["native-tls-vendored", "multipart"] }
reqwest = { version = "0.11.20", features = ["native-tls-vendored", "multipart","blocking"] }
hyper = "0.14"
config = { version = "0.10.1", default-features = false, features = ["yaml"] }
serde = { version = "1.0", features = ["derive"] }
@ -34,7 +34,7 @@ flowy-user-deps = { path = "../flowy-user-deps" }
flowy-folder-deps = { path = "../flowy-folder-deps" }
flowy-database-deps = { path = "../flowy-database-deps" }
flowy-document-deps = { path = "../flowy-document-deps" }
flowy-error = { path = "../flowy-error", features = ["impl_from_postgres", "impl_from_serde", "impl_from_reqwest"] }
flowy-error = { path = "../flowy-error", features = ["impl_from_postgres", "impl_from_serde", "impl_from_reqwest", "impl_from_url"] }
flowy-server-config = { path = "../flowy-server-config" }
flowy-encrypt = { path = "../flowy-encrypt" }
flowy-storage = { path = "../flowy-storage" }

View File

@ -5,6 +5,7 @@ use collab_plugins::cloud_storage::{CollabObject, RemoteCollabStorage};
use flowy_database_deps::cloud::DatabaseCloudService;
use flowy_document_deps::cloud::DocumentCloudService;
use flowy_folder_deps::cloud::FolderCloudService;
use flowy_storage::FileStorageService;
use flowy_user_deps::cloud::UserCloudService;
use crate::af_cloud::configuration::AFCloudConfiguration;
@ -44,4 +45,8 @@ impl AppFlowyServer for AFCloudServer {
fn collab_storage(&self, _collab_object: &CollabObject) -> Option<Arc<dyn RemoteCollabStorage>> {
None
}
fn file_storage(&self) -> Option<Arc<dyn FileStorageService>> {
None
}
}

View File

@ -8,6 +8,7 @@ use flowy_database_deps::cloud::DatabaseCloudService;
use flowy_document_deps::cloud::DocumentCloudService;
use flowy_error::FlowyError;
use flowy_folder_deps::cloud::FolderCloudService;
use flowy_storage::FileStorageService;
// use flowy_user::services::database::{
// get_user_profile, get_user_workspace, open_collab_db, open_user_db,
// };
@ -71,4 +72,8 @@ impl AppFlowyServer for LocalServer {
fn collab_storage(&self, _collab_object: &CollabObject) -> Option<Arc<dyn RemoteCollabStorage>> {
None
}
fn file_storage(&self) -> Option<Arc<dyn FileStorageService>> {
None
}
}

View File

@ -6,7 +6,7 @@ use parking_lot::RwLock;
use flowy_database_deps::cloud::DatabaseCloudService;
use flowy_document_deps::cloud::DocumentCloudService;
use flowy_folder_deps::cloud::FolderCloudService;
use flowy_storage::core::FileStorageService;
use flowy_storage::FileStorageService;
use flowy_user_deps::cloud::UserCloudService;
pub trait AppFlowyEncryption: Send + Sync + 'static {
@ -36,7 +36,7 @@ pub trait AppFlowyServer: Send + Sync + 'static {
/// # Arguments
///
/// * `_enable` - A boolean to toggle the server synchronization.
fn set_enable_sync(&self, _enable: bool) {}
fn set_enable_sync(&self, _uid: i64, _enable: bool) {}
/// Provides access to cloud-based user management functionalities. This includes operations
/// such as user registration, authentication, profile management, and handling of user workspaces.
@ -86,9 +86,7 @@ pub trait AppFlowyServer: Send + Sync + 'static {
/// An `Option` that might contain an `Arc` wrapping the `RemoteCollabStorage` interface.
fn collab_storage(&self, collab_object: &CollabObject) -> Option<Arc<dyn RemoteCollabStorage>>;
fn file_storage(&self) -> Option<Arc<dyn FileStorageService>> {
None
}
fn file_storage(&self) -> Option<Arc<dyn FileStorageService>>;
}
pub struct EncryptionImpl {

View File

@ -0,0 +1,154 @@
use std::borrow::Cow;
use anyhow::Error;
use hyper::header::CONTENT_TYPE;
use reqwest::header::IntoHeaderName;
use reqwest::multipart::{Form, Part};
use reqwest::{
header::{HeaderMap, HeaderValue},
Client, Method, RequestBuilder,
};
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use url::Url;
use flowy_storage::StorageObject;
use crate::supabase::file_storage::{DeleteObjects, FileOptions, NewBucket, RequestBody};
pub struct StorageRequestBuilder {
pub url: Url,
headers: HeaderMap,
client: Client,
method: Method,
body: RequestBody,
}
impl StorageRequestBuilder {
pub fn new(url: Url, headers: HeaderMap, client: Client) -> Self {
Self {
url,
headers,
client,
method: Method::GET,
body: RequestBody::Empty,
}
}
pub fn with_header(mut self, key: impl IntoHeaderName, value: HeaderValue) -> Self {
self.headers.insert(key, value);
self
}
pub fn get_buckets(mut self) -> Self {
self.method = Method::GET;
self.url.path_segments_mut().unwrap().push("bucket");
self
}
pub fn create_bucket(mut self, bucket_name: &str) -> Self {
self.method = Method::POST;
self
.headers
.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
self.url.path_segments_mut().unwrap().push("bucket");
let bucket = serde_json::to_string(&NewBucket::new(bucket_name.to_string())).unwrap();
self.body = RequestBody::BodyString { text: bucket };
self
}
pub fn delete_object(mut self, bucket_id: &str, object: &str) -> Self {
self.method = Method::DELETE;
self
.headers
.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
let delete_objects = DeleteObjects::new(vec![object.to_string()]);
let text = serde_json::to_string(&delete_objects).unwrap();
self.body = RequestBody::BodyString { text };
self
.url
.path_segments_mut()
.unwrap()
.push("object")
.push(bucket_id)
.push(object);
self
}
pub fn get_object(mut self, bucket_name: &str, object: &str) -> Self {
self.method = Method::GET;
self
.url
.path_segments_mut()
.unwrap()
.push("object")
.push(bucket_name)
.push(object);
self
}
pub fn upload_object(mut self, bucket_name: &str, object: StorageObject) -> Self {
self.method = Method::POST;
let options = FileOptions::from_mime(object.value.mime_type());
self
.url
.path_segments_mut()
.unwrap()
.push("object")
.push(bucket_name)
.push(&object.file_name);
self.body = (options, object.value).into();
self
}
pub async fn build(mut self) -> Result<RequestBuilder, Error> {
let url = self.url.to_string();
let mut builder = self.client.request(self.method, url);
match self.body {
RequestBody::Empty => {
// Do nothing
},
RequestBody::MultiPartFile { file_path, options } => {
self.headers.insert(
"x-upsert",
HeaderValue::from_str(&options.upsert.to_string()).unwrap(),
);
let mut file = File::open(&file_path).await?;
let mut buffer = Vec::new();
file.read_to_end(&mut buffer).await?;
let part = Part::bytes(buffer)
.file_name(file_path.to_string())
.mime_str(&options.content_type)?;
let form = Form::new()
.part(file_path, part)
.text("cacheControl", options.cache_control);
builder = builder.multipart(form);
},
RequestBody::MultiPartBytes { bytes, options } => {
self.headers.insert(
"x-upsert",
HeaderValue::from_str(&options.upsert.to_string()).unwrap(),
);
let part = Part::bytes(Cow::Owned(bytes.to_vec()))
.file_name("")
.mime_str(&options.content_type)?;
let form = Form::new()
.part("", part)
.text("cacheControl", options.cache_control);
builder = builder.multipart(form);
},
RequestBody::BodyString { text } => {
builder = builder.body(text);
},
}
builder = builder.headers(self.headers);
Ok(builder)
}
}

View File

@ -1,32 +1,38 @@
use std::sync::{Arc, Weak};
use anyhow::{anyhow, Error};
use bytes::Bytes;
use hyper::header::{CACHE_CONTROL, CONTENT_TYPE};
use reqwest::header::IntoHeaderName;
use reqwest::multipart::{Form, Part};
use reqwest::{
header::{HeaderMap, HeaderValue},
Body, Client, Method, RequestBuilder,
Client,
};
use serde_json::Value;
use tokio::fs::File;
use tokio_util::codec::{BytesCodec, FramedRead};
use url::Url;
use flowy_encrypt::{decrypt_data, encrypt_data};
use flowy_error::FlowyError;
use flowy_server_config::supabase_config::SupabaseConfiguration;
use flowy_storage::core::FileStorageService;
use lib_infra::async_trait::async_trait;
use flowy_storage::{FileStoragePlan, FileStorageService, StorageObject};
use lib_infra::future::FutureResult;
use crate::response::ExtendedResponse;
use crate::supabase::file_storage::{DeleteObjects, FileOptions, NewBucket};
use crate::supabase::file_storage::builder::StorageRequestBuilder;
use crate::AppFlowyEncryption;
pub struct SupabaseFileStorage {
url: Url,
headers: HeaderMap,
client: Client,
#[allow(dead_code)]
encryption: ObjectEncryption,
storage_plan: Arc<dyn FileStoragePlan>,
}
impl SupabaseFileStorage {
pub fn new(config: &SupabaseConfiguration) -> Result<Self, Error> {
pub fn new(
config: &SupabaseConfiguration,
encryption: Weak<dyn AppFlowyEncryption>,
storage_plan: Arc<dyn FileStoragePlan>,
) -> Result<Self, Error> {
let mut headers = HeaderMap::new();
let url = format!("{}/storage/v1", config.url);
let auth = format!("Bearer {}", config.anon_key);
@ -40,212 +46,134 @@ impl SupabaseFileStorage {
HeaderValue::from_str(&config.anon_key).expect("apikey value is invalid"),
);
let encryption = ObjectEncryption::new(encryption);
Ok(Self {
url: Url::parse(&url)?,
headers,
client: Client::new(),
encryption,
storage_plan,
})
}
pub fn request(&self) -> FileStorageRequestBuilder {
FileStorageRequestBuilder::new(self.url.clone(), self.headers.clone(), self.client.clone())
pub fn storage(&self) -> StorageRequestBuilder {
StorageRequestBuilder::new(self.url.clone(), self.headers.clone(), self.client.clone())
}
}
pub enum RequestBody {
Empty,
File {
file_path: String,
options: FileOptions,
},
Text {
text: String,
},
}
pub struct FileStorageRequestBuilder {
url: Url,
headers: HeaderMap,
client: Client,
method: Method,
body: RequestBody,
}
impl FileStorageRequestBuilder {
pub fn new(url: Url, headers: HeaderMap, client: Client) -> Self {
Self {
url,
headers,
client,
method: Method::GET,
body: RequestBody::Empty,
}
}
pub fn with_header(mut self, key: impl IntoHeaderName, value: HeaderValue) -> Self {
self.headers.insert(key, value);
self
}
pub fn get_buckets(mut self) -> Self {
self.method = Method::GET;
self.url.path_segments_mut().unwrap().push("bucket");
self
}
pub fn create_bucket(mut self, bucket_name: &str) -> Self {
self.method = Method::POST;
self
.headers
.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
self.url.path_segments_mut().unwrap().push("bucket");
let bucket = serde_json::to_string(&NewBucket::new(bucket_name.to_string())).unwrap();
self.body = RequestBody::Text { text: bucket };
self
}
pub fn delete_object(mut self, bucket_id: &str, object: &str) -> Self {
self.method = Method::DELETE;
self
.headers
.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
let delete_objects = DeleteObjects::new(vec![object.to_string()]);
let text = serde_json::to_string(&delete_objects).unwrap();
self.body = RequestBody::Text { text };
self
.url
.path_segments_mut()
.unwrap()
.push("object")
.push(bucket_id)
.push(object);
self
}
pub fn get_object(mut self, bucket_name: &str, object: &str) -> Self {
self.method = Method::GET;
self
.url
.path_segments_mut()
.unwrap()
.push("object")
.push(bucket_name)
.push(object);
self
}
pub fn upload_object(mut self, bucket_name: &str, object: &str, file_path: &str) -> Self {
self.method = Method::POST;
let options = FileOptions::from_file_path(file_path);
self.headers.insert(
CONTENT_TYPE,
HeaderValue::from_str(&options.content_type).unwrap(),
);
self
.url
.path_segments_mut()
.unwrap()
.push("object")
.push(bucket_name)
.push(object);
self.body = RequestBody::File {
file_path: file_path.to_string(),
options,
};
self
}
pub fn download_object(mut self, bucket_id: &str) -> Self {
self.method = Method::POST;
self
.headers
.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
self
.url
.path_segments_mut()
.unwrap()
.push("object")
.push(bucket_id);
self
}
pub async fn build(mut self) -> Result<RequestBuilder, Error> {
let url = self.url.to_string();
let mut builder = self.client.request(self.method, url);
match self.body {
RequestBody::Empty => {},
RequestBody::File { file_path, options } => {
self.headers.insert(
CACHE_CONTROL,
HeaderValue::from_str(&options.cache_control).unwrap(),
);
self.headers.insert(
"x-upsert",
HeaderValue::from_str(&options.upsert.to_string()).unwrap(),
);
let file = File::open(&file_path).await?;
let file_body = Body::wrap_stream(FramedRead::new(file, BytesCodec::new()));
let part = Part::stream(file_body).mime_str(&options.content_type)?;
builder = builder.multipart(Form::new().part(file_path, part));
},
RequestBody::Text { text } => {
builder = builder.body(text);
},
}
builder = builder.headers(self.headers);
Ok(builder)
}
}
#[async_trait]
impl FileStorageService for SupabaseFileStorage {
async fn create_object(&self, object_name: &str, object_path: &str) -> Result<String, Error> {
let resp: Value = self
.request()
.upload_object("data", object_name, object_path)
.build()
.await?
.send()
.await?
.get_json()
.await?;
fn create_object(&self, object: StorageObject) -> FutureResult<String, FlowyError> {
let mut storage = self.storage();
let storage_plan = Arc::downgrade(&self.storage_plan);
let key = resp
.get("Key")
.and_then(|v| v.as_str())
.ok_or(anyhow!("Key not found in response"))?
.to_string();
FutureResult::new(async move {
let plan = storage_plan
.upgrade()
.ok_or(anyhow!("Storage plan is not available"))?;
plan.check_upload_object(&object).await?;
Ok(key)
storage = storage.upload_object("data", object);
let url = storage.url.to_string();
storage.build().await?.send().await?.success().await?;
Ok(url)
})
}
async fn delete_object(&self, object_name: &str) -> Result<(), Error> {
let resp = self
.request()
.delete_object("data", object_name)
.build()
.await?
.send()
.await?
.success()
.await?;
println!("{:?}", resp);
Ok(())
fn delete_object_by_url(&self, object_url: String) -> FutureResult<(), FlowyError> {
let storage = self.storage();
FutureResult::new(async move {
let url = Url::parse(&object_url)?;
let location = get_object_location_from(&url)?;
storage
.delete_object(location.bucket_id, location.file_name)
.build()
.await?
.send()
.await?
.success()
.await?;
Ok(())
})
}
async fn get_object(&self, object_name: &str) -> Result<Bytes, Error> {
let bytes = self
.request()
.get_object("data", object_name)
.build()
.await?
.send()
.await?
.get_bytes()
.await?;
Ok(bytes)
fn get_object_by_url(&self, object_url: String) -> FutureResult<Bytes, FlowyError> {
let storage = self.storage();
FutureResult::new(async move {
let url = Url::parse(&object_url)?;
let location = get_object_location_from(&url)?;
let bytes = storage
.get_object(location.bucket_id, location.file_name)
.build()
.await?
.send()
.await?
.get_bytes()
.await?;
Ok(bytes)
})
}
}
#[allow(dead_code)]
struct ObjectEncryption {
encryption: Weak<dyn AppFlowyEncryption>,
}
impl ObjectEncryption {
fn new(encryption: Weak<dyn AppFlowyEncryption>) -> Self {
Self { encryption }
}
#[allow(dead_code)]
fn encrypt(&self, object_data: Vec<u8>) -> Result<Vec<u8>, Error> {
if let Some(secret) = self
.encryption
.upgrade()
.and_then(|encryption| encryption.get_secret())
{
let encryption_data = encrypt_data(object_data, &secret)?;
Ok(encryption_data)
} else {
Ok(object_data)
}
}
#[allow(dead_code)]
fn decrypt(&self, object_data: Vec<u8>) -> Result<Vec<u8>, Error> {
if let Some(secret) = self
.encryption
.upgrade()
.and_then(|encryption| encryption.get_secret())
{
let decryption_data = decrypt_data(object_data, &secret)?;
Ok(decryption_data)
} else {
Ok(object_data)
}
}
}
struct ObjectLocation<'a> {
bucket_id: &'a str,
file_name: &'a str,
}
fn get_object_location_from(url: &Url) -> Result<ObjectLocation, Error> {
let mut segments = url
.path_segments()
.ok_or(anyhow!("Invalid object url: {}", url))?
.collect::<Vec<_>>();
let file_name = segments
.pop()
.ok_or(anyhow!("Can't get file name from url: {}", url))?;
let bucket_id = segments
.pop()
.ok_or(anyhow!("Can't get bucket id from url: {}", url))?;
Ok(ObjectLocation {
bucket_id,
file_name,
})
}

View File

@ -1,5 +1,8 @@
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use flowy_storage::ObjectValue;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SupabaseStorageError {
@ -36,11 +39,7 @@ pub struct FileOptions {
}
impl FileOptions {
pub fn from_file_path(file_path: &str) -> Self {
let mime = mime_guess::from_path(file_path)
.first_or_octet_stream()
.to_string();
pub fn from_mime(mime: String) -> Self {
Self {
cache_control: "3600".to_string(),
upsert: false,
@ -73,3 +72,28 @@ impl DeleteObjects {
Self { prefixes }
}
}
pub enum RequestBody {
Empty,
MultiPartFile {
file_path: String,
options: FileOptions,
},
MultiPartBytes {
bytes: Bytes,
options: FileOptions,
},
BodyString {
text: String,
},
}
impl From<(FileOptions, ObjectValue)> for RequestBody {
fn from(params: (FileOptions, ObjectValue)) -> Self {
let (options, value) = params;
match value {
ObjectValue::File { file_path } => RequestBody::MultiPartFile { file_path, options },
ObjectValue::Bytes { bytes, mime: _ } => RequestBody::MultiPartBytes { bytes, options },
}
}
}

View File

@ -1,4 +1,7 @@
pub use entities::*;
pub use plan::*;
mod builder;
pub mod core;
mod entities;
pub mod plan;

View File

@ -0,0 +1,42 @@
use std::sync::Weak;
use parking_lot::RwLock;
use flowy_error::FlowyError;
use flowy_storage::{FileStoragePlan, StorageObject};
use lib_infra::future::FutureResult;
use crate::supabase::api::RESTfulPostgresServer;
#[derive(Default)]
pub struct FileStoragePlanImpl {
#[allow(dead_code)]
uid: Weak<RwLock<Option<i64>>>,
#[allow(dead_code)]
postgrest: Option<Weak<RESTfulPostgresServer>>,
}
impl FileStoragePlanImpl {
pub fn new(
uid: Weak<RwLock<Option<i64>>>,
postgrest: Option<Weak<RESTfulPostgresServer>>,
) -> Self {
Self { uid, postgrest }
}
}
impl FileStoragePlan for FileStoragePlanImpl {
fn storage_size(&self) -> FutureResult<u64, FlowyError> {
// 1 GB
FutureResult::new(async { Ok(1024 * 1024 * 1024) })
}
fn maximum_file_size(&self) -> FutureResult<u64, FlowyError> {
// 5 MB
FutureResult::new(async { Ok(5 * 1024 * 1024) })
}
fn check_upload_object(&self, _object: &StorageObject) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
}

View File

@ -8,7 +8,7 @@ use flowy_database_deps::cloud::DatabaseCloudService;
use flowy_document_deps::cloud::DocumentCloudService;
use flowy_folder_deps::cloud::FolderCloudService;
use flowy_server_config::supabase_config::SupabaseConfiguration;
use flowy_storage::core::FileStorageService;
use flowy_storage::FileStorageService;
use flowy_user_deps::cloud::UserCloudService;
use crate::supabase::api::{
@ -17,6 +17,7 @@ use crate::supabase::api::{
SupabaseFolderServiceImpl, SupabaseServerServiceImpl, SupabaseUserServiceImpl,
};
use crate::supabase::file_storage::core::SupabaseFileStorage;
use crate::supabase::file_storage::FileStoragePlanImpl;
use crate::{AppFlowyEncryption, AppFlowyServer};
/// https://www.pgbouncer.org/features.html
@ -61,6 +62,7 @@ pub struct SupabaseServer {
config: SupabaseConfiguration,
/// did represents as the device id is used to identify the device that is currently using the app.
device_id: Arc<RwLock<String>>,
uid: Arc<RwLock<Option<i64>>>,
collab_update_sender: Arc<CollabUpdateSenderByOid>,
restful_postgres: Arc<RwLock<Option<Arc<RESTfulPostgresServer>>>>,
file_storage: Arc<RwLock<Option<Arc<SupabaseFileStorage>>>>,
@ -69,6 +71,7 @@ pub struct SupabaseServer {
impl SupabaseServer {
pub fn new(
uid: Arc<RwLock<Option<i64>>>,
config: SupabaseConfiguration,
enable_sync: bool,
device_id: Arc<RwLock<String>>,
@ -83,9 +86,14 @@ impl SupabaseServer {
} else {
None
};
let file_storage = if enable_sync {
Some(Arc::new(SupabaseFileStorage::new(&config).unwrap()))
let plan = FileStoragePlanImpl::new(
Arc::downgrade(&uid),
restful_postgres.as_ref().map(Arc::downgrade),
);
Some(Arc::new(
SupabaseFileStorage::new(&config, encryption.clone(), Arc::new(plan)).unwrap(),
))
} else {
None
};
@ -96,26 +104,34 @@ impl SupabaseServer {
restful_postgres: Arc::new(RwLock::new(restful_postgres)),
file_storage: Arc::new(RwLock::new(file_storage)),
encryption,
}
}
pub fn set_enable_sync(&self, enable: bool) {
if enable {
if self.restful_postgres.read().is_some() {
return;
}
let postgres = RESTfulPostgresServer::new(self.config.clone(), self.encryption.clone());
*self.restful_postgres.write() = Some(Arc::new(postgres));
} else {
*self.restful_postgres.write() = None;
uid,
}
}
}
impl AppFlowyServer for SupabaseServer {
fn set_enable_sync(&self, enable: bool) {
tracing::info!("supabase sync: {}", enable);
self.set_enable_sync(enable);
fn set_enable_sync(&self, uid: i64, enable: bool) {
tracing::info!("{} supabase sync: {}", uid, enable);
if enable {
if self.restful_postgres.read().is_none() {
let postgres = RESTfulPostgresServer::new(self.config.clone(), self.encryption.clone());
*self.restful_postgres.write() = Some(Arc::new(postgres));
}
if self.file_storage.read().is_none() {
let plan = FileStoragePlanImpl::new(
Arc::downgrade(&self.uid),
self.restful_postgres.read().as_ref().map(Arc::downgrade),
);
let file_storage =
SupabaseFileStorage::new(&self.config, self.encryption.clone(), Arc::new(plan)).unwrap();
*self.file_storage.write() = Some(Arc::new(file_storage));
}
} else {
*self.restful_postgres.write() = None;
*self.file_storage.write() = None;
}
}
fn user_service(&self) -> Arc<dyn UserCloudService> {

Binary file not shown.

After

Width:  |  Height:  |  Size: 15 KiB

View File

@ -1,3 +1,8 @@
use url::Url;
use uuid::Uuid;
use flowy_storage::StorageObject;
use crate::supabase_test::util::{file_storage_service, get_supabase_ci_config};
#[tokio::test]
@ -7,18 +12,49 @@ async fn supabase_get_object_test() {
}
let service = file_storage_service();
let file_name = format!("test-{}.txt", chrono::Utc::now().timestamp());
let file_name = format!("test-{}.txt", Uuid::new_v4());
let object = StorageObject::from_file(&file_name, "tests/test.txt");
// Upload a file
let key = service
.create_object(&file_name, "tests/test.txt")
let url = service
.create_object(object)
.await
.unwrap()
.parse::<Url>()
.unwrap();
assert_eq!(key, format!("data/{}", file_name));
// The url would be something like:
// https://acfrqdbdtbsceyjbxsfc.supabase.co/storage/v1/object/data/test-1693472809.txt
let name = url.path_segments().unwrap().last().unwrap();
assert_eq!(name, &file_name);
// Download the file
let bytes = service.get_object(&file_name).await.unwrap();
assert_eq!(bytes.len(), 248);
let bytes = service.get_object_by_url(url.to_string()).await.unwrap();
let s = String::from_utf8(bytes.to_vec()).unwrap();
assert_eq!(s, "hello world");
}
#[tokio::test]
async fn supabase_upload_image_test() {
if get_supabase_ci_config().is_none() {
return;
}
let service = file_storage_service();
let file_name = format!("image-{}.png", Uuid::new_v4());
let object = StorageObject::from_file(&file_name, "tests/logo.png");
// Upload a file
let url = service
.create_object(object)
.await
.unwrap()
.parse::<Url>()
.unwrap();
// Download object by url
let bytes = service.get_object_by_url(url.to_string()).await.unwrap();
assert_eq!(bytes.len(), 15694);
}
#[tokio::test]
@ -28,17 +64,15 @@ async fn supabase_delete_object_test() {
}
let service = file_storage_service();
let file_name = format!("test-{}.txt", chrono::Utc::now().timestamp());
let _ = service
.create_object(&file_name, "tests/test.txt")
.await
.unwrap();
let file_name = format!("test-{}.txt", Uuid::new_v4());
let object = StorageObject::from_file(&file_name, "tests/test.txt");
let url = service.create_object(object).await.unwrap();
let result = service.get_object(&file_name).await;
let result = service.get_object_by_url(url.clone()).await;
assert!(result.is_ok());
let _ = service.delete_object(&file_name).await;
let _ = service.delete_object_by_url(url.clone()).await;
let result = service.get_object(&file_name).await;
let result = service.get_object_by_url(url.clone()).await;
assert!(result.is_err());
}

View File

@ -7,6 +7,7 @@ use collab_plugins::cloud_storage::RemoteCollabStorage;
use uuid::Uuid;
use flowy_database_deps::cloud::DatabaseCloudService;
use flowy_error::FlowyError;
use flowy_folder_deps::cloud::{Folder, FolderCloudService};
use flowy_server::supabase::api::{
RESTfulPostgresServer, SupabaseCollabStorageImpl, SupabaseDatabaseServiceImpl,
@ -16,8 +17,9 @@ use flowy_server::supabase::define::{USER_DEVICE_ID, USER_EMAIL, USER_UUID};
use flowy_server::supabase::file_storage::core::SupabaseFileStorage;
use flowy_server::{AppFlowyEncryption, EncryptionImpl};
use flowy_server_config::supabase_config::SupabaseConfiguration;
use flowy_storage::core::FileStorageService;
use flowy_storage::{FileStoragePlan, FileStorageService, StorageObject};
use flowy_user_deps::cloud::UserCloudService;
use lib_infra::future::FutureResult;
use crate::setup_log;
@ -59,8 +61,16 @@ pub fn folder_service() -> Arc<dyn FolderCloudService> {
}
pub fn file_storage_service() -> Arc<dyn FileStorageService> {
let encryption_impl: Arc<dyn AppFlowyEncryption> = Arc::new(EncryptionImpl::new(None));
let config = SupabaseConfiguration::from_env().unwrap();
Arc::new(SupabaseFileStorage::new(&config).unwrap())
Arc::new(
SupabaseFileStorage::new(
&config,
Arc::downgrade(&encryption_impl),
Arc::new(TestFileStoragePlan),
)
.unwrap(),
)
}
#[allow(dead_code)]
@ -131,3 +141,21 @@ pub fn third_party_sign_up_param(uuid: String) -> HashMap<String, String> {
params.insert(USER_DEVICE_ID.to_string(), Uuid::new_v4().to_string());
params
}
pub struct TestFileStoragePlan;
impl FileStoragePlan for TestFileStoragePlan {
fn storage_size(&self) -> FutureResult<u64, FlowyError> {
// 1 GB
FutureResult::new(async { Ok(1024 * 1024 * 1024) })
}
fn maximum_file_size(&self) -> FutureResult<u64, FlowyError> {
// 5 MB
FutureResult::new(async { Ok(5 * 1024 * 1024) })
}
fn check_upload_object(&self, _object: &StorageObject) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
}

View File

@ -9,6 +9,9 @@ edition = "2021"
reqwest = { version = "0.11", features = ["json", "stream"] }
serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
anyhow = "1.0.75"
async-trait = "0.1.73"
bytes = "1.0.1"
mime_guess = "2.0"
lib-infra = { path = "../../../shared-lib/lib-infra" }
url = "2.2.2"
flowy-error = { path = "../flowy-error", features = ["impl_from_reqwest"] }

View File

@ -1,10 +0,0 @@
use anyhow::Error;
use async_trait::async_trait;
use bytes::Bytes;
#[async_trait]
pub trait FileStorageService: Send + Sync + 'static {
async fn create_object(&self, object_name: &str, object_path: &str) -> Result<String, Error>;
async fn delete_object(&self, object_name: &str) -> Result<(), Error>;
async fn get_object(&self, object_name: &str) -> Result<Bytes, Error>;
}

View File

@ -1 +1,107 @@
pub mod core;
use bytes::Bytes;
use flowy_error::FlowyError;
use lib_infra::future::FutureResult;
pub struct StorageObject {
pub file_name: String,
pub value: ObjectValue,
}
impl StorageObject {
/// Creates a `StorageObject` from a file.
///
/// # Parameters
///
/// * `name`: The name of the storage object.
/// * `file_path`: The file path to the storage object's data.
///
pub fn from_file<T: ToString>(file_name: &str, file_path: T) -> Self {
Self {
file_name: file_name.to_string(),
value: ObjectValue::File {
file_path: file_path.to_string(),
},
}
}
/// Creates a `StorageObject` from bytes.
///
/// # Parameters
///
/// * `name`: The name of the storage object.
/// * `bytes`: The byte data of the storage object.
/// * `mime`: The MIME type of the storage object.
///
pub fn from_bytes<B: Into<Bytes>>(file_name: &str, bytes: B, mime: String) -> Self {
let bytes = bytes.into();
Self {
file_name: file_name.to_string(),
value: ObjectValue::Bytes { bytes, mime },
}
}
/// Gets the file size of the `StorageObject`.
///
/// # Returns
///
/// The file size in bytes.
pub fn file_size(&self) -> u64 {
match &self.value {
ObjectValue::File { file_path } => std::fs::metadata(file_path).unwrap().len(),
ObjectValue::Bytes { bytes, .. } => bytes.len() as u64,
}
}
}
pub enum ObjectValue {
File { file_path: String },
Bytes { bytes: Bytes, mime: String },
}
impl ObjectValue {
pub fn mime_type(&self) -> String {
match self {
ObjectValue::File { file_path } => mime_guess::from_path(file_path)
.first_or_octet_stream()
.to_string(),
ObjectValue::Bytes { mime, .. } => mime.clone(),
}
}
}
/// Provides a service for storing and managing files.
///
/// The trait includes methods for CRUD operations on storage objects.
pub trait FileStorageService: Send + Sync + 'static {
/// Creates a new storage object.
///
/// # Parameters
/// - `object`: The object to be stored.
///
/// # Returns
/// - `Ok(String)`: A url representing some kind of object identifier.
/// - `Err(Error)`: An error occurred during the operation.
fn create_object(&self, object: StorageObject) -> FutureResult<String, FlowyError>;
/// Deletes a storage object by its URL.
///
/// # Parameters
/// - `object_url`: The URL of the object to be deleted.
///
fn delete_object_by_url(&self, object_url: String) -> FutureResult<(), FlowyError>;
/// Fetches a storage object by its URL.
///
/// # Parameters
/// - `object_url`: The URL of the object to be fetched.
///
fn get_object_by_url(&self, object_url: String) -> FutureResult<Bytes, FlowyError>;
}
pub trait FileStoragePlan: Send + Sync + 'static {
fn storage_size(&self) -> FutureResult<u64, FlowyError>;
fn maximum_file_size(&self) -> FutureResult<u64, FlowyError>;
fn check_upload_object(&self, object: &StorageObject) -> FutureResult<(), FlowyError>;
}

View File

@ -23,6 +23,7 @@ flowy-server = { path = "../flowy-server" }
flowy-server-config = { path = "../flowy-server-config" }
flowy-notification = { path = "../flowy-notification" }
anyhow = "1.0.71"
flowy-storage = { path = "../flowy-storage" }
serde = { version = "1.0", features = ["derive"] }
serde_json = {version = "1.0"}

Binary file not shown.

After

Width:  |  Height:  |  Size: 15 KiB

Binary file not shown.

View File

@ -0,0 +1,107 @@
use std::fs::File;
use std::io::{Cursor, Read};
use std::path::Path;
use uuid::Uuid;
use zip::ZipArchive;
use flowy_storage::StorageObject;
use crate::document::supabase_test::helper::FlowySupabaseDocumentTest;
#[tokio::test]
async fn supabase_document_upload_text_file_test() {
if let Some(test) = FlowySupabaseDocumentTest::new().await {
let storage_service = test
.document_manager
.get_file_storage_service()
.upgrade()
.unwrap();
let object = StorageObject::from_bytes(
&Uuid::new_v4().to_string(),
"hello world".as_bytes(),
"text/plain".to_string(),
);
let url = storage_service.create_object(object).await.unwrap();
let bytes = storage_service
.get_object_by_url(url.clone())
.await
.unwrap();
let s = String::from_utf8(bytes.to_vec()).unwrap();
assert_eq!(s, "hello world");
// Delete the text file
let _ = storage_service.delete_object_by_url(url).await;
}
}
#[tokio::test]
async fn supabase_document_upload_zip_file_test() {
if let Some(test) = FlowySupabaseDocumentTest::new().await {
let storage_service = test
.document_manager
.get_file_storage_service()
.upgrade()
.unwrap();
// Upload zip file
let object =
StorageObject::from_file(&Uuid::new_v4().to_string(), "./tests/asset/test.txt.zip");
let url = storage_service.create_object(object).await.unwrap();
// Read zip file
let zip_data = storage_service
.get_object_by_url(url.clone())
.await
.unwrap();
let reader = Cursor::new(zip_data);
let mut archive = ZipArchive::new(reader).unwrap();
for i in 0..archive.len() {
let mut file = archive.by_index(i).unwrap();
let name = file.name().to_string();
let mut out = Vec::new();
file.read_to_end(&mut out).unwrap();
if name.starts_with("__MACOSX/") {
continue;
}
assert_eq!(name, "test.txt");
assert_eq!(String::from_utf8(out).unwrap(), "hello world");
}
// Delete the zip file
let _ = storage_service.delete_object_by_url(url).await;
}
}
#[tokio::test]
async fn supabase_document_upload_image_test() {
if let Some(test) = FlowySupabaseDocumentTest::new().await {
let storage_service = test
.document_manager
.get_file_storage_service()
.upgrade()
.unwrap();
// Upload zip file
let object = StorageObject::from_file(&Uuid::new_v4().to_string(), "./tests/asset/logo.png");
let url = storage_service.create_object(object).await.unwrap();
let image_data = storage_service
.get_object_by_url(url.clone())
.await
.unwrap();
// Read the image file
let mut file = File::open(Path::new("./tests/asset/logo.png")).unwrap();
let mut local_data = Vec::new();
file.read_to_end(&mut local_data).unwrap();
assert_eq!(image_data, local_data);
// Delete the image
let _ = storage_service.delete_object_by_url(url).await;
}
}

View File

@ -1,2 +1,3 @@
mod file_test;
mod helper;
mod test;

View File

@ -261,7 +261,9 @@ pub async fn set_cloud_config_handler(
.ok_or(FlowyError::internal().with_context("Can't find any cloud config"))?;
if let Some(enable_sync) = update.enable_sync {
manager.cloud_services.set_enable_sync(enable_sync);
manager
.cloud_services
.set_enable_sync(session.user_id, enable_sync);
config.enable_sync = enable_sync;
}

View File

@ -102,7 +102,7 @@ pub trait UserStatusCallback: Send + Sync + 'static {
/// The user cloud service provider.
/// The provider can be supabase, firebase, aws, or any other cloud service.
pub trait UserCloudServiceProvider: Send + Sync + 'static {
fn set_enable_sync(&self, enable_sync: bool);
fn set_enable_sync(&self, uid: i64, enable_sync: bool);
fn set_encrypt_secret(&self, secret: String);
fn set_auth_type(&self, auth_type: AuthType);
fn set_device_id(&self, device_id: &str);
@ -114,8 +114,8 @@ impl<T> UserCloudServiceProvider for Arc<T>
where
T: UserCloudServiceProvider,
{
fn set_enable_sync(&self, enable_sync: bool) {
(**self).set_enable_sync(enable_sync)
fn set_enable_sync(&self, uid: i64, enable_sync: bool) {
(**self).set_enable_sync(uid, enable_sync)
}
fn set_encrypt_secret(&self, secret: String) {