diff --git a/frontend/rust-lib/flowy-core/src/controller.rs b/frontend/rust-lib/flowy-core/src/controller.rs index 5d5aba5cfa..e4a6b07f79 100755 --- a/frontend/rust-lib/flowy-core/src/controller.rs +++ b/frontend/rust-lib/flowy-core/src/controller.rs @@ -143,6 +143,7 @@ impl FolderManager { } } + #[tracing::instrument(level = "trace", skip(self), err)] pub async fn initialize(&self, user_id: &str, token: &str) -> FlowyResult<()> { let mut write_guard = INIT_FOLDER_FLAG.write().await; if let Some(is_init) = write_guard.get(user_id) { @@ -150,6 +151,7 @@ impl FolderManager { return Ok(()); } } + tracing::debug!("Initialize folder editor"); let folder_id = FolderId::new(user_id); let _ = self.persistence.initialize(user_id, &folder_id).await?; diff --git a/frontend/rust-lib/flowy-core/src/services/persistence/migration.rs b/frontend/rust-lib/flowy-core/src/services/persistence/migration.rs index c0e5744961..4e17a08f90 100755 --- a/frontend/rust-lib/flowy-core/src/services/persistence/migration.rs +++ b/frontend/rust-lib/flowy-core/src/services/persistence/migration.rs @@ -62,6 +62,7 @@ impl FolderMigration { })?; if workspaces.is_empty() { + KV::set_bool(&key, true); return Ok(None); } diff --git a/frontend/rust-lib/flowy-sdk/src/lib.rs b/frontend/rust-lib/flowy-sdk/src/lib.rs index 18cff4fff0..0616d8538e 100755 --- a/frontend/rust-lib/flowy-sdk/src/lib.rs +++ b/frontend/rust-lib/flowy-sdk/src/lib.rs @@ -235,7 +235,7 @@ async fn _listen_network_status(mut subscribe: broadcast::Receiver, fn init_kv(root: &str) { match flowy_database::kv::KV::init(root) { Ok(_) => {} - Err(e) => tracing::error!("Init kv store failedL: {}", e), + Err(e) => tracing::error!("Init kv store failed: {}", e), } } diff --git a/frontend/rust-lib/flowy-sync/src/cache/disk/mod.rs b/frontend/rust-lib/flowy-sync/src/cache/disk/mod.rs index 97a8704f17..7bcbc30fdd 100755 --- a/frontend/rust-lib/flowy-sync/src/cache/disk/mod.rs +++ b/frontend/rust-lib/flowy-sync/src/cache/disk/mod.rs @@ -32,12 +32,7 @@ pub trait RevisionDiskCache: Sync + Send { fn update_revision_record(&self, changesets: Vec) -> FlowyResult<()>; // Delete all the records if the rev_ids is None - fn delete_revision_records( - &self, - object_id: &str, - rev_ids: Option>, - conn: &SqliteConnection, - ) -> Result<(), Self::Error>; + fn delete_revision_records(&self, object_id: &str, rev_ids: Option>) -> Result<(), Self::Error>; // Delete and insert will be executed in the same transaction. // It deletes all the records if the deleted_rev_ids is None and then insert the new records diff --git a/frontend/rust-lib/flowy-sync/src/cache/disk/sql_impl.rs b/frontend/rust-lib/flowy-sync/src/cache/disk/sql_impl.rs index 0aba4cc6ad..e985a98bd8 100755 --- a/frontend/rust-lib/flowy-sync/src/cache/disk/sql_impl.rs +++ b/frontend/rust-lib/flowy-sync/src/cache/disk/sql_impl.rs @@ -62,12 +62,8 @@ impl RevisionDiskCache for SQLitePersistence { Ok(()) } - fn delete_revision_records( - &self, - object_id: &str, - rev_ids: Option>, - conn: &SqliteConnection, - ) -> Result<(), Self::Error> { + fn delete_revision_records(&self, object_id: &str, rev_ids: Option>) -> Result<(), Self::Error> { + let conn = &*self.pool.get().map_err(internal_error)?; let _ = RevisionTableSql::delete(object_id, rev_ids, conn)?; Ok(()) } @@ -80,7 +76,7 @@ impl RevisionDiskCache for SQLitePersistence { ) -> Result<(), Self::Error> { let conn = self.pool.get().map_err(internal_error)?; conn.immediate_transaction::<_, FlowyError, _>(|| { - let _ = self.delete_revision_records(object_id, deleted_rev_ids, &*conn)?; + let _ = RevisionTableSql::delete(object_id, deleted_rev_ids, &*conn)?; let _ = self.create_revision_records(inserted_records, &*conn)?; Ok(()) }) @@ -105,6 +101,11 @@ impl RevisionTableSql { let records = revision_records .into_iter() .map(|record| { + tracing::trace!( + "[RevisionTable] create revision: {}:{:?}", + record.revision.object_id, + record.revision.rev_id + ); let rev_state: RevisionTableState = record.state.into(); ( dsl::doc_id.eq(record.revision.object_id), @@ -178,9 +179,11 @@ impl RevisionTableSql { rev_ids: Option>, conn: &SqliteConnection, ) -> Result<(), FlowyError> { - let filter = dsl::rev_table.filter(dsl::doc_id.eq(object_id)); - let mut sql = diesel::delete(filter).into_boxed(); + let mut sql = diesel::delete(dsl::rev_table).into_boxed(); + sql = sql.filter(dsl::doc_id.eq(object_id)); + if let Some(rev_ids) = rev_ids { + tracing::trace!("[RevisionTable] Delete revision: {}:{:?}", object_id, rev_ids); sql = sql.filter(dsl::rev_id.eq_any(rev_ids)); } diff --git a/frontend/rust-lib/flowy-sync/src/cache/memory.rs b/frontend/rust-lib/flowy-sync/src/cache/memory.rs index 8d7c3eb6a2..2c99c3f08c 100755 --- a/frontend/rust-lib/flowy-sync/src/cache/memory.rs +++ b/frontend/rust-lib/flowy-sync/src/cache/memory.rs @@ -42,8 +42,10 @@ impl RevisionMemoryCache { let rev_id = record.revision.rev_id; self.revs_map.insert(rev_id, record); - if !self.pending_write_revs.read().await.contains(&rev_id) { - self.pending_write_revs.write().await.push(rev_id); + let mut write_guard = self.pending_write_revs.write().await; + if !write_guard.contains(&rev_id) { + write_guard.push(rev_id); + drop(write_guard); self.make_checkpoint().await; } } diff --git a/frontend/rust-lib/flowy-sync/src/cache/mod.rs b/frontend/rust-lib/flowy-sync/src/cache/mod.rs index e7ee2f7388..eb1004b467 100755 --- a/frontend/rust-lib/flowy-sync/src/cache/mod.rs +++ b/frontend/rust-lib/flowy-sync/src/cache/mod.rs @@ -42,7 +42,8 @@ impl RevisionCache { pub async fn add(&self, revision: Revision, state: RevisionState, write_to_disk: bool) -> FlowyResult<()> { if self.memory_cache.contains(&revision.rev_id) { - return Err(FlowyError::internal().context(format!("Duplicate revision: {} {:?}", revision.rev_id, state))); + tracing::warn!("Duplicate revision: {}:{}-{:?}", self.object_id, revision.rev_id, state); + return Ok(()); } let rev_id = revision.rev_id; let record = RevisionRecord { @@ -58,19 +59,12 @@ impl RevisionCache { pub async fn compact(&self, range: &RevisionRange, new_revision: Revision) -> FlowyResult<()> { self.memory_cache.remove_with_range(range); - let rev_id = new_revision.rev_id; - let record = RevisionRecord { - revision: new_revision, - state: RevisionState::Sync, - write_to_disk: true, - }; - let rev_ids = range.to_rev_ids(); let _ = self .disk_cache - .delete_and_insert_records(&self.object_id, Some(rev_ids), vec![record.clone()])?; - self.memory_cache.add(Cow::Owned(record)).await; - self.set_latest_rev_id(rev_id); + .delete_revision_records(&self.object_id, Some(rev_ids))?; + + self.add(new_revision, RevisionState::Sync, true).await?; Ok(()) } @@ -120,7 +114,7 @@ impl RevisionCache { // let delta = PlainDelta::from_bytes(&record.revision.delta_data).unwrap(); // tracing::trace!("{}", delta.to_string()); // }); - tracing::error!("Revisions len is not equal to range required"); + tracing::error!("Expect revision len {},but receive {}", range_len, records.len()); } } Ok(records diff --git a/frontend/rust-lib/flowy-sync/src/rev_manager.rs b/frontend/rust-lib/flowy-sync/src/rev_manager.rs index b3c46c0b21..e2f63f0f59 100755 --- a/frontend/rust-lib/flowy-sync/src/rev_manager.rs +++ b/frontend/rust-lib/flowy-sync/src/rev_manager.rs @@ -171,17 +171,24 @@ impl RevisionCacheCompact { } } + // Call this method to write the revisions that fetch from server to disk. + #[tracing::instrument(level = "trace", skip(self, revision), fields(rev_id, object_id=%self.object_id), err)] async fn add_ack_revision(&self, revision: &Revision) -> FlowyResult<()> { + tracing::Span::current().record("rev_id", &revision.rev_id); self.inner.add(revision.clone(), RevisionState::Ack, true).await } + // Call this method to sync the revisions that already in local db. + #[tracing::instrument(level = "trace", skip(self), fields(rev_id, object_id=%self.object_id), err)] async fn add_sync_revision(&mut self, revision: &Revision) -> FlowyResult<()> { + tracing::Span::current().record("rev_id", &revision.rev_id); self.inner.add(revision.clone(), RevisionState::Sync, false).await?; self.sync_seq.add(revision.rev_id)?; Ok(()) } - #[tracing::instrument(level = "trace", skip(self, revision), fields(rev_id, compact_range), err)] + // Call this method to save the new revisions generated by the user input. + #[tracing::instrument(level = "trace", skip(self, revision), fields(rev_id, compact_range, object_id=%self.object_id), err)] async fn write_sync_revision(&mut self, revision: &Revision) -> FlowyResult where C: RevisionCompact,