mirror of https://github.com/AppFlowy-IO/AppFlowy.git
commit 6a0650e6d5 (parent 6d09c33782)
@@ -2,6 +2,7 @@ use crate::document::generate_random_bytes
 use event_integration_test::user_event::user_localhost_af_cloud;
 use event_integration_test::EventIntegrationTest;
 use flowy_storage_pub::storage::FileUploadState;
+use lib_infra::util::md5;
 use std::env::temp_dir;
 use std::sync::Arc;
 use std::time::Duration;
@@ -9,19 +10,21 @@ use tokio::fs;
 use tokio::fs::File;
 use tokio::io::AsyncWriteExt;
 use tokio::sync::Mutex;
+use tokio::time::timeout;
 
 #[tokio::test]
 async fn af_cloud_upload_big_file_test() {
   user_localhost_af_cloud().await;
   let mut test = EventIntegrationTest::new().await;
   test.af_cloud_sign_up().await;
   tokio::time::sleep(Duration::from_secs(6)).await;
 
+  let parent_dir = "temp_test";
   let workspace_id = test.get_current_workspace().await.id;
   let (file_path, upload_data) = generate_file_with_bytes_len(15 * 1024 * 1024).await;
   let (created_upload, rx) = test
     .storage_manager
     .storage_service
-    .create_upload(&workspace_id, "temp_test", &file_path, false)
+    .create_upload(&workspace_id, parent_dir, &file_path, false)
     .await
     .unwrap();
@@ -42,15 +45,22 @@ async fn af_cloud_upload_big_file_test()
 
   // Restart the test. It will load unfinished uploads
   let test = EventIntegrationTest::new_with_config(config).await;
-  let mut rx = test
+  if let Some(mut rx) = test
     .storage_manager
-    .subscribe_file_state(&created_upload.file_id)
+    .subscribe_file_state(parent_dir, &created_upload.file_id)
     .await
-    .unwrap();
-
-  while let Some(state) = rx.recv().await {
-    if let FileUploadState::Finished { .. } = state {
-      break;
+    .unwrap()
+  {
+    let timeout_duration = Duration::from_secs(180);
+    while let Some(state) = match timeout(timeout_duration, rx.recv()).await {
+      Ok(result) => result,
+      Err(_) => {
+        panic!("Timed out waiting for file upload completion");
+      },
+    } {
+      if let FileUploadState::Finished { .. } = state {
+        break;
+      }
     }
   }
 
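Note: the reworked loop wraps every `rx.recv()` in `tokio::time::timeout`, so a stalled upload fails the test after 180 s instead of hanging CI forever. The same pattern in isolation, as a minimal sketch (the `FileUploadState` enum is stubbed here; the real one lives in flowy_storage_pub):

use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::timeout;

// Stub of the progress event type used by the test above.
enum FileUploadState {
  Progress { progress: f64 },
  Finished { file_id: String },
}

async fn wait_until_finished(mut rx: mpsc::Receiver<FileUploadState>) {
  let per_recv = Duration::from_secs(180);
  loop {
    match timeout(per_recv, rx.recv()).await {
      Ok(Some(FileUploadState::Finished { .. })) => break, // done
      Ok(Some(_)) => continue, // progress event; keep waiting
      Ok(None) => break,       // sender dropped; no more events will arrive
      Err(_) => panic!("Timed out waiting for file upload completion"),
    }
  }
}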
@@ -62,8 +72,7 @@ async fn af_cloud_upload_big_file_test()
     .file_storage()
     .unwrap();
   let file = file_service.get_object(created_upload.url).await.unwrap();
-  assert_eq!(file.raw.to_vec(), upload_data);
-
+  assert_eq!(md5(file.raw), md5(upload_data));
   let _ = fs::remove_file(file_path).await;
 }
 
@@ -195,7 +195,7 @@ impl StorageService for DocumentTestFileStorageService
     todo!()
   }
 
-  async fn start_upload(&self, _chunks: &ChunkedBytes, _record: &BoxAny) -> Result<(), FlowyError> {
+  async fn start_upload(&self, _chunks: ChunkedBytes, _record: &BoxAny) -> Result<(), FlowyError> {
     todo!()
   }
 
@@ -208,7 +208,11 @@ impl StorageService for DocumentTestFileStorageService
     todo!()
   }
 
-  async fn subscribe_file_progress(&self, _url: &str) -> Result<FileProgressReceiver, FlowyError> {
+  async fn subscribe_file_progress(
+    &self,
+    _parent_idr: &str,
+    _url: &str,
+  ) -> Result<Option<FileProgressReceiver>, FlowyError> {
     todo!()
   }
 }
@@ -0,0 +1,2 @@
+-- This file should undo anything in `up.sql`
+ALTER TABLE upload_file_table DROP COLUMN is_finish;
@@ -0,0 +1,2 @@
+-- Your SQL goes here
+ALTER TABLE upload_file_table ADD COLUMN is_finish BOOLEAN NOT NULL DEFAULT FALSE;
@@ -64,6 +64,7 @@ diesel::table!
     num_chunk -> Integer,
     upload_id -> Text,
     created_at -> BigInt,
+    is_finish -> Bool,
   }
 }
 
@@ -9,6 +9,7 @@ use tokio::io::AsyncReadExt;
 /// In Amazon S3, the minimum chunk size for multipart uploads is 5 MB,except for the last part,
 /// which can be smaller.(https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html)
 pub const MIN_CHUNK_SIZE: usize = 5 * 1024 * 1024; // Minimum Chunk Size 5 MB
+#[derive(Debug, Clone)]
 pub struct ChunkedBytes {
   pub data: Bytes,
   pub chunk_size: i32,
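Note: the new `#[derive(Debug, Clone)]` is what lets `ChunkedBytes` be passed by value to `start_upload` while the uploader keeps a copy for retries (see the `chunks.clone()` hunk in the `FileUploader` change further down). The clone is cheap because the payload is `bytes::Bytes`, which is reference-counted; a small illustration:

use bytes::Bytes;

fn main() {
  // Cloning Bytes bumps a refcount; the 5 MB buffer itself is shared, not copied.
  let data = Bytes::from(vec![0u8; 5 * 1024 * 1024]);
  let copy = data.clone();
  assert_eq!(copy.as_ptr(), data.as_ptr()); // same backing storage
}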
@@ -28,8 +29,11 @@ impl Display for ChunkedBytes
   fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     write!(
       f,
-      "ChunkedBytes: chunk_size: {}, offsets: {:?}, current_offset: {}",
-      self.chunk_size, self.offsets, self.current_offset
+      "data:{}, chunk_size:{}, num chunk:{}, offset:{}",
+      self.data.len(),
+      self.chunk_size,
+      self.offsets.len(),
+      self.current_offset
     )
   }
 }
@@ -21,7 +21,7 @@ pub trait StorageService: Send + Sync
     upload_immediately: bool,
   ) -> Result<(CreatedUpload, Option<FileProgressReceiver>), FlowyError>;
 
-  async fn start_upload(&self, chunks: &ChunkedBytes, record: &BoxAny) -> Result<(), FlowyError>;
+  async fn start_upload(&self, chunks: ChunkedBytes, record: &BoxAny) -> Result<(), FlowyError>;
 
   async fn resume_upload(
     &self,
@@ -32,8 +32,9 @@ pub trait StorageService: Send + Sync
 
   async fn subscribe_file_progress(
     &self,
+    parent_idr: &str,
     file_id: &str,
-  ) -> Result<FileProgressReceiver, FlowyError>;
+  ) -> Result<Option<FileProgressReceiver>, FlowyError>;
 }
 
 pub struct FileProgressReceiver {
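Note: `Ok(None)` in the new signature means there is nothing to subscribe to because the upload already finished, so callers must branch rather than await a receiver that will never fire. A hedged caller sketch against this trait (surrounding types are assumed, and async-trait-style dispatch is taken for granted):

// Sketch only; not the crate's actual calling code.
async fn watch_progress(
  storage: &dyn StorageService,
  parent_idr: &str,
  file_id: &str,
) -> Result<(), FlowyError> {
  match storage.subscribe_file_progress(parent_idr, file_id).await? {
    Some(_receiver) => { /* stream FileUploadState events from the receiver */ },
    None => { /* upload already completed; render the finished state now */ },
  }
  Ok(())
}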
@@ -2,8 +2,8 @@ use crate::file_cache::FileTempStorage;
 use crate::notification::{make_notification, StorageNotification};
 use crate::sqlite_sql::{
   batch_select_upload_file, delete_upload_file, insert_upload_file, insert_upload_part,
-  select_upload_file, select_upload_parts, update_upload_file_upload_id, UploadFilePartTable,
-  UploadFileTable,
+  is_upload_completed, select_upload_file, select_upload_parts, update_upload_file_completed,
+  update_upload_file_upload_id, UploadFilePartTable, UploadFileTable,
 };
 use crate::uploader::{FileUploader, FileUploaderRunner, Signal, UploadTask, UploadTaskQueue};
 use async_trait::async_trait;
@@ -117,9 +117,13 @@ impl StorageManager
 
   pub async fn subscribe_file_state(
     &self,
+    parent_dir: &str,
     file_id: &str,
-  ) -> Result<FileProgressReceiver, FlowyError> {
-    self.storage_service.subscribe_file_progress(file_id).await
+  ) -> Result<Option<FileProgressReceiver>, FlowyError> {
+    self
+      .storage_service
+      .subscribe_file_progress(parent_dir, file_id)
+      .await
   }
 
   pub async fn get_file_state(&self, file_id: &str) -> Option<FileUploadState> {
@@ -313,7 +317,7 @@ impl StorageService for StorageServiceImpl
     }
   }
 
-  async fn start_upload(&self, chunks: &ChunkedBytes, record: &BoxAny) -> Result<(), FlowyError> {
+  async fn start_upload(&self, chunks: ChunkedBytes, record: &BoxAny) -> Result<(), FlowyError> {
     let file_record = record.downcast_ref::<UploadFileTable>().ok_or_else(|| {
       FlowyError::internal().with_context("failed to downcast record to UploadFileTable")
     })?;
@@ -340,34 +344,16 @@
     file_id: &str,
   ) -> Result<(), FlowyError> {
     // Gathering the upload record and parts from the sqlite database.
-    let record = {
-      let mut conn = self
-        .user_service
-        .sqlite_connection(self.user_service.user_id()?)?;
-      conn.immediate_transaction(|conn| {
-        Ok::<_, FlowyError>(
-          // When resuming an upload, check if the upload_id is empty.
-          // If the upload_id is empty, the upload has likely not been created yet.
-          // If the upload_id is not empty, verify which parts have already been uploaded.
-          select_upload_file(conn, workspace_id, parent_dir, file_id)?.map(|record| {
-            if record.upload_id.is_empty() {
-              (record, vec![])
-            } else {
-              let parts = select_upload_parts(conn, &record.upload_id).unwrap_or_default();
-              (record, parts)
-            }
-          }),
-        )
-      })?
-    };
+    let mut conn = self
+      .user_service
+      .sqlite_connection(self.user_service.user_id()?)?;
 
-    if let Some((upload_file, parts)) = record {
+    if let Some(upload_file) = select_upload_file(&mut conn, workspace_id, parent_dir, file_id)? {
       resume_upload(
         &self.cloud_service,
         &self.user_service,
         &self.temp_storage,
         upload_file,
-        parts,
         self.progress_notifiers.clone(),
       )
       .await?;
@@ -379,18 +365,32 @@
 
   async fn subscribe_file_progress(
     &self,
+    parent_idr: &str,
     file_id: &str,
-  ) -> Result<FileProgressReceiver, FlowyError> {
+  ) -> Result<Option<FileProgressReceiver>, FlowyError> {
     trace!("[File]: subscribe file progress: {}", file_id);
 
+    let is_completed = {
+      let mut conn = self
+        .user_service
+        .sqlite_connection(self.user_service.user_id()?)?;
+      let workspace_id = self.user_service.workspace_id()?;
+      is_upload_completed(&mut conn, &workspace_id, parent_idr, file_id).unwrap_or(false)
+    };
+
+    if is_completed {
+      return Ok(None);
+    }
+
     let (notifier, receiver) = ProgressNotifier::new();
     let receiver = FileProgressReceiver {
       rx: receiver,
       file_id: file_id.to_string(),
     };
 
     self
       .progress_notifiers
       .insert(file_id.to_string(), notifier);
-    Ok(receiver)
+    Ok(Some(receiver))
   }
 }
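Note: the `is_upload_completed` probe is what turns a late subscription into `Ok(None)`: once `is_finish` is set in SQLite, no notifier is registered at all. A test-style sketch of the new contract (the setup helper is hypothetical):

#[tokio::test]
async fn subscribe_after_completion_returns_none() {
  // Hypothetical helper that creates and fully completes an upload.
  let (manager, parent_dir, file_id) = setup_completed_upload().await;
  let rx = manager
    .subscribe_file_state(&parent_dir, &file_id)
    .await
    .unwrap();
  assert!(rx.is_none(), "completed uploads must not hand out a receiver");
}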
@@ -420,6 +420,7 @@ async fn create_upload_record(
     chunk_size: chunked_bytes.chunk_size,
     num_chunk: chunked_bytes.offsets.len() as i32,
     created_at: timestamp(),
+    is_finish: false,
   };
   Ok((chunked_bytes, record))
 }
@@ -429,10 +430,29 @@ async fn start_upload(
   cloud_service: &Arc<dyn StorageCloudService>,
   user_service: &Arc<dyn StorageUserService>,
   temp_storage: &Arc<FileTempStorage>,
-  chunked_bytes: &ChunkedBytes,
+  mut chunked_bytes: ChunkedBytes,
   upload_file: &UploadFileTable,
   progress_notifiers: Arc<DashMap<String, ProgressNotifier>>,
 ) -> FlowyResult<()> {
+  // 4. gather existing completed parts
+  let mut conn = user_service.sqlite_connection(user_service.user_id()?)?;
+  let mut completed_parts = select_upload_parts(&mut conn, &upload_file.upload_id)
+    .unwrap_or_default()
+    .into_iter()
+    .map(|part| CompletedPartRequest {
+      e_tag: part.e_tag,
+      part_number: part.part_num,
+    })
+    .collect::<Vec<_>>();
+
+  let upload_offset = completed_parts.len() as i32;
+  chunked_bytes.set_current_offset(upload_offset);
+
+  info!(
+    "[File] start upload: workspace: {}, parent_dir: {}, file_id: {}, chunk: {}",
+    upload_file.workspace_id, upload_file.parent_dir, upload_file.file_id, chunked_bytes,
+  );
+
   let mut upload_file = upload_file.clone();
   if upload_file.upload_id.is_empty() {
     // 1. create upload
@@ -488,25 +508,10 @@ async fn start_upload(
   let total_parts = chunked_bytes.iter().count();
   let iter = chunked_bytes.iter().enumerate();
 
-  let mut conn = user_service.sqlite_connection(user_service.user_id()?)?;
-
-  // 4. gather existing completed parts
-  let mut completed_parts = select_upload_parts(&mut conn, &upload_file.upload_id)
-    .unwrap_or_default()
-    .into_iter()
-    .map(|part| CompletedPartRequest {
-      e_tag: part.e_tag,
-      part_number: part.part_num,
-    })
-    .collect::<Vec<_>>();
-
-  // when there are any existing parts, skip those parts by setting the current offset.
-  let offset = completed_parts.len();
-
   for (index, chunk_bytes) in iter {
-    let part_number = offset + index + 1;
+    let part_number = upload_offset + index as i32 + 1;
     trace!(
-      "[File] {} uploading part: {}, len:{}KB",
+      "[File] {} uploading {}th part, size:{}KB",
       upload_file.file_id,
       part_number,
       chunk_bytes.len() / 1000,
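Note: S3 part numbers are 1-based, so with `upload_offset` parts already recorded the loop resumes at part `upload_offset + 1`; the old in-loop `offset` computation moved up to the entry of `start_upload` (previous hunk), where it also feeds `set_current_offset`. A worked example of the arithmetic:

fn main() {
  // Suppose 3 of 5 parts were uploaded before a restart.
  let upload_offset: i32 = 3; // rows found in upload_file_part for this upload_id

  // The chunk iterator is assumed to skip the completed chunks, so the two
  // remaining chunks enumerate as index 0 and 1 and become parts 4 and 5.
  let part_numbers: Vec<i32> = (0..2).map(|index| upload_offset + index + 1).collect();
  assert_eq!(part_numbers, vec![4, 5]);
}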
@@ -584,7 +589,6 @@ async fn resume_upload(
   user_service: &Arc<dyn StorageUserService>,
   temp_storage: &Arc<FileTempStorage>,
   upload_file: UploadFileTable,
-  parts: Vec<UploadFilePartTable>,
   progress_notifiers: Arc<DashMap<String, ProgressNotifier>>,
 ) -> FlowyResult<()> {
   trace!(
@@ -596,14 +600,13 @@ async fn resume_upload(
   );
 
   match ChunkedBytes::from_file(&upload_file.local_file_path, MIN_CHUNK_SIZE as i32).await {
-    Ok(mut chunked_bytes) => {
-      // When there were any parts already uploaded, skip those parts by setting the current offset.
-      chunked_bytes.set_current_offset(parts.len() as i32);
+    Ok(chunked_bytes) => {
       start_upload(
         cloud_service,
         user_service,
         temp_storage,
-        &chunked_bytes,
+        chunked_bytes,
         &upload_file,
         progress_notifiers,
       )
@@ -675,7 +678,7 @@ async fn complete_upload(
   progress_notifiers: &Arc<DashMap<String, ProgressNotifier>>,
 ) -> Result<(), FlowyError> {
   trace!(
-    "[File]: completing file upload: {}, part: {}",
+    "[File]: completing file upload: {}, num parts: {}",
     upload_file.file_id,
     parts.len()
   );
@@ -692,7 +695,7 @@ async fn complete_upload(
     Ok(_) => {
       info!("[File] completed upload file: {}", upload_file.file_id);
       if let Some(mut notifier) = progress_notifiers.get_mut(&upload_file.file_id) {
-        trace!("[File]: notify upload finished");
+        info!("[File]: notify upload:{} finished", upload_file.file_id);
         notifier
           .notify(FileUploadState::Finished {
             file_id: upload_file.file_id.clone(),
@@ -700,9 +703,8 @@ async fn complete_upload(
           .await;
       }
 
-      trace!("[File] delete upload record from sqlite");
       let conn = user_service.sqlite_connection(user_service.user_id()?)?;
-      delete_upload_file(conn, &upload_file.upload_id)?;
+      update_upload_file_completed(conn, &upload_file.upload_id)?;
       if let Err(err) = temp_storage
         .delete_temp_file(&upload_file.local_file_path)
         .await
@@ -21,6 +21,7 @@ pub struct UploadFileTable
   pub num_chunk: i32,
   pub upload_id: String,
   pub created_at: i64,
+  pub is_finish: bool,
 }
 
 #[derive(Queryable, Insertable, AsChangeset, Identifiable, Debug)]
@@ -87,6 +88,55 @@ pub fn update_upload_file_upload_id(
   Ok(())
 }
 
+pub fn update_upload_file_completed(mut conn: DBConnection, upload_id: &str) -> FlowyResult<()> {
+  diesel::update(
+    upload_file_table::dsl::upload_file_table.filter(upload_file_table::upload_id.eq(upload_id)),
+  )
+  .set(upload_file_table::is_finish.eq(true))
+  .execute(&mut *conn)?;
+  Ok(())
+}
+
+pub fn is_upload_completed(
+  conn: &mut SqliteConnection,
+  workspace_id: &str,
+  parent_dir: &str,
+  file_id: &str,
+) -> FlowyResult<bool> {
+  let result = upload_file_table::dsl::upload_file_table
+    .filter(
+      upload_file_table::workspace_id
+        .eq(workspace_id)
+        .and(upload_file_table::parent_dir.eq(parent_dir))
+        .and(upload_file_table::file_id.eq(file_id))
+        .and(upload_file_table::is_finish.eq(true)),
+    )
+    .first::<UploadFileTable>(conn)
+    .optional()?;
+  Ok(result.is_some())
+}
+
+pub fn delete_upload_file(mut conn: DBConnection, upload_id: &str) -> FlowyResult<()> {
+  conn.immediate_transaction(|conn| {
+    diesel::delete(
+      upload_file_table::dsl::upload_file_table.filter(upload_file_table::upload_id.eq(upload_id)),
+    )
+    .execute(&mut *conn)?;
+
+    if let Err(err) = diesel::delete(
+      upload_file_part::dsl::upload_file_part.filter(upload_file_part::upload_id.eq(upload_id)),
+    )
+    .execute(&mut *conn)
+    {
+      warn!("Failed to delete upload parts: {:?}", err)
+    }
+
+    Ok::<_, FlowyError>(())
+  })?;
+
+  Ok(())
+}
+
 pub fn insert_upload_part(
   mut conn: DBConnection,
   upload_part: &UploadFilePartTable,
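Note: `is_upload_completed` answers true only when a row with the exact (workspace_id, parent_dir, file_id) key exists and `is_finish` is already set. A usage sketch tying the two new helpers together (the connections are assumed to point at the same database):

// Sketch; imports and connection setup follow the surrounding module.
fn mark_then_probe(
  conn: DBConnection,
  probe_conn: &mut SqliteConnection,
  workspace_id: &str,
  parent_dir: &str,
  file_id: &str,
  upload_id: &str,
) -> FlowyResult<()> {
  // Flip is_finish to true for this upload...
  update_upload_file_completed(conn, upload_id)?;
  // ...after which the probe used by subscribe_file_progress reports completion.
  assert!(is_upload_completed(probe_conn, workspace_id, parent_dir, file_id)?);
  Ok(())
}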
@@ -147,24 +197,3 @@ pub fn select_upload_file(
     .optional()?;
   Ok(result)
 }
-
-pub fn delete_upload_file(mut conn: DBConnection, upload_id: &str) -> FlowyResult<()> {
-  conn.immediate_transaction(|conn| {
-    diesel::delete(
-      upload_file_table::dsl::upload_file_table.filter(upload_file_table::upload_id.eq(upload_id)),
-    )
-    .execute(&mut *conn)?;
-
-    if let Err(err) = diesel::delete(
-      upload_file_part::dsl::upload_file_part.filter(upload_file_part::upload_id.eq(upload_id)),
-    )
-    .execute(&mut *conn)
-    {
-      warn!("Failed to delete upload parts: {:?}", err)
-    }
-
-    Ok::<_, FlowyError>(())
-  })?;
-
-  Ok(())
-}
@@ -164,7 +164,11 @@ impl FileUploader
         mut retry_count,
       } => {
         let record = BoxAny::new(record);
-        if let Err(err) = self.storage_service.start_upload(&chunks, &record).await {
+        if let Err(err) = self
+          .storage_service
+          .start_upload(chunks.clone(), &record)
+          .await
+        {
           if err.is_file_limit_exceeded() {
             error!("Failed to upload file: {}", err);
             self.disable_storage_write();
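Note: because `start_upload` now consumes its `ChunkedBytes`, the uploader clones per attempt so the same chunks survive a failed try; with the `Bytes`-backed storage (see the `Clone` derive above) that clone is a refcount bump, not a copy. A hypothetical retry shape built on the calls above:

// Hypothetical; the real retry policy lives in the surrounding task machinery.
async fn upload_with_retry(
  storage_service: &dyn StorageService,
  chunks: ChunkedBytes,
  record: BoxAny,
) -> Result<(), FlowyError> {
  let mut retry_count = 0;
  loop {
    match storage_service.start_upload(chunks.clone(), &record).await {
      Ok(()) => return Ok(()),
      // Quota errors are terminal; everything else gets a few retries.
      Err(err) if err.is_file_limit_exceeded() || retry_count >= 3 => return Err(err),
      Err(_) => retry_count += 1,
    }
  }
}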
@@ -175,5 +175,6 @@ pub async fn create_upload_file_record(
     chunk_size: MIN_CHUNK_SIZE as i32,
     num_chunk: chunked_bytes.offsets.len() as i32,
     created_at: chrono::Utc::now().timestamp(),
+    is_finish: false,
   }
 }