feat: initial file upload api (#4299)

* feat: initial file upload api

* feat: initial file upload api

* fix: add pb index

* feat: remove file name

* feat: read everything to mem

* feat: revamp object storage

* chore: cargo format

* chore: update deps

* feat: revised implementations and style

* chore: use deploy env instead

* chore: use deploy env instead

* chore: use deploy env instead

* refactor: move logic to handler to manager

* fix: format issues

* fix: cargo clippy

* chore: cargo check tauri

* fix: debug docker integration test

* fix: debug docker integration test

* fix: debug docker integration test gotrue

* fix: debug docker integration test docker compose version

* fix: docker scripts

* fix: cargo fmt

* fix: add sleep after docker compose up

---------

Co-authored-by: nathan <nathan@appflowy.io>
This commit is contained in:
Zack
2024-01-17 02:59:15 +08:00
committed by GitHub
parent 15cb1b5f19
commit 38c3e700e9
30 changed files with 757 additions and 403 deletions

View File

@ -36,6 +36,7 @@ futures.workspace = true
tokio-stream = { workspace = true, features = ["sync"] }
scraper = "0.18.0"
lru.workspace = true
fxhash = "0.2.1"
[target.'cfg(target_arch = "wasm32")'.dependencies]
getrandom = { version = "0.2", features = ["js"]}

View File

@ -5,6 +5,8 @@ use collab_document::blocks::{json_str_to_hashmap, Block, BlockAction, DocumentD
use flowy_derive::{ProtoBuf, ProtoBuf_Enum};
use flowy_error::ErrorCode;
use lib_infra::validator_fn::{required_not_empty_str, required_valid_path};
use validator::Validate;
use crate::parse::{NotEmptyStr, NotEmptyVec};
@ -62,6 +64,28 @@ pub struct DocumentRedoUndoResponsePB {
pub is_success: bool,
}
#[derive(Default, ProtoBuf, Validate)]
pub struct UploadFileParamsPB {
#[pb(index = 1)]
#[validate(custom = "required_not_empty_str")]
pub workspace_id: String,
#[pb(index = 2)]
#[validate(custom = "required_valid_path")]
pub local_file_path: String,
}
#[derive(Default, ProtoBuf, Validate)]
pub struct UploadedFilePB {
#[pb(index = 1)]
#[validate(url)]
pub url: String,
#[pb(index = 2)]
#[validate(custom = "required_valid_path")]
pub local_file_path: String,
}
#[derive(Default, ProtoBuf)]
pub struct CreateDocumentPayloadPB {
#[pb(index = 1)]

View File

@ -9,10 +9,10 @@ use std::sync::{Arc, Weak};
use collab_document::blocks::{
BlockAction, BlockActionPayload, BlockActionType, BlockEvent, BlockEventPayload, DeltaType,
};
use tracing::instrument;
use flowy_error::{FlowyError, FlowyResult};
use lib_dispatch::prelude::{data_result_ok, AFPluginData, AFPluginState, DataResult};
use tracing::instrument;
use crate::entities::*;
use crate::parser::document_data_parser::DocumentDataParser;
@ -401,3 +401,50 @@ pub(crate) async fn convert_data_to_json_handler(
data_result_ok(ConvertDataToJsonResponsePB { json: result })
}
// Handler for uploading a file
// `workspace_id` and `file_name` determines file identity
pub(crate) async fn upload_file_handler(
params: AFPluginData<UploadFileParamsPB>,
manager: AFPluginState<Weak<DocumentManager>>,
) -> DataResult<UploadedFilePB, FlowyError> {
let AFPluginData(UploadFileParamsPB {
workspace_id,
local_file_path,
}) = params;
let manager = upgrade_document(manager)?;
let url = manager.upload_file(workspace_id, &local_file_path).await?;
Ok(AFPluginData(UploadedFilePB {
url,
local_file_path,
}))
}
#[instrument(level = "debug", skip_all, err)]
pub(crate) async fn download_file_handler(
params: AFPluginData<UploadedFilePB>,
manager: AFPluginState<Weak<DocumentManager>>,
) -> FlowyResult<()> {
let AFPluginData(UploadedFilePB {
url,
local_file_path,
}) = params;
let manager = upgrade_document(manager)?;
manager.download_file(local_file_path, url).await
}
// Handler for deleting file
pub(crate) async fn delete_file_handler(
params: AFPluginData<UploadedFilePB>,
manager: AFPluginState<Weak<DocumentManager>>,
) -> FlowyResult<()> {
let AFPluginData(UploadedFilePB {
url,
local_file_path,
}) = params;
let manager = upgrade_document(manager)?;
manager.delete_file(local_file_path, url).await
}

View File

@ -39,6 +39,9 @@ pub fn init(document_manager: Weak<DocumentManager>) -> AFPlugin {
DocumentEvent::ConvertDataToJSON,
convert_data_to_json_handler,
)
.event(DocumentEvent::UploadFile, upload_file_handler)
.event(DocumentEvent::DownloadFile, download_file_handler)
.event(DocumentEvent::DeleteFile, delete_file_handler)
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Display, ProtoBuf_Enum, Flowy_Event)]
@ -108,4 +111,11 @@ pub enum DocumentEvent {
#[event(input = "DocumentSnapshotMetaPB", output = "DocumentSnapshotPB")]
GetDocumentSnapshot = 14,
#[event(input = "UploadFileParamsPB", output = "UploadedFilePB")]
UploadFile = 15,
#[event(input = "UploadedFilePB")]
DownloadFile = 16,
#[event(input = "UploadedFilePB")]
DeleteFile = 17,
}

View File

@ -10,15 +10,21 @@ use collab_document::blocks::DocumentData;
use collab_document::document::Document;
use collab_document::document_data::default_document_data;
use collab_entity::CollabType;
use flowy_storage::ObjectIdentity;
use flowy_storage::ObjectValue;
use lru::LruCache;
use parking_lot::Mutex;
use tokio::io::AsyncWriteExt;
use tracing::error;
use tracing::info;
use tracing::warn;
use tracing::{event, instrument};
use collab_integrate::collab_builder::{AppFlowyCollabBuilder, CollabBuilderConfig};
use collab_integrate::{CollabKVAction, CollabKVDB, CollabPersistenceConfig};
use flowy_document_pub::cloud::DocumentCloudService;
use flowy_error::{internal_error, ErrorCode, FlowyError, FlowyResult};
use flowy_storage::FileStorageService;
use flowy_storage::ObjectStorageService;
use crate::document::MutexDocument;
use crate::entities::{
@ -45,7 +51,7 @@ pub struct DocumentManager {
collab_builder: Arc<AppFlowyCollabBuilder>,
documents: Arc<Mutex<LruCache<String, Arc<MutexDocument>>>>,
cloud_service: Arc<dyn DocumentCloudService>,
storage_service: Weak<dyn FileStorageService>,
storage_service: Weak<dyn ObjectStorageService>,
snapshot_service: Arc<dyn DocumentSnapshotService>,
}
@ -54,7 +60,7 @@ impl DocumentManager {
user_service: Arc<dyn DocumentUserService>,
collab_builder: Arc<AppFlowyCollabBuilder>,
cloud_service: Arc<dyn DocumentCloudService>,
storage_service: Weak<dyn FileStorageService>,
storage_service: Weak<dyn ObjectStorageService>,
snapshot_service: Arc<dyn DocumentSnapshotService>,
) -> Self {
let documents = Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(10).unwrap())));
@ -246,6 +252,73 @@ impl DocumentManager {
Ok(snapshot)
}
pub async fn upload_file(
&self,
workspace_id: String,
local_file_path: &str,
) -> FlowyResult<String> {
let object_value = ObjectValue::from_file(local_file_path).await?;
let storage_service = self.storage_service_upgrade()?;
let url = {
let hash = fxhash::hash(object_value.raw.as_ref());
storage_service
.get_object_url(ObjectIdentity {
workspace_id: workspace_id.to_owned(),
file_id: hash.to_string(),
})
.await?
};
// let the upload happen in the background
let clone_url = url.clone();
tokio::spawn(async move {
if let Err(e) = storage_service.put_object(clone_url, object_value).await {
error!("upload file failed: {}", e);
}
});
Ok(url)
}
pub async fn download_file(&self, local_file_path: String, url: String) -> FlowyResult<()> {
if tokio::fs::metadata(&local_file_path).await.is_ok() {
warn!("file already exist in user local disk: {}", local_file_path);
return Ok(());
}
let storage_service = self.storage_service_upgrade()?;
let object_value = storage_service.get_object(url).await?;
// create file if not exist
let mut file = tokio::fs::OpenOptions::new()
.create(true)
.write(true)
.open(&local_file_path)
.await?;
let n = file.write(&object_value.raw).await?;
info!("downloaded {} bytes to file: {}", n, local_file_path);
Ok(())
}
pub async fn delete_file(&self, local_file_path: String, url: String) -> FlowyResult<()> {
// delete file from local
tokio::fs::remove_file(local_file_path).await?;
// delete from cloud
let storage_service = self.storage_service_upgrade()?;
tokio::spawn(async move {
if let Err(e) = storage_service.delete_object(url).await {
// TODO: add WAL to log the delete operation.
// keep a list of files to be deleted, and retry later
error!("delete file failed: {}", e);
}
});
Ok(())
}
async fn collab_for_document(
&self,
uid: i64,
@ -279,6 +352,13 @@ impl DocumentManager {
}
}
fn storage_service_upgrade(&self) -> FlowyResult<Arc<dyn ObjectStorageService>> {
let storage_service = self.storage_service.upgrade().ok_or_else(|| {
FlowyError::internal().with_context("The file storage service is already dropped")
})?;
Ok(storage_service)
}
/// Only expose this method for testing
#[cfg(debug_assertions)]
pub fn get_cloud_service(&self) -> &Arc<dyn DocumentCloudService> {
@ -286,7 +366,7 @@ impl DocumentManager {
}
/// Only expose this method for testing
#[cfg(debug_assertions)]
pub fn get_file_storage_service(&self) -> &Weak<dyn FileStorageService> {
pub fn get_file_storage_service(&self) -> &Weak<dyn ObjectStorageService> {
&self.storage_service
}
}

View File

@ -2,7 +2,6 @@ use std::ops::Deref;
use std::sync::Arc;
use anyhow::Error;
use bytes::Bytes;
use collab::core::collab::CollabDocState;
use collab::preclude::CollabPlugin;
use collab_document::blocks::DocumentData;
@ -23,7 +22,7 @@ use flowy_document::entities::{DocumentSnapshotData, DocumentSnapshotMeta};
use flowy_document::manager::{DocumentManager, DocumentSnapshotService, DocumentUserService};
use flowy_document_pub::cloud::*;
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
use flowy_storage::{FileStorageService, StorageObject};
use flowy_storage::ObjectStorageService;
use lib_infra::async_trait::async_trait;
use lib_infra::future::{to_fut, Fut, FutureResult};
@ -35,7 +34,7 @@ impl DocumentTest {
pub fn new() -> Self {
let user = FakeUser::new();
let cloud_service = Arc::new(LocalTestDocumentCloudServiceImpl());
let file_storage = Arc::new(DocumentTestFileStorageService) as Arc<dyn FileStorageService>;
let file_storage = Arc::new(DocumentTestFileStorageService) as Arc<dyn ObjectStorageService>;
let document_snapshot = Arc::new(DocumentTestSnapshot);
let manager = DocumentManager::new(
Arc::new(user),
@ -165,16 +164,27 @@ impl DocumentCloudService for LocalTestDocumentCloudServiceImpl {
}
pub struct DocumentTestFileStorageService;
impl FileStorageService for DocumentTestFileStorageService {
fn create_object(&self, _object: StorageObject) -> FutureResult<String, FlowyError> {
impl ObjectStorageService for DocumentTestFileStorageService {
fn get_object_url(
&self,
_object_id: flowy_storage::ObjectIdentity,
) -> FutureResult<String, FlowyError> {
todo!()
}
fn delete_object_by_url(&self, _object_url: String) -> FutureResult<(), FlowyError> {
fn put_object(
&self,
_url: String,
_object_value: flowy_storage::ObjectValue,
) -> FutureResult<(), FlowyError> {
todo!()
}
fn get_object_by_url(&self, _object_url: String) -> FutureResult<Bytes, FlowyError> {
fn delete_object(&self, _url: String) -> FutureResult<(), FlowyError> {
todo!()
}
fn get_object(&self, _url: String) -> FutureResult<flowy_storage::ObjectValue, FlowyError> {
todo!()
}
}

View File

@ -0,0 +1 @@