Revert "feat: folder search mvp (#4665)" (#4962)

This reverts commit c1006c18c3.
This commit is contained in:
Lucas.Xu
2024-03-22 14:15:38 +07:00
committed by GitHub
parent e2e38f72bb
commit 27ff5f07ab
123 changed files with 519 additions and 4011 deletions

View File

@ -1,53 +0,0 @@
[package]
name = "flowy-search"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
collab = { version = "0.1.0" }
collab-folder = { version = "0.1.0" }
flowy-derive.workspace = true
flowy-error = { workspace = true, features = [
"impl_from_sqlite",
"impl_from_dispatch_error",
"impl_from_collab_document",
"impl_from_tantivy",
"impl_from_serde",
] }
flowy-notification.workspace = true
flowy-sqlite.workspace = true
flowy-user.workspace = true
flowy-search-pub.workspace = true
bytes.workspace = true
futures.workspace = true
lib-dispatch.workspace = true
protobuf.workspace = true
serde.workspace = true
serde_json.workspace = true
tokio = { workspace = true, features = ["full", "rt-multi-thread", "tracing"] }
tracing.workspace = true
async-stream = "0.3.4"
strsim = "0.11.0"
strum_macros = "0.26.1"
tantivy = { version = "0.21.1" }
tempfile = "3.9.0"
validator = { version = "0.16.0", features = ["derive"] }
diesel.workspace = true
diesel_derives = { version = "2.1.0", features = ["sqlite", "r2d2"] }
diesel_migrations = { version = "2.1.0", features = ["sqlite"] }
[build-dependencies]
flowy-codegen.workspace = true
[dev-dependencies]
tempfile = "3.10.0"
[features]
dart = ["flowy-codegen/dart"]
tauri_ts = ["flowy-codegen/ts"]

View File

@ -1,2 +0,0 @@
proto_input = ["src/event_map.rs", "src/entities.rs"]
event_files = ["src/event_map.rs"]

View File

@ -1,19 +0,0 @@
#[cfg(feature = "tauri_ts")]
use flowy_codegen::Project;
// Build-script entry point: runs the `flowy_codegen` generators for whichever
// front-end feature (`dart`, `tauri_ts`) is enabled. With neither feature the
// body compiles to nothing.
fn main() {
// Only bound when at least one codegen feature is enabled, since nothing else
// reads it.
#[cfg(any(feature = "dart", feature = "tauri_ts"))]
let crate_name = env!("CARGO_PKG_NAME");
// Dart: protobuf files plus event bindings.
#[cfg(feature = "dart")]
{
flowy_codegen::protobuf_file::dart_gen(crate_name);
flowy_codegen::dart_event::gen(crate_name);
}
// Tauri/TypeScript: protobuf files plus event bindings for the Tauri project.
#[cfg(feature = "tauri_ts")]
{
flowy_codegen::protobuf_file::ts_gen(crate_name, crate_name, Project::Tauri);
flowy_codegen::ts_event::gen(crate_name, Project::Tauri);
}
}

View File

@ -1,189 +0,0 @@
use collab_folder::{IconType, ViewIcon};
use flowy_derive::{ProtoBuf, ProtoBuf_Enum};
/// Payload of a search request: the query text and an optional result limit.
#[derive(Eq, PartialEq, ProtoBuf, Default, Debug, Clone)]
pub struct SearchQueryPB {
/// The text to search for.
#[pb(index = 1)]
pub search: String,
/// Optional limit supplied by the client.
// NOTE(review): the only handler visible in this crate forwards `search` and
// does not read `limit` — confirm whether the limit is meant to be honoured.
#[pb(index = 2, one_of)]
pub limit: Option<i64>,
}
/// A list of search results wrapped in a single protobuf message.
#[derive(Debug, Default, ProtoBuf, Clone)]
pub struct RepeatedSearchResultPB {
/// The individual results.
#[pb(index = 1)]
pub items: Vec<SearchResultPB>,
}
/// A single search hit.
#[derive(ProtoBuf, Default, Debug, Clone)]
pub struct SearchResultPB {
/// Which kind of index produced this hit.
#[pb(index = 1)]
pub index_type: IndexTypePB,
/// Id of the view the hit belongs to.
#[pb(index = 2)]
pub view_id: String,
/// Id of the matched object itself.
#[pb(index = 3)]
pub id: String,
/// The matched content (for folder results this is the view title).
#[pb(index = 4)]
pub data: String,
/// Optional icon to display next to the hit.
#[pb(index = 5, one_of)]
pub icon: Option<ResultIconPB>,
/// Relevance score of the hit.
#[pb(index = 6)]
pub score: f64,
}
impl SearchResultPB {
pub fn with_score(&self, score: f64) -> Self {
SearchResultPB {
index_type: self.index_type.clone(),
view_id: self.view_id.clone(),
id: self.id.clone(),
data: self.data.clone(),
icon: self.icon.clone(),
score,
}
}
}
/// Kind of icon attached to a search result.
///
/// The numeric values are also the `i64` representation used by the
/// `From<i64>` / `Into<i64>` conversions in this file.
#[derive(ProtoBuf_Enum, Clone, Debug, PartialEq, Eq, Default)]
pub enum ResultIconTypePB {
// Default variant.
#[default]
Emoji = 0,
Url = 1,
Icon = 2,
}
/// Maps the protobuf icon type onto the folder's [`IconType`], variant for variant.
impl From<ResultIconTypePB> for IconType {
  fn from(pb: ResultIconTypePB) -> Self {
    match pb {
      ResultIconTypePB::Emoji => Self::Emoji,
      ResultIconTypePB::Url => Self::Url,
      ResultIconTypePB::Icon => Self::Icon,
    }
  }
}
/// Maps the folder's [`IconType`] onto the protobuf icon type, variant for variant.
impl From<IconType> for ResultIconTypePB {
  fn from(icon_type: IconType) -> Self {
    match icon_type {
      IconType::Emoji => Self::Emoji,
      IconType::Url => Self::Url,
      IconType::Icon => Self::Icon,
    }
  }
}
impl std::convert::From<i64> for ResultIconTypePB {
fn from(icon_ty: i64) -> Self {
match icon_ty {
0 => ResultIconTypePB::Emoji,
1 => ResultIconTypePB::Url,
2 => ResultIconTypePB::Icon,
_ => ResultIconTypePB::Emoji,
}
}
}
impl std::convert::From<ResultIconTypePB> for i64 {
fn from(val: ResultIconTypePB) -> Self {
match val {
ResultIconTypePB::Emoji => 0,
ResultIconTypePB::Url => 1,
ResultIconTypePB::Icon => 2,
}
}
}
/// Icon of a search result: its kind plus the value to render.
#[derive(Default, ProtoBuf, Debug, Clone, PartialEq, Eq)]
pub struct ResultIconPB {
/// Kind of icon stored in `value`.
#[pb(index = 1)]
pub ty: ResultIconTypePB,
/// The icon payload; how it is interpreted depends on `ty`.
#[pb(index = 2)]
pub value: String,
}
impl std::convert::From<ResultIconPB> for ViewIcon {
fn from(rev: ResultIconPB) -> Self {
ViewIcon {
ty: rev.ty.into(),
value: rev.value,
}
}
}
impl From<ViewIcon> for ResultIconPB {
fn from(val: ViewIcon) -> Self {
ResultIconPB {
ty: val.ty.into(),
value: val.value,
}
}
}
/// Kind of object a search result refers to.
///
/// `View` is the default variant. `Default` is derived with `#[default]`, the
/// same way the other protobuf enums in this file declare their default.
#[derive(ProtoBuf_Enum, Eq, PartialEq, Debug, Clone, Default)]
pub enum IndexTypePB {
  #[default]
  View = 0,
  DocumentBlock = 1,
  DatabaseRow = 2,
}
/// Encodes the index type as its declared discriminant.
impl From<IndexTypePB> for i32 {
  fn from(index_type: IndexTypePB) -> Self {
    index_type as i32
  }
}
/// Decodes an index type from its numeric value.
///
/// The mapping mirrors the discriminants declared on [`IndexTypePB`]
/// (`View = 0`, `DocumentBlock = 1`, `DatabaseRow = 2`), so converting a
/// variant to `i32` and back yields the same variant. Unrecognised values
/// decode to `View`, the enum's default.
impl From<i32> for IndexTypePB {
  fn from(index_type: i32) -> Self {
    match index_type {
      1 => IndexTypePB::DocumentBlock,
      2 => IndexTypePB::DatabaseRow,
      // `0` and any unknown value.
      _ => IndexTypePB::View,
    }
  }
}
/// Notification payload carrying a batch of search results.
#[derive(ProtoBuf, Default, Debug, Clone)]
pub struct SearchResultNotificationPB {
/// Results contained in this batch.
#[pb(index = 1)]
pub items: Vec<SearchResultPB>,
/// `true` when this batch is the last one for the query.
#[pb(index = 2)]
pub closed: bool,
}
/// Notification types emitted by the search crate.
#[derive(ProtoBuf_Enum, Debug, Default)]
pub enum SearchNotification {
// Fallback for unrecognised numeric values (see the `From<i32>` impl).
#[default]
Unknown = 0,
// A batch of results is available and more may follow.
DidUpdateResults = 1,
// The final batch of results has been sent.
DidCloseResults = 2,
}
/// Encodes the notification type as its declared discriminant.
impl From<SearchNotification> for i32 {
  fn from(ty: SearchNotification) -> Self {
    ty as i32
  }
}
impl std::convert::From<i32> for SearchNotification {
fn from(notification: i32) -> Self {
match notification {
1 => SearchNotification::DidUpdateResults,
2 => SearchNotification::DidCloseResults,
_ => SearchNotification::Unknown,
}
}
}

View File

@ -1,27 +0,0 @@
use std::sync::{Arc, Weak};
use flowy_error::{FlowyError, FlowyResult};
use lib_dispatch::prelude::{AFPluginData, AFPluginState};
use crate::{entities::SearchQueryPB, services::manager::SearchManager};
/// Upgrades the weak [`SearchManager`] handle held in the plugin state.
///
/// # Errors
/// Returns an internal [`FlowyError`] when the manager has already been
/// dropped.
fn upgrade_manager(
  search_manager: AFPluginState<Weak<SearchManager>>,
) -> FlowyResult<Arc<SearchManager>> {
  // `ok_or_else` builds the error only on the failure path instead of on
  // every call.
  search_manager.upgrade().ok_or_else(|| {
    FlowyError::internal().with_context("The SearchManager has already been dropped")
  })
}
#[tracing::instrument(level = "debug", skip(manager), err)]
pub(crate) async fn search_handler(
data: AFPluginData<SearchQueryPB>,
manager: AFPluginState<Weak<SearchManager>>,
) -> Result<(), FlowyError> {
let query = data.into_inner();
let manager = upgrade_manager(manager)?;
manager.perform_search(query.search);
Ok(())
}

View File

@ -1,21 +0,0 @@
use std::sync::Weak;
use strum_macros::Display;
use flowy_derive::{Flowy_Event, ProtoBuf_Enum};
use lib_dispatch::prelude::*;
use crate::{event_handler::search_handler, services::manager::SearchManager};
/// Builds the search plugin: stores the weak [`SearchManager`] as plugin
/// state, names the plugin after this crate and routes
/// [`SearchEvent::Search`] to `search_handler`.
pub fn init(search_manager: Weak<SearchManager>) -> AFPlugin {
  let plugin = AFPlugin::new()
    .state(search_manager)
    .name(env!("CARGO_PKG_NAME"));
  plugin.event(SearchEvent::Search, search_handler)
}
/// Events exposed by the search plugin. Handlers report failures as
/// `FlowyError`.
#[derive(Clone, Copy, PartialEq, Eq, Debug, Display, Hash, ProtoBuf_Enum, Flowy_Event)]
#[event_err = "FlowyError"]
pub enum SearchEvent {
/// Runs a search; takes a `SearchQueryPB` as input.
#[event(input = "SearchQueryPB")]
Search = 0,
}

View File

@ -1,33 +0,0 @@
use serde::{Deserialize, Serialize};
use crate::entities::{IndexTypePB, ResultIconPB, SearchResultPB};
/// Serde representation of one document read back from the folder index.
/// The field names match the index schema field names (`id`, `title`,
/// `icon`, `icon_ty`).
#[derive(Debug, Serialize, Deserialize)]
pub struct FolderIndexData {
// Id of the indexed view.
pub id: String,
// Title of the indexed view.
pub title: String,
// Icon value; an empty string is treated as "no icon" when converting to
// `SearchResultPB`.
pub icon: String,
// Numeric icon type, decoded through `ResultIconTypePB::from(i64)`.
pub icon_ty: i64,
}
impl From<FolderIndexData> for SearchResultPB {
fn from(data: FolderIndexData) -> Self {
let icon = if data.icon.is_empty() {
None
} else {
Some(ResultIconPB {
ty: data.icon_ty.into(),
value: data.icon,
})
};
Self {
index_type: IndexTypePB::View,
view_id: data.id.clone(),
id: data.id,
data: data.title,
score: 0.0,
icon,
}
}
}

View File

@ -1,30 +0,0 @@
use crate::entities::SearchResultPB;
use crate::services::manager::{SearchHandler, SearchType};
use flowy_error::FlowyResult;
use std::sync::Arc;
use super::indexer::FolderIndexManagerImpl;
/// Search handler for folder views; it delegates to a shared
/// `FolderIndexManagerImpl`.
pub struct FolderSearchHandler {
// The index manager that performs the actual search.
pub index_manager: Arc<FolderIndexManagerImpl>,
}
impl FolderSearchHandler {
pub fn new(index_manager: Arc<FolderIndexManagerImpl>) -> Self {
Self { index_manager }
}
}
impl SearchHandler for FolderSearchHandler {
  /// This handler serves [`SearchType::Folder`].
  fn search_type(&self) -> SearchType {
    SearchType::Folder
  }

  /// Forwards the query to the folder index manager.
  fn perform_search(&self, query: String) -> FlowyResult<Vec<SearchResultPB>> {
    let index_manager = &self.index_manager;
    index_manager.search(query)
  }

  /// Number of documents currently held by the folder index.
  fn index_count(&self) -> u64 {
    let index_manager = &self.index_manager;
    index_manager.num_docs()
  }
}

View File

@ -1,376 +0,0 @@
use std::{any::Any, collections::HashMap, fs, path::Path, sync::Weak};
use crate::{
entities::ResultIconTypePB,
folder::schema::{FolderSchema, FOLDER_ICON_FIELD_NAME, FOLDER_TITLE_FIELD_NAME},
};
use collab::core::collab::{IndexContent, IndexContentReceiver};
use collab_folder::{View, ViewIcon, ViewIndexContent, ViewLayout};
use flowy_error::{FlowyError, FlowyResult};
use flowy_search_pub::entities::{FolderIndexManager, IndexManager, IndexableData};
use flowy_user::services::authenticate_user::AuthenticateUser;
use lib_dispatch::prelude::af_spawn;
use strsim::levenshtein;
use tantivy::{
collector::TopDocs, directory::MmapDirectory, doc, query::QueryParser, Index, IndexReader,
IndexWriter, Term,
};
use crate::entities::SearchResultPB;
use super::{
entities::FolderIndexData,
schema::{FOLDER_ICON_TY_FIELD_NAME, FOLDER_ID_FIELD_NAME},
};
/// Tantivy-backed index of folder views.
///
/// All fields are `None` when the manager could not be initialised (see
/// `new`, which falls back to `empty()`); methods that need them then return
/// the "folder index manager unavailable" error.
#[derive(Clone)]
pub struct FolderIndexManagerImpl {
// Schema describing the indexed fields.
folder_schema: Option<FolderSchema>,
// The Tantivy index itself.
index: Option<Index>,
// Reader created once from `index` and reused for every search.
index_reader: Option<IndexReader>,
}
// Name of the directory, inside the user's index path, that holds the folder index.
const FOLDER_INDEX_DIR: &str = "folder_index";
impl FolderIndexManagerImpl {
  /// Memory budget, in bytes, handed to every `IndexWriter` (50 MB).
  const INDEX_WRITER_HEAP_BYTES: usize = 50_000_000;

  /// Maximum number of documents fetched from the index per search.
  const SEARCH_RESULT_LIMIT: usize = 10;

  /// Opens (or creates) the on-disk folder index below the user's index path.
  ///
  /// Initialisation never panics or returns an error: every failure is logged
  /// and an empty, unavailable manager is returned instead.
  pub fn new(auth_user: Weak<AuthenticateUser>) -> Self {
    // AuthenticateUser is required to get the index path
    let Some(auth_user) = auth_user.upgrade() else {
      tracing::error!("FolderIndexManager: AuthenticateUser is not available");
      return Self::empty();
    };

    // Storage path is the users data path with an index directory
    // Eg. /usr/flowy-data/indexes
    let storage_path = auth_user.get_index_path();

    // We check if the `folder_index` directory exists, if not we create it
    let index_path = storage_path.join(Path::new(FOLDER_INDEX_DIR));
    if !index_path.exists() {
      if let Err(e) = fs::create_dir_all(&index_path) {
        tracing::error!(
          "FolderIndexManager failed to create index directory: {:?}",
          e
        );
        return Self::empty();
      }
    }

    // We open the existing or newly created folder_index directory
    // This is required by the Tantivy Index, as it will use it to store
    // and read index data
    let dir = match MmapDirectory::open(index_path) {
      Ok(dir) => dir,
      Err(e) => {
        tracing::error!("FolderIndexManager failed to open index directory: {:?}", e);
        return Self::empty();
      },
    };

    // The folder schema is used to define the fields of the index along
    // with how they are stored and if the field is indexed
    let folder_schema = FolderSchema::new();

    // We open or create an index that takes the directory r/w and the schema.
    let index = match Index::open_or_create(dir, folder_schema.schema.clone()) {
      Ok(index) => index,
      Err(e) => {
        tracing::error!("FolderIndexManager failed to open index: {:?}", e);
        return Self::empty();
      },
    };

    // We read the index reader, we only need one IndexReader per index
    let index_reader = match index.reader() {
      Ok(index_reader) => index_reader,
      Err(e) => {
        tracing::error!(
          "FolderIndexManager failed to instantiate index reader: {:?}",
          e
        );
        return Self::empty();
      },
    };

    Self {
      folder_schema: Some(folder_schema),
      index: Some(index),
      index_reader: Some(index_reader),
    }
  }

  /// Indexes every entry of `indexes` in one commit.
  ///
  /// Does nothing when the index already contains documents or when
  /// `indexes` is empty.
  ///
  /// # Errors
  /// Fails when the manager is unavailable, a schema field is missing, or
  /// Tantivy rejects a document or the commit.
  fn index_all(&self, indexes: Vec<IndexableData>) -> Result<(), FlowyError> {
    if self.is_indexed() || indexes.is_empty() {
      return Ok(());
    }

    let mut index_writer = self.get_index_writer()?;
    let folder_schema = self.get_folder_schema()?;

    let id_field = folder_schema.schema.get_field(FOLDER_ID_FIELD_NAME)?;
    let title_field = folder_schema.schema.get_field(FOLDER_TITLE_FIELD_NAME)?;
    let icon_field = folder_schema.schema.get_field(FOLDER_ICON_FIELD_NAME)?;
    let icon_ty_field = folder_schema.schema.get_field(FOLDER_ICON_TY_FIELD_NAME)?;

    for data in indexes {
      let (icon, icon_ty) = self.extract_icon(data.icon, data.layout);

      // Report a rejected document instead of silently dropping it.
      index_writer.add_document(doc![
        id_field => data.id,
        title_field => data.data,
        icon_field => icon.unwrap_or_default(),
        icon_ty_field => icon_ty,
      ])?;
    }

    index_writer.commit()?;

    Ok(())
  }

  /// Number of documents visible to the index reader, or `0` when the
  /// manager is unavailable.
  pub fn num_docs(&self) -> u64 {
    self
      .index_reader
      .as_ref()
      .map_or(0, |reader| reader.searcher().num_docs())
  }

  /// An unavailable manager: no schema, no index, no reader.
  fn empty() -> Self {
    Self {
      folder_schema: None,
      index: None,
      index_reader: None,
    }
  }

  /// Creates a new `IndexWriter` for the index.
  ///
  /// # Errors
  /// Fails when the manager is unavailable or Tantivy cannot create the writer.
  fn get_index_writer(&self) -> FlowyResult<IndexWriter> {
    let index = self
      .index
      .as_ref()
      .ok_or_else(FlowyError::folder_index_manager_unavailable)?;
    Ok(index.writer(Self::INDEX_WRITER_HEAP_BYTES)?)
  }

  /// Returns a clone of the folder schema.
  ///
  /// # Errors
  /// Fails when the manager is unavailable.
  fn get_folder_schema(&self) -> FlowyResult<FolderSchema> {
    self
      .folder_schema
      .clone()
      .ok_or_else(FlowyError::folder_index_manager_unavailable)
  }

  /// Decides which icon value and icon type to store for a view.
  ///
  /// A view icon with a non-empty value is stored as-is with its own type.
  /// Otherwise the numeric view layout is stored as the value, with the type
  /// `ResultIconTypePB::Icon`. The returned icon is always `Some`.
  fn extract_icon(
    &self,
    view_icon: Option<ViewIcon>,
    view_layout: ViewLayout,
  ) -> (Option<String>, i64) {
    match view_icon {
      Some(view_icon) if !view_icon.value.is_empty() => {
        let icon_ty = i64::from(ResultIconTypePB::from(view_icon.ty));
        (Some(view_icon.value), icon_ty)
      },
      _ => {
        let layout_ty: i64 = view_layout.into();
        (
          Some(layout_ty.to_string()),
          i64::from(ResultIconTypePB::Icon),
        )
      },
    }
  }

  /// Runs a fuzzy search for `query` against the title field.
  ///
  /// The allowed edit distance grows with the query length: `0` for up to two
  /// characters, `1` for three or four, `2` for longer queries. The length is
  /// counted in characters rather than bytes, so short non-ASCII queries are
  /// not treated as long ones. Each hit is scored with `score_result`.
  ///
  /// # Errors
  /// Fails when the manager is unavailable, the query cannot be parsed, or a
  /// document cannot be read or deserialized.
  pub fn search(&self, query: String) -> Result<Vec<SearchResultPB>, FlowyError> {
    let folder_schema = self.get_folder_schema()?;
    let index = self
      .index
      .as_ref()
      .ok_or_else(FlowyError::folder_index_manager_unavailable)?;
    let index_reader = self
      .index_reader
      .as_ref()
      .ok_or_else(FlowyError::folder_index_manager_unavailable)?;

    let title_field = folder_schema.schema.get_field(FOLDER_TITLE_FIELD_NAME)?;

    let distance: u8 = match query.chars().count() {
      length if length > 4 => 2,
      length if length > 2 => 1,
      _ => 0,
    };

    let mut query_parser = QueryParser::for_index(index, vec![title_field]);
    query_parser.set_field_fuzzy(title_field, true, distance, true);
    let built_query = query_parser.parse_query(&query)?;

    let searcher = index_reader.searcher();
    let top_docs = searcher.search(
      &built_query,
      &TopDocs::with_limit(Self::SEARCH_RESULT_LIMIT),
    )?;

    let mut search_results: Vec<SearchResultPB> = Vec::with_capacity(top_docs.len());
    for (_score, doc_address) in top_docs {
      let retrieved_doc = searcher.doc(doc_address)?;
      let named_doc = folder_schema.schema.to_named_doc(&retrieved_doc);

      // Keep the first value of every field; a field without values is
      // skipped rather than indexed into.
      let content: HashMap<_, _> = named_doc
        .0
        .into_iter()
        .filter_map(|(name, values)| values.into_iter().next().map(|value| (name, value)))
        .collect();

      if content.is_empty() {
        continue;
      }

      // Round-trip through JSON to map the named fields onto `FolderIndexData`.
      let s = serde_json::to_string(&content)?;
      let result: SearchResultPB = serde_json::from_str::<FolderIndexData>(&s)?.into();
      let score = self.score_result(&query, &result.data);
      search_results.push(result.with_score(score));
    }

    Ok(search_results)
  }

  /// Scores a hit by Levenshtein distance between the query and the matched
  /// text: `1.0` for an exact match, approaching `0.0` as the distance grows.
  fn score_result(&self, query: &str, term: &str) -> f64 {
    let distance = levenshtein(query, term) as f64;
    1.0 / (distance + 1.0)
  }
}
impl IndexManager for FolderIndexManagerImpl {
fn is_indexed(&self) -> bool {
self
.index_reader
.clone()
.map(|reader| reader.searcher().num_docs() > 0)
.unwrap_or(false)
}
fn set_index_content_receiver(&self, mut rx: IndexContentReceiver) {
let indexer = self.clone();
af_spawn(async move {
while let Ok(msg) = rx.recv().await {
match msg {
IndexContent::Create(value) => match serde_json::from_value::<ViewIndexContent>(value) {
Ok(view) => {
let _ = indexer.add_index(IndexableData {
id: view.id,
data: view.name,
icon: view.icon,
layout: view.layout,
});
},
Err(err) => tracing::error!("FolderIndexManager error deserialize: {:?}", err),
},
IndexContent::Update(value) => match serde_json::from_value::<ViewIndexContent>(value) {
Ok(view) => {
let _ = indexer.update_index(IndexableData {
id: view.id,
data: view.name,
icon: view.icon,
layout: view.layout,
});
},
Err(err) => tracing::error!("FolderIndexManager error deserialize: {:?}", err),
},
IndexContent::Delete(ids) => {
if let Err(e) = indexer.remove_indices(ids) {
tracing::error!("FolderIndexManager error deserialize: {:?}", e);
}
},
}
}
});
}
fn update_index(&self, data: IndexableData) -> Result<(), FlowyError> {
let mut index_writer = self.get_index_writer()?;
let folder_schema = self.get_folder_schema()?;
let id_field = folder_schema.schema.get_field(FOLDER_ID_FIELD_NAME)?;
let title_field = folder_schema.schema.get_field(FOLDER_TITLE_FIELD_NAME)?;
let icon_field = folder_schema.schema.get_field(FOLDER_ICON_FIELD_NAME)?;
let icon_ty_field = folder_schema.schema.get_field(FOLDER_ICON_TY_FIELD_NAME)?;
let delete_term = Term::from_field_text(id_field, &data.id.clone());
// Remove old index
index_writer.delete_term(delete_term);
let (icon, icon_ty) = self.extract_icon(data.icon, data.layout);
// Add new index
let _ = index_writer.add_document(doc![
id_field => data.id.clone(),
title_field => data.data,
icon_field => icon.unwrap_or_default(),
icon_ty_field => icon_ty,
]);
index_writer.commit()?;
Ok(())
}
fn remove_indices(&self, ids: Vec<String>) -> Result<(), FlowyError> {
let mut index_writer = self.get_index_writer()?;
let folder_schema = self.get_folder_schema()?;
let id_field = folder_schema.schema.get_field(FOLDER_ID_FIELD_NAME)?;
for id in ids {
let delete_term = Term::from_field_text(id_field, &id);
index_writer.delete_term(delete_term);
}
index_writer.commit()?;
Ok(())
}
fn add_index(&self, data: IndexableData) -> Result<(), FlowyError> {
let mut index_writer = self.get_index_writer()?;
let folder_schema = self.get_folder_schema()?;
let id_field = folder_schema.schema.get_field(FOLDER_ID_FIELD_NAME)?;
let title_field = folder_schema.schema.get_field(FOLDER_TITLE_FIELD_NAME)?;
let icon_field = folder_schema.schema.get_field(FOLDER_ICON_FIELD_NAME)?;
let icon_ty_field = folder_schema.schema.get_field(FOLDER_ICON_TY_FIELD_NAME)?;
let (icon, icon_ty) = self.extract_icon(data.icon, data.layout);
// Add new index
let _ = index_writer.add_document(doc![
id_field => data.id,
title_field => data.data,
icon_field => icon.unwrap_or_default(),
icon_ty_field => icon_ty,
]);
index_writer.commit()?;
Ok(())
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl FolderIndexManager for FolderIndexManagerImpl {
fn index_all_views(&self, views: Vec<View>) {
let indexable_data = views
.into_iter()
.map(|view| IndexableData {
id: view.id,
data: view.name,
icon: view.icon,
layout: view.layout,
})
.collect();
let _ = self.index_all(indexable_data);
}
}

View File

@ -1,4 +0,0 @@
pub mod entities;
pub mod handler;
pub mod indexer;
pub mod schema;

View File

@ -1,47 +0,0 @@
use tantivy::schema::Schema;
// Names of the fields in the folder index schema. Changing a name changes
// the schema; see the warning on `impl FolderSchema`.
pub const FOLDER_ID_FIELD_NAME: &str = "id";
pub const FOLDER_TITLE_FIELD_NAME: &str = "title";
pub const FOLDER_ICON_FIELD_NAME: &str = "icon";
pub const FOLDER_ICON_TY_FIELD_NAME: &str = "icon_ty";
/// Wrapper around the Tantivy schema used by the folder index.
#[derive(Clone)]
pub struct FolderSchema {
// The schema built by `FolderSchema::new`.
pub schema: Schema,
}
/// Do not change the schema after the index has been created.
/// Changing field_options or fields will make the schema differ from the
/// previously created index, causing tantivy to panic and search to stop
/// functioning.
///
/// If you need to change the schema, create a migration that removes the old
/// index and creates a new one with the new schema.
impl FolderSchema {
  /// Builds the folder schema with four fields, in this order: `id`
  /// (`STRING | STORED`), `title` and `icon` (`TEXT | STORED`), and `icon_ty`
  /// (stored `i64`).
  pub fn new() -> Self {
    use tantivy::schema::{STORED, STRING, TEXT};

    let mut builder = Schema::builder();
    builder.add_text_field(FOLDER_ID_FIELD_NAME, STRING | STORED);
    builder.add_text_field(FOLDER_TITLE_FIELD_NAME, TEXT | STORED);
    builder.add_text_field(FOLDER_ICON_FIELD_NAME, TEXT | STORED);
    builder.add_i64_field(FOLDER_ICON_TY_FIELD_NAME, STORED);

    Self {
      schema: builder.build(),
    }
  }
}
impl Default for FolderSchema {
fn default() -> Self {
Self::new()
}
}

View File

@ -1,6 +0,0 @@
pub mod entities;
pub mod event_handler;
pub mod event_map;
pub mod folder;
pub mod protobuf;
pub mod services;

View File

@ -1,77 +0,0 @@
use std::collections::HashMap;
use std::sync::Arc;
use flowy_error::FlowyResult;
use lib_dispatch::prelude::af_spawn;
use tokio::{sync::broadcast, task::spawn_blocking};
use crate::entities::{SearchResultNotificationPB, SearchResultPB};
use super::notifier::{SearchNotifier, SearchResultChanged, SearchResultReceiverRunner};
/// Identifies a kind of search; used as the key under which a handler is
/// registered in `SearchManager`.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub enum SearchType {
// Search over folder views.
Folder,
}
/// A search backend that the `SearchManager` can delegate queries to.
/// Handlers are called from blocking tasks, hence the `Send + Sync + 'static`
/// bounds.
pub trait SearchHandler: Send + Sync + 'static {
/// Returns the type of search this handler is responsible for.
fn search_type(&self) -> SearchType;
/// Performs a search for `query` and returns the results.
fn perform_search(&self, query: String) -> FlowyResult<Vec<SearchResultPB>>;
/// Returns the number of indexed objects.
fn index_count(&self) -> u64;
}
/// The [SearchManager] holds multiple [SearchHandler]s, delegates a search
/// to all of them, and streams the results to the client until the query
/// has been fully completed.
///
pub struct SearchManager {
// Registered handlers, keyed by the search type they report.
pub handlers: HashMap<SearchType, Arc<dyn SearchHandler>>,
// Broadcast sender on which result notifications are published.
notifier: SearchNotifier,
}
impl SearchManager {
pub fn new(handlers: Vec<Arc<dyn SearchHandler>>) -> Self {
let handlers: HashMap<SearchType, Arc<dyn SearchHandler>> = handlers
.into_iter()
.map(|handler| (handler.search_type(), handler))
.collect();
// Initialize Search Notifier
let (notifier, _) = broadcast::channel(100);
af_spawn(SearchResultReceiverRunner(Some(notifier.subscribe())).run());
Self { handlers, notifier }
}
pub fn get_handler(&self, search_type: SearchType) -> Option<&Arc<dyn SearchHandler>> {
self.handlers.get(&search_type)
}
pub fn perform_search(&self, query: String) {
let mut sends: usize = 0;
let max: usize = self.handlers.len();
let handlers = self.handlers.clone();
for (_, handler) in handlers {
let q = query.clone();
let notifier = self.notifier.clone();
spawn_blocking(move || {
let res = handler.perform_search(q);
sends += 1;
let close = sends == max;
let items = res.unwrap_or_default();
let notification = SearchResultNotificationPB {
items,
closed: close,
};
let _ = notifier.send(SearchResultChanged::SearchResultUpdate(notification));
});
}
}
}

View File

@ -1,2 +0,0 @@
pub mod manager;
pub mod notifier;

View File

@ -1,53 +0,0 @@
use async_stream::stream;
use flowy_notification::NotificationBuilder;
use futures::stream::StreamExt;
use tokio::sync::broadcast;
use crate::entities::{SearchNotification, SearchResultNotificationPB};
// Source tag passed to `NotificationBuilder` for every search notification.
const OBSERVABLE_SOURCE: &str = "SEARCH";
// Identifier passed to `send_notification` for every search result update.
const SEARCH_ID: &str = "SEARCH_IDENTIFIER";
/// Messages carried on the search broadcast channel.
#[derive(Clone)]
pub enum SearchResultChanged {
// A batch of results (possibly the final one) is ready to be sent.
SearchResultUpdate(SearchResultNotificationPB),
}
/// Sending half of the broadcast channel used to publish search results.
pub type SearchNotifier = broadcast::Sender<SearchResultChanged>;
/// Owns the receiving half of the search broadcast channel until `run`
/// takes it; the `Option` lets `run` move the receiver out exactly once.
pub(crate) struct SearchResultReceiverRunner(
pub(crate) Option<broadcast::Receiver<SearchResultChanged>>,
);
impl SearchResultReceiverRunner {
  /// Forwards every message received on the broadcast channel to the client
  /// as a notification until the channel is closed.
  ///
  /// A batch marked `closed` is sent as `DidCloseResults`, any other batch as
  /// `DidUpdateResults`.
  ///
  /// # Panics
  /// Panics if the receiver has already been taken.
  pub(crate) async fn run(mut self) {
    let mut receiver = self.0.take().expect("Only take once");
    let stream = stream! {
      loop {
        match receiver.recv().await {
          Ok(changed) => {
            yield changed;
          },
          // Falling behind only means some messages were dropped; the
          // receiver is still usable, so keep forwarding instead of stopping.
          Err(broadcast::error::RecvError::Lagged(skipped)) => {
            tracing::warn!(
              "SearchResultReceiverRunner lagged behind, skipped {} notifications",
              skipped
            );
          },
          // Every sender is gone: nothing more will arrive.
          Err(broadcast::error::RecvError::Closed) => break,
        }
      }
    };
    stream
      .for_each(|changed| async {
        match changed {
          SearchResultChanged::SearchResultUpdate(notification) => {
            let ty = if notification.closed {
              SearchNotification::DidCloseResults
            } else {
              SearchNotification::DidUpdateResults
            };
            send_notification(SEARCH_ID, ty)
              .payload(notification)
              .send();
          },
        }
      })
      .await;
  }
}
/// Starts a notification of type `ty` for `id`, tagged with the search
/// source. The caller attaches a payload and sends it.
#[tracing::instrument(level = "trace")]
pub fn send_notification(id: &str, ty: SearchNotification) -> NotificationBuilder {
NotificationBuilder::new(id, ty, OBSERVABLE_SOURCE)
}

View File

@ -1,3 +0,0 @@
// mod search;
mod tantivy_test;

View File

@ -1,53 +0,0 @@
use tantivy::collector::TopDocs;
use tantivy::query::QueryParser;
use tantivy::schema::*;
use tantivy::{doc, DocAddress, Index, Score};
/// Indexes a single document and checks that a misspelled query still finds
/// it through fuzzy matching on the title field.
#[test]
fn search_folder_test() {
  let mut schema_builder = Schema::builder();
  let id = schema_builder.add_text_field("id", TEXT);
  let title = schema_builder.add_text_field("title", TEXT | STORED);
  let schema = schema_builder.build();

  // Indexing documents
  let index = Index::create_from_tempdir(schema.clone()).unwrap();

  // Here we use a buffer of 100MB that will be split
  // between indexing threads.
  let mut index_writer = index.writer(100_000_000).unwrap();

  // Let's index one document!
  index_writer
    .add_document(doc!(
      id => "123456789",
      title => "The Old Man and the Seawhale",
    ))
    .unwrap();

  // We need to call .commit() explicitly to force the
  // index_writer to finish processing the documents in the queue,
  // flush the current index to the disk, and advertise
  // the existence of new documents.
  index_writer.commit().unwrap();

  // # Searching
  let reader = index.reader().unwrap();
  let searcher = reader.searcher();

  let mut query_parser = QueryParser::for_index(&index, vec![title]);
  query_parser.set_field_fuzzy(title, true, 2, true);

  // "sewhals" is two edits away from the indexed term "seawhale".
  let query = query_parser.parse_query("sewhals").unwrap();

  // Perform search.
  // `topdocs` contains the 10 most relevant doc ids, sorted by decreasing scores...
  let top_docs: Vec<(Score, DocAddress)> =
    searcher.search(&query, &TopDocs::with_limit(10)).unwrap();

  // Without this the test would pass even if fuzzy search found nothing.
  assert_eq!(
    top_docs.len(),
    1,
    "the fuzzy query should match the single indexed document"
  );

  for (_score, doc_address) in top_docs {
    // Retrieve the actual content of documents given its `doc_address`.
    let retrieved_doc = searcher.doc(doc_address).unwrap();
    println!("{}", schema.to_json(&retrieved_doc));
  }
}