diff --git a/frontend/rust-lib/Cargo.lock b/frontend/rust-lib/Cargo.lock index 68bc48d2d7..aa7cd8aef7 100644 --- a/frontend/rust-lib/Cargo.lock +++ b/frontend/rust-lib/Cargo.lock @@ -93,6 +93,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "atomic_refcell" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73b5e5f48b927f04e952dedc932f31995a65a0bf65ec971c74436e51bf6e970d" + [[package]] name = "atty" version = "0.2.14" @@ -925,6 +931,7 @@ dependencies = [ name = "flowy-grid" version = "0.1.0" dependencies = [ + "atomic_refcell", "bytes", "chrono", "dart-notify", diff --git a/frontend/rust-lib/flowy-grid/Cargo.toml b/frontend/rust-lib/flowy-grid/Cargo.toml index 1cf38dbb3f..ba3702038e 100644 --- a/frontend/rust-lib/flowy-grid/Cargo.toml +++ b/frontend/rust-lib/flowy-grid/Cargo.toml @@ -39,6 +39,7 @@ fancy-regex = "0.10.0" regex = "1.5.6" url = { version = "2"} futures = "0.3.15" +atomic_refcell = "0.1.8" [dev-dependencies] flowy-test = { path = "../flowy-test" } diff --git a/frontend/rust-lib/flowy-grid/src/manager.rs b/frontend/rust-lib/flowy-grid/src/manager.rs index 57548e09e4..302264d8bd 100644 --- a/frontend/rust-lib/flowy-grid/src/manager.rs +++ b/frontend/rust-lib/flowy-grid/src/manager.rs @@ -2,16 +2,18 @@ use crate::services::grid_editor::GridRevisionEditor; use crate::services::persistence::block_index::BlockIndexCache; use crate::services::persistence::kv::GridKVPersistence; use crate::services::persistence::GridDatabase; +use crate::services::tasks::GridTaskScheduler; use bytes::Bytes; use dashmap::DashMap; use flowy_database::ConnectionPool; use flowy_error::{FlowyError, FlowyResult}; -use flowy_grid_data_model::revision::{BuildGridContext, GridRevision, GridSettingRevision}; +use flowy_grid_data_model::revision::{BuildGridContext, GridRevision}; use flowy_revision::disk::{SQLiteGridBlockMetaRevisionPersistence, SQLiteGridRevisionPersistence}; use flowy_revision::{RevisionManager, RevisionPersistence, RevisionWebSocket}; use flowy_sync::client_grid::{make_block_meta_delta, make_grid_delta}; use flowy_sync::entities::revision::{RepeatedRevision, Revision}; use std::sync::Arc; +use tokio::sync::RwLock; pub trait GridUser: Send + Sync { fn user_id(&self) -> Result; @@ -20,11 +22,12 @@ pub trait GridUser: Send + Sync { } pub struct GridManager { - editor_map: Arc>>, + grid_editors: Arc>>, grid_user: Arc, block_index_cache: Arc, #[allow(dead_code)] kv_persistence: Arc, + task_scheduler: Arc>, } impl GridManager { @@ -35,12 +38,14 @@ impl GridManager { ) -> Self { let grid_editors = Arc::new(DashMap::new()); let kv_persistence = Arc::new(GridKVPersistence::new(database.clone())); - let block_index_persistence = Arc::new(BlockIndexCache::new(database)); + let block_index_cache = Arc::new(BlockIndexCache::new(database)); + let task_scheduler = GridTaskScheduler::new(); Self { - editor_map: grid_editors, + grid_editors, grid_user, kv_persistence, - block_index_cache: block_index_persistence, + block_index_cache, + task_scheduler, } } @@ -77,7 +82,7 @@ impl GridManager { pub fn close_grid>(&self, grid_id: T) -> FlowyResult<()> { let grid_id = grid_id.as_ref(); tracing::Span::current().record("grid_id", &grid_id); - self.editor_map.remove(grid_id); + self.grid_editors.remove(grid_id); Ok(()) } @@ -85,7 +90,7 @@ impl GridManager { pub fn delete_grid>(&self, grid_id: T) -> FlowyResult<()> { let grid_id = grid_id.as_ref(); tracing::Span::current().record("grid_id", &grid_id); - self.editor_map.remove(grid_id); + self.grid_editors.remove(grid_id); Ok(()) } @@ -93,23 +98,23 @@ impl GridManager { // #[tracing::instrument(level = "debug", skip(self), err)] pub fn get_grid_editor(&self, grid_id: &str) -> FlowyResult> { - match self.editor_map.get(grid_id) { + match self.grid_editors.get(grid_id) { None => Err(FlowyError::internal().context("Should call open_grid function first")), Some(editor) => Ok(editor.clone()), } } async fn get_or_create_grid_editor(&self, grid_id: &str) -> FlowyResult> { - match self.editor_map.get(grid_id) { + match self.grid_editors.get(grid_id) { None => { tracing::trace!("Create grid editor with id: {}", grid_id); let db_pool = self.grid_user.db_pool()?; let editor = self.make_grid_editor(grid_id, db_pool).await?; - if self.editor_map.contains_key(grid_id) { + if self.grid_editors.contains_key(grid_id) { tracing::warn!("Grid:{} already exists in cache", grid_id); } - self.editor_map.insert(grid_id.to_string(), editor.clone()); + self.grid_editors.insert(grid_id.to_string(), editor.clone()); Ok(editor) } Some(editor) => Ok(editor.clone()), @@ -124,7 +129,14 @@ impl GridManager { ) -> Result, FlowyError> { let user = self.grid_user.clone(); let rev_manager = self.make_grid_rev_manager(grid_id, pool.clone())?; - let grid_editor = GridRevisionEditor::new(grid_id, user, rev_manager, self.block_index_cache.clone()).await?; + let grid_editor = GridRevisionEditor::new( + grid_id, + user, + rev_manager, + self.block_index_cache.clone(), + self.task_scheduler.clone(), + ) + .await?; Ok(grid_editor) } @@ -164,10 +176,10 @@ pub async fn make_grid_view_data( }); // Create grid's block - let grid_block_meta_delta = make_block_meta_delta(&block_meta_data); + let grid_block_meta_delta = make_block_meta_delta(block_meta_data); let block_meta_delta_data = grid_block_meta_delta.to_delta_bytes(); let repeated_revision: RepeatedRevision = - Revision::initial_revision(user_id, &block_id, block_meta_delta_data).into(); + Revision::initial_revision(user_id, block_id, block_meta_delta_data).into(); let _ = grid_manager .create_grid_block_meta(&block_id, repeated_revision) .await?; diff --git a/frontend/rust-lib/flowy-grid/src/services/block_manager.rs b/frontend/rust-lib/flowy-grid/src/services/block_manager.rs index c0d4019f57..a78f6fa37b 100644 --- a/frontend/rust-lib/flowy-grid/src/services/block_manager.rs +++ b/frontend/rust-lib/flowy-grid/src/services/block_manager.rs @@ -30,10 +30,10 @@ impl GridBlockManager { pub(crate) async fn new( grid_id: &str, user: &Arc, - block_revs: Vec>, + block_meta_revs: Vec>, persistence: Arc, ) -> FlowyResult { - let editor_map = make_block_meta_editor_map(user, block_revs).await?; + let editor_map = make_block_meta_editor_map(user, block_meta_revs).await?; let user = user.clone(); let grid_id = grid_id.to_owned(); let manager = Self { @@ -265,12 +265,12 @@ impl GridBlockManager { async fn make_block_meta_editor_map( user: &Arc, - block_revs: Vec>, + block_meta_revs: Vec>, ) -> FlowyResult>> { let editor_map = DashMap::new(); - for block_rev in block_revs { - let editor = make_block_meta_editor(user, &block_rev.block_id).await?; - editor_map.insert(block_rev.block_id.clone(), Arc::new(editor)); + for block_meta_rev in block_meta_revs { + let editor = make_block_meta_editor(user, &block_meta_rev.block_id).await?; + editor_map.insert(block_meta_rev.block_id.clone(), Arc::new(editor)); } Ok(editor_map) diff --git a/frontend/rust-lib/flowy-grid/src/services/filter/filter_runner.rs b/frontend/rust-lib/flowy-grid/src/services/filter/filter_runner.rs deleted file mode 100644 index 195dd8c5f8..0000000000 --- a/frontend/rust-lib/flowy-grid/src/services/filter/filter_runner.rs +++ /dev/null @@ -1 +0,0 @@ -pub(crate) struct FilterRunner {} diff --git a/frontend/rust-lib/flowy-grid/src/services/filter/filter_service.rs b/frontend/rust-lib/flowy-grid/src/services/filter/filter_service.rs new file mode 100644 index 0000000000..9b6ca32bf1 --- /dev/null +++ b/frontend/rust-lib/flowy-grid/src/services/filter/filter_service.rs @@ -0,0 +1,6 @@ +pub struct GridFilterService {} +impl GridFilterService { + pub fn new() -> Self { + Self {} + } +} diff --git a/frontend/rust-lib/flowy-grid/src/services/filter/mod.rs b/frontend/rust-lib/flowy-grid/src/services/filter/mod.rs index 04be28a6ef..98bdf94ec1 100644 --- a/frontend/rust-lib/flowy-grid/src/services/filter/mod.rs +++ b/frontend/rust-lib/flowy-grid/src/services/filter/mod.rs @@ -1,3 +1,3 @@ -mod filter_runner; +mod filter_service; -pub use filter_runner::*; +pub use filter_service::*; diff --git a/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs b/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs index 9bfcae6580..19a4304875 100644 --- a/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs +++ b/frontend/rust-lib/flowy-grid/src/services/grid_editor.rs @@ -3,8 +3,10 @@ use crate::entities::CellIdentifier; use crate::manager::GridUser; use crate::services::block_manager::GridBlockManager; use crate::services::field::{default_type_option_builder_from_type, type_option_builder_from_bytes, FieldBuilder}; +use crate::services::filter::GridFilterService; use crate::services::persistence::block_index::BlockIndexCache; use crate::services::row::*; +use crate::services::tasks::GridTaskScheduler; use bytes::Bytes; use flowy_error::{ErrorCode, FlowyError, FlowyResult}; use flowy_grid_data_model::entities::*; @@ -21,16 +23,18 @@ use std::sync::Arc; use tokio::sync::RwLock; pub struct GridRevisionEditor { - grid_id: String, + pub(crate) grid_id: String, user: Arc, grid_pad: Arc>, rev_manager: Arc, block_manager: Arc, + task_scheduler: Arc>, + pub(crate) filter_service: Arc, } impl Drop for GridRevisionEditor { fn drop(&mut self) { - tracing::trace!("Drop GridMetaEditor"); + tracing::trace!("Drop GridRevisionEditor"); } } @@ -40,22 +44,29 @@ impl GridRevisionEditor { user: Arc, mut rev_manager: RevisionManager, persistence: Arc, + task_scheduler: Arc>, ) -> FlowyResult> { let token = user.token()?; let cloud = Arc::new(GridRevisionCloudService { token }); let grid_pad = rev_manager.load::(Some(cloud)).await?; let rev_manager = Arc::new(rev_manager); let grid_pad = Arc::new(RwLock::new(grid_pad)); - let block_revs = grid_pad.read().await.get_block_meta_revs(); - - let block_meta_manager = Arc::new(GridBlockManager::new(grid_id, &user, block_revs, persistence).await?); - Ok(Arc::new(Self { + let block_meta_revs = grid_pad.read().await.get_block_meta_revs(); + let block_manager = Arc::new(GridBlockManager::new(grid_id, &user, block_meta_revs, persistence).await?); + let filter_service = Arc::new(GridFilterService::new()); + let editor = Arc::new(Self { grid_id: grid_id.to_owned(), user, grid_pad, rev_manager, - block_manager: block_meta_manager, - })) + block_manager, + filter_service, + task_scheduler: task_scheduler.clone(), + }); + + task_scheduler.write().await.register_handler(editor.clone()); + + Ok(editor) } pub async fn insert_field(&self, params: InsertFieldParams) -> FlowyResult<()> { diff --git a/frontend/rust-lib/flowy-grid/src/services/grid_editor_task.rs b/frontend/rust-lib/flowy-grid/src/services/grid_editor_task.rs new file mode 100644 index 0000000000..9d6c47deda --- /dev/null +++ b/frontend/rust-lib/flowy-grid/src/services/grid_editor_task.rs @@ -0,0 +1,21 @@ +use crate::services::grid_editor::GridRevisionEditor; +use crate::services::tasks::{GridTaskHandler, Task, TaskContent}; +use flowy_error::FlowyError; +use lib_infra::future::BoxResultFuture; +use std::sync::Arc; + +impl GridTaskHandler for Arc { + fn handler_id(&self) -> &str { + &self.grid_id + } + + fn process_task(&self, task: Task) -> BoxResultFuture<(), FlowyError> { + Box::pin(async move { + match task.content { + TaskContent::Snapshot { .. } => {} + TaskContent::Filter => {} + } + Ok(()) + }) + } +} diff --git a/frontend/rust-lib/flowy-grid/src/services/mod.rs b/frontend/rust-lib/flowy-grid/src/services/mod.rs index 9fd0067416..e051544839 100644 --- a/frontend/rust-lib/flowy-grid/src/services/mod.rs +++ b/frontend/rust-lib/flowy-grid/src/services/mod.rs @@ -5,7 +5,9 @@ pub mod block_revision_editor; pub mod field; mod filter; pub mod grid_editor; +mod grid_editor_task; pub mod persistence; pub mod row; pub mod setting; -// mod tasks; +mod snapshot; +pub mod tasks; diff --git a/frontend/rust-lib/flowy-grid/src/services/row/cell_data_operation.rs b/frontend/rust-lib/flowy-grid/src/services/row/cell_data_operation.rs index 9a290254ef..aaa7300046 100644 --- a/frontend/rust-lib/flowy-grid/src/services/row/cell_data_operation.rs +++ b/frontend/rust-lib/flowy-grid/src/services/row/cell_data_operation.rs @@ -137,7 +137,7 @@ pub fn decode_cell_data>(data: T, field_rev: &Fie if let Ok(type_option_cell_data) = data.try_into() { let TypeOptionCellData { data, field_type } = type_option_cell_data; let to_field_type = &field_rev.field_type; - match try_decode_cell_data(data, &field_rev, &field_type, to_field_type) { + match try_decode_cell_data(data, field_rev, &field_type, to_field_type) { Ok(cell_data) => cell_data, Err(e) => { tracing::error!("Decode cell data failed, {:?}", e); diff --git a/frontend/rust-lib/flowy-grid/src/services/snapshot/mod.rs b/frontend/rust-lib/flowy-grid/src/services/snapshot/mod.rs new file mode 100644 index 0000000000..9c00ccc85f --- /dev/null +++ b/frontend/rust-lib/flowy-grid/src/services/snapshot/mod.rs @@ -0,0 +1,3 @@ +mod snapshot_service; + +pub use snapshot_service::*; diff --git a/frontend/rust-lib/flowy-grid/src/services/snapshot/snapshot_service.rs b/frontend/rust-lib/flowy-grid/src/services/snapshot/snapshot_service.rs new file mode 100644 index 0000000000..9f847e4d2a --- /dev/null +++ b/frontend/rust-lib/flowy-grid/src/services/snapshot/snapshot_service.rs @@ -0,0 +1,7 @@ +pub struct GridSnapshotService {} + +impl GridSnapshotService { + pub fn new() -> Self { + Self {} + } +} diff --git a/frontend/rust-lib/flowy-grid/src/services/tasks/filter/mod.rs b/frontend/rust-lib/flowy-grid/src/services/tasks/filter/mod.rs deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/frontend/rust-lib/flowy-grid/src/services/tasks/mod.rs b/frontend/rust-lib/flowy-grid/src/services/tasks/mod.rs index 5dade00d39..7438cf47f9 100644 --- a/frontend/rust-lib/flowy-grid/src/services/tasks/mod.rs +++ b/frontend/rust-lib/flowy-grid/src/services/tasks/mod.rs @@ -1,3 +1,8 @@ -mod filter; +mod queue; mod runner; mod scheduler; +mod store; +mod task; + +pub use scheduler::*; +pub use task::*; diff --git a/frontend/rust-lib/flowy-grid/src/services/tasks/queue.rs b/frontend/rust-lib/flowy-grid/src/services/tasks/queue.rs new file mode 100644 index 0000000000..6e3f37399d --- /dev/null +++ b/frontend/rust-lib/flowy-grid/src/services/tasks/queue.rs @@ -0,0 +1,118 @@ +use crate::services::tasks::task::{PendingTask, Task, TaskContent, TaskType}; +use atomic_refcell::AtomicRefCell; + +use std::cmp::Ordering; +use std::collections::hash_map::Entry; +use std::collections::{BinaryHeap, HashMap}; +use std::ops::{Deref, DerefMut}; +use std::sync::Arc; + +#[derive(Default)] +pub(crate) struct GridTaskQueue { + // index_tasks for quick access + index_tasks: HashMap>>, + queue: BinaryHeap>>, +} + +impl GridTaskQueue { + pub(crate) fn new() -> Self { + Self::default() + } + + pub(crate) fn push(&mut self, task: &Task) { + let task_type = match task.content { + TaskContent::Snapshot { .. } => TaskType::Snapshot, + TaskContent::Filter => TaskType::Filter, + }; + let pending_task = PendingTask { + ty: task_type, + id: task.id.clone(), + }; + match self.index_tasks.entry("1".to_owned()) { + Entry::Occupied(entry) => { + let mut list = entry.get().borrow_mut(); + assert!(list.peek().map(|old_id| pending_task.id >= old_id.id).unwrap_or(true)); + list.push(pending_task); + } + Entry::Vacant(entry) => { + let mut task_list = TaskList::new(entry.key()); + task_list.push(pending_task); + let task_list = Arc::new(AtomicRefCell::new(task_list)); + entry.insert(task_list.clone()); + self.queue.push(task_list); + } + } + } + + pub(crate) fn mut_head(&mut self, mut f: F) -> Option + where + F: FnMut(&mut TaskList) -> T, + { + let head = self.queue.pop()?; + let result = { + let mut ref_head = head.borrow_mut(); + f(&mut *ref_head) + }; + if !head.borrow().tasks.is_empty() { + self.queue.push(head); + } else { + self.index_tasks.remove(&head.borrow().id); + } + + Some(result) + } +} + +#[derive(Debug)] +pub(crate) struct TaskList { + id: String, + tasks: BinaryHeap, +} + +impl Deref for TaskList { + type Target = BinaryHeap; + + fn deref(&self) -> &Self::Target { + &self.tasks + } +} + +impl DerefMut for TaskList { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.tasks + } +} + +impl TaskList { + fn new(id: &str) -> Self { + Self { + id: id.to_owned(), + tasks: BinaryHeap::new(), + } + } +} + +impl PartialEq for TaskList { + fn eq(&self, other: &Self) -> bool { + self.id == other.id + } +} + +impl Eq for TaskList {} + +impl Ord for TaskList { + fn cmp(&self, other: &Self) -> Ordering { + match (self.peek(), other.peek()) { + (None, None) => Ordering::Equal, + (None, Some(_)) => Ordering::Less, + (Some(_), None) => Ordering::Greater, + (Some(lhs), Some(rhs)) => lhs.cmp(rhs), + } + } +} + +impl PartialOrd for TaskList { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} diff --git a/frontend/rust-lib/flowy-grid/src/services/tasks/runner.rs b/frontend/rust-lib/flowy-grid/src/services/tasks/runner.rs index e69de29bb2..254ae84d9e 100644 --- a/frontend/rust-lib/flowy-grid/src/services/tasks/runner.rs +++ b/frontend/rust-lib/flowy-grid/src/services/tasks/runner.rs @@ -0,0 +1,45 @@ +use crate::services::tasks::scheduler::GridTaskScheduler; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::{watch, RwLock}; +use tokio::time::interval; + +pub struct GridTaskRunner { + scheduler: Arc>, + debounce_duration: Duration, + notifier: Option>, +} + +impl GridTaskRunner { + pub fn new( + scheduler: Arc>, + notifier: watch::Receiver<()>, + debounce_duration: Duration, + ) -> Self { + Self { + scheduler, + debounce_duration, + notifier: Some(notifier), + } + } + + pub async fn run(mut self) { + let mut notifier = self + .notifier + .take() + .expect("The GridTaskRunner's notifier should only take once"); + + loop { + if notifier.changed().await.is_err() { + // The runner will be stopped if the corresponding Sender drop. + break; + } + let mut interval = interval(self.debounce_duration); + interval.tick().await; + + if let Err(e) = self.scheduler.write().await.process_next_task() { + tracing::error!("{:?}", e); + } + } + } +} diff --git a/frontend/rust-lib/flowy-grid/src/services/tasks/scheduler.rs b/frontend/rust-lib/flowy-grid/src/services/tasks/scheduler.rs index 5ae3feafef..74d5baefbe 100644 --- a/frontend/rust-lib/flowy-grid/src/services/tasks/scheduler.rs +++ b/frontend/rust-lib/flowy-grid/src/services/tasks/scheduler.rs @@ -1,76 +1,63 @@ -use std::cmp::Ordering; -use std::collections::BinaryHeap; -use std::ops::{Deref, DerefMut}; +use crate::services::tasks::queue::GridTaskQueue; +use crate::services::tasks::runner::GridTaskRunner; +use crate::services::tasks::store::GridTaskStore; +use crate::services::tasks::task::Task; +use flowy_error::{FlowyError, FlowyResult}; +use lib_infra::future::BoxResultFuture; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::{watch, RwLock}; -enum TaskType { - /// Remove the row if it doesn't satisfy the filter. - Filter, - /// Generate snapshot for grid, unused by now. - Snapshot, +pub trait GridTaskHandler: Send + Sync + 'static { + fn handler_id(&self) -> &str; + + fn process_task(&self, task: Task) -> BoxResultFuture<(), FlowyError>; } -/// Two tasks are equal if they have the same type. -impl PartialEq for TaskType { - fn eq(&self, other: &Self) -> bool { - matches!((self, other),) - } -} - -pub type TaskId = u32; - -#[derive(Eq, Debug, Clone, Copy)] -struct PendingTask { - kind: TaskType, - id: TaskId, -} - -impl PartialEq for PendingTask { - fn eq(&self, other: &Self) -> bool { - self.id.eq(&other.id) - } -} - -impl PartialOrd for PendingTask { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for PendingTask { - fn cmp(&self, other: &Self) -> Ordering { - self.id.cmp(&other.id).reverse() - } -} - -#[derive(PartialEq, Eq, Hash, Debug, Clone)] -enum TaskListIdentifier { - Filter(String), - Snapshot(String), -} - -#[derive(Debug)] -struct TaskList { - tasks: BinaryHeap, -} - -impl Deref for TaskList { - type Target = BinaryHeap; - - fn deref(&self) -> &Self::Target { - &self.tasks - } -} - -impl DerefMut for TaskList { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.tasks - } -} - -impl TaskList { - fn new() -> Self { - Self { - tasks: Default::default(), - } +pub struct GridTaskScheduler { + queue: GridTaskQueue, + store: GridTaskStore, + notifier: watch::Sender<()>, + handlers: Vec>, +} + +impl GridTaskScheduler { + pub fn new() -> Arc> { + let (notifier, rx) = watch::channel(()); + + let scheduler = Self { + queue: GridTaskQueue::new(), + store: GridTaskStore::new(), + notifier, + handlers: vec![], + }; + // The runner will receive the newest value after start running. + scheduler.notify(); + + let scheduler = Arc::new(RwLock::new(scheduler)); + let debounce_duration = Duration::from_millis(300); + let runner = GridTaskRunner::new(scheduler.clone(), rx, debounce_duration); + tokio::spawn(runner.run()); + + scheduler + } + + pub fn register_handler(&mut self, handler: T) + where + T: GridTaskHandler, + { + // todo!() + } + + pub fn process_next_task(&mut self) -> FlowyResult<()> { + Ok(()) + } + + pub fn register_task(&self, task: Task) { + assert!(!task.is_finished()); + } + + pub fn notify(&self) { + let _ = self.notifier.send(()); } } diff --git a/frontend/rust-lib/flowy-grid/src/services/tasks/store.rs b/frontend/rust-lib/flowy-grid/src/services/tasks/store.rs new file mode 100644 index 0000000000..54aa06727b --- /dev/null +++ b/frontend/rust-lib/flowy-grid/src/services/tasks/store.rs @@ -0,0 +1,11 @@ +use crate::services::tasks::task::Task; + +pub struct GridTaskStore { + tasks: Vec, +} + +impl GridTaskStore { + pub fn new() -> Self { + Self { tasks: vec![] } + } +} diff --git a/frontend/rust-lib/flowy-grid/src/services/tasks/task.rs b/frontend/rust-lib/flowy-grid/src/services/tasks/task.rs new file mode 100644 index 0000000000..0e863532c9 --- /dev/null +++ b/frontend/rust-lib/flowy-grid/src/services/tasks/task.rs @@ -0,0 +1,67 @@ +use std::cmp::Ordering; + +#[derive(Eq, Debug, Clone, Copy)] +pub enum TaskType { + /// Remove the row if it doesn't satisfy the filter. + Filter, + /// Generate snapshot for grid, unused by now. + Snapshot, +} + +impl PartialEq for TaskType { + fn eq(&self, other: &Self) -> bool { + matches!( + (self, other), + (Self::Filter, Self::Filter) | (Self::Snapshot, Self::Snapshot) + ) + } +} + +pub type TaskId = u32; + +#[derive(Eq, Debug, Clone, Copy)] +pub struct PendingTask { + pub ty: TaskType, + pub id: TaskId, +} + +impl PartialEq for PendingTask { + fn eq(&self, other: &Self) -> bool { + self.id.eq(&other.id) + } +} + +impl PartialOrd for PendingTask { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for PendingTask { + fn cmp(&self, other: &Self) -> Ordering { + match (self.ty, other.ty) { + (TaskType::Snapshot, TaskType::Snapshot) => Ordering::Equal, + (TaskType::Snapshot, _) => Ordering::Greater, + (_, TaskType::Snapshot) => Ordering::Less, + (TaskType::Filter, TaskType::Filter) => self.id.cmp(&other.id), + } + } +} + +pub type ContentId = String; + +pub enum TaskContent { + Snapshot { content_id: ContentId }, + Filter, +} + +pub struct Task { + pub id: TaskId, + pub content: TaskContent, +} + +impl Task { + pub fn is_finished(&self) -> bool { + todo!() + } +} diff --git a/frontend/rust-lib/flowy-grid/tests/grid/cell_test.rs b/frontend/rust-lib/flowy-grid/tests/grid/cell_test.rs index 1efc19c81f..225389bd6c 100644 --- a/frontend/rust-lib/flowy-grid/tests/grid/cell_test.rs +++ b/frontend/rust-lib/flowy-grid/tests/grid/cell_test.rs @@ -9,7 +9,7 @@ async fn grid_cell_update() { let mut test = GridEditorTest::new().await; let field_revs = &test.field_revs; let row_revs = &test.row_revs; - let grid_blocks = &test.grid_block_revs; + let grid_blocks = &test.block_meta_revs; // For the moment, We only have one block to store rows let block_id = &grid_blocks.first().unwrap().block_id; diff --git a/frontend/rust-lib/flowy-grid/tests/grid/script.rs b/frontend/rust-lib/flowy-grid/tests/grid/script.rs index c4d35aeeca..64163cf315 100644 --- a/frontend/rust-lib/flowy-grid/tests/grid/script.rs +++ b/frontend/rust-lib/flowy-grid/tests/grid/script.rs @@ -87,7 +87,7 @@ pub struct GridEditorTest { pub grid_id: String, pub editor: Arc, pub field_revs: Vec, - pub grid_block_revs: Vec, + pub block_meta_revs: Vec>, pub row_revs: Vec>, pub field_count: usize, @@ -103,10 +103,10 @@ impl GridEditorTest { let test = ViewTest::new_grid_view(&sdk, view_data.to_vec()).await; let editor = sdk.grid_manager.open_grid(&test.view.id).await.unwrap(); let field_revs = editor.get_field_revs::(None).await.unwrap(); - let grid_blocks = editor.get_block_meta_revs().await.unwrap(); + let block_meta_revs = editor.get_block_meta_revs().await.unwrap(); let row_revs = editor.grid_block_snapshots(None).await.unwrap().pop().unwrap().row_revs; assert_eq!(row_revs.len(), 3); - assert_eq!(grid_blocks.len(), 1); + assert_eq!(block_meta_revs.len(), 1); // It seems like you should add the field in the make_test_grid() function. // Because we assert the initialize count of the fields is equal to FieldType::COUNT. @@ -118,7 +118,7 @@ impl GridEditorTest { grid_id, editor, field_revs, - grid_block_revs: grid_blocks, + block_meta_revs, row_revs, field_count: FieldType::COUNT, row_order_by_row_id: HashMap::default(), @@ -172,7 +172,7 @@ impl GridEditorTest { } EditorScript::CreateBlock { block } => { self.editor.create_block(block).await.unwrap(); - self.grid_block_revs = self.editor.get_block_meta_revs().await.unwrap(); + self.block_meta_revs = self.editor.get_block_meta_revs().await.unwrap(); } EditorScript::UpdateBlock { changeset: change } => { self.editor.update_block(change).await.unwrap(); @@ -185,19 +185,19 @@ impl GridEditorTest { row_count, start_row_index, } => { - assert_eq!(self.grid_block_revs[block_index].row_count, row_count); - assert_eq!(self.grid_block_revs[block_index].start_row_index, start_row_index); + assert_eq!(self.block_meta_revs[block_index].row_count, row_count); + assert_eq!(self.block_meta_revs[block_index].start_row_index, start_row_index); } EditorScript::AssertBlockEqual { block_index, block } => { let blocks = self.editor.get_block_meta_revs().await.unwrap(); let compared_block = blocks[block_index].clone(); - assert_eq!(compared_block, block); + assert_eq!(compared_block, Arc::new(block)); } EditorScript::CreateEmptyRow => { let row_order = self.editor.create_row(None).await.unwrap(); self.row_order_by_row_id.insert(row_order.row_id.clone(), row_order); self.row_revs = self.get_row_revs().await; - self.grid_block_revs = self.editor.get_block_meta_revs().await.unwrap(); + self.block_meta_revs = self.editor.get_block_meta_revs().await.unwrap(); } EditorScript::CreateRow { payload: context } => { let row_orders = self.editor.insert_rows(vec![context]).await.unwrap(); @@ -205,7 +205,7 @@ impl GridEditorTest { self.row_order_by_row_id.insert(row_order.row_id.clone(), row_order); } self.row_revs = self.get_row_revs().await; - self.grid_block_revs = self.editor.get_block_meta_revs().await.unwrap(); + self.block_meta_revs = self.editor.get_block_meta_revs().await.unwrap(); } EditorScript::UpdateRow { changeset: change } => self.editor.update_row(change).await.unwrap(), EditorScript::DeleteRows { row_ids } => { @@ -216,7 +216,7 @@ impl GridEditorTest { self.editor.delete_rows(row_orders).await.unwrap(); self.row_revs = self.get_row_revs().await; - self.grid_block_revs = self.editor.get_block_meta_revs().await.unwrap(); + self.block_meta_revs = self.editor.get_block_meta_revs().await.unwrap(); } EditorScript::AssertRow { expected_row } => { let row = &*self diff --git a/frontend/rust-lib/flowy-text-block/src/editor.rs b/frontend/rust-lib/flowy-text-block/src/editor.rs index c1e50536a5..87b4416ef0 100644 --- a/frontend/rust-lib/flowy-text-block/src/editor.rs +++ b/frontend/rust-lib/flowy-text-block/src/editor.rs @@ -199,6 +199,15 @@ fn spawn_edit_queue( ) -> EditorCommandSender { let (sender, receiver) = mpsc::channel(1000); let edit_queue = EditBlockQueue::new(user, rev_manager, delta, receiver); + // We can use tokio::task::spawn_local here by using tokio::spawn_blocking. + // https://github.com/tokio-rs/tokio/issues/2095 + // tokio::task::spawn_blocking(move || { + // let rt = tokio::runtime::Handle::current(); + // rt.block_on(async { + // let local = tokio::task::LocalSet::new(); + // local.run_until(edit_queue.run()).await; + // }); + // }); tokio::spawn(edit_queue.run()); sender } diff --git a/shared-lib/flowy-sync/src/client_grid/grid_block_revsion_pad.rs b/shared-lib/flowy-sync/src/client_grid/grid_block_revsion_pad.rs index 15f77861c3..8ee18309ad 100644 --- a/shared-lib/flowy-sync/src/client_grid/grid_block_revsion_pad.rs +++ b/shared-lib/flowy-sync/src/client_grid/grid_block_revsion_pad.rs @@ -5,7 +5,6 @@ use flowy_grid_data_model::revision::{ gen_block_id, gen_row_id, CellRevision, GridBlockRevision, RowMetaChangeset, RowRevision, }; use lib_ot::core::{OperationTransformable, PlainTextAttributes, PlainTextDelta, PlainTextDeltaBuilder}; -use serde::{Deserialize, Serialize}; use std::borrow::Cow; use std::collections::HashMap; use std::sync::Arc; @@ -19,6 +18,14 @@ pub struct GridBlockRevisionPad { pub(crate) delta: GridBlockRevisionDelta, } +impl std::ops::Deref for GridBlockRevisionPad { + type Target = GridBlockRevision; + + fn deref(&self) -> &Self::Target { + &self.block_revision + } +} + impl GridBlockRevisionPad { pub async fn duplicate_data(&self, duplicated_block_id: &str) -> GridBlockRevision { let duplicated_rows = self