chore: rm fut (#5958)

* chore: rm fut

* chore: clippy
This commit is contained in:
Nathan.fooo 2024-08-14 10:33:23 +08:00 committed by GitHub
parent b3a0119c18
commit 4b24b41dd4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 477 additions and 664 deletions

View File

@ -1,4 +1,4 @@
use flowy_core::MutexAppFlowyCore; use flowy_core::;
use lib_dispatch::prelude::{ use lib_dispatch::prelude::{
AFPluginDispatcher, AFPluginEventResponse, AFPluginRequest, StatusCode, AFPluginDispatcher, AFPluginEventResponse, AFPluginRequest, StatusCode,
}; };

View File

@ -1,4 +1,4 @@
use flowy_core::{AppFlowyCore, MutexAppFlowyCore}; use flowy_core::MutexAppFlowyCore;
use lib_dispatch::prelude::{ use lib_dispatch::prelude::{
AFPluginDispatcher, AFPluginEventResponse, AFPluginRequest, StatusCode, AFPluginDispatcher, AFPluginEventResponse, AFPluginRequest, StatusCode,
}; };

View File

@ -11,7 +11,6 @@ use client_api::error::AppResponseError;
use flowy_error::FlowyError; use flowy_error::FlowyError;
use futures::stream::BoxStream; use futures::stream::BoxStream;
use lib_infra::async_trait::async_trait; use lib_infra::async_trait::async_trait;
use lib_infra::future::FutureResult;
use serde_json::Value; use serde_json::Value;
use std::collections::HashMap; use std::collections::HashMap;
use std::path::Path; use std::path::Path;
@ -21,12 +20,12 @@ pub type StreamAnswer = BoxStream<'static, Result<QuestionStreamValue, FlowyErro
pub type StreamComplete = BoxStream<'static, Result<Bytes, FlowyError>>; pub type StreamComplete = BoxStream<'static, Result<Bytes, FlowyError>>;
#[async_trait] #[async_trait]
pub trait ChatCloudService: Send + Sync + 'static { pub trait ChatCloudService: Send + Sync + 'static {
fn create_chat( async fn create_chat(
&self, &self,
uid: &i64, uid: &i64,
workspace_id: &str, workspace_id: &str,
chat_id: &str, chat_id: &str,
) -> FutureResult<(), FlowyError>; ) -> Result<(), FlowyError>;
async fn create_question( async fn create_question(
&self, &self,

View File

@ -14,7 +14,6 @@ use flowy_error::{ErrorCode, FlowyError, FlowyResult};
use lib_dispatch::prelude::{data_result_ok, AFPluginData, AFPluginState, DataResult}; use lib_dispatch::prelude::{data_result_ok, AFPluginData, AFPluginState, DataResult};
use lib_infra::isolate_stream::IsolateSink; use lib_infra::isolate_stream::IsolateSink;
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use tokio::sync::oneshot;
use tracing::trace; use tracing::trace;
use validator::Validate; use validator::Validate;
@ -114,15 +113,9 @@ pub(crate) async fn get_related_question_handler(
) -> DataResult<RepeatedRelatedQuestionPB, FlowyError> { ) -> DataResult<RepeatedRelatedQuestionPB, FlowyError> {
let ai_manager = upgrade_ai_manager(ai_manager)?; let ai_manager = upgrade_ai_manager(ai_manager)?;
let data = data.into_inner(); let data = data.into_inner();
let (tx, rx) = tokio::sync::oneshot::channel(); let messages = ai_manager
tokio::spawn(async move { .get_related_questions(&data.chat_id, data.message_id)
let messages = ai_manager .await?;
.get_related_questions(&data.chat_id, data.message_id)
.await?;
let _ = tx.send(messages);
Ok::<_, FlowyError>(())
});
let messages = rx.await?;
data_result_ok(messages) data_result_ok(messages)
} }
@ -133,15 +126,9 @@ pub(crate) async fn get_answer_handler(
) -> DataResult<ChatMessagePB, FlowyError> { ) -> DataResult<ChatMessagePB, FlowyError> {
let ai_manager = upgrade_ai_manager(ai_manager)?; let ai_manager = upgrade_ai_manager(ai_manager)?;
let data = data.into_inner(); let data = data.into_inner();
let (tx, rx) = tokio::sync::oneshot::channel(); let message = ai_manager
tokio::spawn(async move { .generate_answer(&data.chat_id, data.message_id)
let message = ai_manager .await?;
.generate_answer(&data.chat_id, data.message_id)
.await?;
let _ = tx.send(message);
Ok::<_, FlowyError>(())
});
let message = rx.await?;
data_result_ok(message) data_result_ok(message)
} }
@ -163,25 +150,17 @@ pub(crate) async fn refresh_local_ai_info_handler(
ai_manager: AFPluginState<Weak<AIManager>>, ai_manager: AFPluginState<Weak<AIManager>>,
) -> DataResult<LLMModelInfoPB, FlowyError> { ) -> DataResult<LLMModelInfoPB, FlowyError> {
let ai_manager = upgrade_ai_manager(ai_manager)?; let ai_manager = upgrade_ai_manager(ai_manager)?;
let (tx, rx) = oneshot::channel::<Result<LLMModelInfo, FlowyError>>(); let model_info = ai_manager.local_ai_controller.refresh().await;
tokio::spawn(async move { if model_info.is_err() {
let model_info = ai_manager.local_ai_controller.refresh().await; if let Some(llm_model) = ai_manager.local_ai_controller.get_current_model() {
if model_info.is_err() { let model_info = LLMModelInfo {
if let Some(llm_model) = ai_manager.local_ai_controller.get_current_model() { selected_model: llm_model.clone(),
let model_info = LLMModelInfo { models: vec![llm_model],
selected_model: llm_model.clone(), };
models: vec![llm_model], return data_result_ok(model_info.into());
};
let _ = tx.send(Ok(model_info));
return;
}
} }
}
let _ = tx.send(model_info); data_result_ok(model_info?.into())
});
let model_info = rx.await??;
data_result_ok(model_info.into())
} }
#[tracing::instrument(level = "debug", skip_all, err)] #[tracing::instrument(level = "debug", skip_all, err)]
@ -268,16 +247,9 @@ pub(crate) async fn chat_file_handler(
} }
tracing::debug!("File size: {} bytes", file_size); tracing::debug!("File size: {} bytes", file_size);
let ai_manager = upgrade_ai_manager(ai_manager)?;
let (tx, rx) = oneshot::channel::<Result<(), FlowyError>>(); ai_manager.chat_with_file(&data.chat_id, file_path).await?;
tokio::spawn(async move { Ok(())
let ai_manager = upgrade_ai_manager(ai_manager)?;
ai_manager.chat_with_file(&data.chat_id, file_path).await?;
let _ = tx.send(Ok(()));
Ok::<_, FlowyError>(())
});
rx.await?
} }
#[tracing::instrument(level = "debug", skip_all, err)] #[tracing::instrument(level = "debug", skip_all, err)]
@ -420,17 +392,10 @@ pub(crate) async fn get_offline_app_handler(
ai_manager: AFPluginState<Weak<AIManager>>, ai_manager: AFPluginState<Weak<AIManager>>,
) -> DataResult<OfflineAIPB, FlowyError> { ) -> DataResult<OfflineAIPB, FlowyError> {
let ai_manager = upgrade_ai_manager(ai_manager)?; let ai_manager = upgrade_ai_manager(ai_manager)?;
let (tx, rx) = oneshot::channel::<Result<String, FlowyError>>(); let link = ai_manager
tokio::spawn(async move { .local_ai_controller
let link = ai_manager .get_offline_ai_app_download_link()
.local_ai_controller .await?;
.get_offline_ai_app_download_link()
.await?;
let _ = tx.send(Ok(link));
Ok::<_, FlowyError>(())
});
let link = rx.await??;
data_result_ok(OfflineAIPB { link }) data_result_ok(OfflineAIPB { link })
} }

View File

@ -14,7 +14,6 @@ use flowy_ai_pub::cloud::{
use flowy_error::{FlowyError, FlowyResult}; use flowy_error::{FlowyError, FlowyResult};
use futures::{stream, Sink, StreamExt, TryStreamExt}; use futures::{stream, Sink, StreamExt, TryStreamExt};
use lib_infra::async_trait::async_trait; use lib_infra::async_trait::async_trait;
use lib_infra::future::FutureResult;
use crate::local_ai::stream_util::QuestionStream; use crate::local_ai::stream_util::QuestionStream;
use crate::stream_message::StreamMessage; use crate::stream_message::StreamMessage;
@ -108,13 +107,16 @@ impl AICloudServiceMiddleware {
#[async_trait] #[async_trait]
impl ChatCloudService for AICloudServiceMiddleware { impl ChatCloudService for AICloudServiceMiddleware {
fn create_chat( async fn create_chat(
&self, &self,
uid: &i64, uid: &i64,
workspace_id: &str, workspace_id: &str,
chat_id: &str, chat_id: &str,
) -> FutureResult<(), FlowyError> { ) -> Result<(), FlowyError> {
self.cloud_service.create_chat(uid, workspace_id, chat_id) self
.cloud_service
.create_chat(uid, workspace_id, chat_id)
.await
} }
async fn create_question( async fn create_question(

View File

@ -26,7 +26,7 @@ use flowy_sqlite::kv::KVStorePreferences;
use flowy_user::services::authenticate_user::AuthenticateUser; use flowy_user::services::authenticate_user::AuthenticateUser;
use flowy_user::services::data_import::{load_collab_by_object_id, load_collab_by_object_ids}; use flowy_user::services::data_import::{load_collab_by_object_id, load_collab_by_object_ids};
use lib_dispatch::prelude::ToBytes; use lib_dispatch::prelude::ToBytes;
use lib_infra::future::FutureResult;
use std::collections::HashMap; use std::collections::HashMap;
use std::convert::TryFrom; use std::convert::TryFrom;
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
@ -35,6 +35,7 @@ use tokio::sync::RwLock;
use crate::integrate::server::ServerProvider; use crate::integrate::server::ServerProvider;
use collab_plugins::local_storage::kv::KVTransactionDB; use collab_plugins::local_storage::kv::KVTransactionDB;
use lib_infra::async_trait::async_trait;
pub struct FolderDepsResolver(); pub struct FolderDepsResolver();
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
@ -113,363 +114,305 @@ impl FolderUser for FolderUserImpl {
} }
struct DocumentFolderOperation(Arc<DocumentManager>); struct DocumentFolderOperation(Arc<DocumentManager>);
#[async_trait]
impl FolderOperationHandler for DocumentFolderOperation { impl FolderOperationHandler for DocumentFolderOperation {
fn create_workspace_view( async fn create_workspace_view(
&self, &self,
uid: i64, uid: i64,
workspace_view_builder: Arc<RwLock<NestedViewBuilder>>, workspace_view_builder: Arc<RwLock<NestedViewBuilder>>,
) -> FutureResult<(), FlowyError> { ) -> Result<(), FlowyError> {
let manager = self.0.clone(); let manager = self.0.clone();
FutureResult::new(async move { let mut write_guard = workspace_view_builder.write().await;
let mut write_guard = workspace_view_builder.write().await; // Create a view named "Getting started" with an icon ⭐️ and the built-in README data.
// Don't modify this code unless you know what you are doing.
// Create a view named "Getting started" with an icon ⭐️ and the built-in README data. write_guard
// Don't modify this code unless you know what you are doing. .with_view_builder(|view_builder| async {
write_guard let view = view_builder
.with_view_builder(|view_builder| async { .with_name("Getting started")
let view = view_builder .with_icon("⭐️")
.with_name("Getting started") .build();
.with_icon("⭐️") // create a empty document
.build(); let json_str = include_str!("../../assets/read_me.json");
// create a empty document let document_pb = JsonToDocumentParser::json_str_to_document(json_str).unwrap();
let json_str = include_str!("../../assets/read_me.json"); manager
let document_pb = JsonToDocumentParser::json_str_to_document(json_str).unwrap(); .create_document(uid, &view.parent_view.id, Some(document_pb.into()))
manager .await
.create_document(uid, &view.parent_view.id, Some(document_pb.into())) .unwrap();
.await view
.unwrap(); })
view .await;
}) Ok(())
.await;
Ok(())
})
} }
fn open_view(&self, view_id: &str) -> FutureResult<(), FlowyError> { async fn open_view(&self, view_id: &str) -> Result<(), FlowyError> {
let manager = self.0.clone(); self.0.open_document(view_id).await?;
let view_id = view_id.to_string(); Ok(())
FutureResult::new(async move {
manager.open_document(&view_id).await?;
Ok(())
})
} }
/// Close the document view. /// Close the document view.
fn close_view(&self, view_id: &str) -> FutureResult<(), FlowyError> { async fn close_view(&self, view_id: &str) -> Result<(), FlowyError> {
let manager = self.0.clone(); self.0.close_document(view_id).await?;
let view_id = view_id.to_string(); Ok(())
FutureResult::new(async move {
manager.close_document(&view_id).await?;
Ok(())
})
} }
fn delete_view(&self, view_id: &str) -> FutureResult<(), FlowyError> { async fn delete_view(&self, view_id: &str) -> Result<(), FlowyError> {
let manager = self.0.clone(); match self.0.delete_document(view_id).await {
let view_id = view_id.to_string(); Ok(_) => tracing::trace!("Delete document: {}", view_id),
FutureResult::new(async move { Err(e) => tracing::error!("🔴delete document failed: {}", e),
match manager.delete_document(&view_id).await { }
Ok(_) => tracing::trace!("Delete document: {}", view_id), Ok(())
Err(e) => tracing::error!("🔴delete document failed: {}", e),
}
Ok(())
})
} }
fn duplicate_view(&self, view_id: &str) -> FutureResult<Bytes, FlowyError> { async fn duplicate_view(&self, view_id: &str) -> Result<Bytes, FlowyError> {
let manager = self.0.clone(); let data: DocumentDataPB = self.0.get_document_data(view_id).await?.into();
let view_id = view_id.to_string(); let data_bytes = data.into_bytes().map_err(|_| FlowyError::invalid_data())?;
FutureResult::new(async move { Ok(data_bytes)
let data: DocumentDataPB = manager.get_document_data(&view_id).await?.into();
let data_bytes = data.into_bytes().map_err(|_| FlowyError::invalid_data())?;
Ok(data_bytes)
})
} }
fn create_view_with_view_data( async fn create_view_with_view_data(
&self, &self,
user_id: i64, user_id: i64,
params: CreateViewParams, params: CreateViewParams,
) -> FutureResult<Option<EncodedCollab>, FlowyError> { ) -> Result<Option<EncodedCollab>, FlowyError> {
debug_assert_eq!(params.layout, ViewLayoutPB::Document); debug_assert_eq!(params.layout, ViewLayoutPB::Document);
let view_id = params.view_id.to_string(); let data = DocumentDataPB::try_from(Bytes::from(params.initial_data))?;
let manager = self.0.clone(); let encoded_collab = self
FutureResult::new(async move { .0
let data = DocumentDataPB::try_from(Bytes::from(params.initial_data))?; .create_document(user_id, &params.view_id, Some(data.into()))
let encoded_collab = manager .await?;
.create_document(user_id, &view_id, Some(data.into())) Ok(Some(encoded_collab))
.await?;
Ok(Some(encoded_collab))
})
} }
fn get_encoded_collab_v1_from_disk( async fn get_encoded_collab_v1_from_disk(
&self, &self,
user: Arc<dyn FolderUser>, user: Arc<dyn FolderUser>,
view_id: &str, view_id: &str,
) -> FutureResult<EncodedCollabWrapper, FlowyError> { ) -> Result<EncodedCollabWrapper, FlowyError> {
// get the collab_object_id for the document. // get the collab_object_id for the document.
// the collab_object_id for the document is the view_id. // the collab_object_id for the document is the view_id.
let oid = view_id.to_string();
FutureResult::new(async move { let uid = user
let uid = user .user_id()
.user_id() .map_err(|e| e.with_context("unable to get the uid: {}"))?;
.map_err(|e| e.with_context("unable to get the uid: {}"))?;
// get the collab db // get the collab db
let collab_db = user let collab_db = user
.collab_db(uid) .collab_db(uid)
.map_err(|e| e.with_context("unable to get the collab"))?; .map_err(|e| e.with_context("unable to get the collab"))?;
let collab_db = collab_db.upgrade().ok_or_else(|| { let collab_db = collab_db.upgrade().ok_or_else(|| {
FlowyError::internal().with_context( FlowyError::internal().with_context(
"The collab db has been dropped, indicating that the user has switched to a new account", "The collab db has been dropped, indicating that the user has switched to a new account",
) )
})?; })?;
let collab_read_txn = collab_db.read_txn(); let collab_read_txn = collab_db.read_txn();
// read the collab from the db // read the collab from the db
let collab = load_collab_by_object_id(uid, &collab_read_txn, &oid).map_err(|e| { let collab = load_collab_by_object_id(uid, &collab_read_txn, view_id).map_err(|e| {
FlowyError::internal().with_context(format!("load document collab failed: {}", e)) FlowyError::internal().with_context(format!("load document collab failed: {}", e))
})?; })?;
let encoded_collab = collab let encoded_collab = collab
// encode the collab and check the integrity of the collab // encode the collab and check the integrity of the collab
.encode_collab_v1(|collab| CollabType::Document.validate_require_data(collab)) .encode_collab_v1(|collab| CollabType::Document.validate_require_data(collab))
.map_err(|e| { .map_err(|e| {
FlowyError::internal().with_context(format!("encode document collab failed: {}", e)) FlowyError::internal().with_context(format!("encode document collab failed: {}", e))
})?; })?;
Ok(EncodedCollabWrapper::Document(DocumentEncodedCollab { Ok(EncodedCollabWrapper::Document(DocumentEncodedCollab {
document_encoded_collab: encoded_collab, document_encoded_collab: encoded_collab,
})) }))
})
} }
/// Create a view with built-in data. /// Create a view with built-in data.
fn create_built_in_view( async fn create_built_in_view(
&self, &self,
user_id: i64, user_id: i64,
view_id: &str, view_id: &str,
_name: &str, _name: &str,
layout: ViewLayout, layout: ViewLayout,
) -> FutureResult<(), FlowyError> { ) -> Result<(), FlowyError> {
debug_assert_eq!(layout, ViewLayout::Document); debug_assert_eq!(layout, ViewLayout::Document);
let view_id = view_id.to_string(); match self.0.create_document(user_id, view_id, None).await {
let manager = self.0.clone(); Ok(_) => Ok(()),
FutureResult::new(async move { Err(err) => {
match manager.create_document(user_id, &view_id, None).await { if err.is_already_exists() {
Ok(_) => Ok(()), Ok(())
Err(err) => { } else {
if err.is_already_exists() { Err(err)
Ok(()) }
} else { },
Err(err) }
}
},
}
})
} }
fn import_from_bytes( async fn import_from_bytes(
&self, &self,
uid: i64, uid: i64,
view_id: &str, view_id: &str,
_name: &str, _name: &str,
_import_type: ImportType, _import_type: ImportType,
bytes: Vec<u8>, bytes: Vec<u8>,
) -> FutureResult<EncodedCollab, FlowyError> { ) -> Result<EncodedCollab, FlowyError> {
let view_id = view_id.to_string(); let data = DocumentDataPB::try_from(Bytes::from(bytes))?;
let manager = self.0.clone(); let encoded_collab = self
FutureResult::new(async move { .0
let data = DocumentDataPB::try_from(Bytes::from(bytes))?; .create_document(uid, view_id, Some(data.into()))
let encoded_collab = manager .await?;
.create_document(uid, &view_id, Some(data.into())) Ok(encoded_collab)
.await?;
Ok(encoded_collab)
})
} }
// will implement soon // will implement soon
fn import_from_file_path( async fn import_from_file_path(
&self, &self,
_view_id: &str, _view_id: &str,
_name: &str, _name: &str,
_path: String, _path: String,
) -> FutureResult<(), FlowyError> { ) -> Result<(), FlowyError> {
FutureResult::new(async move { Ok(()) }) Ok(())
} }
} }
struct DatabaseFolderOperation(Arc<DatabaseManager>); struct DatabaseFolderOperation(Arc<DatabaseManager>);
#[async_trait]
impl FolderOperationHandler for DatabaseFolderOperation { impl FolderOperationHandler for DatabaseFolderOperation {
fn open_view(&self, view_id: &str) -> FutureResult<(), FlowyError> { async fn open_view(&self, view_id: &str) -> Result<(), FlowyError> {
let database_manager = self.0.clone(); self.0.open_database_view(view_id).await?;
let view_id = view_id.to_string(); Ok(())
FutureResult::new(async move {
database_manager.open_database_view(view_id).await?;
Ok(())
})
} }
fn close_view(&self, view_id: &str) -> FutureResult<(), FlowyError> { async fn close_view(&self, view_id: &str) -> Result<(), FlowyError> {
let database_manager = self.0.clone(); self.0.close_database_view(view_id).await?;
let view_id = view_id.to_string(); Ok(())
FutureResult::new(async move {
database_manager.close_database_view(view_id).await?;
Ok(())
})
} }
fn delete_view(&self, view_id: &str) -> FutureResult<(), FlowyError> { async fn delete_view(&self, view_id: &str) -> Result<(), FlowyError> {
let database_manager = self.0.clone(); match self.0.delete_database_view(view_id).await {
let view_id = view_id.to_string(); Ok(_) => tracing::trace!("Delete database view: {}", view_id),
FutureResult::new(async move { Err(e) => tracing::error!("🔴delete database failed: {}", e),
match database_manager.delete_database_view(&view_id).await { }
Ok(_) => tracing::trace!("Delete database view: {}", view_id), Ok(())
Err(e) => tracing::error!("🔴delete database failed: {}", e),
}
Ok(())
})
} }
fn get_encoded_collab_v1_from_disk( async fn get_encoded_collab_v1_from_disk(
&self, &self,
user: Arc<dyn FolderUser>, user: Arc<dyn FolderUser>,
view_id: &str, view_id: &str,
) -> FutureResult<EncodedCollabWrapper, FlowyError> { ) -> Result<EncodedCollabWrapper, FlowyError> {
let manager = self.0.clone(); // get the collab_object_id for the database.
let view_id = view_id.to_string(); //
// the collab object_id for the database is not the view_id,
// we should use the view_id to get the database_id
let oid = self.0.get_database_id_with_view_id(view_id).await?;
let row_oids = self.0.get_database_row_ids_with_view_id(view_id).await?;
let row_oids = row_oids
.into_iter()
.map(|oid| oid.into_inner())
.collect::<Vec<_>>();
let database_metas = self.0.get_all_databases_meta().await;
FutureResult::new(async move { let uid = user
// get the collab_object_id for the database. .user_id()
// .map_err(|e| e.with_context("unable to get the uid: {}"))?;
// the collab object_id for the database is not the view_id,
// we should use the view_id to get the database_id
let oid = manager.get_database_id_with_view_id(&view_id).await?;
let row_oids = manager.get_database_row_ids_with_view_id(&view_id).await?;
let row_oids = row_oids
.into_iter()
.map(|oid| oid.into_inner())
.collect::<Vec<_>>();
let database_metas = manager.get_all_databases_meta().await;
let uid = user // get the collab db
.user_id() let collab_db = user
.map_err(|e| e.with_context("unable to get the uid: {}"))?; .collab_db(uid)
.map_err(|e| e.with_context("unable to get the collab"))?;
let collab_db = collab_db.upgrade().ok_or_else(|| {
FlowyError::internal().with_context(
"The collab db has been dropped, indicating that the user has switched to a new account",
)
})?;
// get the collab db let collab_read_txn = collab_db.read_txn();
let collab_db = user
.collab_db(uid)
.map_err(|e| e.with_context("unable to get the collab"))?;
let collab_db = collab_db.upgrade().ok_or_else(|| {
FlowyError::internal().with_context(
"The collab db has been dropped, indicating that the user has switched to a new account",
)
})?;
let collab_read_txn = collab_db.read_txn(); // read the database collab from the db
let database_collab = load_collab_by_object_id(uid, &collab_read_txn, &oid).map_err(|e| {
FlowyError::internal().with_context(format!("load database collab failed: {}", e))
})?;
// read the database collab from the db let database_encoded_collab = database_collab
let database_collab = load_collab_by_object_id(uid, &collab_read_txn, &oid).map_err(|e| {
FlowyError::internal().with_context(format!("load database collab failed: {}", e))
})?;
let database_encoded_collab = database_collab
// encode the collab and check the integrity of the collab // encode the collab and check the integrity of the collab
.encode_collab_v1(|collab| CollabType::Database.validate_require_data(collab)) .encode_collab_v1(|collab| CollabType::Database.validate_require_data(collab))
.map_err(|e| { .map_err(|e| {
FlowyError::internal().with_context(format!("encode database collab failed: {}", e)) FlowyError::internal().with_context(format!("encode database collab failed: {}", e))
})?; })?;
// read the database rows collab from the db // read the database rows collab from the db
let database_row_collabs = load_collab_by_object_ids(uid, &collab_read_txn, &row_oids); let database_row_collabs = load_collab_by_object_ids(uid, &collab_read_txn, &row_oids);
let database_row_encoded_collabs = database_row_collabs let database_row_encoded_collabs = database_row_collabs
.into_iter() .into_iter()
.map(|(oid, collab)| { .map(|(oid, collab)| {
// encode the collab and check the integrity of the collab // encode the collab and check the integrity of the collab
let encoded_collab = collab let encoded_collab = collab
.encode_collab_v1(|collab| CollabType::DatabaseRow.validate_require_data(collab)) .encode_collab_v1(|collab| CollabType::DatabaseRow.validate_require_data(collab))
.map_err(|e| { .map_err(|e| {
FlowyError::internal() FlowyError::internal().with_context(format!("encode database row collab failed: {}", e))
.with_context(format!("encode database row collab failed: {}", e)) })?;
})?; Ok((oid, encoded_collab))
Ok((oid, encoded_collab)) })
}) .collect::<Result<HashMap<_, _>, FlowyError>>()?;
.collect::<Result<HashMap<_, _>, FlowyError>>()?;
// get the relation info from the database meta // get the relation info from the database meta
let database_relations = database_metas let database_relations = database_metas
.into_iter() .into_iter()
.filter_map(|meta| { .filter_map(|meta| {
let linked_views = meta.linked_views.into_iter().next()?; let linked_views = meta.linked_views.into_iter().next()?;
Some((meta.database_id, linked_views)) Some((meta.database_id, linked_views))
}) })
.collect::<HashMap<_, _>>(); .collect::<HashMap<_, _>>();
Ok(EncodedCollabWrapper::Database(DatabaseEncodedCollab { Ok(EncodedCollabWrapper::Database(DatabaseEncodedCollab {
database_encoded_collab, database_encoded_collab,
database_row_encoded_collabs, database_row_encoded_collabs,
database_relations, database_relations,
})) }))
})
} }
fn duplicate_view(&self, view_id: &str) -> FutureResult<Bytes, FlowyError> { async fn duplicate_view(&self, view_id: &str) -> Result<Bytes, FlowyError> {
let database_manager = self.0.clone(); let delta_bytes = self.0.duplicate_database(view_id).await?;
let view_id = view_id.to_owned(); Ok(Bytes::from(delta_bytes))
FutureResult::new(async move {
let delta_bytes = database_manager.duplicate_database(&view_id).await?;
Ok(Bytes::from(delta_bytes))
})
} }
/// Create a database view with duplicated data. /// Create a database view with duplicated data.
/// If the ext contains the {"database_id": "xx"}, then it will link /// If the ext contains the {"database_id": "xx"}, then it will link
/// to the existing database. /// to the existing database.
fn create_view_with_view_data( async fn create_view_with_view_data(
&self, &self,
_user_id: i64, _user_id: i64,
params: CreateViewParams, params: CreateViewParams,
) -> FutureResult<Option<EncodedCollab>, FlowyError> { ) -> Result<Option<EncodedCollab>, FlowyError> {
match CreateDatabaseExtParams::from_map(params.meta.clone()) { match CreateDatabaseExtParams::from_map(params.meta.clone()) {
None => { None => {
let database_manager = self.0.clone(); let encoded_collab = self
let view_id = params.view_id.to_string(); .0
FutureResult::new(async move { .create_database_with_database_data(&params.view_id, params.initial_data)
let encoded_collab = database_manager .await?;
.create_database_with_database_data(&view_id, params.initial_data) Ok(Some(encoded_collab))
.await?;
Ok(Some(encoded_collab))
})
}, },
Some(database_params) => { Some(database_params) => {
let database_manager = self.0.clone();
let layout = match params.layout { let layout = match params.layout {
ViewLayoutPB::Board => DatabaseLayoutPB::Board, ViewLayoutPB::Board => DatabaseLayoutPB::Board,
ViewLayoutPB::Calendar => DatabaseLayoutPB::Calendar, ViewLayoutPB::Calendar => DatabaseLayoutPB::Calendar,
ViewLayoutPB::Grid => DatabaseLayoutPB::Grid, ViewLayoutPB::Grid => DatabaseLayoutPB::Grid,
ViewLayoutPB::Document | ViewLayoutPB::Chat => { ViewLayoutPB::Document | ViewLayoutPB::Chat => {
return FutureResult::new(async move { Err(FlowyError::not_support()) }); return Err(FlowyError::not_support());
}, },
}; };
let name = params.name.to_string(); let name = params.name.to_string();
let database_view_id = params.view_id.to_string(); let database_view_id = params.view_id.to_string();
let database_parent_view_id = params.parent_view_id.to_string(); let database_parent_view_id = params.parent_view_id.to_string();
self
FutureResult::new(async move { .0
database_manager .create_linked_view(
.create_linked_view( name,
name, layout.into(),
layout.into(), database_params.database_id,
database_params.database_id, database_view_id,
database_view_id, database_parent_view_id,
database_parent_view_id, )
) .await?;
.await?; Ok(None)
Ok(None)
})
}, },
} }
} }
@ -478,110 +421,90 @@ impl FolderOperationHandler for DatabaseFolderOperation {
/// If the ext contains the {"database_id": "xx"}, then it will link to /// If the ext contains the {"database_id": "xx"}, then it will link to
/// the existing database. The data of the database will be shared within /// the existing database. The data of the database will be shared within
/// these references views. /// these references views.
fn create_built_in_view( async fn create_built_in_view(
&self, &self,
_user_id: i64, _user_id: i64,
view_id: &str, view_id: &str,
name: &str, name: &str,
layout: ViewLayout, layout: ViewLayout,
) -> FutureResult<(), FlowyError> { ) -> Result<(), FlowyError> {
let name = name.to_string(); let name = name.to_string();
let database_manager = self.0.clone();
let data = match layout { let data = match layout {
ViewLayout::Grid => make_default_grid(view_id, &name), ViewLayout::Grid => make_default_grid(view_id, &name),
ViewLayout::Board => make_default_board(view_id, &name), ViewLayout::Board => make_default_board(view_id, &name),
ViewLayout::Calendar => make_default_calendar(view_id, &name), ViewLayout::Calendar => make_default_calendar(view_id, &name),
ViewLayout::Document => { ViewLayout::Document | ViewLayout::Chat => {
return FutureResult::new(async move { return Err(
Err(FlowyError::internal().with_context(format!("Can't handle {:?} layout type", layout))) FlowyError::internal().with_context(format!("Can't handle {:?} layout type", layout)),
}); );
},
ViewLayout::Chat => {
// TODO(nathan): AI
todo!("AI")
}, },
}; };
FutureResult::new(async move { let result = self.0.create_database_with_params(data).await;
let result = database_manager.create_database_with_params(data).await; match result {
match result { Ok(_) => Ok(()),
Ok(_) => Ok(()), Err(err) => {
Err(err) => { if err.is_already_exists() {
if err.is_already_exists() { Ok(())
Ok(()) } else {
} else { Err(err)
Err(err) }
} },
}, }
}
})
} }
fn import_from_bytes( async fn import_from_bytes(
&self, &self,
_uid: i64, _uid: i64,
view_id: &str, view_id: &str,
_name: &str, _name: &str,
import_type: ImportType, import_type: ImportType,
bytes: Vec<u8>, bytes: Vec<u8>,
) -> FutureResult<EncodedCollab, FlowyError> { ) -> Result<EncodedCollab, FlowyError> {
let database_manager = self.0.clone();
let view_id = view_id.to_string();
let format = match import_type { let format = match import_type {
ImportType::CSV => CSVFormat::Original, ImportType::CSV => CSVFormat::Original,
ImportType::HistoryDatabase => CSVFormat::META, ImportType::HistoryDatabase => CSVFormat::META,
ImportType::RawDatabase => CSVFormat::META, ImportType::RawDatabase => CSVFormat::META,
_ => CSVFormat::Original, _ => CSVFormat::Original,
}; };
FutureResult::new(async move { let content = tokio::task::spawn_blocking(move || {
let content = tokio::task::spawn_blocking(move || { String::from_utf8(bytes).map_err(|err| FlowyError::internal().with_context(err))
String::from_utf8(bytes).map_err(|err| FlowyError::internal().with_context(err))
})
.await??;
let result = database_manager
.import_csv(view_id, content, format)
.await?;
Ok(result.encoded_collab)
}) })
.await??;
let result = self
.0
.import_csv(view_id.to_string(), content, format)
.await?;
Ok(result.encoded_collab)
} }
fn import_from_file_path( async fn import_from_file_path(
&self, &self,
_view_id: &str, _view_id: &str,
_name: &str, _name: &str,
path: String, path: String,
) -> FutureResult<(), FlowyError> { ) -> Result<(), FlowyError> {
let database_manager = self.0.clone(); self.0.import_csv_from_file(path, CSVFormat::META).await?;
FutureResult::new(async move { Ok(())
database_manager
.import_csv_from_file(path, CSVFormat::META)
.await?;
Ok(())
})
} }
fn did_update_view(&self, old: &View, new: &View) -> FutureResult<(), FlowyError> { async fn did_update_view(&self, old: &View, new: &View) -> Result<(), FlowyError> {
let database_layout = match new.layout { let database_layout = match new.layout {
ViewLayout::Document | ViewLayout::Chat => { ViewLayout::Document | ViewLayout::Chat => {
return FutureResult::new(async { return Err(FlowyError::internal().with_context("Can't handle document layout type"));
Err(FlowyError::internal().with_context("Can't handle document layout type"))
});
}, },
ViewLayout::Grid => DatabaseLayoutPB::Grid, ViewLayout::Grid => DatabaseLayoutPB::Grid,
ViewLayout::Board => DatabaseLayoutPB::Board, ViewLayout::Board => DatabaseLayoutPB::Board,
ViewLayout::Calendar => DatabaseLayoutPB::Calendar, ViewLayout::Calendar => DatabaseLayoutPB::Calendar,
}; };
let database_manager = self.0.clone();
let view_id = new.id.clone();
if old.layout != new.layout { if old.layout != new.layout {
FutureResult::new(async move { self
database_manager .0
.update_database_layout(&view_id, database_layout) .update_database_layout(&new.id, database_layout)
.await?; .await?;
Ok(()) Ok(())
})
} else { } else {
FutureResult::new(async move { Ok(()) }) Ok(())
} }
} }
} }
@ -599,78 +522,61 @@ impl CreateDatabaseExtParams {
} }
struct ChatFolderOperation(Arc<AIManager>); struct ChatFolderOperation(Arc<AIManager>);
#[async_trait]
impl FolderOperationHandler for ChatFolderOperation { impl FolderOperationHandler for ChatFolderOperation {
fn open_view(&self, view_id: &str) -> FutureResult<(), FlowyError> { async fn open_view(&self, view_id: &str) -> Result<(), FlowyError> {
let manager = self.0.clone(); self.0.open_chat(view_id).await
let view_id = view_id.to_string();
FutureResult::new(async move {
manager.open_chat(&view_id).await?;
Ok(())
})
} }
fn close_view(&self, view_id: &str) -> FutureResult<(), FlowyError> { async fn close_view(&self, view_id: &str) -> Result<(), FlowyError> {
let manager = self.0.clone(); self.0.close_chat(view_id).await
let view_id = view_id.to_string();
FutureResult::new(async move {
manager.close_chat(&view_id).await?;
Ok(())
})
} }
fn delete_view(&self, view_id: &str) -> FutureResult<(), FlowyError> { async fn delete_view(&self, view_id: &str) -> Result<(), FlowyError> {
let manager = self.0.clone(); self.0.delete_chat(view_id).await
let view_id = view_id.to_string();
FutureResult::new(async move {
manager.delete_chat(&view_id).await?;
Ok(())
})
} }
fn duplicate_view(&self, _view_id: &str) -> FutureResult<ViewData, FlowyError> { async fn duplicate_view(&self, _view_id: &str) -> Result<ViewData, FlowyError> {
FutureResult::new(async move { Err(FlowyError::not_support()) }) Err(FlowyError::not_support())
} }
fn create_view_with_view_data( async fn create_view_with_view_data(
&self, &self,
_user_id: i64, _user_id: i64,
_params: CreateViewParams, _params: CreateViewParams,
) -> FutureResult<Option<EncodedCollab>, FlowyError> { ) -> Result<Option<EncodedCollab>, FlowyError> {
FutureResult::new(async move { Err(FlowyError::not_support()) }) Err(FlowyError::not_support())
} }
fn create_built_in_view( async fn create_built_in_view(
&self, &self,
user_id: i64, user_id: i64,
view_id: &str, view_id: &str,
_name: &str, _name: &str,
_layout: ViewLayout, _layout: ViewLayout,
) -> FutureResult<(), FlowyError> { ) -> Result<(), FlowyError> {
let manager = self.0.clone(); self.0.create_chat(&user_id, view_id).await?;
let view_id = view_id.to_string(); Ok(())
FutureResult::new(async move {
manager.create_chat(&user_id, &view_id).await?;
Ok(())
})
} }
fn import_from_bytes( async fn import_from_bytes(
&self, &self,
_uid: i64, _uid: i64,
_view_id: &str, _view_id: &str,
_name: &str, _name: &str,
_import_type: ImportType, _import_type: ImportType,
_bytes: Vec<u8>, _bytes: Vec<u8>,
) -> FutureResult<EncodedCollab, FlowyError> { ) -> Result<EncodedCollab, FlowyError> {
FutureResult::new(async move { Err(FlowyError::not_support()) }) Err(FlowyError::not_support())
} }
fn import_from_file_path( async fn import_from_file_path(
&self, &self,
_view_id: &str, _view_id: &str,
_name: &str, _name: &str,
_path: String, _path: String,
) -> FutureResult<(), FlowyError> { ) -> Result<(), FlowyError> {
FutureResult::new(async move { Err(FlowyError::not_support()) }) Err(FlowyError::not_support())
} }
} }

View File

@ -595,22 +595,17 @@ impl CollabCloudPluginProvider for ServerProvider {
#[async_trait] #[async_trait]
impl ChatCloudService for ServerProvider { impl ChatCloudService for ServerProvider {
fn create_chat( async fn create_chat(
&self, &self,
uid: &i64, uid: &i64,
workspace_id: &str, workspace_id: &str,
chat_id: &str, chat_id: &str,
) -> FutureResult<(), FlowyError> { ) -> Result<(), FlowyError> {
let workspace_id = workspace_id.to_string();
let server = self.get_server(); let server = self.get_server();
let chat_id = chat_id.to_string(); server?
let uid = *uid; .chat_service()
FutureResult::new(async move { .create_chat(uid, workspace_id, chat_id)
server? .await
.chat_service()
.create_chat(&uid, &workspace_id, &chat_id)
.await
})
} }
async fn create_question( async fn create_question(

View File

@ -15,7 +15,7 @@ use flowy_storage::manager::StorageManager;
use flowy_user::event_map::UserStatusCallback; use flowy_user::event_map::UserStatusCallback;
use flowy_user_pub::cloud::{UserCloudConfig, UserCloudServiceProvider}; use flowy_user_pub::cloud::{UserCloudConfig, UserCloudServiceProvider};
use flowy_user_pub::entities::{Authenticator, UserProfile, UserWorkspace}; use flowy_user_pub::entities::{Authenticator, UserProfile, UserWorkspace};
use lib_infra::future::{to_fut, Fut}; use lib_infra::async_trait::async_trait;
use crate::integrate::server::{Server, ServerProvider}; use crate::integrate::server::{Server, ServerProvider};
@ -29,21 +29,16 @@ pub(crate) struct UserStatusCallbackImpl {
pub(crate) ai_manager: Arc<AIManager>, pub(crate) ai_manager: Arc<AIManager>,
} }
#[async_trait]
impl UserStatusCallback for UserStatusCallbackImpl { impl UserStatusCallback for UserStatusCallbackImpl {
fn did_init( async fn did_init(
&self, &self,
user_id: i64, user_id: i64,
user_authenticator: &Authenticator, user_authenticator: &Authenticator,
cloud_config: &Option<UserCloudConfig>, cloud_config: &Option<UserCloudConfig>,
user_workspace: &UserWorkspace, user_workspace: &UserWorkspace,
_device_id: &str, _device_id: &str,
) -> Fut<FlowyResult<()>> { ) -> FlowyResult<()> {
let user_id = user_id.to_owned();
let user_workspace = user_workspace.clone();
let folder_manager = self.folder_manager.clone();
let database_manager = self.database_manager.clone();
let document_manager = self.document_manager.clone();
self self
.server_provider .server_provider
.set_user_authenticator(user_authenticator); .set_user_authenticator(user_authenticator);
@ -59,159 +54,142 @@ impl UserStatusCallback for UserStatusCallbackImpl {
} }
} }
to_fut(async move { self
folder_manager .folder_manager
.initialize( .initialize(
user_id, user_id,
&user_workspace.id, &user_workspace.id,
FolderInitDataSource::LocalDisk { FolderInitDataSource::LocalDisk {
create_if_not_exist: false, create_if_not_exist: false,
}, },
) )
.await?; .await?;
database_manager.initialize(user_id).await?; self.database_manager.initialize(user_id).await?;
document_manager.initialize(user_id).await?; self.document_manager.initialize(user_id).await?;
Ok(()) Ok(())
})
} }
fn did_sign_in( async fn did_sign_in(
&self, &self,
user_id: i64, user_id: i64,
user_workspace: &UserWorkspace, user_workspace: &UserWorkspace,
device_id: &str, device_id: &str,
) -> Fut<FlowyResult<()>> { ) -> FlowyResult<()> {
let device_id = device_id.to_owned(); event!(
let user_id = user_id.to_owned(); tracing::Level::TRACE,
let user_workspace = user_workspace.clone(); "Notify did sign in: latest_workspace: {:?}, device_id: {}",
let folder_manager = self.folder_manager.clone(); user_workspace,
let database_manager = self.database_manager.clone(); device_id
let document_manager = self.document_manager.clone(); );
to_fut(async move { self
event!( .folder_manager
tracing::Level::TRACE, .initialize_with_workspace_id(user_id)
"Notify did sign in: latest_workspace: {:?}, device_id: {}", .await?;
user_workspace, self.database_manager.initialize(user_id).await?;
device_id self.document_manager.initialize(user_id).await?;
); Ok(())
folder_manager.initialize_with_workspace_id(user_id).await?;
database_manager.initialize(user_id).await?;
document_manager.initialize(user_id).await?;
Ok(())
})
} }
fn did_sign_up( async fn did_sign_up(
&self, &self,
is_new_user: bool, is_new_user: bool,
user_profile: &UserProfile, user_profile: &UserProfile,
user_workspace: &UserWorkspace, user_workspace: &UserWorkspace,
device_id: &str, device_id: &str,
) -> Fut<FlowyResult<()>> { ) -> FlowyResult<()> {
let device_id = device_id.to_owned();
let user_profile = user_profile.clone();
let folder_manager = self.folder_manager.clone();
let database_manager = self.database_manager.clone();
let user_workspace = user_workspace.clone();
let document_manager = self.document_manager.clone();
self self
.server_provider .server_provider
.set_user_authenticator(&user_profile.authenticator); .set_user_authenticator(&user_profile.authenticator);
let server_type = self.server_provider.get_server_type(); let server_type = self.server_provider.get_server_type();
to_fut(async move { event!(
event!( tracing::Level::TRACE,
tracing::Level::TRACE, "Notify did sign up: is new: {} user_workspace: {:?}, device_id: {}",
"Notify did sign up: is new: {} user_workspace: {:?}, device_id: {}", is_new_user,
is_new_user, user_workspace,
user_workspace, device_id
device_id );
);
// In the current implementation, when a user signs up for AppFlowy Cloud, a default workspace // In the current implementation, when a user signs up for AppFlowy Cloud, a default workspace
// is automatically created for them. However, for users who sign up through Supabase, the creation // is automatically created for them. However, for users who sign up through Supabase, the creation
// of the default workspace relies on the client-side operation. This means that the process // of the default workspace relies on the client-side operation. This means that the process
// for initializing a default workspace differs depending on the sign-up method used. // for initializing a default workspace differs depending on the sign-up method used.
let data_source = match folder_manager let data_source = match self
.cloud_service .folder_manager
.get_folder_doc_state( .cloud_service
&user_workspace.id, .get_folder_doc_state(
user_profile.uid, &user_workspace.id,
CollabType::Folder, user_profile.uid,
&user_workspace.id, CollabType::Folder,
) &user_workspace.id,
.await )
{ .await
Ok(doc_state) => match server_type { {
Server::Local => FolderInitDataSource::LocalDisk { Ok(doc_state) => match server_type {
create_if_not_exist: true, Server::Local => FolderInitDataSource::LocalDisk {
}, create_if_not_exist: true,
Server::AppFlowyCloud => FolderInitDataSource::Cloud(doc_state), },
Server::Supabase => { Server::AppFlowyCloud => FolderInitDataSource::Cloud(doc_state),
if is_new_user { Server::Supabase => {
FolderInitDataSource::LocalDisk { if is_new_user {
create_if_not_exist: true, FolderInitDataSource::LocalDisk {
} create_if_not_exist: true,
} else {
FolderInitDataSource::Cloud(doc_state)
} }
}, } else {
FolderInitDataSource::Cloud(doc_state)
}
}, },
Err(err) => match server_type { },
Server::Local => FolderInitDataSource::LocalDisk { Err(err) => match server_type {
create_if_not_exist: true, Server::Local => FolderInitDataSource::LocalDisk {
}, create_if_not_exist: true,
Server::AppFlowyCloud | Server::Supabase => {
return Err(FlowyError::from(err));
},
}, },
}; Server::AppFlowyCloud | Server::Supabase => {
return Err(FlowyError::from(err));
},
},
};
folder_manager self
.initialize_with_new_user( .folder_manager
user_profile.uid, .initialize_with_new_user(
&user_profile.token, user_profile.uid,
is_new_user, &user_profile.token,
data_source, is_new_user,
&user_workspace.id, data_source,
) &user_workspace.id,
.await )
.context("FolderManager error")?; .await
.context("FolderManager error")?;
database_manager self
.initialize_with_new_user(user_profile.uid) .database_manager
.await .initialize_with_new_user(user_profile.uid)
.context("DatabaseManager error")?; .await
.context("DatabaseManager error")?;
document_manager self
.initialize_with_new_user(user_profile.uid) .document_manager
.await .initialize_with_new_user(user_profile.uid)
.context("DocumentManager error")?; .await
Ok(()) .context("DocumentManager error")?;
}) Ok(())
} }
fn did_expired(&self, _token: &str, user_id: i64) -> Fut<FlowyResult<()>> { async fn did_expired(&self, _token: &str, user_id: i64) -> FlowyResult<()> {
let folder_manager = self.folder_manager.clone(); self.folder_manager.clear(user_id).await;
to_fut(async move { Ok(())
folder_manager.clear(user_id).await;
Ok(())
})
} }
fn open_workspace(&self, user_id: i64, _user_workspace: &UserWorkspace) -> Fut<FlowyResult<()>> { async fn open_workspace(&self, user_id: i64, _user_workspace: &UserWorkspace) -> FlowyResult<()> {
let folder_manager = self.folder_manager.clone(); self
let database_manager = self.database_manager.clone(); .folder_manager
let document_manager = self.document_manager.clone(); .initialize_with_workspace_id(user_id)
.await?;
to_fut(async move { self.database_manager.initialize(user_id).await?;
folder_manager.initialize_with_workspace_id(user_id).await?; self.document_manager.initialize(user_id).await?;
database_manager.initialize(user_id).await?; Ok(())
document_manager.initialize(user_id).await?;
Ok(())
})
} }
fn did_update_network(&self, reachable: bool) { fn did_update_network(&self, reachable: bool) {

View File

@ -1,16 +1,15 @@
use std::collections::HashMap; use async_trait::async_trait;
use std::sync::Arc;
use bytes::Bytes; use bytes::Bytes;
use collab::entity::EncodedCollab; use collab::entity::EncodedCollab;
pub use collab_folder::View; pub use collab_folder::View;
use collab_folder::ViewLayout; use collab_folder::ViewLayout;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use flowy_error::FlowyError; use flowy_error::FlowyError;
use flowy_folder_pub::folder_builder::NestedViewBuilder; use flowy_folder_pub::folder_builder::NestedViewBuilder;
use lib_infra::future::FutureResult;
use lib_infra::util::timestamp; use lib_infra::util::timestamp;
use crate::entities::{CreateViewParams, ViewLayoutPB}; use crate::entities::{CreateViewParams, ViewLayoutPB};
@ -42,36 +41,37 @@ pub struct DatabaseEncodedCollab {
/// view layout. Each [ViewLayout] will have a handler. So when creating a new /// view layout. Each [ViewLayout] will have a handler. So when creating a new
/// view, the [ViewLayout] will be used to get the handler. /// view, the [ViewLayout] will be used to get the handler.
/// ///
#[async_trait]
pub trait FolderOperationHandler { pub trait FolderOperationHandler {
/// Create the view for the workspace of new user. /// Create the view for the workspace of new user.
/// Only called once when the user is created. /// Only called once when the user is created.
fn create_workspace_view( async fn create_workspace_view(
&self, &self,
_uid: i64, _uid: i64,
_workspace_view_builder: Arc<RwLock<NestedViewBuilder>>, _workspace_view_builder: Arc<RwLock<NestedViewBuilder>>,
) -> FutureResult<(), FlowyError> { ) -> Result<(), FlowyError> {
FutureResult::new(async { Ok(()) }) Ok(())
} }
fn open_view(&self, view_id: &str) -> FutureResult<(), FlowyError>; async fn open_view(&self, view_id: &str) -> Result<(), FlowyError>;
/// Closes the view and releases the resources that this view has in /// Closes the view and releases the resources that this view has in
/// the backend /// the backend
fn close_view(&self, view_id: &str) -> FutureResult<(), FlowyError>; async fn close_view(&self, view_id: &str) -> Result<(), FlowyError>;
/// Called when the view is deleted. /// Called when the view is deleted.
/// This will called after the view is deleted from the trash. /// This will called after the view is deleted from the trash.
fn delete_view(&self, view_id: &str) -> FutureResult<(), FlowyError>; async fn delete_view(&self, view_id: &str) -> Result<(), FlowyError>;
/// Returns the [ViewData] that can be used to create the same view. /// Returns the [ViewData] that can be used to create the same view.
fn duplicate_view(&self, view_id: &str) -> FutureResult<ViewData, FlowyError>; async fn duplicate_view(&self, view_id: &str) -> Result<ViewData, FlowyError>;
/// get the encoded collab data from the disk. /// get the encoded collab data from the disk.
fn get_encoded_collab_v1_from_disk( async fn get_encoded_collab_v1_from_disk(
&self, &self,
_user: Arc<dyn FolderUser>, _user: Arc<dyn FolderUser>,
_view_id: &str, _view_id: &str,
) -> FutureResult<EncodedCollabWrapper, FlowyError> { ) -> Result<EncodedCollabWrapper, FlowyError> {
FutureResult::new(async move { Err(FlowyError::not_support()) }) Err(FlowyError::not_support())
} }
/// Create a view with the data. /// Create a view with the data.
@ -92,46 +92,46 @@ pub trait FolderOperationHandler {
/// ///
/// The return value is the [Option<EncodedCollab>] that can be used to create the view. /// The return value is the [Option<EncodedCollab>] that can be used to create the view.
/// It can be used in syncing the view data to cloud. /// It can be used in syncing the view data to cloud.
fn create_view_with_view_data( async fn create_view_with_view_data(
&self, &self,
user_id: i64, user_id: i64,
params: CreateViewParams, params: CreateViewParams,
) -> FutureResult<Option<EncodedCollab>, FlowyError>; ) -> Result<Option<EncodedCollab>, FlowyError>;
/// Create a view with the pre-defined data. /// Create a view with the pre-defined data.
/// For example, the initial data of the grid/calendar/kanban board when /// For example, the initial data of the grid/calendar/kanban board when
/// you create a new view. /// you create a new view.
fn create_built_in_view( async fn create_built_in_view(
&self, &self,
user_id: i64, user_id: i64,
view_id: &str, view_id: &str,
name: &str, name: &str,
layout: ViewLayout, layout: ViewLayout,
) -> FutureResult<(), FlowyError>; ) -> Result<(), FlowyError>;
/// Create a view by importing data /// Create a view by importing data
/// ///
/// The return value /// The return value
fn import_from_bytes( async fn import_from_bytes(
&self, &self,
uid: i64, uid: i64,
view_id: &str, view_id: &str,
name: &str, name: &str,
import_type: ImportType, import_type: ImportType,
bytes: Vec<u8>, bytes: Vec<u8>,
) -> FutureResult<EncodedCollab, FlowyError>; ) -> Result<EncodedCollab, FlowyError>;
/// Create a view by importing data from a file /// Create a view by importing data from a file
fn import_from_file_path( async fn import_from_file_path(
&self, &self,
view_id: &str, view_id: &str,
name: &str, name: &str,
path: String, path: String,
) -> FutureResult<(), FlowyError>; ) -> Result<(), FlowyError>;
/// Called when the view is updated. The handler is the `old` registered handler. /// Called when the view is updated. The handler is the `old` registered handler.
fn did_update_view(&self, _old: &View, _new: &View) -> FutureResult<(), FlowyError> { async fn did_update_view(&self, _old: &View, _new: &View) -> Result<(), FlowyError> {
FutureResult::new(async move { Ok(()) }) Ok(())
} }
} }

View File

@ -13,7 +13,6 @@ use flowy_ai_pub::cloud::{
use flowy_error::FlowyError; use flowy_error::FlowyError;
use futures_util::{StreamExt, TryStreamExt}; use futures_util::{StreamExt, TryStreamExt};
use lib_infra::async_trait::async_trait; use lib_infra::async_trait::async_trait;
use lib_infra::future::FutureResult;
use lib_infra::util::{get_operating_system, OperatingSystem}; use lib_infra::util::{get_operating_system, OperatingSystem};
use serde_json::{json, Value}; use serde_json::{json, Value};
use std::collections::HashMap; use std::collections::HashMap;
@ -28,29 +27,25 @@ impl<T> ChatCloudService for AFCloudChatCloudServiceImpl<T>
where where
T: AFServer, T: AFServer,
{ {
fn create_chat( async fn create_chat(
&self, &self,
_uid: &i64, _uid: &i64,
workspace_id: &str, workspace_id: &str,
chat_id: &str, chat_id: &str,
) -> FutureResult<(), FlowyError> { ) -> Result<(), FlowyError> {
let workspace_id = workspace_id.to_string();
let chat_id = chat_id.to_string(); let chat_id = chat_id.to_string();
let try_get_client = self.inner.try_get_client(); let try_get_client = self.inner.try_get_client();
let params = CreateChatParams {
chat_id,
name: "".to_string(),
rag_ids: vec![],
};
try_get_client?
.create_chat(workspace_id, params)
.await
.map_err(FlowyError::from)?;
FutureResult::new(async move { Ok(())
let params = CreateChatParams {
chat_id,
name: "".to_string(),
rag_ids: vec![],
};
try_get_client?
.create_chat(&workspace_id, params)
.await
.map_err(FlowyError::from)?;
Ok(())
})
} }
async fn create_question( async fn create_question(

View File

@ -5,7 +5,6 @@ use flowy_ai_pub::cloud::{
}; };
use flowy_error::FlowyError; use flowy_error::FlowyError;
use lib_infra::async_trait::async_trait; use lib_infra::async_trait::async_trait;
use lib_infra::future::FutureResult;
use serde_json::Value; use serde_json::Value;
use std::collections::HashMap; use std::collections::HashMap;
use std::path::Path; use std::path::Path;
@ -14,15 +13,13 @@ pub(crate) struct DefaultChatCloudServiceImpl;
#[async_trait] #[async_trait]
impl ChatCloudService for DefaultChatCloudServiceImpl { impl ChatCloudService for DefaultChatCloudServiceImpl {
fn create_chat( async fn create_chat(
&self, &self,
_uid: &i64, _uid: &i64,
_workspace_id: &str, _workspace_id: &str,
_chat_id: &str, _chat_id: &str,
) -> FutureResult<(), FlowyError> { ) -> Result<(), FlowyError> {
FutureResult::new(async move { Err(FlowyError::not_support().with_context("Chat is not supported in local server."))
Err(FlowyError::not_support().with_context("Chat is not supported in local server."))
})
} }
async fn create_question( async fn create_question(

View File

@ -7,7 +7,7 @@ use flowy_error::FlowyResult;
use flowy_user_pub::cloud::UserCloudConfig; use flowy_user_pub::cloud::UserCloudConfig;
use flowy_user_pub::entities::*; use flowy_user_pub::entities::*;
use lib_dispatch::prelude::*; use lib_dispatch::prelude::*;
use lib_infra::future::{to_fut, Fut}; use lib_infra::async_trait::async_trait;
use crate::event_handler::*; use crate::event_handler::*;
use crate::user_manager::UserManager; use crate::user_manager::UserManager;
@ -276,38 +276,53 @@ pub enum UserEvent {
NotifyDidSwitchPlan = 63, NotifyDidSwitchPlan = 63,
} }
#[async_trait]
pub trait UserStatusCallback: Send + Sync + 'static { pub trait UserStatusCallback: Send + Sync + 'static {
/// When the [Authenticator] changed, this method will be called. Currently, the auth type /// When the [Authenticator] changed, this method will be called. Currently, the auth type
/// will be changed when the user sign in or sign up. /// will be changed when the user sign in or sign up.
fn authenticator_did_changed(&self, _authenticator: Authenticator) {} fn authenticator_did_changed(&self, _authenticator: Authenticator) {}
/// This will be called after the application launches if the user is already signed in. /// This will be called after the application launches if the user is already signed in.
/// If the user is not signed in, this method will not be called /// If the user is not signed in, this method will not be called
fn did_init( async fn did_init(
&self, &self,
user_id: i64, _user_id: i64,
user_authenticator: &Authenticator, _user_authenticator: &Authenticator,
cloud_config: &Option<UserCloudConfig>, _cloud_config: &Option<UserCloudConfig>,
user_workspace: &UserWorkspace, _user_workspace: &UserWorkspace,
device_id: &str, _device_id: &str,
) -> Fut<FlowyResult<()>>; ) -> FlowyResult<()> {
Ok(())
}
/// Will be called after the user signed in. /// Will be called after the user signed in.
fn did_sign_in( async fn did_sign_in(
&self, &self,
user_id: i64, _user_id: i64,
user_workspace: &UserWorkspace, _user_workspace: &UserWorkspace,
device_id: &str, _device_id: &str,
) -> Fut<FlowyResult<()>>; ) -> FlowyResult<()> {
Ok(())
}
/// Will be called after the user signed up. /// Will be called after the user signed up.
fn did_sign_up( async fn did_sign_up(
&self, &self,
is_new_user: bool, _is_new_user: bool,
user_profile: &UserProfile, _user_profile: &UserProfile,
user_workspace: &UserWorkspace, _user_workspace: &UserWorkspace,
device_id: &str, _device_id: &str,
) -> Fut<FlowyResult<()>>; ) -> FlowyResult<()> {
Ok(())
}
fn did_expired(&self, token: &str, user_id: i64) -> Fut<FlowyResult<()>>; async fn did_expired(&self, _token: &str, _user_id: i64) -> FlowyResult<()> {
fn open_workspace(&self, user_id: i64, user_workspace: &UserWorkspace) -> Fut<FlowyResult<()>>; Ok(())
}
async fn open_workspace(
&self,
_user_id: i64,
_user_workspace: &UserWorkspace,
) -> FlowyResult<()> {
Ok(())
}
fn did_update_network(&self, _reachable: bool) {} fn did_update_network(&self, _reachable: bool) {}
fn did_update_plans(&self, _plans: Vec<SubscriptionPlan>) {} fn did_update_plans(&self, _plans: Vec<SubscriptionPlan>) {}
fn did_update_storage_limitation(&self, _can_write: bool) {} fn did_update_storage_limitation(&self, _can_write: bool) {}
@ -315,42 +330,4 @@ pub trait UserStatusCallback: Send + Sync + 'static {
/// Acts as a placeholder [UserStatusCallback] for the user session, but does not perform any function /// Acts as a placeholder [UserStatusCallback] for the user session, but does not perform any function
pub(crate) struct DefaultUserStatusCallback; pub(crate) struct DefaultUserStatusCallback;
impl UserStatusCallback for DefaultUserStatusCallback { impl UserStatusCallback for DefaultUserStatusCallback {}
fn did_init(
&self,
_user_id: i64,
_authenticator: &Authenticator,
_cloud_config: &Option<UserCloudConfig>,
_user_workspace: &UserWorkspace,
_device_id: &str,
) -> Fut<FlowyResult<()>> {
to_fut(async { Ok(()) })
}
fn did_sign_in(
&self,
_user_id: i64,
_user_workspace: &UserWorkspace,
_device_id: &str,
) -> Fut<FlowyResult<()>> {
to_fut(async { Ok(()) })
}
fn did_sign_up(
&self,
_is_new_user: bool,
_user_profile: &UserProfile,
_user_workspace: &UserWorkspace,
_device_id: &str,
) -> Fut<FlowyResult<()>> {
to_fut(async { Ok(()) })
}
fn did_expired(&self, _token: &str, _user_id: i64) -> Fut<FlowyResult<()>> {
to_fut(async { Ok(()) })
}
fn open_workspace(&self, _user_id: i64, _user_workspace: &UserWorkspace) -> Fut<FlowyResult<()>> {
to_fut(async { Ok(()) })
}
}

View File

@ -33,7 +33,7 @@ where
pub fn load_collab_by_object_id<'a, R>( pub fn load_collab_by_object_id<'a, R>(
uid: i64, uid: i64,
collab_read_txn: &R, collab_read_txn: &R,
object_id: &String, object_id: &str,
) -> Result<Collab, PersistenceError> ) -> Result<Collab, PersistenceError>
where where
R: CollabKVAction<'a>, R: CollabKVAction<'a>,

View File

@ -581,7 +581,6 @@ impl UserManager {
Ok(UseAISettingPB::from(settings)) Ok(UseAISettingPB::from(settings))
} }
#[instrument(level = "debug", skip(self), err)]
pub async fn get_workspace_member_info(&self, uid: i64) -> FlowyResult<WorkspaceMember> { pub async fn get_workspace_member_info(&self, uid: i64) -> FlowyResult<WorkspaceMember> {
let workspace_id = self.get_session()?.user_workspace.id.clone(); let workspace_id = self.get_session()?.user_workspace.id.clone();
let db = self.authenticate_user.get_sqlite_connection(uid)?; let db = self.authenticate_user.get_sqlite_connection(uid)?;