chore: use tanvity as fallback ai search (#7892)

* chore: use tanvity as fallback ai search

* chore: fix test
This commit is contained in:
Nathan.fooo
2025-05-08 00:17:02 +08:00
committed by GitHub
parent b1d3553110
commit 95f9b07ef7
26 changed files with 786 additions and 579 deletions

View File

@ -3100,8 +3100,13 @@ dependencies = [
"client-api",
"collab",
"collab-folder",
"dashmap 6.0.1",
"flowy-error",
"lib-infra",
"once_cell",
"tantivy",
"tokio",
"tracing",
"uuid",
]

View File

@ -146,6 +146,7 @@ impl EmbeddingScheduler {
return Ok(SearchSummaryResult { summaries: vec![] });
}
trace!("[Search] generate local ai overview");
let docs = search_results
.into_iter()
.map(|v| LLMDocument {
@ -153,6 +154,7 @@ impl EmbeddingScheduler {
object_id: v.object_id,
})
.collect::<Vec<_>>();
let resp = summarize_documents(&self.ollama, question, model_name, docs)
.await
.map_err(|err| {

View File

@ -4,7 +4,6 @@ use std::sync::{Arc, Weak};
use tracing::{error, event, info, instrument};
use crate::full_indexed_data_provider::FullIndexedDataWriter;
use crate::indexed_data_consumer::{close_document_tantivy_state, get_document_tantivy_state};
use crate::server_layer::ServerProvider;
use collab_entity::CollabType;
use collab_integrate::collab_builder::AppFlowyCollabBuilder;
@ -17,6 +16,7 @@ use flowy_document::manager::DocumentManager;
use flowy_error::{FlowyError, FlowyResult};
use flowy_folder::manager::{FolderInitDataSource, FolderManager};
use flowy_search::services::manager::SearchManager;
use flowy_search_pub::tantivy_state_init::close_document_tantivy_state;
use flowy_server::af_cloud::define::LoggedUser;
use flowy_storage::manager::StorageManager;
use flowy_user::event_map::AppLifeCycle;
@ -177,9 +177,6 @@ impl AppLifeCycle for AppLifeCycleImpl {
let cloned_ai_manager = self.ai_manager()?;
let server_provider = self.server_provider()?;
self
.create_thanvity_state_if_not_exists(user_id, workspace_id, user_paths)
.await;
self
.start_full_indexed_data_provider(
user_id,
@ -201,15 +198,20 @@ impl AppLifeCycle for AppLifeCycleImpl {
.await;
// do not change the order of this function
let tanvity_state = self
.create_tanvity_state_if_not_exists(user_id, workspace_id, user_paths)
.await;
self
.search_manager()?
.on_launch_if_authenticated(workspace_id, get_document_tantivy_state(workspace_id))
.on_launch_if_authenticated(workspace_id, tanvity_state.clone())
.await;
let workspace_id = *workspace_id;
let workspace_type = *workspace_type;
self.runtime.spawn(async move {
server_provider.on_launch_if_authenticated(&workspace_type);
server_provider
.on_launch_if_authenticated(tanvity_state)
.await;
if let Err(err) = cloned_ai_manager
.on_launch_if_authenticated(&workspace_id)
.await
@ -236,11 +238,6 @@ impl AppLifeCycle for AppLifeCycleImpl {
workspace_id,
user_config.device_id,
);
let server_provider = self.server_provider()?;
let c_workspace_type = *workspace_type;
self.runtime.spawn(async move {
server_provider.on_sign_in(&c_workspace_type);
});
let data_source = self
.folder_init_data_source(user_id, workspace_id, workspace_type)
@ -258,18 +255,6 @@ impl AppLifeCycle for AppLifeCycleImpl {
.initialize_after_sign_in(user_id)
.await?;
let ai_manager = self.ai_manager()?;
let cloned_workspace_id = *workspace_id;
self.runtime.spawn(async move {
ai_manager
.initialize_after_sign_in(&cloned_workspace_id)
.await?;
Ok::<_, FlowyError>(())
});
self
.create_thanvity_state_if_not_exists(user_id, workspace_id, user_paths)
.await;
self
.start_full_indexed_data_provider(
user_id,
@ -290,11 +275,25 @@ impl AppLifeCycle for AppLifeCycleImpl {
)
.await;
// do not change the order of this function
let server_provider = self.server_provider()?;
let tanvity_state = self
.create_tanvity_state_if_not_exists(user_id, workspace_id, user_paths)
.await;
self
.search_manager()?
.initialize_after_sign_in(workspace_id, get_document_tantivy_state(workspace_id))
.initialize_after_sign_in(workspace_id, tanvity_state.clone())
.await;
let ai_manager = self.ai_manager()?;
let cloned_workspace_id = *workspace_id;
self.runtime.spawn(async move {
server_provider.on_sign_in(tanvity_state).await;
ai_manager
.initialize_after_sign_in(&cloned_workspace_id)
.await?;
Ok::<_, FlowyError>(())
});
Ok(())
}
@ -315,12 +314,7 @@ impl AppLifeCycle for AppLifeCycleImpl {
workspace_id,
user_config.device_id,
);
let c_workspace_type = *workspace_type;
let server_provider = self.server_provider()?;
self.runtime.spawn(async move {
server_provider.on_sign_in(&c_workspace_type);
});
let data_source = self
.folder_init_data_source(user_profile.uid, workspace_id, workspace_type)
.await?;
@ -349,18 +343,6 @@ impl AppLifeCycle for AppLifeCycleImpl {
.await
.context("DocumentManager error")?;
let ai_manager = self.ai_manager()?;
let cloned_workspace_id = *workspace_id;
self.runtime.spawn(async move {
ai_manager
.initialize_after_sign_up(&cloned_workspace_id)
.await?;
Ok::<_, FlowyError>(())
});
self
.create_thanvity_state_if_not_exists(user_profile.uid, workspace_id, user_paths)
.await;
self
.start_full_indexed_data_provider(
user_profile.uid,
@ -380,11 +362,24 @@ impl AppLifeCycle for AppLifeCycleImpl {
)
.await;
// do not change the order of this function
let tanvity_state = self
.create_tanvity_state_if_not_exists(user_profile.uid, workspace_id, user_paths)
.await;
self
.search_manager()?
.initialize_after_sign_up(workspace_id, get_document_tantivy_state(workspace_id))
.initialize_after_sign_up(workspace_id, tanvity_state.clone())
.await;
let ai_manager = self.ai_manager()?;
let cloned_workspace_id = *workspace_id;
self.runtime.spawn(async move {
server_provider.on_sign_in(tanvity_state).await;
ai_manager
.initialize_after_sign_up(&cloned_workspace_id)
.await?;
Ok::<_, FlowyError>(())
});
Ok(())
}
@ -412,12 +407,6 @@ impl AppLifeCycle for AppLifeCycleImpl {
.folder_init_data_source(user_id, workspace_id, workspace_type)
.await?;
let server_provider = self.server_provider()?;
let c_workspace_type = *workspace_type;
self.runtime.spawn(async move {
server_provider.init_after_open_workspace(&c_workspace_type);
});
self
.folder_manager()?
.initialize_after_open_workspace(user_id, data_source)
@ -431,22 +420,30 @@ impl AppLifeCycle for AppLifeCycleImpl {
.initialize_after_open_workspace(user_id)
.await?;
let ai_manager = self.ai_manager()?;
let cloned_workspace_id = *workspace_id;
self.runtime.spawn(async move {
ai_manager
.initialize_after_open_workspace(&cloned_workspace_id)
.await?;
Ok::<_, FlowyError>(())
});
self
.storage_manager()?
.initialize_after_open_workspace(workspace_id)
.await;
self
.create_thanvity_state_if_not_exists(user_id, workspace_id, user_paths)
let tanvity_state = self
.create_tanvity_state_if_not_exists(user_id, workspace_id, user_paths)
.await;
self
.search_manager()?
.initialize_after_open_workspace(workspace_id, tanvity_state.clone())
.await;
let server_provider = self.server_provider()?;
let cloned_workspace_id = *workspace_id;
let ai_manager = self.ai_manager()?;
self.runtime.spawn(async move {
server_provider.on_workspace_opened(tanvity_state).await;
ai_manager
.initialize_after_open_workspace(&cloned_workspace_id)
.await?;
Ok::<_, FlowyError>(())
});
self
.start_full_indexed_data_provider(
user_id,
@ -466,11 +463,6 @@ impl AppLifeCycle for AppLifeCycleImpl {
)
.await;
// do not change the order of this function
self
.search_manager()?
.initialize_after_open_workspace(workspace_id, get_document_tantivy_state(workspace_id))
.await;
Ok(())
}

View File

@ -840,7 +840,7 @@ impl SearchCloudService for ServerProvider {
query: String,
) -> Result<Vec<SearchDocumentResponseItem>, FlowyError> {
let server = self.get_server()?;
match server.search_service() {
match server.search_service().await {
Some(search_service) => search_service.document_search(workspace_id, query).await,
None => Err(FlowyError::internal().with_context("SearchCloudService not found")),
}
@ -853,7 +853,7 @@ impl SearchCloudService for ServerProvider {
search_results: Vec<SearchResult>,
) -> Result<SearchSummaryResult, FlowyError> {
let server = self.get_server()?;
match server.search_service() {
match server.search_service().await {
Some(search_service) => {
search_service
.generate_search_summary(workspace_id, query, search_results)

View File

@ -14,6 +14,7 @@ use flowy_folder::share::ImportType;
use flowy_folder::view_operation::{
FolderOperationHandler, GatherEncodedCollab, ImportedData, ViewData,
};
use flowy_search_pub::tantivy_state_init::get_document_tantivy_state;
use lib_dispatch::prelude::ToBytes;
use lib_infra::async_trait::async_trait;
use std::convert::TryFrom;
@ -77,7 +78,13 @@ impl FolderOperationHandler for DocumentFolderOperation {
}
async fn delete_view(&self, view_id: &Uuid) -> Result<(), FlowyError> {
match self.document_manager()?.delete_document(view_id).await {
let document_manager = self.document_manager()?;
let workspace_id = document_manager.user_service.workspace_id()?;
if let Some(state) = get_document_tantivy_state(&workspace_id).and_then(|v| v.upgrade()) {
let _ = state.write().await.delete_document(&view_id.to_string());
}
match document_manager.delete_document(view_id).await {
Ok(_) => tracing::trace!("Delete document: {}", view_id),
Err(e) => tracing::error!("🔴delete document failed: {}", e),
}

View File

@ -1,7 +1,7 @@
use collab::core::collab::{IndexContent, IndexContentReceiver};
use collab_folder::ViewIndexContent;
use flowy_search::document::local_search_handler::DocumentTantivyState;
use flowy_search_pub::entities::FolderViewObserver;
use flowy_search_pub::tantivy_state::DocumentTantivyState;
use lib_infra::async_trait::async_trait;
use std::sync::Weak;
use tokio::sync::RwLock;

View File

@ -1,3 +1,4 @@
use crate::indexed_data_consumer::index_views_from_folder;
use client_api::entity::workspace_dto::ViewIcon;
use collab::preclude::Collab;
use collab_entity::CollabType;
@ -100,12 +101,7 @@ impl FullIndexedDataWriter {
.folder_manager
.upgrade()
.ok_or_else(|| FlowyError::internal().with_context("Failed to upgrade FolderManager"))?;
let views = folder_manager
.get_all_views()
.await?
.into_iter()
.filter(|v| v.space_info().is_none())
.collect::<Vec<_>>();
let views = index_views_from_folder(&folder_manager).await?;
let view_ids = views.iter().map(|v| v.id.clone()).collect::<Vec<_>>();
let view_by_view_id = Arc::new(
views

View File

@ -2,21 +2,21 @@ use crate::folder_view_observer::FolderViewObserverImpl;
use crate::full_indexed_data_provider::FullIndexedDataConsumer;
use collab_entity::CollabType;
use collab_folder::folder_diff::FolderViewChange;
use collab_folder::{IconType, ViewIcon};
use collab_folder::{IconType, View, ViewIcon};
use collab_integrate::instant_indexed_data_provider::InstantIndexedDataConsumer;
use dashmap::DashMap;
use flowy_ai_pub::entities::{UnindexedCollab, UnindexedCollabMetadata, UnindexedData};
use flowy_error::{FlowyError, FlowyResult};
use flowy_folder::manager::FolderManager;
use flowy_search::document::local_search_handler::DocumentTantivyState;
use flowy_search_pub::entities::FolderViewObserver;
use flowy_search_pub::tantivy_state::DocumentTantivyState;
use flowy_search_pub::tantivy_state_init::get_or_init_document_tantivy_state;
use flowy_server::af_cloud::define::LoggedUser;
use lib_infra::async_trait::async_trait;
use once_cell::sync::Lazy;
use std::path::PathBuf;
use std::sync::{Arc, Weak};
use tokio::sync::RwLock;
use tracing::{error, info, trace};
use tracing::{error, trace};
use uuid::Uuid;
#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
@ -126,48 +126,7 @@ impl InstantIndexedDataConsumer for EmbeddingsInstantConsumerImpl {
Ok(())
}
}
/// Global map: workspace_id → a *weak* handle to its index state.
type DocIndexMap = DashMap<Uuid, Arc<RwLock<DocumentTantivyState>>>;
static SEARCH_INDEX: Lazy<DocIndexMap> = Lazy::new(DocIndexMap::new);
/// Returns a strong handle, creating it if needed.
pub(crate) fn get_or_init_document_tantivy_state(
workspace_id: Uuid,
data_path: PathBuf,
) -> FlowyResult<Arc<RwLock<DocumentTantivyState>>> {
Ok(
SEARCH_INDEX
.entry(workspace_id)
.or_try_insert_with(|| {
info!(
"[Indexing] Creating new tantivy state for workspace: {}",
workspace_id
);
let state = DocumentTantivyState::new(&workspace_id, data_path.clone())?;
let arc_state = Arc::new(RwLock::new(state));
Ok::<_, FlowyError>(arc_state)
})?
.clone(),
)
}
pub(crate) fn close_document_tantivy_state(workspace_id: &Uuid) {
if SEARCH_INDEX.remove(workspace_id).is_some() {
info!(
"[Indexing] close tantivy state for workspace: {}",
workspace_id
);
}
}
pub fn get_document_tantivy_state(
workspace_id: &Uuid,
) -> Option<Weak<RwLock<DocumentTantivyState>>> {
if let Some(existing) = SEARCH_INDEX.get(workspace_id) {
return Some(Arc::downgrade(existing.value()));
}
None
}
/// -----------------------------------------------------
/// Fullindex consumer holds only a Weak reference:
/// -----------------------------------------------------
@ -276,7 +235,7 @@ impl SearchInstantIndexImpl {
if let (Some(folder_manager), Some(state)) = (folder_manager.upgrade(), weak_state.upgrade())
{
if let Ok(changes) = folder_manager.consumer_recent_workspace_changes().await {
let views = folder_manager.get_all_views().await?;
let views = index_views_from_folder(&folder_manager).await?;
let views_map: std::collections::HashMap<String, _> = views
.into_iter()
.map(|view| (view.id.clone(), view))
@ -395,3 +354,16 @@ impl InstantIndexedDataConsumer for SearchInstantIndexImpl {
Ok(())
}
}
pub(crate) async fn index_views_from_folder(
folder_manager: &FolderManager,
) -> FlowyResult<Vec<Arc<View>>> {
Ok(
folder_manager
.get_all_views()
.await?
.into_iter()
.filter(|v| v.space_info().is_none() && v.layout.is_document())
.collect::<Vec<_>>(),
)
}

View File

@ -1,14 +1,16 @@
use crate::app_life_cycle::AppLifeCycleImpl;
use crate::full_indexed_data_provider::FullIndexedDataWriter;
use crate::indexed_data_consumer::{
get_or_init_document_tantivy_state, EmbeddingsInstantConsumerImpl, SearchFullIndexConsumer,
SearchInstantIndexImpl,
EmbeddingsInstantConsumerImpl, SearchFullIndexConsumer, SearchInstantIndexImpl,
};
use flowy_folder::manager::FolderManager;
use flowy_search_pub::tantivy_state::DocumentTantivyState;
use flowy_search_pub::tantivy_state_init::get_or_init_document_tantivy_state;
use flowy_user::services::entities::{UserConfig, UserPaths};
use flowy_user_pub::entities::WorkspaceType;
use std::sync::{Arc, Weak};
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::time::timeout;
use tracing::{error, info, instrument, warn};
use uuid::Uuid;
@ -129,14 +131,15 @@ impl AppLifeCycleImpl {
});
}
pub(crate) async fn create_thanvity_state_if_not_exists(
pub(crate) async fn create_tanvity_state_if_not_exists(
&self,
uid: i64,
workspace_id: &Uuid,
user_paths: &UserPaths,
) {
) -> Option<Weak<RwLock<DocumentTantivyState>>> {
let data_path = user_paths.tanvity_index_path(uid);
let _ = get_or_init_document_tantivy_state(*workspace_id, data_path);
let state = get_or_init_document_tantivy_state(*workspace_id, data_path).ok();
state.map(|state| Arc::downgrade(&state))
}
#[instrument(skip(self, _user_config, user_paths))]

View File

@ -3,11 +3,12 @@ use arc_swap::{ArcSwap, ArcSwapOption};
use collab::entity::EncodedCollab;
use collab_entity::CollabType;
use collab_integrate::instant_indexed_data_provider::InstantIndexedDataWriter;
use dashmap::mapref::one::Ref;
use dashmap::try_result::TryResult;
use dashmap::DashMap;
use flowy_ai::local_ai::controller::LocalAIController;
use flowy_ai_pub::entities::UnindexedCollab;
use flowy_error::{FlowyError, FlowyResult};
use flowy_search_pub::tantivy_state::DocumentTantivyState;
use flowy_server::af_cloud::define::AIUserServiceImpl;
use flowy_server::af_cloud::{define::LoggedUser, AppFlowyCloudServer};
use flowy_server::local_server::LocalServer;
@ -16,10 +17,10 @@ use flowy_server_pub::AuthenticatorType;
use flowy_sqlite::kv::KVStorePreferences;
use flowy_user_pub::entities::*;
use lib_infra::async_trait::async_trait;
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Weak};
use tracing::info;
use tokio::sync::RwLock;
use tracing::{error, info};
use uuid::Uuid;
pub struct ServerProvider {
@ -35,17 +36,6 @@ pub struct ServerProvider {
}
// Our little guard wrapper:
pub struct ServerHandle<'a>(Ref<'a, AuthType, Arc<dyn AppFlowyServer>>);
#[allow(clippy::needless_lifetimes)]
impl<'a> Deref for ServerHandle<'a> {
type Target = dyn AppFlowyServer;
fn deref(&self) -> &Self::Target {
// `self.0.value()` is an `&Arc<dyn AppFlowyServer>`
// so `&**` gives us a `&dyn AppFlowyServer`
&**self.0.value()
}
}
/// Determine current server type from ENV
pub fn current_server_type() -> AuthType {
@ -82,12 +72,35 @@ impl ServerProvider {
}
}
pub fn on_launch_if_authenticated(&self, _workspace_type: &WorkspaceType) {}
async fn set_tanvity_state(&self, tanvity_state: Option<Weak<RwLock<DocumentTantivyState>>>) {
match self.providers.try_get(self.auth_type.load().as_ref()) {
TryResult::Present(r) => {
r.set_tanvity_state(tanvity_state).await;
},
TryResult::Absent => {},
TryResult::Locked => {
error!("ServerProvider: Failed to get server for auth type");
},
}
}
pub fn on_sign_in(&self, _workspace_type: &WorkspaceType) {}
pub async fn on_launch_if_authenticated(
&self,
tanvity_state: Option<Weak<RwLock<DocumentTantivyState>>>,
) {
self.set_tanvity_state(tanvity_state).await;
}
pub fn on_sign_up(&self, _workspace_type: &WorkspaceType) {}
pub fn init_after_open_workspace(&self, _workspace_type: &WorkspaceType) {}
pub async fn on_sign_in(&self, tanvity_state: Option<Weak<RwLock<DocumentTantivyState>>>) {
self.set_tanvity_state(tanvity_state).await;
}
pub async fn on_workspace_opened(
&self,
tanvity_state: Option<Weak<RwLock<DocumentTantivyState>>>,
) {
self.set_tanvity_state(tanvity_state).await;
}
pub fn set_auth_type(&self, new_auth_type: AuthType) {
let old_type = self.get_auth_type();
@ -109,10 +122,10 @@ impl ServerProvider {
}
/// Lazily create or fetch an AppFlowyServer instance
pub fn get_server(&self) -> FlowyResult<ServerHandle> {
pub fn get_server(&self) -> FlowyResult<Arc<dyn AppFlowyServer>> {
let auth_type = self.get_auth_type();
if let Some(r) = self.providers.get(&auth_type) {
return Ok(ServerHandle(r));
return Ok(r.value().clone());
}
let server: Arc<dyn AppFlowyServer> = match auth_type {
@ -148,7 +161,7 @@ impl ServerProvider {
self.providers.insert(auth_type, server);
let guard = self.providers.get(&auth_type).unwrap();
Ok(ServerHandle(guard))
Ok(guard.clone())
}
}

View File

@ -11,4 +11,9 @@ collab = { workspace = true }
collab-folder = { workspace = true }
flowy-error = { workspace = true }
client-api = { workspace = true }
uuid.workspace = true
uuid.workspace = true
tokio.workspace = true
tracing.workspace = true
tantivy.workspace = true
dashmap.workspace = true
once_cell = "1.18.0"

View File

@ -29,3 +29,18 @@ impl ViewObserveData {
pub trait FolderViewObserver: Send + Sync {
async fn set_observer_rx(&self, rx: IndexContentReceiver);
}
#[derive(Default, Debug, Clone)]
pub struct TanvitySearchResponseItem {
pub id: String,
pub display_name: String,
pub icon: Option<ResultIcon>,
pub workspace_id: String,
pub content: String,
}
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub struct ResultIcon {
pub ty: u8,
pub value: String,
}

View File

@ -1,2 +1,6 @@
pub mod cloud;
pub mod entities;
pub mod tantivy_state;
pub mod tantivy_state_init;
mod schema;

View File

@ -1,6 +1,5 @@
use tantivy::schema::{Schema, STORED, STRING, TEXT};
/// Wraps the schema and exposes fieldname constants
pub struct LocalSearchTantivySchema(pub Schema);
impl Default for LocalSearchTantivySchema {

View File

@ -0,0 +1,424 @@
use collab_folder::ViewIcon;
use std::fs;
use std::path::PathBuf;
use tantivy::directory::MmapDirectory;
use tantivy::schema::Value;
use tantivy::{Index, IndexReader, IndexWriter, TantivyDocument, Term};
use tracing::{error, trace, warn};
use uuid::Uuid;
use crate::entities::{ResultIcon, TanvitySearchResponseItem};
use crate::schema::LocalSearchTantivySchema;
use flowy_error::{FlowyError, FlowyResult};
/// Holds the Tantivy index state for a workspace's documents.
pub struct DocumentTantivyState {
pub path: PathBuf,
pub index: Index,
pub schema: LocalSearchTantivySchema,
pub writer: IndexWriter,
pub reader: IndexReader,
pub workspace_id: Uuid,
// Cached fields for better performance
field_workspace_id: tantivy::schema::Field,
field_object_id: tantivy::schema::Field,
field_content: tantivy::schema::Field,
field_name: tantivy::schema::Field,
field_icon: tantivy::schema::Field,
field_icon_type: tantivy::schema::Field,
}
impl DocumentTantivyState {
pub fn new(workspace_id: &Uuid, path: PathBuf) -> FlowyResult<Self> {
let index_path = path.join(workspace_id.to_string()).join("documents");
if !index_path.exists() {
fs::create_dir_all(&index_path).map_err(|e| {
error!("Failed to create index directory: {:?}", e);
FlowyError::internal().with_context("Failed to create folder index")
})?;
}
let schema = LocalSearchTantivySchema::new();
let dir = MmapDirectory::open(&index_path)?;
let index = Index::open_or_create(dir, schema.0.clone())?;
let writer = index.writer(15_000_000)?; // 15 MB buffer
let reader = index.reader()?;
// Cache field lookups
let field_workspace_id = schema
.0
.get_field(LocalSearchTantivySchema::WORKSPACE_ID)
.map_err(|_| FlowyError::internal().with_context("workspace_id field missing"))?;
let field_object_id = schema
.0
.get_field(LocalSearchTantivySchema::OBJECT_ID)
.map_err(|_| FlowyError::internal().with_context("object_id field missing"))?;
let field_content = schema
.0
.get_field(LocalSearchTantivySchema::CONTENT)
.map_err(|_| FlowyError::internal().with_context("content field missing"))?;
let field_name = schema
.0
.get_field(LocalSearchTantivySchema::NAME)
.map_err(|_| FlowyError::internal().with_context("name field missing"))?;
let field_icon = schema
.0
.get_field(LocalSearchTantivySchema::ICON)
.map_err(|_| FlowyError::internal().with_context("icon field missing"))?;
let field_icon_type = schema
.0
.get_field(LocalSearchTantivySchema::ICON_TYPE)
.map_err(|_| FlowyError::internal().with_context("icon_type field missing"))?;
Ok(Self {
path,
index,
schema,
writer,
reader,
workspace_id: *workspace_id,
field_workspace_id,
field_object_id,
field_content,
field_name,
field_icon,
field_icon_type,
})
}
pub fn add_document(
&mut self,
id: &str,
content: String,
name: Option<String>,
icon: Option<ViewIcon>,
) -> FlowyResult<()> {
trace!("[Tantivy] Adding document with id:{}, name:{:?}", id, name);
let term = Term::from_field_text(self.field_object_id, id);
let searcher = self.reader.searcher();
let query =
tantivy::query::TermQuery::new(term.clone(), tantivy::schema::IndexRecordOption::Basic);
let top_docs = searcher.search(&query, &tantivy::collector::TopDocs::with_limit(1))?;
let (existing_name, existing_icon) = if let Some((_score, doc_address)) = top_docs.first() {
let retrieved: TantivyDocument = searcher.doc(*doc_address)?;
// Get existing name if needed
let existing_name = if name.is_none() {
retrieved
.get_first(self.field_name)
.and_then(|v| v.as_str())
.map(|s| s.to_string())
} else {
None
};
// Get existing icon if needed
let existing_icon = if icon.is_none() {
let icon_value = retrieved
.get_first(self.field_icon)
.and_then(|v| v.as_str())
.map(|s| s.to_string());
if let Some(icon_value) = icon_value {
let icon_type_str = retrieved
.get_first(self.field_icon_type)
.and_then(|v| v.as_str())
.unwrap_or_default();
let icon_type = icon_type_str.parse::<u8>().unwrap_or_default();
// Recreate the ViewIcon from stored values
Some(ViewIcon {
value: icon_value,
ty: icon_type.into(),
})
} else {
None
}
} else {
None
};
(existing_name, existing_icon)
} else {
(None, None)
};
// Delete existing document with same ID
self.writer.delete_term(term);
// Use existing values if new ones not provided
let final_name = name.or(existing_name);
let final_icon = icon.or(existing_icon);
// Ensure we have a name
let document_name = final_name.unwrap_or_else(|| String::from("Untitled"));
// Create base document with required fields
let mut doc_builder = tantivy::doc!(
self.field_workspace_id => self.workspace_id.to_string(),
self.field_object_id => id,
self.field_content => content,
self.field_name => document_name
);
// Only add icon fields if icon is present
if let Some(view_icon) = final_icon {
doc_builder.add_text(self.field_icon, view_icon.value);
doc_builder.add_text(self.field_icon_type, (view_icon.ty as u8).to_string());
}
self.writer.add_document(doc_builder)?;
self.writer.commit()?;
Ok(())
}
pub fn add_document_metadata(
&mut self,
id: &str,
name: Option<String>,
icon: Option<ViewIcon>,
) -> FlowyResult<()> {
let term = Term::from_field_text(self.field_object_id, id);
let searcher = self.reader.searcher();
let query =
tantivy::query::TermQuery::new(term.clone(), tantivy::schema::IndexRecordOption::Basic);
// Search for the document
let top_docs = searcher.search(&query, &tantivy::collector::TopDocs::with_limit(1))?;
let (existing_content, existing_name, existing_icon) =
if let Some((_score, doc_address)) = top_docs.first() {
let retrieved: TantivyDocument = searcher.doc(*doc_address)?;
// Get existing content
let content = retrieved
.get_first(self.field_content)
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_string();
// Get existing name if needed
let existing_name = if name.is_none() {
retrieved
.get_first(self.field_name)
.and_then(|v| v.as_str())
.map(|s| s.to_string())
} else {
None
};
// Get existing icon if needed
let existing_icon = if icon.is_none() {
let icon_value = retrieved
.get_first(self.field_icon)
.and_then(|v| v.as_str())
.map(|s| s.to_string());
if let Some(icon_value) = icon_value {
let icon_type_str = retrieved
.get_first(self.field_icon_type)
.and_then(|v| v.as_str())
.unwrap_or_default();
let icon_type = icon_type_str.parse::<u8>().unwrap_or_default();
Some(ViewIcon {
value: icon_value,
ty: icon_type.into(),
})
} else {
None
}
} else {
None
};
(content, existing_name, existing_icon)
} else {
(String::new(), None, None)
};
// Use existing values if new ones not provided
let final_name = name.or(existing_name);
let final_icon = icon.or(existing_icon);
// Ensure we have a name
let document_name = final_name.unwrap_or_else(|| String::from("Untitled"));
// Delete existing document
self.writer.delete_term(term);
// Create base document with required fields
let mut doc_builder = tantivy::doc!(
self.field_workspace_id => self.workspace_id.to_string(),
self.field_object_id => id,
self.field_content => existing_content,
self.field_name => document_name
);
// Only add icon fields if icon is present
if let Some(view_icon) = final_icon {
doc_builder.add_text(self.field_icon, view_icon.value);
doc_builder.add_text(self.field_icon_type, (view_icon.ty as u8).to_string());
}
self.writer.add_document(doc_builder)?;
self.writer.commit()?;
Ok(())
}
pub fn delete_workspace(&mut self, workspace_id: &Uuid) -> FlowyResult<()> {
let term = Term::from_field_text(self.field_workspace_id, &workspace_id.to_string());
self.writer.delete_term(term);
self.writer.commit()?;
Ok(())
}
/// Delete a document (all fields) matching this `object_id`
pub fn delete_document(&mut self, id: &str) -> FlowyResult<()> {
trace!("[Tantivy] delete document with id: {}", id);
let term = Term::from_field_text(self.field_object_id, id);
self.writer.delete_term(term);
self.writer.commit()?;
Ok(())
}
pub fn delete_documents(&mut self, ids: &[String]) -> FlowyResult<()> {
trace!("[Tantivy] delete documents with ids: {:?}", ids);
for id in ids {
let term = Term::from_field_text(self.field_object_id, id);
self.writer.delete_term(term);
}
self.writer.commit()?;
Ok(())
}
pub fn search(
&self,
workspace_id: &Uuid,
query: &str,
object_ids: Option<Vec<String>>,
) -> FlowyResult<Vec<TanvitySearchResponseItem>> {
let workspace_id = workspace_id.to_string();
let reader = self.reader.clone();
let searcher = reader.searcher();
// Use cached fields for query parser
let mut qp = tantivy::query::QueryParser::for_index(
&self.index,
vec![self.field_content, self.field_name],
);
// Enable fuzzy matching for name field (better user experience for typos)
qp.set_field_fuzzy(self.field_name, true, 2, true);
let query = qp.parse_query(query)?;
let top_docs = searcher.search(&query, &tantivy::collector::TopDocs::with_limit(10))?;
let mut results = Vec::with_capacity(top_docs.len());
let mut seen_ids = std::collections::HashSet::new();
// If object_ids is provided and not empty, create a lookup set for faster filtering
let object_ids_filter = object_ids.and_then(|ids| {
if ids.is_empty() {
None
} else {
Some(ids.into_iter().collect::<std::collections::HashSet<_>>())
}
});
for (_score, doc_address) in top_docs {
let retrieved: TantivyDocument = searcher.doc(doc_address)?;
// Pull out each stored field using cached field references
let workspace_id_str = retrieved
.get_first(self.field_workspace_id)
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_string();
if workspace_id != workspace_id_str {
warn!(
"[Tantivy] Document workspace_id mismatch: {} != {}",
workspace_id, workspace_id_str
);
continue;
}
let object_id = retrieved
.get_first(self.field_object_id)
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_string();
// Skip records with empty object_id and workspace_id
if object_id.is_empty() && workspace_id_str.is_empty() {
continue;
}
// Apply object_ids filter if present
if let Some(ref filter) = object_ids_filter {
if !filter.contains(&object_id) {
continue;
}
}
// Skip duplicate records based on object_id
if !seen_ids.insert(object_id.clone()) {
continue;
}
let name = retrieved
.get_first(self.field_name)
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_string();
// Get icon value and type
let icon = {
let icon_value = retrieved
.get_first(self.field_icon)
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_string();
// Only proceed with creating an icon if we have an actual value
if !icon_value.is_empty() {
let icon_type_str = retrieved
.get_first(self.field_icon_type)
.and_then(|v| v.as_str())
.unwrap_or_default();
let icon_type = icon_type_str.parse::<u8>().unwrap_or(0);
Some(ResultIcon {
ty: icon_type,
value: icon_value,
})
} else {
None
}
};
let content = retrieved
.get_first(self.field_content)
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_string();
results.push(TanvitySearchResponseItem {
id: object_id,
display_name: name,
icon,
workspace_id: workspace_id_str,
content,
});
}
Ok(results)
}
}

View File

@ -0,0 +1,52 @@
use crate::tantivy_state::DocumentTantivyState;
use dashmap::DashMap;
use flowy_error::{FlowyError, FlowyResult};
use once_cell::sync::Lazy;
use std::path::PathBuf;
use std::sync::{Arc, Weak};
use tokio::sync::RwLock;
use tracing::info;
use uuid::Uuid;
/// Global map: workspace_id → a *weak* handle to its index state.
type DocIndexMap = DashMap<Uuid, Arc<RwLock<DocumentTantivyState>>>;
static SEARCH_INDEX: Lazy<DocIndexMap> = Lazy::new(DocIndexMap::new);
/// Returns a strong handle, creating it if needed.
pub fn get_or_init_document_tantivy_state(
workspace_id: Uuid,
data_path: PathBuf,
) -> FlowyResult<Arc<RwLock<DocumentTantivyState>>> {
Ok(
SEARCH_INDEX
.entry(workspace_id)
.or_try_insert_with(|| {
info!(
"[Indexing] Creating new tantivy state for workspace: {}",
workspace_id
);
let state = DocumentTantivyState::new(&workspace_id, data_path.clone())?;
let arc_state = Arc::new(RwLock::new(state));
Ok::<_, FlowyError>(arc_state)
})?
.clone(),
)
}
pub fn close_document_tantivy_state(workspace_id: &Uuid) {
if SEARCH_INDEX.remove(workspace_id).is_some() {
info!(
"[Indexing] close tantivy state for workspace: {}",
workspace_id
);
}
}
pub fn get_document_tantivy_state(
workspace_id: &Uuid,
) -> Option<Weak<RwLock<DocumentTantivyState>>> {
if let Some(existing) = SEARCH_INDEX.get(workspace_id) {
return Some(Arc::downgrade(existing.value()));
}
None
}

View File

@ -1,24 +1,19 @@
use async_stream::stream;
use collab_folder::ViewIcon;
use futures::Stream;
use std::fs;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Weak;
use tantivy::directory::MmapDirectory;
use tantivy::schema::Value;
use tantivy::{Index, IndexReader, IndexWriter, TantivyDocument, Term};
use tokio::sync::RwLock;
use tracing::{error, trace, warn};
use tracing::{error, trace};
use uuid::Uuid;
use crate::entities::{
CreateSearchResultPBArgs, LocalSearchResponseItemPB, RepeatedLocalSearchResponseItemPB,
ResultIconPB, ResultIconTypePB, SearchResponsePB,
};
use crate::schema::LocalSearchTantivySchema;
use crate::services::manager::{SearchHandler, SearchType};
use flowy_error::{FlowyError, FlowyResult};
use flowy_error::FlowyResult;
use flowy_search_pub::entities::TanvitySearchResponseItem;
use flowy_search_pub::tantivy_state::DocumentTantivyState;
use lib_infra::async_trait::async_trait;
pub struct DocumentLocalSearchHandler {
@ -55,7 +50,7 @@ impl SearchHandler for DocumentLocalSearchHandler {
);
},
Some(state) => {
match state.read().await.search(&workspace_id, &query) {
match state.read().await.search(&workspace_id, &query, None) {
Ok(items) => {
trace!("[Tanvity] local document search result: {:?}", items);
if items.is_empty() {
@ -66,6 +61,7 @@ impl SearchHandler for DocumentLocalSearchHandler {
.unwrap(),
);
} else {
let items = items.into_iter().map(tanvity_item_to_local_search_item).collect::<Vec<_>>();
let search_result = RepeatedLocalSearchResponseItemPB { items };
yield Ok(
CreateSearchResultPBArgs::default()
@ -83,394 +79,14 @@ impl SearchHandler for DocumentLocalSearchHandler {
}
}
/// Holds the Tantivy index state for a workspace's documents.
pub struct DocumentTantivyState {
pub path: PathBuf,
pub index: Index,
pub schema: LocalSearchTantivySchema,
pub writer: IndexWriter,
pub reader: IndexReader,
pub workspace_id: Uuid,
// Cached fields for better performance
field_workspace_id: tantivy::schema::Field,
field_object_id: tantivy::schema::Field,
field_content: tantivy::schema::Field,
field_name: tantivy::schema::Field,
field_icon: tantivy::schema::Field,
field_icon_type: tantivy::schema::Field,
}
impl DocumentTantivyState {
pub fn new(workspace_id: &Uuid, path: PathBuf) -> FlowyResult<Self> {
let index_path = path.join(workspace_id.to_string()).join("documents");
if !index_path.exists() {
fs::create_dir_all(&index_path).map_err(|e| {
error!("Failed to create index directory: {:?}", e);
FlowyError::internal().with_context("Failed to create folder index")
})?;
}
let schema = LocalSearchTantivySchema::new();
let dir = MmapDirectory::open(&index_path)?;
let index = Index::open_or_create(dir, schema.0.clone())?;
let writer = index.writer(15_000_000)?; // 15 MB buffer
let reader = index.reader()?;
// Cache field lookups
let field_workspace_id = schema
.0
.get_field(LocalSearchTantivySchema::WORKSPACE_ID)
.map_err(|_| FlowyError::internal().with_context("workspace_id field missing"))?;
let field_object_id = schema
.0
.get_field(LocalSearchTantivySchema::OBJECT_ID)
.map_err(|_| FlowyError::internal().with_context("object_id field missing"))?;
let field_content = schema
.0
.get_field(LocalSearchTantivySchema::CONTENT)
.map_err(|_| FlowyError::internal().with_context("content field missing"))?;
let field_name = schema
.0
.get_field(LocalSearchTantivySchema::NAME)
.map_err(|_| FlowyError::internal().with_context("name field missing"))?;
let field_icon = schema
.0
.get_field(LocalSearchTantivySchema::ICON)
.map_err(|_| FlowyError::internal().with_context("icon field missing"))?;
let field_icon_type = schema
.0
.get_field(LocalSearchTantivySchema::ICON_TYPE)
.map_err(|_| FlowyError::internal().with_context("icon_type field missing"))?;
Ok(Self {
path,
index,
schema,
writer,
reader,
workspace_id: *workspace_id,
field_workspace_id,
field_object_id,
field_content,
field_name,
field_icon,
field_icon_type,
})
}
pub fn add_document(
&mut self,
id: &str,
content: String,
name: Option<String>,
icon: Option<ViewIcon>,
) -> FlowyResult<()> {
trace!("[Tantivy] Adding document with id:{}, name:{:?}", id, name);
let term = Term::from_field_text(self.field_object_id, id);
let searcher = self.reader.searcher();
let query =
tantivy::query::TermQuery::new(term.clone(), tantivy::schema::IndexRecordOption::Basic);
let top_docs = searcher.search(&query, &tantivy::collector::TopDocs::with_limit(1))?;
let (existing_name, existing_icon) = if let Some((_score, doc_address)) = top_docs.first() {
let retrieved: TantivyDocument = searcher.doc(*doc_address)?;
// Get existing name if needed
let existing_name = if name.is_none() {
retrieved
.get_first(self.field_name)
.and_then(|v| v.as_str())
.map(|s| s.to_string())
} else {
None
};
// Get existing icon if needed
let existing_icon = if icon.is_none() {
let icon_value = retrieved
.get_first(self.field_icon)
.and_then(|v| v.as_str())
.map(|s| s.to_string());
if let Some(icon_value) = icon_value {
let icon_type_str = retrieved
.get_first(self.field_icon_type)
.and_then(|v| v.as_str())
.unwrap_or_default();
let icon_type = icon_type_str.parse::<u8>().unwrap_or_default();
// Recreate the ViewIcon from stored values
Some(ViewIcon {
value: icon_value,
ty: icon_type.into(),
})
} else {
None
}
} else {
None
};
(existing_name, existing_icon)
} else {
(None, None)
};
// Delete existing document with same ID
self.writer.delete_term(term);
// Use existing values if new ones not provided
let final_name = name.or(existing_name);
let final_icon = icon.or(existing_icon);
// Ensure we have a name
let document_name = final_name.unwrap_or_else(|| String::from("Untitled"));
// Create base document with required fields
let mut doc_builder = tantivy::doc!(
self.field_workspace_id => self.workspace_id.to_string(),
self.field_object_id => id,
self.field_content => content,
self.field_name => document_name
);
// Only add icon fields if icon is present
if let Some(view_icon) = final_icon {
doc_builder.add_text(self.field_icon, view_icon.value);
doc_builder.add_text(self.field_icon_type, (view_icon.ty as u8).to_string());
}
self.writer.add_document(doc_builder)?;
self.writer.commit()?;
Ok(())
}
pub fn add_document_metadata(
&mut self,
id: &str,
name: Option<String>,
icon: Option<ViewIcon>,
) -> FlowyResult<()> {
let term = Term::from_field_text(self.field_object_id, id);
let searcher = self.reader.searcher();
let query =
tantivy::query::TermQuery::new(term.clone(), tantivy::schema::IndexRecordOption::Basic);
// Search for the document
let top_docs = searcher.search(&query, &tantivy::collector::TopDocs::with_limit(1))?;
let (existing_content, existing_name, existing_icon) =
if let Some((_score, doc_address)) = top_docs.first() {
let retrieved: TantivyDocument = searcher.doc(*doc_address)?;
// Get existing content
let content = retrieved
.get_first(self.field_content)
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_string();
// Get existing name if needed
let existing_name = if name.is_none() {
retrieved
.get_first(self.field_name)
.and_then(|v| v.as_str())
.map(|s| s.to_string())
} else {
None
};
// Get existing icon if needed
let existing_icon = if icon.is_none() {
let icon_value = retrieved
.get_first(self.field_icon)
.and_then(|v| v.as_str())
.map(|s| s.to_string());
if let Some(icon_value) = icon_value {
let icon_type_str = retrieved
.get_first(self.field_icon_type)
.and_then(|v| v.as_str())
.unwrap_or_default();
let icon_type = icon_type_str.parse::<u8>().unwrap_or_default();
Some(ViewIcon {
value: icon_value,
ty: icon_type.into(),
})
} else {
None
}
} else {
None
};
(content, existing_name, existing_icon)
} else {
(String::new(), None, None)
};
// Use existing values if new ones not provided
let final_name = name.or(existing_name);
let final_icon = icon.or(existing_icon);
// Ensure we have a name
let document_name = final_name.unwrap_or_else(|| String::from("Untitled"));
// Delete existing document
self.writer.delete_term(term);
// Create base document with required fields
let mut doc_builder = tantivy::doc!(
self.field_workspace_id => self.workspace_id.to_string(),
self.field_object_id => id,
self.field_content => existing_content,
self.field_name => document_name
);
// Only add icon fields if icon is present
if let Some(view_icon) = final_icon {
doc_builder.add_text(self.field_icon, view_icon.value);
doc_builder.add_text(self.field_icon_type, (view_icon.ty as u8).to_string());
}
self.writer.add_document(doc_builder)?;
self.writer.commit()?;
Ok(())
}
pub fn delete_workspace(&mut self, workspace_id: &Uuid) -> FlowyResult<()> {
let term = Term::from_field_text(self.field_workspace_id, &workspace_id.to_string());
self.writer.delete_term(term);
self.writer.commit()?;
Ok(())
}
/// Delete a document (all fields) matching this `object_id`
pub fn delete_document(&mut self, id: &str) -> FlowyResult<()> {
trace!("[Tantivy] delete document with id: {}", id);
let term = Term::from_field_text(self.field_object_id, id);
self.writer.delete_term(term);
self.writer.commit()?;
Ok(())
}
pub fn delete_documents(&mut self, ids: &[String]) -> FlowyResult<()> {
trace!("[Tantivy] delete documents with ids: {:?}", ids);
for id in ids {
let term = Term::from_field_text(self.field_object_id, id);
self.writer.delete_term(term);
}
self.writer.commit()?;
Ok(())
}
pub fn search(
&self,
workspace_id: &Uuid,
query: &str,
) -> FlowyResult<Vec<LocalSearchResponseItemPB>> {
let workspace_id = workspace_id.to_string();
let reader = self.reader.clone();
let searcher = reader.searcher();
// Use cached fields for query parser
let mut qp = tantivy::query::QueryParser::for_index(
&self.index,
vec![self.field_content, self.field_name],
);
// Enable fuzzy matching for name field (better user experience for typos)
qp.set_field_fuzzy(self.field_name, true, 2, true);
let query = qp.parse_query(query)?;
let top_docs = searcher.search(&query, &tantivy::collector::TopDocs::with_limit(10))?;
let mut results = Vec::with_capacity(top_docs.len());
let mut seen_ids = std::collections::HashSet::new();
for (_score, doc_address) in top_docs {
let retrieved: TantivyDocument = searcher.doc(doc_address)?;
// Pull out each stored field using cached field references
let workspace_id_str = retrieved
.get_first(self.field_workspace_id)
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_string();
if workspace_id != workspace_id_str {
warn!(
"[Tantivy] Document workspace_id mismatch: {} != {}",
workspace_id, workspace_id_str
);
continue;
}
let object_id = retrieved
.get_first(self.field_object_id)
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_string();
// Skip records with empty object_id and workspace_id
if object_id.is_empty() && workspace_id_str.is_empty() {
continue;
}
// Skip duplicate records based on object_id
if !seen_ids.insert(object_id.clone()) {
continue;
}
let name = retrieved
.get_first(self.field_name)
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_string();
// Get icon value and type
let icon = {
let icon_value = retrieved
.get_first(self.field_icon)
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_string();
// Only proceed with creating an icon if we have an actual value
if !icon_value.is_empty() {
let icon_type_str = retrieved
.get_first(self.field_icon_type)
.and_then(|v| v.as_str())
.unwrap_or_default();
let icon_type: ResultIconTypePB = match icon_type_str.parse::<i64>() {
Ok(val) => val.into(),
Err(_) => ResultIconTypePB::default(),
};
Some(ResultIconPB {
ty: icon_type,
value: icon_value,
})
} else {
None
}
};
results.push(LocalSearchResponseItemPB {
id: object_id,
display_name: name,
icon,
workspace_id: workspace_id_str,
});
}
Ok(results)
fn tanvity_item_to_local_search_item(item: TanvitySearchResponseItem) -> LocalSearchResponseItemPB {
LocalSearchResponseItemPB {
id: item.id,
display_name: item.display_name,
icon: item.icon.map(|icon| ResultIconPB {
ty: ResultIconTypePB::from(icon.ty),
value: icon.value,
}),
workspace_id: item.workspace_id,
}
}

View File

@ -131,8 +131,8 @@ impl From<IconType> for ResultIconTypePB {
}
}
impl std::convert::From<i64> for ResultIconTypePB {
fn from(icon_ty: i64) -> Self {
impl std::convert::From<u8> for ResultIconTypePB {
fn from(icon_ty: u8) -> Self {
match icon_ty {
0 => ResultIconTypePB::Emoji,
1 => ResultIconTypePB::Url,

View File

@ -3,5 +3,4 @@ pub mod entities;
pub mod event_handler;
pub mod event_map;
pub mod protobuf;
mod schema;
pub mod services;

View File

@ -1,9 +1,10 @@
use crate::document::local_search_handler::{DocumentLocalSearchHandler, DocumentTantivyState};
use crate::document::local_search_handler::DocumentLocalSearchHandler;
use crate::entities::{SearchResponsePB, SearchStatePB};
use allo_isolate::Isolate;
use arc_swap::ArcSwapOption;
use dashmap::DashMap;
use flowy_error::FlowyResult;
use flowy_search_pub::tantivy_state::DocumentTantivyState;
use lib_infra::async_trait::async_trait;
use lib_infra::isolate_stream::{IsolateSink, SinkExt};
use std::pin::Pin;

View File

@ -1,14 +1,20 @@
use crate::af_cloud::AFServer;
use crate::util::tanvity_local_search;
use flowy_ai_pub::cloud::search_dto::{
SearchDocumentResponseItem, SearchResult, SearchSummaryResult,
};
use flowy_error::FlowyError;
use flowy_search_pub::cloud::SearchCloudService;
use flowy_search_pub::tantivy_state::DocumentTantivyState;
use lib_infra::async_trait::async_trait;
use std::sync::Weak;
use tokio::sync::RwLock;
use tracing::trace;
use uuid::Uuid;
pub(crate) struct AFCloudSearchCloudServiceImpl<T> {
pub inner: T,
pub server: T,
pub state: Option<Weak<RwLock<DocumentTantivyState>>>,
}
const DEFAULT_PREVIEW: u32 = 80;
@ -23,12 +29,20 @@ where
workspace_id: &Uuid,
query: String,
) -> Result<Vec<SearchDocumentResponseItem>, FlowyError> {
let client = self.inner.try_get_client()?;
let client = self.server.try_get_client()?;
let result = client
.search_documents(workspace_id, &query, 10, DEFAULT_PREVIEW, None)
.await?;
Ok(result)
if !result.is_empty() {
return Ok(result);
}
trace!("[Search] Local AI search returned no results, falling back to local search");
let items = tanvity_local_search(&self.state, workspace_id, &query)
.await
.unwrap_or_default();
Ok(items)
}
async fn generate_search_summary(
@ -37,7 +51,7 @@ where
query: String,
search_results: Vec<SearchResult>,
) -> Result<SearchSummaryResult, FlowyError> {
let client = self.inner.try_get_client()?;
let client = self.server.try_get_client()?;
let result = client
.generate_search_summary(workspace_id, &query, search_results)
.await?;

View File

@ -24,6 +24,7 @@ use flowy_storage_pub::cloud::StorageCloudService;
use flowy_user_pub::cloud::{UserCloudService, UserUpdate};
use flowy_user_pub::entities::UserTokenState;
use super::impls::AFCloudSearchCloudServiceImpl;
use crate::af_cloud::impls::{
AFCloudDatabaseCloudServiceImpl, AFCloudDocumentCloudServiceImpl, AFCloudFileStorageServiceImpl,
AFCloudFolderCloudServiceImpl, AFCloudUserAuthServiceImpl, CloudChatServiceImpl,
@ -31,18 +32,18 @@ use crate::af_cloud::impls::{
use crate::AppFlowyServer;
use flowy_ai::offline::offline_message_sync::AutoSyncChatService;
use flowy_ai_pub::user_service::AIUserService;
use flowy_search_pub::tantivy_state::DocumentTantivyState;
use lib_infra::async_trait::async_trait;
use rand::Rng;
use semver::Version;
use tokio::select;
use tokio::sync::watch;
use tokio::sync::{watch, RwLock};
use tokio::task::JoinHandle;
use tokio_stream::wrappers::WatchStream;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
use uuid::Uuid;
use super::impls::AFCloudSearchCloudServiceImpl;
pub(crate) type AFCloudClient = Client;
pub struct AppFlowyCloudServer {
@ -55,6 +56,7 @@ pub struct AppFlowyCloudServer {
ws_client: Arc<WSClient>,
logged_user: Weak<dyn LoggedUser>,
ai_user_service: Arc<dyn AIUserService>,
tanvity_state: RwLock<Option<Weak<RwLock<DocumentTantivyState>>>>,
}
impl AppFlowyCloudServer {
@ -104,6 +106,7 @@ impl AppFlowyCloudServer {
ws_client,
logged_user,
ai_user_service,
tanvity_state: Default::default(),
}
}
@ -117,6 +120,7 @@ impl AppFlowyCloudServer {
}
}
#[async_trait]
impl AppFlowyServer for AppFlowyCloudServer {
fn set_token(&self, token: &str) -> Result<(), Error> {
self
@ -262,11 +266,17 @@ impl AppFlowyServer for AppFlowyCloudServer {
)))
}
fn search_service(&self) -> Option<Arc<dyn SearchCloudService>> {
async fn search_service(&self) -> Option<Arc<dyn SearchCloudService>> {
let state = self.tanvity_state.read().await.clone();
Some(Arc::new(AFCloudSearchCloudServiceImpl {
inner: self.get_server_impl(),
server: self.get_server_impl(),
state,
}))
}
async fn set_tanvity_state(&self, state: Option<Weak<RwLock<DocumentTantivyState>>>) {
*self.tanvity_state.write().await = state;
}
}
/// Spawns a new asynchronous task to handle WebSocket connections based on token state.

View File

@ -1,22 +1,24 @@
use crate::af_cloud::define::LoggedUser;
use crate::util::tanvity_local_search;
use flowy_ai::local_ai::controller::LocalAIController;
use flowy_ai_pub::cloud::search_dto::{
SearchDocumentResponseItem, SearchResult, SearchSummaryResult,
};
use flowy_error::FlowyError;
use flowy_search_pub::cloud::SearchCloudService;
use flowy_search_pub::cloud::{
SearchCloudService, SearchDocumentResponseItem, SearchResult, SearchSummaryResult,
};
use flowy_search_pub::tantivy_state::DocumentTantivyState;
use lib_infra::async_trait::async_trait;
use std::sync::Arc;
use std::sync::{Arc, Weak};
use tokio::sync::RwLock;
use tracing::trace;
use uuid::Uuid;
pub struct LocalSearchServiceImpl {
#[allow(dead_code)]
pub logged_user: Arc<dyn LoggedUser>,
pub local_ai: Arc<LocalAIController>,
pub state: Option<Weak<RwLock<DocumentTantivyState>>>,
}
impl LocalSearchServiceImpl {}
#[async_trait]
impl SearchCloudService for LocalSearchServiceImpl {
async fn document_search(
@ -24,19 +26,28 @@ impl SearchCloudService for LocalSearchServiceImpl {
workspace_id: &Uuid,
query: String,
) -> Result<Vec<SearchDocumentResponseItem>, FlowyError> {
let mut results = vec![];
#[cfg(any(target_os = "windows", target_os = "macos", target_os = "linux"))]
{
if let Ok(scheduler) = flowy_ai::embeddings::context::EmbedContext::shared().get_scheduler() {
match scheduler.search(workspace_id, &query).await {
Ok(results) => return Ok(results),
Err(err) => tracing::error!("Local AI search failed: {:?}", err),
Ok(items) => results = items,
Err(err) => tracing::error!("[Search] Local AI search failed: {:?}", err),
}
} else {
tracing::error!("Could not acquire local AI scheduler");
tracing::error!("[Search] Could not acquire local AI scheduler");
}
}
Ok(Vec::new())
if !results.is_empty() {
return Ok(results);
}
trace!("[Search] Local AI search returned no results, falling back to local search");
let items = tanvity_local_search(&self.state, workspace_id, &query)
.await
.unwrap_or_default();
Ok(items)
}
async fn generate_search_summary(
@ -47,6 +58,11 @@ impl SearchCloudService for LocalSearchServiceImpl {
) -> Result<SearchSummaryResult, FlowyError> {
#[cfg(any(target_os = "windows", target_os = "macos", target_os = "linux"))]
{
if search_results.is_empty() {
trace!("[Search] No search results to summarize");
return Ok(SearchSummaryResult { summaries: vec![] });
}
if let Ok(scheduler) = flowy_ai::embeddings::context::EmbedContext::shared().get_scheduler() {
let setting = self.local_ai.get_local_ai_setting();
match scheduler
@ -60,6 +76,8 @@ impl SearchCloudService for LocalSearchServiceImpl {
tracing::error!("Could not acquire local AI scheduler");
}
}
//
Ok(SearchSummaryResult { summaries: vec![] })
}
}

View File

@ -12,16 +12,19 @@ use flowy_database_pub::cloud::{DatabaseAIService, DatabaseCloudService};
use flowy_document_pub::cloud::DocumentCloudService;
use flowy_folder_pub::cloud::FolderCloudService;
use flowy_search_pub::cloud::SearchCloudService;
use flowy_search_pub::tantivy_state::DocumentTantivyState;
use flowy_storage_pub::cloud::StorageCloudService;
use flowy_user_pub::cloud::UserCloudService;
use std::sync::Arc;
use tokio::sync::mpsc;
use lib_infra::async_trait::async_trait;
use std::sync::{Arc, Weak};
use tokio::sync::{mpsc, RwLock};
pub struct LocalServer {
logged_user: Arc<dyn LoggedUser>,
local_ai: Arc<LocalAIController>,
stop_tx: Option<mpsc::Sender<()>>,
embedding_writer: Option<Arc<dyn EmbeddingWriter>>,
tanvity_state: RwLock<Option<Weak<RwLock<DocumentTantivyState>>>>,
}
impl LocalServer {
@ -35,6 +38,7 @@ impl LocalServer {
local_ai,
stop_tx: Default::default(),
embedding_writer,
tanvity_state: Default::default(),
}
}
@ -46,11 +50,16 @@ impl LocalServer {
}
}
#[async_trait]
impl AppFlowyServer for LocalServer {
fn set_token(&self, _token: &str) -> Result<(), Error> {
Ok(())
}
async fn set_tanvity_state(&self, state: Option<Weak<RwLock<DocumentTantivyState>>>) {
*self.tanvity_state.write().await = state;
}
fn user_service(&self) -> Arc<dyn UserCloudService> {
Arc::new(LocalServerUserServiceImpl {
logged_user: self.logged_user.clone(),
@ -85,10 +94,12 @@ impl AppFlowyServer for LocalServer {
})
}
fn search_service(&self) -> Option<Arc<dyn SearchCloudService>> {
async fn search_service(&self) -> Option<Arc<dyn SearchCloudService>> {
let state = self.tanvity_state.read().await.clone();
Some(Arc::new(LocalSearchServiceImpl {
logged_user: self.logged_user.clone(),
local_ai: self.local_ai.clone(),
state,
}))
}

View File

@ -2,7 +2,7 @@ use client_api::ws::ConnectState;
use client_api::ws::WSConnectStateReceiver;
use client_api::ws::WebSocketChannel;
use flowy_search_pub::cloud::SearchCloudService;
use std::sync::Arc;
use std::sync::{Arc, Weak};
use anyhow::Error;
use arc_swap::ArcSwapOption;
@ -15,10 +15,12 @@ use flowy_database_pub::cloud::{DatabaseAIService, DatabaseCloudService};
use flowy_document_pub::cloud::DocumentCloudService;
use flowy_error::FlowyResult;
use flowy_folder_pub::cloud::FolderCloudService;
use flowy_search_pub::tantivy_state::DocumentTantivyState;
use flowy_storage_pub::cloud::StorageCloudService;
use flowy_user_pub::cloud::UserCloudService;
use flowy_user_pub::entities::UserTokenState;
use lib_infra::async_trait::async_trait;
use tokio::sync::RwLock;
use tokio_stream::wrappers::WatchStream;
use uuid::Uuid;
@ -56,9 +58,10 @@ where
/// `AppFlowyServer` trait defines a collection of services that offer cloud-based interactions
/// and functionalities in AppFlowy. The methods provided ensure efficient, asynchronous operations
/// for managing and accessing user data, folders, collaborative objects, and documents in a cloud environment.
#[async_trait]
pub trait AppFlowyServer: Send + Sync + 'static {
fn set_token(&self, _token: &str) -> Result<(), Error>;
async fn set_tanvity_state(&self, state: Option<Weak<RwLock<DocumentTantivyState>>>);
fn set_ai_model(&self, _ai_model: &str) -> Result<(), Error> {
Ok(())
}
@ -120,7 +123,7 @@ pub trait AppFlowyServer: Send + Sync + 'static {
/// Bridge for the Cloud AI Search features
///
fn search_service(&self) -> Option<Arc<dyn SearchCloudService>>;
async fn search_service(&self) -> Option<Arc<dyn SearchCloudService>>;
fn subscribe_ws_state(&self) -> Option<WSConnectStateReceiver> {
None

View File

@ -1,4 +1,11 @@
use flowy_ai_pub::cloud::search_dto::{SearchContentType, SearchDocumentResponseItem};
use flowy_search_pub::entities::TanvitySearchResponseItem;
use flowy_search_pub::tantivy_state::DocumentTantivyState;
use serde::{Deserialize, Deserializer};
use std::sync::Weak;
use tokio::sync::RwLock;
use tracing::trace;
use uuid::Uuid;
/// Handles the case where the value is null. If the value is null, return the default value of the
/// type. Otherwise, deserialize the value.
@ -11,3 +18,42 @@ where
let opt = Option::deserialize(deserializer)?;
Ok(opt.unwrap_or_default())
}
pub async fn tanvity_local_search(
state: &Option<Weak<RwLock<DocumentTantivyState>>>,
workspace_id: &Uuid,
query: &str,
) -> Option<Vec<SearchDocumentResponseItem>> {
match state.as_ref().and_then(|v| v.upgrade()) {
None => {
trace!("[Search] tanvity state is None");
None
},
Some(state) => {
let results = state.read().await.search(workspace_id, query, None).ok()?;
let items = results
.into_iter()
.flat_map(|v| tanvity_document_to_search_document(*workspace_id, v))
.collect::<Vec<_>>();
trace!("[Search] Local search returned {} results", items.len());
Some(items)
},
}
}
pub(crate) fn tanvity_document_to_search_document(
workspace_id: Uuid,
doc: TanvitySearchResponseItem,
) -> Option<SearchDocumentResponseItem> {
let object_id = Uuid::parse_str(&doc.id).ok()?;
Some(SearchDocumentResponseItem {
object_id,
workspace_id,
score: 1.0,
content_type: Some(SearchContentType::PlainText),
content: doc.content,
preview: None,
created_by: "".to_string(),
created_at: Default::default(),
})
}