From d0111e30dc570977fb5ee41123122c495ad789e2 Mon Sep 17 00:00:00 2001 From: appflowy Date: Tue, 5 Oct 2021 10:19:43 +0800 Subject: [PATCH] describe the delta interaction between client and server --- backend/tests/document/edit.rs | 45 +++++++++++++---- backend/tests/document/helper.rs | 7 ++- rust-lib/dart-ffi/Cargo.toml | 8 ++-- .../src/entities/doc/revision.rs | 11 +++++ .../src/services/doc/edit/edit_doc.rs | 2 +- .../src/services/doc/revision/manager.rs | 2 +- .../src/services/doc/revision/store_actor.rs | 48 +++++++++++++------ 7 files changed, 90 insertions(+), 33 deletions(-) diff --git a/backend/tests/document/edit.rs b/backend/tests/document/edit.rs index 8ed18ccbd8..809b80b825 100644 --- a/backend/tests/document/edit.rs +++ b/backend/tests/document/edit.rs @@ -94,14 +94,18 @@ async fn delta_sync_with_server_push_delta() { // ◀─────────────────┤ start ws connection // │ │ // ◀─────────────────┤ notify with rev: 1 -// │ │ -// ┌───────────────────┐ │ │ ┌──────────────────────────┐ -// │ops: ["123"] rev: 3│ ├────Push Rev─────▶ │ops: ["abc", "123"] rev: 4│ -// └───────────────────┘ │ │ └──────────────────────────┘ -// ┌──────────────────────────┐ │ │ ┌────────────────────┐ -// │ops: ["abc", "123"] rev: 4│ ◀────Push Rev─────┤ │ops: ["abc"] rev: 4 │ -// └──────────────────────────┘ │ │ └────────────────────┘ -// │ │ +// ┌───────────────────┐ │ │ +// │ops: ["123"] rev: 3│ ├────Push Rev─────▶ transform +// └───────────────────┘ │ │ ┌──────────────────────────┐ +// │ │ │ops: ["abc", "123"] rev: 4│ +// │ │ └──────────────────────────┘ +// │ │ ┌────────────────────────────────┐ +// compose ◀────Push Rev─────┤ │ops: ["abc", "retain 3"] rev: 4 │ +// │ │ └────────────────────────────────┘ +// ┌──────────────────────────┐ │ +// │ops: ["abc", "123"] rev: 4│ │ +// └──────────────────────────┘ │ +// │ │ #[actix_rt::test] async fn delta_sync_while_local_rev_less_than_server_rev() { let test = DocumentTest::new().await; @@ -120,6 +124,31 @@ async fn delta_sync_while_local_rev_less_than_server_rev() { .await; } +#[rustfmt::skip] +// ┌─────────┐ ┌─────────┐ +// │ Server │ │ Client │ +// └─────────┘ └─────────┘ +// ┌───────────────────┐ │ │ +// │ops: ["123"] rev: 1│ │ │ +// └───────────────────┘ │ │ +// ◀── http request ─┤ Open doc +// │ │ +// │ │ ┌───────────────┐ +// ├──http response──┼──▶│ops: [123] rev:│ +// │ │ └───────────────┘ +// │ │ ┌──────────────────────────────────┐ +// │ │ │ops: ["123","abc", "efg"] rev: 3 │ +// │ │ └──────────────────────────────────┘ +// ◀─────────────────┤ start ws connection +// │ │ +// ◀─────────────────┤ notify with rev: 3 +// │ │ +// ├────Pull Rev─────▶ +// │ │ ┌──────────────────────────────────┐ +// compose ◀────Push Rev─────┤ │ops: ["retain 3", "abcefg"] rev: 3│ +// ┌──────────────────────────────────┐│ │ └──────────────────────────────────┘ +// │ops: ["123","abc", "efg"] rev: 3 ││ │ +// └──────────────────────────────────┘│ │ #[actix_rt::test] async fn delta_sync_while_local_rev_greater_than_server_rev() { let test = DocumentTest::new().await; diff --git a/backend/tests/document/helper.rs b/backend/tests/document/helper.rs index 072507e6c9..6dfc1b5a1d 100644 --- a/backend/tests/document/helper.rs +++ b/backend/tests/document/helper.rs @@ -114,7 +114,7 @@ async fn run_scripts(context: Arc>, scripts: Vec { - sleep(Duration::from_millis(300)).await; + // sleep(Duration::from_millis(300)).await; let user_session = context.read().user_session.clone(); let token = user_session.token().unwrap(); let _ = user_session.start_ws_connection(&token).await.unwrap(); @@ -126,13 +126,12 @@ async fn run_scripts(context: Arc>, scripts: Vec { - sleep(Duration::from_millis(300)).await; + sleep(Duration::from_millis(100)).await; let json = context.read().client_edit_context().doc_json().await.unwrap(); assert_eq(s, &json); }, DocScript::AssertServer(s) => { - sleep(Duration::from_millis(300)).await; - + sleep(Duration::from_millis(100)).await; let pg_pool = context.read().pool.clone(); let doc_manager = context.read().doc_manager.clone(); let edit_doc = doc_manager.get(&doc_id, pg_pool).await.unwrap().unwrap(); diff --git a/rust-lib/dart-ffi/Cargo.toml b/rust-lib/dart-ffi/Cargo.toml index ce7f26c91c..0d02ce1b9f 100644 --- a/rust-lib/dart-ffi/Cargo.toml +++ b/rust-lib/dart-ffi/Cargo.toml @@ -7,11 +7,11 @@ edition = "2018" [lib] name = "dart_ffi" # this value will change depending on the target os -# for iOS it would be `cdylib` -# for Macos it would be `cdylib` +# for iOS it would be `rlib` +# for Macos it would be `rlib` # for android it would be `c-dylib` -# default cdylib -crate-type = ["cdylib"] +# default rlib +crate-type = ["rlib"] [dependencies] diff --git a/rust-lib/flowy-document/src/entities/doc/revision.rs b/rust-lib/flowy-document/src/entities/doc/revision.rs index 8f4bab96b3..8033c0c4c4 100644 --- a/rust-lib/flowy-document/src/entities/doc/revision.rs +++ b/rust-lib/flowy-document/src/entities/doc/revision.rs @@ -127,3 +127,14 @@ pub struct RevisionRange { #[pb(index = 3)] pub to_rev_id: i64, } + +impl RevisionRange { + pub fn len(&self) -> i64 { + debug_assert!(self.to_rev_id >= self.from_rev_id); + if self.to_rev_id >= self.from_rev_id { + self.to_rev_id - self.from_rev_id + } else { + 0 + } + } +} diff --git a/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs b/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs index 9cc274b39c..e3bbb3c3e3 100644 --- a/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs +++ b/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs @@ -260,7 +260,7 @@ impl ClientEditDoc { }, WsDataType::PullRev => { let range = RevisionRange::try_from(bytes)?; - let revision = self.rev_manager.send_revisions(range).await?; + let revision = self.rev_manager.construct_revisions(range).await?; self.ws.send(revision.into()); }, WsDataType::NewDocUser => {}, diff --git a/rust-lib/flowy-document/src/services/doc/revision/manager.rs b/rust-lib/flowy-document/src/services/doc/revision/manager.rs index 99e358a40d..68b4874d30 100644 --- a/rust-lib/flowy-document/src/services/doc/revision/manager.rs +++ b/rust-lib/flowy-document/src/services/doc/revision/manager.rs @@ -65,7 +65,7 @@ impl RevisionManager { pub fn update_rev_id(&self, rev_id: i64) { self.rev_id_counter.set(rev_id); } - pub async fn send_revisions(&self, range: RevisionRange) -> Result { + pub async fn construct_revisions(&self, range: RevisionRange) -> Result { debug_assert!(&range.doc_id == &self.doc_id); let (ret, rx) = oneshot::channel(); let sender = self.rev_store.clone(); diff --git a/rust-lib/flowy-document/src/services/doc/revision/store_actor.rs b/rust-lib/flowy-document/src/services/doc/revision/store_actor.rs index 94b307c4c8..6618864e1e 100644 --- a/rust-lib/flowy-document/src/services/doc/revision/store_actor.rs +++ b/rust-lib/flowy-document/src/services/doc/revision/store_actor.rs @@ -5,7 +5,7 @@ use crate::{ sql_tables::{RevState, RevTableSql}, }; use async_stream::stream; -use dashmap::DashMap; +use dashmap::{mapref::one::Ref, DashMap}; use flowy_database::ConnectionPool; use flowy_ot::core::{Attributes, Delta, OperationTransformable}; use futures::{stream::StreamExt, TryFutureExt}; @@ -85,7 +85,7 @@ impl RevisionStoreActor { self.handle_revision_acked(rev_id).await; }, RevisionCmd::GetRevisions { range, ret } => { - let result = revs_in_range(&self.doc_id, self.persistence.clone(), range).await; + let result = self.revs_in_range(range).await; let _ = ret.send(result); }, RevisionCmd::DocumentDelta { ret } => { @@ -150,6 +150,37 @@ impl RevisionStoreActor { } })); } + + async fn revs_in_range(&self, range: RevisionRange) -> DocResult> { + let iter_range = (range.from_rev_id..=range.to_rev_id); + let revs = iter_range + .flat_map(|rev_id| { + // + match self.revs.get(&rev_id) { + None => None, + Some(rev) => Some((&*(*rev)).clone()), + } + }) + .collect::>(); + + debug_assert!(revs.len() == range.len() as usize); + + if revs.len() == range.len() as usize { + Ok(revs) + } else { + let doc_id = self.doc_id.clone(); + let persistence = self.persistence.clone(); + let result = spawn_blocking(move || { + let conn = &*persistence.pool.get().map_err(internal_error)?; + let revisions = persistence.rev_sql.read_rev_tables_with_range(&doc_id, range, conn)?; + Ok(revisions) + }) + .await + .map_err(internal_error)?; + + result + } + } } async fn fetch_document( @@ -216,19 +247,6 @@ async fn fetch_from_local(doc_id: &str, persistence: Arc) -> DocRes .map_err(internal_error)? } -async fn revs_in_range(doc_id: &str, persistence: Arc, range: RevisionRange) -> DocResult> { - let doc_id = doc_id.to_owned(); - let result = spawn_blocking(move || { - let conn = &*persistence.pool.get().map_err(internal_error)?; - let revisions = persistence.rev_sql.read_rev_tables_with_range(&doc_id, range, conn)?; - Ok(revisions) - }) - .await - .map_err(internal_error)?; - - result -} - struct Persistence { rev_sql: Arc, pool: Arc,