feat: storage file (#3303)

This commit is contained in:
Nathan.fooo 2023-08-31 16:40:40 +08:00 committed by GitHub
parent 4847b8b114
commit 3b4f8e53a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 776 additions and 265 deletions

View File

@ -46,6 +46,7 @@ class InitSupabaseTask extends LaunchTask {
realtimeService = null;
}
realtimeService = SupbaseRealtimeService(supabase: initializedSupabase);
supabase = initializedSupabase;
if (Platform.isWindows) {

View File

@ -181,9 +181,9 @@ dependencies = [
[[package]]
name = "async-trait"
version = "0.1.68"
version = "0.1.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842"
checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0"
dependencies = [
"proc-macro2",
"quote",
@ -1625,6 +1625,7 @@ dependencies = [
"flowy-error",
"flowy-folder-deps",
"flowy-server-config",
"flowy-storage",
"flowy-user-deps",
"futures",
"futures-util",
@ -1632,6 +1633,7 @@ dependencies = [
"hyper",
"lazy_static",
"lib-infra",
"mime_guess",
"parking_lot 0.12.1",
"postgrest",
"reqwest",
@ -1641,8 +1643,10 @@ dependencies = [
"thiserror",
"tokio",
"tokio-retry",
"tokio-util",
"tracing",
"tracing-subscriber 0.3.16",
"url",
"uuid",
"yrs",
]
@ -1677,6 +1681,18 @@ dependencies = [
"tracing",
]
[[package]]
name = "flowy-storage"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"bytes",
"reqwest",
"serde",
"serde_json",
]
[[package]]
name = "flowy-task"
version = "0.1.0"
@ -1823,9 +1839,9 @@ checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
[[package]]
name = "form_urlencoded"
version = "1.1.0"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9c384f161156f5260c24a097c56119f9be8c798586aecc13afbcbe7b7e26bf8"
checksum = "a62bc1cf6f830c2ec14a513a9fb124d0a213a629668a4186f329db21fe045652"
dependencies = [
"percent-encoding",
]
@ -2287,9 +2303,9 @@ dependencies = [
[[package]]
name = "idna"
version = "0.3.0"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e14ddfc70884202db2244c223200c204c2bda1bc6e0998d11b5e024d657209e6"
checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c"
dependencies = [
"unicode-bidi",
"unicode-normalization",
@ -2678,6 +2694,16 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "mime_guess"
version = "2.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef"
dependencies = [
"mime",
"unicase",
]
[[package]]
name = "minimal-lexical"
version = "0.2.1"
@ -2981,9 +3007,9 @@ checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099"
[[package]]
name = "percent-encoding"
version = "2.2.0"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e"
checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94"
[[package]]
name = "pest"
@ -3723,6 +3749,7 @@ dependencies = [
"js-sys",
"log",
"mime",
"mime_guess",
"native-tls",
"once_cell",
"percent-encoding",
@ -3735,10 +3762,12 @@ dependencies = [
"tokio",
"tokio-native-tls",
"tokio-rustls",
"tokio-util",
"tower-service",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"wasm-streams",
"web-sys",
"webpki-roots",
"winreg",
@ -4868,6 +4897,15 @@ dependencies = [
"unic-common",
]
[[package]]
name = "unicase"
version = "2.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7d2d4dafb69621809a81864c9c1b864479e1235c0dd4e199924b9742439ed89"
dependencies = [
"version_check",
]
[[package]]
name = "unicode-bidi"
version = "0.3.13"
@ -4919,12 +4957,12 @@ checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]]
name = "url"
version = "2.3.1"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d68c799ae75762b8c3fe375feb6600ef5602c883c5d21eb51c09f22b83c4643"
checksum = "143b538f18257fac9cad154828a57c6bf5157e1aa604d4816b5995bf6de87ae5"
dependencies = [
"form_urlencoded",
"idna 0.3.0",
"idna 0.4.0",
"percent-encoding",
]
@ -5082,6 +5120,19 @@ version = "0.2.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0046fef7e28c3804e5e38bfa31ea2a0f73905319b677e57ebe37e49358989b5d"
[[package]]
name = "wasm-streams"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bbae3363c08332cadccd13b67db371814cd214c2524020932f0804b8cf7c078"
dependencies = [
"futures-util",
"js-sys",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
]
[[package]]
name = "web-sys"
version = "0.3.61"

View File

@ -22,6 +22,7 @@ members = [
"flowy-server-config",
"flowy-config",
"flowy-encrypt",
"flowy-storage",
]
[profile.dev]

View File

@ -12,9 +12,9 @@ use flowy_document2::deps::DocumentData;
use flowy_document_deps::cloud::{DocumentCloudService, DocumentSnapshot};
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
use flowy_folder_deps::cloud::*;
use flowy_server::af_cloud::configuration::appflowy_cloud_server_configuration;
use flowy_server::af_cloud::AFCloudServer;
use flowy_server::local_server::{LocalServer, LocalServerDB};
use flowy_server::self_host::configuration::self_host_server_configuration;
use flowy_server::self_host::SelfHostServer;
use flowy_server::supabase::SupabaseServer;
use flowy_server::{AppFlowyEncryption, AppFlowyServer, EncryptionImpl};
use flowy_server_config::supabase_config::SupabaseConfiguration;
@ -117,7 +117,7 @@ impl AppFlowyServerProvider {
Ok::<Arc<dyn AppFlowyServer>, FlowyError>(server)
},
ServerProviderType::AppFlowyCloud => {
let config = self_host_server_configuration().map_err(|e| {
let config = appflowy_cloud_server_configuration().map_err(|e| {
FlowyError::new(
ErrorCode::InvalidAuthConfig,
format!(
@ -126,7 +126,7 @@ impl AppFlowyServerProvider {
),
)
})?;
let server = Arc::new(SelfHostServer::new(config));
let server = Arc::new(AFCloudServer::new(config));
Ok::<Arc<dyn AppFlowyServer>, FlowyError>(server)
},
ServerProviderType::Supabase => {

View File

@ -9,7 +9,7 @@ edition = "2021"
tracing = { version = "0.1" }
futures = "0.3.26"
futures-util = "0.3.26"
reqwest = { version = "0.11.14", features = ["native-tls-vendored"] }
reqwest = { version = "0.11.14", features = ["native-tls-vendored", "multipart"] }
hyper = "0.14"
config = { version = "0.10.1", default-features = false, features = ["yaml"] }
serde = { version = "1.0", features = ["derive"] }
@ -37,6 +37,10 @@ flowy-document-deps = { path = "../flowy-document-deps" }
flowy-error = { path = "../flowy-error", features = ["impl_from_postgres", "impl_from_serde", "impl_from_reqwest"] }
flowy-server-config = { path = "../flowy-server-config" }
flowy-encrypt = { path = "../flowy-encrypt" }
flowy-storage = { path = "../flowy-storage" }
mime_guess = "2.0"
url = "2.4"
tokio-util = "0.7"
[dev-dependencies]
uuid = { version = "1.3.3", features = ["v4"] }

View File

@ -6,7 +6,7 @@ use serde_aux::field_attributes::deserialize_number_from_string;
pub const HEADER_TOKEN: &str = "token";
#[derive(serde::Deserialize, Clone, Debug)]
pub struct SelfHostedConfiguration {
pub struct AFCloudConfiguration {
#[serde(deserialize_with = "deserialize_number_from_string")]
pub port: u16,
pub host: String,
@ -14,7 +14,7 @@ pub struct SelfHostedConfiguration {
pub ws_scheme: String,
}
pub fn self_host_server_configuration() -> Result<SelfHostedConfiguration, config::ConfigError> {
pub fn appflowy_cloud_server_configuration() -> Result<AFCloudConfiguration, config::ConfigError> {
let mut settings = config::Config::default();
let base = include_str!("./configuration/base.yaml");
settings.merge(config::File::from_str(base, FileFormat::Yaml).required(true))?;
@ -33,7 +33,7 @@ pub fn self_host_server_configuration() -> Result<SelfHostedConfiguration, confi
settings.try_into()
}
impl SelfHostedConfiguration {
impl AFCloudConfiguration {
pub fn reset_host_with_port(&mut self, host: &str, port: u16) {
self.host = host.to_owned();
self.port = port;

View File

@ -6,9 +6,9 @@ use flowy_database_deps::cloud::{
};
use lib_infra::future::FutureResult;
pub(crate) struct SelfHostedDatabaseCloudServiceImpl();
pub(crate) struct AFCloudDatabaseCloudServiceImpl();
impl DatabaseCloudService for SelfHostedDatabaseCloudServiceImpl {
impl DatabaseCloudService for AFCloudDatabaseCloudServiceImpl {
fn get_collab_update(
&self,
_object_id: &str,

View File

@ -3,9 +3,9 @@ use anyhow::Error;
use flowy_document_deps::cloud::*;
use lib_infra::future::FutureResult;
pub(crate) struct SelfHostedDocumentCloudServiceImpl();
pub(crate) struct AFCloudDocumentCloudServiceImpl();
impl DocumentCloudService for SelfHostedDocumentCloudServiceImpl {
impl DocumentCloudService for AFCloudDocumentCloudServiceImpl {
fn get_document_updates(&self, _document_id: &str) -> FutureResult<Vec<Vec<u8>>, Error> {
FutureResult::new(async move { Ok(vec![]) })
}

View File

@ -6,9 +6,9 @@ use flowy_folder_deps::cloud::{
use lib_infra::future::FutureResult;
use lib_infra::util::timestamp;
pub(crate) struct SelfHostedServerFolderCloudServiceImpl();
pub(crate) struct AFCloudFolderCloudServiceImpl();
impl FolderCloudService for SelfHostedServerFolderCloudServiceImpl {
impl FolderCloudService for AFCloudFolderCloudServiceImpl {
fn create_workspace(&self, _uid: i64, name: &str) -> FutureResult<Workspace, Error> {
let name = name.to_string();
FutureResult::new(async move {

View File

@ -7,20 +7,20 @@ use flowy_user_deps::entities::*;
use lib_infra::box_any::BoxAny;
use lib_infra::future::FutureResult;
use crate::af_cloud::configuration::{AFCloudConfiguration, HEADER_TOKEN};
use crate::request::HttpRequestBuilder;
use crate::self_host::configuration::{SelfHostedConfiguration, HEADER_TOKEN};
pub(crate) struct SelfHostedUserAuthServiceImpl {
config: SelfHostedConfiguration,
pub(crate) struct AFCloudUserAuthServiceImpl {
config: AFCloudConfiguration,
}
impl SelfHostedUserAuthServiceImpl {
pub(crate) fn new(config: SelfHostedConfiguration) -> Self {
impl AFCloudUserAuthServiceImpl {
pub(crate) fn new(config: AFCloudConfiguration) -> Self {
Self { config }
}
}
impl UserCloudService for SelfHostedUserAuthServiceImpl {
impl UserCloudService for AFCloudUserAuthServiceImpl {
fn sign_up(&self, params: BoxAny) -> FutureResult<SignUpResponse, Error> {
let url = self.config.sign_up_url();
FutureResult::new(async move {

View File

@ -7,38 +7,38 @@ use flowy_document_deps::cloud::DocumentCloudService;
use flowy_folder_deps::cloud::FolderCloudService;
use flowy_user_deps::cloud::UserCloudService;
use crate::self_host::configuration::SelfHostedConfiguration;
use crate::self_host::impls::{
SelfHostedDatabaseCloudServiceImpl, SelfHostedDocumentCloudServiceImpl,
SelfHostedServerFolderCloudServiceImpl, SelfHostedUserAuthServiceImpl,
use crate::af_cloud::configuration::AFCloudConfiguration;
use crate::af_cloud::impls::{
AFCloudDatabaseCloudServiceImpl, AFCloudDocumentCloudServiceImpl, AFCloudFolderCloudServiceImpl,
AFCloudUserAuthServiceImpl,
};
use crate::AppFlowyServer;
pub struct SelfHostServer {
pub(crate) config: SelfHostedConfiguration,
pub struct AFCloudServer {
pub(crate) config: AFCloudConfiguration,
}
impl SelfHostServer {
pub fn new(config: SelfHostedConfiguration) -> Self {
impl AFCloudServer {
pub fn new(config: AFCloudConfiguration) -> Self {
Self { config }
}
}
impl AppFlowyServer for SelfHostServer {
impl AppFlowyServer for AFCloudServer {
fn user_service(&self) -> Arc<dyn UserCloudService> {
Arc::new(SelfHostedUserAuthServiceImpl::new(self.config.clone()))
Arc::new(AFCloudUserAuthServiceImpl::new(self.config.clone()))
}
fn folder_service(&self) -> Arc<dyn FolderCloudService> {
Arc::new(SelfHostedServerFolderCloudServiceImpl())
Arc::new(AFCloudFolderCloudServiceImpl())
}
fn database_service(&self) -> Arc<dyn DatabaseCloudService> {
Arc::new(SelfHostedDatabaseCloudServiceImpl())
Arc::new(AFCloudDatabaseCloudServiceImpl())
}
fn document_service(&self) -> Arc<dyn DocumentCloudService> {
Arc::new(SelfHostedDocumentCloudServiceImpl())
Arc::new(AFCloudDocumentCloudServiceImpl())
}
fn collab_storage(&self, _collab_object: &CollabObject) -> Option<Arc<dyn RemoteCollabStorage>> {

View File

@ -1,116 +1,9 @@
use std::sync::Arc;
use collab_plugins::cloud_storage::{CollabObject, RemoteCollabStorage};
use parking_lot::RwLock;
use flowy_database_deps::cloud::DatabaseCloudService;
use flowy_document_deps::cloud::DocumentCloudService;
use flowy_folder_deps::cloud::FolderCloudService;
use flowy_user_deps::cloud::UserCloudService;
pub use server::*;
pub mod af_cloud;
pub mod local_server;
mod request;
mod response;
pub mod self_host;
mod server;
pub mod supabase;
pub mod util;
pub trait AppFlowyEncryption: Send + Sync + 'static {
fn get_secret(&self) -> Option<String>;
fn set_secret(&self, secret: String);
}
impl<T> AppFlowyEncryption for Arc<T>
where
T: AppFlowyEncryption,
{
fn get_secret(&self) -> Option<String> {
(**self).get_secret()
}
fn set_secret(&self, secret: String) {
(**self).set_secret(secret)
}
}
/// `AppFlowyServer` trait defines a collection of services that offer cloud-based interactions
/// and functionalities in AppFlowy. The methods provided ensure efficient, asynchronous operations
/// for managing and accessing user data, folders, collaborative objects, and documents in a cloud environment.
pub trait AppFlowyServer: Send + Sync + 'static {
/// Enables or disables server sync.
///
/// # Arguments
///
/// * `_enable` - A boolean to toggle the server synchronization.
fn set_enable_sync(&self, _enable: bool) {}
/// Provides access to cloud-based user management functionalities. This includes operations
/// such as user registration, authentication, profile management, and handling of user workspaces.
/// The interface also offers methods for managing collaborative objects, subscribing to user updates,
/// and receiving real-time events.
///
/// # Returns
///
/// An `Arc` wrapping the `UserCloudService` interface.
fn user_service(&self) -> Arc<dyn UserCloudService>;
/// Provides a service for managing workspaces and folders in a cloud environment. This includes
/// functionalities to create workspaces, and fetch data, snapshots, and updates related to specific folders.
///
/// # Returns
///
/// An `Arc` wrapping the `FolderCloudService` interface.
fn folder_service(&self) -> Arc<dyn FolderCloudService>;
/// Offers a set of operations for interacting with collaborative objects within a cloud database.
/// This includes functionalities such as retrieval of updates for specific objects, batch fetching,
/// and obtaining snapshots.
///
/// # Returns
///
/// An `Arc` wrapping the `DatabaseCloudService` interface.
fn database_service(&self) -> Arc<dyn DatabaseCloudService>;
/// Facilitates cloud-based document management. This service offers operations for updating documents,
/// fetching snapshots, and accessing primary document data in an asynchronous manner.
///
/// # Returns
///
/// An `Arc` wrapping the `DocumentCloudService` interface.
fn document_service(&self) -> Arc<dyn DocumentCloudService>;
/// Manages collaborative objects within a remote storage system. This includes operations such as
/// checking storage status, retrieving updates and snapshots, and dispatching updates. The service
/// also provides subscription capabilities for real-time updates.
///
/// # Arguments
///
/// * `collab_object` - A reference to the collaborative object.
///
/// # Returns
///
/// An `Option` that might contain an `Arc` wrapping the `RemoteCollabStorage` interface.
fn collab_storage(&self, collab_object: &CollabObject) -> Option<Arc<dyn RemoteCollabStorage>>;
}
pub struct EncryptionImpl {
secret: RwLock<Option<String>>,
}
impl EncryptionImpl {
pub fn new(secret: Option<String>) -> Self {
Self {
secret: RwLock::new(secret),
}
}
}
impl AppFlowyEncryption for EncryptionImpl {
fn get_secret(&self) -> Option<String> {
self.secret.read().clone()
}
fn set_secret(&self, secret: String) {
*self.secret.write() = Some(secret);
}
}

View File

@ -7,8 +7,8 @@ use tokio::sync::oneshot;
use flowy_error::{internal_error, FlowyError};
use crate::af_cloud::configuration::HEADER_TOKEN;
use crate::response::HttpResponse;
use crate::self_host::configuration::HEADER_TOKEN;
pub trait ResponseMiddleware {
fn receive_response(&self, token: &Option<String>, response: &HttpResponse);

View File

@ -1,8 +1,14 @@
use bytes::Bytes;
use flowy_error::ErrorCode;
use serde::{Deserialize, Serialize};
use std::fmt;
use anyhow::Error;
use bytes::Bytes;
use reqwest::{Response, StatusCode};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use flowy_error::{ErrorCode, FlowyError};
use lib_infra::future::{to_fut, Fut};
#[derive(Debug, Serialize, Deserialize)]
pub struct HttpResponse {
pub data: Bytes,
@ -28,3 +34,116 @@ impl fmt::Display for HttpError {
write!(f, "{:?}: {}", self.code, self.msg)
}
}
/// Trait `ExtendedResponse` provides an extension method to handle and transform the response data.
///
/// This trait introduces a single method:
///
/// - `get_value`: It extracts the value from the response, and returns it as an instance of a type `T`.
/// This method will return an error if the status code of the response signifies a failure (not success).
/// Otherwise, it attempts to parse the response body into an instance of type `T`, which must implement
/// `serde::de::DeserializeOwned`, `Send`, `Sync`, and have a static lifetime ('static).
pub trait ExtendedResponse {
/// Returns the value of the response as a Future of `Result<T, Error>`.
///
/// If the status code of the response is not a success, returns an `Error`.
/// Otherwise, attempts to parse the response into an instance of type `T`.
///
/// # Type Parameters
///
/// * `T`: The type of the value to be returned. Must implement `serde::de::DeserializeOwned`,
/// `Send`, `Sync`, and have a static lifetime ('static).
fn get_value<T>(self) -> Fut<Result<T, Error>>
where
T: serde::de::DeserializeOwned + Send + Sync + 'static;
fn get_bytes(self) -> Fut<Result<Bytes, Error>>;
fn get_json(self) -> Fut<Result<Value, Error>>;
fn success(self) -> Fut<Result<(), Error>>;
fn success_with_body(self) -> Fut<Result<String, Error>>;
}
impl ExtendedResponse for Response {
fn get_value<T>(self) -> Fut<Result<T, Error>>
where
T: serde::de::DeserializeOwned + Send + Sync + 'static,
{
to_fut(async move {
let status_code = self.status();
if !status_code.is_success() {
return Err(parse_response_as_error(self).await.into());
}
let bytes = self.bytes().await?;
let value = serde_json::from_slice(&bytes).map_err(|e| {
FlowyError::new(
ErrorCode::Serde,
format!(
"failed to parse json: {}, body: {}",
e,
String::from_utf8_lossy(&bytes)
),
)
})?;
Ok(value)
})
}
fn get_bytes(self) -> Fut<Result<Bytes, Error>> {
to_fut(async move {
let status_code = self.status();
if !status_code.is_success() {
return Err(parse_response_as_error(self).await.into());
}
let bytes = self.bytes().await?;
Ok(bytes)
})
}
fn get_json(self) -> Fut<Result<Value, Error>> {
to_fut(async move {
if !self.status().is_success() {
return Err(parse_response_as_error(self).await.into());
}
let bytes = self.bytes().await?;
let value = serde_json::from_slice::<Value>(&bytes)?;
Ok(value)
})
}
fn success(self) -> Fut<Result<(), Error>> {
to_fut(async move {
if !self.status().is_success() {
return Err(parse_response_as_error(self).await.into());
}
Ok(())
})
}
fn success_with_body(self) -> Fut<Result<String, Error>> {
to_fut(async move {
if !self.status().is_success() {
return Err(parse_response_as_error(self).await.into());
}
Ok(self.text().await?)
})
}
}
async fn parse_response_as_error(response: Response) -> FlowyError {
let status_code = response.status();
let msg = response.text().await.unwrap_or_default();
if status_code == StatusCode::CONFLICT {
return FlowyError::new(ErrorCode::Conflict, msg);
}
FlowyError::new(
ErrorCode::HttpError,
format!(
"expected status code 2XX, but got {}, body: {}",
status_code, msg
),
)
}

View File

@ -0,0 +1,114 @@
use std::sync::Arc;
use collab_plugins::cloud_storage::{CollabObject, RemoteCollabStorage};
use parking_lot::RwLock;
use flowy_database_deps::cloud::DatabaseCloudService;
use flowy_document_deps::cloud::DocumentCloudService;
use flowy_folder_deps::cloud::FolderCloudService;
use flowy_storage::core::FileStorageService;
use flowy_user_deps::cloud::UserCloudService;
pub trait AppFlowyEncryption: Send + Sync + 'static {
fn get_secret(&self) -> Option<String>;
fn set_secret(&self, secret: String);
}
impl<T> AppFlowyEncryption for Arc<T>
where
T: AppFlowyEncryption,
{
fn get_secret(&self) -> Option<String> {
(**self).get_secret()
}
fn set_secret(&self, secret: String) {
(**self).set_secret(secret)
}
}
/// `AppFlowyServer` trait defines a collection of services that offer cloud-based interactions
/// and functionalities in AppFlowy. The methods provided ensure efficient, asynchronous operations
/// for managing and accessing user data, folders, collaborative objects, and documents in a cloud environment.
pub trait AppFlowyServer: Send + Sync + 'static {
/// Enables or disables server sync.
///
/// # Arguments
///
/// * `_enable` - A boolean to toggle the server synchronization.
fn set_enable_sync(&self, _enable: bool) {}
/// Provides access to cloud-based user management functionalities. This includes operations
/// such as user registration, authentication, profile management, and handling of user workspaces.
/// The interface also offers methods for managing collaborative objects, subscribing to user updates,
/// and receiving real-time events.
///
/// # Returns
///
/// An `Arc` wrapping the `UserCloudService` interface.
fn user_service(&self) -> Arc<dyn UserCloudService>;
/// Provides a service for managing workspaces and folders in a cloud environment. This includes
/// functionalities to create workspaces, and fetch data, snapshots, and updates related to specific folders.
///
/// # Returns
///
/// An `Arc` wrapping the `FolderCloudService` interface.
fn folder_service(&self) -> Arc<dyn FolderCloudService>;
/// Offers a set of operations for interacting with collaborative objects within a cloud database.
/// This includes functionalities such as retrieval of updates for specific objects, batch fetching,
/// and obtaining snapshots.
///
/// # Returns
///
/// An `Arc` wrapping the `DatabaseCloudService` interface.
fn database_service(&self) -> Arc<dyn DatabaseCloudService>;
/// Facilitates cloud-based document management. This service offers operations for updating documents,
/// fetching snapshots, and accessing primary document data in an asynchronous manner.
///
/// # Returns
///
/// An `Arc` wrapping the `DocumentCloudService` interface.
fn document_service(&self) -> Arc<dyn DocumentCloudService>;
/// Manages collaborative objects within a remote storage system. This includes operations such as
/// checking storage status, retrieving updates and snapshots, and dispatching updates. The service
/// also provides subscription capabilities for real-time updates.
///
/// # Arguments
///
/// * `collab_object` - A reference to the collaborative object.
///
/// # Returns
///
/// An `Option` that might contain an `Arc` wrapping the `RemoteCollabStorage` interface.
fn collab_storage(&self, collab_object: &CollabObject) -> Option<Arc<dyn RemoteCollabStorage>>;
fn file_storage(&self) -> Option<Arc<dyn FileStorageService>> {
None
}
}
pub struct EncryptionImpl {
secret: RwLock<Option<String>>,
}
impl EncryptionImpl {
pub fn new(secret: Option<String>) -> Self {
Self {
secret: RwLock::new(secret),
}
}
}
impl AppFlowyEncryption for EncryptionImpl {
fn get_secret(&self) -> Option<String> {
self.secret.read().clone()
}
fn set_secret(&self, secret: String) {
*self.secret.write() = Some(secret);
}
}

View File

@ -14,13 +14,12 @@ use tokio::task::spawn_blocking;
use lib_infra::async_trait::async_trait;
use lib_infra::util::md5;
use crate::response::ExtendedResponse;
use crate::supabase::api::request::{
create_snapshot, get_snapshots_from_server, get_updates_from_server, FetchObjectUpdateAction,
UpdateItem,
};
use crate::supabase::api::util::{
ExtendedResponse, InsertParamsBuilder, SupabaseBinaryColumnEncoder,
};
use crate::supabase::api::util::{InsertParamsBuilder, SupabaseBinaryColumnEncoder};
use crate::supabase::api::{PostgresWrapper, SupabaseServerService};
use crate::supabase::define::*;
use crate::AppFlowyEncryption;

View File

@ -12,10 +12,11 @@ use flowy_folder_deps::cloud::{
};
use lib_infra::future::FutureResult;
use crate::response::ExtendedResponse;
use crate::supabase::api::request::{
get_snapshots_from_server, get_updates_from_server, FetchObjectUpdateAction,
};
use crate::supabase::api::util::{ExtendedResponse, InsertParamsBuilder};
use crate::supabase::api::util::InsertParamsBuilder;
use crate::supabase::api::SupabaseServerService;
use crate::supabase::define::*;

View File

@ -15,8 +15,9 @@ use tokio_retry::{Action, Condition, RetryIf};
use flowy_database_deps::cloud::{CollabObjectUpdate, CollabObjectUpdateByOid};
use lib_infra::util::md5;
use crate::response::ExtendedResponse;
use crate::supabase::api::util::{
BinaryColumnDecoder, ExtendedResponse, InsertParamsBuilder, SupabaseBinaryColumnDecoder,
BinaryColumnDecoder, InsertParamsBuilder, SupabaseBinaryColumnDecoder,
SupabaseBinaryColumnEncoder,
};
use crate::supabase::api::PostgresWrapper;

View File

@ -24,11 +24,12 @@ use lib_infra::box_any::BoxAny;
use lib_infra::future::FutureResult;
use lib_infra::util::timestamp;
use crate::response::ExtendedResponse;
use crate::supabase::api::request::{
get_updates_from_server, FetchObjectUpdateAction, RetryCondition,
};
use crate::supabase::api::util::{
ExtendedResponse, InsertParamsBuilder, RealtimeBinaryColumnDecoder, SupabaseBinaryColumnDecoder,
InsertParamsBuilder, RealtimeBinaryColumnDecoder, SupabaseBinaryColumnDecoder,
};
use crate::supabase::api::{flush_collab_with_update, PostgresWrapper, SupabaseServerService};
use crate::supabase::define::*;

View File

@ -1,11 +1,8 @@
use anyhow::Error;
use anyhow::Result;
use reqwest::{Response, StatusCode};
use serde_json::Value;
use flowy_encrypt::{decrypt_data, encrypt_data};
use flowy_error::{ErrorCode, FlowyError};
use lib_infra::future::{to_fut, Fut};
#[derive(Default)]
pub struct InsertParamsBuilder {
@ -28,105 +25,7 @@ impl InsertParamsBuilder {
serde_json::to_string(&self.map).unwrap()
}
}
/// Trait `ExtendedResponse` provides an extension method to handle and transform the response data.
///
/// This trait introduces a single method:
///
/// - `get_value`: It extracts the value from the response, and returns it as an instance of a type `T`.
/// This method will return an error if the status code of the response signifies a failure (not success).
/// Otherwise, it attempts to parse the response body into an instance of type `T`, which must implement
/// `serde::de::DeserializeOwned`, `Send`, `Sync`, and have a static lifetime ('static).
pub trait ExtendedResponse {
/// Returns the value of the response as a Future of `Result<T, Error>`.
///
/// If the status code of the response is not a success, returns an `Error`.
/// Otherwise, attempts to parse the response into an instance of type `T`.
///
/// # Type Parameters
///
/// * `T`: The type of the value to be returned. Must implement `serde::de::DeserializeOwned`,
/// `Send`, `Sync`, and have a static lifetime ('static).
fn get_value<T>(self) -> Fut<Result<T, Error>>
where
T: serde::de::DeserializeOwned + Send + Sync + 'static;
fn get_json(self) -> Fut<Result<Value, Error>>;
fn success(self) -> Fut<Result<(), Error>>;
fn success_with_body(self) -> Fut<Result<String, Error>>;
}
impl ExtendedResponse for Response {
fn get_value<T>(self) -> Fut<Result<T, Error>>
where
T: serde::de::DeserializeOwned + Send + Sync + 'static,
{
to_fut(async move {
let status_code = self.status();
if !status_code.is_success() {
return Err(parse_response_as_error(self).await.into());
}
let bytes = self.bytes().await?;
let value = serde_json::from_slice(&bytes).map_err(|e| {
FlowyError::new(
ErrorCode::Serde,
format!(
"failed to parse json: {}, body: {}",
e,
String::from_utf8_lossy(&bytes)
),
)
})?;
Ok(value)
})
}
fn get_json(self) -> Fut<Result<Value, Error>> {
to_fut(async move {
if !self.status().is_success() {
return Err(parse_response_as_error(self).await.into());
}
let bytes = self.bytes().await?;
let value = serde_json::from_slice::<Value>(&bytes)?;
Ok(value)
})
}
fn success(self) -> Fut<Result<(), Error>> {
to_fut(async move {
if !self.status().is_success() {
return Err(parse_response_as_error(self).await.into());
}
Ok(())
})
}
fn success_with_body(self) -> Fut<Result<String, Error>> {
to_fut(async move {
if !self.status().is_success() {
return Err(parse_response_as_error(self).await.into());
}
Ok(self.text().await?)
})
}
}
async fn parse_response_as_error(response: Response) -> FlowyError {
let status_code = response.status();
let msg = response.text().await.unwrap_or_default();
if status_code == StatusCode::CONFLICT {
return FlowyError::new(ErrorCode::Conflict, msg);
}
FlowyError::new(
ErrorCode::HttpError,
format!(
"expected status code 2XX, but got {}, body: {}",
status_code, msg
),
)
}
/// An encoder for binary columns in Supabase.
///
/// Provides utilities to encode binary data into a format suitable for Supabase columns.

View File

@ -0,0 +1,251 @@
use anyhow::{anyhow, Error};
use bytes::Bytes;
use hyper::header::{CACHE_CONTROL, CONTENT_TYPE};
use reqwest::header::IntoHeaderName;
use reqwest::multipart::{Form, Part};
use reqwest::{
header::{HeaderMap, HeaderValue},
Body, Client, Method, RequestBuilder,
};
use serde_json::Value;
use tokio::fs::File;
use tokio_util::codec::{BytesCodec, FramedRead};
use url::Url;
use flowy_server_config::supabase_config::SupabaseConfiguration;
use flowy_storage::core::FileStorageService;
use lib_infra::async_trait::async_trait;
use crate::response::ExtendedResponse;
use crate::supabase::file_storage::{DeleteObjects, FileOptions, NewBucket};
pub struct SupabaseFileStorage {
url: Url,
headers: HeaderMap,
client: Client,
}
impl SupabaseFileStorage {
pub fn new(config: &SupabaseConfiguration) -> Result<Self, Error> {
let mut headers = HeaderMap::new();
let url = format!("{}/storage/v1", config.url);
let auth = format!("Bearer {}", config.anon_key);
headers.insert(
"Authorization",
HeaderValue::from_str(&auth).expect("Authorization is invalid"),
);
headers.insert(
"apikey",
HeaderValue::from_str(&config.anon_key).expect("apikey value is invalid"),
);
Ok(Self {
url: Url::parse(&url)?,
headers,
client: Client::new(),
})
}
pub fn request(&self) -> FileStorageRequestBuilder {
FileStorageRequestBuilder::new(self.url.clone(), self.headers.clone(), self.client.clone())
}
}
pub enum RequestBody {
Empty,
File {
file_path: String,
options: FileOptions,
},
Text {
text: String,
},
}
pub struct FileStorageRequestBuilder {
url: Url,
headers: HeaderMap,
client: Client,
method: Method,
body: RequestBody,
}
impl FileStorageRequestBuilder {
pub fn new(url: Url, headers: HeaderMap, client: Client) -> Self {
Self {
url,
headers,
client,
method: Method::GET,
body: RequestBody::Empty,
}
}
pub fn with_header(mut self, key: impl IntoHeaderName, value: HeaderValue) -> Self {
self.headers.insert(key, value);
self
}
pub fn get_buckets(mut self) -> Self {
self.method = Method::GET;
self.url.path_segments_mut().unwrap().push("bucket");
self
}
pub fn create_bucket(mut self, bucket_name: &str) -> Self {
self.method = Method::POST;
self
.headers
.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
self.url.path_segments_mut().unwrap().push("bucket");
let bucket = serde_json::to_string(&NewBucket::new(bucket_name.to_string())).unwrap();
self.body = RequestBody::Text { text: bucket };
self
}
pub fn delete_object(mut self, bucket_id: &str, object: &str) -> Self {
self.method = Method::DELETE;
self
.headers
.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
let delete_objects = DeleteObjects::new(vec![object.to_string()]);
let text = serde_json::to_string(&delete_objects).unwrap();
self.body = RequestBody::Text { text };
self
.url
.path_segments_mut()
.unwrap()
.push("object")
.push(bucket_id)
.push(object);
self
}
pub fn get_object(mut self, bucket_name: &str, object: &str) -> Self {
self.method = Method::GET;
self
.url
.path_segments_mut()
.unwrap()
.push("object")
.push(bucket_name)
.push(object);
self
}
pub fn upload_object(mut self, bucket_name: &str, object: &str, file_path: &str) -> Self {
self.method = Method::POST;
let options = FileOptions::from_file_path(file_path);
self.headers.insert(
CONTENT_TYPE,
HeaderValue::from_str(&options.content_type).unwrap(),
);
self
.url
.path_segments_mut()
.unwrap()
.push("object")
.push(bucket_name)
.push(object);
self.body = RequestBody::File {
file_path: file_path.to_string(),
options,
};
self
}
pub fn download_object(mut self, bucket_id: &str) -> Self {
self.method = Method::POST;
self
.headers
.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
self
.url
.path_segments_mut()
.unwrap()
.push("object")
.push(bucket_id);
self
}
pub async fn build(mut self) -> Result<RequestBuilder, Error> {
let url = self.url.to_string();
let mut builder = self.client.request(self.method, url);
match self.body {
RequestBody::Empty => {},
RequestBody::File { file_path, options } => {
self.headers.insert(
CACHE_CONTROL,
HeaderValue::from_str(&options.cache_control).unwrap(),
);
self.headers.insert(
"x-upsert",
HeaderValue::from_str(&options.upsert.to_string()).unwrap(),
);
let file = File::open(&file_path).await?;
let file_body = Body::wrap_stream(FramedRead::new(file, BytesCodec::new()));
let part = Part::stream(file_body).mime_str(&options.content_type)?;
builder = builder.multipart(Form::new().part(file_path, part));
},
RequestBody::Text { text } => {
builder = builder.body(text);
},
}
builder = builder.headers(self.headers);
Ok(builder)
}
}
#[async_trait]
impl FileStorageService for SupabaseFileStorage {
async fn create_object(&self, object_name: &str, object_path: &str) -> Result<String, Error> {
let resp: Value = self
.request()
.upload_object("data", object_name, object_path)
.build()
.await?
.send()
.await?
.get_json()
.await?;
let key = resp
.get("Key")
.and_then(|v| v.as_str())
.ok_or(anyhow!("Key not found in response"))?
.to_string();
Ok(key)
}
async fn delete_object(&self, object_name: &str) -> Result<(), Error> {
let resp = self
.request()
.delete_object("data", object_name)
.build()
.await?
.send()
.await?
.success()
.await?;
println!("{:?}", resp);
Ok(())
}
async fn get_object(&self, object_name: &str) -> Result<Bytes, Error> {
let bytes = self
.request()
.get_object("data", object_name)
.build()
.await?
.send()
.await?
.get_bytes()
.await?;
Ok(bytes)
}
}

View File

@ -0,0 +1,75 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SupabaseStorageError {
pub status_code: String,
pub error: String,
pub message: String,
}
#[derive(Serialize)]
pub struct NewBucket {
pub name: String,
pub id: String,
pub public: bool,
pub file_size_limit: Option<u32>,
pub allowed_mime_types: Option<Vec<String>>,
}
impl NewBucket {
pub fn new(name: String) -> Self {
Self {
name: name.clone(),
id: name,
public: false,
file_size_limit: None,
allowed_mime_types: None,
}
}
}
pub struct FileOptions {
pub cache_control: String,
pub upsert: bool,
pub content_type: String,
}
impl FileOptions {
pub fn from_file_path(file_path: &str) -> Self {
let mime = mime_guess::from_path(file_path)
.first_or_octet_stream()
.to_string();
Self {
cache_control: "3600".to_string(),
upsert: false,
content_type: mime,
}
}
pub fn with_cache_control(mut self, cache_control: &str) -> Self {
self.cache_control = cache_control.to_string();
self
}
pub fn with_upsert(mut self, upsert: bool) -> Self {
self.upsert = upsert;
self
}
pub fn with_content_type(mut self, content_type: &str) -> Self {
self.content_type = content_type.to_string();
self
}
}
#[derive(Serialize)]
pub struct DeleteObjects {
pub prefixes: Vec<String>,
}
impl DeleteObjects {
pub fn new(prefixes: Vec<String>) -> Self {
Self { prefixes }
}
}

View File

@ -0,0 +1,4 @@
pub use entities::*;
pub mod core;
mod entities;

View File

@ -5,4 +5,5 @@ mod entities;
pub mod define;
// mod queue;
pub mod api;
pub mod file_storage;
mod server;

View File

@ -8,6 +8,7 @@ use flowy_database_deps::cloud::DatabaseCloudService;
use flowy_document_deps::cloud::DocumentCloudService;
use flowy_folder_deps::cloud::FolderCloudService;
use flowy_server_config::supabase_config::SupabaseConfiguration;
use flowy_storage::core::FileStorageService;
use flowy_user_deps::cloud::UserCloudService;
use crate::supabase::api::{
@ -15,6 +16,7 @@ use crate::supabase::api::{
SupabaseCollabStorageImpl, SupabaseDatabaseServiceImpl, SupabaseDocumentServiceImpl,
SupabaseFolderServiceImpl, SupabaseServerServiceImpl, SupabaseUserServiceImpl,
};
use crate::supabase::file_storage::core::SupabaseFileStorage;
use crate::{AppFlowyEncryption, AppFlowyServer};
/// https://www.pgbouncer.org/features.html
@ -61,6 +63,7 @@ pub struct SupabaseServer {
device_id: Arc<RwLock<String>>,
collab_update_sender: Arc<CollabUpdateSenderByOid>,
restful_postgres: Arc<RwLock<Option<Arc<RESTfulPostgresServer>>>>,
file_storage: Arc<RwLock<Option<Arc<SupabaseFileStorage>>>>,
encryption: Weak<dyn AppFlowyEncryption>,
}
@ -80,11 +83,18 @@ impl SupabaseServer {
} else {
None
};
let file_storage = if enable_sync {
Some(Arc::new(SupabaseFileStorage::new(&config).unwrap()))
} else {
None
};
Self {
config,
device_id,
collab_update_sender,
restful_postgres: Arc::new(RwLock::new(restful_postgres)),
file_storage: Arc::new(RwLock::new(file_storage)),
encryption,
}
}
@ -160,4 +170,12 @@ impl AppFlowyServer for SupabaseServer {
self.encryption.clone(),
)))
}
fn file_storage(&self) -> Option<Arc<dyn FileStorageService>> {
self
.file_storage
.read()
.clone()
.map(|s| s as Arc<dyn FileStorageService>)
}
}

View File

@ -0,0 +1,44 @@
use crate::supabase_test::util::{file_storage_service, get_supabase_ci_config};
#[tokio::test]
async fn supabase_get_object_test() {
if get_supabase_ci_config().is_none() {
return;
}
let service = file_storage_service();
let file_name = format!("test-{}.txt", chrono::Utc::now().timestamp());
// Upload a file
let key = service
.create_object(&file_name, "tests/test.txt")
.await
.unwrap();
assert_eq!(key, format!("data/{}", file_name));
// Download the file
let bytes = service.get_object(&file_name).await.unwrap();
assert_eq!(bytes.len(), 248);
}
#[tokio::test]
async fn supabase_delete_object_test() {
if get_supabase_ci_config().is_none() {
return;
}
let service = file_storage_service();
let file_name = format!("test-{}.txt", chrono::Utc::now().timestamp());
let _ = service
.create_object(&file_name, "tests/test.txt")
.await
.unwrap();
let result = service.get_object(&file_name).await;
assert!(result.is_ok());
let _ = service.delete_object(&file_name).await;
let result = service.get_object(&file_name).await;
assert!(result.is_err());
}

View File

@ -1,4 +1,5 @@
mod database_test;
mod file_test;
mod folder_test;
mod user_test;
mod util;

View File

@ -13,8 +13,10 @@ use flowy_server::supabase::api::{
SupabaseFolderServiceImpl, SupabaseServerServiceImpl, SupabaseUserServiceImpl,
};
use flowy_server::supabase::define::{USER_DEVICE_ID, USER_EMAIL, USER_UUID};
use flowy_server::supabase::file_storage::core::SupabaseFileStorage;
use flowy_server::{AppFlowyEncryption, EncryptionImpl};
use flowy_server_config::supabase_config::SupabaseConfiguration;
use flowy_storage::core::FileStorageService;
use flowy_user_deps::cloud::UserCloudService;
use crate::setup_log;
@ -56,6 +58,11 @@ pub fn folder_service() -> Arc<dyn FolderCloudService> {
Arc::new(SupabaseFolderServiceImpl::new(server))
}
pub fn file_storage_service() -> Arc<dyn FileStorageService> {
let config = SupabaseConfiguration::from_env().unwrap();
Arc::new(SupabaseFileStorage::new(&config).unwrap())
}
#[allow(dead_code)]
pub fn encryption_folder_service(
secret: Option<String>,

View File

@ -0,0 +1 @@
hello world

View File

@ -0,0 +1,14 @@
[package]
name = "flowy-storage"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
reqwest = { version = "0.11", features = ["json", "stream"] }
serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
anyhow = "1.0.75"
async-trait = "0.1.73"
bytes = "1.0.1"

View File

@ -0,0 +1,10 @@
use anyhow::Error;
use async_trait::async_trait;
use bytes::Bytes;
#[async_trait]
pub trait FileStorageService: Send + Sync + 'static {
async fn create_object(&self, object_name: &str, object_path: &str) -> Result<String, Error>;
async fn delete_object(&self, object_name: &str) -> Result<(), Error>;
async fn get_object(&self, object_name: &str) -> Result<Bytes, Error>;
}

View File

@ -0,0 +1 @@
pub mod core;