refactor: File upload (#5542)

* chore: rename service
* refactor: upload
* chore: save upload meta data
* chore: add sql test
* chore: uploader
* chore: fix upload
* chore: cache file and remove after finish
* chore: retry upload
* chore: pause when network unreachable
* chore: add event test
* chore: add test
* chore: clippy
* chore: update client-api commit id
* chore: fix flutter test
frontend/rust-lib/flowy-storage/Cargo.toml
@@ -3,21 +3,24 @@ name = "flowy-storage"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
crate-type = ["cdylib", "rlib"]

[dependencies]
reqwest = { version = "0.11", features = ["json", "stream"] }
flowy-storage-pub.workspace = true
serde_json.workspace = true
serde.workspace = true
async-trait.workspace = true
bytes.workspace = true
mime_guess = "2.0"
lib-infra = { workspace = true }
url = "2.2.2"
flowy-error = { workspace = true, features = ["impl_from_reqwest"] }
mime = "0.3.17"
tokio = { workspace = true, features = ["sync", "io-util"] }
flowy-error = { workspace = true, features = ["impl_from_reqwest", "impl_from_sqlite"] }
tokio = { workspace = true, features = ["sync", "io-util"] }
tracing.workspace = true
fxhash = "0.2.1"
flowy-sqlite.workspace = true
mime_guess = "2.0.4"
fxhash = "0.2.1"
anyhow = "1.0.86"
chrono = "0.4.33"

[dev-dependencies]
tokio = { workspace = true, features = ["full"] }
uuid = "1.6.1"
rand = { version = "0.8", features = ["std_rng"] }
frontend/rust-lib/flowy-storage/src/file_cache.rs (new file, 88 lines)
@@ -0,0 +1,88 @@
use std::path::{Path, PathBuf};
use tokio::fs::{self, File};
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tracing::error;

/// [FileTempStorage] is used to store the temporary files for uploading. After the file is uploaded,
/// the file will be deleted.
pub struct FileTempStorage {
  storage_dir: PathBuf,
}

impl FileTempStorage {
  /// Creates a new `FileTempStorage` with the specified temporary directory.
  pub fn new(storage_dir: PathBuf) -> Self {
    if !storage_dir.exists() {
      if let Err(err) = std::fs::create_dir_all(&storage_dir) {
        error!("Failed to create temporary storage directory: {:?}", err);
      }
    }

    FileTempStorage { storage_dir }
  }

  /// Generates a temporary file path using the given file name.
  fn generate_temp_file_path_with_name(&self, file_name: &str) -> PathBuf {
    self.storage_dir.join(file_name)
  }

  /// Creates a temporary file from an existing local file path.
  pub async fn create_temp_file_from_existing(
    &self,
    existing_file_path: &Path,
  ) -> io::Result<String> {
    let file_name = existing_file_path
      .file_name()
      .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "Invalid file name"))?
      .to_str()
      .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "Invalid file name"))?;

    let temp_file_path = self.generate_temp_file_path_with_name(file_name);
    fs::copy(existing_file_path, &temp_file_path).await?;
    Ok(
      temp_file_path
        .to_str()
        .ok_or(io::Error::new(
          io::ErrorKind::InvalidInput,
          "Invalid file path",
        ))?
        .to_owned(),
    )
  }

  /// Creates a temporary file from bytes and a specified file name.
  #[allow(dead_code)]
  pub async fn create_temp_file_from_bytes(
    &self,
    file_name: &str,
    data: &[u8],
  ) -> io::Result<PathBuf> {
    let temp_file_path = self.generate_temp_file_path_with_name(file_name);
    let mut file = File::create(&temp_file_path).await?;
    file.write_all(data).await?;
    Ok(temp_file_path)
  }

  /// Writes data to the specified temporary file.
  #[allow(dead_code)]
  pub async fn write_to_temp_file(&self, file_path: &Path, data: &[u8]) -> io::Result<()> {
    let mut file = File::create(file_path).await?;
    file.write_all(data).await?;
    Ok(())
  }

  /// Reads data from the specified temporary file.
  #[allow(dead_code)]
  pub async fn read_from_temp_file(&self, file_path: &Path) -> io::Result<Vec<u8>> {
    let mut file = File::open(file_path).await?;
    let mut data = Vec::new();
    file.read_to_end(&mut data).await?;
    Ok(data)
  }

  /// Deletes the specified temporary file.
  pub async fn delete_temp_file<T: AsRef<Path>>(&self, file_path: T) -> io::Result<()> {
    fs::remove_file(file_path).await?;
    Ok(())
  }
}
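A minimal usage sketch of the temp-file lifecycle above (not part of this commit; the directory and source path are hypothetical, and the example assumes access to FileTempStorage from within the crate): copy the source file into the cache directory before uploading, then delete the cached copy once the upload finishes.

// Hypothetical example; assumes FileTempStorage is in scope inside this crate.
use std::path::{Path, PathBuf};

async fn cache_then_cleanup() -> std::io::Result<()> {
  // Hypothetical cache directory.
  let storage = FileTempStorage::new(PathBuf::from("/tmp/appflowy_cache_files"));
  // Copy the original file into the temp storage; the returned path is what gets uploaded.
  let temp_path = storage
    .create_temp_file_from_existing(Path::new("/tmp/report.pdf")) // hypothetical source file
    .await?;
  // ... upload `temp_path` here ...
  // Once the upload completes, the cached copy is removed.
  storage.delete_temp_file(&temp_path).await
}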
@@ -1,154 +1,4 @@
if_native! {
  mod native;
  pub use native::*;
}

if_wasm! {
  mod wasm;
  pub use wasm::*;
}

use bytes::Bytes;

use flowy_error::FlowyError;
use lib_infra::future::FutureResult;
use lib_infra::{conditional_send_sync_trait, if_native, if_wasm};
use mime::Mime;

pub struct ObjectIdentity {
  pub workspace_id: String,
  pub file_id: String,
  pub ext: String,
}

#[derive(Clone)]
pub struct ObjectValue {
  pub raw: Bytes,
  pub mime: Mime,
}
conditional_send_sync_trait! {
  "Provides a service for object storage. The trait includes methods for CRUD operations on storage objects.";
  ObjectStorageService {
    /// Creates a new storage object.
    ///
    /// # Parameters
    /// - `url`: url of the object to be created.
    ///
    /// # Returns
    /// - `Ok()`
    /// - `Err(Error)`: An error occurred during the operation.
    fn get_object_url(&self, object_id: ObjectIdentity) -> FutureResult<String, FlowyError>;

    /// Creates a new storage object.
    ///
    /// # Parameters
    /// - `url`: url of the object to be created.
    ///
    /// # Returns
    /// - `Ok()`
    /// - `Err(Error)`: An error occurred during the operation.
    fn put_object(&self, url: String, object_value: ObjectValue) -> FutureResult<(), FlowyError>;

    /// Deletes a storage object by its URL.
    ///
    /// # Parameters
    /// - `url`: url of the object to be deleted.
    ///
    /// # Returns
    /// - `Ok()`
    /// - `Err(Error)`: An error occurred during the operation.
    fn delete_object(&self, url: String) -> FutureResult<(), FlowyError>;

    /// Fetches a storage object by its URL.
    ///
    /// # Parameters
    /// - `url`: url of the object
    ///
    /// # Returns
    /// - `Ok(File)`: The returned file object.
    /// - `Err(Error)`: An error occurred during the operation.
    fn get_object(&self, url: String) -> FutureResult<ObjectValue, FlowyError>;
  }
}

pub trait FileStoragePlan: Send + Sync + 'static {
  fn storage_size(&self) -> FutureResult<u64, FlowyError>;
  fn maximum_file_size(&self) -> FutureResult<u64, FlowyError>;

  fn check_upload_object(&self, object: &StorageObject) -> FutureResult<(), FlowyError>;
}

pub struct StorageObject {
  pub workspace_id: String,
  pub file_name: String,
  pub value: ObjectValueSupabase,
}

pub enum ObjectValueSupabase {
  File { file_path: String },
  Bytes { bytes: Bytes, mime: String },
}

impl ObjectValueSupabase {
  pub fn mime_type(&self) -> String {
    match self {
      ObjectValueSupabase::File { file_path } => mime_guess::from_path(file_path)
        .first_or_octet_stream()
        .to_string(),
      ObjectValueSupabase::Bytes { mime, .. } => mime.clone(),
    }
  }
}

impl StorageObject {
  /// Creates a `StorageObject` from a file.
  ///
  /// # Parameters
  ///
  /// * `name`: The name of the storage object.
  /// * `file_path`: The file path to the storage object's data.
  ///
  pub fn from_file<T: ToString>(workspace_id: &str, file_name: &str, file_path: T) -> Self {
    Self {
      workspace_id: workspace_id.to_string(),
      file_name: file_name.to_string(),
      value: ObjectValueSupabase::File {
        file_path: file_path.to_string(),
      },
    }
  }

  /// Creates a `StorageObject` from bytes.
  ///
  /// # Parameters
  ///
  /// * `name`: The name of the storage object.
  /// * `bytes`: The byte data of the storage object.
  /// * `mime`: The MIME type of the storage object.
  ///
  pub fn from_bytes<B: Into<Bytes>>(
    workspace_id: &str,
    file_name: &str,
    bytes: B,
    mime: String,
  ) -> Self {
    let bytes = bytes.into();
    Self {
      workspace_id: workspace_id.to_string(),
      file_name: file_name.to_string(),
      value: ObjectValueSupabase::Bytes { bytes, mime },
    }
  }

  /// Gets the file size of the `StorageObject`.
  ///
  /// # Returns
  ///
  /// The file size in bytes.
  pub fn file_size(&self) -> u64 {
    match &self.value {
      ObjectValueSupabase::File { file_path } => std::fs::metadata(file_path).unwrap().len(),
      ObjectValueSupabase::Bytes { bytes, .. } => bytes.len() as u64,
    }
  }
}
mod file_cache;
pub mod manager;
pub mod sqlite_sql;
mod uploader;
frontend/rust-lib/flowy-storage/src/manager.rs (new file, 648 lines)
@@ -0,0 +1,648 @@
use crate::file_cache::FileTempStorage;
use crate::sqlite_sql::{
  batch_select_upload_file, delete_upload_file, insert_upload_file, insert_upload_part,
  select_upload_file, select_upload_parts, update_upload_file_upload_id, UploadFilePartTable,
  UploadFileTable,
};
use crate::uploader::{FileUploader, FileUploaderRunner, Signal, UploadTask, UploadTaskQueue};
use async_trait::async_trait;
use flowy_error::{FlowyError, FlowyResult};
use flowy_sqlite::DBConnection;
use flowy_storage_pub::chunked_byte::{ChunkedBytes, MIN_CHUNK_SIZE};
use flowy_storage_pub::cloud::{ObjectIdentity, ObjectValue, StorageCloudService};
use flowy_storage_pub::storage::{
  CompletedPartRequest, CreatedUpload, StorageService, UploadPartResponse, UploadResult,
  UploadStatus,
};
use lib_infra::box_any::BoxAny;
use lib_infra::future::FutureResult;
use lib_infra::util::timestamp;
use std::io::ErrorKind;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::watch;
use tracing::{debug, error, info, instrument, trace};

pub trait StorageUserService: Send + Sync + 'static {
  fn user_id(&self) -> Result<i64, FlowyError>;
  fn workspace_id(&self) -> Result<String, FlowyError>;
  fn sqlite_connection(&self, uid: i64) -> Result<DBConnection, FlowyError>;
  fn get_application_root_dir(&self) -> &str;
}

pub struct StorageManager {
  pub storage_service: Arc<dyn StorageService>,
  uploader: Arc<FileUploader>,
  broadcast: tokio::sync::broadcast::Sender<UploadResult>,
}

impl Drop for StorageManager {
  fn drop(&mut self) {
    info!("[File] StorageManager is dropped");
  }
}

impl StorageManager {
  pub fn new(
    cloud_service: Arc<dyn StorageCloudService>,
    user_service: Arc<dyn StorageUserService>,
  ) -> Self {
    let temp_storage_path = PathBuf::from(format!(
      "{}/cache_files",
      user_service.get_application_root_dir()
    ));
    let temp_storage = Arc::new(FileTempStorage::new(temp_storage_path));
    let (notifier, notifier_rx) = watch::channel(Signal::Proceed);
    let (broadcast, _) = tokio::sync::broadcast::channel::<UploadResult>(100);
    let task_queue = Arc::new(UploadTaskQueue::new(notifier));
    let storage_service = Arc::new(StorageServiceImpl {
      cloud_service,
      user_service: user_service.clone(),
      temp_storage,
      task_queue: task_queue.clone(),
      upload_status_notifier: broadcast.clone(),
    });

    let uploader = Arc::new(FileUploader::new(storage_service.clone(), task_queue));
    tokio::spawn(FileUploaderRunner::run(
      Arc::downgrade(&uploader),
      notifier_rx,
    ));

    let weak_uploader = Arc::downgrade(&uploader);
    tokio::spawn(async move {
      // Start uploading after 30 seconds
      tokio::time::sleep(Duration::from_secs(30)).await;
      if let Some(uploader) = weak_uploader.upgrade() {
        if let Err(err) = prepare_upload_task(uploader, user_service).await {
          error!("prepare upload task failed: {}", err);
        }
      }
    });

    Self {
      storage_service,
      uploader,
      broadcast,
    }
  }

  pub fn update_network_reachable(&self, reachable: bool) {
    if reachable {
      self.uploader.resume();
    } else {
      self.uploader.pause();
    }
  }

  pub fn subscribe_upload_result(&self) -> tokio::sync::broadcast::Receiver<UploadResult> {
    self.broadcast.subscribe()
  }
}

async fn prepare_upload_task(
  uploader: Arc<FileUploader>,
  user_service: Arc<dyn StorageUserService>,
) -> FlowyResult<()> {
  let uid = user_service.user_id()?;
  let conn = user_service.sqlite_connection(uid)?;
  let upload_files = batch_select_upload_file(conn, 100)?;
  let tasks = upload_files
    .into_iter()
    .map(|upload_file| UploadTask::BackgroundTask {
      workspace_id: upload_file.workspace_id,
      file_id: upload_file.file_id,
      parent_dir: upload_file.parent_dir,
      created_at: upload_file.created_at,
      retry_count: 0,
    })
    .collect::<Vec<_>>();
  info!("prepare upload task: {}", tasks.len());
  uploader.queue_tasks(tasks).await;
  Ok(())
}

pub struct StorageServiceImpl {
  cloud_service: Arc<dyn StorageCloudService>,
  user_service: Arc<dyn StorageUserService>,
  temp_storage: Arc<FileTempStorage>,
  task_queue: Arc<UploadTaskQueue>,
  upload_status_notifier: tokio::sync::broadcast::Sender<UploadResult>,
}

#[async_trait]
impl StorageService for StorageServiceImpl {
  fn upload_object(
    &self,
    workspace_id: &str,
    local_file_path: &str,
  ) -> FutureResult<String, FlowyError> {
    let cloud_service = self.cloud_service.clone();
    let workspace_id = workspace_id.to_string();
    let local_file_path = local_file_path.to_string();
    FutureResult::new(async move {
      let (object_identity, object_value) =
        object_from_disk(&workspace_id, &local_file_path).await?;
      let url = cloud_service.get_object_url(object_identity).await?;
      match cloud_service.put_object(url.clone(), object_value).await {
        Ok(_) => {
          debug!("[File] success uploaded file to cloud: {}", url);
        },
        Err(err) => {
          error!("[File] upload file failed: {}", err);
          return Err(err);
        },
      }
      Ok(url)
    })
  }

  fn delete_object(&self, url: String, local_file_path: String) -> FlowyResult<()> {
    let cloud_service = self.cloud_service.clone();
    tokio::spawn(async move {
      match tokio::fs::remove_file(&local_file_path).await {
        Ok(_) => {
          debug!("[File] deleted file from local disk: {}", local_file_path)
        },
        Err(err) => {
          error!("[File] delete file at {} failed: {}", local_file_path, err);
        },
      }
      if let Err(e) = cloud_service.delete_object(&url).await {
        // TODO: add WAL to log the delete operation.
        // keep a list of files to be deleted, and retry later
        error!("[File] delete file failed: {}", e);
      }
      debug!("[File] deleted file from cloud: {}", url);
    });
    Ok(())
  }

  fn download_object(&self, url: String, local_file_path: String) -> FlowyResult<()> {
    let cloud_service = self.cloud_service.clone();
    tokio::spawn(async move {
      if tokio::fs::metadata(&local_file_path).await.is_ok() {
        tracing::warn!("file already exist in user local disk: {}", local_file_path);
        return Ok(());
      }
      let object_value = cloud_service.get_object(url).await?;
      let mut file = tokio::fs::OpenOptions::new()
        .create(true)
        .truncate(true)
        .write(true)
        .open(&local_file_path)
        .await?;

      match file.write(&object_value.raw).await {
        Ok(n) => {
          info!("downloaded {} bytes to file: {}", n, local_file_path);
        },
        Err(err) => {
          error!("write file failed: {}", err);
        },
      }
      Ok::<_, FlowyError>(())
    });
    Ok(())
  }

  fn create_upload(
    &self,
    workspace_id: &str,
    parent_dir: &str,
    file_path: &str,
  ) -> FutureResult<CreatedUpload, FlowyError> {
    if workspace_id.is_empty() {
      return FutureResult::new(async {
        Err(FlowyError::internal().with_context("workspace id is empty"))
      });
    }

    if parent_dir.is_empty() {
      return FutureResult::new(async {
        Err(FlowyError::internal().with_context("parent dir is empty"))
      });
    }

    if file_path.is_empty() {
      return FutureResult::new(async {
        Err(FlowyError::internal().with_context("local file path is empty"))
      });
    }

    let workspace_id = workspace_id.to_string();
    let parent_dir = parent_dir.to_string();
    let file_path = file_path.to_string();
    let temp_storage = self.temp_storage.clone();
    let task_queue = self.task_queue.clone();
    let user_service = self.user_service.clone();
    let cloud_service = self.cloud_service.clone();

    FutureResult::new(async move {
      let local_file_path = temp_storage
        .create_temp_file_from_existing(Path::new(&file_path))
        .await
        .map_err(|err| {
          error!("[File] create temp file failed: {}", err);
          FlowyError::internal()
            .with_context(format!("create temp file for upload file failed: {}", err))
        })?;

      // 1. create a file record and chunk the file
      let (chunks, record) =
        create_upload_record(workspace_id, parent_dir, local_file_path).await?;

      // 2. save the record to sqlite
      let conn = user_service.sqlite_connection(user_service.user_id()?)?;
      insert_upload_file(conn, &record)?;

      // 3. generate url for given file
      let url = cloud_service.get_object_url_v1(
        &record.workspace_id,
        &record.parent_dir,
        &record.file_id,
      )?;
      let file_id = record.file_id.clone();

      task_queue
        .queue_task(UploadTask::Task {
          chunks,
          record,
          retry_count: 0,
        })
        .await;

      Ok::<_, FlowyError>(CreatedUpload { url, file_id })
    })
  }

  async fn start_upload(&self, chunks: &ChunkedBytes, record: &BoxAny) -> Result<(), FlowyError> {
    let file_record = record.downcast_ref::<UploadFileTable>().ok_or_else(|| {
      FlowyError::internal().with_context("failed to downcast record to UploadFileTable")
    })?;

    if let Err(err) = start_upload(
      &self.cloud_service,
      &self.user_service,
      &self.temp_storage,
      chunks,
      file_record,
      self.upload_status_notifier.clone(),
    )
    .await
    {
      error!("[File] start upload failed: {}", err);
    }
    Ok(())
  }

  async fn resume_upload(
    &self,
    workspace_id: &str,
    parent_dir: &str,
    file_id: &str,
  ) -> Result<(), FlowyError> {
    // Gathering the upload record and parts from the sqlite database.
    let record = {
      let mut conn = self
        .user_service
        .sqlite_connection(self.user_service.user_id()?)?;
      conn.immediate_transaction(|conn| {
        Ok::<_, FlowyError>(
          // When resuming an upload, check if the upload_id is empty.
          // If the upload_id is empty, the upload has likely not been created yet.
          // If the upload_id is not empty, verify which parts have already been uploaded.
          select_upload_file(conn, &workspace_id, &parent_dir, &file_id)?.and_then(|record| {
            if record.upload_id.is_empty() {
              Some((record, vec![]))
            } else {
              let parts = select_upload_parts(conn, &record.upload_id).unwrap_or_default();
              Some((record, parts))
            }
          }),
        )
      })?
    };

    if let Some((upload_file, parts)) = record {
      resume_upload(
        &self.cloud_service,
        &self.user_service,
        &self.temp_storage,
        upload_file,
        parts,
        self.upload_status_notifier.clone(),
      )
      .await?;
    } else {
      error!("[File] resume upload failed: record not found");
    }
    Ok(())
  }
}

async fn create_upload_record(
  workspace_id: String,
  parent_dir: String,
  local_file_path: String,
) -> FlowyResult<(ChunkedBytes, UploadFileTable)> {
  // read file and chunk it base on CHUNK_SIZE. We use MIN_CHUNK_SIZE as the minimum chunk size
  let chunked_bytes = ChunkedBytes::from_file(&local_file_path, MIN_CHUNK_SIZE as i32).await?;
  let ext = Path::new(&local_file_path)
    .extension()
    .and_then(std::ffi::OsStr::to_str)
    .unwrap_or("")
    .to_owned();
  let content_type = mime_guess::from_path(&local_file_path)
    .first_or_octet_stream()
    .to_string();
  let file_id = format!("{}.{}", fxhash::hash(&chunked_bytes.data).to_string(), ext);
  let record = UploadFileTable {
    workspace_id,
    file_id,
    upload_id: "".to_string(),
    parent_dir,
    local_file_path,
    content_type,
    chunk_size: chunked_bytes.chunk_size,
    num_chunk: chunked_bytes.offsets.len() as i32,
    created_at: timestamp(),
  };
  Ok((chunked_bytes, record))
}

#[instrument(level = "debug", skip_all, err)]
async fn start_upload(
  cloud_service: &Arc<dyn StorageCloudService>,
  user_service: &Arc<dyn StorageUserService>,
  temp_storage: &Arc<FileTempStorage>,
  chunked_bytes: &ChunkedBytes,
  upload_file: &UploadFileTable,
  notifier: tokio::sync::broadcast::Sender<UploadResult>,
) -> FlowyResult<()> {
  let mut upload_file = upload_file.clone();
  if upload_file.upload_id.is_empty() {
    // 1. create upload
    trace!(
      "[File] create upload for workspace: {}, parent_dir: {}, file_id: {}",
      upload_file.workspace_id,
      upload_file.parent_dir,
      upload_file.file_id
    );

    let create_upload_resp = cloud_service
      .create_upload(
        &upload_file.workspace_id,
        &upload_file.parent_dir,
        &upload_file.file_id,
        &upload_file.content_type,
      )
      .await?;
    // 2. update upload_id
    let conn = user_service.sqlite_connection(user_service.user_id()?)?;
    update_upload_file_upload_id(
      conn,
      &upload_file.workspace_id,
      &upload_file.parent_dir,
      &upload_file.file_id,
      &create_upload_resp.upload_id,
    )?;

    trace!(
      "[File] {} update upload_id: {}",
      upload_file.file_id,
      create_upload_resp.upload_id
    );
    // temporary store the upload_id
    upload_file.upload_id = create_upload_resp.upload_id;
  }

  let _ = notifier.send(UploadResult {
    file_id: upload_file.file_id.clone(),
    status: UploadStatus::InProgress,
  });

  // 3. start uploading parts
  trace!(
    "[File] {} start uploading parts: {}",
    upload_file.file_id,
    chunked_bytes.iter().count()
  );
  let mut iter = chunked_bytes.iter().enumerate();
  let mut completed_parts = Vec::new();

  while let Some((index, chunk_bytes)) = iter.next() {
    let part_number = index as i32 + 1;
    trace!(
      "[File] {} uploading part: {}, len:{}KB",
      upload_file.file_id,
      part_number,
      chunk_bytes.len() / 1000,
    );
    // start uploading parts
    match upload_part(
      &cloud_service,
      &user_service,
      &upload_file.workspace_id,
      &upload_file.parent_dir,
      &upload_file.upload_id,
      &upload_file.file_id,
      part_number,
      chunk_bytes.to_vec(),
    )
    .await
    {
      Ok(resp) => {
        trace!(
          "[File] {} upload {} part success, total:{},",
          upload_file.file_id,
          part_number,
          chunked_bytes.offsets.len()
        );
        // gather completed part
        completed_parts.push(CompletedPartRequest {
          e_tag: resp.e_tag,
          part_number: resp.part_num,
        });
      },
      Err(err) => {
        error!("[File] {} upload part failed: {}", upload_file.file_id, err);
        return Err(err);
      },
    }
  }

  // mark it as completed
  complete_upload(
    &cloud_service,
    &user_service,
    temp_storage,
    &upload_file,
    completed_parts,
    notifier,
  )
  .await?;

  trace!("[File] {} upload completed", upload_file.file_id);
  Ok(())
}

#[instrument(level = "debug", skip_all, err)]
async fn resume_upload(
  cloud_service: &Arc<dyn StorageCloudService>,
  user_service: &Arc<dyn StorageUserService>,
  temp_storage: &Arc<FileTempStorage>,
  upload_file: UploadFileTable,
  parts: Vec<UploadFilePartTable>,
  notifier: tokio::sync::broadcast::Sender<UploadResult>,
) -> FlowyResult<()> {
  trace!(
    "[File] resume upload for workspace: {}, parent_dir: {}, file_id: {}, local_file_path:{}",
    upload_file.workspace_id,
    upload_file.parent_dir,
    upload_file.file_id,
    upload_file.local_file_path
  );

  match ChunkedBytes::from_file(&upload_file.local_file_path, MIN_CHUNK_SIZE as i32).await {
    Ok(mut chunked_bytes) => {
      // When there were any parts already uploaded, skip those parts by setting the current offset.
      chunked_bytes.set_current_offset(parts.len() as i32);
      start_upload(
        cloud_service,
        user_service,
        temp_storage,
        &chunked_bytes,
        &upload_file,
        notifier,
      )
      .await?;
    },
    Err(err) => {
      //
      match err.kind() {
        ErrorKind::NotFound => {
          error!("[File] file not found: {}", upload_file.local_file_path);
          if let Ok(uid) = user_service.user_id() {
            if let Ok(conn) = user_service.sqlite_connection(uid) {
              delete_upload_file(conn, &upload_file.upload_id)?;
            }
          }
        },
        _ => {
          error!("[File] read file failed: {}", err);
        },
      }
    },
  }
  Ok(())
}

#[instrument(level = "debug", skip_all)]
async fn upload_part(
  cloud_service: &Arc<dyn StorageCloudService>,
  user_service: &Arc<dyn StorageUserService>,
  workspace_id: &str,
  parent_dir: &str,
  upload_id: &str,
  file_id: &str,
  part_number: i32,
  body: Vec<u8>,
) -> Result<UploadPartResponse, FlowyError> {
  let resp = cloud_service
    .upload_part(
      &workspace_id,
      &parent_dir,
      &upload_id,
      &file_id,
      part_number,
      body,
    )
    .await?;

  // save uploaded part to sqlite
  let conn = user_service.sqlite_connection(user_service.user_id()?)?;
  insert_upload_part(
    conn,
    &UploadFilePartTable {
      upload_id: upload_id.to_string(),
      e_tag: resp.e_tag.clone(),
      part_num: resp.part_num,
    },
  )?;

  Ok(resp)
}

async fn complete_upload(
  cloud_service: &Arc<dyn StorageCloudService>,
  user_service: &Arc<dyn StorageUserService>,
  temp_storage: &Arc<FileTempStorage>,
  upload_file: &UploadFileTable,
  parts: Vec<CompletedPartRequest>,
  notifier: tokio::sync::broadcast::Sender<UploadResult>,
) -> Result<(), FlowyError> {
  match cloud_service
    .complete_upload(
      &upload_file.workspace_id,
      &upload_file.parent_dir,
      &upload_file.upload_id,
      &upload_file.file_id,
      parts,
    )
    .await
  {
    Ok(_) => {
      info!("[File] completed upload file: {}", upload_file.upload_id);
      trace!("[File] delete upload record from sqlite");
      let _ = notifier.send(UploadResult {
        file_id: upload_file.file_id.clone(),
        status: UploadStatus::Finish,
      });

      let conn = user_service.sqlite_connection(user_service.user_id()?)?;
      delete_upload_file(conn, &upload_file.upload_id)?;
      if let Err(err) = temp_storage
        .delete_temp_file(&upload_file.local_file_path)
        .await
      {
        error!("[File] delete temp file failed: {}", err);
      }
    },
    Err(err) => {
      error!("[File] complete upload failed: {}", err);
    },
  }
  Ok(())
}

pub async fn object_from_disk(
  workspace_id: &str,
  local_file_path: &str,
) -> Result<(ObjectIdentity, ObjectValue), FlowyError> {
  let ext = Path::new(local_file_path)
    .extension()
    .and_then(std::ffi::OsStr::to_str)
    .unwrap_or("")
    .to_owned();
  let mut file = tokio::fs::File::open(local_file_path).await?;
  let mut content = Vec::new();
  let n = file.read_to_end(&mut content).await?;
  info!("read {} bytes from file: {}", n, local_file_path);
  let mime = mime_guess::from_path(local_file_path).first_or_octet_stream();
  let hash = fxhash::hash(&content);

  Ok((
    ObjectIdentity {
      workspace_id: workspace_id.to_owned(),
      file_id: hash.to_string(),
      ext,
    },
    ObjectValue {
      raw: content.into(),
      mime,
    },
  ))
}
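As a usage illustration (hypothetical, not part of this commit), a caller holding the StorageManager above could queue an upload through its storage service and then watch the broadcast channel for the matching completion event. The sketch assumes FutureResult is directly awaitable and that CreatedUpload exposes url and file_id fields, as the code above suggests.

// Hypothetical consumer sketch; written as if it lived inside this crate.
use crate::manager::StorageManager;
use flowy_error::FlowyResult;
use flowy_storage_pub::storage::UploadStatus;

async fn upload_and_wait(
  manager: &StorageManager,
  workspace_id: &str,
  parent_dir: &str,
  file_path: &str,
) -> FlowyResult<String> {
  // Subscribe before queuing so the completion event cannot be missed.
  let mut results = manager.subscribe_upload_result();
  let created = manager
    .storage_service
    .create_upload(workspace_id, parent_dir, file_path)
    .await?;
  // The FileUploader drains the queue in the background; wait for this file to finish.
  while let Ok(result) = results.recv().await {
    if result.file_id == created.file_id && matches!(result.status, UploadStatus::Finish) {
      break;
    }
  }
  Ok(created.url)
}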
@@ -1,34 +0,0 @@
use crate::{ObjectIdentity, ObjectValue};
use flowy_error::FlowyError;
use std::path::Path;
use tokio::io::AsyncReadExt;
use tracing::info;

pub async fn object_from_disk(
  workspace_id: &str,
  local_file_path: &str,
) -> Result<(ObjectIdentity, ObjectValue), FlowyError> {
  let ext = Path::new(local_file_path)
    .extension()
    .and_then(std::ffi::OsStr::to_str)
    .unwrap_or("")
    .to_owned();
  let mut file = tokio::fs::File::open(local_file_path).await?;
  let mut content = Vec::new();
  let n = file.read_to_end(&mut content).await?;
  info!("read {} bytes from file: {}", n, local_file_path);
  let mime = mime_guess::from_path(local_file_path).first_or_octet_stream();
  let hash = fxhash::hash(&content);

  Ok((
    ObjectIdentity {
      workspace_id: workspace_id.to_owned(),
      file_id: hash.to_string(),
      ext,
    },
    ObjectValue {
      raw: content.into(),
      mime,
    },
  ))
}
frontend/rust-lib/flowy-storage/src/sqlite_sql.rs (new file, 161 lines)
@@ -0,0 +1,161 @@
use flowy_error::{FlowyError, FlowyResult};
use flowy_sqlite::schema::{upload_file_part, upload_file_table};
use flowy_sqlite::{
  diesel, AsChangeset, BoolExpressionMethods, DBConnection, ExpressionMethods, Identifiable,
  Insertable, OptionalExtension, QueryDsl, Queryable, RunQueryDsl, SqliteConnection,
};
use tracing::warn;

#[derive(Queryable, Insertable, AsChangeset, Identifiable, Debug, Clone)]
#[diesel(table_name = upload_file_table)]
#[diesel(primary_key(workspace_id, parent_dir, file_id))]
pub struct UploadFileTable {
  pub workspace_id: String,
  pub file_id: String,
  pub parent_dir: String,
  pub local_file_path: String,
  pub content_type: String,
  pub chunk_size: i32,
  pub num_chunk: i32,
  pub upload_id: String,
  pub created_at: i64,
}

#[derive(Queryable, Insertable, AsChangeset, Identifiable, Debug)]
#[diesel(table_name = upload_file_part)]
#[diesel(primary_key(upload_id, part_num))]
pub struct UploadFilePartTable {
  pub upload_id: String,
  pub e_tag: String,
  pub part_num: i32,
}

pub fn is_upload_file_exist(
  conn: &mut SqliteConnection,
  workspace_id: &str,
  parent_dir: &str,
  file_id: &str,
) -> FlowyResult<bool> {
  let result = upload_file_table::dsl::upload_file_table
    .filter(
      upload_file_table::workspace_id
        .eq(workspace_id)
        .and(upload_file_table::parent_dir.eq(parent_dir))
        .and(upload_file_table::file_id.eq(file_id)),
    )
    .first::<UploadFileTable>(conn)
    .optional()?;
  Ok(result.is_some())
}

pub fn insert_upload_file(
  mut conn: DBConnection,
  upload_file: &UploadFileTable,
) -> FlowyResult<()> {
  diesel::insert_into(upload_file_table::table)
    .values(upload_file)
    .execute(&mut *conn)?;
  Ok(())
}

pub fn update_upload_file_upload_id(
  mut conn: DBConnection,
  workspace_id: &str,
  parent_dir: &str,
  file_id: &str,
  upload_id: &str,
) -> FlowyResult<()> {
  diesel::update(
    upload_file_table::dsl::upload_file_table.filter(
      upload_file_table::workspace_id
        .eq(workspace_id)
        .and(upload_file_table::parent_dir.eq(parent_dir))
        .and(upload_file_table::file_id.eq(file_id)),
    ),
  )
  .set(upload_file_table::upload_id.eq(upload_id))
  .execute(&mut *conn)?;
  Ok(())
}

pub fn insert_upload_part(
  mut conn: DBConnection,
  upload_part: &UploadFilePartTable,
) -> FlowyResult<()> {
  diesel::insert_into(upload_file_part::table)
    .values(upload_part)
    .execute(&mut *conn)?;
  Ok(())
}

pub fn select_latest_upload_part(
  mut conn: DBConnection,
  upload_id: &str,
) -> FlowyResult<Option<UploadFilePartTable>> {
  let result = upload_file_part::dsl::upload_file_part
    .filter(upload_file_part::upload_id.eq(upload_id))
    .order(upload_file_part::part_num.desc())
    .first::<UploadFilePartTable>(&mut *conn)
    .optional()?;
  Ok(result)
}

pub fn select_upload_parts(
  conn: &mut SqliteConnection,
  upload_id: &str,
) -> FlowyResult<Vec<UploadFilePartTable>> {
  let results = upload_file_part::dsl::upload_file_part
    .filter(upload_file_part::upload_id.eq(upload_id))
    .load::<UploadFilePartTable>(conn)?;
  Ok(results)
}

pub fn batch_select_upload_file(
  mut conn: DBConnection,
  limit: i32,
) -> FlowyResult<Vec<UploadFileTable>> {
  let results = upload_file_table::dsl::upload_file_table
    .order(upload_file_table::created_at.desc())
    .limit(limit.into())
    .load::<UploadFileTable>(&mut conn)?;
  Ok(results)
}

pub fn select_upload_file(
  conn: &mut SqliteConnection,
  workspace_id: &str,
  parent_dir: &str,
  file_id: &str,
) -> FlowyResult<Option<UploadFileTable>> {
  let result = upload_file_table::dsl::upload_file_table
    .filter(
      upload_file_table::workspace_id
        .eq(workspace_id)
        .and(upload_file_table::parent_dir.eq(parent_dir))
        .and(upload_file_table::file_id.eq(file_id)),
    )
    .first::<UploadFileTable>(conn)
    .optional()?;
  Ok(result)
}

pub fn delete_upload_file(mut conn: DBConnection, upload_id: &str) -> FlowyResult<()> {
  conn.immediate_transaction(|conn| {
    diesel::delete(
      upload_file_table::dsl::upload_file_table.filter(upload_file_table::upload_id.eq(upload_id)),
    )
    .execute(&mut *conn)?;

    if let Err(err) = diesel::delete(
      upload_file_part::dsl::upload_file_part.filter(upload_file_part::upload_id.eq(upload_id)),
    )
    .execute(&mut *conn)
    {
      warn!("Failed to delete upload parts: {:?}", err)
    }

    Ok::<_, FlowyError>(())
  })?;

  Ok(())
}
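The diesel schema these structs map to lives in flowy-sqlite's migrations and is not part of this diff; the sketch below is a hypothetical reconstruction inferred only from the struct fields and the #[diesel(primary_key(...))] attributes above.

// Hypothetical schema sketch; column names, types, and keys are inferred from the
// structs above, not copied from the actual flowy-sqlite migration.
diesel::table! {
  upload_file_table (workspace_id, parent_dir, file_id) {
    workspace_id -> Text,
    file_id -> Text,
    parent_dir -> Text,
    local_file_path -> Text,
    content_type -> Text,
    chunk_size -> Integer,
    num_chunk -> Integer,
    upload_id -> Text,
    created_at -> BigInt,
  }
}

diesel::table! {
  upload_file_part (upload_id, part_num) {
    upload_id -> Text,
    e_tag -> Text,
    part_num -> Integer,
  }
}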
frontend/rust-lib/flowy-storage/src/uploader.rs (new file, 294 lines)
@@ -0,0 +1,294 @@
use crate::sqlite_sql::UploadFileTable;
use crate::uploader::UploadTask::BackgroundTask;
use flowy_storage_pub::chunked_byte::ChunkedBytes;
use flowy_storage_pub::storage::StorageService;
use lib_infra::box_any::BoxAny;
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::fmt::Display;
use std::sync::atomic::{AtomicBool, AtomicU8};
use std::sync::{Arc, Weak};
use std::time::Duration;
use tokio::sync::{watch, RwLock};
use tracing::{info, trace};

#[derive(Clone)]
pub enum Signal {
  Stop,
  Proceed,
  ProceedAfterSecs(u64),
}

pub struct UploadTaskQueue {
  tasks: RwLock<BinaryHeap<UploadTask>>,
  notifier: watch::Sender<Signal>,
}

impl UploadTaskQueue {
  pub fn new(notifier: watch::Sender<Signal>) -> Self {
    Self {
      tasks: Default::default(),
      notifier,
    }
  }
  pub async fn queue_task(&self, task: UploadTask) {
    trace!("[File] Queued task: {}", task);
    self.tasks.write().await.push(task);
    let _ = self.notifier.send(Signal::Proceed);
  }
}

pub struct FileUploader {
  storage_service: Arc<dyn StorageService>,
  queue: Arc<UploadTaskQueue>,
  max_uploads: u8,
  current_uploads: AtomicU8,
  pause_sync: AtomicBool,
}

impl Drop for FileUploader {
  fn drop(&mut self) {
    let _ = self.queue.notifier.send(Signal::Stop);
  }
}

impl FileUploader {
  pub fn new(storage_service: Arc<dyn StorageService>, queue: Arc<UploadTaskQueue>) -> Self {
    Self {
      storage_service,
      queue,
      max_uploads: 3,
      current_uploads: Default::default(),
      pause_sync: Default::default(),
    }
  }

  pub async fn queue_tasks(&self, tasks: Vec<UploadTask>) {
    let mut queue_lock = self.queue.tasks.write().await;
    for task in tasks {
      queue_lock.push(task);
    }
    let _ = self.queue.notifier.send(Signal::Proceed);
  }

  pub fn pause(&self) {
    self
      .pause_sync
      .store(true, std::sync::atomic::Ordering::SeqCst);
  }

  pub fn resume(&self) {
    self
      .pause_sync
      .store(false, std::sync::atomic::Ordering::SeqCst);
    let _ = self.queue.notifier.send(Signal::ProceedAfterSecs(3));
  }

  pub async fn process_next(&self) -> Option<()> {
    // Do not proceed if the uploader is paused.
    if self.pause_sync.load(std::sync::atomic::Ordering::Relaxed) {
      return None;
    }

    trace!(
      "[File] Max concurrent uploads: {}, current: {}",
      self.max_uploads,
      self
        .current_uploads
        .load(std::sync::atomic::Ordering::SeqCst)
    );

    if self
      .current_uploads
      .load(std::sync::atomic::Ordering::SeqCst)
      >= self.max_uploads
    {
      // If the current uploads count is greater than or equal to the max uploads, do not proceed.
      let _ = self.queue.notifier.send(Signal::ProceedAfterSecs(10));
      return None;
    }

    let task = self.queue.tasks.write().await.pop()?;
    if task.retry_count() > 5 {
      // If the task has been retried more than 5 times, we should not retry it anymore.
      let _ = self.queue.notifier.send(Signal::ProceedAfterSecs(2));
      return None;
    }

    // increment the current uploads count
    self
      .current_uploads
      .fetch_add(1, std::sync::atomic::Ordering::SeqCst);

    match task {
      UploadTask::Task {
        chunks,
        record,
        mut retry_count,
      } => {
        let record = BoxAny::new(record);
        if let Err(err) = self.storage_service.start_upload(&chunks, &record).await {
          info!(
            "Failed to upload file: {}, retry_count:{}",
            err, retry_count
          );

          let record = record.unbox_or_error().unwrap();
          retry_count += 1;
          self.queue.tasks.write().await.push(UploadTask::Task {
            chunks,
            record,
            retry_count,
          });
        }
      },
      UploadTask::BackgroundTask {
        workspace_id,
        parent_dir,
        file_id,
        created_at,
        mut retry_count,
      } => {
        if let Err(err) = self
          .storage_service
          .resume_upload(&workspace_id, &parent_dir, &file_id)
          .await
        {
          info!(
            "Failed to resume upload file: {}, retry_count:{}",
            err, retry_count
          );
          retry_count += 1;
          self.queue.tasks.write().await.push(BackgroundTask {
            workspace_id,
            parent_dir,
            file_id,
            created_at,
            retry_count,
          });
        }
      },
    }
    self
      .current_uploads
      .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
    let _ = self.queue.notifier.send(Signal::ProceedAfterSecs(2));
    None
  }
}

pub struct FileUploaderRunner;

impl FileUploaderRunner {
  pub async fn run(weak_uploader: Weak<FileUploader>, mut notifier: watch::Receiver<Signal>) {
    loop {
      // stops the runner if the notifier was closed.
      if notifier.changed().await.is_err() {
        break;
      }

      if let Some(uploader) = weak_uploader.upgrade() {
        let value = notifier.borrow().clone();
        match value {
          Signal::Stop => break,
          Signal::Proceed => {
            tokio::spawn(async move {
              uploader.process_next().await;
            });
          },
          Signal::ProceedAfterSecs(secs) => {
            tokio::time::sleep(Duration::from_secs(secs)).await;
            tokio::spawn(async move {
              uploader.process_next().await;
            });
          },
        }
      } else {
        break;
      }
    }
  }
}

pub enum UploadTask {
  Task {
    chunks: ChunkedBytes,
    record: UploadFileTable,
    retry_count: u8,
  },
  BackgroundTask {
    workspace_id: String,
    file_id: String,
    parent_dir: String,
    created_at: i64,
    retry_count: u8,
  },
}

impl UploadTask {
  pub fn retry_count(&self) -> u8 {
    match self {
      Self::Task { retry_count, .. } => *retry_count,
      Self::BackgroundTask { retry_count, .. } => *retry_count,
    }
  }
}

impl Display for UploadTask {
  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    match self {
      Self::Task { record, .. } => write!(f, "Task: {}", record.file_id),
      Self::BackgroundTask { file_id, .. } => write!(f, "BackgroundTask: {}", file_id),
    }
  }
}
impl Eq for UploadTask {}

impl PartialEq for UploadTask {
  fn eq(&self, other: &Self) -> bool {
    match (self, other) {
      (Self::Task { record: lhs, .. }, Self::Task { record: rhs, .. }) => {
        lhs.local_file_path == rhs.local_file_path
      },
      (
        Self::BackgroundTask {
          workspace_id: l_workspace_id,
          file_id: l_file_id,
          ..
        },
        Self::BackgroundTask {
          workspace_id: r_workspace_id,
          file_id: r_file_id,
          ..
        },
      ) => l_workspace_id == r_workspace_id && l_file_id == r_file_id,
      _ => false,
    }
  }
}

impl PartialOrd for UploadTask {
  fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
    Some(self.cmp(other))
  }
}

impl Ord for UploadTask {
  fn cmp(&self, other: &Self) -> Ordering {
    match (self, other) {
      (Self::Task { record: lhs, .. }, Self::Task { record: rhs, .. }) => {
        lhs.created_at.cmp(&rhs.created_at)
      },
      (_, Self::Task { .. }) => Ordering::Less,
      (Self::Task { .. }, _) => Ordering::Greater,
      (
        Self::BackgroundTask {
          created_at: lhs, ..
        },
        Self::BackgroundTask {
          created_at: rhs, ..
        },
      ) => lhs.cmp(rhs),
    }
  }
}
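FileUploaderRunner above pairs a tokio watch channel with a Weak handle, so dropping the FileUploader (whose Drop impl sends Signal::Stop) shuts the loop down. The following self-contained sketch illustrates that pattern with simplified stand-in types (Worker and the reduced Signal enum here are hypothetical, not the crate's code):

use std::sync::{Arc, Weak};
use tokio::sync::watch;

#[derive(Clone)]
enum Signal {
  Stop,
  Proceed,
}

struct Worker;

impl Worker {
  async fn process_next(&self) {
    // Pop one task from a queue and handle it; elided in this sketch.
  }
}

async fn run(weak_worker: Weak<Worker>, mut rx: watch::Receiver<Signal>) {
  // Wait for each signal; exit when the sender is gone, the owner is dropped, or Stop arrives.
  while rx.changed().await.is_ok() {
    let signal = rx.borrow().clone();
    match (weak_worker.upgrade(), signal) {
      (Some(worker), Signal::Proceed) => worker.process_next().await,
      _ => break,
    }
  }
}

#[tokio::main]
async fn main() {
  let (tx, rx) = watch::channel(Signal::Proceed);
  let worker = Arc::new(Worker);
  let runner = tokio::spawn(run(Arc::downgrade(&worker), rx));
  let _ = tx.send(Signal::Proceed); // wake the runner once
  drop(worker); // owner gone: the next signal ends the loop
  let _ = tx.send(Signal::Stop);
  let _ = runner.await;
}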
@@ -1,12 +0,0 @@
use crate::{ObjectIdentity, ObjectValue};
use flowy_error::FlowyError;

pub async fn object_from_disk(
  _workspace_id: &str,
  _local_file_path: &str,
) -> Result<(ObjectIdentity, ObjectValue), FlowyError> {
  Err(
    FlowyError::not_support()
      .with_context(format!("object_from_disk is not implemented for wasm32")),
  )
}
@@ -0,0 +1,181 @@
use flowy_sqlite::Database;
use flowy_storage::sqlite_sql::{
  batch_select_upload_file, delete_upload_file, insert_upload_file, insert_upload_part,
  select_latest_upload_part, select_upload_parts, UploadFilePartTable, UploadFileTable,
};
use flowy_storage_pub::chunked_byte::{ChunkedBytes, MIN_CHUNK_SIZE};
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use std::env::temp_dir;
use std::fs::File;
use std::io::Write;
use std::path::PathBuf;
use std::time::Duration;

pub fn test_database() -> (Database, PathBuf) {
  let db_path = temp_dir().join(&format!("test-{}.db", generate_random_string(8)));
  (flowy_sqlite::init(&db_path).unwrap(), db_path)
}

#[tokio::test]
async fn test_insert_new_upload() {
  let (db, _) = test_database();

  let workspace_id = uuid::Uuid::new_v4().to_string();

  // test insert one upload file record
  let mut upload_ids = vec![];
  for _i in 0..5 {
    let upload_id = uuid::Uuid::new_v4().to_string();
    let local_file_path = create_temp_file_with_random_content(8 * 1024 * 1024).unwrap();
    let upload_file =
      create_upload_file_record(workspace_id.clone(), upload_id.clone(), local_file_path).await;
    upload_ids.push(upload_file.upload_id.clone());

    // insert
    let conn = db.get_connection().unwrap();
    insert_upload_file(conn, &upload_file).unwrap();
    tokio::time::sleep(Duration::from_secs(1)).await;
  }
  upload_ids.reverse();

  // select
  let conn = db.get_connection().unwrap();
  let records = batch_select_upload_file(conn, 100).unwrap();

  assert_eq!(records.len(), 5);
  // compare the upload id order is the same as upload_ids
  for i in 0..5 {
    assert_eq!(records[i].upload_id, upload_ids[i]);

    // delete
    let conn = db.get_connection().unwrap();
    delete_upload_file(conn, &records[i].upload_id).unwrap();
  }

  let conn = db.get_connection().unwrap();
  let records = batch_select_upload_file(conn, 100).unwrap();
  assert!(records.is_empty());
}

#[tokio::test]
async fn test_upload_part_test() {
  let (db, _) = test_database();

  let workspace_id = uuid::Uuid::new_v4().to_string();

  // test insert one upload file record
  let upload_id = uuid::Uuid::new_v4().to_string();
  let local_file_path = create_temp_file_with_random_content(20 * 1024 * 1024).unwrap();
  let upload_file =
    create_upload_file_record(workspace_id.clone(), upload_id.clone(), local_file_path).await;

  // insert
  let conn = db.get_connection().unwrap();
  insert_upload_file(conn, &upload_file).unwrap();
  tokio::time::sleep(Duration::from_secs(1)).await;

  // insert uploaded part 1
  let part = UploadFilePartTable {
    upload_id: upload_id.clone(),
    e_tag: "1".to_string(),
    part_num: 1,
  };
  let conn = db.get_connection().unwrap();
  insert_upload_part(conn, &part).unwrap();

  // insert uploaded part 2
  let part = UploadFilePartTable {
    upload_id: upload_id.clone(),
    e_tag: "2".to_string(),
    part_num: 2,
  };
  let conn = db.get_connection().unwrap();
  insert_upload_part(conn, &part).unwrap();

  // get latest part
  let conn = db.get_connection().unwrap();
  let part = select_latest_upload_part(conn, &upload_id)
    .unwrap()
    .unwrap();
  assert_eq!(part.part_num, 2);

  // get all existing parts
  let mut conn = db.get_connection().unwrap();
  let parts = select_upload_parts(&mut *conn, &upload_id).unwrap();
  assert_eq!(parts.len(), 2);
  assert_eq!(parts[0].part_num, 1);
  assert_eq!(parts[1].part_num, 2);

  // delete upload file and then all existing parts will be deleted
  let conn = db.get_connection().unwrap();
  delete_upload_file(conn, &upload_id).unwrap();

  let mut conn = db.get_connection().unwrap();
  let parts = select_upload_parts(&mut *conn, &upload_id).unwrap();
  assert!(parts.is_empty())
}

pub fn generate_random_string(len: usize) -> String {
  let rng = thread_rng();
  rng
    .sample_iter(&Alphanumeric)
    .take(len)
    .map(char::from)
    .collect()
}

fn create_temp_file_with_random_content(
  size_in_bytes: usize,
) -> Result<String, Box<dyn std::error::Error>> {
  // Generate a random string of the specified size
  let content: String = rand::thread_rng()
    .sample_iter(&Alphanumeric)
    .take(size_in_bytes)
    .map(char::from)
    .collect();

  // Create a temporary file path
  let file_path = std::env::temp_dir().join("test.txt");

  // Write the content to the temporary file
  let mut file = File::create(&file_path)?;
  file.write_all(content.as_bytes())?;

  // Return the file path
  Ok(file_path.to_str().unwrap().to_string())
}

pub async fn create_upload_file_record(
  workspace_id: String,
  upload_id: String,
  local_file_path: String,
) -> UploadFileTable {
  // Create ChunkedBytes from file
  let chunked_bytes = ChunkedBytes::from_file(&local_file_path, MIN_CHUNK_SIZE as i32)
    .await
    .unwrap();

  // Determine content type
  let content_type = mime_guess::from_path(&local_file_path)
    .first_or_octet_stream()
    .to_string();

  // Calculate file ID
  let file_id = fxhash::hash(&chunked_bytes.data).to_string();

  // Create UploadFileTable record
  let upload_file = UploadFileTable {
    workspace_id,
    file_id,
    upload_id,
    parent_dir: "test".to_string(),
    local_file_path,
    content_type,
    chunk_size: MIN_CHUNK_SIZE as i32,
    num_chunk: chunked_bytes.offsets.len() as i32,
    created_at: chrono::Utc::now().timestamp(),
  };

  upload_file
}