chore: fix file upload test (#6016)

* chore: fix file upload test
Nathan.fooo, 2024-08-20 15:18:57 +08:00 (committed by nathan)
parent 495601b683
commit e9adaae333
11 changed files with 153 additions and 94 deletions

View File

@@ -2,6 +2,7 @@ use crate::document::generate_random_bytes;
 use event_integration_test::user_event::user_localhost_af_cloud;
 use event_integration_test::EventIntegrationTest;
 use flowy_storage_pub::storage::FileUploadState;
+use lib_infra::util::md5;
 use std::env::temp_dir;
 use std::sync::Arc;
 use std::time::Duration;
@@ -9,19 +10,21 @@ use tokio::fs;
 use tokio::fs::File;
 use tokio::io::AsyncWriteExt;
 use tokio::sync::Mutex;
+use tokio::time::timeout;

 #[tokio::test]
 async fn af_cloud_upload_big_file_test() {
   user_localhost_af_cloud().await;
   let mut test = EventIntegrationTest::new().await;
   test.af_cloud_sign_up().await;
   tokio::time::sleep(Duration::from_secs(6)).await;

+  let parent_dir = "temp_test";
   let workspace_id = test.get_current_workspace().await.id;
   let (file_path, upload_data) = generate_file_with_bytes_len(15 * 1024 * 1024).await;
   let (created_upload, rx) = test
     .storage_manager
     .storage_service
-    .create_upload(&workspace_id, "temp_test", &file_path, false)
+    .create_upload(&workspace_id, parent_dir, &file_path, false)
     .await
     .unwrap();
@@ -42,17 +45,24 @@ async fn af_cloud_upload_big_file_test() {
   // Restart the test. It will load unfinished uploads
   let test = EventIntegrationTest::new_with_config(config).await;
-  let mut rx = test
+  if let Some(mut rx) = test
     .storage_manager
-    .subscribe_file_state(&created_upload.file_id)
+    .subscribe_file_state(parent_dir, &created_upload.file_id)
     .await
-    .unwrap();
+    .unwrap()
+  {
-  while let Some(state) = rx.recv().await {
+    let timeout_duration = Duration::from_secs(180);
+    while let Some(state) = match timeout(timeout_duration, rx.recv()).await {
+      Ok(result) => result,
+      Err(_) => {
+        panic!("Timed out waiting for file upload completion");
+      },
+    } {
       if let FileUploadState::Finished { .. } = state {
         break;
       }
     }
+  }

   // download the file and then compare the data.
   let file_service = test
@@ -62,8 +72,7 @@ async fn af_cloud_upload_big_file_test() {
     .file_storage()
     .unwrap();
   let file = file_service.get_object(created_upload.url).await.unwrap();
-  assert_eq!(file.raw.to_vec(), upload_data);
+  assert_eq!(md5(file.raw), md5(upload_data));
   let _ = fs::remove_file(file_path).await;
 }
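Aside: the restarted-test assertion above leans on tokio's timeout combinator to bound how long the test waits for upload completion. A minimal standalone sketch of the same pattern (the channel and payload here are illustrative, not part of this commit):

    use std::time::Duration;
    use tokio::sync::mpsc;
    use tokio::time::timeout;

    #[tokio::main]
    async fn main() {
      let (tx, mut rx) = mpsc::channel::<&str>(8);
      tokio::spawn(async move {
        let _ = tx.send("finished").await; // simulated progress event
      });

      // Bound the wait for the next event; fail loudly instead of hanging.
      match timeout(Duration::from_secs(180), rx.recv()).await {
        Ok(Some(state)) => println!("received: {}", state),
        Ok(None) => println!("sender dropped, channel closed"),
        Err(_) => panic!("Timed out waiting for file upload completion"),
      }
    }

Without the wrapper, a stalled upload would leave rx.recv().await pending forever and the test would hang rather than fail.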

View File

@@ -196,7 +196,7 @@ impl StorageService for DocumentTestFileStorageService {
     todo!()
   }

-  async fn start_upload(&self, _chunks: &ChunkedBytes, _record: &BoxAny) -> Result<(), FlowyError> {
+  async fn start_upload(&self, _chunks: ChunkedBytes, _record: &BoxAny) -> Result<(), FlowyError> {
     todo!()
   }

@@ -209,7 +209,11 @@ impl StorageService for DocumentTestFileStorageService {
     todo!()
   }

-  async fn subscribe_file_progress(&self, _url: &str) -> Result<FileProgressReceiver, FlowyError> {
+  async fn subscribe_file_progress(
+    &self,
+    _parent_idr: &str,
+    _url: &str,
+  ) -> Result<Option<FileProgressReceiver>, FlowyError> {
     todo!()
   }
 }

View File

@@ -0,0 +1,2 @@
+-- This file should undo anything in `up.sql`
+ALTER TABLE upload_file_table DROP COLUMN is_finish;

View File

@@ -0,0 +1,2 @@
+-- Your SQL goes here
+ALTER TABLE upload_file_table ADD COLUMN is_finish BOOLEAN NOT NULL DEFAULT FALSE;
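Note: because the column is added as NOT NULL DEFAULT FALSE, upload rows written before this migration are all treated as unfinished, so uploads that were in flight under an older build should still be picked up and resumed after upgrading.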

View File

@@ -64,6 +64,7 @@ diesel::table! {
     num_chunk -> Integer,
     upload_id -> Text,
     created_at -> BigInt,
+    is_finish -> Bool,
   }
 }

View File

@@ -9,6 +9,7 @@ use tokio::io::AsyncReadExt;
 /// In Amazon S3, the minimum chunk size for multipart uploads is 5 MB, except for the last part,
 /// which can be smaller. (https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html)
 pub const MIN_CHUNK_SIZE: usize = 5 * 1024 * 1024; // Minimum Chunk Size 5 MB

+#[derive(Debug, Clone)]
 pub struct ChunkedBytes {
   pub data: Bytes,
   pub chunk_size: i32,

@@ -28,8 +29,11 @@ impl Display for ChunkedBytes {
   fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     write!(
       f,
-      "ChunkedBytes: chunk_size: {}, offsets: {:?}, current_offset: {}",
-      self.chunk_size, self.offsets, self.current_offset
+      "data:{}, chunk_size:{}, num chunk:{}, offset:{}",
+      self.data.len(),
+      self.chunk_size,
+      self.offsets.len(),
+      self.current_offset
     )
   }
 }
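Two side notes on this file. The new Clone derive is what later lets the uploader retry with chunks.clone(), since start_upload now takes ChunkedBytes by value (see the uploader change below). And the reworked Display output reports payload size and chunk count instead of dumping the raw offsets vector; for the 15 MB test file split at MIN_CHUNK_SIZE it would render roughly as (illustrative values):

    data:15728640, chunk_size:5242880, num chunk:3, offset:0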

View File

@@ -21,7 +21,7 @@ pub trait StorageService: Send + Sync {
     upload_immediately: bool,
   ) -> Result<(CreatedUpload, Option<FileProgressReceiver>), FlowyError>;

-  async fn start_upload(&self, chunks: &ChunkedBytes, record: &BoxAny) -> Result<(), FlowyError>;
+  async fn start_upload(&self, chunks: ChunkedBytes, record: &BoxAny) -> Result<(), FlowyError>;

   async fn resume_upload(
     &self,

@@ -32,8 +32,9 @@ pub trait StorageService: Send + Sync {
   async fn subscribe_file_progress(
     &self,
+    parent_idr: &str,
     file_id: &str,
-  ) -> Result<FileProgressReceiver, FlowyError>;
+  ) -> Result<Option<FileProgressReceiver>, FlowyError>;
 }

 pub struct FileProgressReceiver {
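The Option in the new return type lets an implementation report "nothing to subscribe to" when the upload already finished. A sketch of the intended caller-side contract, mirroring how the test above consumes it (the service binding and error plumbing are assumed):

    match service.subscribe_file_progress(parent_dir, file_id).await? {
      Some(mut progress) => {
        // Upload still in flight: wait for a terminal state.
        while let Some(state) = progress.recv().await {
          if let FileUploadState::Finished { .. } = state {
            break;
          }
        }
      },
      // Upload already completed; no progress stream will ever fire.
      None => {},
    }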

View File

@@ -2,8 +2,8 @@ use crate::file_cache::FileTempStorage;
 use crate::notification::{make_notification, StorageNotification};
 use crate::sqlite_sql::{
   batch_select_upload_file, delete_upload_file, insert_upload_file, insert_upload_part,
-  select_upload_file, select_upload_parts, update_upload_file_upload_id, UploadFilePartTable,
-  UploadFileTable,
+  is_upload_completed, select_upload_file, select_upload_parts, update_upload_file_completed,
+  update_upload_file_upload_id, UploadFilePartTable, UploadFileTable,
 };
 use crate::uploader::{FileUploader, FileUploaderRunner, Signal, UploadTask, UploadTaskQueue};
 use async_trait::async_trait;
@@ -117,9 +117,13 @@ impl StorageManager {
   pub async fn subscribe_file_state(
     &self,
+    parent_dir: &str,
     file_id: &str,
-  ) -> Result<FileProgressReceiver, FlowyError> {
-    self.storage_service.subscribe_file_progress(file_id).await
+  ) -> Result<Option<FileProgressReceiver>, FlowyError> {
+    self
+      .storage_service
+      .subscribe_file_progress(parent_dir, file_id)
+      .await
   }

   pub async fn get_file_state(&self, file_id: &str) -> Option<FileUploadState> {
@@ -314,7 +318,7 @@ impl StorageService for StorageServiceImpl {
     }
   }

-  async fn start_upload(&self, chunks: &ChunkedBytes, record: &BoxAny) -> Result<(), FlowyError> {
+  async fn start_upload(&self, chunks: ChunkedBytes, record: &BoxAny) -> Result<(), FlowyError> {
     let file_record = record.downcast_ref::<UploadFileTable>().ok_or_else(|| {
       FlowyError::internal().with_context("failed to downcast record to UploadFileTable")
     })?;
@@ -341,34 +345,16 @@ impl StorageService for StorageServiceImpl {
     file_id: &str,
   ) -> Result<(), FlowyError> {
     // Gathering the upload record and parts from the sqlite database.
-    let record = {
-      let mut conn = self
-        .user_service
-        .sqlite_connection(self.user_service.user_id()?)?;
-      conn.immediate_transaction(|conn| {
-        Ok::<_, FlowyError>(
-          // When resuming an upload, check if the upload_id is empty.
-          // If the upload_id is empty, the upload has likely not been created yet.
-          // If the upload_id is not empty, verify which parts have already been uploaded.
-          select_upload_file(conn, workspace_id, parent_dir, file_id)?.map(|record| {
-            if record.upload_id.is_empty() {
-              (record, vec![])
-            } else {
-              let parts = select_upload_parts(conn, &record.upload_id).unwrap_or_default();
-              (record, parts)
-            }
-          }),
-        )
-      })?
-    };
+    let mut conn = self
+      .user_service
+      .sqlite_connection(self.user_service.user_id()?)?;

-    if let Some((upload_file, parts)) = record {
+    if let Some(upload_file) = select_upload_file(&mut conn, workspace_id, parent_dir, file_id)? {
       resume_upload(
         &self.cloud_service,
         &self.user_service,
         &self.temp_storage,
         upload_file,
-        parts,
         self.progress_notifiers.clone(),
       )
       .await?;
@@ -380,18 +366,32 @@ impl StorageService for StorageServiceImpl {
   async fn subscribe_file_progress(
     &self,
+    parent_idr: &str,
     file_id: &str,
-  ) -> Result<FileProgressReceiver, FlowyError> {
+  ) -> Result<Option<FileProgressReceiver>, FlowyError> {
     trace!("[File]: subscribe file progress: {}", file_id);

+    let is_completed = {
+      let mut conn = self
+        .user_service
+        .sqlite_connection(self.user_service.user_id()?)?;
+      let workspace_id = self.user_service.workspace_id()?;
+      is_upload_completed(&mut conn, &workspace_id, parent_idr, file_id).unwrap_or(false)
+    };
+    if is_completed {
+      return Ok(None);
+    }
+
     let (notifier, receiver) = ProgressNotifier::new();
     let receiver = FileProgressReceiver {
       rx: receiver,
       file_id: file_id.to_string(),
     };
     self
       .progress_notifiers
       .insert(file_id.to_string(), notifier);
-    Ok(receiver)
+    Ok(Some(receiver))
   }
 }
@@ -421,6 +421,7 @@ async fn create_upload_record(
     chunk_size: chunked_bytes.chunk_size,
     num_chunk: chunked_bytes.offsets.len() as i32,
     created_at: timestamp(),
+    is_finish: false,
   };
   Ok((chunked_bytes, record))
 }
@@ -430,10 +431,29 @@ async fn start_upload(
   cloud_service: &Arc<dyn StorageCloudService>,
   user_service: &Arc<dyn StorageUserService>,
   temp_storage: &Arc<FileTempStorage>,
-  chunked_bytes: &ChunkedBytes,
+  mut chunked_bytes: ChunkedBytes,
   upload_file: &UploadFileTable,
   progress_notifiers: Arc<DashMap<String, ProgressNotifier>>,
 ) -> FlowyResult<()> {
+  // 4. gather existing completed parts
+  let mut conn = user_service.sqlite_connection(user_service.user_id()?)?;
+  let mut completed_parts = select_upload_parts(&mut conn, &upload_file.upload_id)
+    .unwrap_or_default()
+    .into_iter()
+    .map(|part| CompletedPartRequest {
+      e_tag: part.e_tag,
+      part_number: part.part_num,
+    })
+    .collect::<Vec<_>>();
+
+  let upload_offset = completed_parts.len() as i32;
+  chunked_bytes.set_current_offset(upload_offset);
+
+  info!(
+    "[File] start upload: workspace: {}, parent_dir: {}, file_id: {}, chunk: {}",
+    upload_file.workspace_id, upload_file.parent_dir, upload_file.file_id, chunked_bytes,
+  );
+
   let mut upload_file = upload_file.clone();
   if upload_file.upload_id.is_empty() {
     // 1. create upload
@@ -489,25 +509,10 @@ async fn start_upload(
   let total_parts = chunked_bytes.iter().count();
   let iter = chunked_bytes.iter().enumerate();

-  let mut conn = user_service.sqlite_connection(user_service.user_id()?)?;
-
-  // 4. gather existing completed parts
-  let mut completed_parts = select_upload_parts(&mut conn, &upload_file.upload_id)
-    .unwrap_or_default()
-    .into_iter()
-    .map(|part| CompletedPartRequest {
-      e_tag: part.e_tag,
-      part_number: part.part_num,
-    })
-    .collect::<Vec<_>>();
-
-  // when there are any existing parts, skip those parts by setting the current offset.
-  let offset = completed_parts.len();
   for (index, chunk_bytes) in iter {
-    let part_number = offset + index + 1;
+    let part_number = upload_offset + index as i32 + 1;
     trace!(
-      "[File] {} uploading part: {}, len:{}KB",
+      "[File] {} uploading {}th part, size:{}KB",
       upload_file.file_id,
       part_number,
       chunk_bytes.len() / 1000,
@@ -585,7 +590,6 @@ async fn resume_upload(
   user_service: &Arc<dyn StorageUserService>,
   temp_storage: &Arc<FileTempStorage>,
   upload_file: UploadFileTable,
-  parts: Vec<UploadFilePartTable>,
   progress_notifiers: Arc<DashMap<String, ProgressNotifier>>,
 ) -> FlowyResult<()> {
   trace!(
@@ -597,14 +601,13 @@ async fn resume_upload(
   );

   match ChunkedBytes::from_file(&upload_file.local_file_path, MIN_CHUNK_SIZE as i32).await {
-    Ok(mut chunked_bytes) => {
+    Ok(chunked_bytes) => {
       // When there were any parts already uploaded, skip those parts by setting the current offset.
-      chunked_bytes.set_current_offset(parts.len() as i32);
       start_upload(
         cloud_service,
         user_service,
         temp_storage,
-        &chunked_bytes,
+        chunked_bytes,
         &upload_file,
         progress_notifiers,
       )
@@ -676,7 +679,7 @@ async fn complete_upload(
   progress_notifiers: &Arc<DashMap<String, ProgressNotifier>>,
 ) -> Result<(), FlowyError> {
   trace!(
-    "[File]: completing file upload: {}, part: {}",
+    "[File]: completing file upload: {}, num parts: {}",
     upload_file.file_id,
     parts.len()
   );
@@ -693,7 +696,7 @@ async fn complete_upload(
     Ok(_) => {
       info!("[File] completed upload file: {}", upload_file.file_id);
       if let Some(mut notifier) = progress_notifiers.get_mut(&upload_file.file_id) {
-        trace!("[File]: notify upload finished");
+        info!("[File]: notify upload:{} finished", upload_file.file_id);
         notifier
           .notify(FileUploadState::Finished {
             file_id: upload_file.file_id.clone(),

@@ -701,9 +704,8 @@ async fn complete_upload(
           .await;
       }

-      trace!("[File] delete upload record from sqlite");
       let conn = user_service.sqlite_connection(user_service.user_id()?)?;
-      delete_upload_file(conn, &upload_file.upload_id)?;
+      update_upload_file_completed(conn, &upload_file.upload_id)?;
       if let Err(err) = temp_storage
         .delete_temp_file(&upload_file.local_file_path)
         .await
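The substantive change in complete_upload: a finished upload's record is no longer deleted from sqlite but flagged via update_upload_file_completed. That persisted flag is what allows subscribe_file_progress to return Ok(None) for already-finished files after a restart, instead of handing out a receiver that would never fire.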

View File

@@ -21,6 +21,7 @@ pub struct UploadFileTable {
   pub num_chunk: i32,
   pub upload_id: String,
   pub created_at: i64,
+  pub is_finish: bool,
 }

 #[derive(Queryable, Insertable, AsChangeset, Identifiable, Debug)]
@@ -87,6 +88,55 @@ pub fn update_upload_file_upload_id(
   Ok(())
 }

+pub fn update_upload_file_completed(mut conn: DBConnection, upload_id: &str) -> FlowyResult<()> {
+  diesel::update(
+    upload_file_table::dsl::upload_file_table.filter(upload_file_table::upload_id.eq(upload_id)),
+  )
+  .set(upload_file_table::is_finish.eq(true))
+  .execute(&mut *conn)?;
+  Ok(())
+}
+
+pub fn is_upload_completed(
+  conn: &mut SqliteConnection,
+  workspace_id: &str,
+  parent_dir: &str,
+  file_id: &str,
+) -> FlowyResult<bool> {
+  let result = upload_file_table::dsl::upload_file_table
+    .filter(
+      upload_file_table::workspace_id
+        .eq(workspace_id)
+        .and(upload_file_table::parent_dir.eq(parent_dir))
+        .and(upload_file_table::file_id.eq(file_id))
+        .and(upload_file_table::is_finish.eq(true)),
+    )
+    .first::<UploadFileTable>(conn)
+    .optional()?;
+  Ok(result.is_some())
+}
+
+pub fn delete_upload_file(mut conn: DBConnection, upload_id: &str) -> FlowyResult<()> {
+  conn.immediate_transaction(|conn| {
+    diesel::delete(
+      upload_file_table::dsl::upload_file_table.filter(upload_file_table::upload_id.eq(upload_id)),
+    )
+    .execute(&mut *conn)?;
+
+    if let Err(err) = diesel::delete(
+      upload_file_part::dsl::upload_file_part.filter(upload_file_part::upload_id.eq(upload_id)),
+    )
+    .execute(&mut *conn)
+    {
+      warn!("Failed to delete upload parts: {:?}", err)
+    }
+
+    Ok::<_, FlowyError>(())
+  })?;
+
+  Ok(())
+}
+
 pub fn insert_upload_part(
   mut conn: DBConnection,
   upload_part: &UploadFilePartTable,
@@ -147,24 +197,3 @@ pub fn select_upload_file(
     .optional()?;
   Ok(result)
 }
-
-pub fn delete_upload_file(mut conn: DBConnection, upload_id: &str) -> FlowyResult<()> {
-  conn.immediate_transaction(|conn| {
-    diesel::delete(
-      upload_file_table::dsl::upload_file_table.filter(upload_file_table::upload_id.eq(upload_id)),
-    )
-    .execute(&mut *conn)?;
-
-    if let Err(err) = diesel::delete(
-      upload_file_part::dsl::upload_file_part.filter(upload_file_part::upload_id.eq(upload_id)),
-    )
-    .execute(&mut *conn)
-    {
-      warn!("Failed to delete upload parts: {:?}", err)
-    }
-
-    Ok::<_, FlowyError>(())
-  })?;
-
-  Ok(())
-}
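A rough sketch of how the new helpers compose across the upload lifecycle (connection acquisition and error handling abbreviated; variable bindings are illustrative):

    // After the cloud provider acknowledges the final part:
    update_upload_file_completed(conn, &upload_file.upload_id)?;

    // Later, when a caller subscribes to progress, skip finished uploads:
    if is_upload_completed(&mut conn, workspace_id, parent_dir, file_id)? {
      return Ok(None); // no progress stream for a completed upload
    }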

View File

@@ -164,7 +164,11 @@ impl FileUploader {
         mut retry_count,
       } => {
         let record = BoxAny::new(record);
-        if let Err(err) = self.storage_service.start_upload(&chunks, &record).await {
+        if let Err(err) = self
+          .storage_service
+          .start_upload(chunks.clone(), &record)
+          .await
+        {
           if err.is_file_limit_exceeded() {
             error!("Failed to upload file: {}", err);
             self.disable_storage_write();
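Cloning before each attempt is what keeps retries possible now that start_upload consumes its ChunkedBytes argument. Since ChunkedBytes stores its payload as Bytes, which is reference-counted, chunks.clone() copies a pointer and some bookkeeping fields rather than the file data itself, so the retry path stays cheap.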

View File

@@ -175,5 +175,6 @@ pub async fn create_upload_file_record(
     chunk_size: MIN_CHUNK_SIZE as i32,
     num_chunk: chunked_bytes.offsets.len() as i32,
     created_at: chrono::Utc::now().timestamp(),
+    is_finish: false,
   }
 }