refactor ot

This commit is contained in:
appflowy 2021-09-22 14:42:14 +08:00
parent 45e74e48a2
commit 79560dd161
29 changed files with 267 additions and 390 deletions

View File

@ -1,6 +1,7 @@
use bcrypt::{hash, verify, DEFAULT_COST};
use flowy_net::errors::{ErrorCode, ServerError};
#[allow(dead_code)]
pub fn uuid() -> String { uuid::Uuid::new_v4().to_string() }
pub fn hash_password(plain: &str) -> Result<String, ServerError> {

View File

@ -42,6 +42,7 @@ impl NewAppSqlBuilder {
self
}
#[allow(dead_code)]
pub fn last_view_id(mut self, view_id: &str) -> Self {
self.table.last_view_id = view_id.to_string();
self

View File

@ -102,8 +102,8 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSClient {
self.hb = Instant::now();
ctx.pong(&msg);
},
Ok(ws::Message::Pong(msg)) => {
log::debug!("Receive {} pong {:?}", &self.session_id, &msg);
Ok(ws::Message::Pong(_msg)) => {
// log::debug!("Receive {} pong {:?}", &self.session_id, &msg);
self.hb = Instant::now();
},
Ok(ws::Message::Binary(bin)) => {

View File

@ -103,6 +103,7 @@ async fn user_update_email() {
assert_eq!(user.email, email);
}
#[allow(dead_code)]
async fn sign_up_user(server: &TestServer) -> SignUpResponse {
let email = "annie@appflowy.io";
let password = "HelloWorld123!";

View File

@ -42,7 +42,12 @@ impl WsTest {
pub async fn run_scripts(&mut self) {
let addr = self.server.ws_addr();
self.ws_controller.write().connect(addr).unwrap().await;
self.ws_controller
.write()
.connect(addr)
.unwrap()
.await
.unwrap();
}
}

View File

@ -1,6 +1,6 @@
use crate::{
entities::doc::{ApplyChangesetParams, CreateDocParams, Doc, QueryDocParams, SaveDocParams},
errors::{internal_error, DocError},
entities::doc::{ApplyChangesetParams, CreateDocParams, Doc, QueryDocParams},
errors::DocError,
services::{doc_controller::DocController, open_doc::OpenedDocManager, server::construct_doc_server, ws::WsManager},
};
use bytes::Bytes;

View File

@ -1,5 +1,5 @@
mod manager;
mod open_doc;
pub use manager::*;
pub(crate) use manager::*;
pub use open_doc::*;

View File

@ -52,10 +52,16 @@ impl OpenedDoc {
let mut write_guard = self.document.write();
let _ = write_guard.apply_changeset(data.clone())?;
self.ws_sender.send_data(data);
match self.ws_sender.send_data(data) {
Ok(_) => {},
Err(e) => {
// TODO: save to local and retry
log::error!("Send delta failed: {:?}", e);
},
}
// Opti: strategy to save the document
let mut save = SaveDocParams {
let save = SaveDocParams {
id: self.id.0.clone(),
data: write_guard.to_bytes(),
};

View File

@ -61,7 +61,7 @@ impl Document {
{
let new_delta: Delta = changeset.try_into()?;
log::debug!("Delta changeset: {}", new_delta);
self.add_delta(&new_delta);
let _ = self.add_delta(&new_delta)?;
log::debug!("Document: {}", self.to_json());
Ok(())
}

View File

@ -14,8 +14,8 @@ impl InsertExt for DefaultInsertAttribute {
// Enable each line split by "\n" remains the block attributes. for example:
// insert "\n" to "123456" at index 3
//
// [{"insert":"123"},{"insert":"\n","attributes":{"header":"1"}},
// {"insert":"456"},{"insert":"\n","attributes":{"header":"1"}}]
// [{"insert":"123"},{"insert":"\n","attributes":{"header":1}},
// {"insert":"456"},{"insert":"\n","attributes":{"header":1}}]
if text.ends_with(NEW_LINE) {
match iter.last() {
None => {},

View File

@ -1,10 +1,11 @@
use crate::core::{Attribute, AttributeKey, AttributeValue, Operation};
use crate::{
core::{Attribute, AttributeKey, AttributeValue, Operation, OperationTransformable},
errors::OTError,
};
use std::{collections::HashMap, fmt};
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Attributes {
// #[serde(skip_serializing_if = "HashMap::is_empty")]
// #[serde(flatten)]
pub(crate) inner: HashMap<AttributeKey, AttributeValue>,
}
@ -84,6 +85,56 @@ impl Attributes {
}
}
impl OperationTransformable for Attributes {
fn compose(&self, other: &Self) -> Result<Self, OTError>
where
Self: Sized,
{
let mut attributes = self.clone();
attributes.extend(other.clone());
Ok(attributes)
}
fn transform(&self, other: &Self) -> Result<(Self, Self), OTError>
where
Self: Sized,
{
let a = self.iter().fold(Attributes::new(), |mut new_attributes, (k, v)| {
if !other.contains_key(k) {
new_attributes.insert(k.clone(), v.clone());
}
new_attributes
});
let b = other.iter().fold(Attributes::new(), |mut new_attributes, (k, v)| {
if !self.contains_key(k) {
new_attributes.insert(k.clone(), v.clone());
}
new_attributes
});
Ok((a, b))
}
fn invert(&self, other: &Self) -> Self {
let base_inverted = other.iter().fold(Attributes::new(), |mut attributes, (k, v)| {
if other.get(k) != self.get(k) && self.contains_key(k) {
attributes.insert(k.clone(), v.clone());
}
attributes
});
let inverted = self.iter().fold(base_inverted, |mut attributes, (k, _)| {
if other.get(k) != self.get(k) && !other.contains_key(k) {
attributes.delete(k);
}
attributes
});
return inverted;
}
}
impl std::ops::Deref for Attributes {
type Target = HashMap<AttributeKey, AttributeValue>;
@ -94,88 +145,6 @@ impl std::ops::DerefMut for Attributes {
fn deref_mut(&mut self) -> &mut Self::Target { &mut self.inner }
}
pub(crate) fn attributes_from(operation: &Option<Operation>) -> Option<Attributes> {
match operation {
None => None,
Some(operation) => Some(operation.get_attributes()),
}
}
pub fn compose_operation(left: &Option<Operation>, right: &Option<Operation>) -> Attributes {
if left.is_none() && right.is_none() {
return Attributes::default();
}
let attr_left = attributes_from(left);
let attr_right = attributes_from(right);
if attr_left.is_none() {
return attr_right.unwrap();
}
if attr_right.is_none() {
return attr_left.unwrap();
}
let left = attr_left.unwrap();
let right = attr_right.unwrap();
log::trace!("compose attributes: a: {:?}, b: {:?}", left, right);
let attr = merge_attributes(left, right);
log::trace!("compose attributes result: {:?}", attr);
attr
}
pub fn compose_attributes(left: Attributes, right: Attributes) -> Attributes {
log::trace!("compose attributes: a: {:?}, b: {:?}", left, right);
let attr = merge_attributes(left, right);
log::trace!("compose attributes result: {:?}", attr);
attr
}
pub fn transform_operation(left: &Option<Operation>, right: &Option<Operation>) -> Attributes {
let attr_l = attributes_from(left);
let attr_r = attributes_from(right);
if attr_l.is_none() {
if attr_r.is_none() {
return Attributes::default();
}
return attr_r.unwrap();
}
let left = attr_l.unwrap();
let right = attr_r.unwrap();
left.iter().fold(Attributes::new(), |mut new_attributes, (k, v)| {
if !right.contains_key(k) {
new_attributes.insert(k.clone(), v.clone());
}
new_attributes
})
}
pub fn invert_attributes(attr: Attributes, base: Attributes) -> Attributes {
let base_inverted = base.iter().fold(Attributes::new(), |mut attributes, (k, v)| {
if base.get(k) != attr.get(k) && attr.contains_key(k) {
attributes.insert(k.clone(), v.clone());
}
attributes
});
let inverted = attr.iter().fold(base_inverted, |mut attributes, (k, _)| {
if base.get(k) != attr.get(k) && !base.contains_key(k) {
attributes.delete(k);
}
attributes
});
return inverted;
}
pub fn merge_attributes(mut attributes: Attributes, other: Attributes) -> Attributes {
attributes.extend(other);
attributes
}
pub fn attributes_except_header(op: &Operation) -> Attributes {
let mut attributes = op.get_attributes();
attributes.remove(AttributeKey::Header);

View File

@ -71,6 +71,7 @@ impl<'de> Deserialize<'de> for Attributes {
{
let mut attributes = Attributes::new();
while let Some(key) = map.next_key::<AttributeKey>()? {
log::warn!("{:?}", key);
let value = map.next_value::<AttributeValue>()?;
attributes.add_kv(key, value);
}
@ -102,7 +103,10 @@ impl<'de> Deserialize<'de> for AttributeValue {
struct AttributeValueVisitor;
impl<'de> Visitor<'de> for AttributeValueVisitor {
type Value = AttributeValue;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { formatter.write_str("Can't find any visit handler") }
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
//
formatter.write_str("bool, usize or string")
}
fn visit_bool<E>(self, value: bool) -> Result<Self::Value, E>
where
E: de::Error,

View File

@ -1,10 +1,9 @@
use crate::{
core::{attributes::*, operation::*, DeltaIter, Interval, MAX_IV_LEN},
core::{attributes::*, operation::*, DeltaIter, Interval, OperationTransformable, MAX_IV_LEN},
errors::{ErrorBuilder, OTError, OTErrorCode},
};
use bytecount::num_chars;
use bytes::Bytes;
use serde::__private::TryFrom;
use std::{
cmp::{min, Ordering},
fmt,
@ -14,7 +13,7 @@ use std::{
};
// Opti: optimize the memory usage with Arc_mut or Cow
#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Delta {
pub ops: Vec<Operation>,
pub base_len: usize,
@ -84,8 +83,6 @@ impl FromIterator<Operation> for Delta {
impl Delta {
pub fn new() -> Self { Self::default() }
pub fn to_json(&self) -> String { serde_json::to_string(self).unwrap_or("".to_owned()) }
pub fn from_json(json: &str) -> Result<Self, OTError> {
let delta: Delta = serde_json::from_str(json).map_err(|e| {
log::trace!("Deserialize failed: {:?}", e);
@ -95,6 +92,8 @@ impl Delta {
Ok(delta)
}
pub fn to_json(&self) -> String { serde_json::to_string(self).unwrap_or("".to_owned()) }
pub fn from_bytes(bytes: Vec<u8>) -> Result<Self, OTError> {
let json = str::from_utf8(&bytes)?;
Self::from_json(json)
@ -179,12 +178,79 @@ impl Delta {
}
}
/// Merges the operation with `other` into one operation while preserving
/// the changes of both. Or, in other words, for each input string S and a
/// pair of consecutive operations A and B.
/// `apply(apply(S, A), B) = apply(S, compose(A, B))`
/// must hold.
pub fn compose(&self, other: &Self) -> Result<Self, OTError> {
/// Applies an operation to a string, returning a new string.
pub fn apply(&self, s: &str) -> Result<String, OTError> {
if num_chars(s.as_bytes()) != self.base_len {
return Err(ErrorBuilder::new(OTErrorCode::IncompatibleLength).build());
}
let mut new_s = String::new();
let chars = &mut s.chars();
for op in &self.ops {
match &op {
Operation::Retain(retain) => {
for c in chars.take(retain.n as usize) {
new_s.push(c);
}
},
Operation::Delete(delete) => {
for _ in 0..*delete {
chars.next();
}
},
Operation::Insert(insert) => {
new_s += &insert.s;
},
}
}
Ok(new_s)
}
/// Computes the inverse of an operation. The inverse of an operation is the
/// operation that reverts the effects of the operation
pub fn invert_str(&self, s: &str) -> Self {
let mut inverted = Delta::default();
let chars = &mut s.chars();
for op in &self.ops {
match &op {
Operation::Retain(retain) => {
inverted.retain(retain.n, Attributes::default());
// TODO: use advance_by instead, but it's unstable now
// chars.advance_by(retain.num)
for _ in 0..retain.n {
chars.next();
}
},
Operation::Insert(insert) => {
inverted.delete(insert.num_chars());
},
Operation::Delete(delete) => {
inverted.insert(&chars.take(*delete as usize).collect::<String>(), op.get_attributes());
},
}
}
inverted
}
/// Checks if this operation has no effect.
#[inline]
pub fn is_noop(&self) -> bool {
match self.ops.as_slice() {
[] => true,
[Operation::Retain(_)] => true,
_ => false,
}
}
pub fn is_empty(&self) -> bool { self.ops.is_empty() }
pub fn extend(&mut self, other: Self) { other.ops.into_iter().for_each(|op| self.add(op)); }
}
impl OperationTransformable for Delta {
fn compose(&self, other: &Self) -> Result<Self, OTError>
where
Self: Sized,
{
let mut new_delta = Delta::default();
let mut iter = DeltaIter::new(self);
let mut other_iter = DeltaIter::new(other);
@ -206,18 +272,18 @@ impl Delta {
);
let op = iter.next_op_with_len(length).unwrap_or(OpBuilder::retain(length).build());
let other_op = other_iter.next_op_with_len(length).unwrap_or(OpBuilder::retain(length).build());
debug_assert_eq!(op.len(), other_op.len());
match (&op, &other_op) {
(Operation::Retain(retain), Operation::Retain(other_retain)) => {
let composed_attrs = compose_attributes(retain.attributes.clone(), other_retain.attributes.clone());
let composed_attrs = retain.attributes.compose(&other_retain.attributes)?;
new_delta.add(OpBuilder::retain(retain.n).attributes(composed_attrs).build())
},
(Operation::Insert(insert), Operation::Retain(other_retain)) => {
let mut composed_attrs = compose_attributes(insert.attributes.clone(), other_retain.attributes.clone());
let mut composed_attrs = insert.attributes.compose(&other_retain.attributes)?;
composed_attrs.remove_empty();
new_delta.add(OpBuilder::insert(op.get_data()).attributes(composed_attrs).build())
},
@ -235,137 +301,10 @@ impl Delta {
Ok(new_delta)
}
#[deprecated(note = "The same as compose except it requires the target_len of self must equal to other's base_len")]
pub fn compose2(&self, other: &Self) -> Result<Self, OTError> {
if self.target_len != other.base_len {
return Err(ErrorBuilder::new(OTErrorCode::IncompatibleLength).build());
}
let mut new_delta = Delta::default();
let mut ops1 = self.ops.iter().cloned();
let mut ops2 = other.ops.iter().cloned();
let mut next_op1 = ops1.next();
let mut next_op2 = ops2.next();
loop {
match (&next_op1, &next_op2) {
(None, None) => break,
(Some(Operation::Delete(i)), _) => {
new_delta.delete(*i);
next_op1 = ops1.next();
},
(_, Some(Operation::Insert(o_insert))) => {
new_delta.insert(&o_insert.s, o_insert.attributes.clone());
next_op2 = ops2.next();
},
(None, _) | (_, None) => {
return Err(ErrorBuilder::new(OTErrorCode::IncompatibleLength).build());
},
(Some(Operation::Retain(retain)), Some(Operation::Retain(o_retain))) => {
let composed_attrs = compose_operation(&next_op1, &next_op2);
log::trace!("[retain:{} - retain:{}]: {:?}", retain.n, o_retain.n, composed_attrs);
match retain.cmp(&o_retain) {
Ordering::Less => {
new_delta.retain(retain.n, composed_attrs);
next_op2 = Some(
OpBuilder::retain(o_retain.n - retain.n)
.attributes(o_retain.attributes.clone())
.build(),
);
next_op1 = ops1.next();
},
std::cmp::Ordering::Equal => {
new_delta.retain(retain.n, composed_attrs);
next_op1 = ops1.next();
next_op2 = ops2.next();
},
std::cmp::Ordering::Greater => {
new_delta.retain(o_retain.n, composed_attrs);
next_op1 = Some(OpBuilder::retain(retain.n - o_retain.n).build());
next_op2 = ops2.next();
},
}
},
(Some(Operation::Insert(insert)), Some(Operation::Delete(o_num))) => {
match (num_chars(insert.as_bytes()) as usize).cmp(o_num) {
Ordering::Less => {
next_op2 = Some(
OpBuilder::delete(*o_num - num_chars(insert.as_bytes()) as usize)
.attributes(insert.attributes.clone())
.build(),
);
next_op1 = ops1.next();
},
Ordering::Equal => {
next_op1 = ops1.next();
next_op2 = ops2.next();
},
Ordering::Greater => {
next_op1 = Some(OpBuilder::insert(&insert.chars().skip(*o_num as usize).collect::<String>()).build());
next_op2 = ops2.next();
},
}
},
(Some(Operation::Insert(insert)), Some(Operation::Retain(o_retain))) => {
let mut composed_attrs = compose_operation(&next_op1, &next_op2);
composed_attrs.remove_empty();
log::trace!("compose: [{} - {}], composed_attrs: {}", insert, o_retain, composed_attrs);
match (insert.num_chars()).cmp(o_retain) {
Ordering::Less => {
new_delta.insert(&insert.s, composed_attrs.clone());
next_op2 = Some(
OpBuilder::retain(o_retain.n - insert.num_chars())
.attributes(o_retain.attributes.clone())
.build(),
);
next_op1 = ops1.next();
},
Ordering::Equal => {
new_delta.insert(&insert.s, composed_attrs);
next_op1 = ops1.next();
next_op2 = ops2.next();
},
Ordering::Greater => {
let chars = &mut insert.chars();
new_delta.insert(&chars.take(o_retain.n as usize).collect::<String>(), composed_attrs);
next_op1 = Some(OpBuilder::insert(&chars.collect::<String>()).build());
next_op2 = ops2.next();
},
}
},
(Some(Operation::Retain(retain)), Some(Operation::Delete(o_num))) => match retain.cmp(&o_num) {
Ordering::Less => {
new_delta.delete(retain.n);
next_op2 = Some(OpBuilder::delete(*o_num - retain.n).build());
next_op1 = ops1.next();
},
Ordering::Equal => {
new_delta.delete(*o_num);
next_op2 = ops2.next();
next_op1 = ops1.next();
},
Ordering::Greater => {
new_delta.delete(*o_num);
next_op1 = Some(OpBuilder::retain(retain.n - *o_num).build());
next_op2 = ops2.next();
},
},
};
}
Ok(new_delta)
}
/// Transforms two operations A and B that happened concurrently and
/// produces two operations A' and B' (in an array) such that
/// `apply(apply(S, A), B') = apply(apply(S, B), A')`.
/// This function is the heart of OT.
///
/// # Error
///
/// Returns an `OTError` if the operations cannot be transformed due to
/// length conflicts.
pub fn transform(&self, other: &Self) -> Result<(Self, Self), OTError> {
fn transform(&self, other: &Self) -> Result<(Self, Self), OTError>
where
Self: Sized,
{
if self.base_len != other.base_len {
return Err(ErrorBuilder::new(OTErrorCode::IncompatibleLength).build());
}
@ -388,7 +327,7 @@ impl Delta {
next_op1 = ops1.next();
},
(_, Some(Operation::Insert(o_insert))) => {
let composed_attrs = transform_operation(&next_op1, &next_op2);
let composed_attrs = transform_op_attribute(&next_op1, &next_op2);
a_prime.retain(o_insert.num_chars(), composed_attrs.clone());
b_prime.insert(&o_insert.s, composed_attrs);
next_op2 = ops2.next();
@ -400,7 +339,7 @@ impl Delta {
return Err(ErrorBuilder::new(OTErrorCode::IncompatibleLength).build());
},
(Some(Operation::Retain(retain)), Some(Operation::Retain(o_retain))) => {
let composed_attrs = transform_operation(&next_op1, &next_op2);
let composed_attrs = transform_op_attribute(&next_op1, &next_op2);
match retain.cmp(&o_retain) {
Ordering::Less => {
a_prime.retain(retain.n, composed_attrs.clone());
@ -480,66 +419,7 @@ impl Delta {
Ok((a_prime, b_prime))
}
/// Applies an operation to a string, returning a new string.
///
/// # Error
///
/// Returns an error if the operation cannot be applied due to length
/// conflicts.
pub fn apply(&self, s: &str) -> Result<String, OTError> {
if num_chars(s.as_bytes()) != self.base_len {
return Err(ErrorBuilder::new(OTErrorCode::IncompatibleLength).build());
}
let mut new_s = String::new();
let chars = &mut s.chars();
for op in &self.ops {
match &op {
Operation::Retain(retain) => {
for c in chars.take(retain.n as usize) {
new_s.push(c);
}
},
Operation::Delete(delete) => {
for _ in 0..*delete {
chars.next();
}
},
Operation::Insert(insert) => {
new_s += &insert.s;
},
}
}
Ok(new_s)
}
/// Computes the inverse of an operation. The inverse of an operation is the
/// operation that reverts the effects of the operation
pub fn invert_str(&self, s: &str) -> Self {
let mut inverted = Delta::default();
let chars = &mut s.chars();
for op in &self.ops {
match &op {
Operation::Retain(retain) => {
inverted.retain(retain.n, Attributes::default());
// TODO: use advance_by instead, but it's unstable now
// chars.advance_by(retain.num)
for _ in 0..retain.n {
chars.next();
}
},
Operation::Insert(insert) => {
inverted.delete(insert.num_chars());
},
Operation::Delete(delete) => {
inverted.insert(&chars.take(*delete as usize).collect::<String>(), op.get_attributes());
},
}
}
inverted
}
pub fn invert(&self, other: &Delta) -> Delta {
fn invert(&self, other: &Self) -> Self {
let mut inverted = Delta::default();
if other.is_empty() {
return inverted;
@ -575,20 +455,6 @@ impl Delta {
log::trace!("🌛invert result: {}", inverted);
inverted
}
/// Checks if this operation has no effect.
#[inline]
pub fn is_noop(&self) -> bool {
match self.ops.as_slice() {
[] => true,
[Operation::Retain(_)] => true,
_ => false,
}
}
pub fn is_empty(&self) -> bool { self.ops.is_empty() }
pub fn extend(&mut self, other: Self) { other.ops.into_iter().for_each(|op| self.add(op)); }
}
fn invert_from_other(base: &mut Delta, other: &Delta, operation: &Operation, start: usize, end: usize) {
@ -599,15 +465,13 @@ fn invert_from_other(base: &mut Delta, other: &Delta, operation: &Operation, sta
log::trace!("invert delete: {} by add {}", n, other_op);
base.add(other_op);
},
Operation::Retain(retain) => {
Operation::Retain(_retain) => {
log::trace!(
"invert attributes: {:?}, {:?}",
operation.get_attributes(),
other_op.get_attributes()
);
let inverted_attrs = invert_attributes(operation.get_attributes(), other_op.get_attributes());
log::trace!("invert attributes result: {:?}", inverted_attrs);
log::trace!("invert retain: {} by retain len: {}, {}", retain, other_op.len(), inverted_attrs);
let inverted_attrs = operation.get_attributes().invert(&other_op.get_attributes());
base.retain(other_op.len(), inverted_attrs);
},
Operation::Insert(_) => {
@ -615,3 +479,16 @@ fn invert_from_other(base: &mut Delta, other: &Delta, operation: &Operation, sta
},
});
}
fn transform_op_attribute(left: &Option<Operation>, right: &Option<Operation>) -> Attributes {
if left.is_none() {
if right.is_none() {
return Attributes::default();
}
return right.as_ref().unwrap().get_attributes();
}
let left = left.as_ref().unwrap().get_attributes();
let right = right.as_ref().unwrap().get_attributes();
// TODO: It's ok to unwrap?
left.transform(&right).unwrap().0
}

View File

@ -3,7 +3,28 @@ mod delta;
mod interval;
mod operation;
use crate::errors::OTError;
pub use attributes::*;
pub use delta::*;
pub use interval::*;
pub use operation::*;
pub trait OperationTransformable {
/// Merges the operation with `other` into one operation while preserving
/// the changes of both.
fn compose(&self, other: &Self) -> Result<Self, OTError>
where
Self: Sized;
/// Transforms two operations a and b that happened concurrently and
/// produces two operations a' and b'.
/// (a', b') = a.transform(b)
/// a.compose(b') = b.compose(a')
fn transform(&self, other: &Self) -> Result<(Self, Self), OTError>
where
Self: Sized;
/// Inverts the operation with `other` to produces undo operation.
/// undo = a.invert(b)
/// new_b = b.compose(a)
/// b = new_b.compose(undo)
fn invert(&self, other: &Self) -> Self;
}

View File

@ -8,7 +8,7 @@ use std::{
str::Chars,
};
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum Operation {
Delete(usize),
Retain(Retain),
@ -147,7 +147,7 @@ impl fmt::Display for Operation {
}
}
#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct Retain {
#[serde(rename(serialize = "retain", deserialize = "retain"))]
pub n: usize,
@ -168,7 +168,6 @@ impl fmt::Display for Retain {
impl Retain {
pub fn merge_or_new(&mut self, n: usize, attributes: Attributes) -> Option<Operation> {
log::trace!("merge_retain_or_new_op: len: {:?}, l: {} - r: {}", n, self.attributes, attributes);
if self.attributes == attributes {
self.n += n;
None
@ -199,7 +198,7 @@ impl DerefMut for Retain {
fn deref_mut(&mut self) -> &mut Self::Target { &mut self.n }
}
#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct Insert {
#[serde(rename(serialize = "insert", deserialize = "insert"))]
pub s: String,
@ -255,5 +254,4 @@ impl std::convert::From<String> for Insert {
impl std::convert::From<&str> for Insert {
fn from(s: &str) -> Self { Insert::from(s.to_owned()) }
}
fn is_empty(attributes: &Attributes) -> bool { attributes.is_empty() }

View File

@ -380,11 +380,11 @@ fn attributes_header_insert_newline_at_middle() {
let ops = vec![
Insert(0, "123456", 0),
Header(0, Interval::new(0, 6), 1),
AssertOpsJson(0, r#"[{"insert":"123456"},{"insert":"\n","attributes":{"header":"1"}}]"#),
AssertOpsJson(0, r#"[{"insert":"123456"},{"insert":"\n","attributes":{"header":1}}]"#),
Insert(0, "\n", 3),
AssertOpsJson(
0,
r#"[{"insert":"123"},{"insert":"\n","attributes":{"header":"1"}},{"insert":"456"},{"insert":"\n","attributes":{"header":"1"}}]"#,
r#"[{"insert":"123"},{"insert":"\n","attributes":{"header":1}},{"insert":"456"},{"insert":"\n","attributes":{"header":1}}]"#,
),
];
@ -399,17 +399,17 @@ fn attributes_header_insert_double_newline_at_middle() {
Insert(0, "\n", 3),
AssertOpsJson(
0,
r#"[{"insert":"123"},{"insert":"\n","attributes":{"header":"1"}},{"insert":"456"},{"insert":"\n","attributes":{"header":"1"}}]"#,
r#"[{"insert":"123"},{"insert":"\n","attributes":{"header":1}},{"insert":"456"},{"insert":"\n","attributes":{"header":1}}]"#,
),
Insert(0, "\n", 4),
AssertOpsJson(
0,
r#"[{"insert":"123"},{"insert":"\n\n","attributes":{"header":"1"}},{"insert":"456"},{"insert":"\n","attributes":{"header":"1"}}]"#,
r#"[{"insert":"123"},{"insert":"\n\n","attributes":{"header":1}},{"insert":"456"},{"insert":"\n","attributes":{"header":1}}]"#,
),
Insert(0, "\n", 4),
AssertOpsJson(
0,
r#"[{"insert":"123"},{"insert":"\n\n","attributes":{"header":"1"}},{"insert":"\n456"},{"insert":"\n","attributes":{"header":"1"}}]"#,
r#"[{"insert":"123"},{"insert":"\n\n","attributes":{"header":1}},{"insert":"\n456"},{"insert":"\n","attributes":{"header":1}}]"#,
),
];
@ -424,7 +424,7 @@ fn attributes_header_insert_newline_at_trailing() {
Insert(0, "\n", 6),
AssertOpsJson(
0,
r#"[{"insert":"123456"},{"insert":"\n","attributes":{"header":"1"}},{"insert":"\n"}]"#,
r#"[{"insert":"123456"},{"insert":"\n","attributes":{"header":1}},{"insert":"\n"}]"#,
),
];
@ -440,7 +440,7 @@ fn attributes_header_insert_double_newline_at_trailing() {
Insert(0, "\n", 7),
AssertOpsJson(
0,
r#"[{"insert":"123456"},{"insert":"\n","attributes":{"header":"1"}},{"insert":"\n\n"}]"#,
r#"[{"insert":"123456"},{"insert":"\n","attributes":{"header":1}},{"insert":"\n\n"}]"#,
),
];
@ -703,10 +703,10 @@ fn attributes_preserve_header_format_on_merge() {
Insert(0, NEW_LINE, 3),
AssertOpsJson(
0,
r#"[{"insert":"123"},{"insert":"\n","attributes":{"header":"1"}},{"insert":"456"},{"insert":"\n","attributes":{"header":"1"}}]"#,
r#"[{"insert":"123"},{"insert":"\n","attributes":{"header":1}},{"insert":"456"},{"insert":"\n","attributes":{"header":1}}]"#,
),
Delete(0, Interval::new(3, 4)),
AssertOpsJson(0, r#"[{"insert":"123456"},{"insert":"\n","attributes":{"header":"1"}}]"#),
AssertOpsJson(0, r#"[{"insert":"123456"},{"insert":"\n","attributes":{"header":1}}]"#),
];
TestBuilder::new().run_script::<FlowyDoc>(ops);

View File

@ -176,7 +176,6 @@ impl TestBuilder {
TestOp::AssertOpsJson(delta_i, expected) => {
let delta_i_json = self.documents[*delta_i].to_json();
let expected_delta: Delta = serde_json::from_str(expected).unwrap();
let target_delta: Delta = serde_json::from_str(&delta_i_json).unwrap();

View File

@ -91,8 +91,8 @@ fn delta_deserialize_null_test() {
#[test]
fn document_insert_serde_test() {
let mut document = Document::new::<PlainDoc>();
document.insert(0, "\n");
document.insert(0, "123");
document.insert(0, "\n").unwrap();
document.insert(0, "123").unwrap();
let json = document.to_json();
assert_eq!(r#"[{"insert":"123\n"}]"#, json);
assert_eq!(r#"[{"insert":"123\n"}]"#, Document::from_json(&json).unwrap().to_json());

View File

@ -252,7 +252,7 @@ fn history_header_added_undo() {
Redo(0),
AssertOpsJson(
0,
r#"[{"insert":"123"},{"insert":"\n\n","attributes":{"header":"1"}},{"insert":"456"},{"insert":"\n","attributes":{"header":"1"}}]"#,
r#"[{"insert":"123"},{"insert":"\n\n","attributes":{"header":1}},{"insert":"456"},{"insert":"\n","attributes":{"header":1}}]"#,
),
];

View File

@ -28,10 +28,7 @@ impl DocumentDepsResolver {
let ws_manager = Arc::new(RwLock::new(WsManager::new(sender)));
let ws_handler = Arc::new(WsDocumentResolver {
user: self.user_session.clone(),
inner: ws_manager.clone(),
});
let ws_handler = Arc::new(WsDocumentResolver { inner: ws_manager.clone() });
self.user_session.add_ws_handler(ws_handler);
@ -75,7 +72,6 @@ impl WsSender for WsSenderImpl {
}
struct WsDocumentResolver {
user: Arc<UserSession>,
inner: Arc<RwLock<WsManager>>,
}

View File

@ -283,10 +283,10 @@ impl UserSession {
let addr = format!("{}/{}", flowy_net::config::WS_ADDR.as_str(), token);
let ws_controller = self.ws_controller.clone();
let retry = Retry::new(&addr, move |addr| {
ws_controller.write().connect(addr.to_owned());
let _ = ws_controller.write().connect(addr.to_owned());
});
let _ = self.ws_controller.write().connect_with_retry(addr, retry);
let _ = self.ws_controller.write().connect_with_retry(addr, retry)?;
Ok(())
}
}

View File

@ -8,7 +8,6 @@ use crate::{
OpenViewRequest,
QueryViewParams,
QueryViewRequest,
SaveViewDataRequest,
UpdateViewParams,
UpdateViewRequest,
View,
@ -17,7 +16,7 @@ use crate::{
services::ViewController,
};
use flowy_dispatch::prelude::{data_result, Data, DataResult, Unit};
use flowy_document::entities::doc::{ApplyChangesetParams, Doc, QueryDocParams, SaveDocParams};
use flowy_document::entities::doc::{ApplyChangesetParams, Doc, QueryDocParams};
use std::{convert::TryInto, sync::Arc};
#[tracing::instrument(skip(data, controller), err)]

View File

@ -14,7 +14,7 @@ use crate::{
};
use flowy_database::SqliteConnection;
use flowy_document::{
entities::doc::{ApplyChangesetParams, CreateDocParams, Doc, QueryDocParams, SaveDocParams},
entities::doc::{ApplyChangesetParams, CreateDocParams, Doc, QueryDocParams},
module::FlowyDocument,
};
use std::sync::Arc;

View File

@ -71,6 +71,7 @@ impl WorkspaceController {
Ok(workspace)
}
#[allow(dead_code)]
pub(crate) async fn update_workspace(&self, params: UpdateWorkspaceParams) -> Result<(), WorkspaceError> {
let changeset = WorkspaceTableChangeset::new(params.clone());
let workspace_id = changeset.id.clone();
@ -91,6 +92,7 @@ impl WorkspaceController {
Ok(())
}
#[allow(dead_code)]
pub(crate) async fn delete_workspace(&self, workspace_id: &str) -> Result<(), WorkspaceError> {
let user_id = self.user.user_id()?;
let token = self.user.token()?;

View File

@ -44,11 +44,13 @@ impl WorkspaceTableSql {
Ok(workspaces)
}
#[allow(dead_code)]
pub(crate) fn update_workspace(&self, changeset: WorkspaceTableChangeset, conn: &SqliteConnection) -> Result<(), WorkspaceError> {
diesel_update_table!(workspace_table, changeset, conn);
Ok(())
}
#[allow(dead_code)]
pub(crate) fn delete_workspace(&self, workspace_id: &str, conn: &SqliteConnection) -> Result<(), WorkspaceError> {
diesel_delete_table!(workspace_table, workspace_id, conn);
Ok(())

View File

@ -211,10 +211,3 @@ pub fn open_view(sdk: &FlowyTestSDK, request: OpenViewRequest) -> Doc {
.sync_send()
.parse::<Doc>()
}
pub fn update_view_data(sdk: &FlowyTestSDK, request: SaveViewDataRequest) {
FlowyWorkspaceTest::new(sdk.clone())
.event(SaveViewData)
.request(request)
.sync_send();
}

View File

@ -1,6 +1,5 @@
use crate::helper::*;
use flowy_ot::core::DeltaBuilder;
use flowy_workspace::entities::view::*;
#[test]
@ -34,39 +33,40 @@ fn view_open_doc() {
#[test]
fn view_update_doc() {
let test = ViewTest::new();
let new_data = DeltaBuilder::new().insert("flutter ❤️ rust").build().into_bytes();
let request = SaveViewDataRequest {
view_id: test.view.id.clone(),
data: new_data.clone(),
};
update_view_data(&test.sdk, request);
let request = OpenViewRequest {
view_id: test.view.id.clone(),
};
let doc = open_view(&test.sdk, request);
assert_eq!(doc.data, new_data);
// let test = ViewTest::new();
// let new_data = DeltaBuilder::new().insert("flutter ❤️
// rust").build().into_bytes(); let request = SaveViewDataRequest {
// view_id: test.view.id.clone(),
// data: new_data.clone(),
// };
//
// update_view_data(&test.sdk, request);
//
// let request = OpenViewRequest {
// view_id: test.view.id.clone(),
// };
// let doc = open_view(&test.sdk, request);
// assert_eq!(doc.data, new_data);
}
#[test]
fn view_update_big_doc() {
let test = ViewTest::new();
let new_data = DeltaBuilder::new().insert(&"flutter ❤️ rust".repeat(1000000)).build().into_bytes();
let request = SaveViewDataRequest {
view_id: test.view.id.clone(),
data: new_data.clone(),
};
update_view_data(&test.sdk, request);
let doc = open_view(
&test.sdk,
OpenViewRequest {
view_id: test.view.id.clone(),
},
);
assert_eq!(doc.data, new_data);
// let test = ViewTest::new();
// let new_data = DeltaBuilder::new().insert(&"flutter ❤️
// rust".repeat(1000000)).build().into_bytes();
//
// let request = SaveViewDataRequest {
// view_id: test.view.id.clone(),
// data: new_data.clone(),
// };
//
// update_view_data(&test.sdk, request);
//
// let doc = open_view(
// &test.sdk,
// OpenViewRequest {
// view_id: test.view.id.clone(),
// },
// );
// assert_eq!(doc.data, new_data);
}

View File

@ -73,6 +73,7 @@ impl Future for WsConnectionFuture {
type Fut = BoxFuture<'static, Result<(), WsError>>;
#[pin_project]
pub struct WsStream {
#[allow(dead_code)]
msg_tx: MsgSender,
#[pin]
inner: Option<(Fut, Fut)>,
@ -135,6 +136,7 @@ fn post_message(tx: MsgSender, message: Result<Message, Error>) {
pub struct Retry<F> {
f: F,
#[allow(dead_code)]
retry_time: usize,
addr: String,
}

View File

@ -52,6 +52,7 @@ pub enum WsState {
pub struct WsController {
handlers: HashMap<String, Arc<dyn WsMessageHandler>>,
state_notify: Arc<RwLock<WsStateNotify>>,
#[allow(dead_code)]
addr: Option<String>,
sender: Option<Arc<WsSender>>,
}