send revisions to server if server rev is outdated

This commit is contained in:
appflowy 2021-10-04 21:53:06 +08:00
parent ab2a997a77
commit c872226b44
9 changed files with 134 additions and 30 deletions

View File

@ -134,7 +134,6 @@ pub async fn init_app_context(configuration: &Settings) -> AppContext {
));
let ws_server = WsServer::new().start();
AppContext::new(ws_server, pg_pool)
}

View File

@ -65,12 +65,19 @@ impl ServerEditDoc {
self.users.insert(user.id(), user.clone());
let cur_rev_id = self.rev_id.load(SeqCst);
if cur_rev_id > rev_id {
let doc_delta = self.document.read().delta().clone();
let cli_revision = self.mk_revision(rev_id, doc_delta);
log::debug!("Server push rev");
let ws_cli_revision = mk_push_rev_ws_message(&self.doc_id, cli_revision);
user.socket.do_send(ws_cli_revision).map_err(internal_error)?;
match cur_rev_id.cmp(&rev_id) {
Ordering::Less => {
user.socket
.do_send(mk_pull_rev_ws_message(&self.doc_id, cur_rev_id, rev_id))
.map_err(internal_error)?;
},
Ordering::Equal => {},
Ordering::Greater => {
let doc_delta = self.document.read().delta().clone();
let cli_revision = self.mk_revision(rev_id, doc_delta);
let ws_cli_revision = mk_push_rev_ws_message(&self.doc_id, cli_revision);
user.socket.do_send(ws_cli_revision).map_err(internal_error)?;
},
}
Ok(())
@ -99,7 +106,6 @@ impl ServerEditDoc {
if cur_rev_id != revision.base_rev_id {
// The server document is outdated, try to get the missing revision from the
// client.
log::debug!("Server push rev");
user.socket
.do_send(mk_pull_rev_ws_message(&self.doc_id, cur_rev_id, revision.rev_id))
.map_err(internal_error)?;

View File

@ -1,23 +1,49 @@
use crate::document::helper::{DocScript, DocumentTest};
use flowy_document::services::doc::{Document, FlowyDoc};
#[rustfmt::skip]
// ┌─────────┐ ┌─────────┐
// │ Server │ │ Client │
// └─────────┘ └─────────┘
// ┌────────────────┐ │ │ ┌────────────────┐
// │ops: [] rev: 0 │◀┼───── ws ────┼─┤ops: [] rev: 0 │
// └────────────────┘ │ │ └────────────────┘
// ┌────────────────────┐ │ │ ┌────────────────────┐
// │ops: ["abc"] rev: 1 │◀┼───── ws ────┼─│ops: ["abc"] rev: 1 │
// └────────────────────┘ │ │ └────────────────────┘
// ┌──────────────────────────┐ │ │ ┌──────────────────────┐
// │ops: ["abc", "123"] rev: 2│◀┼───── ws ────┼─│ops: ["123"] rev: 2 │
// └──────────────────────────┘ │ │ └──────────────────────┘
// │ │
#[actix_rt::test]
async fn sync_doc_insert_text() {
async fn delta_sync_after_ws_connection() {
let test = DocumentTest::new().await;
test.run_scripts(vec![
DocScript::ConnectWs,
DocScript::OpenDoc,
DocScript::SendText(0, "abc"),
DocScript::SendText(3, "123"),
DocScript::SendText(6, "efg"),
DocScript::AssertClient(r#"[{"insert":"abc123efg\n"}]"#),
DocScript::AssertServer(r#"[{"insert":"abc123efg\n"}]"#),
DocScript::AssertClient(r#"[{"insert":"abc123\n"}]"#),
DocScript::AssertServer(r#"[{"insert":"abc123\n"}]"#),
])
.await;
}
#[rustfmt::skip]
// ┌─────────┐ ┌─────────┐
// │ Server │ │ Client │
// └─────────┘ └─────────┘
// ┌──────────────────────────┐ │ │
// │ops: ["123", "456"] rev: 2│ │ │
// └──────────────────────────┘ │ │
// │ │
// ◀── http request ─┤ Open doc
// │ │
// │ │ ┌──────────────────────────┐
// ├──http response──┼─▶│ops: ["123", "456"] rev: 2│
// │ │ └──────────────────────────┘
#[actix_rt::test]
async fn sync_open_empty_doc_and_sync_from_server() {
async fn delta_sync_with_http_request() {
let test = DocumentTest::new().await;
let mut document = Document::new::<FlowyDoc>();
document.insert(0, "123").unwrap();
@ -34,7 +60,7 @@ async fn sync_open_empty_doc_and_sync_from_server() {
}
#[actix_rt::test]
async fn sync_open_empty_doc_and_sync_from_server_using_ws() {
async fn delta_sync_with_server_push_delta() {
let test = DocumentTest::new().await;
let mut document = Document::new::<FlowyDoc>();
document.insert(0, "123").unwrap();
@ -49,8 +75,35 @@ async fn sync_open_empty_doc_and_sync_from_server_using_ws() {
.await;
}
#[rustfmt::skip]
// ┌─────────┐ ┌─────────┐
// │ Server │ │ Client │
// └─────────┘ └─────────┘
// │ │
// │ │
// ◀── http request ─┤ Open doc
// │ │
// │ │ ┌───────────────┐
// ├──http response──┼─▶│ops: [] rev: 0 │
// ┌───────────────────┐│ │ └───────────────┘
// │ops: ["123"] rev: 3││ │
// └───────────────────┘│ │ ┌────────────────────┐
// │ │ │ops: ["abc"] rev: 1 │
// │ │ └────────────────────┘
// │ │
// ◀─────────────────┤ start ws connection
// │ │
// ◀─────────────────┤ notify with rev: 1
// │ │
// ┌───────────────────┐ │ │ ┌──────────────────────────┐
// │ops: ["123"] rev: 3│ ├────Push Rev─────▶ │ops: ["abc", "123"] rev: 4│
// └───────────────────┘ │ │ └──────────────────────────┘
// ┌──────────────────────────┐ │ │ ┌────────────────────┐
// │ops: ["abc", "123"] rev: 4│ ◀────Push Rev─────┤ │ops: ["abc"] rev: 4 │
// └──────────────────────────┘ │ │ └────────────────────┘
// │ │
#[actix_rt::test]
async fn sync_open_non_empty_doc_and_sync_with_sever() {
async fn delta_sync_while_local_rev_less_than_server_rev() {
let test = DocumentTest::new().await;
let mut document = Document::new::<FlowyDoc>();
document.insert(0, "123").unwrap();
@ -66,3 +119,23 @@ async fn sync_open_non_empty_doc_and_sync_with_sever() {
])
.await;
}
#[actix_rt::test]
async fn delta_sync_while_local_rev_greater_than_server_rev() {
let test = DocumentTest::new().await;
let mut document = Document::new::<FlowyDoc>();
document.insert(0, "123").unwrap();
let json = document.to_json();
test.run_scripts(vec![
DocScript::SetServerDocument(json, 1),
DocScript::OpenDoc,
DocScript::AssertClient(r#"[{"insert":"123\n"}]"#),
DocScript::SendText(3, "abc"),
DocScript::SendText(6, "efg"),
DocScript::ConnectWs,
DocScript::AssertClient(r#"[{"insert":"123abcefg\n"}]"#),
DocScript::AssertServer(r#"[{"insert":"123abcefg\n"}]"#),
])
.await;
}

View File

@ -114,7 +114,7 @@ async fn run_scripts(context: Arc<RwLock<ScriptContext>>, scripts: Vec<DocScript
let doc_id = context.read().doc_id.clone();
match script {
DocScript::ConnectWs => {
// sleep(Duration::from_millis(300)).await;
sleep(Duration::from_millis(300)).await;
let user_session = context.read().user_session.clone();
let token = user_session.token().unwrap();
let _ = user_session.start_ws_connection(&token).await.unwrap();

View File

@ -128,7 +128,7 @@ impl DocumentEditActor {
async fn compose_delta(&self, delta: Delta) -> DocResult<()> {
let result = self.document.write().await.compose_delta(&delta);
log::debug!(
"Compose push delta: {}. result: {}",
"Client compose push delta: {}. result: {}",
delta.to_json(),
self.document.read().await.to_json()
);

View File

@ -47,8 +47,6 @@ impl ClientEditDoc {
) -> DocResult<Self> {
let rev_store = spawn_rev_store_actor(doc_id, pool.clone(), server.clone());
let DocRevision { rev_id, delta } = fetch_document(rev_store.clone()).await?;
log::info!("😁 Document delta: {:?}", delta);
let rev_manager = Arc::new(RevisionManager::new(doc_id, rev_id, rev_store));
let document = spawn_doc_edit_actor(doc_id, delta, pool.clone());
let doc_id = doc_id.to_string();
@ -262,7 +260,8 @@ impl ClientEditDoc {
},
WsDataType::PullRev => {
let range = RevisionRange::try_from(bytes)?;
let _ = self.rev_manager.send_revisions(range).await?;
let revision = self.rev_manager.send_revisions(range).await?;
self.ws.send(revision.into());
},
WsDataType::NewDocUser => {},
WsDataType::Acked => {
@ -290,7 +289,10 @@ impl WsDocumentHandler for EditDocWsHandler {
fn state_changed(&self, state: &WsState) {
match state {
WsState::Init => {},
WsState::Connected(_) => self.0.notify_open_doc(),
WsState::Connected(_) => {
log::debug!("ws state changed: {}", state);
self.0.notify_open_doc()
},
WsState::Disconnected(_) => {},
}
}

View File

@ -4,8 +4,10 @@ use crate::{
services::{doc::revision::store_actor::RevisionCmd, util::RevIdCounter, ws::DocumentWebSocket},
};
use flowy_infra::future::ResultFuture;
use flowy_ot::core::Delta;
use flowy_ot::core::{Delta, OperationTransformable};
use crate::entities::doc::RevType;
use flowy_ot::errors::OTError;
use tokio::sync::{mpsc, oneshot};
pub struct DocRevision {
@ -63,14 +65,35 @@ impl RevisionManager {
pub fn update_rev_id(&self, rev_id: i64) { self.rev_id_counter.set(rev_id); }
pub async fn send_revisions(&self, range: RevisionRange) -> Result<(), DocError> {
pub async fn send_revisions(&self, range: RevisionRange) -> Result<Revision, DocError> {
debug_assert!(&range.doc_id == &self.doc_id);
let (ret, rx) = oneshot::channel();
let sender = self.rev_store.clone();
let _ = sender.send(RevisionCmd::SendRevisions { range, ret }).await;
let _revisions = rx.await.map_err(internal_error)??;
let cmd = RevisionCmd::GetRevisions {
range: range.clone(),
ret,
};
let _ = sender.send(cmd).await;
let revisions = rx.await.map_err(internal_error)??;
let mut new_delta = Delta::new();
for revision in revisions {
match Delta::from_bytes(revision.delta_data) {
Ok(delta) => {
new_delta = new_delta.compose(&delta)?;
},
Err(_) => {},
}
}
unimplemented!()
// Ok(())
let delta_data = new_delta.to_bytes();
let revision = Revision::new(
range.from_rev_id,
range.to_rev_id,
delta_data.to_vec(),
&self.doc_id,
RevType::Remote,
);
Ok(revision)
}
}

View File

@ -23,7 +23,7 @@ pub enum RevisionCmd {
AckRevision {
rev_id: RevId,
},
SendRevisions {
GetRevisions {
range: RevisionRange,
ret: oneshot::Sender<DocResult<Vec<Revision>>>,
},
@ -84,7 +84,7 @@ impl RevisionStoreActor {
RevisionCmd::AckRevision { rev_id } => {
self.handle_revision_acked(rev_id).await;
},
RevisionCmd::SendRevisions { range, ret } => {
RevisionCmd::GetRevisions { range, ret } => {
let result = revs_in_range(&self.doc_id, self.persistence.clone(), range).await;
let _ = ret.send(result);
},

View File

@ -81,8 +81,9 @@ impl RevTableSql {
) -> Result<Vec<Revision>, DocError> {
let rev_tables = dsl::rev_table
.filter(rev_id.ge(range.from_rev_id))
.filter(rev_id.lt(range.to_rev_id))
.filter(rev_id.le(range.to_rev_id))
.filter(doc_id.eq(doc_id_s))
.order(rev_id.asc())
.load::<RevTable>(conn)?;
let revisions = rev_tables