refactor: database row and cell notification (#5237)

* refactor: database row and cell notification

* chore: clippy

* chore: fix test
This commit is contained in:
Nathan.fooo
2024-05-02 11:42:33 +08:00
committed by GitHub
parent e1e8747f15
commit 7831d8d4ab
19 changed files with 305 additions and 233 deletions

View File

@ -1,7 +1,7 @@
use flowy_derive::ProtoBuf_Enum;
use flowy_notification::NotificationBuilder;
const DATABASE_OBSERVABLE_SOURCE: &str = "Database";
pub(crate) const DATABASE_OBSERVABLE_SOURCE: &str = "Database";
#[derive(ProtoBuf_Enum, Debug, Default)]
pub enum DatabaseNotification {
@ -11,7 +11,7 @@ pub enum DatabaseNotification {
/// storage.
DidFetchRow = 19,
/// Trigger after inserting/deleting/updating a row
DidUpdateViewRows = 20,
DidUpdateRow = 20,
/// Trigger when the visibility of the row was changed. For example, updating the filter will trigger the notification
DidUpdateViewRowsVisibility = 21,
/// Trigger after inserting/deleting/updating a field
@ -64,7 +64,7 @@ impl std::convert::From<i32> for DatabaseNotification {
fn from(notification: i32) -> Self {
match notification {
19 => DatabaseNotification::DidFetchRow,
20 => DatabaseNotification::DidUpdateViewRows,
20 => DatabaseNotification::DidUpdateRow,
21 => DatabaseNotification::DidUpdateViewRowsVisibility,
22 => DatabaseNotification::DidUpdateFields,
40 => DatabaseNotification::DidUpdateCell,

View File

@ -4,7 +4,6 @@ use crate::services::calculations::Calculation;
use crate::services::cell::{apply_cell_changeset, get_cell_protobuf, CellCache};
use crate::services::database::database_observe::*;
use crate::services::database::util::database_view_setting_pb_from_view;
use crate::services::database::UpdatedRow;
use crate::services::database_view::{
DatabaseViewChanged, DatabaseViewEditor, DatabaseViewOperation, DatabaseViews, EditorByViewId,
};
@ -27,6 +26,7 @@ use collab_database::views::{
DatabaseLayout, DatabaseView, FilterMap, LayoutSetting, OrderObjectPosition,
};
use flowy_error::{internal_error, ErrorCode, FlowyError, FlowyResult};
use flowy_notification::DebounceNotificationSender;
use lib_infra::box_any::BoxAny;
use lib_infra::future::{to_fut, Fut, FutureResult};
use lib_infra::priority_task::TaskDispatcher;
@ -41,6 +41,9 @@ pub struct DatabaseEditor {
database: Arc<MutexDatabase>,
pub cell_cache: CellCache,
database_views: Arc<DatabaseViews>,
#[allow(dead_code)]
/// Used to send notification to the frontend.
notification_sender: Arc<DebounceNotificationSender>,
}
impl DatabaseEditor {
@ -48,6 +51,7 @@ impl DatabaseEditor {
database: Arc<MutexDatabase>,
task_scheduler: Arc<RwLock<TaskDispatcher>>,
) -> FlowyResult<Self> {
let notification_sender = Arc::new(DebounceNotificationSender::new(200));
let cell_cache = AnyTypeCache::<u64>::new();
let database_id = database.lock().get_database_id();
@ -55,7 +59,7 @@ impl DatabaseEditor {
observe_sync_state(&database_id, &database).await;
// observe_view_change(&database_id, &database).await;
// observe_field_change(&database_id, &database).await;
// observe_rows_change(&database_id, &database).await;
observe_rows_change(&database_id, &database, &notification_sender).await;
// observe_block_event(&database_id, &database).await;
// Used to cache the view of the database for fast access.
@ -81,6 +85,7 @@ impl DatabaseEditor {
database,
cell_cache,
database_views,
notification_sender,
})
}
@ -154,7 +159,7 @@ impl DatabaseEditor {
if !changes.is_empty() {
for view in self.database_views.editors().await {
send_notification(&view.view_id, DatabaseNotification::DidUpdateViewRows)
send_notification(&view.view_id, DatabaseNotification::DidUpdateRow)
.payload(changes.clone())
.send();
}
@ -497,7 +502,7 @@ impl DatabaseEditor {
let insert_row = InsertedRowPB::new(RowMetaPB::from(row_detail)).with_index(index as i32);
let changes = RowsChangePB::from_move(vec![delete_row_id], vec![insert_row]);
send_notification(view_id, DatabaseNotification::DidUpdateViewRows)
send_notification(view_id, DatabaseNotification::DidUpdateRow)
.payload(changes)
.send();
}
@ -788,10 +793,6 @@ impl DatabaseEditor {
.v_did_update_row(&Some(row_detail.clone()), &row_detail, None)
.await;
}
self
.notify_update_row(view_id, row_detail.row.id, vec![])
.await;
}
/// Update a cell in the database.
@ -851,15 +852,6 @@ impl DatabaseEditor {
.await;
}
}
let changeset = CellChangesetNotifyPB {
view_id: view_id.to_string(),
row_id: row_id.clone().into_inner(),
field_id: field_id.to_string(),
};
self
.notify_update_row(view_id, row_id, vec![changeset])
.await;
}
pub fn get_auto_updated_fields_changesets(
@ -1082,13 +1074,6 @@ impl DatabaseEditor {
self.database.lock().update_row(&row_detail.row.id, |row| {
row.set_cells(Cells::from(row_changeset.cell_by_field_id.clone()));
});
let changesets = cell_changesets_from_cell_by_field_id(
view_id,
row_changeset.row_id,
row_changeset.cell_by_field_id,
);
self.notify_update_row(view_id, from_row, changesets).await;
},
}
@ -1337,58 +1322,6 @@ impl DatabaseEditor {
pub fn get_mutex_database(&self) -> &MutexDatabase {
&self.database
}
async fn notify_update_row(
&self,
view_id: &str,
row: RowId,
extra_changesets: Vec<CellChangesetNotifyPB>,
) {
let mut changesets = self.get_auto_updated_fields_changesets(view_id, row);
changesets.extend(extra_changesets);
notify_did_update_cell(changesets.clone()).await;
notify_did_update_row(changesets).await;
}
}
pub(crate) async fn notify_did_update_cell(changesets: Vec<CellChangesetNotifyPB>) {
for changeset in changesets {
let id = format!("{}:{}", changeset.row_id, changeset.field_id);
send_notification(&id, DatabaseNotification::DidUpdateCell).send();
}
}
async fn notify_did_update_row(changesets: Vec<CellChangesetNotifyPB>) {
let row_id = changesets[0].row_id.clone();
let view_id = changesets[0].view_id.clone();
let field_ids = changesets
.iter()
.map(|changeset| changeset.field_id.to_string())
.collect();
let update_row = UpdatedRow::new(&row_id).with_field_ids(field_ids);
let update_changeset = RowsChangePB::from_update(update_row.into());
send_notification(&view_id, DatabaseNotification::DidUpdateViewRows)
.payload(update_changeset)
.send();
}
fn cell_changesets_from_cell_by_field_id(
view_id: &str,
row_id: RowId,
cell_by_field_id: HashMap<String, Cell>,
) -> Vec<CellChangesetNotifyPB> {
let row_id = row_id.into_inner();
cell_by_field_id
.into_keys()
.map(|field_id| CellChangesetNotifyPB {
view_id: view_id.to_string(),
row_id: row_id.clone(),
field_id,
})
.collect()
}
struct DatabaseViewOperationImpl {

View File

@ -1,14 +1,16 @@
use crate::entities::{DatabaseSyncStatePB, DidFetchRowPB};
use crate::notification::{send_notification, DatabaseNotification};
use crate::entities::{DatabaseSyncStatePB, DidFetchRowPB, RowsChangePB};
use crate::notification::{send_notification, DatabaseNotification, DATABASE_OBSERVABLE_SOURCE};
use crate::services::database::UpdatedRow;
use collab_database::blocks::BlockEvent;
use collab_database::database::MutexDatabase;
use collab_database::fields::FieldChange;
use collab_database::rows::RowChange;
use collab_database::rows::{RowChange, RowId};
use collab_database::views::DatabaseViewChange;
use flowy_notification::{DebounceNotificationSender, NotificationBuilder};
use futures::StreamExt;
use lib_dispatch::prelude::af_spawn;
use std::sync::Arc;
use tracing::trace;
use tracing::{trace, warn};
pub(crate) async fn observe_sync_state(database_id: &str, database: &Arc<MutexDatabase>) {
let weak_database = Arc::downgrade(database);
@ -31,7 +33,12 @@ pub(crate) async fn observe_sync_state(database_id: &str, database: &Arc<MutexDa
}
#[allow(dead_code)]
pub(crate) async fn observe_rows_change(database_id: &str, database: &Arc<MutexDatabase>) {
pub(crate) async fn observe_rows_change(
database_id: &str,
database: &Arc<MutexDatabase>,
notification_sender: &Arc<DebounceNotificationSender>,
) {
let notification_sender = notification_sender.clone();
let database_id = database_id.to_string();
let weak_database = Arc::downgrade(database);
let mut row_change = database.lock().subscribe_row_change();
@ -47,15 +54,24 @@ pub(crate) async fn observe_rows_change(database_id: &str, database: &Arc<MutexD
row_change
);
match row_change {
RowChange::DidUpdateVisibility { .. } => {},
RowChange::DidUpdateHeight { .. } => {},
RowChange::DidUpdateCell { .. } => {},
RowChange::DidUpdateRowComment { .. } => {},
RowChange::DidUpdateCell {
field_id,
row_id,
value: _,
} => {
let cell_id = format!("{}:{}", row_id, field_id);
notify_cell(&notification_sender, &cell_id);
// In the old logic, it will notify the row when the cell is updated. But in the new logic,
// it will notify the cell only. Enable the following code if needed.
// notify_row(&notification_sender, &database_id, field_id, &row_id);
},
_ => {
warn!("unhandled row change: {:?}", row_change);
},
}
}
});
}
#[allow(dead_code)]
pub(crate) async fn observe_field_change(database_id: &str, database: &Arc<MutexDatabase>) {
let database_id = database_id.to_string();
@ -148,3 +164,32 @@ pub(crate) async fn observe_block_event(database_id: &str, database: &Arc<MutexD
}
});
}
#[allow(dead_code)]
fn notify_row(
notification_sender: &Arc<DebounceNotificationSender>,
_database_id: &str,
field_id: String,
row_id: &RowId,
) {
let update_row = UpdatedRow::new(row_id).with_field_ids(vec![field_id]);
let update_changeset = RowsChangePB::from_update(update_row.into());
let subject = NotificationBuilder::new(
row_id,
DatabaseNotification::DidUpdateRow,
DATABASE_OBSERVABLE_SOURCE,
)
.payload(update_changeset)
.build();
notification_sender.send_subject(subject);
}
fn notify_cell(notification_sender: &Arc<DebounceNotificationSender>, cell_id: &str) {
let subject = NotificationBuilder::new(
cell_id,
DatabaseNotification::DidUpdateCell,
DATABASE_OBSERVABLE_SOURCE,
)
.build();
notification_sender.send_subject(subject);
}

View File

@ -86,7 +86,7 @@ impl DatabaseViewChangedReceiverRunner {
is_new: true,
};
let changes = RowsChangePB::from_insert(inserted_row);
send_notification(&result.view_id, DatabaseNotification::DidUpdateViewRows)
send_notification(&result.view_id, DatabaseNotification::DidUpdateRow)
.payload(changes)
.send();
},

View File

@ -173,7 +173,7 @@ impl DatabaseViewEditor {
pub async fn v_did_update_row_meta(&self, row_id: &RowId, row_detail: &RowDetail) {
let update_row = UpdatedRow::new(row_id.as_str()).with_row_meta(row_detail.clone());
let changeset = RowsChangePB::from_update(update_row.into());
send_notification(&self.view_id, DatabaseNotification::DidUpdateViewRows)
send_notification(&self.view_id, DatabaseNotification::DidUpdateRow)
.payload(changeset)
.send();
}
@ -223,7 +223,7 @@ impl DatabaseViewEditor {
}
let changes = RowsChangePB::from_delete(row.id.clone().into_inner());
send_notification(&self.view_id, DatabaseNotification::DidUpdateViewRows)
send_notification(&self.view_id, DatabaseNotification::DidUpdateRow)
.payload(changes)
.send();
@ -1031,7 +1031,7 @@ impl DatabaseViewEditor {
} => RowsChangePB::from_move(vec![deleted_row_id.into_inner()], vec![inserted_row.into()]),
};
send_notification(&self.view_id, DatabaseNotification::DidUpdateViewRows)
send_notification(&self.view_id, DatabaseNotification::DidUpdateRow)
.payload(changeset)
.send();
}