mirror of
https://github.com/AppFlowy-IO/AppFlowy.git
synced 2024-08-30 18:12:39 +00:00
correct the document delta by removing unnecessary op
This commit is contained in:
@ -114,6 +114,7 @@ impl EditorCommandQueue {
|
|||||||
let read_guard = self.document.read().await;
|
let read_guard = self.document.read().await;
|
||||||
let mut server_prime: Option<RichTextDelta> = None;
|
let mut server_prime: Option<RichTextDelta> = None;
|
||||||
let client_prime: RichTextDelta;
|
let client_prime: RichTextDelta;
|
||||||
|
// The document is empty if its text is equal to the initial text.
|
||||||
if read_guard.is_empty::<NewlineDoc>() {
|
if read_guard.is_empty::<NewlineDoc>() {
|
||||||
// Do nothing
|
// Do nothing
|
||||||
client_prime = new_delta;
|
client_prime = new_delta;
|
||||||
@ -122,7 +123,6 @@ impl EditorCommandQueue {
|
|||||||
client_prime = c_prime;
|
client_prime = c_prime;
|
||||||
server_prime = Some(s_prime);
|
server_prime = Some(s_prime);
|
||||||
}
|
}
|
||||||
|
|
||||||
drop(read_guard);
|
drop(read_guard);
|
||||||
Ok::<TransformDeltas, CollaborateError>(TransformDeltas {
|
Ok::<TransformDeltas, CollaborateError>(TransformDeltas {
|
||||||
client_prime,
|
client_prime,
|
||||||
|
@ -9,16 +9,12 @@ use flowy_collaboration::{
|
|||||||
doc::DocumentInfo,
|
doc::DocumentInfo,
|
||||||
revision::{RepeatedRevision, Revision, RevisionRange, RevisionState},
|
revision::{RepeatedRevision, Revision, RevisionRange, RevisionState},
|
||||||
},
|
},
|
||||||
util::{md5, pair_rev_id_from_revisions, RevIdCounter},
|
util::{make_delta_from_revisions, md5, pair_rev_id_from_revisions, RevIdCounter},
|
||||||
};
|
};
|
||||||
use flowy_error::FlowyResult;
|
use flowy_error::FlowyResult;
|
||||||
use futures_util::{future, stream, stream::StreamExt};
|
use futures_util::{future, stream, stream::StreamExt};
|
||||||
use lib_infra::future::FutureResult;
|
use lib_infra::future::FutureResult;
|
||||||
use lib_ot::{
|
use lib_ot::{core::Operation, errors::OTError, rich_text::RichTextDelta};
|
||||||
core::{trim, Operation, OperationTransformable},
|
|
||||||
errors::OTError,
|
|
||||||
rich_text::RichTextDelta,
|
|
||||||
};
|
|
||||||
use std::{collections::VecDeque, sync::Arc};
|
use std::{collections::VecDeque, sync::Arc};
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
|
|
||||||
@ -239,18 +235,8 @@ impl RevisionLoader {
|
|||||||
|
|
||||||
fn mk_doc_from_revisions(doc_id: &str, revisions: Vec<Revision>) -> FlowyResult<DocumentInfo> {
|
fn mk_doc_from_revisions(doc_id: &str, revisions: Vec<Revision>) -> FlowyResult<DocumentInfo> {
|
||||||
let (base_rev_id, rev_id) = revisions.last().unwrap().pair_rev_id();
|
let (base_rev_id, rev_id) = revisions.last().unwrap().pair_rev_id();
|
||||||
let mut delta = RichTextDelta::new();
|
let mut delta = make_delta_from_revisions(revisions)?;
|
||||||
for (_, revision) in revisions.into_iter().enumerate() {
|
correct_delta(&mut delta);
|
||||||
match RichTextDelta::from_bytes(revision.delta_data) {
|
|
||||||
Ok(local_delta) => {
|
|
||||||
delta = delta.compose(&local_delta)?;
|
|
||||||
},
|
|
||||||
Err(e) => {
|
|
||||||
tracing::error!("Deserialize delta from revision failed: {}", e);
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
correct_delta_if_need(&mut delta);
|
|
||||||
|
|
||||||
Result::<DocumentInfo, FlowyError>::Ok(DocumentInfo {
|
Result::<DocumentInfo, FlowyError>::Ok(DocumentInfo {
|
||||||
doc_id: doc_id.to_owned(),
|
doc_id: doc_id.to_owned(),
|
||||||
@ -260,19 +246,23 @@ fn mk_doc_from_revisions(doc_id: &str, revisions: Vec<Revision>) -> FlowyResult<
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn correct_delta_if_need(delta: &mut RichTextDelta) {
|
// quill-editor requires the delta should end with '\n' and only contains the
|
||||||
trim(delta);
|
// insert operation. The function, correct_delta maybe be removed in the future.
|
||||||
|
fn correct_delta(delta: &mut RichTextDelta) {
|
||||||
if delta.ops.last().is_none() {
|
if let Some(op) = delta.ops.last() {
|
||||||
return;
|
let op_data = op.get_data();
|
||||||
}
|
if !op_data.ends_with('\n') {
|
||||||
let data = delta.ops.last().as_ref().unwrap().get_data();
|
log::warn!("The document must end with newline. Correcting it by inserting newline op");
|
||||||
if !data.ends_with('\n') {
|
|
||||||
log::error!("❌The op must end with newline. Correcting it by inserting newline op");
|
|
||||||
delta.ops.push(Operation::Insert("\n".into()));
|
delta.ops.push(Operation::Insert("\n".into()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Some(op) = delta.ops.iter().find(|op| !op.is_insert()) {
|
||||||
|
log::warn!("The document can only contains insert operations, but found {:?}", op);
|
||||||
|
delta.ops.retain(|op| op.is_insert());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(feature = "flowy_unit_test")]
|
#[cfg(feature = "flowy_unit_test")]
|
||||||
impl RevisionSyncSequence {
|
impl RevisionSyncSequence {
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
|
@ -130,17 +130,12 @@ impl DocumentWSSinkDataProvider for DocumentWSSinkDataProviderAdapter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn transform_pushed_revisions(
|
async fn transform_pushed_revisions(
|
||||||
revisions: &[Revision],
|
revisions: Vec<Revision>,
|
||||||
edit_cmd: &UnboundedSender<EditorCommand>,
|
edit_cmd: &UnboundedSender<EditorCommand>,
|
||||||
) -> FlowyResult<TransformDeltas> {
|
) -> FlowyResult<TransformDeltas> {
|
||||||
let (ret, rx) = oneshot::channel::<CollaborateResult<TransformDeltas>>();
|
let (ret, rx) = oneshot::channel::<CollaborateResult<TransformDeltas>>();
|
||||||
// Transform the revision
|
let _ = edit_cmd.send(EditorCommand::TransformRevision { revisions, ret });
|
||||||
let _ = edit_cmd.send(EditorCommand::TransformRevision {
|
Ok(rx.await.map_err(internal_error)??)
|
||||||
revisions: revisions.to_vec(),
|
|
||||||
ret,
|
|
||||||
});
|
|
||||||
let transformed_delta = rx.await.map_err(internal_error)??;
|
|
||||||
Ok(transformed_delta)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(level = "debug", skip(edit_cmd_tx, rev_manager, bytes))]
|
#[tracing::instrument(level = "debug", skip(edit_cmd_tx, rev_manager, bytes))]
|
||||||
@ -170,7 +165,8 @@ pub(crate) async fn handle_remote_revision(
|
|||||||
let TransformDeltas {
|
let TransformDeltas {
|
||||||
client_prime,
|
client_prime,
|
||||||
server_prime,
|
server_prime,
|
||||||
} = transform_pushed_revisions(&revisions, &edit_cmd_tx).await?;
|
} = transform_pushed_revisions(revisions.clone(), &edit_cmd_tx).await?;
|
||||||
|
|
||||||
match server_prime {
|
match server_prime {
|
||||||
None => {
|
None => {
|
||||||
// The server_prime is None means the client local revisions conflict with the
|
// The server_prime is None means the client local revisions conflict with the
|
||||||
|
@ -1,7 +1,6 @@
|
|||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::sync::Arc;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
use backend_service::configuration::ClientServerConfiguration;
|
use backend_service::configuration::ClientServerConfiguration;
|
||||||
@ -293,6 +292,7 @@ struct Session {
|
|||||||
user_id: String,
|
user_id: String,
|
||||||
token: String,
|
token: String,
|
||||||
email: String,
|
email: String,
|
||||||
|
#[serde(default)]
|
||||||
name: String,
|
name: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -45,15 +45,15 @@ impl RevIdCounter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn make_delta_from_revisions(revisions: Vec<Revision>) -> CollaborateResult<RichTextDelta> {
|
pub fn make_delta_from_revisions(revisions: Vec<Revision>) -> CollaborateResult<RichTextDelta> {
|
||||||
let mut new_delta = RichTextDelta::new();
|
let mut delta = RichTextDelta::new();
|
||||||
for revision in revisions {
|
for revision in revisions {
|
||||||
let delta = RichTextDelta::from_bytes(revision.delta_data).map_err(|e| {
|
let revision_delta = RichTextDelta::from_bytes(revision.delta_data).map_err(|e| {
|
||||||
let err_msg = format!("Deserialize remote revision failed: {:?}", e);
|
let err_msg = format!("Deserialize remote revision failed: {:?}", e);
|
||||||
CollaborateError::internal().context(err_msg)
|
CollaborateError::internal().context(err_msg)
|
||||||
})?;
|
})?;
|
||||||
new_delta = new_delta.compose(&delta)?;
|
delta = delta.compose(&revision_delta)?;
|
||||||
}
|
}
|
||||||
Ok(new_delta)
|
Ok(delta)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn make_delta_from_revision_pb(revisions: Vec<RevisionPB>) -> CollaborateResult<RichTextDelta> {
|
pub fn make_delta_from_revision_pb(revisions: Vec<RevisionPB>) -> CollaborateResult<RichTextDelta> {
|
||||||
|
@ -241,7 +241,7 @@ where
|
|||||||
.next_op_with_len(length)
|
.next_op_with_len(length)
|
||||||
.unwrap_or_else(|| OpBuilder::retain(length).build());
|
.unwrap_or_else(|| OpBuilder::retain(length).build());
|
||||||
|
|
||||||
debug_assert_eq!(op.len(), other_op.len());
|
// debug_assert_eq!(op.len(), other_op.len(), "Composing delta failed,");
|
||||||
|
|
||||||
match (&op, &other_op) {
|
match (&op, &other_op) {
|
||||||
(Operation::Retain(retain), Operation::Retain(other_retain)) => {
|
(Operation::Retain(retain), Operation::Retain(other_retain)) => {
|
||||||
|
Reference in New Issue
Block a user