fix: require write lock to make sure exclusive accest to sync_seq

This commit is contained in:
appflowy 2022-08-18 08:37:45 +08:00
parent 6dfed4a7b0
commit 7361172c89

View File

@ -73,12 +73,13 @@ impl RevisionPersistence {
revision: &'a Revision, revision: &'a Revision,
compactor: &Arc<dyn RevisionCompactor + 'a>, compactor: &Arc<dyn RevisionCompactor + 'a>,
) -> FlowyResult<i64> { ) -> FlowyResult<i64> {
let result = self.sync_seq.read().await.compact(); let mut sync_seq_write_guard = self.sync_seq.write().await;
let result = sync_seq_write_guard.compact();
match result { match result {
None => { None => {
tracing::Span::current().record("rev_id", &revision.rev_id); tracing::Span::current().record("rev_id", &revision.rev_id);
self.add(revision.clone(), RevisionState::Sync, true).await?; self.add(revision.clone(), RevisionState::Sync, true).await?;
self.sync_seq.write().await.add(revision.rev_id)?; sync_seq_write_guard.add(revision.rev_id)?;
Ok(revision.rev_id) Ok(revision.rev_id)
} }
Some((range, mut compact_seq)) => { Some((range, mut compact_seq)) => {
@ -101,8 +102,10 @@ impl RevisionPersistence {
// replace the revisions in range with compact revision // replace the revisions in range with compact revision
self.compact(&range, compact_revision).await?; self.compact(&range, compact_revision).await?;
debug_assert_eq!(self.sync_seq.read().await.len(), compact_seq.len()); //
self.sync_seq.write().await.reset(compact_seq); debug_assert_eq!(compact_seq.len(), 2);
debug_assert_eq!(sync_seq_write_guard.len(), compact_seq.len());
sync_seq_write_guard.reset(compact_seq);
Ok(rev_id) Ok(rev_id)
} }
} }
@ -315,7 +318,11 @@ impl RevisionSyncSequence {
// Compact the rev_ids into one except the current synchronizing rev_id. // Compact the rev_ids into one except the current synchronizing rev_id.
fn compact(&self) -> Option<(RevisionRange, VecDeque<i64>)> { fn compact(&self) -> Option<(RevisionRange, VecDeque<i64>)> {
self.next_rev_id()?; // Make sure there are two rev_id going to sync. No need to compact if there is only
// one rev_id in queue.
if self.next_rev_id().is_none() {
return None;
}
let mut new_seq = self.0.clone(); let mut new_seq = self.0.clone();
let mut drained = new_seq.drain(1..).collect::<VecDeque<_>>(); let mut drained = new_seq.drain(1..).collect::<VecDeque<_>>();