diff --git a/frontend/rust-lib/flowy-grid/src/event_handler.rs b/frontend/rust-lib/flowy-grid/src/event_handler.rs index 5555c841bf..b312069ead 100644 --- a/frontend/rust-lib/flowy-grid/src/event_handler.rs +++ b/frontend/rust-lib/flowy-grid/src/event_handler.rs @@ -42,7 +42,7 @@ pub(crate) async fn update_grid_setting_handler( ) -> Result<(), FlowyError> { let params: GridSettingChangesetParams = data.into_inner().try_into()?; - let editor = manager.get_grid_editor(¶ms.grid_id)?; + let editor = manager.get_grid_editor(¶ms.grid_id).await?; if let Some(insert_params) = params.insert_group { let _ = editor.insert_group(insert_params).await?; } @@ -67,7 +67,7 @@ pub(crate) async fn get_grid_blocks_handler( manager: AppData>, ) -> DataResult { let params: QueryGridBlocksParams = data.into_inner().try_into()?; - let editor = manager.get_grid_editor(¶ms.grid_id)?; + let editor = manager.get_grid_editor(¶ms.grid_id).await?; let repeated_grid_block = editor.get_blocks(Some(params.block_ids)).await?; data_result(repeated_grid_block) } @@ -78,7 +78,7 @@ pub(crate) async fn get_fields_handler( manager: AppData>, ) -> DataResult { let params: QueryFieldParams = data.into_inner().try_into()?; - let editor = manager.get_grid_editor(¶ms.grid_id)?; + let editor = manager.get_grid_editor(¶ms.grid_id).await?; let field_orders = params .field_ids .items @@ -96,7 +96,7 @@ pub(crate) async fn update_field_handler( manager: AppData>, ) -> Result<(), FlowyError> { let changeset: FieldChangesetParams = data.into_inner().try_into()?; - let editor = manager.get_grid_editor(&changeset.grid_id)?; + let editor = manager.get_grid_editor(&changeset.grid_id).await?; let _ = editor.update_field(changeset).await?; Ok(()) } @@ -107,7 +107,7 @@ pub(crate) async fn update_field_type_option_handler( manager: AppData>, ) -> Result<(), FlowyError> { let params: UpdateFieldTypeOptionParams = data.into_inner().try_into()?; - let editor = manager.get_grid_editor(¶ms.grid_id)?; + let editor = manager.get_grid_editor(¶ms.grid_id).await?; let _ = editor .update_field_type_option(¶ms.grid_id, ¶ms.field_id, params.type_option_data) .await?; @@ -120,7 +120,7 @@ pub(crate) async fn delete_field_handler( manager: AppData>, ) -> Result<(), FlowyError> { let params: FieldIdParams = data.into_inner().try_into()?; - let editor = manager.get_grid_editor(¶ms.grid_id)?; + let editor = manager.get_grid_editor(¶ms.grid_id).await?; let _ = editor.delete_field(¶ms.field_id).await?; Ok(()) } @@ -131,7 +131,7 @@ pub(crate) async fn switch_to_field_handler( manager: AppData>, ) -> Result<(), FlowyError> { let params: EditFieldParams = data.into_inner().try_into()?; - let editor = manager.get_grid_editor(¶ms.grid_id)?; + let editor = manager.get_grid_editor(¶ms.grid_id).await?; editor .switch_to_field_type(¶ms.field_id, ¶ms.field_type) .await?; @@ -157,7 +157,7 @@ pub(crate) async fn duplicate_field_handler( manager: AppData>, ) -> Result<(), FlowyError> { let params: FieldIdParams = data.into_inner().try_into()?; - let editor = manager.get_grid_editor(¶ms.grid_id)?; + let editor = manager.get_grid_editor(¶ms.grid_id).await?; let _ = editor.duplicate_field(¶ms.field_id).await?; Ok(()) } @@ -169,7 +169,7 @@ pub(crate) async fn get_field_type_option_data_handler( manager: AppData>, ) -> DataResult { let params: FieldTypeOptionIdParams = data.into_inner().try_into()?; - let editor = manager.get_grid_editor(¶ms.grid_id)?; + let editor = manager.get_grid_editor(¶ms.grid_id).await?; match editor.get_field_rev(¶ms.field_id).await { None => Err(FlowyError::record_not_found()), Some(field_rev) => { @@ -192,7 +192,7 @@ pub(crate) async fn create_field_type_option_data_handler( manager: AppData>, ) -> DataResult { let params: CreateFieldParams = data.into_inner().try_into()?; - let editor = manager.get_grid_editor(¶ms.grid_id)?; + let editor = manager.get_grid_editor(¶ms.grid_id).await?; let field_rev = editor .create_new_field_rev(¶ms.field_type, params.type_option_data) .await?; @@ -212,7 +212,7 @@ pub(crate) async fn move_field_handler( manager: AppData>, ) -> Result<(), FlowyError> { let params: MoveFieldParams = data.into_inner().try_into()?; - let editor = manager.get_grid_editor(¶ms.grid_id)?; + let editor = manager.get_grid_editor(¶ms.grid_id).await?; let _ = editor.move_field(params).await?; Ok(()) } @@ -237,7 +237,7 @@ pub(crate) async fn get_row_handler( manager: AppData>, ) -> DataResult { let params: RowIdParams = data.into_inner().try_into()?; - let editor = manager.get_grid_editor(¶ms.grid_id)?; + let editor = manager.get_grid_editor(¶ms.grid_id).await?; let row = editor.get_row_rev(¶ms.row_id).await?.map(make_row_from_row_rev); data_result(OptionalRowPB { row }) @@ -249,7 +249,7 @@ pub(crate) async fn delete_row_handler( manager: AppData>, ) -> Result<(), FlowyError> { let params: RowIdParams = data.into_inner().try_into()?; - let editor = manager.get_grid_editor(¶ms.grid_id)?; + let editor = manager.get_grid_editor(¶ms.grid_id).await?; let _ = editor.delete_row(¶ms.row_id).await?; Ok(()) } @@ -260,7 +260,7 @@ pub(crate) async fn duplicate_row_handler( manager: AppData>, ) -> Result<(), FlowyError> { let params: RowIdParams = data.into_inner().try_into()?; - let editor = manager.get_grid_editor(¶ms.grid_id)?; + let editor = manager.get_grid_editor(¶ms.grid_id).await?; let _ = editor.duplicate_row(¶ms.row_id).await?; Ok(()) } @@ -271,7 +271,7 @@ pub(crate) async fn move_row_handler( manager: AppData>, ) -> Result<(), FlowyError> { let params: MoveRowParams = data.into_inner().try_into()?; - let editor = manager.get_grid_editor(¶ms.view_id)?; + let editor = manager.get_grid_editor(¶ms.view_id).await?; let _ = editor.move_row(params).await?; Ok(()) } @@ -282,7 +282,7 @@ pub(crate) async fn create_table_row_handler( manager: AppData>, ) -> DataResult { let params: CreateRowParams = data.into_inner().try_into()?; - let editor = manager.get_grid_editor(params.grid_id.as_ref())?; + let editor = manager.get_grid_editor(params.grid_id.as_ref()).await?; let row = editor.create_row(params).await?; data_result(row) } @@ -293,7 +293,7 @@ pub(crate) async fn get_cell_handler( manager: AppData>, ) -> DataResult { let params: GridCellIdParams = data.into_inner().try_into()?; - let editor = manager.get_grid_editor(¶ms.grid_id)?; + let editor = manager.get_grid_editor(¶ms.grid_id).await?; match editor.get_cell(¶ms).await { None => data_result(GridCellPB::empty(¶ms.field_id)), Some(cell) => data_result(cell), @@ -306,7 +306,7 @@ pub(crate) async fn update_cell_handler( manager: AppData>, ) -> Result<(), FlowyError> { let changeset: CellChangesetPB = data.into_inner(); - let editor = manager.get_grid_editor(&changeset.grid_id)?; + let editor = manager.get_grid_editor(&changeset.grid_id).await?; let _ = editor.update_cell(changeset).await?; Ok(()) } @@ -317,7 +317,7 @@ pub(crate) async fn new_select_option_handler( manager: AppData>, ) -> DataResult { let params: CreateSelectOptionParams = data.into_inner().try_into()?; - let editor = manager.get_grid_editor(¶ms.grid_id)?; + let editor = manager.get_grid_editor(¶ms.grid_id).await?; match editor.get_field_rev(¶ms.field_id).await { None => Err(ErrorCode::InvalidData.into()), Some(field_rev) => { @@ -334,7 +334,7 @@ pub(crate) async fn update_select_option_handler( manager: AppData>, ) -> Result<(), FlowyError> { let changeset: SelectOptionChangeset = data.into_inner().try_into()?; - let editor = manager.get_grid_editor(&changeset.cell_identifier.grid_id)?; + let editor = manager.get_grid_editor(&changeset.cell_identifier.grid_id).await?; let _ = editor .modify_field_rev(&changeset.cell_identifier.field_id, |field_rev| { @@ -391,7 +391,7 @@ pub(crate) async fn get_select_option_handler( manager: AppData>, ) -> DataResult { let params: GridCellIdParams = data.into_inner().try_into()?; - let editor = manager.get_grid_editor(¶ms.grid_id)?; + let editor = manager.get_grid_editor(¶ms.grid_id).await?; match editor.get_field_rev(¶ms.field_id).await { None => { tracing::error!("Can't find the select option field with id: {}", params.field_id); @@ -420,7 +420,7 @@ pub(crate) async fn update_select_option_cell_handler( manager: AppData>, ) -> Result<(), FlowyError> { let params: SelectOptionCellChangesetParams = data.into_inner().try_into()?; - let editor = manager.get_grid_editor(¶ms.cell_identifier.grid_id)?; + let editor = manager.get_grid_editor(¶ms.cell_identifier.grid_id).await?; let _ = editor.update_cell(params.into()).await?; Ok(()) } @@ -431,7 +431,7 @@ pub(crate) async fn update_date_cell_handler( manager: AppData>, ) -> Result<(), FlowyError> { let params: DateChangesetParams = data.into_inner().try_into()?; - let editor = manager.get_grid_editor(¶ms.cell_identifier.grid_id)?; + let editor = manager.get_grid_editor(¶ms.cell_identifier.grid_id).await?; let _ = editor.update_cell(params.into()).await?; Ok(()) } @@ -442,7 +442,7 @@ pub(crate) async fn get_groups_handler( manager: AppData>, ) -> DataResult { let params: GridIdPB = data.into_inner(); - let editor = manager.get_grid_editor(¶ms.value)?; + let editor = manager.get_grid_editor(¶ms.value).await?; let group = editor.load_groups().await?; data_result(group) } @@ -453,7 +453,7 @@ pub(crate) async fn create_board_card_handler( manager: AppData>, ) -> DataResult { let params: CreateRowParams = data.into_inner().try_into()?; - let editor = manager.get_grid_editor(params.grid_id.as_ref())?; + let editor = manager.get_grid_editor(params.grid_id.as_ref()).await?; let row = editor.create_row(params).await?; data_result(row) } @@ -464,7 +464,7 @@ pub(crate) async fn move_group_handler( manager: AppData>, ) -> FlowyResult<()> { let params: MoveGroupParams = data.into_inner().try_into()?; - let editor = manager.get_grid_editor(params.view_id.as_ref())?; + let editor = manager.get_grid_editor(params.view_id.as_ref()).await?; let _ = editor.move_group(params).await?; Ok(()) } @@ -475,7 +475,7 @@ pub(crate) async fn move_group_row_handler( manager: AppData>, ) -> FlowyResult<()> { let params: MoveGroupRowParams = data.into_inner().try_into()?; - let editor = manager.get_grid_editor(params.view_id.as_ref())?; + let editor = manager.get_grid_editor(params.view_id.as_ref()).await?; let _ = editor.move_group_row(params).await?; Ok(()) } diff --git a/frontend/rust-lib/flowy-grid/src/manager.rs b/frontend/rust-lib/flowy-grid/src/manager.rs index d44971c690..68bb61aea9 100644 --- a/frontend/rust-lib/flowy-grid/src/manager.rs +++ b/frontend/rust-lib/flowy-grid/src/manager.rs @@ -19,6 +19,8 @@ use flowy_revision::{ }; use flowy_sync::client_grid::{make_grid_block_operations, make_grid_operations, make_grid_view_operations}; use flowy_sync::entities::revision::Revision; +use lib_infra::ref_map::{RefCountHashMap, RefCountValue}; +use std::collections::HashMap; use std::sync::Arc; use tokio::sync::RwLock; @@ -31,7 +33,7 @@ pub trait GridUser: Send + Sync { pub type GridTaskSchedulerRwLock = Arc>; pub struct GridManager { - grid_editors: Arc>>, + grid_editors: RwLock>>, grid_user: Arc, block_index_cache: Arc, #[allow(dead_code)] @@ -46,7 +48,7 @@ impl GridManager { _rev_web_socket: Arc, database: Arc, ) -> Self { - let grid_editors = Arc::new(DashMap::new()); + let grid_editors = RwLock::new(RefCountHashMap::new()); let kv_persistence = Arc::new(GridKVPersistence::new(database.clone())); let block_index_cache = Arc::new(BlockIndexCache::new(database.clone())); let task_scheduler = GridTaskScheduler::new(); @@ -107,35 +109,33 @@ impl GridManager { pub async fn close_grid>(&self, grid_id: T) -> FlowyResult<()> { let grid_id = grid_id.as_ref(); tracing::Span::current().record("grid_id", &grid_id); - self.grid_editors.remove(grid_id); + + self.grid_editors.write().await.remove(grid_id); self.task_scheduler.write().await.unregister_handler(grid_id); Ok(()) } // #[tracing::instrument(level = "debug", skip(self), err)] - pub fn get_grid_editor(&self, grid_id: &str) -> FlowyResult> { - match self.grid_editors.get(grid_id) { + pub async fn get_grid_editor(&self, grid_id: &str) -> FlowyResult> { + match self.grid_editors.read().await.get(grid_id) { None => Err(FlowyError::internal().context("Should call open_grid function first")), Some(editor) => Ok(editor.clone()), } } async fn get_or_create_grid_editor(&self, grid_id: &str) -> FlowyResult> { - match self.grid_editors.get(grid_id) { - None => { - if let Some(editor) = self.grid_editors.get(grid_id) { - tracing::warn!("Grid:{} already open", grid_id); - Ok(editor.clone()) - } else { - let db_pool = self.grid_user.db_pool()?; - let editor = self.make_grid_rev_editor(grid_id, db_pool).await?; - self.grid_editors.insert(grid_id.to_string(), editor.clone()); - self.task_scheduler.write().await.register_handler(editor.clone()); - Ok(editor) - } - } - Some(editor) => Ok(editor.clone()), + if let Some(editor) = self.grid_editors.read().await.get(grid_id) { + return Ok(editor.clone()); } + + let db_pool = self.grid_user.db_pool()?; + let editor = self.make_grid_rev_editor(grid_id, db_pool).await?; + self.grid_editors + .write() + .await + .insert(grid_id.to_string(), editor.clone()); + self.task_scheduler.write().await.register_handler(editor.clone()); + Ok(editor) } #[tracing::instrument(level = "trace", skip(self, pool), err)] @@ -240,3 +240,9 @@ pub async fn make_grid_view_data( Ok(grid_rev_delta_bytes) } + +impl RefCountValue for GridRevisionEditor { + fn did_remove(&self) { + self.close(); + } +} diff --git a/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs b/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs index 8598652abc..3a9e7ac16c 100644 --- a/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs +++ b/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs @@ -94,6 +94,8 @@ impl GridRevisionEditor { Ok(editor) } + pub fn close(&self) {} + /// Save the type-option data to disk and send a `GridNotification::DidUpdateField` notification /// to dart side. /// diff --git a/frontend/rust-lib/flowy-grid/src/services/tasks/scheduler.rs b/frontend/rust-lib/flowy-grid/src/services/tasks/scheduler.rs index 73ba298d9b..a4ebe36d2e 100644 --- a/frontend/rust-lib/flowy-grid/src/services/tasks/scheduler.rs +++ b/frontend/rust-lib/flowy-grid/src/services/tasks/scheduler.rs @@ -6,12 +6,13 @@ use crate::services::tasks::task::Task; use crate::services::tasks::{TaskContent, TaskId, TaskStatus}; use flowy_error::FlowyError; use lib_infra::future::BoxResultFuture; +use lib_infra::ref_map::{RefCountHashMap, RefCountValue}; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use tokio::sync::{watch, RwLock}; -pub(crate) trait GridTaskHandler: Send + Sync + 'static { +pub(crate) trait GridTaskHandler: Send + Sync + 'static + RefCountValue { fn handler_id(&self) -> &str; fn process_content(&self, content: TaskContent) -> BoxResultFuture<(), FlowyError>; @@ -21,7 +22,7 @@ pub struct GridTaskScheduler { queue: GridTaskQueue, store: GridTaskStore, notifier: watch::Sender, - handlers: HashMap>, + handlers: RefCountHashMap>, } impl GridTaskScheduler { @@ -32,7 +33,7 @@ impl GridTaskScheduler { queue: GridTaskQueue::new(), store: GridTaskStore::new(), notifier, - handlers: HashMap::new(), + handlers: RefCountHashMap::new(), }; // The runner will receive the newest value after start running. scheduler.notify(); @@ -54,7 +55,7 @@ impl GridTaskScheduler { } pub(crate) fn unregister_handler>(&mut self, handler_id: T) { - let _ = self.handlers.remove(handler_id.as_ref()); + self.handlers.remove(handler_id.as_ref()); } #[allow(dead_code)] @@ -110,6 +111,7 @@ mod tests { use crate::services::tasks::{GridTaskHandler, GridTaskScheduler, Task, TaskContent, TaskStatus}; use flowy_error::FlowyError; use lib_infra::future::BoxResultFuture; + use lib_infra::ref_map::RefCountValue; use std::sync::Arc; use std::time::Duration; use tokio::time::interval; @@ -169,6 +171,11 @@ mod tests { assert_eq!(rx_2.await.unwrap().status, TaskStatus::Done); } struct MockGridTaskHandler(); + + impl RefCountValue for MockGridTaskHandler { + fn did_remove(&self) {} + } + impl GridTaskHandler for MockGridTaskHandler { fn handler_id(&self) -> &str { "1" diff --git a/shared-lib/lib-infra/src/lib.rs b/shared-lib/lib-infra/src/lib.rs index f304749731..9168f97f09 100644 --- a/shared-lib/lib-infra/src/lib.rs +++ b/shared-lib/lib-infra/src/lib.rs @@ -1,4 +1,5 @@ pub mod code_gen; pub mod future; +pub mod ref_map; pub mod retry; pub mod util; diff --git a/shared-lib/lib-infra/src/ref_map.rs b/shared-lib/lib-infra/src/ref_map.rs new file mode 100644 index 0000000000..1b9e3baad7 --- /dev/null +++ b/shared-lib/lib-infra/src/ref_map.rs @@ -0,0 +1,70 @@ +use std::collections::HashMap; +use std::sync::Arc; + +pub trait RefCountValue { + fn did_remove(&self); +} + +struct RefCountHandler { + ref_count: usize, + inner: T, +} + +impl RefCountHandler { + pub fn new(inner: T) -> Self { + Self { ref_count: 1, inner } + } + + pub fn increase_ref_count(&mut self) { + self.ref_count += 1; + } +} + +pub struct RefCountHashMap(HashMap>); + +impl RefCountHashMap +where + T: Clone + Send + Sync + RefCountValue, +{ + pub fn new() -> Self { + Self(Default::default()) + } + + pub fn get(&self, key: &str) -> Option { + self.0.get(key).and_then(|handler| Some(handler.inner.clone())) + } + + pub fn insert(&mut self, key: String, value: T) { + if let Some(handler) = self.0.get_mut(&key) { + handler.increase_ref_count(); + } else { + let handler = RefCountHandler::new(value); + self.0.insert(key, handler); + } + } + + pub fn remove(&mut self, key: &str) { + let mut should_remove = false; + if let Some(value) = self.0.get_mut(key) { + if value.ref_count > 0 { + value.ref_count -= 1; + } + should_remove = value.ref_count == 0; + } + + if should_remove { + if let Some(handler) = self.0.remove(key) { + handler.inner.did_remove(); + } + } + } +} + +impl RefCountValue for Arc +where + T: RefCountValue, +{ + fn did_remove(&self) { + (**self).did_remove() + } +}