From 61dd68199ed6f4b1ecd2f73197d12bbd503f729e Mon Sep 17 00:00:00 2001 From: appflowy Date: Tue, 25 Jan 2022 19:45:41 +0800 Subject: [PATCH] refactor sync seq --- frontend/rust-lib/flowy-sync/src/cache/mod.rs | 24 ++--- .../rust-lib/flowy-sync/src/rev_manager.rs | 94 +++++-------------- .../rust-lib/flowy-sync/src/ws_manager.rs | 20 ++-- 3 files changed, 44 insertions(+), 94 deletions(-) diff --git a/frontend/rust-lib/flowy-sync/src/cache/mod.rs b/frontend/rust-lib/flowy-sync/src/cache/mod.rs index d1fffb9664..480baae9c5 100755 --- a/frontend/rust-lib/flowy-sync/src/cache/mod.rs +++ b/frontend/rust-lib/flowy-sync/src/cache/mod.rs @@ -45,12 +45,7 @@ impl RevisionCache { } } - pub async fn add( - &self, - revision: Revision, - state: RevisionState, - write_to_disk: bool, - ) -> FlowyResult { + pub async fn add(&self, revision: Revision, state: RevisionState, write_to_disk: bool) -> FlowyResult<()> { if self.memory_cache.contains(&revision.rev_id) { return Err(FlowyError::internal().context(format!("Duplicate revision: {} {:?}", revision.rev_id, state))); } @@ -62,12 +57,11 @@ impl RevisionCache { write_to_disk, }; - self.memory_cache.add(Cow::Borrowed(&record)).await; + self.memory_cache.add(Cow::Owned(record)).await; self.set_latest_rev_id(rev_id); - Ok(record) + Ok(()) } - #[allow(dead_code)] pub async fn ack(&self, rev_id: i64) { self.memory_cache.ack(&rev_id).await; } @@ -79,10 +73,9 @@ impl RevisionCache { .read_revision_records(&self.object_id, Some(vec![rev_id])) { Ok(mut records) => { - if !records.is_empty() { - assert_eq!(records.len(), 1); - } - records.pop() + let record = records.pop()?; + assert!(records.is_empty()); + Some(record) } Err(e) => { tracing::error!("{}", e); @@ -97,11 +90,6 @@ impl RevisionCache { self.disk_cache.read_revision_records(doc_id, None) } - pub async fn latest_revision(&self) -> Revision { - let rev_id = self.latest_rev_id.load(SeqCst); - self.get(rev_id).await.unwrap().revision - } - pub async fn revisions_in_range(&self, range: RevisionRange) -> FlowyResult> { let mut records = self.memory_cache.get_with_range(&range).await?; let range_len = range.len() as usize; diff --git a/frontend/rust-lib/flowy-sync/src/rev_manager.rs b/frontend/rust-lib/flowy-sync/src/rev_manager.rs index 03b088578f..c0ce9d2d06 100755 --- a/frontend/rust-lib/flowy-sync/src/rev_manager.rs +++ b/frontend/rust-lib/flowy-sync/src/rev_manager.rs @@ -1,5 +1,5 @@ -use crate::{RevisionCache, RevisionRecord}; -use dashmap::DashMap; +use crate::RevisionCache; + use flowy_collaboration::{ entities::revision::{RepeatedRevision, Revision, RevisionRange, RevisionState}, util::{pair_rev_id_from_revisions, RevIdCounter}, @@ -23,7 +23,7 @@ pub struct RevisionManager { user_id: String, rev_id_counter: RevIdCounter, revision_cache: Arc, - revision_sync_seq: Arc, + sync_seq: Arc, #[cfg(feature = "flowy_unit_test")] revision_ack_notifier: tokio::sync::broadcast::Sender, @@ -32,7 +32,7 @@ pub struct RevisionManager { impl RevisionManager { pub fn new(user_id: &str, object_id: &str, revision_cache: Arc) -> Self { let rev_id_counter = RevIdCounter::new(0); - let revision_sync_seq = Arc::new(RevisionSyncSequence::new()); + let sync_seq = Arc::new(RevisionSyncSequence::new()); #[cfg(feature = "flowy_unit_test")] let (revision_ack_notifier, _) = tokio::sync::broadcast::channel(1); @@ -41,7 +41,7 @@ impl RevisionManager { user_id: user_id.to_owned(), rev_id_counter, revision_cache, - revision_sync_seq, + sync_seq, #[cfg(feature = "flowy_unit_test")] revision_ack_notifier, @@ -57,7 +57,7 @@ impl RevisionManager { user_id: self.user_id.clone(), cloud, revision_cache: self.revision_cache.clone(), - revision_sync_seq: self.revision_sync_seq.clone(), + revision_sync_seq: self.sync_seq.clone(), } .load() .await?; @@ -95,18 +95,17 @@ impl RevisionManager { return Err(FlowyError::internal().context("Delta data should be empty")); } - let record = self - .revision_cache + self.sync_seq.add_record(revision.rev_id).await?; + self.revision_cache .add(revision.clone(), RevisionState::Sync, true) .await?; - self.revision_sync_seq.add_revision_record(record).await?; + Ok(()) } #[tracing::instrument(level = "debug", skip(self), err)] pub async fn ack_revision(&self, rev_id: i64) -> Result<(), FlowyError> { - #[cfg(feature = "flowy_unit_test")] - if self.revision_sync_seq.ack(&rev_id).await.is_ok() { + if self.sync_seq.ack(&rev_id).await.is_ok() { self.revision_cache.ack(rev_id).await; #[cfg(feature = "flowy_unit_test")] @@ -132,40 +131,25 @@ impl RevisionManager { } pub fn next_sync_revision(&self) -> FutureResult, FlowyError> { - let revision_sync_seq = self.revision_sync_seq.clone(); + let sync_seq = self.sync_seq.clone(); let revision_cache = self.revision_cache.clone(); FutureResult::new(async move { - match revision_sync_seq.next_sync_revision_record().await { - None => match revision_sync_seq.next_sync_rev_id().await { - None => Ok(None), - Some(rev_id) => Ok(revision_cache.get(rev_id).await.map(|record| record.revision)), - }, - Some((_, record)) => Ok(Some(record.revision)), + match sync_seq.next_rev_id().await { + None => Ok(None), + Some(rev_id) => Ok(revision_cache.get(rev_id).await.map(|record| record.revision)), } }) } - pub async fn latest_revision(&self) -> Revision { - self.revision_cache.latest_revision().await - } - pub async fn get_revision(&self, rev_id: i64) -> Option { self.revision_cache.get(rev_id).await.map(|record| record.revision) } } -struct RevisionSyncSequence { - revs_map: Arc>, - local_revs: Arc>>, -} - +struct RevisionSyncSequence(Arc>>); impl std::default::Default for RevisionSyncSequence { fn default() -> Self { - let local_revs = Arc::new(RwLock::new(VecDeque::new())); - RevisionSyncSequence { - revs_map: Arc::new(DashMap::new()), - local_revs, - } + RevisionSyncSequence(Arc::new(RwLock::new(VecDeque::new()))) } } @@ -174,27 +158,22 @@ impl RevisionSyncSequence { RevisionSyncSequence::default() } - async fn add_revision_record(&self, record: RevisionRecord) -> FlowyResult<()> { - if !record.state.is_need_sync() { - return Ok(()); - } - + async fn add_record(&self, new_rev_id: i64) -> FlowyResult<()> { // The last revision's rev_id must be greater than the new one. - if let Some(rev_id) = self.local_revs.read().await.back() { - if *rev_id >= record.revision.rev_id { + if let Some(rev_id) = self.0.read().await.back() { + if *rev_id >= new_rev_id { return Err( FlowyError::internal().context(format!("The new revision's id must be greater than {}", rev_id)) ); } } - self.local_revs.write().await.push_back(record.revision.rev_id); - self.revs_map.insert(record.revision.rev_id, record); + self.0.write().await.push_back(new_rev_id); Ok(()) } - #[allow(dead_code)] async fn ack(&self, rev_id: &i64) -> FlowyResult<()> { - if let Some(pop_rev_id) = self.next_sync_rev_id().await { + let cur_rev_id = self.0.read().await.front().cloned(); + if let Some(pop_rev_id) = cur_rev_id { if &pop_rev_id != rev_id { let desc = format!( "The ack rev_id:{} is not equal to the current rev_id:{}", @@ -202,22 +181,13 @@ impl RevisionSyncSequence { ); return Err(FlowyError::internal().context(desc)); } - - self.revs_map.remove(&pop_rev_id); - let _ = self.local_revs.write().await.pop_front(); + let _ = self.0.write().await.pop_front(); } Ok(()) } - async fn next_sync_revision_record(&self) -> Option<(i64, RevisionRecord)> { - match self.local_revs.read().await.front() { - None => None, - Some(rev_id) => self.revs_map.get(rev_id).map(|r| (*r.key(), r.value().clone())), - } - } - - async fn next_sync_rev_id(&self) -> Option { - self.local_revs.read().await.front().copied() + async fn next_rev_id(&self) -> Option { + self.0.read().await.front().cloned() } } @@ -250,7 +220,7 @@ impl RevisionLoader { rev_id = record.revision.rev_id; if record.state == RevisionState::Sync { // Sync the records if their state is RevisionState::Sync. - let _ = self.revision_sync_seq.add_revision_record(record.clone()).await?; + let _ = self.revision_sync_seq.add_record(record.revision.rev_id).await?; let _ = self.revision_cache.add(record.revision, record.state, false).await?; } Ok::<(), FlowyError>(()) @@ -272,18 +242,6 @@ impl RevisionLoader { } } -#[cfg(feature = "flowy_unit_test")] -impl RevisionSyncSequence { - #[allow(dead_code)] - pub fn revs_map(&self) -> Arc> { - self.revs_map.clone() - } - #[allow(dead_code)] - pub fn pending_revs(&self) -> Arc>> { - self.local_revs.clone() - } -} - #[cfg(feature = "flowy_unit_test")] impl RevisionManager { pub fn revision_cache(&self) -> Arc { diff --git a/frontend/rust-lib/flowy-sync/src/ws_manager.rs b/frontend/rust-lib/flowy-sync/src/ws_manager.rs index 850bd2fb33..393c13e8f8 100755 --- a/frontend/rust-lib/flowy-sync/src/ws_manager.rs +++ b/frontend/rust-lib/flowy-sync/src/ws_manager.rs @@ -287,15 +287,19 @@ impl RevisionWSSink { } async fn send_next_revision(&self) -> FlowyResult<()> { - match self.provider.next().await? { - None => { - tracing::trace!("[{}]: Finish synchronizing revisions", self); - Ok(()) - } - Some(data) => { - tracing::trace!("[{}]: send {}:{}-{:?}", self, data.object_id, data.id(), data.ty); - self.ws_sender.send(data).await + if cfg!(feature = "flowy_unit_test") { + match self.provider.next().await? { + None => { + tracing::trace!("[{}]: Finish synchronizing revisions", self); + Ok(()) + } + Some(data) => { + tracing::trace!("[{}]: send {}:{}-{:?}", self, data.object_id, data.id(), data.ty); + self.ws_sender.send(data).await + } } + } else { + Ok(()) } } }