feat: Stream collab objects (#4361)

* feat: stream collab object

* chore: disable snapshot
This commit is contained in:
Nathan.fooo
2024-01-12 06:26:43 +08:00
committed by GitHub
parent 032a648204
commit cd82c13753
18 changed files with 235 additions and 194 deletions

View File

@ -1,9 +1,7 @@
use bytes::Bytes;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use flowy_error::FlowyError;
use flowy_storage::{FileStorageService, ObjectValue, StorageObject};
use flowy_storage::{FileStorageService, StorageObject};
use lib_infra::future::FutureResult;
use crate::af_cloud::AFServer;
@ -20,43 +18,15 @@ impl<T> FileStorageService for AFCloudFileStorageServiceImpl<T>
where
T: AFServer,
{
fn create_object(&self, object: StorageObject) -> FutureResult<String, FlowyError> {
let try_get_client = self.0.try_get_client();
FutureResult::new(async move {
let client = try_get_client?;
match object.value {
ObjectValue::File { file_path } => {
let mut file = File::open(&file_path).await?;
let mime = mime_guess::from_path(file_path)
.first_or_octet_stream()
.to_string();
let mut buffer = Vec::new();
file.read_to_end(&mut buffer).await?;
Ok(client.put_blob(&object.workspace_id, buffer, mime).await?)
},
ObjectValue::Bytes { bytes, mime } => {
Ok(client.put_blob(&object.workspace_id, bytes, mime).await?)
},
}
})
fn create_object(&self, _object: StorageObject) -> FutureResult<String, FlowyError> {
FutureResult::new(async move { Err(FlowyError::not_support()) })
}
fn delete_object_by_url(&self, object_url: String) -> FutureResult<(), FlowyError> {
let try_get_client = self.0.try_get_client();
FutureResult::new(async move {
let client = try_get_client?;
client.delete_blob(&object_url).await?;
Ok(())
})
fn delete_object_by_url(&self, _object_url: String) -> FutureResult<(), FlowyError> {
FutureResult::new(async move { Err(FlowyError::not_support()) })
}
fn get_object_by_url(&self, object_url: String) -> FutureResult<Bytes, FlowyError> {
let try_get_client = self.0.try_get_client();
FutureResult::new(async move {
let client = try_get_client?;
let bytes = client.get_blob(&object_url).await?;
Ok(bytes)
})
fn get_object_by_url(&self, _object_url: String) -> FutureResult<Bytes, FlowyError> {
FutureResult::new(async move { Err(FlowyError::not_support()) })
}
}

View File

@ -134,7 +134,7 @@ where
})
.collect::<Vec<_>>();
try_get_client?
.batch_create_collab(&workspace_id, params)
.create_collab_list(&workspace_id, params)
.await
.map_err(FlowyError::from)?;
Ok(())

View File

@ -273,7 +273,7 @@ where
})
.collect::<Vec<_>>();
try_get_client?
.batch_create_collab(&workspace_id, params)
.create_collab_list(&workspace_id, params)
.await
.map_err(FlowyError::from)?;
Ok(())