chore: update client api and collab (#5231)

This commit is contained in:
Nathan.fooo
2024-04-30 20:40:03 +08:00
committed by GitHub
parent 4981baac13
commit 9a8109f5f8
12 changed files with 285 additions and 189 deletions

View File

@ -3,8 +3,7 @@ use std::collections::HashMap;
use std::sync::{Arc, Weak};
use collab::core::collab::{DataSource, MutexCollab};
use collab_database::blocks::BlockEvent;
use collab_database::database::{DatabaseData, MutexDatabase};
use collab_database::database::DatabaseData;
use collab_database::error::DatabaseError;
use collab_database::views::{CreateDatabaseParams, CreateViewParams, DatabaseLayout};
use collab_database::workspace_database::{
@ -19,11 +18,9 @@ use collab_integrate::collab_builder::{AppFlowyCollabBuilder, CollabBuilderConfi
use collab_integrate::{CollabKVAction, CollabKVDB, CollabPersistenceConfig};
use flowy_database_pub::cloud::DatabaseCloudService;
use flowy_error::{internal_error, FlowyError, FlowyResult};
use lib_dispatch::prelude::af_spawn;
use lib_infra::priority_task::TaskDispatcher;
use crate::entities::{DatabaseLayoutPB, DatabaseSnapshotPB, DidFetchRowPB};
use crate::notification::{send_notification, DatabaseNotification};
use crate::entities::{DatabaseLayoutPB, DatabaseSnapshotPB};
use crate::services::database::DatabaseEditor;
use crate::services::database_view::DatabaseLayoutDepsResolver;
use crate::services::field_settings::default_field_settings_by_layout_map;
@ -219,9 +216,6 @@ impl DatabaseManager {
.await
.ok_or_else(|| FlowyError::collab_not_sync().with_context("open database error"))?;
// Subscribe the [BlockEvent]
subscribe_block_event(&database);
let editor = Arc::new(DatabaseEditor::new(database, self.task_scheduler.clone()).await?);
self
.editors
@ -418,27 +412,6 @@ impl DatabaseManager {
}
}
/// Send notification to all clients that are listening to the given object.
fn subscribe_block_event(database: &Arc<MutexDatabase>) {
let mut block_event_rx = database.lock().subscribe_block_event();
af_spawn(async move {
while let Ok(event) = block_event_rx.recv().await {
match event {
BlockEvent::DidFetchRow(row_details) => {
for row_detail in row_details {
trace!("Did fetch row: {:?}", row_detail.row.id);
let row_id = row_detail.row.id.clone();
let pb = DidFetchRowPB::from(row_detail);
send_notification(&row_id, DatabaseNotification::DidFetchRow)
.payload(pb)
.send();
}
},
}
}
});
}
struct UserDatabaseCollabServiceImpl {
user: Arc<dyn DatabaseUser>,
collab_builder: Arc<AppFlowyCollabBuilder>,

View File

@ -2,6 +2,7 @@ use crate::entities::*;
use crate::notification::{send_notification, DatabaseNotification};
use crate::services::calculations::Calculation;
use crate::services::cell::{apply_cell_changeset, get_cell_protobuf, CellCache};
use crate::services::database::database_observe::*;
use crate::services::database::util::database_view_setting_pb_from_view;
use crate::services::database::UpdatedRow;
use crate::services::database_view::{
@ -26,8 +27,6 @@ use collab_database::views::{
DatabaseLayout, DatabaseView, FilterMap, LayoutSetting, OrderObjectPosition,
};
use flowy_error::{internal_error, ErrorCode, FlowyError, FlowyResult};
use futures::StreamExt;
use lib_dispatch::prelude::af_spawn;
use lib_infra::box_any::BoxAny;
use lib_infra::future::{to_fut, Fut, FutureResult};
use lib_infra::priority_task::TaskDispatcher;
@ -53,38 +52,11 @@ impl DatabaseEditor {
let database_id = database.lock().get_database_id();
// Receive database sync state and send to frontend via the notification
let mut sync_state = database.lock().subscribe_sync_state();
let cloned_database_id = database_id.clone();
af_spawn(async move {
while let Some(sync_state) = sync_state.next().await {
send_notification(
&cloned_database_id,
DatabaseNotification::DidUpdateDatabaseSyncUpdate,
)
.payload(DatabaseSyncStatePB::from(sync_state))
.send();
}
});
// Receive database snapshot state and send to frontend via the notification
let mut snapshot_state = database.lock().subscribe_snapshot_state();
af_spawn(async move {
while let Some(snapshot_state) = snapshot_state.next().await {
if let Some(new_snapshot_id) = snapshot_state.snapshot_id() {
tracing::debug!(
"Did create {} database remote snapshot: {}",
database_id,
new_snapshot_id
);
send_notification(
&database_id,
DatabaseNotification::DidUpdateDatabaseSnapshotState,
)
.payload(DatabaseSnapshotStatePB { new_snapshot_id })
.send();
}
}
});
observe_sync_state(&database_id, &database).await;
// observe_view_change(&database_id, &database).await;
// observe_field_change(&database_id, &database).await;
// observe_rows_change(&database_id, &database).await;
// observe_block_event(&database_id, &database).await;
// Used to cache the view of the database for fast access.
let editor_by_view_id = Arc::new(RwLock::new(EditorByViewId::default()));

View File

@ -0,0 +1,150 @@
use crate::entities::{DatabaseSyncStatePB, DidFetchRowPB};
use crate::notification::{send_notification, DatabaseNotification};
use collab_database::blocks::BlockEvent;
use collab_database::database::MutexDatabase;
use collab_database::fields::FieldChange;
use collab_database::rows::RowChange;
use collab_database::views::DatabaseViewChange;
use futures::StreamExt;
use lib_dispatch::prelude::af_spawn;
use std::sync::Arc;
use tracing::trace;
pub(crate) async fn observe_sync_state(database_id: &str, database: &Arc<MutexDatabase>) {
let weak_database = Arc::downgrade(database);
let mut sync_state = database.lock().subscribe_sync_state();
let database_id = database_id.to_string();
af_spawn(async move {
while let Some(sync_state) = sync_state.next().await {
if weak_database.upgrade().is_none() {
break;
}
send_notification(
&database_id,
DatabaseNotification::DidUpdateDatabaseSyncUpdate,
)
.payload(DatabaseSyncStatePB::from(sync_state))
.send();
}
});
}
#[allow(dead_code)]
pub(crate) async fn observe_rows_change(database_id: &str, database: &Arc<MutexDatabase>) {
let database_id = database_id.to_string();
let weak_database = Arc::downgrade(database);
let mut row_change = database.lock().subscribe_row_change();
af_spawn(async move {
while let Ok(row_change) = row_change.recv().await {
if weak_database.upgrade().is_none() {
break;
}
trace!(
"[Database Observe]: {} row change:{:?}",
database_id,
row_change
);
match row_change {
RowChange::DidUpdateVisibility { .. } => {},
RowChange::DidUpdateHeight { .. } => {},
RowChange::DidUpdateCell { .. } => {},
RowChange::DidUpdateRowComment { .. } => {},
}
}
});
}
#[allow(dead_code)]
pub(crate) async fn observe_field_change(database_id: &str, database: &Arc<MutexDatabase>) {
let database_id = database_id.to_string();
let weak_database = Arc::downgrade(database);
let mut field_change = database.lock().subscribe_field_change();
af_spawn(async move {
while let Ok(field_change) = field_change.recv().await {
if weak_database.upgrade().is_none() {
break;
}
trace!(
"[Database Observe]: {} field change:{:?}",
database_id,
field_change
);
match field_change {
FieldChange::DidUpdateField { .. } => {},
FieldChange::DidCreateField { .. } => {},
FieldChange::DidDeleteField { .. } => {},
}
}
});
}
#[allow(dead_code)]
pub(crate) async fn observe_view_change(database_id: &str, database: &Arc<MutexDatabase>) {
let database_id = database_id.to_string();
let weak_database = Arc::downgrade(database);
let mut view_change = database.lock().subscribe_view_change();
af_spawn(async move {
while let Ok(view_change) = view_change.recv().await {
if weak_database.upgrade().is_none() {
break;
}
trace!(
"[Database Observe]: {} view change:{:?}",
database_id,
view_change
);
match view_change {
DatabaseViewChange::DidCreateView { .. } => {},
DatabaseViewChange::DidUpdateView { .. } => {},
DatabaseViewChange::DidDeleteView { .. } => {},
DatabaseViewChange::LayoutSettingChanged { .. } => {},
DatabaseViewChange::DidInsertRowOrders { .. } => {},
DatabaseViewChange::DidDeleteRowAtIndex { .. } => {},
DatabaseViewChange::DidCreateFilters { .. } => {},
DatabaseViewChange::DidUpdateFilter { .. } => {},
DatabaseViewChange::DidCreateGroupSettings { .. } => {},
DatabaseViewChange::DidUpdateGroupSetting { .. } => {},
DatabaseViewChange::DidCreateSorts { .. } => {},
DatabaseViewChange::DidUpdateSort { .. } => {},
DatabaseViewChange::DidCreateFieldOrder { .. } => {},
DatabaseViewChange::DidDeleteFieldOrder { .. } => {},
}
}
});
}
#[allow(dead_code)]
pub(crate) async fn observe_block_event(database_id: &str, database: &Arc<MutexDatabase>) {
let database_id = database_id.to_string();
let weak_database = Arc::downgrade(database);
let mut block_event_rx = database.lock().subscribe_block_event();
af_spawn(async move {
while let Ok(event) = block_event_rx.recv().await {
if weak_database.upgrade().is_none() {
break;
}
trace!(
"[Database Observe]: {} block event: {:?}",
database_id,
event
);
match event {
BlockEvent::DidFetchRow(row_details) => {
for row_detail in row_details {
trace!("Did fetch row: {:?}", row_detail.row.id);
let row_id = row_detail.row.id.clone();
let pb = DidFetchRowPB::from(row_detail);
send_notification(&row_id, DatabaseNotification::DidFetchRow)
.payload(pb)
.send();
}
},
}
}
});
}

View File

@ -1,4 +1,5 @@
mod database_editor;
mod database_observe;
mod entities;
mod util;