fix: potential fail to read the data of the row (#1793)

This commit is contained in:
Nathan.fooo 2023-02-04 10:07:21 +08:00 committed by GitHub
parent 4b605b6373
commit addcabea44
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 84 additions and 44 deletions

View File

@ -27,7 +27,7 @@ impl std::ops::Deref for GridBlockRevisionPad {
} }
impl GridBlockRevisionPad { impl GridBlockRevisionPad {
pub async fn duplicate_data(&self, duplicated_block_id: &str) -> DatabaseBlockRevision { pub fn duplicate_data(&self, duplicated_block_id: &str) -> DatabaseBlockRevision {
let duplicated_rows = self let duplicated_rows = self
.block .block
.rows .rows

View File

@ -375,6 +375,7 @@ pub(crate) async fn update_select_option_handler(
let changeset: SelectOptionChangeset = data.into_inner().try_into()?; let changeset: SelectOptionChangeset = data.into_inner().try_into()?;
let editor = manager.get_database_editor(&changeset.cell_path.database_id).await?; let editor = manager.get_database_editor(&changeset.cell_path.database_id).await?;
let field_id = changeset.cell_path.field_id.clone(); let field_id = changeset.cell_path.field_id.clone();
let (tx, rx) = tokio::sync::oneshot::channel();
editor editor
.modify_field_rev(&field_id, |field_rev| { .modify_field_rev(&field_id, |field_rev| {
let mut type_option = select_type_option_from_field_rev(field_rev)?; let mut type_option = select_type_option_from_field_rev(field_rev)?;
@ -403,27 +404,24 @@ pub(crate) async fn update_select_option_handler(
if is_changed.is_some() { if is_changed.is_some() {
field_rev.insert_type_option(&*type_option); field_rev.insert_type_option(&*type_option);
} }
let _ = tx.send(cell_changeset_str);
if let Some(cell_changeset_str) = cell_changeset_str {
let cloned_editor = editor.clone();
tokio::spawn(async move {
match cloned_editor
.update_cell_with_changeset(
&changeset.cell_path.row_id,
&changeset.cell_path.field_id,
cell_changeset_str,
)
.await
{
Ok(_) => {}
Err(e) => tracing::error!("{}", e),
}
});
}
Ok(is_changed) Ok(is_changed)
}) })
.await?; .await?;
if let Ok(Some(cell_changeset_str)) = rx.await {
match editor
.update_cell_with_changeset(
&changeset.cell_path.row_id,
&changeset.cell_path.field_id,
cell_changeset_str,
)
.await
{
Ok(_) => {}
Err(e) => tracing::error!("{}", e),
}
}
Ok(()) Ok(())
} }

View File

@ -1,3 +1,4 @@
use crate::services::retry::GetRowDataRetryAction;
use bytes::Bytes; use bytes::Bytes;
use flowy_client_sync::client_database::{GridBlockRevisionChangeset, GridBlockRevisionPad}; use flowy_client_sync::client_database::{GridBlockRevisionChangeset, GridBlockRevisionPad};
use flowy_client_sync::make_operations_from_revisions; use flowy_client_sync::make_operations_from_revisions;
@ -8,11 +9,14 @@ use flowy_revision::{
use flowy_sqlite::ConnectionPool; use flowy_sqlite::ConnectionPool;
use grid_model::{CellRevision, DatabaseBlockRevision, RowChangeset, RowRevision}; use grid_model::{CellRevision, DatabaseBlockRevision, RowChangeset, RowRevision};
use lib_infra::future::FutureResult; use lib_infra::future::FutureResult;
use lib_infra::retry::spawn_retry;
use lib_ot::core::EmptyAttributes; use lib_ot::core::EmptyAttributes;
use parking_lot::RwLock;
use revision_model::Revision; use revision_model::Revision;
use std::borrow::Cow; use std::borrow::Cow;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::RwLock; use std::time::Duration;
// use tokio::sync::RwLock;
pub struct DatabaseBlockRevisionEditor { pub struct DatabaseBlockRevisionEditor {
#[allow(dead_code)] #[allow(dead_code)]
@ -53,7 +57,7 @@ impl DatabaseBlockRevisionEditor {
} }
pub async fn duplicate_block(&self, duplicated_block_id: &str) -> DatabaseBlockRevision { pub async fn duplicate_block(&self, duplicated_block_id: &str) -> DatabaseBlockRevision {
self.pad.read().await.duplicate_data(duplicated_block_id).await self.pad.read().duplicate_data(duplicated_block_id)
} }
/// Create a row after the the with prev_row_id. If prev_row_id is None, the row will be appended to the list /// Create a row after the the with prev_row_id. If prev_row_id is None, the row will be appended to the list
@ -108,20 +112,30 @@ impl DatabaseBlockRevisionEditor {
} }
pub async fn index_of_row(&self, row_id: &str) -> Option<usize> { pub async fn index_of_row(&self, row_id: &str) -> Option<usize> {
self.pad.read().await.index_of_row(row_id) self.pad.read().index_of_row(row_id)
} }
pub async fn number_of_rows(&self) -> i32 { pub async fn number_of_rows(&self) -> i32 {
self.pad.read().await.rows.len() as i32 self.pad.read().rows.len() as i32
} }
pub async fn get_row_rev(&self, row_id: &str) -> FlowyResult<Option<(usize, Arc<RowRevision>)>> { pub async fn get_row_rev(&self, row_id: &str) -> FlowyResult<Option<(usize, Arc<RowRevision>)>> {
if self.pad.try_read().is_err() { let duration = Duration::from_millis(300);
tracing::error!("Required grid block read lock failed"); if let Some(pad) = self.pad.try_read_for(duration) {
Ok(None) Ok(pad.get_row_rev(row_id))
} else { } else {
let row_rev = self.pad.read().await.get_row_rev(row_id); tracing::error!("Required grid block read lock failed, retrying");
Ok(row_rev) let retry = GetRowDataRetryAction {
row_id: row_id.to_owned(),
pad: self.pad.clone(),
};
match spawn_retry(3, 300, retry).await {
Ok(value) => Ok(value),
Err(err) => {
tracing::error!("Read row revision failed with: {}", err);
Ok(None)
}
}
} }
} }
@ -129,7 +143,7 @@ impl DatabaseBlockRevisionEditor {
where where
T: AsRef<str> + ToOwned + ?Sized, T: AsRef<str> + ToOwned + ?Sized,
{ {
let row_revs = self.pad.read().await.get_row_revs(row_ids)?; let row_revs = self.pad.read().get_row_revs(row_ids)?;
Ok(row_revs) Ok(row_revs)
} }
@ -138,7 +152,7 @@ impl DatabaseBlockRevisionEditor {
field_id: &str, field_id: &str,
row_ids: Option<Vec<Cow<'_, String>>>, row_ids: Option<Vec<Cow<'_, String>>>,
) -> FlowyResult<Vec<CellRevision>> { ) -> FlowyResult<Vec<CellRevision>> {
let cell_revs = self.pad.read().await.get_cell_revs(field_id, row_ids)?; let cell_revs = self.pad.read().get_cell_revs(field_id, row_ids)?;
Ok(cell_revs) Ok(cell_revs)
} }
@ -146,11 +160,11 @@ impl DatabaseBlockRevisionEditor {
where where
F: for<'a> FnOnce(&'a mut GridBlockRevisionPad) -> FlowyResult<Option<GridBlockRevisionChangeset>>, F: for<'a> FnOnce(&'a mut GridBlockRevisionPad) -> FlowyResult<Option<GridBlockRevisionChangeset>>,
{ {
let mut write_guard = self.pad.write().await; let changeset = f(&mut self.pad.write())?;
match f(&mut write_guard)? { match changeset {
None => {} None => {}
Some(change) => { Some(changeset) => {
self.apply_change(change).await?; self.apply_change(changeset).await?;
} }
} }
Ok(()) Ok(())

View File

@ -9,6 +9,7 @@ pub mod grid_editor;
mod grid_editor_trait_impl; mod grid_editor_trait_impl;
pub mod group; pub mod group;
pub mod persistence; pub mod persistence;
mod retry;
pub mod row; pub mod row;
pub mod setting; pub mod setting;
pub mod sort; pub mod sort;

View File

@ -0,0 +1,31 @@
use flowy_client_sync::client_database::GridBlockRevisionPad;
use flowy_error::FlowyError;
use grid_model::RowRevision;
use lib_infra::retry::Action;
use parking_lot::RwLock;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
pub struct GetRowDataRetryAction {
pub row_id: String,
pub pad: Arc<RwLock<GridBlockRevisionPad>>,
}
impl Action for GetRowDataRetryAction {
type Future = Pin<Box<dyn Future<Output = Result<Self::Item, Self::Error>> + Send + Sync>>;
type Item = Option<(usize, Arc<RowRevision>)>;
type Error = FlowyError;
fn run(&mut self) -> Self::Future {
let pad = self.pad.clone();
let row_id = self.row_id.clone();
Box::pin(async move {
match pad.try_read() {
None => Ok(None),
Some(read_guard) => Ok(read_guard.get_row_rev(&row_id)),
}
})
}
}

View File

@ -8,10 +8,7 @@ use std::{
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
}; };
use tokio::{ use tokio::time::{sleep_until, Duration, Instant, Sleep};
task::JoinHandle,
time::{sleep_until, Duration, Instant, Sleep},
};
#[pin_project(project = RetryStateProj)] #[pin_project(project = RetryStateProj)]
enum RetryState<A> enum RetryState<A>
@ -55,7 +52,7 @@ where
I: Iterator<Item = Duration>, I: Iterator<Item = Duration>,
A: Action, A: Action,
{ {
pub fn spawn<T: IntoIterator<IntoIter = I, Item = Duration>>(strategy: T, action: A) -> Retry<I, A> { pub fn new<T: IntoIterator<IntoIter = I, Item = Duration>>(strategy: T, action: A) -> Retry<I, A> {
Retry { Retry {
retry_if: RetryIf::spawn(strategy, action, (|_| true) as fn(&A::Error) -> bool), retry_if: RetryIf::spawn(strategy, action, (|_| true) as fn(&A::Error) -> bool),
} }
@ -196,16 +193,15 @@ impl<E, F: FnMut(&E) -> bool> Condition<E> for F {
} }
pub fn spawn_retry<A: Action + 'static>( pub fn spawn_retry<A: Action + 'static>(
millis: u64,
retry_count: usize, retry_count: usize,
retry_per_millis: u64,
action: A, action: A,
) -> JoinHandle<Result<A::Item, A::Error>> ) -> impl Future<Output = Result<A::Item, A::Error>>
where where
A::Item: Send + Sync, A::Item: Send + Sync,
A::Error: Send + Sync, A::Error: Send + Sync,
<A as Action>::Future: Send + Sync, <A as Action>::Future: Send + Sync,
{ {
let strategy = FixedInterval::from_millis(millis).take(retry_count); let strategy = FixedInterval::from_millis(retry_per_millis).take(retry_count);
let retry = Retry::spawn(strategy, action); Retry::new(strategy, action)
tokio::spawn(async move { retry.await })
} }

View File

@ -103,7 +103,7 @@ impl WSController {
addr, addr,
handlers: self.handlers.clone(), handlers: self.handlers.clone(),
}; };
let retry = Retry::spawn(strategy, action); let retry = Retry::new(strategy, action);
conn_state_notify.update_state(WSConnectState::Connecting); conn_state_notify.update_state(WSConnectState::Connecting);
drop(conn_state_notify); drop(conn_state_notify);