diff --git a/frontend/rust-lib/event-integration-test/tests/document/af_cloud_test/file_upload_test.rs b/frontend/rust-lib/event-integration-test/tests/document/af_cloud_test/file_upload_test.rs index cf0050341e..9ef734a07e 100644 --- a/frontend/rust-lib/event-integration-test/tests/document/af_cloud_test/file_upload_test.rs +++ b/frontend/rust-lib/event-integration-test/tests/document/af_cloud_test/file_upload_test.rs @@ -2,6 +2,7 @@ use crate::document::generate_random_bytes; use event_integration_test::user_event::user_localhost_af_cloud; use event_integration_test::EventIntegrationTest; use flowy_storage_pub::storage::FileUploadState; +use lib_infra::util::md5; use std::env::temp_dir; use std::sync::Arc; use std::time::Duration; @@ -9,19 +10,21 @@ use tokio::fs; use tokio::fs::File; use tokio::io::AsyncWriteExt; use tokio::sync::Mutex; +use tokio::time::timeout; + #[tokio::test] async fn af_cloud_upload_big_file_test() { user_localhost_af_cloud().await; let mut test = EventIntegrationTest::new().await; test.af_cloud_sign_up().await; tokio::time::sleep(Duration::from_secs(6)).await; - + let parent_dir = "temp_test"; let workspace_id = test.get_current_workspace().await.id; let (file_path, upload_data) = generate_file_with_bytes_len(15 * 1024 * 1024).await; let (created_upload, rx) = test .storage_manager .storage_service - .create_upload(&workspace_id, "temp_test", &file_path, false) + .create_upload(&workspace_id, parent_dir, &file_path, false) .await .unwrap(); @@ -42,15 +45,22 @@ async fn af_cloud_upload_big_file_test() { // Restart the test. It will load unfinished uploads let test = EventIntegrationTest::new_with_config(config).await; - let mut rx = test + if let Some(mut rx) = test .storage_manager - .subscribe_file_state(&created_upload.file_id) + .subscribe_file_state(parent_dir, &created_upload.file_id) .await - .unwrap(); - - while let Some(state) = rx.recv().await { - if let FileUploadState::Finished { .. } = state { - break; + .unwrap() + { + let timeout_duration = Duration::from_secs(180); + while let Some(state) = match timeout(timeout_duration, rx.recv()).await { + Ok(result) => result, + Err(_) => { + panic!("Timed out waiting for file upload completion"); + }, + } { + if let FileUploadState::Finished { .. } = state { + break; + } } } @@ -62,8 +72,7 @@ async fn af_cloud_upload_big_file_test() { .file_storage() .unwrap(); let file = file_service.get_object(created_upload.url).await.unwrap(); - assert_eq!(file.raw.to_vec(), upload_data); - + assert_eq!(md5(file.raw), md5(upload_data)); let _ = fs::remove_file(file_path).await; } diff --git a/frontend/rust-lib/flowy-document/tests/document/util.rs b/frontend/rust-lib/flowy-document/tests/document/util.rs index 58663abd14..a545f0a7a3 100644 --- a/frontend/rust-lib/flowy-document/tests/document/util.rs +++ b/frontend/rust-lib/flowy-document/tests/document/util.rs @@ -196,7 +196,7 @@ impl StorageService for DocumentTestFileStorageService { todo!() } - async fn start_upload(&self, _chunks: &ChunkedBytes, _record: &BoxAny) -> Result<(), FlowyError> { + async fn start_upload(&self, _chunks: ChunkedBytes, _record: &BoxAny) -> Result<(), FlowyError> { todo!() } @@ -209,7 +209,11 @@ impl StorageService for DocumentTestFileStorageService { todo!() } - async fn subscribe_file_progress(&self, _url: &str) -> Result { + async fn subscribe_file_progress( + &self, + _parent_idr: &str, + _url: &str, + ) -> Result, FlowyError> { todo!() } } diff --git a/frontend/rust-lib/flowy-sqlite/migrations/2024-08-20-061727_file_upload_finish/down.sql b/frontend/rust-lib/flowy-sqlite/migrations/2024-08-20-061727_file_upload_finish/down.sql new file mode 100644 index 0000000000..8c072ae1ce --- /dev/null +++ b/frontend/rust-lib/flowy-sqlite/migrations/2024-08-20-061727_file_upload_finish/down.sql @@ -0,0 +1,2 @@ +-- This file should undo anything in `up.sql` +ALTER TABLE upload_file_table DROP COLUMN is_finish; diff --git a/frontend/rust-lib/flowy-sqlite/migrations/2024-08-20-061727_file_upload_finish/up.sql b/frontend/rust-lib/flowy-sqlite/migrations/2024-08-20-061727_file_upload_finish/up.sql new file mode 100644 index 0000000000..088564dca4 --- /dev/null +++ b/frontend/rust-lib/flowy-sqlite/migrations/2024-08-20-061727_file_upload_finish/up.sql @@ -0,0 +1,2 @@ +-- Your SQL goes here +ALTER TABLE upload_file_table ADD COLUMN is_finish BOOLEAN NOT NULL DEFAULT FALSE; diff --git a/frontend/rust-lib/flowy-sqlite/src/schema.rs b/frontend/rust-lib/flowy-sqlite/src/schema.rs index 28d278c6a4..ed2290dd6f 100644 --- a/frontend/rust-lib/flowy-sqlite/src/schema.rs +++ b/frontend/rust-lib/flowy-sqlite/src/schema.rs @@ -64,6 +64,7 @@ diesel::table! { num_chunk -> Integer, upload_id -> Text, created_at -> BigInt, + is_finish -> Bool, } } diff --git a/frontend/rust-lib/flowy-storage-pub/src/chunked_byte.rs b/frontend/rust-lib/flowy-storage-pub/src/chunked_byte.rs index d1210ec8b0..8614fb4489 100644 --- a/frontend/rust-lib/flowy-storage-pub/src/chunked_byte.rs +++ b/frontend/rust-lib/flowy-storage-pub/src/chunked_byte.rs @@ -9,6 +9,7 @@ use tokio::io::AsyncReadExt; /// In Amazon S3, the minimum chunk size for multipart uploads is 5 MB,except for the last part, /// which can be smaller.(https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html) pub const MIN_CHUNK_SIZE: usize = 5 * 1024 * 1024; // Minimum Chunk Size 5 MB +#[derive(Debug, Clone)] pub struct ChunkedBytes { pub data: Bytes, pub chunk_size: i32, @@ -28,8 +29,11 @@ impl Display for ChunkedBytes { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "ChunkedBytes: chunk_size: {}, offsets: {:?}, current_offset: {}", - self.chunk_size, self.offsets, self.current_offset + "data:{}, chunk_size:{}, num chunk:{}, offset:{}", + self.data.len(), + self.chunk_size, + self.offsets.len(), + self.current_offset ) } } diff --git a/frontend/rust-lib/flowy-storage-pub/src/storage.rs b/frontend/rust-lib/flowy-storage-pub/src/storage.rs index 30a9231dab..12124504b9 100644 --- a/frontend/rust-lib/flowy-storage-pub/src/storage.rs +++ b/frontend/rust-lib/flowy-storage-pub/src/storage.rs @@ -21,7 +21,7 @@ pub trait StorageService: Send + Sync { upload_immediately: bool, ) -> Result<(CreatedUpload, Option), FlowyError>; - async fn start_upload(&self, chunks: &ChunkedBytes, record: &BoxAny) -> Result<(), FlowyError>; + async fn start_upload(&self, chunks: ChunkedBytes, record: &BoxAny) -> Result<(), FlowyError>; async fn resume_upload( &self, @@ -32,8 +32,9 @@ pub trait StorageService: Send + Sync { async fn subscribe_file_progress( &self, + parent_idr: &str, file_id: &str, - ) -> Result; + ) -> Result, FlowyError>; } pub struct FileProgressReceiver { diff --git a/frontend/rust-lib/flowy-storage/src/manager.rs b/frontend/rust-lib/flowy-storage/src/manager.rs index fb96e18529..145b3456e6 100644 --- a/frontend/rust-lib/flowy-storage/src/manager.rs +++ b/frontend/rust-lib/flowy-storage/src/manager.rs @@ -2,8 +2,8 @@ use crate::file_cache::FileTempStorage; use crate::notification::{make_notification, StorageNotification}; use crate::sqlite_sql::{ batch_select_upload_file, delete_upload_file, insert_upload_file, insert_upload_part, - select_upload_file, select_upload_parts, update_upload_file_upload_id, UploadFilePartTable, - UploadFileTable, + is_upload_completed, select_upload_file, select_upload_parts, update_upload_file_completed, + update_upload_file_upload_id, UploadFilePartTable, UploadFileTable, }; use crate::uploader::{FileUploader, FileUploaderRunner, Signal, UploadTask, UploadTaskQueue}; use async_trait::async_trait; @@ -117,9 +117,13 @@ impl StorageManager { pub async fn subscribe_file_state( &self, + parent_dir: &str, file_id: &str, - ) -> Result { - self.storage_service.subscribe_file_progress(file_id).await + ) -> Result, FlowyError> { + self + .storage_service + .subscribe_file_progress(parent_dir, file_id) + .await } pub async fn get_file_state(&self, file_id: &str) -> Option { @@ -314,7 +318,7 @@ impl StorageService for StorageServiceImpl { } } - async fn start_upload(&self, chunks: &ChunkedBytes, record: &BoxAny) -> Result<(), FlowyError> { + async fn start_upload(&self, chunks: ChunkedBytes, record: &BoxAny) -> Result<(), FlowyError> { let file_record = record.downcast_ref::().ok_or_else(|| { FlowyError::internal().with_context("failed to downcast record to UploadFileTable") })?; @@ -341,34 +345,16 @@ impl StorageService for StorageServiceImpl { file_id: &str, ) -> Result<(), FlowyError> { // Gathering the upload record and parts from the sqlite database. - let record = { - let mut conn = self - .user_service - .sqlite_connection(self.user_service.user_id()?)?; - conn.immediate_transaction(|conn| { - Ok::<_, FlowyError>( - // When resuming an upload, check if the upload_id is empty. - // If the upload_id is empty, the upload has likely not been created yet. - // If the upload_id is not empty, verify which parts have already been uploaded. - select_upload_file(conn, workspace_id, parent_dir, file_id)?.map(|record| { - if record.upload_id.is_empty() { - (record, vec![]) - } else { - let parts = select_upload_parts(conn, &record.upload_id).unwrap_or_default(); - (record, parts) - } - }), - ) - })? - }; + let mut conn = self + .user_service + .sqlite_connection(self.user_service.user_id()?)?; - if let Some((upload_file, parts)) = record { + if let Some(upload_file) = select_upload_file(&mut conn, workspace_id, parent_dir, file_id)? { resume_upload( &self.cloud_service, &self.user_service, &self.temp_storage, upload_file, - parts, self.progress_notifiers.clone(), ) .await?; @@ -380,18 +366,32 @@ impl StorageService for StorageServiceImpl { async fn subscribe_file_progress( &self, + parent_idr: &str, file_id: &str, - ) -> Result { + ) -> Result, FlowyError> { trace!("[File]: subscribe file progress: {}", file_id); + + let is_completed = { + let mut conn = self + .user_service + .sqlite_connection(self.user_service.user_id()?)?; + let workspace_id = self.user_service.workspace_id()?; + is_upload_completed(&mut conn, &workspace_id, parent_idr, file_id).unwrap_or(false) + }; + if is_completed { + return Ok(None); + } + let (notifier, receiver) = ProgressNotifier::new(); let receiver = FileProgressReceiver { rx: receiver, file_id: file_id.to_string(), }; + self .progress_notifiers .insert(file_id.to_string(), notifier); - Ok(receiver) + Ok(Some(receiver)) } } @@ -421,6 +421,7 @@ async fn create_upload_record( chunk_size: chunked_bytes.chunk_size, num_chunk: chunked_bytes.offsets.len() as i32, created_at: timestamp(), + is_finish: false, }; Ok((chunked_bytes, record)) } @@ -430,10 +431,29 @@ async fn start_upload( cloud_service: &Arc, user_service: &Arc, temp_storage: &Arc, - chunked_bytes: &ChunkedBytes, + mut chunked_bytes: ChunkedBytes, upload_file: &UploadFileTable, progress_notifiers: Arc>, ) -> FlowyResult<()> { + // 4. gather existing completed parts + let mut conn = user_service.sqlite_connection(user_service.user_id()?)?; + let mut completed_parts = select_upload_parts(&mut conn, &upload_file.upload_id) + .unwrap_or_default() + .into_iter() + .map(|part| CompletedPartRequest { + e_tag: part.e_tag, + part_number: part.part_num, + }) + .collect::>(); + + let upload_offset = completed_parts.len() as i32; + chunked_bytes.set_current_offset(upload_offset); + + info!( + "[File] start upload: workspace: {}, parent_dir: {}, file_id: {}, chunk: {}", + upload_file.workspace_id, upload_file.parent_dir, upload_file.file_id, chunked_bytes, + ); + let mut upload_file = upload_file.clone(); if upload_file.upload_id.is_empty() { // 1. create upload @@ -489,25 +509,10 @@ async fn start_upload( let total_parts = chunked_bytes.iter().count(); let iter = chunked_bytes.iter().enumerate(); - let mut conn = user_service.sqlite_connection(user_service.user_id()?)?; - - // 4. gather existing completed parts - let mut completed_parts = select_upload_parts(&mut conn, &upload_file.upload_id) - .unwrap_or_default() - .into_iter() - .map(|part| CompletedPartRequest { - e_tag: part.e_tag, - part_number: part.part_num, - }) - .collect::>(); - - // when there are any existing parts, skip those parts by setting the current offset. - let offset = completed_parts.len(); - for (index, chunk_bytes) in iter { - let part_number = offset + index + 1; + let part_number = upload_offset + index as i32 + 1; trace!( - "[File] {} uploading part: {}, len:{}KB", + "[File] {} uploading {}th part, size:{}KB", upload_file.file_id, part_number, chunk_bytes.len() / 1000, @@ -585,7 +590,6 @@ async fn resume_upload( user_service: &Arc, temp_storage: &Arc, upload_file: UploadFileTable, - parts: Vec, progress_notifiers: Arc>, ) -> FlowyResult<()> { trace!( @@ -597,14 +601,13 @@ async fn resume_upload( ); match ChunkedBytes::from_file(&upload_file.local_file_path, MIN_CHUNK_SIZE as i32).await { - Ok(mut chunked_bytes) => { + Ok(chunked_bytes) => { // When there were any parts already uploaded, skip those parts by setting the current offset. - chunked_bytes.set_current_offset(parts.len() as i32); start_upload( cloud_service, user_service, temp_storage, - &chunked_bytes, + chunked_bytes, &upload_file, progress_notifiers, ) @@ -676,7 +679,7 @@ async fn complete_upload( progress_notifiers: &Arc>, ) -> Result<(), FlowyError> { trace!( - "[File]: completing file upload: {}, part: {}", + "[File]: completing file upload: {}, num parts: {}", upload_file.file_id, parts.len() ); @@ -693,7 +696,7 @@ async fn complete_upload( Ok(_) => { info!("[File] completed upload file: {}", upload_file.file_id); if let Some(mut notifier) = progress_notifiers.get_mut(&upload_file.file_id) { - trace!("[File]: notify upload finished"); + info!("[File]: notify upload:{} finished", upload_file.file_id); notifier .notify(FileUploadState::Finished { file_id: upload_file.file_id.clone(), @@ -701,9 +704,8 @@ async fn complete_upload( .await; } - trace!("[File] delete upload record from sqlite"); let conn = user_service.sqlite_connection(user_service.user_id()?)?; - delete_upload_file(conn, &upload_file.upload_id)?; + update_upload_file_completed(conn, &upload_file.upload_id)?; if let Err(err) = temp_storage .delete_temp_file(&upload_file.local_file_path) .await diff --git a/frontend/rust-lib/flowy-storage/src/sqlite_sql.rs b/frontend/rust-lib/flowy-storage/src/sqlite_sql.rs index c05800341f..52487e6de2 100644 --- a/frontend/rust-lib/flowy-storage/src/sqlite_sql.rs +++ b/frontend/rust-lib/flowy-storage/src/sqlite_sql.rs @@ -21,6 +21,7 @@ pub struct UploadFileTable { pub num_chunk: i32, pub upload_id: String, pub created_at: i64, + pub is_finish: bool, } #[derive(Queryable, Insertable, AsChangeset, Identifiable, Debug)] @@ -87,6 +88,55 @@ pub fn update_upload_file_upload_id( Ok(()) } +pub fn update_upload_file_completed(mut conn: DBConnection, upload_id: &str) -> FlowyResult<()> { + diesel::update( + upload_file_table::dsl::upload_file_table.filter(upload_file_table::upload_id.eq(upload_id)), + ) + .set(upload_file_table::is_finish.eq(true)) + .execute(&mut *conn)?; + Ok(()) +} + +pub fn is_upload_completed( + conn: &mut SqliteConnection, + workspace_id: &str, + parent_dir: &str, + file_id: &str, +) -> FlowyResult { + let result = upload_file_table::dsl::upload_file_table + .filter( + upload_file_table::workspace_id + .eq(workspace_id) + .and(upload_file_table::parent_dir.eq(parent_dir)) + .and(upload_file_table::file_id.eq(file_id)) + .and(upload_file_table::is_finish.eq(true)), + ) + .first::(conn) + .optional()?; + Ok(result.is_some()) +} + +pub fn delete_upload_file(mut conn: DBConnection, upload_id: &str) -> FlowyResult<()> { + conn.immediate_transaction(|conn| { + diesel::delete( + upload_file_table::dsl::upload_file_table.filter(upload_file_table::upload_id.eq(upload_id)), + ) + .execute(&mut *conn)?; + + if let Err(err) = diesel::delete( + upload_file_part::dsl::upload_file_part.filter(upload_file_part::upload_id.eq(upload_id)), + ) + .execute(&mut *conn) + { + warn!("Failed to delete upload parts: {:?}", err) + } + + Ok::<_, FlowyError>(()) + })?; + + Ok(()) +} + pub fn insert_upload_part( mut conn: DBConnection, upload_part: &UploadFilePartTable, @@ -147,24 +197,3 @@ pub fn select_upload_file( .optional()?; Ok(result) } - -pub fn delete_upload_file(mut conn: DBConnection, upload_id: &str) -> FlowyResult<()> { - conn.immediate_transaction(|conn| { - diesel::delete( - upload_file_table::dsl::upload_file_table.filter(upload_file_table::upload_id.eq(upload_id)), - ) - .execute(&mut *conn)?; - - if let Err(err) = diesel::delete( - upload_file_part::dsl::upload_file_part.filter(upload_file_part::upload_id.eq(upload_id)), - ) - .execute(&mut *conn) - { - warn!("Failed to delete upload parts: {:?}", err) - } - - Ok::<_, FlowyError>(()) - })?; - - Ok(()) -} diff --git a/frontend/rust-lib/flowy-storage/src/uploader.rs b/frontend/rust-lib/flowy-storage/src/uploader.rs index 7a92f24e03..2ebe3dcf69 100644 --- a/frontend/rust-lib/flowy-storage/src/uploader.rs +++ b/frontend/rust-lib/flowy-storage/src/uploader.rs @@ -164,7 +164,11 @@ impl FileUploader { mut retry_count, } => { let record = BoxAny::new(record); - if let Err(err) = self.storage_service.start_upload(&chunks, &record).await { + if let Err(err) = self + .storage_service + .start_upload(chunks.clone(), &record) + .await + { if err.is_file_limit_exceeded() { error!("Failed to upload file: {}", err); self.disable_storage_write(); diff --git a/frontend/rust-lib/flowy-storage/tests/multiple_part_upload_test.rs b/frontend/rust-lib/flowy-storage/tests/multiple_part_upload_test.rs index 64ab83b076..5b6b02e0aa 100644 --- a/frontend/rust-lib/flowy-storage/tests/multiple_part_upload_test.rs +++ b/frontend/rust-lib/flowy-storage/tests/multiple_part_upload_test.rs @@ -175,5 +175,6 @@ pub async fn create_upload_file_record( chunk_size: MIN_CHUNK_SIZE as i32, num_chunk: chunked_bytes.offsets.len() as i32, created_at: chrono::Utc::now().timestamp(), + is_finish: false, } }