feat: integrate cloud document search (#5523)

This commit is contained in:
Mathias Mogensen
2024-06-13 01:37:19 +02:00
committed by GitHub
parent 4f4be7eac7
commit bd5f5f8b9e
39 changed files with 539 additions and 110 deletions

View File

@ -21,10 +21,12 @@ flowy-notification.workspace = true
flowy-sqlite.workspace = true
flowy-user.workspace = true
flowy-search-pub.workspace = true
flowy-folder = { workspace = true }
bytes.workspace = true
futures.workspace = true
lib-dispatch.workspace = true
lib-infra = { workspace = true }
protobuf.workspace = true
serde.workspace = true
serde_json.workspace = true

View File

@ -0,0 +1,99 @@
use std::sync::Arc;
use flowy_error::FlowyResult;
use flowy_folder::{manager::FolderManager, ViewLayout};
use flowy_search_pub::cloud::SearchCloudService;
use lib_infra::async_trait::async_trait;
use crate::{
entities::{IndexTypePB, ResultIconPB, ResultIconTypePB, SearchFilterPB, SearchResultPB},
services::manager::{SearchHandler, SearchType},
};
pub struct DocumentSearchHandler {
pub cloud_service: Arc<dyn SearchCloudService>,
pub folder_manager: Arc<FolderManager>,
}
impl DocumentSearchHandler {
pub fn new(
cloud_service: Arc<dyn SearchCloudService>,
folder_manager: Arc<FolderManager>,
) -> Self {
Self {
cloud_service,
folder_manager,
}
}
}
#[async_trait]
impl SearchHandler for DocumentSearchHandler {
fn search_type(&self) -> SearchType {
SearchType::Document
}
async fn perform_search(
&self,
query: String,
filter: Option<SearchFilterPB>,
) -> FlowyResult<Vec<SearchResultPB>> {
let filter = match filter {
Some(filter) => filter,
None => return Ok(vec![]),
};
let workspace_id = match filter.workspace_id {
Some(workspace_id) => workspace_id,
None => return Ok(vec![]),
};
let results = self
.cloud_service
.document_search(&workspace_id, query)
.await?;
// Grab all views from folder cache
// Notice that `get_all_view_pb` returns Views that don't include trashed and private views
let mut views = self.folder_manager.get_all_views_pb().await?.into_iter();
let mut search_results: Vec<SearchResultPB> = vec![];
for result in results {
if let Some(view) = views.find(|v| v.id == result.object_id) {
// If there is no View for the result, we don't add it to the results
// If possible we will extract the icon to display for the result
let icon: Option<ResultIconPB> = match view.icon.clone() {
Some(view_icon) => Some(ResultIconPB::from(view_icon)),
None => {
let view_layout_ty: i64 = ViewLayout::from(view.layout.clone()).into();
Some(ResultIconPB {
ty: ResultIconTypePB::Icon,
value: view_layout_ty.to_string(),
})
},
};
search_results.push(SearchResultPB {
index_type: IndexTypePB::Document,
view_id: result.object_id.clone(),
id: result.object_id.clone(),
data: view.name.clone(),
icon,
// We reverse the score, the cloud search score is based on
// 1 being the worst result, and closer to 0 being good result, that is
// the opposite of local search.
score: 1.0 - result.score,
workspace_id: result.workspace_id,
preview: result.preview,
});
}
}
Ok(search_results)
}
/// Ignore for [DocumentSearchHandler]
fn index_count(&self) -> u64 {
0
}
}

View File

@ -0,0 +1 @@
pub mod handler;

View File

@ -3,8 +3,9 @@ use flowy_derive::ProtoBuf_Enum;
#[derive(ProtoBuf_Enum, Eq, PartialEq, Debug, Clone)]
pub enum IndexTypePB {
View = 0,
DocumentBlock = 1,
DatabaseRow = 2,
Document = 1,
DocumentBlock = 2,
DatabaseRow = 3,
}
impl Default for IndexTypePB {

View File

@ -8,7 +8,7 @@ pub struct SearchResultNotificationPB {
pub items: Vec<SearchResultPB>,
#[pb(index = 2)]
pub closed: bool,
pub sends: u64,
#[pb(index = 3, one_of)]
pub channel: Option<String>,
@ -19,7 +19,6 @@ pub enum SearchNotification {
#[default]
Unknown = 0,
DidUpdateResults = 1,
DidCloseResults = 2,
}
impl std::convert::From<SearchNotification> for i32 {
@ -32,7 +31,6 @@ impl std::convert::From<i32> for SearchNotification {
fn from(notification: i32) -> Self {
match notification {
1 => SearchNotification::DidUpdateResults,
2 => SearchNotification::DidCloseResults,
_ => SearchNotification::Unknown,
}
}

View File

@ -1,5 +1,6 @@
use collab_folder::{IconType, ViewIcon};
use flowy_derive::{ProtoBuf, ProtoBuf_Enum};
use flowy_folder::entities::ViewIconPB;
use super::IndexTypePB;
@ -31,6 +32,9 @@ pub struct SearchResultPB {
#[pb(index = 7)]
pub workspace_id: String,
#[pb(index = 8, one_of)]
pub preview: Option<String>,
}
impl SearchResultPB {
@ -43,6 +47,7 @@ impl SearchResultPB {
icon: self.icon.clone(),
score,
workspace_id: self.workspace_id.clone(),
preview: self.preview.clone(),
}
}
}
@ -122,3 +127,12 @@ impl From<ViewIcon> for ResultIconPB {
}
}
}
impl From<ViewIconPB> for ResultIconPB {
fn from(val: ViewIconPB) -> Self {
ResultIconPB {
ty: IconType::from(val.ty).into(),
value: val.value,
}
}
}

View File

@ -30,6 +30,7 @@ impl From<FolderIndexData> for SearchResultPB {
score: 0.0,
icon,
workspace_id: data.workspace_id,
preview: None,
}
}
}

View File

@ -3,6 +3,7 @@ use crate::{
services::manager::{SearchHandler, SearchType},
};
use flowy_error::FlowyResult;
use lib_infra::async_trait::async_trait;
use std::sync::Arc;
use super::indexer::FolderIndexManagerImpl;
@ -17,12 +18,13 @@ impl FolderSearchHandler {
}
}
#[async_trait]
impl SearchHandler for FolderSearchHandler {
fn search_type(&self) -> SearchType {
SearchType::Folder
}
fn perform_search(
async fn perform_search(
&self,
query: String,
filter: Option<SearchFilterPB>,

View File

@ -298,12 +298,9 @@ impl IndexManager for FolderIndexManagerImpl {
let wid = workspace_id.clone();
af_spawn(async move {
while let Ok(msg) = rx.recv().await {
tracing::warn!("[Indexer] Message received: {:?}", msg);
match msg {
IndexContent::Create(value) => match serde_json::from_value::<ViewIndexContent>(value) {
Ok(view) => {
tracing::warn!("[Indexer] CREATE: {:?}", view);
let _ = indexer.add_index(IndexableData {
id: view.id,
data: view.name,
@ -316,7 +313,6 @@ impl IndexManager for FolderIndexManagerImpl {
},
IndexContent::Update(value) => match serde_json::from_value::<ViewIndexContent>(value) {
Ok(view) => {
tracing::warn!("[Indexer] UPDATE: {:?}", view);
let _ = indexer.update_index(IndexableData {
id: view.id,
data: view.name,
@ -328,7 +324,6 @@ impl IndexManager for FolderIndexManagerImpl {
Err(err) => tracing::error!("FolderIndexManager error deserialize: {:?}", err),
},
IndexContent::Delete(ids) => {
tracing::warn!("[Indexer] DELETE: {:?}", ids);
if let Err(e) = indexer.remove_indices(ids) {
tracing::error!("FolderIndexManager error deserialize: {:?}", e);
}
@ -459,7 +454,6 @@ impl FolderIndexManager for FolderIndexManagerImpl {
}
},
FolderViewChange::Deleted { view_ids } => {
tracing::warn!("[Indexer] ViewChange Reached Deleted: {:?}", view_ids);
let _ = self.remove_indices(view_ids);
},
};

View File

@ -1,3 +1,4 @@
pub mod document;
pub mod entities;
pub mod event_handler;
pub mod event_map;

View File

@ -5,21 +5,27 @@ use super::notifier::{SearchNotifier, SearchResultChanged, SearchResultReceiverR
use crate::entities::{SearchFilterPB, SearchResultNotificationPB, SearchResultPB};
use flowy_error::FlowyResult;
use lib_dispatch::prelude::af_spawn;
use tokio::{sync::broadcast, task::spawn_blocking};
use lib_infra::async_trait::async_trait;
use tokio::sync::broadcast;
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub enum SearchType {
Folder,
Document,
}
#[async_trait]
pub trait SearchHandler: Send + Sync + 'static {
/// returns the type of search this handler is responsible for
fn search_type(&self) -> SearchType;
/// performs a search and returns the results
fn perform_search(
async fn perform_search(
&self,
query: String,
filter: Option<SearchFilterPB>,
) -> FlowyResult<Vec<SearchResultPB>>;
/// returns the number of indexed objects
fn index_count(&self) -> u64;
}
@ -57,25 +63,22 @@ impl SearchManager {
filter: Option<SearchFilterPB>,
channel: Option<String>,
) {
let mut sends: usize = 0;
let max: usize = self.handlers.len();
let handlers = self.handlers.clone();
for (_, handler) in handlers {
let q = query.clone();
let f = filter.clone();
let ch = channel.clone();
let notifier = self.notifier.clone();
spawn_blocking(move || {
let res = handler.perform_search(q, f);
sends += 1;
af_spawn(async move {
let res = handler.perform_search(q, f).await;
let close = sends == max;
let items = res.unwrap_or_default();
let notification = SearchResultNotificationPB {
items,
closed: close,
sends: max as u64,
channel: ch,
};

View File

@ -31,15 +31,13 @@ impl SearchResultReceiverRunner {
.for_each(|changed| async {
match changed {
SearchResultChanged::SearchResultUpdate(notification) => {
let ty = if notification.closed {
SearchNotification::DidCloseResults
} else {
SearchNotification::DidUpdateResults
};
send_notification(SEARCH_ID, ty, notification.channel.clone())
.payload(notification)
.send();
send_notification(
SEARCH_ID,
SearchNotification::DidUpdateResults,
notification.channel.clone(),
)
.payload(notification)
.send();
},
}
})