From addcabea447624912697ab26a7750cc0ee7abf5f Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Sat, 4 Feb 2023 10:07:21 +0800 Subject: [PATCH] fix: potential fail to read the data of the row (#1793) --- .../src/client_database/block_revision_pad.rs | 2 +- .../flowy-database/src/event_handler.rs | 32 +++++++------- .../src/services/block_editor.rs | 44 ++++++++++++------- .../flowy-database/src/services/mod.rs | 1 + .../flowy-database/src/services/retry.rs | 31 +++++++++++++ shared-lib/lib-infra/src/retry/future.rs | 16 +++---- shared-lib/lib-ws/src/ws.rs | 2 +- 7 files changed, 84 insertions(+), 44 deletions(-) create mode 100644 frontend/rust-lib/flowy-database/src/services/retry.rs diff --git a/frontend/rust-lib/flowy-client-sync/src/client_database/block_revision_pad.rs b/frontend/rust-lib/flowy-client-sync/src/client_database/block_revision_pad.rs index 5e4984f752..75e3a2b89b 100644 --- a/frontend/rust-lib/flowy-client-sync/src/client_database/block_revision_pad.rs +++ b/frontend/rust-lib/flowy-client-sync/src/client_database/block_revision_pad.rs @@ -27,7 +27,7 @@ impl std::ops::Deref for GridBlockRevisionPad { } impl GridBlockRevisionPad { - pub async fn duplicate_data(&self, duplicated_block_id: &str) -> DatabaseBlockRevision { + pub fn duplicate_data(&self, duplicated_block_id: &str) -> DatabaseBlockRevision { let duplicated_rows = self .block .rows diff --git a/frontend/rust-lib/flowy-database/src/event_handler.rs b/frontend/rust-lib/flowy-database/src/event_handler.rs index 4b279228a6..de1580d576 100644 --- a/frontend/rust-lib/flowy-database/src/event_handler.rs +++ b/frontend/rust-lib/flowy-database/src/event_handler.rs @@ -375,6 +375,7 @@ pub(crate) async fn update_select_option_handler( let changeset: SelectOptionChangeset = data.into_inner().try_into()?; let editor = manager.get_database_editor(&changeset.cell_path.database_id).await?; let field_id = changeset.cell_path.field_id.clone(); + let (tx, rx) = tokio::sync::oneshot::channel(); editor .modify_field_rev(&field_id, |field_rev| { let mut type_option = select_type_option_from_field_rev(field_rev)?; @@ -403,27 +404,24 @@ pub(crate) async fn update_select_option_handler( if is_changed.is_some() { field_rev.insert_type_option(&*type_option); } - - if let Some(cell_changeset_str) = cell_changeset_str { - let cloned_editor = editor.clone(); - tokio::spawn(async move { - match cloned_editor - .update_cell_with_changeset( - &changeset.cell_path.row_id, - &changeset.cell_path.field_id, - cell_changeset_str, - ) - .await - { - Ok(_) => {} - Err(e) => tracing::error!("{}", e), - } - }); - } + let _ = tx.send(cell_changeset_str); Ok(is_changed) }) .await?; + if let Ok(Some(cell_changeset_str)) = rx.await { + match editor + .update_cell_with_changeset( + &changeset.cell_path.row_id, + &changeset.cell_path.field_id, + cell_changeset_str, + ) + .await + { + Ok(_) => {} + Err(e) => tracing::error!("{}", e), + } + } Ok(()) } diff --git a/frontend/rust-lib/flowy-database/src/services/block_editor.rs b/frontend/rust-lib/flowy-database/src/services/block_editor.rs index 0d4bed6b97..ec64d0d0ef 100644 --- a/frontend/rust-lib/flowy-database/src/services/block_editor.rs +++ b/frontend/rust-lib/flowy-database/src/services/block_editor.rs @@ -1,3 +1,4 @@ +use crate::services::retry::GetRowDataRetryAction; use bytes::Bytes; use flowy_client_sync::client_database::{GridBlockRevisionChangeset, GridBlockRevisionPad}; use flowy_client_sync::make_operations_from_revisions; @@ -8,11 +9,14 @@ use flowy_revision::{ use flowy_sqlite::ConnectionPool; use grid_model::{CellRevision, DatabaseBlockRevision, RowChangeset, RowRevision}; use lib_infra::future::FutureResult; +use lib_infra::retry::spawn_retry; use lib_ot::core::EmptyAttributes; +use parking_lot::RwLock; use revision_model::Revision; use std::borrow::Cow; use std::sync::Arc; -use tokio::sync::RwLock; +use std::time::Duration; +// use tokio::sync::RwLock; pub struct DatabaseBlockRevisionEditor { #[allow(dead_code)] @@ -53,7 +57,7 @@ impl DatabaseBlockRevisionEditor { } pub async fn duplicate_block(&self, duplicated_block_id: &str) -> DatabaseBlockRevision { - self.pad.read().await.duplicate_data(duplicated_block_id).await + self.pad.read().duplicate_data(duplicated_block_id) } /// Create a row after the the with prev_row_id. If prev_row_id is None, the row will be appended to the list @@ -108,20 +112,30 @@ impl DatabaseBlockRevisionEditor { } pub async fn index_of_row(&self, row_id: &str) -> Option { - self.pad.read().await.index_of_row(row_id) + self.pad.read().index_of_row(row_id) } pub async fn number_of_rows(&self) -> i32 { - self.pad.read().await.rows.len() as i32 + self.pad.read().rows.len() as i32 } pub async fn get_row_rev(&self, row_id: &str) -> FlowyResult)>> { - if self.pad.try_read().is_err() { - tracing::error!("Required grid block read lock failed"); - Ok(None) + let duration = Duration::from_millis(300); + if let Some(pad) = self.pad.try_read_for(duration) { + Ok(pad.get_row_rev(row_id)) } else { - let row_rev = self.pad.read().await.get_row_rev(row_id); - Ok(row_rev) + tracing::error!("Required grid block read lock failed, retrying"); + let retry = GetRowDataRetryAction { + row_id: row_id.to_owned(), + pad: self.pad.clone(), + }; + match spawn_retry(3, 300, retry).await { + Ok(value) => Ok(value), + Err(err) => { + tracing::error!("Read row revision failed with: {}", err); + Ok(None) + } + } } } @@ -129,7 +143,7 @@ impl DatabaseBlockRevisionEditor { where T: AsRef + ToOwned + ?Sized, { - let row_revs = self.pad.read().await.get_row_revs(row_ids)?; + let row_revs = self.pad.read().get_row_revs(row_ids)?; Ok(row_revs) } @@ -138,7 +152,7 @@ impl DatabaseBlockRevisionEditor { field_id: &str, row_ids: Option>>, ) -> FlowyResult> { - let cell_revs = self.pad.read().await.get_cell_revs(field_id, row_ids)?; + let cell_revs = self.pad.read().get_cell_revs(field_id, row_ids)?; Ok(cell_revs) } @@ -146,11 +160,11 @@ impl DatabaseBlockRevisionEditor { where F: for<'a> FnOnce(&'a mut GridBlockRevisionPad) -> FlowyResult>, { - let mut write_guard = self.pad.write().await; - match f(&mut write_guard)? { + let changeset = f(&mut self.pad.write())?; + match changeset { None => {} - Some(change) => { - self.apply_change(change).await?; + Some(changeset) => { + self.apply_change(changeset).await?; } } Ok(()) diff --git a/frontend/rust-lib/flowy-database/src/services/mod.rs b/frontend/rust-lib/flowy-database/src/services/mod.rs index d9dd4a5592..42c4168e4b 100644 --- a/frontend/rust-lib/flowy-database/src/services/mod.rs +++ b/frontend/rust-lib/flowy-database/src/services/mod.rs @@ -9,6 +9,7 @@ pub mod grid_editor; mod grid_editor_trait_impl; pub mod group; pub mod persistence; +mod retry; pub mod row; pub mod setting; pub mod sort; diff --git a/frontend/rust-lib/flowy-database/src/services/retry.rs b/frontend/rust-lib/flowy-database/src/services/retry.rs new file mode 100644 index 0000000000..27f0579bc4 --- /dev/null +++ b/frontend/rust-lib/flowy-database/src/services/retry.rs @@ -0,0 +1,31 @@ +use flowy_client_sync::client_database::GridBlockRevisionPad; +use flowy_error::FlowyError; +use grid_model::RowRevision; +use lib_infra::retry::Action; + +use parking_lot::RwLock; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; + +pub struct GetRowDataRetryAction { + pub row_id: String, + pub pad: Arc>, +} + +impl Action for GetRowDataRetryAction { + type Future = Pin> + Send + Sync>>; + type Item = Option<(usize, Arc)>; + type Error = FlowyError; + + fn run(&mut self) -> Self::Future { + let pad = self.pad.clone(); + let row_id = self.row_id.clone(); + Box::pin(async move { + match pad.try_read() { + None => Ok(None), + Some(read_guard) => Ok(read_guard.get_row_rev(&row_id)), + } + }) + } +} diff --git a/shared-lib/lib-infra/src/retry/future.rs b/shared-lib/lib-infra/src/retry/future.rs index d22d62044b..d605e512de 100644 --- a/shared-lib/lib-infra/src/retry/future.rs +++ b/shared-lib/lib-infra/src/retry/future.rs @@ -8,10 +8,7 @@ use std::{ pin::Pin, task::{Context, Poll}, }; -use tokio::{ - task::JoinHandle, - time::{sleep_until, Duration, Instant, Sleep}, -}; +use tokio::time::{sleep_until, Duration, Instant, Sleep}; #[pin_project(project = RetryStateProj)] enum RetryState @@ -55,7 +52,7 @@ where I: Iterator, A: Action, { - pub fn spawn>(strategy: T, action: A) -> Retry { + pub fn new>(strategy: T, action: A) -> Retry { Retry { retry_if: RetryIf::spawn(strategy, action, (|_| true) as fn(&A::Error) -> bool), } @@ -196,16 +193,15 @@ impl bool> Condition for F { } pub fn spawn_retry( - millis: u64, retry_count: usize, + retry_per_millis: u64, action: A, -) -> JoinHandle> +) -> impl Future> where A::Item: Send + Sync, A::Error: Send + Sync, ::Future: Send + Sync, { - let strategy = FixedInterval::from_millis(millis).take(retry_count); - let retry = Retry::spawn(strategy, action); - tokio::spawn(async move { retry.await }) + let strategy = FixedInterval::from_millis(retry_per_millis).take(retry_count); + Retry::new(strategy, action) } diff --git a/shared-lib/lib-ws/src/ws.rs b/shared-lib/lib-ws/src/ws.rs index aa297c4a0f..6d1c354b29 100644 --- a/shared-lib/lib-ws/src/ws.rs +++ b/shared-lib/lib-ws/src/ws.rs @@ -103,7 +103,7 @@ impl WSController { addr, handlers: self.handlers.clone(), }; - let retry = Retry::spawn(strategy, action); + let retry = Retry::new(strategy, action); conn_state_notify.update_state(WSConnectState::Connecting); drop(conn_state_notify);