fix test bugs

This commit is contained in:
appflowy 2022-01-03 12:20:06 +08:00
parent 951584c2ab
commit 661940f728
22 changed files with 367 additions and 297 deletions

2
backend/Cargo.lock generated
View File

@ -2059,6 +2059,7 @@ dependencies = [
name = "lib-ot"
version = "0.1.0"
dependencies = [
"anyhow",
"bytecount",
"bytes",
"dashmap",
@ -2070,6 +2071,7 @@ dependencies = [
"serde_json",
"strum",
"strum_macros",
"thiserror",
"tokio",
"tracing",
]

View File

@ -9,13 +9,12 @@ use flowy_collaboration::protobuf::{
CreateDocParams,
DocumentId,
DocumentInfo,
RepeatedRevision,
RepeatedRevision as RepeatedRevisionPB,
ResetDocumentParams,
Revision,
Revision as RevisionPB,
};
use lib_ot::{core::OperationTransformable, rich_text::RichTextDelta};
use protobuf::Message;
use std::convert::TryInto;
use flowy_collaboration::sync::ServerDocumentManager;
use std::sync::Arc;
@ -46,15 +45,13 @@ pub async fn reset_document(
document_manager: &Arc<ServerDocumentManager>,
mut params: ResetDocumentParams,
) -> Result<(), ServerError> {
let params: flowy_collaboration::entities::doc::ResetDocumentParams = (&mut params).try_into().unwrap();
let mut revisions = params.revisions.into_inner();
if revisions.is_empty() {
let repeated_revision = params.take_revisions();
if repeated_revision.get_items().is_empty() {
return Err(ServerError::payload_none().context("Revisions should not be empty when reset the document"));
}
let doc_id = params.doc_id.clone();
revisions.sort_by(|a, b| a.rev_id.cmp(&b.rev_id));
let _ = document_manager
.handle_document_reset(&doc_id, revisions)
.handle_document_reset(&doc_id, repeated_revision)
.await
.map_err(internal_error)?;
Ok(())
@ -83,14 +80,14 @@ impl std::ops::DerefMut for DocumentKVPersistence {
impl DocumentKVPersistence {
pub(crate) fn new(kv_store: Arc<KVStore>) -> Self { DocumentKVPersistence { inner: kv_store } }
pub(crate) async fn batch_set_revision(&self, revisions: Vec<Revision>) -> Result<(), ServerError> {
pub(crate) async fn batch_set_revision(&self, revisions: Vec<RevisionPB>) -> Result<(), ServerError> {
let items = revisions_to_key_value_items(revisions)?;
self.inner
.transaction(|mut t| Box::pin(async move { t.batch_set(items).await }))
.await
}
pub(crate) async fn get_doc_revisions(&self, doc_id: &str) -> Result<RepeatedRevision, ServerError> {
pub(crate) async fn get_doc_revisions(&self, doc_id: &str) -> Result<RepeatedRevisionPB, ServerError> {
let doc_id = doc_id.to_owned();
let items = self
.inner
@ -103,7 +100,7 @@ impl DocumentKVPersistence {
&self,
doc_id: &str,
rev_ids: T,
) -> Result<RepeatedRevision, ServerError> {
) -> Result<RepeatedRevisionPB, ServerError> {
let rev_ids = rev_ids.into();
let items = match rev_ids {
None => {
@ -154,13 +151,13 @@ impl DocumentKVPersistence {
}
#[inline]
pub fn revisions_to_key_value_items(revisions: Vec<Revision>) -> Result<Vec<KeyValue>, ServerError> {
pub fn revisions_to_key_value_items(revisions: Vec<RevisionPB>) -> Result<Vec<KeyValue>, ServerError> {
let mut items = vec![];
for revision in revisions {
let key = make_revision_key(&revision.doc_id, revision.rev_id);
if revision.delta_data.is_empty() {
return Err(ServerError::internal().context("The delta_data of Revision should not be empty"));
return Err(ServerError::internal().context("The delta_data of RevisionPB should not be empty"));
}
let value = Bytes::from(revision.write_to_bytes().unwrap());
@ -170,14 +167,14 @@ pub fn revisions_to_key_value_items(revisions: Vec<Revision>) -> Result<Vec<KeyV
}
#[inline]
fn key_value_items_to_revisions(items: Vec<KeyValue>) -> RepeatedRevision {
fn key_value_items_to_revisions(items: Vec<KeyValue>) -> RepeatedRevisionPB {
let mut revisions = items
.into_iter()
.filter_map(|kv| parse_from_bytes::<Revision>(&kv.value).ok())
.collect::<Vec<Revision>>();
.filter_map(|kv| parse_from_bytes::<RevisionPB>(&kv.value).ok())
.collect::<Vec<RevisionPB>>();
revisions.sort_by(|a, b| a.rev_id.cmp(&b.rev_id));
let mut repeated_revision = RepeatedRevision::new();
let mut repeated_revision = RepeatedRevisionPB::new();
repeated_revision.set_items(revisions.into());
repeated_revision
}
@ -186,7 +183,7 @@ fn key_value_items_to_revisions(items: Vec<KeyValue>) -> RepeatedRevision {
fn make_revision_key(doc_id: &str, rev_id: i64) -> String { format!("{}:{}", doc_id, rev_id) }
#[inline]
fn make_doc_from_revisions(doc_id: &str, mut revisions: RepeatedRevision) -> Result<DocumentInfo, ServerError> {
fn make_doc_from_revisions(doc_id: &str, mut revisions: RepeatedRevisionPB) -> Result<DocumentInfo, ServerError> {
let revisions = revisions.take_items();
if revisions.is_empty() {
return Err(ServerError::record_not_found().context(format!("{} not exist", doc_id)));

View File

@ -12,7 +12,7 @@ use flowy_collaboration::{
sync::{RevisionUser, ServerDocumentManager, SyncResponse},
};
use futures::stream::StreamExt;
use std::{convert::TryInto, sync::Arc};
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};
pub enum WSActorMessage {
@ -147,13 +147,10 @@ impl RevisionUser for ServerDocUser {
let msg: WebSocketMessage = data.into();
self.socket.try_send(msg).map_err(internal_error)
},
SyncResponse::NewRevision(revisions) => {
SyncResponse::NewRevision(mut repeated_revision) => {
let kv_store = self.persistence.kv_store();
tokio::task::spawn(async move {
let revisions = revisions
.into_iter()
.map(|revision| revision.try_into().unwrap())
.collect::<Vec<_>>();
let revisions = repeated_revision.take_items().into();
match kv_store.batch_set_revision(revisions).await {
Ok(_) => {},
Err(e) => log::error!("{}", e),

View File

@ -1,21 +1,25 @@
use crate::services::{
document::{
persistence::{create_document, read_document},
ws_actor::{DocumentWebSocketActor, WSActorMessage},
use crate::{
context::FlowyPersistence,
services::{
document::{
persistence::{create_document, read_document, revisions_to_key_value_items},
ws_actor::{DocumentWebSocketActor, WSActorMessage},
},
web_socket::{WSClientData, WebSocketReceiver},
},
web_socket::{WSClientData, WebSocketReceiver},
};
use crate::{context::FlowyPersistence, services::document::persistence::revisions_to_key_value_items};
use backend_service::errors::ServerError;
use flowy_collaboration::{
entities::{
doc::{CreateDocParams, DocumentInfo},
revision::{RepeatedRevision, Revision},
},
entities::doc::DocumentInfo,
errors::CollaborateError,
protobuf::DocumentId,
protobuf::{
CreateDocParams as CreateDocParamsPB,
DocumentId,
RepeatedRevision as RepeatedRevisionPB,
Revision as RevisionPB,
},
sync::{DocumentPersistence, ServerDocumentManager},
util::repeated_revision_from_repeated_revision_pb,
};
use lib_infra::future::BoxResultFuture;
use std::{
@ -95,49 +99,54 @@ impl DocumentPersistence for DocumentPersistenceImpl {
})
}
fn create_doc(&self, doc_id: &str, revisions: Vec<Revision>) -> BoxResultFuture<DocumentInfo, CollaborateError> {
fn create_doc(
&self,
doc_id: &str,
repeated_revision: RepeatedRevisionPB,
) -> BoxResultFuture<DocumentInfo, CollaborateError> {
let kv_store = self.0.kv_store();
let doc_id = doc_id.to_owned();
Box::pin(async move {
let doc = DocumentInfo::from_revisions(&doc_id, revisions.clone())?;
let revisions = repeated_revision_from_repeated_revision_pb(repeated_revision.clone())?.into_inner();
let doc = DocumentInfo::from_revisions(&doc_id, revisions)?;
let doc_id = doc_id.to_owned();
let revisions = RepeatedRevision::new(revisions);
let params = CreateDocParams { id: doc_id, revisions };
let pb_params: flowy_collaboration::protobuf::CreateDocParams = params.try_into().unwrap();
let _ = create_document(&kv_store, pb_params)
let mut params = CreateDocParamsPB::new();
params.set_id(doc_id);
params.set_revisions(repeated_revision);
let _ = create_document(&kv_store, params)
.await
.map_err(server_error_to_collaborate_error)?;
Ok(doc)
})
}
fn get_revisions(&self, doc_id: &str, rev_ids: Vec<i64>) -> BoxResultFuture<Vec<Revision>, CollaborateError> {
fn get_revisions(&self, doc_id: &str, rev_ids: Vec<i64>) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError> {
let kv_store = self.0.kv_store();
let doc_id = doc_id.to_owned();
let f = || async move {
let mut pb = kv_store.batch_get_revisions(&doc_id, rev_ids).await?;
let repeated_revision: RepeatedRevision = (&mut pb).try_into()?;
let revisions = repeated_revision.into_inner();
Ok(revisions)
let mut repeated_revision = kv_store.batch_get_revisions(&doc_id, rev_ids).await?;
Ok(repeated_revision.take_items().into())
};
Box::pin(async move { f().await.map_err(server_error_to_collaborate_error) })
}
fn get_doc_revisions(&self, doc_id: &str) -> BoxResultFuture<Vec<Revision>, CollaborateError> {
fn get_doc_revisions(&self, doc_id: &str) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError> {
let kv_store = self.0.kv_store();
let doc_id = doc_id.to_owned();
let f = || async move {
let mut pb = kv_store.get_doc_revisions(&doc_id).await?;
let repeated_revision: RepeatedRevision = (&mut pb).try_into()?;
let revisions = repeated_revision.into_inner();
Ok(revisions)
let mut repeated_revision = kv_store.get_doc_revisions(&doc_id).await?;
Ok(repeated_revision.take_items().into())
};
Box::pin(async move { f().await.map_err(server_error_to_collaborate_error) })
}
fn reset_document(&self, doc_id: &str, revisions: Vec<Revision>) -> BoxResultFuture<(), CollaborateError> {
fn reset_document(
&self,
doc_id: &str,
mut repeated_revision: RepeatedRevisionPB,
) -> BoxResultFuture<(), CollaborateError> {
let kv_store = self.0.kv_store();
let doc_id = doc_id.to_owned();
let f = || async move {
@ -145,8 +154,8 @@ impl DocumentPersistence for DocumentPersistenceImpl {
.transaction(|mut transaction| {
Box::pin(async move {
let _ = transaction.batch_delete_key_start_with(&doc_id).await?;
// let items = revisions_to_key_value_items(vec![])?;
let _ = transaction.batch_set(vec![]).await?;
let items = revisions_to_key_value_items(repeated_revision.take_items().into())?;
let _ = transaction.batch_set(items).await?;
Ok(())
})
})

View File

@ -9,7 +9,6 @@ use futures_util::{stream, stream::StreamExt};
use std::sync::Arc;
use bytes::Bytes;
use tokio::time::{sleep, Duration};
// use crate::helper::*;
use crate::util::helper::{spawn_server, TestServer};
use flowy_collaboration::{entities::doc::DocumentId, protobuf::ResetDocumentParams};
use lib_ot::rich_text::{RichTextAttribute, RichTextDelta};
@ -28,13 +27,12 @@ pub struct DocumentTest {
}
#[derive(Clone)]
pub enum DocScript {
ClientConnectWS,
ClientInsertText(usize, &'static str),
ClientFormatText(Interval, RichTextAttribute),
ClientOpenDoc,
AssertClient(&'static str),
AssertServer(&'static str, i64),
ServerSaveDocument(String, i64), // delta_json, rev_id
ServerResetDocument(String, i64), // delta_json, rev_id
}
impl DocumentTest {
@ -95,18 +93,10 @@ async fn run_scripts(context: Arc<RwLock<ScriptContext>>, scripts: Vec<DocScript
let fut = async move {
let doc_id = context.read().doc_id.clone();
match script {
DocScript::ClientConnectWS => {
// sleep(Duration::from_millis(300)).await;
let ws_manager = context.read().ws_conn.clone();
let user_session = context.read().client_user_session.clone();
let token = user_session.token().unwrap();
let _ = ws_manager.start(token).await.unwrap();
},
DocScript::ClientOpenDoc => {
context.write().open_doc().await;
},
DocScript::ClientInsertText(index, s) => {
sleep(Duration::from_millis(2000)).await;
context.read().client_editor().insert(index, s).await.unwrap();
},
DocScript::ClientFormatText(interval, attribute) => {
@ -133,7 +123,7 @@ async fn run_scripts(context: Arc<RwLock<ScriptContext>>, scripts: Vec<DocScript
assert_eq(s, &document_info.text);
assert_eq!(document_info.rev_id, rev_id);
},
DocScript::ServerSaveDocument(document_json, rev_id) => {
DocScript::ServerResetDocument(document_json, rev_id) => {
let delta_data = Bytes::from(document_json);
let user_id = context.read().client_user_session.user_id().unwrap();
let md5 = format!("{:x}", md5::compute(&delta_data));
@ -151,9 +141,6 @@ async fn run_scripts(context: Arc<RwLock<ScriptContext>>, scripts: Vec<DocScript
reset_doc(&doc_id, RepeatedRevision::new(vec![revision]), document_manager.get_ref()).await;
sleep(Duration::from_millis(2000)).await;
},
// DocScript::Sleep(sec) => {
// sleep(Duration::from_secs(sec)).await;
// },
}
};
fut_scripts.push(fut);

View File

@ -7,13 +7,13 @@ use lib_ot::{core::Interval, rich_text::RichTextAttribute};
// │ Server │ │ Client │
// └─────────┘ └─────────┘
// ┌────────────────┐ │ │ ┌────────────────┐
// │ops: [] rev: 0 │◀┼───── ws ────┼─┤ops: [] rev: 0 │
// │ops: [] rev: 0 │◀┼──── Ping ─────┼─┤ops: [] rev: 0 │
// └────────────────┘ │ │ └────────────────┘
// ┌────────────────────┐ │ │ ┌────────────────────┐
// │ops: ["abc"] rev: 1 │◀┼───── ws ────┼─│ops: ["abc"] rev: 1 │
// │ops: ["abc"] rev: 1 │◀┼───ClientPush ───┼─│ops: ["abc"] rev: 1 │
// └────────────────────┘ │ │ └────────────────────┘
// ┌──────────────────────────┐ │ │ ┌──────────────────────┐
// │ops: ["abc", "123"] rev: 2│◀┼───── ws ────┼─│ops: ["123"] rev: 2 │
// │ops: ["abc", "123"] rev: 2│◀┼── ClientPush ───┼─│ops: ["123"] rev: 2 │
// └──────────────────────────┘ │ │ └──────────────────────┘
// │ │
#[actix_rt::test]
@ -46,7 +46,6 @@ async fn delta_sync_multi_revs() {
async fn delta_sync_while_editing_with_attribute() {
let test = DocumentTest::new().await;
test.run_scripts(vec![
DocScript::ClientConnectWS,
DocScript::ClientOpenDoc,
DocScript::ClientInsertText(0, "abc"),
DocScript::ClientFormatText(Interval::new(0, 3), RichTextAttribute::Bold(true)),
@ -64,26 +63,25 @@ async fn delta_sync_while_editing_with_attribute() {
// ┌─────────┐ ┌─────────┐
// │ Server │ │ Client │
// └─────────┘ └─────────┘
// ┌──────────────────────────┐ │ │
// │ops: ["123", "456"] rev: 2│ │ │
// └──────────────────────────┘ │ │
// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │ │
// ops: ["123", "456"] rev: 3│ │ │
// └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │ │
// │ │
// ◀── http request ─┤ Open doc
// ◀───── Ping ───┤ Open doc
// │ │
// │ │ ┌──────────────────────────┐
// ├──http response──┼─▶│ops: ["123", "456"] rev: 2
// │ │ └──────────────────────────┘
// │ │ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
// ├───ServerPush────┼─▶ ops: ["123", "456"] rev: 3
// │ │ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
#[actix_rt::test]
async fn delta_sync_with_http_request() {
async fn delta_sync_with_server_push() {
let test = DocumentTest::new().await;
let mut document = Document::new::<NewlineDoc>();
document.insert(0, "123").unwrap();
document.insert(3, "456").unwrap();
let json = document.to_json();
test.run_scripts(vec![
DocScript::ServerSaveDocument(json, 3),
DocScript::ServerResetDocument(json, 3),
DocScript::ClientOpenDoc,
DocScript::AssertClient(r#"[{"insert":"123456\n"}]"#),
DocScript::AssertServer(r#"[{"insert":"123456\n"}]"#, 3),
@ -91,8 +89,25 @@ async fn delta_sync_with_http_request() {
.await;
}
#[rustfmt::skip]
// ┌─────────┐ ┌─────────┐
// │ Server │ │ Client │
// └─────────┘ └─────────┘
// ┌ ─ ─ ─ ─ ┐ │ │
// ops: [] │ │
// └ ─ ─ ─ ─ ┘ │ │
// │ │
// ◀───── Ping ───┤ Open doc
// ◀───── Ping ───┤
// ◀───── Ping ───┤
// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ │ │
// ops: ["123"], rev: 3 │ │
// └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ │ │ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
// ├────ServerPush───▶ ops: ["123"] rev: 3
// │ │ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
// │ │
#[actix_rt::test]
async fn delta_sync_with_server_push_delta() {
async fn delta_sync_with_server_push_after_reset_document() {
let test = DocumentTest::new().await;
let mut document = Document::new::<NewlineDoc>();
document.insert(0, "123").unwrap();
@ -100,9 +115,9 @@ async fn delta_sync_with_server_push_delta() {
test.run_scripts(vec![
DocScript::ClientOpenDoc,
DocScript::ServerSaveDocument(json, 3),
DocScript::AssertClient(r#"[{"insert":"123\n\n"}]"#),
DocScript::AssertServer(r#"[{"insert":"123\n\n"}]"#, 3),
DocScript::ServerResetDocument(json, 3),
DocScript::AssertClient(r#"[{"insert":"123\n"}]"#),
DocScript::AssertServer(r#"[{"insert":"123\n"}]"#, 3),
])
.await;
}
@ -113,31 +128,23 @@ async fn delta_sync_with_server_push_delta() {
// └─────────┘ └─────────┘
// │ │
// │ │
// ◀── http request ─┤ Open doc
// │ │
// │ │ ┌───────────────┐
// ├──http response──┼─▶│ops: [] rev: 0 │
// ┌───────────────────┐│ │ └───────────────┘
// │ops: ["123"] rev: 3││ │
// └───────────────────┘│ │ ┌────────────────────┐
// │ │ │ops: ["abc"] rev: 1 │
// │ │ └────────────────────┘
// │ │
// ◀─────────────────┤ start ws connection
// │ │
// ◀─────────────────┤ notify with rev: 1
// ┌───────────────────┐ │ │
// │ops: ["123"] rev: 3│ ├────Push Rev─────▶ transform
// └───────────────────┘ │ │ ┌──────────────────────────┐
// │ │ │ops: ["abc", "123"] rev: 4│
// │ │ └──────────────────────────┘
// ◀────── Ping ─────┤ Open doc
// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ │ │
// ops: ["123"] rev: 3 │ │ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
// └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ │ │ ops: ["abc"] rev: 1 │
// │ │ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
// │ │ ┌────────────────────┐
// ◀───ClientPush ───┤ │ops: ["abc"] rev: 1 │
// ┌───────────────────┐ │ │ └────────────────────┘
// │ops: ["123"] rev: 3│ ├────ServerPush───▶ transform
// └───────────────────┘ │ │ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
// │ │ ops: ["abc", "123"] rev: 4│
// │ │ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
// │ │ ┌────────────────────────────────┐
// compose ◀────Push Rev─────┤ │ops: ["abc", "retain 3"] rev: 4 │
// │ │ └────────────────────────────────┘
// ┌──────────────────────────┐ │
// │ops: ["abc", "123"] rev: 4│ │
// └──────────────────────────┘ │
// │ │
// ◀────ClientPush───┤ │ops: ["retain 3","abc"] rev: 4 │
// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │ │ └────────────────────────────────┘
// ops: ["abc", "123"] rev: 4│ ├────ServerAck────▶
// └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │ │
#[actix_rt::test]
async fn delta_sync_while_local_rev_less_than_server_rev() {
let test = DocumentTest::new().await;
@ -147,40 +154,39 @@ async fn delta_sync_while_local_rev_less_than_server_rev() {
test.run_scripts(vec![
DocScript::ClientOpenDoc,
DocScript::ServerSaveDocument(json, 3),
DocScript::ServerResetDocument(json, 3),
DocScript::ClientInsertText(0, "abc"),
// DocScript::ClientConnectWS,
DocScript::AssertClient(r#"[{"insert":"abc\n123\n"}]"#),
DocScript::AssertServer(r#"[{"insert":"abc\n123\n"}]"#, 4),
DocScript::AssertClient(r#"[{"insert":"abc123\n"}]"#),
DocScript::AssertServer(r#"[{"insert":"abc123\n"}]"#, 4),
])
.await;
}
#[rustfmt::skip]
// ┌─────────┐ ┌─────────┐
// │ Server │ │ Client │
// └─────────┘ └─────────┘
// ┌───────────────────┐ │ │
// │ops: ["123"] rev: 1│ │ │
// └───────────────────┘ │ │
// ◀── http request ─┤ Open doc
// │ │
// │ │ ┌───────────────┐
// ├──http response──┼──▶│ops: [123] rev:
// │ │ └───────────────┘
// │ │ ──────────────────────────────────┐
// │ │ ops: ["123","abc", "efg"] rev: 3 │
// │ │ ──────────────────────────────────┘
// ◀─────────────────┤ start ws connection
//
// ◀─────────────────┤ call notify_open_doc with rev: 3
// │ │
// ├────Pull Rev─────▶
// │ │ ┌──────────────────────────────────┐
// compose ◀────Push Rev─────┤ │ops: ["retain 3", "abcefg"] rev: 3
// ┌──────────────────────────────────┐│ │ └──────────────────────────────────┘
// │ops: ["123","abc", "efg"] rev: 3 ││ │
// └──────────────────────────────────┘│ │
// ┌─────────┐ ┌─────────┐
// │ Server │ │ Client │
// └─────────┘ └─────────┘
// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ │ │
// ops: ["123"] rev: 1 │ │
// └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ │ │
// ◀──── Ping ────┤ Open doc
// │ │
// │ │ ┌──────────────────┐
// ├───ServerPush────▶ │ops: [123] rev: 1
// │ │ └──────────────────┘
// │ │
// │ │ ops: ["123","abc", "efg"] rev: 3 │
// │ │
// │ │ ┌──────────────────────────────┐
// ◀────ClientPush───┤ │ops: [retain 3, "abc"] rev: 2
// ┌──────────────────────────┐ │ │ └──────────────────────────────┘
// │ops: ["123","abc"] rev: 2 │ ├────ServerAck────▶
// └──────────────────────────┘ │ │
// │ │ ┌──────────────────────────────┐
// ◀────ClientPush───┤ │ops: [retain 6, "efg"] rev: 3
// ┌──────────────────────────────────┐ │ │ └──────────────────────────────┘
// │ops: ["123","abc", "efg"] rev: 3 │ ├────ServerAck────▶
// └──────────────────────────────────┘ │ │
#[actix_rt::test]
async fn delta_sync_while_local_rev_greater_than_server_rev() {
let test = DocumentTest::new().await;
@ -189,12 +195,11 @@ async fn delta_sync_while_local_rev_greater_than_server_rev() {
let json = document.to_json();
test.run_scripts(vec![
DocScript::ServerSaveDocument(json, 1),
DocScript::ServerResetDocument(json, 1),
DocScript::ClientOpenDoc,
DocScript::AssertClient(r#"[{"insert":"123\n"}]"#),
DocScript::ClientInsertText(3, "abc"),
DocScript::ClientInsertText(6, "efg"),
DocScript::ClientConnectWS,
DocScript::AssertClient(r#"[{"insert":"123abcefg\n"}]"#),
DocScript::AssertServer(r#"[{"insert":"123abcefg\n"}]"#, 3),
])

View File

@ -1,8 +1,7 @@
use bytes::Bytes;
use flowy_collaboration::entities::{
doc::{DocumentDelta, DocumentId},
prelude::Revision,
revision::RepeatedRevision,
revision::{RepeatedRevision, Revision},
};
use flowy_database::SqliteConnection;
use futures::{FutureExt, StreamExt};

View File

@ -16,6 +16,7 @@ use std::sync::Arc;
use tokio::sync::{mpsc, oneshot, RwLock};
pub(crate) struct EditorCommandQueue {
#[allow(dead_code)]
doc_id: String,
document: Arc<RwLock<Document>>,
receiver: Option<mpsc::UnboundedReceiver<EditorCommand>>,

View File

@ -7,10 +7,9 @@ use dashmap::DashMap;
use flowy_collaboration::{
entities::{
doc::DocumentInfo,
prelude::pair_rev_id_from_revisions,
revision::{RepeatedRevision, Revision, RevisionRange, RevisionState},
},
util::{md5, RevIdCounter},
util::{md5, pair_rev_id_from_revisions, RevIdCounter},
};
use flowy_error::FlowyResult;
use futures_util::{future, stream, stream::StreamExt};

View File

@ -1,7 +1,15 @@
use bytes::Bytes;
use dashmap::DashMap;
use flowy_collaboration::{entities::prelude::*, errors::CollaborateError, sync::*};
// use flowy_net::services::ws::*;
use flowy_collaboration::{
entities::{
doc::DocumentInfo,
ws::{DocumentClientWSData, DocumentClientWSDataType},
},
errors::CollaborateError,
protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB},
sync::*,
util::repeated_revision_from_repeated_revision_pb,
};
use lib_infra::future::BoxResultFuture;
use lib_ws::{WSModule, WebSocketRawMessage};
use std::{
@ -84,18 +92,27 @@ impl DocumentPersistence for MockDocServerPersistence {
})
}
fn create_doc(&self, doc_id: &str, revisions: Vec<Revision>) -> BoxResultFuture<DocumentInfo, CollaborateError> {
fn create_doc(
&self,
doc_id: &str,
repeated_revision: RepeatedRevisionPB,
) -> BoxResultFuture<DocumentInfo, CollaborateError> {
let doc_id = doc_id.to_owned();
Box::pin(async move { DocumentInfo::from_revisions(&doc_id, revisions) })
Box::pin(async move {
let repeated_revision = repeated_revision_from_repeated_revision_pb(repeated_revision)?;
DocumentInfo::from_revisions(&doc_id, repeated_revision.into_inner())
})
}
fn get_revisions(&self, _doc_id: &str, _rev_ids: Vec<i64>) -> BoxResultFuture<Vec<Revision>, CollaborateError> {
fn get_revisions(&self, _doc_id: &str, _rev_ids: Vec<i64>) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError> {
Box::pin(async move { Ok(vec![]) })
}
fn get_doc_revisions(&self, _doc_id: &str) -> BoxResultFuture<Vec<Revision>, CollaborateError> { unimplemented!() }
fn get_doc_revisions(&self, _doc_id: &str) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError> {
unimplemented!()
}
fn reset_document(&self, _doc_id: &str, _revisions: Vec<Revision>) -> BoxResultFuture<(), CollaborateError> {
fn reset_document(&self, _doc_id: &str, _revisions: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> {
unimplemented!()
}
}

2
shared-lib/Cargo.lock generated
View File

@ -1086,6 +1086,7 @@ dependencies = [
name = "lib-ot"
version = "0.1.0"
dependencies = [
"anyhow",
"bytecount",
"bytes",
"dashmap",
@ -1097,6 +1098,7 @@ dependencies = [
"serde_json",
"strum",
"strum_macros",
"thiserror",
"tokio",
"tracing",
]

View File

@ -1,10 +1,9 @@
use crate::response::FlowyResponse;
use bytes::Bytes;
use serde::{Deserialize, Serialize, __private::Formatter};
use serde_repr::*;
use std::{fmt, fmt::Debug};
use crate::response::FlowyResponse;
pub type Result<T> = std::result::Result<T, ServerError>;
#[derive(thiserror::Error, Debug, Serialize, Deserialize, Clone)]

View File

@ -2,7 +2,3 @@ pub mod doc;
pub mod parser;
pub mod revision;
pub mod ws;
pub mod prelude {
pub use crate::entities::{doc::*, parser::*, revision::*, ws::*};
}

View File

@ -117,21 +117,6 @@ impl RepeatedRevision {
pub fn into_inner(self) -> Vec<Revision> { self.items }
}
pub fn pair_rev_id_from_revisions(revisions: &[Revision]) -> (i64, i64) {
let mut rev_id = 0;
revisions.iter().for_each(|revision| {
if rev_id < revision.rev_id {
rev_id = revision.rev_id;
}
});
if rev_id > 0 {
(rev_id - 1, rev_id)
} else {
(0, rev_id)
}
}
#[derive(Clone, Debug, ProtoBuf, Default)]
pub struct RevId {
#[pb(index = 1)]

View File

@ -93,8 +93,7 @@ pub struct DocumentServerWSData {
pub struct DocumentServerWSDataBuilder();
impl DocumentServerWSDataBuilder {
pub fn build_push_message(doc_id: &str, revisions: Vec<Revision>) -> DocumentServerWSData {
let repeated_revision = RepeatedRevision::new(revisions);
pub fn build_push_message(doc_id: &str, repeated_revision: RepeatedRevision) -> DocumentServerWSData {
let bytes: Bytes = repeated_revision.try_into().unwrap();
DocumentServerWSData {
doc_id: doc_id.to_string(),

View File

@ -1,12 +1,8 @@
use crate::{
document::Document,
entities::{
doc::DocumentInfo,
revision::{RepeatedRevision, Revision},
ws::DocumentServerWSDataBuilder,
},
entities::{doc::DocumentInfo, ws::DocumentServerWSDataBuilder},
errors::{internal_error, CollaborateError, CollaborateResult},
protobuf::DocumentClientWSData,
protobuf::{DocumentClientWSData, RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB},
sync::{RevisionSynchronizer, RevisionUser, SyncResponse},
};
use async_stream::stream;
@ -14,29 +10,41 @@ use dashmap::DashMap;
use futures::stream::StreamExt;
use lib_infra::future::BoxResultFuture;
use lib_ot::rich_text::RichTextDelta;
use std::{convert::TryFrom, fmt::Debug, sync::Arc};
use std::{collections::HashMap, fmt::Debug, sync::Arc};
use tokio::{
sync::{mpsc, oneshot},
sync::{mpsc, oneshot, RwLock},
task::spawn_blocking,
};
pub trait DocumentPersistence: Send + Sync + Debug {
fn read_doc(&self, doc_id: &str) -> BoxResultFuture<DocumentInfo, CollaborateError>;
fn create_doc(&self, doc_id: &str, revisions: Vec<Revision>) -> BoxResultFuture<DocumentInfo, CollaborateError>;
fn get_revisions(&self, doc_id: &str, rev_ids: Vec<i64>) -> BoxResultFuture<Vec<Revision>, CollaborateError>;
fn get_doc_revisions(&self, doc_id: &str) -> BoxResultFuture<Vec<Revision>, CollaborateError>;
fn reset_document(&self, doc_id: &str, revisions: Vec<Revision>) -> BoxResultFuture<(), CollaborateError>;
fn create_doc(
&self,
doc_id: &str,
repeated_revision: RepeatedRevisionPB,
) -> BoxResultFuture<DocumentInfo, CollaborateError>;
fn get_revisions(&self, doc_id: &str, rev_ids: Vec<i64>) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError>;
fn get_doc_revisions(&self, doc_id: &str) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError>;
fn reset_document(
&self,
doc_id: &str,
repeated_revision: RepeatedRevisionPB,
) -> BoxResultFuture<(), CollaborateError>;
}
pub struct ServerDocumentManager {
open_doc_map: DashMap<String, Arc<OpenDocHandle>>,
open_doc_map: Arc<RwLock<HashMap<String, Arc<OpenDocHandle>>>>,
persistence: Arc<dyn DocumentPersistence>,
}
impl ServerDocumentManager {
pub fn new(persistence: Arc<dyn DocumentPersistence>) -> Self {
Self {
open_doc_map: DashMap::new(),
open_doc_map: Arc::new(RwLock::new(HashMap::new())),
persistence,
}
}
@ -46,28 +54,20 @@ impl ServerDocumentManager {
user: Arc<dyn RevisionUser>,
mut client_data: DocumentClientWSData,
) -> Result<(), CollaborateError> {
let mut pb = client_data.take_revisions();
let repeated_revision = client_data.take_revisions();
let cloned_user = user.clone();
let ack_id = rev_id_from_str(&client_data.id)?;
let doc_id = client_data.doc_id;
let revisions = spawn_blocking(move || {
let repeated_revision = RepeatedRevision::try_from(&mut pb)?;
let revisions = repeated_revision.into_inner();
Ok::<Vec<Revision>, CollaborateError>(revisions)
})
.await
.map_err(internal_error)??;
let result = match self.get_document_handler(&doc_id).await {
None => {
let _ = self.create_document(&doc_id, revisions).await.map_err(|e| {
let _ = self.create_document(&doc_id, repeated_revision).await.map_err(|e| {
CollaborateError::internal().context(format!("Server crate document failed: {}", e))
})?;
Ok(())
},
Some(handler) => {
let _ = handler.apply_revisions(user, revisions).await?;
let _ = handler.apply_revisions(user, repeated_revision).await?;
Ok(())
},
};
@ -99,59 +99,64 @@ impl ServerDocumentManager {
}
}
pub async fn handle_document_reset(&self, doc_id: &str, revisions: Vec<Revision>) -> Result<(), CollaborateError> {
pub async fn handle_document_reset(
&self,
doc_id: &str,
mut repeated_revision: RepeatedRevisionPB,
) -> Result<(), CollaborateError> {
repeated_revision.mut_items().sort_by(|a, b| a.rev_id.cmp(&b.rev_id));
match self.get_document_handler(doc_id).await {
None => {
tracing::warn!("Document:{} doesn't exist, ignore document reset", doc_id);
Ok(())
},
Some(handler) => {
let _ = handler.apply_document_reset(revisions).await?;
let _ = handler.apply_document_reset(repeated_revision).await?;
Ok(())
},
}
}
async fn get_document_handler(&self, doc_id: &str) -> Option<Arc<OpenDocHandle>> {
match self.open_doc_map.get(doc_id).map(|ctx| ctx.clone()) {
Some(edit_doc) => Some(edit_doc),
None => {
let f = || async {
let doc = self.persistence.read_doc(doc_id).await?;
let handler = self.cache_document(doc).await.map_err(internal_error)?;
Ok::<Arc<OpenDocHandle>, CollaborateError>(handler)
};
match f().await {
Ok(handler) => Some(handler),
Err(e) => {
log::error!("{}", e);
None
},
}
},
if let Some(handler) = self.open_doc_map.read().await.get(doc_id).cloned() {
return Some(handler);
}
let mut write_guard = self.open_doc_map.write().await;
let doc = self.persistence.read_doc(doc_id).await.unwrap();
let handler = self.create_document_handler(doc).await.map_err(internal_error).unwrap();
write_guard.insert(doc_id.to_owned(), handler.clone());
drop(write_guard);
Some(handler)
}
#[tracing::instrument(level = "debug", skip(self, revisions), err)]
#[tracing::instrument(level = "debug", skip(self, repeated_revision), err)]
async fn create_document(
&self,
doc_id: &str,
revisions: Vec<Revision>,
repeated_revision: RepeatedRevisionPB,
) -> Result<Arc<OpenDocHandle>, CollaborateError> {
let doc = self.persistence.create_doc(doc_id, revisions).await?;
let handler = self.cache_document(doc).await?;
let doc = self.persistence.create_doc(doc_id, repeated_revision).await?;
let handler = self.create_document_handler(doc).await?;
self.open_doc_map
.write()
.await
.insert(doc_id.to_owned(), handler.clone());
Ok(handler)
}
async fn cache_document(&self, doc: DocumentInfo) -> Result<Arc<OpenDocHandle>, CollaborateError> {
let doc_id = doc.doc_id.clone();
async fn create_document_handler(&self, doc: DocumentInfo) -> Result<Arc<OpenDocHandle>, CollaborateError> {
let persistence = self.persistence.clone();
let handle = spawn_blocking(|| OpenDocHandle::new(doc, persistence))
.await
.map_err(|e| CollaborateError::internal().context(format!("Create open doc handler failed: {}", e)))?;
let handle = Arc::new(handle?);
self.open_doc_map.insert(doc_id, handle.clone());
Ok(handle)
Ok(Arc::new(handle?))
}
}
impl std::ops::Drop for ServerDocumentManager {
fn drop(&mut self) {
log::debug!("ServerDocumentManager was drop");
}
}
@ -177,24 +182,24 @@ impl OpenDocHandle {
})
}
#[tracing::instrument(level = "debug", skip(self, user, revisions), err)]
#[tracing::instrument(level = "debug", skip(self, user, repeated_revision), err)]
async fn apply_revisions(
&self,
user: Arc<dyn RevisionUser>,
revisions: Vec<Revision>,
repeated_revision: RepeatedRevisionPB,
) -> Result<(), CollaborateError> {
let (ret, rx) = oneshot::channel();
let persistence = self.persistence.clone();
self.users.insert(user.user_id(), user.clone());
let msg = DocumentCommand::ApplyRevisions {
user,
revisions,
repeated_revision,
persistence,
ret,
};
let _ = self.send(msg, rx).await?;
Ok(())
let result = self.send(msg, rx).await?;
result
}
#[tracing::instrument(level = "debug", skip(self, user), err)]
@ -208,21 +213,21 @@ impl OpenDocHandle {
rev_id,
ret,
};
let _ = self.send(msg, rx).await?;
Ok(())
let result = self.send(msg, rx).await?;
result
}
#[tracing::instrument(level = "debug", skip(self, revisions), err)]
async fn apply_document_reset(&self, revisions: Vec<Revision>) -> Result<(), CollaborateError> {
#[tracing::instrument(level = "debug", skip(self, repeated_revision), err)]
async fn apply_document_reset(&self, repeated_revision: RepeatedRevisionPB) -> Result<(), CollaborateError> {
let (ret, rx) = oneshot::channel();
let persistence = self.persistence.clone();
let msg = DocumentCommand::Reset {
persistence,
revisions,
repeated_revision,
ret,
};
let _ = self.send(msg, rx).await?;
Ok(())
let result = self.send(msg, rx).await?;
result
}
async fn send<T>(&self, msg: DocumentCommand, rx: oneshot::Receiver<T>) -> CollaborateResult<T> {
@ -237,6 +242,7 @@ impl OpenDocHandle {
impl std::ops::Drop for OpenDocHandle {
fn drop(&mut self) {
//
log::debug!("{} OpenDocHandle was drop", self.doc_id);
}
}
@ -245,7 +251,7 @@ impl std::ops::Drop for OpenDocHandle {
enum DocumentCommand {
ApplyRevisions {
user: Arc<dyn RevisionUser>,
revisions: Vec<Revision>,
repeated_revision: RepeatedRevisionPB,
persistence: Arc<dyn DocumentPersistence>,
ret: oneshot::Sender<CollaborateResult<()>>,
},
@ -257,7 +263,7 @@ enum DocumentCommand {
},
Reset {
persistence: Arc<dyn DocumentPersistence>,
revisions: Vec<Revision>,
repeated_revision: RepeatedRevisionPB,
ret: oneshot::Sender<CollaborateResult<()>>,
},
}
@ -305,13 +311,13 @@ impl DocumentCommandQueue {
match msg {
DocumentCommand::ApplyRevisions {
user,
revisions,
repeated_revision,
persistence,
ret,
} => {
let result = self
.synchronizer
.sync_revisions(user, revisions, persistence)
.sync_revisions(user, repeated_revision, persistence)
.await
.map_err(internal_error);
let _ = ret.send(result);
@ -331,12 +337,12 @@ impl DocumentCommandQueue {
},
DocumentCommand::Reset {
persistence,
revisions,
repeated_revision,
ret,
} => {
let result = self
.synchronizer
.reset(persistence, revisions)
.reset(persistence, repeated_revision)
.await
.map_err(internal_error);
let _ = ret.send(result);

View File

@ -1,14 +1,14 @@
use crate::{
document::Document,
entities::{
revision::{Revision, RevisionRange},
revision::RevisionRange,
ws::{DocumentServerWSData, DocumentServerWSDataBuilder},
},
errors::CollaborateError,
protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB},
sync::DocumentPersistence,
util::*,
};
use crate::util::make_delta_from_revisions;
use lib_ot::{core::OperationTransformable, rich_text::RichTextDelta};
use parking_lot::RwLock;
use std::{
@ -30,7 +30,7 @@ pub enum SyncResponse {
Pull(DocumentServerWSData),
Push(DocumentServerWSData),
Ack(DocumentServerWSData),
NewRevision(Vec<Revision>),
NewRevision(RepeatedRevisionPB),
}
pub struct RevisionSynchronizer {
@ -49,24 +49,25 @@ impl RevisionSynchronizer {
}
}
#[tracing::instrument(level = "debug", skip(self, user, revisions, persistence), err)]
#[tracing::instrument(level = "debug", skip(self, user, repeated_revision, persistence), err)]
pub async fn sync_revisions(
&self,
user: Arc<dyn RevisionUser>,
revisions: Vec<Revision>,
repeated_revision: RepeatedRevisionPB,
persistence: Arc<dyn DocumentPersistence>,
) -> Result<(), CollaborateError> {
let doc_id = self.doc_id.clone();
if revisions.is_empty() {
if repeated_revision.get_items().is_empty() {
// Return all the revisions to client
let revisions = persistence.get_doc_revisions(&doc_id).await?;
let data = DocumentServerWSDataBuilder::build_push_message(&doc_id, revisions);
let repeated_revision = repeated_revision_from_revision_pbs(revisions)?;
let data = DocumentServerWSDataBuilder::build_push_message(&doc_id, repeated_revision);
user.receive(SyncResponse::Push(data));
return Ok(());
}
let server_base_rev_id = self.rev_id.load(SeqCst);
let first_revision = revisions.first().unwrap().clone();
let first_revision = repeated_revision.get_items().first().unwrap().clone();
if self.is_applied_before(&first_revision, &persistence).await {
// Server has received this revision before, so ignore the following revisions
return Ok(());
@ -77,10 +78,10 @@ impl RevisionSynchronizer {
let server_rev_id = next(server_base_rev_id);
if server_base_rev_id == first_revision.base_rev_id || server_rev_id == first_revision.rev_id {
// The rev is in the right order, just compose it.
for revision in &revisions {
for revision in repeated_revision.get_items() {
let _ = self.compose_revision(revision)?;
}
user.receive(SyncResponse::NewRevision(revisions));
user.receive(SyncResponse::NewRevision(repeated_revision));
} else {
// The server document is outdated, pull the missing revision from the client.
let range = RevisionRange {
@ -110,16 +111,17 @@ impl RevisionSynchronizer {
Ok(())
}
#[tracing::instrument(level = "debug", skip(self, user, persistence), err)]
#[tracing::instrument(level = "debug", skip(self, user, persistence), fields(server_rev_id), err)]
pub async fn pong(
&self,
user: Arc<dyn RevisionUser>,
persistence: Arc<dyn DocumentPersistence>,
rev_id: i64,
client_rev_id: i64,
) -> Result<(), CollaborateError> {
let doc_id = self.doc_id.clone();
let server_base_rev_id = self.rev_id.load(SeqCst);
match server_base_rev_id.cmp(&rev_id) {
let server_rev_id = self.rev_id();
tracing::Span::current().record("server_rev_id", &server_rev_id);
match server_rev_id.cmp(&client_rev_id) {
Ordering::Less => tracing::error!(
"[Pong] Client should not send ping and the server should pull the revisions from the client"
),
@ -128,8 +130,8 @@ impl RevisionSynchronizer {
// The client document is outdated. Transform the client revision delta and then
// send the prime delta to the client. Client should compose the this prime
// delta.
let from_rev_id = rev_id;
let to_rev_id = server_base_rev_id;
let from_rev_id = client_rev_id;
let to_rev_id = server_rev_id;
tracing::trace!("[Pong]: Push revisions to user");
let _ = self
.push_revisions_to_user(user, persistence, from_rev_id, to_rev_id)
@ -139,24 +141,27 @@ impl RevisionSynchronizer {
Ok(())
}
#[tracing::instrument(level = "debug", skip(self, revisions), err)]
#[tracing::instrument(level = "debug", skip(self, repeated_revision, persistence), fields(doc_id), err)]
pub async fn reset(
&self,
persistence: Arc<dyn DocumentPersistence>,
revisions: Vec<Revision>,
repeated_revision: RepeatedRevisionPB,
) -> Result<(), CollaborateError> {
let doc_id = self.doc_id.clone();
let _ = persistence.reset_document(&doc_id, revisions.clone()).await?;
let delta = make_delta_from_revisions(revisions)?;
let new_document = Document::from_delta(delta);
*self.document.write() = new_document;
tracing::Span::current().record("doc_id", &doc_id.as_str());
let revisions: Vec<RevisionPB> = repeated_revision.get_items().to_vec();
let (_, rev_id) = pair_rev_id_from_revision_pbs(&revisions);
let delta = make_delta_from_revision_pb(revisions)?;
let _ = persistence.reset_document(&doc_id, repeated_revision).await?;
*self.document.write() = Document::from_delta(delta);
let _ = self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(rev_id));
Ok(())
}
pub fn doc_json(&self) -> String { self.document.read().to_json() }
fn compose_revision(&self, revision: &Revision) -> Result<(), CollaborateError> {
fn compose_revision(&self, revision: &RevisionPB) -> Result<(), CollaborateError> {
let delta = RichTextDelta::from_bytes(&revision.delta_data)?;
let _ = self.compose_delta(delta)?;
let _ = self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(revision.rev_id));
@ -164,7 +169,7 @@ impl RevisionSynchronizer {
}
#[tracing::instrument(level = "debug", skip(self, revision))]
fn transform_revision(&self, revision: &Revision) -> Result<(RichTextDelta, RichTextDelta), CollaborateError> {
fn transform_revision(&self, revision: &RevisionPB) -> Result<(RichTextDelta, RichTextDelta), CollaborateError> {
let cli_delta = RichTextDelta::from_bytes(&revision.delta_data)?;
let result = self.document.read().delta().transform(&cli_delta)?;
Ok(result)
@ -184,10 +189,9 @@ impl RevisionSynchronizer {
Ok(())
}
#[allow(dead_code)]
pub(crate) fn rev_id(&self) -> i64 { self.rev_id.load(SeqCst) }
async fn is_applied_before(&self, new_revision: &Revision, persistence: &Arc<dyn DocumentPersistence>) -> bool {
async fn is_applied_before(&self, new_revision: &RevisionPB, persistence: &Arc<dyn DocumentPersistence>) -> bool {
if let Ok(revisions) = persistence.get_revisions(&self.doc_id, vec![new_revision.rev_id]).await {
if let Some(revision) = revisions.first() {
if revision.md5 == new_revision.md5 {
@ -223,15 +227,15 @@ impl RevisionSynchronizer {
};
tracing::debug!("Push revision: {} -> {} to client", from, to);
let data = DocumentServerWSDataBuilder::build_push_message(&self.doc_id, revisions);
user.receive(SyncResponse::Push(data));
match repeated_revision_from_revision_pbs(revisions) {
Ok(repeated_revision) => {
let data = DocumentServerWSDataBuilder::build_push_message(&self.doc_id, repeated_revision);
user.receive(SyncResponse::Push(data));
},
Err(e) => tracing::error!("{}", e),
}
}
}
#[inline]
fn next(rev_id: i64) -> i64 { rev_id + 1 }
// #[inline]
// fn md5<T: AsRef<[u8]>>(data: T) -> String {
// let md5 = format!("{:x}", md5::compute(data));
// md5
// }

View File

@ -1,12 +1,16 @@
use crate::{
entities::revision::Revision,
entities::revision::{RepeatedRevision, Revision},
errors::{CollaborateError, CollaborateResult},
protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB},
};
use lib_ot::{
core::{OperationTransformable, NEW_LINE, WHITESPACE},
rich_text::RichTextDelta,
};
use std::sync::atomic::{AtomicI64, Ordering::SeqCst};
use std::{
convert::TryInto,
sync::atomic::{AtomicI64, Ordering::SeqCst},
};
#[inline]
pub fn find_newline(s: &str) -> Option<usize> { s.find(NEW_LINE) }
@ -51,3 +55,64 @@ pub fn make_delta_from_revisions(revisions: Vec<Revision>) -> CollaborateResult<
}
Ok(new_delta)
}
pub fn make_delta_from_revision_pb(revisions: Vec<RevisionPB>) -> CollaborateResult<RichTextDelta> {
let mut new_delta = RichTextDelta::new();
for revision in revisions {
let delta = RichTextDelta::from_bytes(revision.delta_data).map_err(|e| {
let err_msg = format!("Deserialize remote revision failed: {:?}", e);
CollaborateError::internal().context(err_msg)
})?;
new_delta = new_delta.compose(&delta)?;
}
Ok(new_delta)
}
pub fn repeated_revision_from_revision_pbs(revisions: Vec<RevisionPB>) -> CollaborateResult<RepeatedRevision> {
let repeated_revision_pb = repeated_revision_pb_from_revisions(revisions);
repeated_revision_from_repeated_revision_pb(repeated_revision_pb)
}
pub fn repeated_revision_pb_from_revisions(revisions: Vec<RevisionPB>) -> RepeatedRevisionPB {
let mut repeated_revision_pb = RepeatedRevisionPB::new();
repeated_revision_pb.set_items(revisions.into());
repeated_revision_pb
}
pub fn repeated_revision_from_repeated_revision_pb(
mut repeated_revision: RepeatedRevisionPB,
) -> CollaborateResult<RepeatedRevision> {
(&mut repeated_revision)
.try_into()
.map_err(|e| CollaborateError::internal().context(format!("Cast repeated revision failed: {:?}", e)))
}
pub fn pair_rev_id_from_revision_pbs(revisions: &[RevisionPB]) -> (i64, i64) {
let mut rev_id = 0;
revisions.iter().for_each(|revision| {
if rev_id < revision.rev_id {
rev_id = revision.rev_id;
}
});
if rev_id > 0 {
(rev_id - 1, rev_id)
} else {
(0, rev_id)
}
}
pub fn pair_rev_id_from_revisions(revisions: &[Revision]) -> (i64, i64) {
let mut rev_id = 0;
revisions.iter().for_each(|revision| {
if rev_id < revision.rev_id {
rev_id = revision.rev_id;
}
});
if rev_id > 0 {
(rev_id - 1, rev_id)
} else {
(0, rev_id)
}
}

View File

@ -68,7 +68,6 @@ impl UpdateUserRequest {
#[derive(ProtoBuf, Default, Clone, Debug)]
pub struct UpdateUserParams {
// TODO: remove user id
#[pb(index = 1)]
pub id: String,

View File

@ -13,6 +13,8 @@ serde = { version = "1.0", features = ["derive"] }
tokio = {version = "1", features = ["sync"]}
dashmap = "4.0"
md5 = "0.7.0"
anyhow = "1.0"
thiserror = "1.0"
serde_json = {version = "1.0"}
derive_more = {version = "0.99", features = ["display"]}

View File

@ -2,6 +2,7 @@ use crate::{
core::{operation::*, DeltaIter, FlowyStr, Interval, OperationTransformable, MAX_IV_LEN},
errors::{ErrorBuilder, OTError, OTErrorCode},
};
use bytes::Bytes;
use serde::de::DeserializeOwned;
use std::{
@ -12,7 +13,7 @@ use std::{
str::FromStr,
};
// Opti: optimize the memory usage with Arc_mut or Cow
// TODO: optimize the memory usage with Arc_mut or Cow
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Delta<T: Attributes> {
pub ops: Vec<Operation<T>>,
@ -298,7 +299,7 @@ where
next_op1 = ops1.next();
},
(_, Some(Operation::Insert(o_insert))) => {
let composed_attrs = transform_op_attribute(&next_op1, &next_op2);
let composed_attrs = transform_op_attribute(&next_op1, &next_op2)?;
a_prime.retain(o_insert.count_of_code_units(), composed_attrs.clone());
b_prime.insert(&o_insert.s, composed_attrs);
next_op2 = ops2.next();
@ -310,7 +311,7 @@ where
return Err(ErrorBuilder::new(OTErrorCode::IncompatibleLength).build());
},
(Some(Operation::Retain(retain)), Some(Operation::Retain(o_retain))) => {
let composed_attrs = transform_op_attribute(&next_op1, &next_op2);
let composed_attrs = transform_op_attribute(&next_op1, &next_op2)?;
match retain.cmp(&o_retain) {
Ordering::Less => {
a_prime.retain(retain.n, composed_attrs.clone());
@ -456,17 +457,20 @@ fn invert_from_other<T: Attributes>(
});
}
fn transform_op_attribute<T: Attributes>(left: &Option<Operation<T>>, right: &Option<Operation<T>>) -> T {
fn transform_op_attribute<T: Attributes>(
left: &Option<Operation<T>>,
right: &Option<Operation<T>>,
) -> Result<T, OTError> {
if left.is_none() {
if right.is_none() {
return T::default();
return Ok(T::default());
}
return right.as_ref().unwrap().get_attributes();
return Ok(right.as_ref().unwrap().get_attributes());
}
let left = left.as_ref().unwrap().get_attributes();
let right = right.as_ref().unwrap().get_attributes();
// TODO: It's ok to unwrap?
left.transform(&right).unwrap().0
// TODO: replace with anyhow and thiserror.
Ok(left.transform(&right)?.0)
}
impl<T> Delta<T>

View File

@ -1,6 +1,6 @@
use std::{error::Error, fmt, fmt::Debug, str::Utf8Error};
use std::{fmt, fmt::Debug, str::Utf8Error};
#[derive(Clone, Debug)]
#[derive(thiserror::Error, Clone, Debug)]
pub struct OTError {
pub code: OTErrorCode,
pub msg: String,
@ -44,10 +44,6 @@ impl fmt::Display for OTError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "incompatible lengths") }
}
impl Error for OTError {
fn source(&self) -> Option<&(dyn Error + 'static)> { None }
}
impl std::convert::From<serde_json::Error> for OTError {
fn from(error: serde_json::Error) -> Self { ErrorBuilder::new(OTErrorCode::SerdeError).error(error).build() }
}