Integrate appflowy editor (#1040)

This commit is contained in:
Lucas.Xu
2022-10-22 21:57:44 +08:00
committed by GitHub
parent 8dff9dc67c
commit ad9a4b7d71
177 changed files with 4183 additions and 1007 deletions

View File

@ -871,6 +871,7 @@ dependencies = [
"lib-ot",
"lib-ws",
"log",
"md5",
"protobuf",
"rand 0.8.5",
"serde",
@ -1773,6 +1774,7 @@ dependencies = [
"bytes",
"dashmap",
"derive_more",
"indexmap",
"indextree",
"lazy_static",
"log",

View File

@ -23,7 +23,7 @@ pub extern "C" fn init_sdk(path: *mut c_char) -> i64 {
let path: &str = c_str.to_str().unwrap();
let server_config = get_client_server_configuration().unwrap();
let config = FlowySDKConfig::new(path, "appflowy", server_config, false).log_filter("info");
let config = FlowySDKConfig::new(path, "appflowy", server_config).log_filter("info");
FLOWY_SDK.get_or_init(|| FlowySDK::new(config));
0

View File

@ -0,0 +1,2 @@
-- This file should undo anything in `up.sql`
DROP TABLE grid_view_rev_table;

View File

@ -0,0 +1,9 @@
-- Your SQL goes here
CREATE TABLE document_rev_table (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
document_id TEXT NOT NULL DEFAULT '',
base_rev_id BIGINT NOT NULL DEFAULT 0,
rev_id BIGINT NOT NULL DEFAULT 0,
data BLOB NOT NULL DEFAULT (x''),
state INTEGER NOT NULL DEFAULT 0
);

View File

@ -13,6 +13,17 @@ table! {
}
}
// Diesel schema mirror of the `document_rev_table` created by the matching
// SQL migration; column types must stay in sync with that DDL.
table! {
document_rev_table (id) {
id -> Integer,
document_id -> Text,
base_rev_id -> BigInt,
rev_id -> BigInt,
data -> Binary,
state -> Integer,
}
}
table! {
grid_block_index_table (row_id) {
row_id -> Text,
@ -133,6 +144,7 @@ table! {
allow_tables_to_appear_in_same_query!(
app_table,
document_rev_table,
grid_block_index_table,
grid_meta_rev_table,
grid_rev_table,

View File

@ -28,6 +28,7 @@ tokio = {version = "1", features = ["sync"]}
tracing = { version = "0.1", features = ["log"] }
bytes = { version = "1.1" }
md5 = "0.7.0"
strum = "0.21"
strum_macros = "0.21"
dashmap = "5"

View File

@ -0,0 +1,419 @@
{
"document": {
"type": "editor",
"children": [
{
"type": "text",
"attributes": {
"subtype": "heading",
"heading": "h1"
},
"delta": [
{
"insert": "🌟 Welcome to AppFlowy!"
}
]
},
{
"type": "text",
"attributes": {
"subtype": "heading",
"heading": "h2"
},
"delta": [
{
"insert": "Here are the basics"
}
]
},
{
"type": "text",
"attributes": {
"subtype": "checkbox",
"checkbox": null
},
"delta": [
{
"insert": "Click anywhere and just start typing."
}
]
},
{
"type": "text",
"attributes": {
"subtype": "checkbox",
"checkbox": null
},
"delta": [
{
"insert": "Highlight",
"attributes": {
"backgroundColor": "0x6000BCF0"
}
},
{
"insert": " any text, and use the editing menu to "
},
{
"insert": "style",
"attributes": {
"italic": true
}
},
{
"insert": " "
},
{
"insert": "your",
"attributes": {
"bold": true
}
},
{
"insert": " "
},
{
"insert": "writing",
"attributes": {
"underline": true
}
},
{
"insert": " "
},
{
"insert": "however",
"attributes": {
"code": true
}
},
{
"insert": " you "
},
{
"insert": "like.",
"attributes": {
"strikethrough": true
}
}
]
},
{
"type": "text",
"attributes": {
"subtype": "checkbox",
"checkbox": null
},
"delta": [
{
"insert": "As soon as you type "
},
{
"insert": "/",
"attributes": {
"code": true
}
},
{
"insert": " a menu will pop up. Select different types of content blocks you can add."
}
]
},
{
"type": "text",
"attributes": {
"subtype": "checkbox",
"checkbox": null
},
"delta": [
{
"insert": "Type "
},
{
"insert": "/",
"attributes": {
"code": true
}
},
{
"insert": " followed by "
},
{
"insert": "/bullet",
"attributes": {
"code": true
}
},
{
"insert": " or "
},
{
"insert": "/c.",
"attributes": {
"code": true
}
}
]
},
{
"type": "text",
"attributes": {
"subtype": "checkbox",
"checkbox": true
},
"delta": [
{
"insert": "Click "
},
{
"insert": "+ New Page ",
"attributes": {
"code": true
}
},
{
"insert": "button at the bottom of your sidebar to add a new page."
}
]
},
{
"type": "text",
"attributes": {
"subtype": "checkbox",
"checkbox": null
},
"delta": [
{
"insert": "Click "
},
{
"insert": "+",
"attributes": {
"code": true
}
},
{
"insert": " next to any page title in the sidebar to quickly add a new subpage."
}
]
},
{
"type": "text",
"attributes": {
"checkbox": null
},
"delta": []
},
{
"type": "text",
"attributes": {
"subtype": "heading",
"checkbox": null,
"heading": "h2"
},
"delta": [
{
"insert": "Markdown"
}
]
},
{
"type": "text",
"attributes": {
"subtype": "number-list",
"number": 1,
"heading": null
},
"delta": [
{
"insert": "Heading "
}
]
},
{
"type": "text",
"attributes": {
"subtype": "number-list",
"number": 2
},
"delta": [
{
"insert": "bold text",
"attributes": {
"bold": true,
"defaultFormating": true
}
}
]
},
{
"type": "text",
"attributes": {
"subtype": "number-list",
"number": 3
},
"delta": [
{
"insert": "italicized text",
"attributes": {
"italic": true
}
}
]
},
{
"type": "text",
"attributes": {
"subtype": "number-list",
"number": 4,
"number-list": null
},
"delta": [
{
"insert": "Ordered List"
}
]
},
{
"type": "text",
"attributes": {
"number": 5,
"subtype": "number-list"
},
"delta": [
{
"insert": "code",
"attributes": {
"code": true
}
}
]
},
{
"type": "text",
"attributes": {
"number": 6,
"subtype": "number-list"
},
"delta": [
{
"insert": "Strikethrough",
"attributes": {
"strikethrough": true
}
},
{
"retain": 1,
"attributes": {
"strikethrough": true
}
}
]
},
{
"type": "text",
"attributes": {
"checkbox": null
},
"delta": []
},
{
"type": "text",
"attributes": {
"subtype": "heading",
"checkbox": null,
"heading": "h2"
},
"delta": [
{
"insert": "Have a question?"
}
]
},
{
"type": "text",
"attributes": {
"subtype": "quote"
},
"delta": [
{
"insert": "Click "
},
{
"insert": "?",
"attributes": {
"code": true
}
},
{
"insert": " at the bottom right for help and support."
}
]
},
{
"type": "text",
"delta": []
},
{
"type": "text",
"attributes": {
"subtype": "heading",
"heading": "h2"
},
"delta": [
{
"insert": "Like AppFlowy? Follow us:"
}
]
},
{
"type": "text",
"attributes": {
"subtype": "bulleted-list",
"quote": null
},
"delta": [
{
"insert": "GitHub",
"attributes": {
"href": "https://github.com/AppFlowy-IO/AppFlowy"
}
}
]
},
{
"type": "text",
"attributes": {
"subtype": "bulleted-list"
},
"delta": [
{
"insert": "Twitter: @appflowy"
}
]
},
{
"type": "text",
"attributes": {
"subtype": "bulleted-list"
},
"delta": [
{
"insert": "Newsletter",
"attributes": {
"href": "https://blog-appflowy.ghost.io/"
}
}
]
},
{
"type": "text",
"attributes": {
"subtype": null,
"heading": null
},
"delta": []
}
]
}
}

View File

@ -1,11 +1,11 @@
use bytes::Bytes;
use flowy_error::{FlowyError, FlowyResult};
use flowy_revision::{RevisionObjectDeserializer, RevisionObjectSerializer};
use flowy_revision::{RevisionCompress, RevisionObjectDeserializer, RevisionObjectSerializer};
use flowy_sync::entities::revision::Revision;
use lib_ot::core::{
Body, Extension, NodeDataBuilder, NodeOperation, NodeTree, NodeTreeContext, Selection, Transaction,
};
use lib_ot::text_delta::TextOperationBuilder;
use lib_ot::text_delta::DeltaTextOperationBuilder;
#[derive(Debug)]
pub struct Document {
@ -30,6 +30,11 @@ impl Document {
}
}
/// Returns the checksum associated with the current document content.
///
/// NOTE(review): this is a stub — it always returns the empty string, and the
/// commented-out line shows the intended implementation (an md5 hex digest of
/// the serialized content). Callers that persist this value (e.g. revision
/// records) will store an empty checksum until this is implemented — TODO.
pub fn md5(&self) -> String {
// format!("{:x}", md5::compute(bytes))
"".to_owned()
}
pub fn get_tree(&self) -> &NodeTree {
&self.tree
}
@ -40,7 +45,7 @@ pub(crate) fn make_tree_context() -> NodeTreeContext {
}
pub fn initial_document_content() -> String {
let delta = TextOperationBuilder::new().insert("").build();
let delta = DeltaTextOperationBuilder::new().insert("").build();
let node_data = NodeDataBuilder::new("text").insert_body(Body::Delta(delta)).build();
let editor_node = NodeDataBuilder::new("editor").add_node_data(node_data).build();
let node_operation = NodeOperation::Insert {
@ -78,7 +83,7 @@ impl RevisionObjectDeserializer for DocumentRevisionSerde {
fn deserialize_revisions(_object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
let mut tree = NodeTree::new(make_tree_context());
let transaction = make_transaction_from_revisions(revisions)?;
let transaction = make_transaction_from_revisions(&revisions)?;
let _ = tree.apply_transaction(transaction)?;
let document = Document::new(tree);
Result::<Document, FlowyError>::Ok(document)
@ -87,12 +92,20 @@ impl RevisionObjectDeserializer for DocumentRevisionSerde {
impl RevisionObjectSerializer for DocumentRevisionSerde {
fn combine_revisions(revisions: Vec<Revision>) -> FlowyResult<Bytes> {
let transaction = make_transaction_from_revisions(revisions)?;
let transaction = make_transaction_from_revisions(&revisions)?;
Ok(Bytes::from(transaction.to_bytes()?))
}
}
fn make_transaction_from_revisions(revisions: Vec<Revision>) -> FlowyResult<Transaction> {
/// Zero-sized adapter that lets the revision manager compact document
/// revisions: it simply delegates to `DocumentRevisionSerde::combine_revisions`,
/// which composes the revisions into a single transaction payload.
pub(crate) struct DocumentRevisionCompress();
impl RevisionCompress for DocumentRevisionCompress {
fn combine_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
DocumentRevisionSerde::combine_revisions(revisions)
}
}
#[tracing::instrument(level = "trace", skip_all, err)]
pub fn make_transaction_from_revisions(revisions: &[Revision]) -> FlowyResult<Transaction> {
let mut transaction = Transaction::new();
for revision in revisions {
let _ = transaction.compose(Transaction::from_bytes(&revision.bytes)?)?;

View File

@ -3,11 +3,12 @@ use crate::editor::document::Document;
use bytes::Bytes;
use flowy_error::FlowyResult;
use lib_ot::core::{
AttributeHashMap, Body, Changeset, Extension, NodeData, NodeId, NodeOperation, NodeTree, Path, Selection,
Transaction,
AttributeHashMap, Body, Changeset, Extension, NodeData, NodeId, NodeOperation, NodeTree, NodeTreeContext, Path,
Selection, Transaction,
};
use lib_ot::text_delta::TextOperations;
use serde::de::{self, MapAccess, Visitor};
use lib_ot::text_delta::DeltaTextOperations;
use serde::de::{self, MapAccess, Unexpected, Visitor};
use serde::ser::{SerializeMap, SerializeSeq};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::fmt;
@ -44,14 +45,14 @@ impl<'de> Deserialize<'de> for Document {
where
M: MapAccess<'de>,
{
let mut node_tree = None;
let mut document_node = None;
while let Some(key) = map.next_key()? {
match key {
"document" => {
if node_tree.is_some() {
if document_node.is_some() {
return Err(de::Error::duplicate_field("document"));
}
node_tree = Some(map.next_value::<NodeTree>()?)
document_node = Some(map.next_value::<DocumentNode>()?)
}
s => {
return Err(de::Error::unknown_field(s, FIELDS));
@ -59,8 +60,13 @@ impl<'de> Deserialize<'de> for Document {
}
}
match node_tree {
Some(tree) => Ok(Document::new(tree)),
match document_node {
Some(document_node) => {
match NodeTree::from_node_data(document_node.into(), NodeTreeContext::default()) {
Ok(tree) => Ok(Document::new(tree)),
Err(err) => Err(de::Error::invalid_value(Unexpected::Other(&format!("{}", err)), &"")),
}
}
None => Err(de::Error::missing_field("document")),
}
}
@ -69,10 +75,20 @@ impl<'de> Deserialize<'de> for Document {
}
}
#[derive(Debug)]
struct DocumentContentSerializer<'a>(pub &'a Document);
/// Parses a serialized document (a JSON object with a top-level `"document"`
/// node) and wraps it in a transaction that inserts that node at path 0.
///
/// Returns an error if `content` fails to deserialize.
pub fn make_transaction_from_document_content(content: &str) -> FlowyResult<Transaction> {
let document_node: DocumentNode = serde_json::from_str::<DocumentContentDeserializer>(content)?.document;
// A single insert of the whole document tree at the root position.
let document_operation = DocumentOperation::Insert {
path: 0_usize.into(),
nodes: vec![document_node],
};
let mut document_transaction = DocumentTransaction::default();
document_transaction.operations.push(document_operation);
Ok(document_transaction.into())
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DocumentContentSerde {}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct DocumentTransaction {
#[serde(default)]
operations: Vec<DocumentOperation>,
@ -161,8 +177,8 @@ pub enum DocumentOperation {
#[serde(rename = "update_text")]
UpdateText {
path: Path,
delta: TextOperations,
inverted: TextOperations,
delta: DeltaTextOperations,
inverted: DeltaTextOperations,
},
}
@ -230,20 +246,27 @@ pub struct DocumentNode {
#[serde(default)]
pub attributes: AttributeHashMap,
#[serde(skip_serializing_if = "TextOperations::is_empty")]
pub delta: TextOperations,
#[serde(skip_serializing_if = "DeltaTextOperations::is_empty")]
#[serde(default)]
pub delta: DeltaTextOperations,
#[serde(skip_serializing_if = "Vec::is_empty")]
#[serde(default)]
pub children: Vec<DocumentNode>,
}
impl DocumentNode {
/// Creates an empty node (all fields defaulted); callers typically set
/// `node_type` and fill `delta`/`children` afterwards.
pub fn new() -> Self {
Self::default()
}
}
impl std::convert::From<NodeData> for DocumentNode {
fn from(node_data: NodeData) -> Self {
let delta = if let Body::Delta(operations) = node_data.body {
operations
} else {
TextOperations::default()
DeltaTextOperations::default()
};
DocumentNode {
node_type: node_data.node_type,
@ -265,6 +288,14 @@ impl std::convert::From<DocumentNode> for NodeData {
}
}
// Helper for deserializing the `{ "document": { ... } }` wrapper used by the
// serialized document-content format.
#[derive(Debug, Deserialize)]
struct DocumentContentDeserializer {
document: DocumentNode,
}
// Serialization counterpart: borrows a `Document` and emits the same
// `{ "document": ... }` wrapper (see its `Serialize` impl).
#[derive(Debug)]
struct DocumentContentSerializer<'a>(pub &'a Document);
impl<'a> Serialize for DocumentContentSerializer<'a> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
@ -299,6 +330,12 @@ impl<'a> Serialize for DocumentContentSerializer<'a> {
mod tests {
use crate::editor::document::Document;
use crate::editor::document_serde::DocumentTransaction;
use crate::editor::initial_read_me;
#[test]
fn load_read_me() {
let _ = initial_read_me();
}
#[test]
fn transaction_deserialize_update_text_operation_test() {

View File

@ -1,5 +1,6 @@
use crate::editor::document::{Document, DocumentRevisionSerde};
use crate::editor::document_serde::DocumentTransaction;
use crate::editor::make_transaction_from_revisions;
use crate::editor::queue::{Command, CommandSender, DocumentQueue};
use crate::{DocumentEditor, DocumentUser};
use bytes::Bytes;
@ -17,6 +18,7 @@ pub struct AppFlowyDocumentEditor {
#[allow(dead_code)]
doc_id: String,
command_sender: CommandSender,
rev_manager: Arc<RevisionManager>,
}
impl AppFlowyDocumentEditor {
@ -28,9 +30,13 @@ impl AppFlowyDocumentEditor {
) -> FlowyResult<Arc<Self>> {
let document = rev_manager.load::<DocumentRevisionSerde>(Some(cloud_service)).await?;
let rev_manager = Arc::new(rev_manager);
let command_sender = spawn_edit_queue(user, rev_manager, document);
let command_sender = spawn_edit_queue(user, rev_manager.clone(), document);
let doc_id = doc_id.to_string();
let editor = Arc::new(Self { doc_id, command_sender });
let editor = Arc::new(Self {
doc_id,
command_sender,
rev_manager,
});
Ok(editor)
}
@ -53,6 +59,13 @@ impl AppFlowyDocumentEditor {
let content = rx.await.map_err(internal_error)??;
Ok(content)
}
/// Produces a standalone copy of the document's content as a JSON-encoded
/// transaction: loads every stored revision, composes them into a single
/// transaction, and serializes it. Suitable for seeding a new document.
pub async fn duplicate_document(&self) -> FlowyResult<String> {
let revisions = self.rev_manager.load_revisions().await?;
let transaction = make_transaction_from_revisions(&revisions)?;
let json = transaction.to_json()?;
Ok(json)
}
}
fn spawn_edit_queue(
@ -67,11 +80,24 @@ fn spawn_edit_queue(
}
impl DocumentEditor for Arc<AppFlowyDocumentEditor> {
fn close(&self) {}
fn export(&self) -> FutureResult<String, FlowyError> {
let this = self.clone();
FutureResult::new(async move { this.get_content(false).await })
}
fn duplicate(&self) -> FutureResult<String, FlowyError> {
let this = self.clone();
FutureResult::new(async move { this.duplicate_document().await })
}
fn receive_ws_data(&self, _data: ServerRevisionWSData) -> FutureResult<(), FlowyError> {
FutureResult::new(async move { Ok(()) })
}
fn receive_ws_state(&self, _state: &WSConnectState) {}
fn compose_local_operations(&self, data: Bytes) -> FutureResult<(), FlowyError> {
let this = self.clone();
FutureResult::new(async move {
@ -81,14 +107,6 @@ impl DocumentEditor for Arc<AppFlowyDocumentEditor> {
})
}
fn close(&self) {}
fn receive_ws_data(&self, _data: ServerRevisionWSData) -> FutureResult<(), FlowyError> {
FutureResult::new(async move { Ok(()) })
}
fn receive_ws_state(&self, _state: &WSConnectState) {}
fn as_any(&self) -> &dyn Any {
self
}

View File

@ -0,0 +1,419 @@
use crate::editor::{DocumentNode, DocumentOperation};
use flowy_error::FlowyResult;
use lib_ot::core::{AttributeHashMap, DeltaOperation, Insert, Transaction};
use lib_ot::text_delta::{DeltaTextOperation, DeltaTextOperations};
/// One-shot migration from the legacy Quill-style `Delta` document format to a
/// node-tree `Transaction` (used by the new AppFlowy editor).
pub struct DeltaRevisionMigration();

impl DeltaRevisionMigration {
    /// Converts a legacy `delta` into a `Transaction` that first inserts a root
    /// `"editor"` node at path 0, then inserts one `"text"` node per line of the
    /// delta at paths `[0, index]`.
    ///
    /// The conversion relies on the Quill convention that a `"\n"` insert
    /// carries the *block-level* attributes (header, list, quote) of the line it
    /// terminates, while non-newline inserts carry inline attributes.
    ///
    /// NOTE(review): `Delete`/`Retain` operations are not expected in a document
    /// snapshot and are only logged, not applied.
    pub fn run(delta: DeltaTextOperations) -> FlowyResult<Transaction> {
        // Rename the legacy `background` inline attribute to `backgroundColor`.
        let migrate_background_attribute = |insert: &mut Insert<AttributeHashMap>| {
            if let Some(Some(color)) = insert.attributes.get("background").map(|value| value.str_value()) {
                insert.attributes.remove_key("background");
                insert.attributes.insert("backgroundColor", color);
            }
        };
        // Rename `strike` (string-valued in the old format) to boolean `strikethrough`.
        let migrate_strike_attribute = |insert: &mut Insert<AttributeHashMap>| {
            if let Some(Some(_)) = insert.attributes.get("strike").map(|value| value.str_value()) {
                insert.attributes.remove_key("strike");
                insert.attributes.insert("strikethrough", true);
            }
        };
        // Rename `link` to `href`.
        let migrate_link_attribute = |insert: &mut Insert<AttributeHashMap>| {
            if let Some(Some(link)) = insert.attributes.get("link").map(|value| value.str_value()) {
                insert.attributes.remove_key("link");
                insert.attributes.insert("href", link);
            }
        };
        // Map the legacy `list` block attribute onto the new subtype attributes.
        // `number_list_number` tracks the running counter for ordered lists; any
        // non-ordered list item resets it to 0.
        let migrate_list_attribute =
            |attribute_node: &mut DocumentNode, value: &str, number_list_number: &mut usize| {
                if value == "unchecked" {
                    *number_list_number = 0;
                    attribute_node.attributes.insert("subtype", "checkbox");
                    attribute_node.attributes.insert("checkbox", false);
                }
                if value == "checked" {
                    *number_list_number = 0;
                    attribute_node.attributes.insert("subtype", "checkbox");
                    attribute_node.attributes.insert("checkbox", true);
                }
                if value == "bullet" {
                    *number_list_number = 0;
                    attribute_node.attributes.insert("subtype", "bulleted-list");
                }
                if value == "ordered" {
                    *number_list_number += 1;
                    attribute_node.attributes.insert("subtype", "number-list");
                    attribute_node.attributes.insert("number", *number_list_number);
                }
            };
        // An insert containing "\n\n" spans an empty line; keep the first
        // segment in place and return the remaining segments as extra inserts
        // (each of which becomes its own text node).
        let generate_new_op_with_double_new_lines = |insert: &mut Insert<AttributeHashMap>| {
            let pattern = "\n\n";
            let mut new_ops = vec![];
            if insert.s.as_str().contains(pattern) {
                let insert_str = insert.s.clone();
                let insert_strings = insert_str.split(pattern).map(|s| s.to_owned());
                for (index, new_s) in insert_strings.enumerate() {
                    if index == 0 {
                        insert.s = new_s.into();
                    } else {
                        new_ops.push(DeltaOperation::Insert(Insert {
                            s: new_s.into(),
                            attributes: AttributeHashMap::default(),
                        }));
                    }
                }
            }
            new_ops
        };
        // Builds a `"text"` node seeded with the given delta operations.
        let create_text_node = |ops: Vec<DeltaTextOperation>| {
            let mut document_node = DocumentNode::new();
            document_node.node_type = "text".to_owned();
            ops.into_iter().for_each(|op| document_node.delta.add(op));
            document_node
        };
        let transform_op = |mut insert: Insert<AttributeHashMap>| {
            // Rename the attribute name from background to backgroundColor
            migrate_background_attribute(&mut insert);
            migrate_strike_attribute(&mut insert);
            migrate_link_attribute(&mut insert);
            let new_ops = generate_new_op_with_double_new_lines(&mut insert);
            (DeltaOperation::Insert(insert), new_ops)
        };
        let mut index: usize = 0;
        let mut number_list_number = 0;
        // Root node: everything else is inserted as its children at [0, index].
        let mut editor_node = DocumentNode::new();
        editor_node.node_type = "editor".to_owned();
        let mut transaction = Transaction::new();
        transaction.push_operation(DocumentOperation::Insert {
            path: 0.into(),
            nodes: vec![editor_node],
        });
        let mut iter = delta.ops.into_iter().enumerate();
        // Outer loop: one iteration per line group. The inner loop consumes
        // inserts until the terminating "\n", whose attributes style the line.
        while let Some((_, op)) = iter.next() {
            let mut document_node = create_text_node(vec![]);
            let mut split_document_nodes = vec![];
            match op {
                DeltaOperation::Delete(_) => tracing::warn!("Should not contain delete operation"),
                DeltaOperation::Retain(_) => tracing::warn!("Should not contain retain operation"),
                DeltaOperation::Insert(insert) => {
                    if insert.s.as_str() != "\n" {
                        let (op, new_ops) = transform_op(insert);
                        document_node.delta.add(op);
                        if !new_ops.is_empty() {
                            split_document_nodes.push(create_text_node(new_ops));
                        }
                    }
                    while let Some((_, DeltaOperation::Insert(insert))) = iter.next() {
                        if insert.s.as_str() != "\n" {
                            let (op, new_ops) = transform_op(insert);
                            document_node.delta.add(op);
                            if !new_ops.is_empty() {
                                split_document_nodes.push(create_text_node(new_ops));
                            }
                        } else {
                            // Block attributes on the "\n" apply to the last
                            // node produced for this line.
                            let attribute_node = match split_document_nodes.last_mut() {
                                None => &mut document_node,
                                Some(split_document_node) => split_document_node,
                            };
                            if let Some(value) = insert.attributes.get("header") {
                                attribute_node.attributes.insert("subtype", "heading");
                                if let Some(v) = value.int_value() {
                                    number_list_number = 0;
                                    attribute_node.attributes.insert("heading", format!("h{}", v));
                                }
                            }
                            if insert.attributes.get("blockquote").is_some() {
                                attribute_node.attributes.insert("subtype", "quote");
                            }
                            if let Some(value) = insert.attributes.get("list") {
                                if let Some(s) = value.str_value() {
                                    migrate_list_attribute(attribute_node, &s, &mut number_list_number);
                                }
                            }
                            break;
                        }
                    }
                }
            }
            // Emit the line's node(s) as children of the editor root.
            let mut operations = vec![document_node];
            operations.extend(split_document_nodes);
            operations.into_iter().for_each(|node| {
                // println!("{}", serde_json::to_string(&node).unwrap());
                let operation = DocumentOperation::Insert {
                    path: vec![0, index].into(),
                    nodes: vec![node],
                };
                transaction.push_operation(operation);
                index += 1;
            });
        }
        Ok(transaction)
    }
}
#[cfg(test)]
mod tests {
use crate::editor::migration::delta_migration::DeltaRevisionMigration;
use crate::editor::Document;
use lib_ot::text_delta::DeltaTextOperations;
// Smoke test: migrating the bundled legacy delta must yield a transaction
// that builds a document with non-empty content. It does not pin the exact
// migrated structure, only that the full pipeline round-trips.
#[test]
fn transform_delta_to_transaction_test() {
let delta = DeltaTextOperations::from_json(DELTA_STR).unwrap();
let transaction = DeltaRevisionMigration::run(delta).unwrap();
let document = Document::from_transaction(transaction).unwrap();
let s = document.get_content(true).unwrap();
assert!(!s.is_empty());
}
const DELTA_STR: &str = r#"[
{
"insert": "\n👋 Welcome to AppFlowy!"
},
{
"insert": "\n",
"attributes": {
"header": 1
}
},
{
"insert": "\nHere are the basics"
},
{
"insert": "\n",
"attributes": {
"header": 2
}
},
{
"insert": "Click anywhere and just start typing"
},
{
"insert": "\n",
"attributes": {
"list": "unchecked"
}
},
{
"insert": "Highlight",
"attributes": {
"background": "$fff2cd"
}
},
{
"insert": " any text, and use the menu at the bottom to "
},
{
"insert": "style",
"attributes": {
"italic": true
}
},
{
"insert": " "
},
{
"insert": "your",
"attributes": {
"bold": true
}
},
{
"insert": " "
},
{
"insert": "writing",
"attributes": {
"underline": true
}
},
{
"insert": " "
},
{
"insert": "however",
"attributes": {
"code": true
}
},
{
"insert": " "
},
{
"insert": "you",
"attributes": {
"strike": true
}
},
{
"insert": " "
},
{
"insert": "like",
"attributes": {
"background": "$e8e0ff"
}
},
{
"insert": "\n",
"attributes": {
"list": "unchecked"
}
},
{
"insert": "Click "
},
{
"insert": "+ New Page",
"attributes": {
"background": "$defff1",
"bold": true
}
},
{
"insert": " button at the bottom of your sidebar to add a new page"
},
{
"insert": "\n",
"attributes": {
"list": "unchecked"
}
},
{
"insert": "Click the "
},
{
"insert": "'",
"attributes": {
"background": "$defff1"
}
},
{
"insert": "+",
"attributes": {
"background": "$defff1",
"bold": true
}
},
{
"insert": "'",
"attributes": {
"background": "$defff1"
}
},
{
"insert": " next to any page title in the sidebar to quickly add a new subpage"
},
{
"insert": "\n",
"attributes": {
"list": "unchecked"
}
},
{
"insert": "\nHave a question? "
},
{
"insert": "\n",
"attributes": {
"header": 2
}
},
{
"insert": "Click the "
},
{
"insert": "'?'",
"attributes": {
"background": "$defff1",
"bold": true
}
},
{
"insert": " at the bottom right for help and support.\n\nLike AppFlowy? Follow us:"
},
{
"insert": "\n",
"attributes": {
"header": 2
}
},
{
"insert": "GitHub: https://github.com/AppFlowy-IO/appflowy"
},
{
"insert": "\n",
"attributes": {
"blockquote": true
}
},
{
"insert": "Twitter: https://twitter.com/appflowy"
},
{
"insert": "\n",
"attributes": {
"blockquote": true
}
},
{
"insert": "Newsletter: https://www.appflowy.io/blog"
},
{
"insert": "\n",
"attributes": {
"blockquote": true
}
},
{
"insert": "item 1"
},
{
"insert": "\n",
"attributes": {
"list": "ordered"
}
},
{
"insert": "item 2"
},
{
"insert": "\n",
"attributes": {
"list": "ordered"
}
},
{
"insert": "item3"
},
{
"insert": "\n",
"attributes": {
"list": "ordered"
}
},
{
"insert": "appflowy",
"attributes": {
"link": "https://www.appflowy.io/"
}
}
]"#;
}

View File

@ -0,0 +1,3 @@
mod delta_migration;
pub use delta_migration::*;

View File

@ -2,7 +2,17 @@
mod document;
mod document_serde;
mod editor;
mod migration;
mod queue;
pub use document::*;
pub use document_serde::*;
pub use editor::*;
pub use migration::*;
/// Returns the serialized transaction for the default "Read me" document shown
/// to new users.
///
/// The JSON asset is embedded at compile time via `include_str!`, so a failure
/// to parse or re-serialize it can only be a broken bundled asset — a bug, not
/// a recoverable runtime condition. The panics therefore carry explicit
/// invariant messages instead of bare `unwrap()`s, so a corrupted asset is
/// diagnosed immediately.
#[inline]
pub fn initial_read_me() -> String {
    let document_content = include_str!("READ_ME.json");
    let transaction = make_transaction_from_document_content(document_content)
        .expect("bundled READ_ME.json must deserialize into a document transaction");
    transaction
        .to_json()
        .expect("read-me transaction must serialize back to JSON")
}

View File

@ -1,13 +1,17 @@
use crate::editor::document::Document;
use crate::DocumentUser;
use async_stream::stream;
use bytes::Bytes;
use flowy_error::FlowyError;
use flowy_revision::RevisionManager;
use flowy_sync::entities::revision::{RevId, Revision};
use futures::stream::StreamExt;
use lib_ot::core::Transaction;
use std::sync::Arc;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{oneshot, RwLock};
pub struct DocumentQueue {
#[allow(dead_code)]
user: Arc<dyn DocumentUser>,
@ -56,7 +60,10 @@ impl DocumentQueue {
async fn handle_command(&self, command: Command) -> Result<(), FlowyError> {
match command {
Command::ComposeTransaction { transaction, ret } => {
self.document.write().await.apply_transaction(transaction)?;
self.document.write().await.apply_transaction(transaction.clone())?;
let _ = self
.save_local_operations(transaction, self.document.read().await.md5())
.await?;
let _ = ret.send(Ok(()));
}
Command::GetDocumentContent { pretty, ret } => {
@ -66,6 +73,16 @@ impl DocumentQueue {
}
Ok(())
}
/// Persists a locally-applied `transaction` as a new revision.
///
/// Serializes the transaction to bytes, allocates the next (base, rev) id pair
/// from the revision manager, and stores the revision tagged with the current
/// user id and the supplied content `md5`. Returns the new revision id.
#[tracing::instrument(level = "trace", skip(self, transaction, md5), err)]
async fn save_local_operations(&self, transaction: Transaction, md5: String) -> Result<RevId, FlowyError> {
let bytes = Bytes::from(transaction.to_bytes()?);
let (base_rev_id, rev_id) = self.rev_manager.next_rev_id_pair();
let user_id = self.user.user_id()?;
let revision = Revision::new(&self.rev_manager.object_id, base_rev_id, rev_id, bytes, &user_id, md5);
let _ = self.rev_manager.add_local_revision(&revision).await?;
Ok(rev_id.into())
}
}
pub(crate) type CommandSender = Sender<Command>;

View File

@ -74,12 +74,41 @@ pub struct ExportPayloadPB {
#[pb(index = 2)]
pub export_type: ExportType,
#[pb(index = 3)]
pub document_version: DocumentVersionPB,
}
/// Protobuf enum identifying which storage/editor format a document uses.
#[derive(PartialEq, Debug, ProtoBuf_Enum, Clone)]
pub enum DocumentVersionPB {
/// This version's content of the document is built from `Delta`. It uses
/// `DeltaDocumentEditor`.
V0 = 0,
/// This version's content of the document is built from `NodeTree`. It uses
/// `AppFlowyDocumentEditor`.
V1 = 1,
}
// Defaults to the legacy delta format so untagged payloads keep their old meaning.
impl std::default::Default for DocumentVersionPB {
fn default() -> Self {
Self::V0
}
}
#[derive(Default, ProtoBuf)]
pub struct OpenDocumentContextPB {
#[pb(index = 1)]
pub document_id: String,
#[pb(index = 2)]
pub document_version: DocumentVersionPB,
}
#[derive(Default, Debug)]
pub struct ExportParams {
pub view_id: String,
pub export_type: ExportType,
pub document_version: DocumentVersionPB,
}
impl TryInto<ExportParams> for ExportPayloadPB {
@ -88,6 +117,7 @@ impl TryInto<ExportParams> for ExportPayloadPB {
Ok(ExportParams {
view_id: self.view_id,
export_type: self.export_type,
document_version: self.document_version,
})
}
}

View File

@ -1,21 +1,23 @@
use crate::entities::{DocumentSnapshotPB, EditParams, EditPayloadPB, ExportDataPB, ExportParams, ExportPayloadPB};
use crate::entities::{
DocumentSnapshotPB, EditParams, EditPayloadPB, ExportDataPB, ExportParams, ExportPayloadPB, OpenDocumentContextPB,
};
use crate::DocumentManager;
use flowy_error::FlowyError;
use flowy_sync::entities::document::DocumentIdPB;
use lib_dispatch::prelude::{data_result, AppData, Data, DataResult};
use std::convert::TryInto;
use std::sync::Arc;
pub(crate) async fn get_document_handler(
data: Data<DocumentIdPB>,
data: Data<OpenDocumentContextPB>,
manager: AppData<Arc<DocumentManager>>,
) -> DataResult<DocumentSnapshotPB, FlowyError> {
let document_id: DocumentIdPB = data.into_inner();
let editor = manager.open_document_editor(&document_id).await?;
let operations_str = editor.export().await?;
let context: OpenDocumentContextPB = data.into_inner();
let editor = manager.open_document_editor(&context.document_id).await?;
let document_data = editor.export().await?;
data_result(DocumentSnapshotPB {
doc_id: document_id.into(),
snapshot: operations_str,
doc_id: context.document_id,
snapshot: document_data,
})
}

View File

@ -19,7 +19,7 @@ pub fn create(document_manager: Arc<DocumentManager>) -> Module {
#[derive(Clone, Copy, PartialEq, Eq, Debug, Display, Hash, ProtoBuf_Enum, Flowy_Event)]
#[event_err = "FlowyError"]
pub enum DocumentEvent {
#[event(input = "DocumentIdPB", output = "DocumentSnapshotPB")]
#[event(input = "OpenDocumentContextPB", output = "DocumentSnapshotPB")]
GetDocument = 0,
#[event(input = "EditPayloadPB")]

View File

@ -1,4 +1,4 @@
mod entities;
pub mod entities;
mod event_handler;
pub mod event_map;
pub mod manager;
@ -6,6 +6,7 @@ pub mod manager;
pub mod editor;
pub mod old_editor;
pub mod protobuf;
mod services;
pub use manager::*;
pub mod errors {

View File

@ -1,23 +1,23 @@
use crate::editor::{initial_document_content, AppFlowyDocumentEditor};
use crate::entities::EditParams;
use crate::old_editor::editor::{DeltaDocumentEditor, DocumentRevisionCompress};
use crate::editor::{initial_document_content, AppFlowyDocumentEditor, DocumentRevisionCompress};
use crate::entities::{DocumentVersionPB, EditParams};
use crate::old_editor::editor::{DeltaDocumentEditor, DeltaDocumentRevisionCompress};
use crate::services::DocumentPersistence;
use crate::{errors::FlowyError, DocumentCloudService};
use bytes::Bytes;
use dashmap::DashMap;
use flowy_database::ConnectionPool;
use flowy_error::FlowyResult;
use flowy_revision::disk::SQLiteDocumentRevisionPersistence;
use flowy_revision::disk::{SQLiteDeltaDocumentRevisionPersistence, SQLiteDocumentRevisionPersistence};
use flowy_revision::{
RevisionCloudService, RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionSnapshotPersistence,
};
use flowy_sync::client_document::initial_old_document_content;
use flowy_sync::client_document::initial_delta_document_content;
use flowy_sync::entities::{
document::{DocumentIdPB, DocumentOperationsPB},
document::DocumentIdPB,
revision::{md5, RepeatedRevision, Revision},
ws_data::ServerRevisionWSData,
};
use lib_infra::future::FutureResult;
use lib_ws::WSConnectState;
use std::any::Any;
use std::{convert::TryInto, sync::Arc};
@ -26,17 +26,31 @@ pub trait DocumentUser: Send + Sync {
fn user_dir(&self) -> Result<String, FlowyError>;
fn user_id(&self) -> Result<String, FlowyError>;
fn token(&self) -> Result<String, FlowyError>;
}
pub trait DocumentDatabase: Send + Sync {
fn db_pool(&self) -> Result<Arc<ConnectionPool>, FlowyError>;
}
pub trait DocumentEditor: Send + Sync {
fn export(&self) -> FutureResult<String, FlowyError>;
fn compose_local_operations(&self, data: Bytes) -> FutureResult<(), FlowyError>;
/// Called when the document get closed
fn close(&self);
/// Exports the document content. The content is encoded in the corresponding
/// editor data format.
fn export(&self) -> FutureResult<String, FlowyError>;
/// Duplicate the document inner data into String
fn duplicate(&self) -> FutureResult<String, FlowyError>;
fn receive_ws_data(&self, data: ServerRevisionWSData) -> FutureResult<(), FlowyError>;
fn receive_ws_state(&self, state: &WSConnectState);
/// Receives the local operations made by the user input. The operations are encoded
/// in binary format.
fn compose_local_operations(&self, data: Bytes) -> FutureResult<(), FlowyError>;
/// Returns the `Any` reference that can be used to downcast back to the original,
/// concrete type.
///
@ -50,7 +64,15 @@ pub trait DocumentEditor: Send + Sync {
#[derive(Clone, Debug)]
pub struct DocumentConfig {
pub use_new_editor: bool,
pub version: DocumentVersionPB,
}
impl std::default::Default for DocumentConfig {
fn default() -> Self {
Self {
version: DocumentVersionPB::V1,
}
}
}
pub struct DocumentManager {
@ -58,6 +80,8 @@ pub struct DocumentManager {
rev_web_socket: Arc<dyn RevisionWebSocket>,
editor_map: Arc<DocumentEditorMap>,
user: Arc<dyn DocumentUser>,
persistence: Arc<DocumentPersistence>,
#[allow(dead_code)]
config: DocumentConfig,
}
@ -65,6 +89,7 @@ impl DocumentManager {
pub fn new(
cloud_service: Arc<dyn DocumentCloudService>,
document_user: Arc<dyn DocumentUser>,
database: Arc<dyn DocumentDatabase>,
rev_web_socket: Arc<dyn RevisionWebSocket>,
config: DocumentConfig,
) -> Self {
@ -73,24 +98,31 @@ impl DocumentManager {
rev_web_socket,
editor_map: Arc::new(DocumentEditorMap::new()),
user: document_user,
persistence: Arc::new(DocumentPersistence::new(database)),
config,
}
}
pub fn init(&self) -> FlowyResult<()> {
/// Called immediately after the application launched with the user sign in/sign up.
#[tracing::instrument(level = "trace", skip_all, err)]
pub async fn initialize(&self, user_id: &str) -> FlowyResult<()> {
let _ = self.persistence.initialize(user_id)?;
listen_ws_state_changed(self.rev_web_socket.clone(), self.editor_map.clone());
Ok(())
}
#[tracing::instrument(level = "trace", skip(self, editor_id), fields(editor_id), err)]
pub async fn initialize_with_new_user(&self, _user_id: &str, _token: &str) -> FlowyResult<()> {
Ok(())
}
#[tracing::instrument(level = "trace", skip_all, fields(document_id), err)]
pub async fn open_document_editor<T: AsRef<str>>(
&self,
editor_id: T,
document_id: T,
) -> Result<Arc<dyn DocumentEditor>, FlowyError> {
let editor_id = editor_id.as_ref();
tracing::Span::current().record("editor_id", &editor_id);
self.init_document_editor(editor_id).await
let document_id = document_id.as_ref();
tracing::Span::current().record("document_id", &document_id);
self.init_document_editor(document_id).await
}
#[tracing::instrument(level = "trace", skip(self, editor_id), fields(editor_id), err)]
@ -101,22 +133,6 @@ impl DocumentManager {
Ok(())
}
#[tracing::instrument(level = "debug", skip(self, payload), err)]
pub async fn receive_local_operations(
&self,
payload: DocumentOperationsPB,
) -> Result<DocumentOperationsPB, FlowyError> {
let editor = self.get_document_editor(&payload.doc_id).await?;
let _ = editor
.compose_local_operations(Bytes::from(payload.operations_str))
.await?;
let operations_str = editor.export().await?;
Ok(DocumentOperationsPB {
doc_id: payload.doc_id.clone(),
operations_str,
})
}
pub async fn apply_edit(&self, params: EditParams) -> FlowyResult<()> {
let editor = self.get_document_editor(&params.doc_id).await?;
let _ = editor.compose_local_operations(Bytes::from(params.operations)).await?;
@ -125,9 +141,9 @@ impl DocumentManager {
pub async fn create_document<T: AsRef<str>>(&self, doc_id: T, revisions: RepeatedRevision) -> FlowyResult<()> {
let doc_id = doc_id.as_ref().to_owned();
let db_pool = self.user.db_pool()?;
let db_pool = self.persistence.database.db_pool()?;
// Maybe we could save the document to disk without creating the RevisionManager
let rev_manager = self.make_document_rev_manager(&doc_id, db_pool)?;
let rev_manager = self.make_rev_manager(&doc_id, db_pool)?;
let _ = rev_manager.reset_object(revisions).await?;
Ok(())
}
@ -149,10 +165,9 @@ impl DocumentManager {
}
pub fn initial_document_content(&self) -> String {
if self.config.use_new_editor {
initial_document_content()
} else {
initial_old_document_content()
match self.config.version {
DocumentVersionPB::V0 => initial_delta_document_content(),
DocumentVersionPB::V1 => initial_document_content(),
}
}
}
@ -168,7 +183,11 @@ impl DocumentManager {
///
async fn get_document_editor(&self, doc_id: &str) -> FlowyResult<Arc<dyn DocumentEditor>> {
match self.editor_map.get(doc_id) {
None => self.init_document_editor(doc_id).await,
None => {
//
tracing::warn!("Should call init_document_editor first");
self.init_document_editor(doc_id).await
}
Some(editor) => Ok(editor),
}
}
@ -184,25 +203,39 @@ impl DocumentManager {
///
#[tracing::instrument(level = "trace", skip(self), err)]
pub async fn init_document_editor(&self, doc_id: &str) -> Result<Arc<dyn DocumentEditor>, FlowyError> {
let pool = self.user.db_pool()?;
let pool = self.persistence.database.db_pool()?;
let user = self.user.clone();
let token = self.user.token()?;
let rev_manager = self.make_document_rev_manager(doc_id, pool.clone())?;
let cloud_service = Arc::new(DocumentRevisionCloudService {
token,
server: self.cloud_service.clone(),
});
let editor: Arc<dyn DocumentEditor> = if self.config.use_new_editor {
let editor = AppFlowyDocumentEditor::new(doc_id, user, rev_manager, cloud_service).await?;
Arc::new(editor)
} else {
let editor =
DeltaDocumentEditor::new(doc_id, user, rev_manager, self.rev_web_socket.clone(), cloud_service).await?;
Arc::new(editor)
};
self.editor_map.insert(doc_id, editor.clone());
Ok(editor)
match self.config.version {
DocumentVersionPB::V0 => {
let rev_manager = self.make_delta_document_rev_manager(doc_id, pool.clone())?;
let editor: Arc<dyn DocumentEditor> = Arc::new(
DeltaDocumentEditor::new(doc_id, user, rev_manager, self.rev_web_socket.clone(), cloud_service)
.await?,
);
self.editor_map.insert(doc_id, editor.clone());
Ok(editor)
}
DocumentVersionPB::V1 => {
let rev_manager = self.make_document_rev_manager(doc_id, pool.clone())?;
let editor: Arc<dyn DocumentEditor> =
Arc::new(AppFlowyDocumentEditor::new(doc_id, user, rev_manager, cloud_service).await?);
self.editor_map.insert(doc_id, editor.clone());
Ok(editor)
}
}
}
fn make_rev_manager(&self, doc_id: &str, pool: Arc<ConnectionPool>) -> Result<RevisionManager, FlowyError> {
match self.config.version {
DocumentVersionPB::V0 => self.make_delta_document_rev_manager(doc_id, pool),
DocumentVersionPB::V1 => self.make_document_rev_manager(doc_id, pool),
}
}
fn make_document_rev_manager(
@ -215,13 +248,31 @@ impl DocumentManager {
let rev_persistence = RevisionPersistence::new(&user_id, doc_id, disk_cache);
// let history_persistence = SQLiteRevisionHistoryPersistence::new(doc_id, pool.clone());
let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(doc_id, pool);
let rev_compactor = DocumentRevisionCompress();
Ok(RevisionManager::new(
&user_id,
doc_id,
rev_persistence,
rev_compactor,
DocumentRevisionCompress(),
// history_persistence,
snapshot_persistence,
))
}
fn make_delta_document_rev_manager(
&self,
doc_id: &str,
pool: Arc<ConnectionPool>,
) -> Result<RevisionManager, FlowyError> {
let user_id = self.user.user_id()?;
let disk_cache = SQLiteDeltaDocumentRevisionPersistence::new(&user_id, pool.clone());
let rev_persistence = RevisionPersistence::new(&user_id, doc_id, disk_cache);
// let history_persistence = SQLiteRevisionHistoryPersistence::new(doc_id, pool.clone());
let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(doc_id, pool);
Ok(RevisionManager::new(
&user_id,
doc_id,
rev_persistence,
DeltaDocumentRevisionCompress(),
// history_persistence,
snapshot_persistence,
))

View File

@ -18,7 +18,7 @@ use lib_infra::future::FutureResult;
use lib_ot::core::{AttributeEntry, AttributeHashMap};
use lib_ot::{
core::{DeltaOperation, Interval},
text_delta::TextOperations,
text_delta::DeltaTextOperations,
};
use lib_ws::WSConnectState;
use std::any::Any;
@ -46,7 +46,7 @@ impl DeltaDocumentEditor {
let document = rev_manager
.load::<DeltaDocumentRevisionSerde>(Some(cloud_service))
.await?;
let operations = TextOperations::from_bytes(&document.content)?;
let operations = DeltaTextOperations::from_bytes(&document.content)?;
let rev_manager = Arc::new(rev_manager);
let doc_id = doc_id.to_string();
let user_id = user.user_id()?;
@ -147,6 +147,11 @@ impl DeltaDocumentEditor {
}
impl DocumentEditor for Arc<DeltaDocumentEditor> {
fn close(&self) {
#[cfg(feature = "sync")]
self.ws_manager.stop();
}
fn export(&self) -> FutureResult<String, FlowyError> {
let (ret, rx) = oneshot::channel::<CollaborateResult<String>>();
let msg = EditorCommand::GetOperationsString { ret };
@ -158,22 +163,8 @@ impl DocumentEditor for Arc<DeltaDocumentEditor> {
})
}
fn compose_local_operations(&self, data: Bytes) -> FutureResult<(), FlowyError> {
let edit_cmd_tx = self.edit_cmd_tx.clone();
FutureResult::new(async move {
let operations = TextOperations::from_bytes(&data)?;
let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
let msg = EditorCommand::ComposeLocalOperations { operations, ret };
let _ = edit_cmd_tx.send(msg).await;
let _ = rx.await.map_err(internal_error)??;
Ok(())
})
}
fn close(&self) {
#[cfg(feature = "sync")]
self.ws_manager.stop();
fn duplicate(&self) -> FutureResult<String, FlowyError> {
self.export()
}
#[allow(unused_variables)]
@ -193,6 +184,19 @@ impl DocumentEditor for Arc<DeltaDocumentEditor> {
self.ws_manager.connect_state_changed(state.clone());
}
fn compose_local_operations(&self, data: Bytes) -> FutureResult<(), FlowyError> {
let edit_cmd_tx = self.edit_cmd_tx.clone();
FutureResult::new(async move {
let operations = DeltaTextOperations::from_bytes(&data)?;
let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
let msg = EditorCommand::ComposeLocalOperations { operations, ret };
let _ = edit_cmd_tx.send(msg).await;
let _ = rx.await.map_err(internal_error)??;
Ok(())
})
}
fn as_any(&self) -> &dyn Any {
self
}
@ -207,7 +211,7 @@ impl std::ops::Drop for DeltaDocumentEditor {
fn spawn_edit_queue(
user: Arc<dyn DocumentUser>,
rev_manager: Arc<RevisionManager>,
delta: TextOperations,
delta: DeltaTextOperations,
) -> EditorCommandSender {
let (sender, receiver) = mpsc::channel(1000);
let edit_queue = EditDocumentQueue::new(user, rev_manager, delta, receiver);
@ -226,8 +230,8 @@ fn spawn_edit_queue(
#[cfg(feature = "flowy_unit_test")]
impl DeltaDocumentEditor {
pub async fn document_operations(&self) -> FlowyResult<TextOperations> {
let (ret, rx) = oneshot::channel::<CollaborateResult<TextOperations>>();
pub async fn document_operations(&self) -> FlowyResult<DeltaTextOperations> {
let (ret, rx) = oneshot::channel::<CollaborateResult<DeltaTextOperations>>();
let msg = EditorCommand::GetOperations { ret };
let _ = self.edit_cmd_tx.send(msg).await;
let delta = rx.await.map_err(internal_error)??;
@ -264,8 +268,8 @@ impl RevisionObjectSerializer for DeltaDocumentRevisionSerde {
}
}
pub(crate) struct DocumentRevisionCompress();
impl RevisionCompress for DocumentRevisionCompress {
pub(crate) struct DeltaDocumentRevisionCompress();
impl RevisionCompress for DeltaDocumentRevisionCompress {
fn combine_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
DeltaDocumentRevisionSerde::combine_revisions(revisions)
}
@ -273,7 +277,7 @@ impl RevisionCompress for DocumentRevisionCompress {
// quill-editor requires the delta should end with '\n' and only contains the
// insert operation. The function, correct_delta maybe be removed in the future.
fn correct_delta(delta: &mut TextOperations) {
fn correct_delta(delta: &mut DeltaTextOperations) {
if let Some(op) = delta.ops.last() {
let op_data = op.get_data();
if !op_data.ends_with('\n') {

View File

@ -1,4 +1,4 @@
use crate::old_editor::web_socket::DocumentResolveOperations;
use crate::old_editor::web_socket::DeltaDocumentResolveOperations;
use crate::DocumentUser;
use async_stream::stream;
use flowy_error::FlowyError;
@ -12,7 +12,7 @@ use futures::stream::StreamExt;
use lib_ot::core::AttributeEntry;
use lib_ot::{
core::{Interval, OperationTransform},
text_delta::TextOperations,
text_delta::DeltaTextOperations,
};
use std::sync::Arc;
use tokio::sync::mpsc::{Receiver, Sender};
@ -31,7 +31,7 @@ impl EditDocumentQueue {
pub(crate) fn new(
user: Arc<dyn DocumentUser>,
rev_manager: Arc<RevisionManager>,
operations: TextOperations,
operations: DeltaTextOperations,
receiver: EditorCommandReceiver,
) -> Self {
let document = Arc::new(RwLock::new(ClientDocument::from_operations(operations)));
@ -91,8 +91,8 @@ impl EditDocumentQueue {
EditorCommand::TransformOperations { operations, ret } => {
let f = || async {
let read_guard = self.document.read().await;
let mut server_operations: Option<DocumentResolveOperations> = None;
let client_operations: TextOperations;
let mut server_operations: Option<DeltaDocumentResolveOperations> = None;
let client_operations: DeltaTextOperations;
if read_guard.is_empty() {
// Do nothing
@ -100,11 +100,11 @@ impl EditDocumentQueue {
} else {
let (s_prime, c_prime) = read_guard.get_operations().transform(&operations)?;
client_operations = c_prime;
server_operations = Some(DocumentResolveOperations(s_prime));
server_operations = Some(DeltaDocumentResolveOperations(s_prime));
}
drop(read_guard);
Ok::<TextTransformOperations, CollaborateError>(TransformOperations {
client_operations: DocumentResolveOperations(client_operations),
client_operations: DeltaDocumentResolveOperations(client_operations),
server_operations,
})
};
@ -174,7 +174,7 @@ impl EditDocumentQueue {
Ok(())
}
async fn save_local_operations(&self, operations: TextOperations, md5: String) -> Result<RevId, FlowyError> {
async fn save_local_operations(&self, operations: DeltaTextOperations, md5: String) -> Result<RevId, FlowyError> {
let bytes = operations.json_bytes();
let (base_rev_id, rev_id) = self.rev_manager.next_rev_id_pair();
let user_id = self.user.user_id()?;
@ -184,26 +184,26 @@ impl EditDocumentQueue {
}
}
pub type TextTransformOperations = TransformOperations<DocumentResolveOperations>;
pub type TextTransformOperations = TransformOperations<DeltaDocumentResolveOperations>;
pub(crate) type EditorCommandSender = Sender<EditorCommand>;
pub(crate) type EditorCommandReceiver = Receiver<EditorCommand>;
pub(crate) type Ret<T> = oneshot::Sender<Result<T, CollaborateError>>;
pub(crate) enum EditorCommand {
ComposeLocalOperations {
operations: TextOperations,
operations: DeltaTextOperations,
ret: Ret<()>,
},
ComposeRemoteOperation {
client_operations: TextOperations,
client_operations: DeltaTextOperations,
ret: Ret<OperationsMD5>,
},
ResetOperations {
operations: TextOperations,
operations: DeltaTextOperations,
ret: Ret<OperationsMD5>,
},
TransformOperations {
operations: TextOperations,
operations: DeltaTextOperations,
ret: Ret<TextTransformOperations>,
},
Insert {
@ -242,7 +242,7 @@ pub(crate) enum EditorCommand {
},
#[allow(dead_code)]
GetOperations {
ret: Ret<TextOperations>,
ret: Ret<DeltaTextOperations>,
},
}

View File

@ -13,33 +13,35 @@ use flowy_sync::{
errors::CollaborateResult,
};
use lib_infra::future::{BoxResultFuture, FutureResult};
use lib_ot::text_delta::TextOperations;
use lib_ot::text_delta::DeltaTextOperations;
use lib_ws::WSConnectState;
use std::{sync::Arc, time::Duration};
use tokio::sync::{broadcast, oneshot};
#[derive(Clone)]
pub struct DocumentResolveOperations(pub TextOperations);
pub struct DeltaDocumentResolveOperations(pub DeltaTextOperations);
impl OperationsDeserializer<DocumentResolveOperations> for DocumentResolveOperations {
fn deserialize_revisions(revisions: Vec<Revision>) -> FlowyResult<DocumentResolveOperations> {
Ok(DocumentResolveOperations(make_operations_from_revisions(revisions)?))
impl OperationsDeserializer<DeltaDocumentResolveOperations> for DeltaDocumentResolveOperations {
fn deserialize_revisions(revisions: Vec<Revision>) -> FlowyResult<DeltaDocumentResolveOperations> {
Ok(DeltaDocumentResolveOperations(make_operations_from_revisions(
revisions,
)?))
}
}
impl OperationsSerializer for DocumentResolveOperations {
impl OperationsSerializer for DeltaDocumentResolveOperations {
fn serialize_operations(&self) -> Bytes {
self.0.json_bytes()
}
}
impl DocumentResolveOperations {
pub fn into_inner(self) -> TextOperations {
impl DeltaDocumentResolveOperations {
pub fn into_inner(self) -> DeltaTextOperations {
self.0
}
}
pub type DocumentConflictController = ConflictController<DocumentResolveOperations>;
pub type DocumentConflictController = ConflictController<DeltaDocumentResolveOperations>;
#[allow(dead_code)]
pub(crate) async fn make_document_ws_manager(
@ -129,8 +131,11 @@ struct DocumentConflictResolver {
edit_cmd_tx: EditorCommandSender,
}
impl ConflictResolver<DocumentResolveOperations> for DocumentConflictResolver {
fn compose_operations(&self, operations: DocumentResolveOperations) -> BoxResultFuture<OperationsMD5, FlowyError> {
impl ConflictResolver<DeltaDocumentResolveOperations> for DocumentConflictResolver {
fn compose_operations(
&self,
operations: DeltaDocumentResolveOperations,
) -> BoxResultFuture<OperationsMD5, FlowyError> {
let tx = self.edit_cmd_tx.clone();
let operations = operations.into_inner();
Box::pin(async move {
@ -150,8 +155,8 @@ impl ConflictResolver<DocumentResolveOperations> for DocumentConflictResolver {
fn transform_operations(
&self,
operations: DocumentResolveOperations,
) -> BoxResultFuture<TransformOperations<DocumentResolveOperations>, FlowyError> {
operations: DeltaDocumentResolveOperations,
) -> BoxResultFuture<TransformOperations<DeltaDocumentResolveOperations>, FlowyError> {
let tx = self.edit_cmd_tx.clone();
let operations = operations.into_inner();
Box::pin(async move {
@ -166,7 +171,10 @@ impl ConflictResolver<DocumentResolveOperations> for DocumentConflictResolver {
})
}
fn reset_operations(&self, operations: DocumentResolveOperations) -> BoxResultFuture<OperationsMD5, FlowyError> {
fn reset_operations(
&self,
operations: DeltaDocumentResolveOperations,
) -> BoxResultFuture<OperationsMD5, FlowyError> {
let tx = self.edit_cmd_tx.clone();
let operations = operations.into_inner();
Box::pin(async move {

View File

@ -0,0 +1,75 @@
use crate::editor::DeltaRevisionMigration;
use crate::DocumentDatabase;
use bytes::Bytes;
use flowy_database::kv::KV;
use flowy_error::FlowyResult;
use flowy_revision::disk::{DeltaRevisionSql, RevisionDiskCache, RevisionRecord, SQLiteDocumentRevisionPersistence};
use flowy_sync::entities::revision::{md5, Revision};
use flowy_sync::util::make_operations_from_revisions;
use std::sync::Arc;
const V1_MIGRATION: &str = "DOCUMENT_V1_MIGRATION";
pub(crate) struct DocumentMigration {
user_id: String,
database: Arc<dyn DocumentDatabase>,
}
impl DocumentMigration {
pub fn new(user_id: &str, database: Arc<dyn DocumentDatabase>) -> Self {
let user_id = user_id.to_owned();
Self { user_id, database }
}
pub fn run_v1_migration(&self) -> FlowyResult<()> {
let key = migration_flag_key(&self.user_id, V1_MIGRATION);
if KV::get_bool(&key) {
return Ok(());
}
let pool = self.database.db_pool()?;
let conn = &*pool.get()?;
let disk_cache = SQLiteDocumentRevisionPersistence::new(&self.user_id, pool);
let documents = DeltaRevisionSql::read_all_documents(&self.user_id, conn)?;
tracing::info!("[Document Migration]: try migrate {} documents", documents.len());
for revisions in documents {
if revisions.is_empty() {
continue;
}
let document_id = revisions.first().unwrap().object_id.clone();
match make_operations_from_revisions(revisions) {
Ok(delta) => match DeltaRevisionMigration::run(delta) {
Ok(transaction) => {
let bytes = Bytes::from(transaction.to_bytes()?);
let md5 = format!("{:x}", md5::compute(&bytes));
let revision = Revision::new(&document_id, 0, 1, bytes, &self.user_id, md5);
let record = RevisionRecord::new(revision);
match disk_cache.create_revision_records(vec![record]) {
Ok(_) => {}
Err(err) => {
tracing::error!("[Document Migration]: Save revisions to disk failed {:?}", err);
}
}
}
Err(err) => {
tracing::error!(
"[Document Migration]: Migrate revisions to transaction failed {:?}",
err
);
}
},
Err(e) => {
tracing::error!("[Document migration]: Make delta from revisions failed: {:?}", e);
}
}
}
//
KV::set_bool(&key, true);
tracing::info!("Run document v1 migration");
Ok(())
}
}
fn migration_flag_key(user_id: &str, version: &str) -> String {
md5(format!("{}{}", user_id, version,))
}

View File

@ -0,0 +1,4 @@
mod migration;
mod persistence;
pub use persistence::*;

View File

@ -0,0 +1,23 @@
use crate::services::migration::DocumentMigration;
use crate::DocumentDatabase;
use flowy_error::FlowyResult;
use std::sync::Arc;
pub struct DocumentPersistence {
pub database: Arc<dyn DocumentDatabase>,
}
impl DocumentPersistence {
pub fn new(database: Arc<dyn DocumentDatabase>) -> Self {
Self { database }
}
#[tracing::instrument(level = "trace", skip_all, err)]
pub fn initialize(&self, user_id: &str) -> FlowyResult<()> {
let migration = DocumentMigration::new(user_id, self.database.clone());
if let Err(e) = migration.run_v1_migration() {
tracing::error!("[Document Migration]: run v1 migration failed: {:?}", e);
}
Ok(())
}
}

View File

@ -3,7 +3,7 @@ use crate::editor::{TestBuilder, TestOp::*};
use flowy_sync::client_document::{NewlineDocument, EmptyDocument};
use lib_ot::core::{Interval, OperationTransform, NEW_LINE, WHITESPACE, OTString};
use unicode_segmentation::UnicodeSegmentation;
use lib_ot::text_delta::TextOperations;
use lib_ot::text_delta::DeltaTextOperations;
#[test]
fn attributes_bold_added() {
@ -29,7 +29,7 @@ fn attributes_bold_added_and_invert_all() {
Bold(0, Interval::new(0, 3), true),
AssertDocJson(0, r#"[{"insert":"123","attributes":{"bold":true}}]"#),
Bold(0, Interval::new(0, 3), false),
AssertDocJson(0, r#"[{"insert":"123"}]"#),
AssertDocJson(0, r#"[{"insert":"123","attributes":{"bold":false}}]"#),
];
TestBuilder::new().run_scripts::<EmptyDocument>(ops);
}
@ -41,7 +41,7 @@ fn attributes_bold_added_and_invert_partial_suffix() {
Bold(0, Interval::new(0, 4), true),
AssertDocJson(0, r#"[{"insert":"1234","attributes":{"bold":true}}]"#),
Bold(0, Interval::new(2, 4), false),
AssertDocJson(0, r#"[{"insert":"12","attributes":{"bold":true}},{"insert":"34"}]"#),
AssertDocJson(0, r#"[{"insert":"12","attributes":{"bold":true}},{"insert":"34","attributes":{"bold":false}}]"#),
];
TestBuilder::new().run_scripts::<EmptyDocument>(ops);
}
@ -53,7 +53,7 @@ fn attributes_bold_added_and_invert_partial_suffix2() {
Bold(0, Interval::new(0, 4), true),
AssertDocJson(0, r#"[{"insert":"1234","attributes":{"bold":true}}]"#),
Bold(0, Interval::new(2, 4), false),
AssertDocJson(0, r#"[{"insert":"12","attributes":{"bold":true}},{"insert":"34"}]"#),
AssertDocJson(0, r#"[{"insert":"12","attributes":{"bold":true}},{"insert":"34","attributes":{"bold":false}}]"#),
Bold(0, Interval::new(2, 4), true),
AssertDocJson(0, r#"[{"insert":"1234","attributes":{"bold":true}}]"#),
];
@ -95,7 +95,7 @@ fn attributes_bold_added_and_invert_partial_prefix() {
Bold(0, Interval::new(0, 4), true),
AssertDocJson(0, r#"[{"insert":"1234","attributes":{"bold":true}}]"#),
Bold(0, Interval::new(0, 2), false),
AssertDocJson(0, r#"[{"insert":"12"},{"insert":"34","attributes":{"bold":true}}]"#),
AssertDocJson(0, r#"[{"insert":"12","attributes":{"bold":false}},{"insert":"34","attributes":{"bold":true}}]"#),
];
TestBuilder::new().run_scripts::<EmptyDocument>(ops);
}
@ -762,12 +762,12 @@ fn attributes_preserve_list_format_on_merge() {
#[test]
fn delta_compose() {
let mut delta = TextOperations::from_json(r#"[{"insert":"\n"}]"#).unwrap();
let mut delta = DeltaTextOperations::from_json(r#"[{"insert":"\n"}]"#).unwrap();
let deltas = vec![
TextOperations::from_json(r#"[{"retain":1,"attributes":{"list":"unchecked"}}]"#).unwrap(),
TextOperations::from_json(r#"[{"insert":"a"}]"#).unwrap(),
TextOperations::from_json(r#"[{"retain":1},{"insert":"\n","attributes":{"list":"unchecked"}}]"#).unwrap(),
TextOperations::from_json(r#"[{"retain":2},{"retain":1,"attributes":{"list":""}}]"#).unwrap(),
DeltaTextOperations::from_json(r#"[{"retain":1,"attributes":{"list":"unchecked"}}]"#).unwrap(),
DeltaTextOperations::from_json(r#"[{"insert":"a"}]"#).unwrap(),
DeltaTextOperations::from_json(r#"[{"retain":1},{"insert":"\n","attributes":{"list":"unchecked"}}]"#).unwrap(),
DeltaTextOperations::from_json(r#"[{"retain":2},{"retain":1,"attributes":{"list":""}}]"#).unwrap(),
];
for d in deltas {

View File

@ -8,7 +8,7 @@ use derive_more::Display;
use flowy_sync::client_document::{ClientDocument, InitialDocument};
use lib_ot::{
core::*,
text_delta::{BuildInTextAttribute, TextOperations},
text_delta::{BuildInTextAttribute, DeltaTextOperations},
};
use rand::{prelude::*, Rng as WrappedRng};
use std::{sync::Once, time::Duration};
@ -81,8 +81,8 @@ pub enum TestOp {
pub struct TestBuilder {
documents: Vec<ClientDocument>,
deltas: Vec<Option<TextOperations>>,
primes: Vec<Option<TextOperations>>,
deltas: Vec<Option<DeltaTextOperations>>,
primes: Vec<Option<DeltaTextOperations>>,
}
impl TestBuilder {
@ -226,20 +226,20 @@ impl TestBuilder {
TestOp::AssertDocJson(delta_i, expected) => {
let delta_json = self.documents[*delta_i].get_operations_json();
let expected_delta: TextOperations = serde_json::from_str(expected).unwrap();
let target_delta: TextOperations = serde_json::from_str(&delta_json).unwrap();
let expected_delta: DeltaTextOperations = serde_json::from_str(expected).unwrap();
let target_delta: DeltaTextOperations = serde_json::from_str(&delta_json).unwrap();
if expected_delta != target_delta {
log::error!("✅ expect: {}", expected,);
log::error!("❌ receive: {}", delta_json);
println!("✅ expect: {}", expected,);
println!("❌ receive: {}", delta_json);
}
assert_eq!(target_delta, expected_delta);
}
TestOp::AssertPrimeJson(doc_i, expected) => {
let prime_json = self.primes[*doc_i].as_ref().unwrap().json_str();
let expected_prime: TextOperations = serde_json::from_str(expected).unwrap();
let target_prime: TextOperations = serde_json::from_str(&prime_json).unwrap();
let expected_prime: DeltaTextOperations = serde_json::from_str(expected).unwrap();
let target_prime: DeltaTextOperations = serde_json::from_str(&prime_json).unwrap();
if expected_prime != target_prime {
log::error!("✅ expect prime: {}", expected,);
@ -297,8 +297,8 @@ impl Rng {
.collect()
}
pub fn gen_delta(&mut self, s: &str) -> TextOperations {
let mut delta = TextOperations::default();
pub fn gen_delta(&mut self, s: &str) -> DeltaTextOperations {
let mut delta = DeltaTextOperations::default();
let s = OTString::from(s);
loop {
let left = s.utf16_len() - delta.utf16_base_len;

View File

@ -1,8 +1,8 @@
#![allow(clippy::all)]
use crate::editor::{Rng, TestBuilder, TestOp::*};
use flowy_sync::client_document::{EmptyDocument, NewlineDocument};
use lib_ot::text_delta::TextOperationBuilder;
use lib_ot::{core::Interval, core::*, text_delta::TextOperations};
use lib_ot::text_delta::DeltaTextOperationBuilder;
use lib_ot::{core::Interval, core::*, text_delta::DeltaTextOperations};
#[test]
fn attributes_insert_text() {
@ -37,7 +37,7 @@ fn attributes_insert_text_at_middle() {
#[test]
fn delta_get_ops_in_interval_1() {
let operations = OperationsBuilder::new().insert("123").insert("4").build();
let delta = TextOperationBuilder::from_operations(operations);
let delta = DeltaTextOperationBuilder::from_operations(operations);
let mut iterator = OperationIterator::from_interval(&delta, Interval::new(0, 4));
assert_eq!(iterator.ops(), delta.ops);
@ -45,7 +45,7 @@ fn delta_get_ops_in_interval_1() {
#[test]
fn delta_get_ops_in_interval_2() {
let mut delta = TextOperations::default();
let mut delta = DeltaTextOperations::default();
let insert_a = DeltaOperation::insert("123");
let insert_b = DeltaOperation::insert("4");
let insert_c = DeltaOperation::insert("5");
@ -89,7 +89,7 @@ fn delta_get_ops_in_interval_2() {
#[test]
fn delta_get_ops_in_interval_3() {
let mut delta = TextOperations::default();
let mut delta = DeltaTextOperations::default();
let insert_a = DeltaOperation::insert("123456");
delta.add(insert_a.clone());
assert_eq!(
@ -100,7 +100,7 @@ fn delta_get_ops_in_interval_3() {
#[test]
fn delta_get_ops_in_interval_4() {
let mut delta = TextOperations::default();
let mut delta = DeltaTextOperations::default();
let insert_a = DeltaOperation::insert("12");
let insert_b = DeltaOperation::insert("34");
let insert_c = DeltaOperation::insert("56");
@ -130,7 +130,7 @@ fn delta_get_ops_in_interval_4() {
#[test]
fn delta_get_ops_in_interval_5() {
let mut delta = TextOperations::default();
let mut delta = DeltaTextOperations::default();
let insert_a = DeltaOperation::insert("123456");
let insert_b = DeltaOperation::insert("789");
delta.ops.push(insert_a.clone());
@ -148,7 +148,7 @@ fn delta_get_ops_in_interval_5() {
#[test]
fn delta_get_ops_in_interval_6() {
let mut delta = TextOperations::default();
let mut delta = DeltaTextOperations::default();
let insert_a = DeltaOperation::insert("12345678");
delta.add(insert_a.clone());
assert_eq!(
@ -159,7 +159,7 @@ fn delta_get_ops_in_interval_6() {
#[test]
fn delta_get_ops_in_interval_7() {
let mut delta = TextOperations::default();
let mut delta = DeltaTextOperations::default();
let insert_a = DeltaOperation::insert("12345");
let retain_a = DeltaOperation::retain(3);
@ -179,7 +179,7 @@ fn delta_get_ops_in_interval_7() {
#[test]
fn delta_op_seek() {
let mut delta = TextOperations::default();
let mut delta = DeltaTextOperations::default();
let insert_a = DeltaOperation::insert("12345");
let retain_a = DeltaOperation::retain(3);
delta.add(insert_a.clone());
@ -191,7 +191,7 @@ fn delta_op_seek() {
#[test]
fn delta_utf16_code_unit_seek() {
let mut delta = TextOperations::default();
let mut delta = DeltaTextOperations::default();
delta.add(DeltaOperation::insert("12345"));
let mut iter = OperationIterator::new(&delta);
@ -201,7 +201,7 @@ fn delta_utf16_code_unit_seek() {
#[test]
fn delta_utf16_code_unit_seek_with_attributes() {
let mut delta = TextOperations::default();
let mut delta = DeltaTextOperations::default();
let attributes = AttributeBuilder::new()
.insert("bold", true)
.insert("italic", true)
@ -221,7 +221,7 @@ fn delta_utf16_code_unit_seek_with_attributes() {
#[test]
fn delta_next_op_len() {
let mut delta = TextOperations::default();
let mut delta = DeltaTextOperations::default();
delta.add(DeltaOperation::insert("12345"));
let mut iter = OperationIterator::new(&delta);
assert_eq!(iter.next_op_with_len(2).unwrap(), DeltaOperation::insert("12"));
@ -232,7 +232,7 @@ fn delta_next_op_len() {
#[test]
fn delta_next_op_len_with_chinese() {
let mut delta = TextOperations::default();
let mut delta = DeltaTextOperations::default();
delta.add(DeltaOperation::insert("你好"));
let mut iter = OperationIterator::new(&delta);
@ -242,7 +242,7 @@ fn delta_next_op_len_with_chinese() {
#[test]
fn delta_next_op_len_with_english() {
let mut delta = TextOperations::default();
let mut delta = DeltaTextOperations::default();
delta.add(DeltaOperation::insert("ab"));
let mut iter = OperationIterator::new(&delta);
assert_eq!(iter.next_op_len().unwrap(), 2);
@ -251,7 +251,7 @@ fn delta_next_op_len_with_english() {
#[test]
fn delta_next_op_len_after_seek() {
let mut delta = TextOperations::default();
let mut delta = DeltaTextOperations::default();
delta.add(DeltaOperation::insert("12345"));
let mut iter = OperationIterator::new(&delta);
assert_eq!(iter.next_op_len().unwrap(), 5);
@ -264,7 +264,7 @@ fn delta_next_op_len_after_seek() {
#[test]
fn delta_next_op_len_none() {
let mut delta = TextOperations::default();
let mut delta = DeltaTextOperations::default();
delta.add(DeltaOperation::insert("12345"));
let mut iter = OperationIterator::new(&delta);
@ -275,7 +275,7 @@ fn delta_next_op_len_none() {
#[test]
fn delta_next_op_with_len_zero() {
let mut delta = TextOperations::default();
let mut delta = DeltaTextOperations::default();
delta.add(DeltaOperation::insert("12345"));
let mut iter = OperationIterator::new(&delta);
assert_eq!(iter.next_op_with_len(0), None,);
@ -284,7 +284,7 @@ fn delta_next_op_with_len_zero() {
#[test]
fn delta_next_op_with_len_cross_op_return_last() {
let mut delta = TextOperations::default();
let mut delta = DeltaTextOperations::default();
delta.add(DeltaOperation::insert("12345"));
delta.add(DeltaOperation::retain(1));
delta.add(DeltaOperation::insert("678"));
@ -297,7 +297,7 @@ fn delta_next_op_with_len_cross_op_return_last() {
#[test]
fn lengths() {
let mut delta = TextOperations::default();
let mut delta = DeltaTextOperations::default();
assert_eq!(delta.utf16_base_len, 0);
assert_eq!(delta.utf16_target_len, 0);
delta.retain(5, AttributeHashMap::default());
@ -315,7 +315,7 @@ fn lengths() {
}
#[test]
fn sequence() {
let mut delta = TextOperations::default();
let mut delta = DeltaTextOperations::default();
delta.retain(5, AttributeHashMap::default());
delta.retain(0, AttributeHashMap::default());
delta.insert("appflowy", AttributeHashMap::default());
@ -348,7 +348,7 @@ fn apply_test() {
#[test]
fn base_len_test() {
let mut delta_a = TextOperations::default();
let mut delta_a = DeltaTextOperations::default();
delta_a.insert("a", AttributeHashMap::default());
delta_a.insert("b", AttributeHashMap::default());
delta_a.insert("c", AttributeHashMap::default());
@ -387,7 +387,7 @@ fn invert_test() {
#[test]
fn empty_ops() {
let mut delta = TextOperations::default();
let mut delta = DeltaTextOperations::default();
delta.retain(0, AttributeHashMap::default());
delta.insert("", AttributeHashMap::default());
delta.delete(0);
@ -395,12 +395,12 @@ fn empty_ops() {
}
#[test]
fn eq() {
let mut delta_a = TextOperations::default();
let mut delta_a = DeltaTextOperations::default();
delta_a.delete(1);
delta_a.insert("lo", AttributeHashMap::default());
delta_a.retain(2, AttributeHashMap::default());
delta_a.retain(3, AttributeHashMap::default());
let mut delta_b = TextOperations::default();
let mut delta_b = DeltaTextOperations::default();
delta_b.delete(1);
delta_b.insert("l", AttributeHashMap::default());
delta_b.insert("o", AttributeHashMap::default());
@ -412,7 +412,7 @@ fn eq() {
}
#[test]
fn ops_merging() {
let mut delta = TextOperations::default();
let mut delta = DeltaTextOperations::default();
assert_eq!(delta.ops.len(), 0);
delta.retain(2, AttributeHashMap::default());
assert_eq!(delta.ops.len(), 1);
@ -436,7 +436,7 @@ fn ops_merging() {
#[test]
fn is_noop() {
let mut delta = TextOperations::default();
let mut delta = DeltaTextOperations::default();
assert!(delta.is_noop());
delta.retain(5, AttributeHashMap::default());
assert!(delta.is_noop());
@ -484,13 +484,13 @@ fn transform_random_delta() {
#[test]
fn transform_with_two_delta() {
let mut a = TextOperations::default();
let mut a = DeltaTextOperations::default();
let mut a_s = String::new();
a.insert("123", AttributeBuilder::new().insert("bold", true).build());
a_s = a.apply(&a_s).unwrap();
assert_eq!(&a_s, "123");
let mut b = TextOperations::default();
let mut b = DeltaTextOperations::default();
let mut b_s = String::new();
b.insert("456", AttributeHashMap::default());
b_s = b.apply(&b_s).unwrap();
@ -580,10 +580,10 @@ fn transform_two_conflict_non_seq_delta() {
#[test]
fn delta_invert_no_attribute_delta() {
let mut delta = TextOperations::default();
let mut delta = DeltaTextOperations::default();
delta.add(DeltaOperation::insert("123"));
let mut change = TextOperations::default();
let mut change = DeltaTextOperations::default();
change.add(DeltaOperation::retain(3));
change.add(DeltaOperation::insert("456"));
let undo = change.invert(&delta);

View File

@ -1,8 +1,8 @@
use flowy_sync::client_document::{ClientDocument, EmptyDocument};
use lib_ot::text_delta::TextOperation;
use lib_ot::text_delta::DeltaTextOperation;
use lib_ot::{
core::*,
text_delta::{BuildInTextAttribute, TextOperations},
text_delta::{BuildInTextAttribute, DeltaTextOperations},
};
#[test]
@ -15,7 +15,7 @@ fn operation_insert_serialize_test() {
let json = serde_json::to_string(&operation).unwrap();
eprintln!("{}", json);
let insert_op: TextOperation = serde_json::from_str(&json).unwrap();
let insert_op: DeltaTextOperation = serde_json::from_str(&json).unwrap();
assert_eq!(insert_op, operation);
}
@ -24,15 +24,15 @@ fn operation_retain_serialize_test() {
let operation = DeltaOperation::Retain(12.into());
let json = serde_json::to_string(&operation).unwrap();
eprintln!("{}", json);
let insert_op: TextOperation = serde_json::from_str(&json).unwrap();
let insert_op: DeltaTextOperation = serde_json::from_str(&json).unwrap();
assert_eq!(insert_op, operation);
}
#[test]
fn operation_delete_serialize_test() {
let operation = TextOperation::Delete(2);
let operation = DeltaTextOperation::Delete(2);
let json = serde_json::to_string(&operation).unwrap();
let insert_op: TextOperation = serde_json::from_str(&json).unwrap();
let insert_op: DeltaTextOperation = serde_json::from_str(&json).unwrap();
assert_eq!(insert_op, operation);
}
@ -77,7 +77,7 @@ fn delta_deserialize_test() {
{"retain":2,"attributes":{"italic":true,"bold":true}},
{"retain":2,"attributes":{"italic":true,"bold":true}}
]"#;
let delta = TextOperations::from_json(json).unwrap();
let delta = DeltaTextOperations::from_json(json).unwrap();
eprintln!("{}", delta);
}
@ -86,12 +86,12 @@ fn delta_deserialize_null_test() {
let json = r#"[
{"retain":7,"attributes":{"bold":null}}
]"#;
let delta1 = TextOperations::from_json(json).unwrap();
let delta1 = DeltaTextOperations::from_json(json).unwrap();
let mut attribute = BuildInTextAttribute::Bold(true);
attribute.remove_value();
let delta2 = OperationBuilder::new()
let delta2 = DeltaOperationBuilder::new()
.retain_with_attributes(7, attribute.into())
.build();

View File

@ -0,0 +1,24 @@
use crate::new_document::script::DocumentEditorTest;
use crate::new_document::script::EditScript::*;
#[tokio::test]
async fn document_insert_h1_style_test() {
let scripts = vec![
ComposeTransactionStr {
transaction: r#"{"operations":[{"op":"update_text","path":[0,0],"delta":[{"insert":"/"}],"inverted":[{"delete":1}]}],"after_selection":{"start":{"path":[0,0],"offset":1},"end":{"path":[0,0],"offset":1}},"before_selection":{"start":{"path":[0,0],"offset":0},"end":{"path":[0,0],"offset":0}}}"#,
},
AssertContent {
expected: r#"{"document":{"type":"editor","children":[{"type":"text","delta":[{"insert":"/"}]}]}}"#,
},
ComposeTransactionStr {
transaction: r#"{"operations":[{"op":"update_text","path":[0,0],"delta":[{"delete":1}],"inverted":[{"insert":"/"}]}],"after_selection":{"start":{"path":[0,0],"offset":0},"end":{"path":[0,0],"offset":0}},"before_selection":{"start":{"path":[0,0],"offset":1},"end":{"path":[0,0],"offset":1}}}"#,
},
ComposeTransactionStr {
transaction: r#"{"operations":[{"op":"update","path":[0,0],"attributes":{"subtype":"heading","heading":"h1"},"oldAttributes":{"subtype":null,"heading":null}}],"after_selection":{"start":{"path":[0,0],"offset":0},"end":{"path":[0,0],"offset":0}},"before_selection":{"start":{"path":[0,0],"offset":0},"end":{"path":[0,0],"offset":0}}}"#,
},
AssertContent {
expected: r#"{"document":{"type":"editor","children":[{"type":"text","attributes":{"subtype":"heading","heading":"h1"}}]}}"#,
},
];
DocumentEditorTest::new().await.run_scripts(scripts).await;
}

View File

@ -1,2 +1,3 @@
mod document_compose_test;
mod script;
mod test;

View File

@ -1,17 +1,37 @@
use flowy_document::editor::AppFlowyDocumentEditor;
use flowy_document::editor::{AppFlowyDocumentEditor, Document, DocumentTransaction};
use flowy_document::entities::DocumentVersionPB;
use flowy_test::helper::ViewTest;
use flowy_test::FlowySDKTest;
use lib_ot::core::{Body, Changeset, NodeDataBuilder, NodeOperation, Path, Transaction};
use lib_ot::text_delta::TextOperations;
use lib_ot::text_delta::DeltaTextOperations;
use std::sync::Arc;
pub enum EditScript {
InsertText { path: Path, delta: TextOperations },
UpdateText { path: Path, delta: TextOperations },
Delete { path: Path },
AssertContent { expected: &'static str },
AssertPrettyContent { expected: &'static str },
InsertText {
path: Path,
delta: DeltaTextOperations,
},
UpdateText {
path: Path,
delta: DeltaTextOperations,
},
#[allow(dead_code)]
ComposeTransaction {
transaction: Transaction,
},
ComposeTransactionStr {
transaction: &'static str,
},
Delete {
path: Path,
},
AssertContent {
expected: &'static str,
},
AssertPrettyContent {
expected: &'static str,
},
}
pub struct DocumentEditorTest {
@ -21,7 +41,8 @@ pub struct DocumentEditorTest {
impl DocumentEditorTest {
pub async fn new() -> Self {
let sdk = FlowySDKTest::new(true);
let version = DocumentVersionPB::V1;
let sdk = FlowySDKTest::new(version.clone());
let _ = sdk.init_user().await;
let test = ViewTest::new_document_view(&sdk).await;
@ -62,6 +83,14 @@ impl DocumentEditorTest {
.await
.unwrap();
}
EditScript::ComposeTransaction { transaction } => {
self.editor.apply_transaction(transaction).await.unwrap();
}
EditScript::ComposeTransactionStr { transaction } => {
let document_transaction = serde_json::from_str::<DocumentTransaction>(transaction).unwrap();
let transaction: Transaction = document_transaction.into();
self.editor.apply_transaction(transaction).await.unwrap();
}
EditScript::Delete { path } => {
let operation = NodeOperation::Delete { path, nodes: vec![] };
self.editor
@ -72,6 +101,9 @@ impl DocumentEditorTest {
EditScript::AssertContent { expected } => {
//
let content = self.editor.get_content(false).await.unwrap();
let expected_document: Document = serde_json::from_str(expected).unwrap();
let expected = serde_json::to_string(&expected_document).unwrap();
assert_eq!(content, expected);
}
EditScript::AssertPrettyContent { expected } => {

View File

@ -1,7 +1,7 @@
use crate::new_document::script::DocumentEditorTest;
use crate::new_document::script::EditScript::*;
use lib_ot::text_delta::TextOperationBuilder;
use lib_ot::text_delta::DeltaTextOperationBuilder;
#[tokio::test]
async fn document_initialize_test() {
@ -13,7 +13,7 @@ async fn document_initialize_test() {
#[tokio::test]
async fn document_insert_text_test() {
let delta = TextOperationBuilder::new().insert("Hello world").build();
let delta = DeltaTextOperationBuilder::new().insert("Hello world").build();
let expected = r#"{
"document": {
"type": "editor",
@ -49,7 +49,7 @@ async fn document_update_text_test() {
let scripts = vec![
UpdateText {
path: vec![0, 0].into(),
delta: TextOperationBuilder::new().insert(&hello_world).build(),
delta: DeltaTextOperationBuilder::new().insert(&hello_world).build(),
},
AssertPrettyContent {
expected: r#"{
@ -75,7 +75,7 @@ async fn document_update_text_test() {
let scripts = vec![
UpdateText {
path: vec![0, 0].into(),
delta: TextOperationBuilder::new()
delta: DeltaTextOperationBuilder::new()
.retain(hello_world.len())
.insert(", AppFlowy")
.build(),
@ -122,11 +122,11 @@ async fn document_delete_text_test() {
let scripts = vec![
UpdateText {
path: vec![0, 0].into(),
delta: TextOperationBuilder::new().insert(&hello_world).build(),
delta: DeltaTextOperationBuilder::new().insert(&hello_world).build(),
},
UpdateText {
path: vec![0, 0].into(),
delta: TextOperationBuilder::new().retain(5).delete(6).build(),
delta: DeltaTextOperationBuilder::new().retain(5).delete(6).build(),
},
AssertPrettyContent { expected },
];
@ -139,7 +139,7 @@ async fn document_delete_node_test() {
let scripts = vec![
UpdateText {
path: vec![0, 0].into(),
delta: TextOperationBuilder::new().insert("Hello world").build(),
delta: DeltaTextOperationBuilder::new().insert("Hello world").build(),
},
AssertContent {
expected: r#"{"document":{"type":"editor","children":[{"type":"text","delta":[{"insert":"Hello world"}]}]}}"#,

View File

@ -2,7 +2,7 @@ use flowy_document::old_editor::editor::DeltaDocumentEditor;
use flowy_document::TEXT_BLOCK_SYNC_INTERVAL_IN_MILLIS;
use flowy_revision::disk::RevisionState;
use flowy_test::{helper::ViewTest, FlowySDKTest};
use lib_ot::{core::Interval, text_delta::TextOperations};
use lib_ot::{core::Interval, text_delta::DeltaTextOperations};
use std::sync::Arc;
use tokio::time::{sleep, Duration};
@ -75,7 +75,7 @@ impl DeltaDocumentEditorTest {
assert_eq!(next_revision.rev_id, rev_id.unwrap());
}
EditorScript::AssertJson(expected) => {
let expected_delta: TextOperations = serde_json::from_str(expected).unwrap();
let expected_delta: DeltaTextOperations = serde_json::from_str(expected).unwrap();
let delta = self.editor.document_operations().await.unwrap();
if expected_delta != delta {
eprintln!("✅ expect: {}", expected,);

View File

@ -7,7 +7,7 @@ use crate::{
impl_def_and_def_mut,
};
use flowy_derive::{ProtoBuf, ProtoBuf_Enum};
use flowy_folder_data_model::revision::{gen_view_id, ViewDataTypeRevision, ViewLayoutTypeRevision, ViewRevision};
use flowy_folder_data_model::revision::{gen_view_id, ViewDataFormatRevision, ViewLayoutTypeRevision, ViewRevision};
use std::convert::TryInto;
#[derive(Eq, PartialEq, ProtoBuf, Debug, Default, Clone)]
@ -22,7 +22,7 @@ pub struct ViewPB {
pub name: String,
#[pb(index = 4)]
pub data_type: ViewDataTypePB,
pub data_format: ViewDataFormatPB,
#[pb(index = 5)]
pub modified_time: i64,
@ -40,7 +40,7 @@ impl std::convert::From<ViewRevision> for ViewPB {
id: rev.id,
app_id: rev.app_id,
name: rev.name,
data_type: rev.data_type.into(),
data_format: rev.data_format.into(),
modified_time: rev.modified_time,
create_time: rev.create_time,
layout: rev.layout.into(),
@ -49,31 +49,34 @@ impl std::convert::From<ViewRevision> for ViewPB {
}
#[derive(Eq, PartialEq, Hash, Debug, ProtoBuf_Enum, Clone)]
pub enum ViewDataTypePB {
Text = 0,
Database = 1,
pub enum ViewDataFormatPB {
DeltaFormat = 0,
DatabaseFormat = 1,
TreeFormat = 2,
}
impl std::default::Default for ViewDataTypePB {
impl std::default::Default for ViewDataFormatPB {
fn default() -> Self {
ViewDataTypeRevision::default().into()
ViewDataFormatRevision::default().into()
}
}
impl std::convert::From<ViewDataTypeRevision> for ViewDataTypePB {
fn from(rev: ViewDataTypeRevision) -> Self {
impl std::convert::From<ViewDataFormatRevision> for ViewDataFormatPB {
fn from(rev: ViewDataFormatRevision) -> Self {
match rev {
ViewDataTypeRevision::Text => ViewDataTypePB::Text,
ViewDataTypeRevision::Database => ViewDataTypePB::Database,
ViewDataFormatRevision::DeltaFormat => ViewDataFormatPB::DeltaFormat,
ViewDataFormatRevision::DatabaseFormat => ViewDataFormatPB::DatabaseFormat,
ViewDataFormatRevision::TreeFormat => ViewDataFormatPB::TreeFormat,
}
}
}
impl std::convert::From<ViewDataTypePB> for ViewDataTypeRevision {
fn from(ty: ViewDataTypePB) -> Self {
impl std::convert::From<ViewDataFormatPB> for ViewDataFormatRevision {
fn from(ty: ViewDataFormatPB) -> Self {
match ty {
ViewDataTypePB::Text => ViewDataTypeRevision::Text,
ViewDataTypePB::Database => ViewDataTypeRevision::Database,
ViewDataFormatPB::DeltaFormat => ViewDataFormatRevision::DeltaFormat,
ViewDataFormatPB::DatabaseFormat => ViewDataFormatRevision::DatabaseFormat,
ViewDataFormatPB::TreeFormat => ViewDataFormatRevision::TreeFormat,
}
}
}
@ -146,7 +149,7 @@ pub struct CreateViewPayloadPB {
pub thumbnail: Option<String>,
#[pb(index = 5)]
pub data_type: ViewDataTypePB,
pub data_format: ViewDataFormatPB,
#[pb(index = 6)]
pub layout: ViewLayoutTypePB,
@ -161,7 +164,7 @@ pub struct CreateViewParams {
pub name: String,
pub desc: String,
pub thumbnail: String,
pub data_type: ViewDataTypePB,
pub data_format: ViewDataFormatPB,
pub layout: ViewLayoutTypePB,
pub view_id: String,
pub view_content_data: Vec<u8>,
@ -183,7 +186,7 @@ impl TryInto<CreateViewParams> for CreateViewPayloadPB {
belong_to_id,
name,
desc: self.desc,
data_type: self.data_type,
data_format: self.data_format,
layout: self.layout,
thumbnail,
view_id,

View File

@ -1,4 +1,4 @@
use crate::entities::{RepeatedViewPB, ViewDataTypePB};
use crate::entities::{RepeatedViewPB, ViewDataFormatPB};
use flowy_derive::ProtoBuf;
#[derive(Eq, PartialEq, ProtoBuf, Debug, Default, Clone)]
@ -16,7 +16,7 @@ pub struct ViewInfoPB {
pub desc: String,
#[pb(index = 5)]
pub data_type: ViewDataTypePB,
pub data_type: ViewDataFormatPB,
#[pb(index = 6)]
pub belongings: RepeatedViewPB,

View File

@ -126,7 +126,7 @@ pub enum FolderEvent {
#[event(input = "RepeatedViewIdPB")]
DeleteView = 204,
#[event(input = "ViewIdPB")]
#[event(input = "ViewPB")]
DuplicateView = 205,
#[event(input = "ViewIdPB")]

View File

@ -1,5 +1,5 @@
use crate::entities::view::ViewDataTypePB;
use crate::entities::ViewLayoutTypePB;
use crate::entities::view::ViewDataFormatPB;
use crate::entities::{ViewLayoutTypePB, ViewPB};
use crate::services::folder_editor::FolderRevisionCompress;
use crate::{
dart_notification::{send_dart_notification, FolderNotification},
@ -14,13 +14,14 @@ use crate::{
use bytes::Bytes;
use flowy_error::FlowyError;
use flowy_folder_data_model::user_default;
use flowy_revision::disk::SQLiteDocumentRevisionPersistence;
use flowy_revision::disk::SQLiteDeltaDocumentRevisionPersistence;
use flowy_revision::{RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionSnapshotPersistence};
use flowy_sync::client_document::default::initial_read_me;
use flowy_document::editor::initial_read_me;
use flowy_sync::{client_folder::FolderPad, entities::ws_data::ServerRevisionWSData};
use lazy_static::lazy_static;
use lib_infra::future::FutureResult;
use std::{collections::HashMap, convert::TryInto, fmt::Formatter, sync::Arc};
use tokio::sync::RwLock as TokioRwLock;
lazy_static! {
@ -64,7 +65,6 @@ pub struct FolderManager {
pub(crate) trash_controller: Arc<TrashController>,
web_socket: Arc<dyn RevisionWebSocket>,
folder_editor: Arc<TokioRwLock<Option<Arc<FolderEditor>>>>,
data_processors: ViewDataProcessorMap,
}
impl FolderManager {
@ -95,7 +95,7 @@ impl FolderManager {
persistence.clone(),
cloud_service.clone(),
trash_controller.clone(),
data_processors.clone(),
data_processors,
));
let app_controller = Arc::new(AppController::new(
@ -122,7 +122,6 @@ impl FolderManager {
trash_controller,
web_socket,
folder_editor,
data_processors,
}
}
@ -151,6 +150,7 @@ impl FolderManager {
}
}
/// Called immediately after the application launched with the user sign in/sign up.
#[tracing::instrument(level = "trace", skip(self), err)]
pub async fn initialize(&self, user_id: &str, token: &str) -> FlowyResult<()> {
let mut write_guard = INIT_FOLDER_FLAG.write().await;
@ -165,7 +165,7 @@ impl FolderManager {
let pool = self.persistence.db_pool()?;
let object_id = folder_id.as_ref();
let disk_cache = SQLiteDocumentRevisionPersistence::new(user_id, pool.clone());
let disk_cache = SQLiteDeltaDocumentRevisionPersistence::new(user_id, pool.clone());
let rev_persistence = RevisionPersistence::new(user_id, object_id, disk_cache);
let rev_compactor = FolderRevisionCompress();
// let history_persistence = SQLiteRevisionHistoryPersistence::new(object_id, pool.clone());
@ -184,17 +184,24 @@ impl FolderManager {
let _ = self.app_controller.initialize()?;
let _ = self.view_controller.initialize()?;
self.data_processors.iter().for_each(|(_, processor)| {
processor.initialize();
});
write_guard.insert(user_id.to_owned(), true);
Ok(())
}
pub async fn initialize_with_new_user(&self, user_id: &str, token: &str) -> FlowyResult<()> {
DefaultFolderBuilder::build(token, user_id, self.persistence.clone(), self.view_controller.clone()).await?;
pub async fn initialize_with_new_user(
&self,
user_id: &str,
token: &str,
view_data_format: ViewDataFormatPB,
) -> FlowyResult<()> {
DefaultFolderBuilder::build(
token,
user_id,
self.persistence.clone(),
self.view_controller.clone(),
|| (view_data_format.clone(), Bytes::from(initial_read_me())),
)
.await?;
self.initialize(user_id, token).await
}
@ -205,23 +212,24 @@ impl FolderManager {
struct DefaultFolderBuilder();
impl DefaultFolderBuilder {
async fn build(
async fn build<F: Fn() -> (ViewDataFormatPB, Bytes)>(
token: &str,
user_id: &str,
persistence: Arc<FolderPersistence>,
view_controller: Arc<ViewController>,
create_view_fn: F,
) -> FlowyResult<()> {
log::debug!("Create user default workspace");
let workspace_rev = user_default::create_default_workspace();
set_current_workspace(&workspace_rev.id);
for app in workspace_rev.apps.iter() {
for (index, view) in app.belongings.iter().enumerate() {
let (view_data_type, view_data) = create_view_fn();
if index == 0 {
let view_data = initial_read_me().json_str();
let _ = view_controller.set_latest_view(&view.id);
let layout_type = ViewLayoutTypePB::from(view.layout.clone());
let _ = view_controller
.create_view(&view.id, ViewDataTypePB::Text, layout_type, Bytes::from(view_data))
.create_view(&view.id, view_data_type, layout_type, view_data)
.await?;
}
}
@ -247,25 +255,24 @@ impl FolderManager {
}
pub trait ViewDataProcessor {
fn initialize(&self) -> FutureResult<(), FlowyError>;
fn create_container(
fn create_view(
&self,
user_id: &str,
view_id: &str,
layout: ViewLayoutTypePB,
delta_data: Bytes,
view_data: Bytes,
) -> FutureResult<(), FlowyError>;
fn close_container(&self, view_id: &str) -> FutureResult<(), FlowyError>;
fn close_view(&self, view_id: &str) -> FutureResult<(), FlowyError>;
fn get_view_data(&self, view_id: &str) -> FutureResult<Bytes, FlowyError>;
fn get_view_data(&self, view: &ViewPB) -> FutureResult<Bytes, FlowyError>;
fn create_default_view(
&self,
user_id: &str,
view_id: &str,
layout: ViewLayoutTypePB,
data_format: ViewDataFormatPB,
) -> FutureResult<Bytes, FlowyError>;
fn create_view_from_delta_data(
@ -276,7 +283,7 @@ pub trait ViewDataProcessor {
layout: ViewLayoutTypePB,
) -> FutureResult<Bytes, FlowyError>;
fn data_type(&self) -> ViewDataTypePB;
fn data_types(&self) -> Vec<ViewDataFormatPB>;
}
pub type ViewDataProcessorMap = Arc<HashMap<ViewDataTypePB, Arc<dyn ViewDataProcessor + Send + Sync>>>;
pub type ViewDataProcessorMap = Arc<HashMap<ViewDataFormatPB, Arc<dyn ViewDataProcessor + Send + Sync>>>;

View File

@ -7,12 +7,13 @@ use bytes::Bytes;
use flowy_database::kv::KV;
use flowy_error::{FlowyError, FlowyResult};
use flowy_folder_data_model::revision::{AppRevision, FolderRevision, ViewRevision, WorkspaceRevision};
use flowy_revision::disk::SQLiteDocumentRevisionPersistence;
use flowy_revision::disk::SQLiteDeltaDocumentRevisionPersistence;
use flowy_revision::reset::{RevisionResettable, RevisionStructReset};
use flowy_sync::client_folder::make_folder_rev_json_str;
use flowy_sync::entities::revision::Revision;
use flowy_sync::server_folder::FolderOperationsBuilder;
use flowy_sync::{client_folder::FolderPad, entities::revision::md5};
use lib_ot::core::DeltaBuilder;
use std::sync::Arc;
const V1_MIGRATION: &str = "FOLDER_V1_MIGRATION";
@ -112,7 +113,7 @@ impl FolderMigration {
};
let pool = self.database.db_pool()?;
let disk_cache = SQLiteDocumentRevisionPersistence::new(&self.user_id, pool);
let disk_cache = SQLiteDeltaDocumentRevisionPersistence::new(&self.user_id, pool);
let reset = RevisionStructReset::new(&self.user_id, object, Arc::new(disk_cache));
reset.run().await
}
@ -134,7 +135,7 @@ impl RevisionResettable for FolderRevisionResettable {
fn reset_data(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
let pad = FolderPad::from_revisions(revisions)?;
let json = pad.to_json()?;
let bytes = DeltaBuilder::new().insert(&json).build().json_bytes();
let bytes = FolderOperationsBuilder::new().insert(&json).build().json_bytes();
Ok(bytes)
}

View File

@ -13,7 +13,8 @@ use flowy_folder_data_model::revision::{AppRevision, TrashRevision, ViewRevision
use flowy_revision::disk::{RevisionRecord, RevisionState};
use flowy_revision::mk_text_block_revision_disk_cache;
use flowy_sync::{client_folder::FolderPad, entities::revision::Revision};
use lib_ot::core::DeltaBuilder;
use flowy_sync::server_folder::FolderOperationsBuilder;
use std::sync::Arc;
use tokio::sync::RwLock;
pub use version_1::{app_sql::*, trash_sql::*, v1_impl::V1Transaction, view_sql::*, workspace_sql::*};
@ -108,7 +109,7 @@ impl FolderPersistence {
pub async fn save_folder(&self, user_id: &str, folder_id: &FolderId, folder: FolderPad) -> FlowyResult<()> {
let pool = self.database.db_pool()?;
let json = folder.to_json()?;
let delta_data = DeltaBuilder::new().insert(&json).build().json_bytes();
let delta_data = FolderOperationsBuilder::new().insert(&json).build().json_bytes();
let revision = Revision::initial_revision(user_id, folder_id.as_ref(), delta_data);
let record = RevisionRecord {
revision,

View File

@ -13,7 +13,7 @@ use flowy_database::{
SqliteConnection,
};
use flowy_folder_data_model::revision::{ViewDataTypeRevision, ViewLayoutTypeRevision, ViewRevision};
use flowy_folder_data_model::revision::{ViewDataFormatRevision, ViewLayoutTypeRevision, ViewRevision};
use lib_infra::util::timestamp;
pub struct ViewTableSql();
@ -78,7 +78,7 @@ pub(crate) struct ViewTable {
pub modified_time: i64,
pub create_time: i64,
pub thumbnail: String,
pub view_type: SqlViewDataType,
pub view_type: SqlViewDataFormat,
pub version: i64,
pub is_trash: bool,
pub ext_data: String,
@ -86,9 +86,10 @@ pub(crate) struct ViewTable {
impl ViewTable {
pub fn new(view_rev: ViewRevision) -> Self {
let data_type = match view_rev.data_type {
ViewDataTypeRevision::Text => SqlViewDataType::Block,
ViewDataTypeRevision::Database => SqlViewDataType::Grid,
let data_type = match view_rev.data_format {
ViewDataFormatRevision::DeltaFormat => SqlViewDataFormat::Delta,
ViewDataFormatRevision::DatabaseFormat => SqlViewDataFormat::Database,
ViewDataFormatRevision::TreeFormat => SqlViewDataFormat::Tree,
};
ViewTable {
@ -110,8 +111,9 @@ impl ViewTable {
impl std::convert::From<ViewTable> for ViewRevision {
fn from(table: ViewTable) -> Self {
let data_type = match table.view_type {
SqlViewDataType::Block => ViewDataTypeRevision::Text,
SqlViewDataType::Grid => ViewDataTypeRevision::Database,
SqlViewDataFormat::Delta => ViewDataFormatRevision::DeltaFormat,
SqlViewDataFormat::Database => ViewDataFormatRevision::DatabaseFormat,
SqlViewDataFormat::Tree => ViewDataFormatRevision::TreeFormat,
};
ViewRevision {
@ -119,7 +121,7 @@ impl std::convert::From<ViewTable> for ViewRevision {
app_id: table.belong_to_id,
name: table.name,
desc: table.desc,
data_type,
data_format: data_type,
belongings: vec![],
modified_time: table.modified_time,
version: table.version,
@ -180,34 +182,36 @@ impl ViewChangeset {
#[derive(Clone, Copy, PartialEq, Eq, Debug, Hash, FromSqlRow, AsExpression)]
#[repr(i32)]
#[sql_type = "Integer"]
pub enum SqlViewDataType {
Block = 0,
Grid = 1,
pub enum SqlViewDataFormat {
Delta = 0,
Database = 1,
Tree = 2,
}
impl std::default::Default for SqlViewDataType {
impl std::default::Default for SqlViewDataFormat {
fn default() -> Self {
SqlViewDataType::Block
SqlViewDataFormat::Delta
}
}
impl std::convert::From<i32> for SqlViewDataType {
impl std::convert::From<i32> for SqlViewDataFormat {
fn from(value: i32) -> Self {
match value {
0 => SqlViewDataType::Block,
1 => SqlViewDataType::Grid,
0 => SqlViewDataFormat::Delta,
1 => SqlViewDataFormat::Database,
2 => SqlViewDataFormat::Tree,
o => {
log::error!("Unsupported view type {}, fallback to ViewType::Block", o);
SqlViewDataType::Block
SqlViewDataFormat::Delta
}
}
}
}
impl SqlViewDataType {
impl SqlViewDataFormat {
pub fn value(&self) -> i32 {
*self as i32
}
}
impl_sql_integer_expression!(SqlViewDataType);
impl_sql_integer_expression!(SqlViewDataFormat);

View File

@ -1,4 +1,4 @@
pub use crate::entities::view::ViewDataTypePB;
pub use crate::entities::view::ViewDataFormatPB;
use crate::entities::{DeletedViewPB, ViewInfoPB, ViewLayoutTypePB};
use crate::manager::{ViewDataProcessor, ViewDataProcessorMap};
use crate::{
@ -58,12 +58,17 @@ impl ViewController {
&self,
mut params: CreateViewParams,
) -> Result<ViewRevision, FlowyError> {
let processor = self.get_data_processor(params.data_type.clone())?;
let processor = self.get_data_processor(params.data_format.clone())?;
let user_id = self.user.user_id()?;
if params.view_content_data.is_empty() {
tracing::trace!("Create view with build-in data");
let view_data = processor
.create_default_view(&user_id, &params.view_id, params.layout.clone())
.create_default_view(
&user_id,
&params.view_id,
params.layout.clone(),
params.data_format.clone(),
)
.await?;
params.view_content_data = view_data.to_vec();
} else {
@ -79,7 +84,7 @@ impl ViewController {
let _ = self
.create_view(
&params.view_id,
params.data_type.clone(),
params.data_format.clone(),
params.layout.clone(),
delta_data,
)
@ -91,22 +96,20 @@ impl ViewController {
Ok(view_rev)
}
#[tracing::instrument(level = "debug", skip(self, view_id, delta_data), err)]
#[tracing::instrument(level = "debug", skip(self, view_id, view_data), err)]
pub(crate) async fn create_view(
&self,
view_id: &str,
data_type: ViewDataTypePB,
data_type: ViewDataFormatPB,
layout_type: ViewLayoutTypePB,
delta_data: Bytes,
view_data: Bytes,
) -> Result<(), FlowyError> {
if delta_data.is_empty() {
if view_data.is_empty() {
return Err(FlowyError::internal().context("The content of the view should not be empty"));
}
let user_id = self.user.user_id()?;
let processor = self.get_data_processor(data_type)?;
let _ = processor
.create_container(&user_id, view_id, layout_type, delta_data)
.await?;
let _ = processor.create_view(&user_id, view_id, layout_type, view_data).await?;
Ok(())
}
@ -156,7 +159,7 @@ impl ViewController {
belong_to_id: view_rev.app_id,
name: view_rev.name,
desc: view_rev.desc,
data_type: view_rev.data_type.into(),
data_type: view_rev.data_format.into(),
belongings: RepeatedViewPB { items },
ext_data: view_rev.ext_data,
};
@ -188,7 +191,7 @@ impl ViewController {
#[tracing::instrument(level = "debug", skip(self), err)]
pub(crate) async fn close_view(&self, view_id: &str) -> Result<(), FlowyError> {
let processor = self.get_data_processor_from_view_id(view_id).await?;
let _ = processor.close_container(view_id).await?;
let _ = processor.close_view(view_id).await?;
Ok(())
}
@ -223,7 +226,7 @@ impl ViewController {
.send();
let processor = self.get_data_processor_from_view_id(&view_id).await?;
let _ = processor.close_container(&view_id).await?;
let _ = processor.close_view(&view_id).await?;
Ok(())
}
@ -242,20 +245,20 @@ impl ViewController {
}
#[tracing::instrument(level = "debug", skip(self), err)]
pub(crate) async fn duplicate_view(&self, view_id: &str) -> Result<(), FlowyError> {
pub(crate) async fn duplicate_view(&self, view: ViewPB) -> Result<(), FlowyError> {
let view_rev = self
.persistence
.begin_transaction(|transaction| transaction.read_view(view_id))
.begin_transaction(|transaction| transaction.read_view(&view.id))
.await?;
let processor = self.get_data_processor(view_rev.data_type.clone())?;
let view_data = processor.get_view_data(view_id).await?;
let processor = self.get_data_processor(view_rev.data_format.clone())?;
let view_data = processor.get_view_data(&view).await?;
let duplicate_params = CreateViewParams {
belong_to_id: view_rev.app_id.clone(),
name: format!("{} (copy)", &view_rev.name),
desc: view_rev.desc,
thumbnail: view_rev.thumbnail,
data_type: view_rev.data_type.into(),
data_format: view_rev.data_format.into(),
layout: view_rev.layout.into(),
view_content_data: view_data.to_vec(),
view_id: gen_view_id(),
@ -399,11 +402,11 @@ impl ViewController {
.persistence
.begin_transaction(|transaction| transaction.read_view(view_id))
.await?;
self.get_data_processor(view.data_type)
self.get_data_processor(view.data_format)
}
#[inline]
fn get_data_processor<T: Into<ViewDataTypePB>>(
fn get_data_processor<T: Into<ViewDataFormatPB>>(
&self,
data_type: T,
) -> FlowyResult<Arc<dyn ViewDataProcessor + Send + Sync>> {
@ -472,10 +475,10 @@ async fn handle_trash_event(
.await?;
for view in views {
let data_type = view.data_type.clone().into();
let data_type = view.data_format.clone().into();
match get_data_processor(data_processors.clone(), &data_type) {
Ok(processor) => {
let _ = processor.close_container(&view.id).await?;
let _ = processor.close_view(&view.id).await?;
}
Err(e) => {
tracing::error!("{}", e)
@ -491,7 +494,7 @@ async fn handle_trash_event(
fn get_data_processor(
data_processors: ViewDataProcessorMap,
data_type: &ViewDataTypePB,
data_type: &ViewDataFormatPB,
) -> FlowyResult<Arc<dyn ViewDataProcessor + Send + Sync>> {
match data_processors.get(data_type) {
None => Err(FlowyError::internal().context(format!(

View File

@ -121,10 +121,10 @@ pub(crate) async fn move_item_handler(
#[tracing::instrument(level = "debug", skip(data, controller), err)]
pub(crate) async fn duplicate_view_handler(
data: Data<ViewIdPB>,
data: Data<ViewPB>,
controller: AppData<Arc<ViewController>>,
) -> Result<(), FlowyError> {
let view_id: ViewIdPB = data.into_inner();
let _ = controller.duplicate_view(&view_id.value).await?;
let view: ViewPB = data.into_inner();
let _ = controller.duplicate_view(view).await?;
Ok(())
}

View File

@ -1,5 +1,5 @@
use crate::script::{invalid_workspace_name_test_case, FolderScript::*, FolderTest};
use flowy_folder::entities::view::ViewDataTypePB;
use flowy_folder::entities::view::ViewDataFormatPB;
use flowy_folder::entities::workspace::CreateWorkspacePayloadPB;
use flowy_revision::disk::RevisionState;
use flowy_test::{event_builder::*, FlowySDKTest};
@ -133,12 +133,12 @@ async fn app_create_with_view() {
CreateView {
name: "View A".to_owned(),
desc: "View A description".to_owned(),
data_type: ViewDataTypePB::Text,
data_type: ViewDataFormatPB::DeltaFormat,
},
CreateView {
name: "Grid".to_owned(),
desc: "Grid description".to_owned(),
data_type: ViewDataTypePB::Database,
data_type: ViewDataFormatPB::DatabaseFormat,
},
ReadApp(app.id),
])
@ -197,12 +197,12 @@ async fn view_delete_all() {
CreateView {
name: "View A".to_owned(),
desc: "View A description".to_owned(),
data_type: ViewDataTypePB::Text,
data_type: ViewDataFormatPB::DeltaFormat,
},
CreateView {
name: "Grid".to_owned(),
desc: "Grid description".to_owned(),
data_type: ViewDataTypePB::Database,
data_type: ViewDataFormatPB::DatabaseFormat,
},
ReadApp(app.id.clone()),
])
@ -230,7 +230,7 @@ async fn view_delete_all_permanent() {
CreateView {
name: "View A".to_owned(),
desc: "View A description".to_owned(),
data_type: ViewDataTypePB::Text,
data_type: ViewDataFormatPB::DeltaFormat,
},
ReadApp(app.id.clone()),
])
@ -329,7 +329,7 @@ async fn folder_sync_revision_with_new_view() {
CreateView {
name: view_name.clone(),
desc: view_desc.clone(),
data_type: ViewDataTypePB::Text,
data_type: ViewDataFormatPB::DeltaFormat,
},
AssertCurrentRevId(3),
AssertNextSyncRevId(Some(3)),

View File

@ -10,7 +10,7 @@ use flowy_folder::entities::{
use flowy_folder::entities::{
app::{AppPB, RepeatedAppPB},
trash::TrashPB,
view::{RepeatedViewPB, ViewDataTypePB, ViewPB},
view::{RepeatedViewPB, ViewDataFormatPB, ViewPB},
workspace::WorkspacePB,
};
use flowy_folder::event_map::FolderEvent::*;
@ -52,7 +52,7 @@ pub enum FolderScript {
CreateView {
name: String,
desc: String,
data_type: ViewDataTypePB,
data_type: ViewDataFormatPB,
},
AssertView(ViewPB),
ReadView(String),
@ -99,7 +99,7 @@ impl FolderTest {
&app.id,
"Folder View",
"Folder test view",
ViewDataTypePB::Text,
ViewDataFormatPB::DeltaFormat,
ViewLayoutTypePB::Document,
)
.await;
@ -182,8 +182,9 @@ impl FolderTest {
FolderScript::CreateView { name, desc, data_type } => {
let layout = match data_type {
ViewDataTypePB::Text => ViewLayoutTypePB::Document,
ViewDataTypePB::Database => ViewLayoutTypePB::Grid,
ViewDataFormatPB::DeltaFormat => ViewLayoutTypePB::Document,
ViewDataFormatPB::TreeFormat => ViewLayoutTypePB::Document,
ViewDataFormatPB::DatabaseFormat => ViewLayoutTypePB::Grid,
};
let view = create_view(sdk, &self.app.id, &name, &desc, data_type, layout).await;
self.view = view;
@ -357,7 +358,7 @@ pub async fn create_view(
app_id: &str,
name: &str,
desc: &str,
data_type: ViewDataTypePB,
data_type: ViewDataFormatPB,
layout: ViewLayoutTypePB,
) -> ViewPB {
let request = CreateViewPayloadPB {
@ -365,7 +366,7 @@ pub async fn create_view(
name: name.to_string(),
desc: desc.to_string(),
thumbnail: None,
data_type,
data_format: data_type,
layout,
view_content_data: vec![],
};

View File

@ -589,14 +589,14 @@ pub fn make_grid_setting(view_pad: &GridViewRevisionPad, field_revs: &[Arc<Field
#[cfg(test)]
mod tests {
use lib_ot::core::Delta;
use flowy_sync::client_grid::GridOperations;
#[test]
fn test() {
let s1 = r#"[{"insert":"{\"view_id\":\"fTURELffPr\",\"grid_id\":\"fTURELffPr\",\"layout\":0,\"filters\":[],\"groups\":[]}"}]"#;
let _delta_1 = Delta::from_json(s1).unwrap();
let _delta_1 = GridOperations::from_json(s1).unwrap();
let s2 = r#"[{"retain":195},{"insert":"{\\\"group_id\\\":\\\"wD9i\\\",\\\"visible\\\":true},{\\\"group_id\\\":\\\"xZtv\\\",\\\"visible\\\":true},{\\\"group_id\\\":\\\"tFV2\\\",\\\"visible\\\":true}"},{"retain":10}]"#;
let _delta_2 = Delta::from_json(s2).unwrap();
let _delta_2 = GridOperations::from_json(s2).unwrap();
}
}

View File

@ -6,10 +6,10 @@ use flowy_error::FlowyResult;
use flowy_grid_data_model::revision::GridRevision;
use flowy_revision::disk::SQLiteGridRevisionPersistence;
use flowy_revision::reset::{RevisionResettable, RevisionStructReset};
use flowy_sync::client_grid::{make_grid_rev_json_str, GridRevisionPad};
use flowy_sync::client_grid::{make_grid_rev_json_str, GridOperationsBuilder, GridRevisionPad};
use flowy_sync::entities::revision::Revision;
use flowy_sync::util::md5;
use lib_ot::core::DeltaBuilder;
use std::sync::Arc;
const V1_MIGRATION: &str = "GRID_V1_MIGRATION";
@ -64,7 +64,7 @@ impl RevisionResettable for GridRevisionResettable {
fn reset_data(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
let pad = GridRevisionPad::from_revisions(revisions)?;
let json = pad.json_str()?;
let bytes = DeltaBuilder::new().insert(&json).build().json_bytes();
let bytes = GridOperationsBuilder::new().insert(&json).build().json_bytes();
Ok(bytes)
}

View File

@ -307,7 +307,7 @@ impl FolderCouldServiceV1 for LocalServer {
app_id: params.belong_to_id,
name: params.name,
desc: params.desc,
data_type: params.data_type.into(),
data_format: params.data_format.into(),
version: 0,
belongings: vec![],
modified_time: time,

View File

@ -0,0 +1,305 @@
use crate::cache::disk::RevisionDiskCache;
use crate::disk::{RevisionChangeset, RevisionRecord};
use bytes::Bytes;
use diesel::{sql_types::Integer, update, SqliteConnection};
use flowy_database::{
impl_sql_integer_expression, insert_or_ignore_into,
prelude::*,
schema::{rev_table, rev_table::dsl},
ConnectionPool,
};
use flowy_error::{internal_error, FlowyError, FlowyResult};
use flowy_sync::{
entities::revision::{RevType, Revision, RevisionRange},
util::md5,
};
use std::collections::HashMap;
use std::sync::Arc;
/// SQLite-backed revision store for the legacy "delta" document format.
///
/// Persists revisions in the shared `rev_table`; the newer document format
/// uses `document_rev_table` via a separate persistence type.
pub struct SQLiteDeltaDocumentRevisionPersistence {
    // Owner of the revisions; stamped onto each `Revision` read back from disk.
    user_id: String,
    pub(crate) pool: Arc<ConnectionPool>,
}
impl RevisionDiskCache for SQLiteDeltaDocumentRevisionPersistence {
    type Error = FlowyError;

    /// Persists a batch of revision records with a single insert statement.
    fn create_revision_records(&self, revision_records: Vec<RevisionRecord>) -> Result<(), Self::Error> {
        let conn = self.pool.get().map_err(internal_error)?;
        let _ = DeltaRevisionSql::create(revision_records, &*conn)?;
        Ok(())
    }

    /// Reads the revisions of `object_id`; when `rev_ids` is `Some`, only
    /// those revision ids are returned.
    fn read_revision_records(
        &self,
        object_id: &str,
        rev_ids: Option<Vec<i64>>,
    ) -> Result<Vec<RevisionRecord>, Self::Error> {
        let conn = self.pool.get().map_err(internal_error)?;
        let records = DeltaRevisionSql::read(&self.user_id, object_id, rev_ids, &*conn)?;
        Ok(records)
    }

    /// Reads the revisions of `object_id` whose rev_id lies in `range`.
    fn read_revision_records_with_range(
        &self,
        object_id: &str,
        range: &RevisionRange,
    ) -> Result<Vec<RevisionRecord>, Self::Error> {
        let conn = &*self.pool.get().map_err(internal_error)?;
        let revisions = DeltaRevisionSql::read_with_range(&self.user_id, object_id, range.clone(), conn)?;
        Ok(revisions)
    }

    /// Applies state changesets one by one inside a single immediate
    /// transaction, so a partially applied batch is never left behind.
    fn update_revision_record(&self, changesets: Vec<RevisionChangeset>) -> FlowyResult<()> {
        let conn = &*self.pool.get().map_err(internal_error)?;
        let _ = conn.immediate_transaction::<_, FlowyError, _>(|| {
            for changeset in changesets {
                let _ = DeltaRevisionSql::update(changeset, conn)?;
            }
            Ok(())
        })?;
        Ok(())
    }

    /// Deletes revisions of `object_id`; `None` deletes every revision of
    /// the object.
    fn delete_revision_records(&self, object_id: &str, rev_ids: Option<Vec<i64>>) -> Result<(), Self::Error> {
        let conn = &*self.pool.get().map_err(internal_error)?;
        let _ = DeltaRevisionSql::delete(object_id, rev_ids, conn)?;
        Ok(())
    }

    /// Atomically swaps revisions: deletes `deleted_rev_ids` and inserts
    /// `inserted_records` in one immediate transaction.
    fn delete_and_insert_records(
        &self,
        object_id: &str,
        deleted_rev_ids: Option<Vec<i64>>,
        inserted_records: Vec<RevisionRecord>,
    ) -> Result<(), Self::Error> {
        let conn = self.pool.get().map_err(internal_error)?;
        conn.immediate_transaction::<_, FlowyError, _>(|| {
            let _ = DeltaRevisionSql::delete(object_id, deleted_rev_ids, &*conn)?;
            let _ = DeltaRevisionSql::create(inserted_records, &*conn)?;
            Ok(())
        })
    }
}
impl SQLiteDeltaDocumentRevisionPersistence {
pub fn new(user_id: &str, pool: Arc<ConnectionPool>) -> Self {
Self {
user_id: user_id.to_owned(),
pool,
}
}
}
pub struct DeltaRevisionSql {}
impl DeltaRevisionSql {
fn create(revision_records: Vec<RevisionRecord>, conn: &SqliteConnection) -> Result<(), FlowyError> {
// Batch insert: https://diesel.rs/guides/all-about-inserts.html
let records = revision_records
.into_iter()
.map(|record| {
tracing::trace!(
"[TextRevisionSql] create revision: {}:{:?}",
record.revision.object_id,
record.revision.rev_id
);
let rev_state: TextRevisionState = record.state.into();
(
dsl::doc_id.eq(record.revision.object_id),
dsl::base_rev_id.eq(record.revision.base_rev_id),
dsl::rev_id.eq(record.revision.rev_id),
dsl::data.eq(record.revision.bytes),
dsl::state.eq(rev_state),
dsl::ty.eq(RevTableType::Local),
)
})
.collect::<Vec<_>>();
let _ = insert_or_ignore_into(dsl::rev_table).values(&records).execute(conn)?;
Ok(())
}
fn update(changeset: RevisionChangeset, conn: &SqliteConnection) -> Result<(), FlowyError> {
let state: TextRevisionState = changeset.state.clone().into();
let filter = dsl::rev_table
.filter(dsl::rev_id.eq(changeset.rev_id.as_ref()))
.filter(dsl::doc_id.eq(changeset.object_id));
let _ = update(filter).set(dsl::state.eq(state)).execute(conn)?;
tracing::debug!(
"[TextRevisionSql] update revision:{} state:to {:?}",
changeset.rev_id,
changeset.state
);
Ok(())
}
fn read(
user_id: &str,
object_id: &str,
rev_ids: Option<Vec<i64>>,
conn: &SqliteConnection,
) -> Result<Vec<RevisionRecord>, FlowyError> {
let mut sql = dsl::rev_table.filter(dsl::doc_id.eq(object_id)).into_boxed();
if let Some(rev_ids) = rev_ids {
sql = sql.filter(dsl::rev_id.eq_any(rev_ids));
}
let rows = sql.order(dsl::rev_id.asc()).load::<RevisionTable>(conn)?;
let records = rows
.into_iter()
.map(|row| mk_revision_record_from_table(user_id, row))
.collect::<Vec<_>>();
Ok(records)
}
fn read_with_range(
user_id: &str,
object_id: &str,
range: RevisionRange,
conn: &SqliteConnection,
) -> Result<Vec<RevisionRecord>, FlowyError> {
let rev_tables = dsl::rev_table
.filter(dsl::rev_id.ge(range.start))
.filter(dsl::rev_id.le(range.end))
.filter(dsl::doc_id.eq(object_id))
.order(dsl::rev_id.asc())
.load::<RevisionTable>(conn)?;
let revisions = rev_tables
.into_iter()
.map(|table| mk_revision_record_from_table(user_id, table))
.collect::<Vec<_>>();
Ok(revisions)
}
fn delete(object_id: &str, rev_ids: Option<Vec<i64>>, conn: &SqliteConnection) -> Result<(), FlowyError> {
let mut sql = diesel::delete(dsl::rev_table).into_boxed();
sql = sql.filter(dsl::doc_id.eq(object_id));
if let Some(rev_ids) = rev_ids {
tracing::trace!("[TextRevisionSql] Delete revision: {}:{:?}", object_id, rev_ids);
sql = sql.filter(dsl::rev_id.eq_any(rev_ids));
}
let affected_row = sql.execute(conn)?;
tracing::trace!("[TextRevisionSql] Delete {} rows", affected_row);
Ok(())
}
pub fn read_all_documents(user_id: &str, conn: &SqliteConnection) -> Result<Vec<Vec<Revision>>, FlowyError> {
let rev_tables = dsl::rev_table.order(dsl::rev_id.asc()).load::<RevisionTable>(conn)?;
let mut document_map = HashMap::new();
for rev_table in rev_tables {
document_map
.entry(rev_table.doc_id.clone())
.or_insert_with(Vec::new)
.push(rev_table);
}
let mut documents = vec![];
for rev_tables in document_map.into_values() {
let revisions = rev_tables
.into_iter()
.map(|table| {
let record = mk_revision_record_from_table(user_id, table);
record.revision
})
.collect::<Vec<_>>();
documents.push(revisions);
}
Ok(documents)
}
}
/// Diesel row mapping for the legacy shared `rev_table`.
#[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)]
#[table_name = "rev_table"]
struct RevisionTable {
    id: i32,
    // Identifier of the object (document) this revision belongs to.
    doc_id: String,
    base_rev_id: i64,
    rev_id: i64,
    // Serialized revision payload.
    data: Vec<u8>,
    state: TextRevisionState,
    ty: RevTableType, // Deprecated
}
/// Sync status of a stored revision, persisted as an INTEGER column.
#[derive(Clone, Copy, PartialEq, Eq, Debug, Hash, FromSqlRow, AsExpression)]
#[repr(i32)]
#[sql_type = "Integer"]
enum TextRevisionState {
    // Revision still waiting to be synchronized.
    Sync = 0,
    // Revision has been acknowledged.
    Ack = 1,
}
impl_sql_integer_expression!(TextRevisionState);
impl_rev_state_map!(TextRevisionState);
impl std::default::Default for TextRevisionState {
fn default() -> Self {
TextRevisionState::Sync
}
}
fn mk_revision_record_from_table(user_id: &str, table: RevisionTable) -> RevisionRecord {
let md5 = md5(&table.data);
let revision = Revision::new(
&table.doc_id,
table.base_rev_id,
table.rev_id,
Bytes::from(table.data),
user_id,
md5,
);
RevisionRecord {
revision,
state: table.state.into(),
write_to_disk: false,
}
}
/// Deprecated origin marker kept only for `rev_table` schema compatibility
/// (see the `ty` column and `RevType::Deprecated*`).
#[derive(Clone, Copy, PartialEq, Eq, Debug, Hash, FromSqlRow, AsExpression)]
#[repr(i32)]
#[sql_type = "Integer"]
pub enum RevTableType {
    Local = 0,
    Remote = 1,
}
impl_sql_integer_expression!(RevTableType);
impl std::default::Default for RevTableType {
fn default() -> Self {
RevTableType::Local
}
}
impl std::convert::From<i32> for RevTableType {
fn from(value: i32) -> Self {
match value {
0 => RevTableType::Local,
1 => RevTableType::Remote,
o => {
tracing::error!("Unsupported rev type {}, fallback to RevTableType::Local", o);
RevTableType::Local
}
}
}
}
impl std::convert::From<RevType> for RevTableType {
    /// Maps the deprecated `RevType` onto its table encoding.
    fn from(ty: RevType) -> Self {
        match ty {
            RevType::DeprecatedLocal => Self::Local,
            RevType::DeprecatedRemote => Self::Remote,
        }
    }
}
impl std::convert::From<RevTableType> for RevType {
    /// Inverse of the `RevType` → `RevTableType` mapping.
    fn from(ty: RevTableType) -> Self {
        match ty {
            RevTableType::Local => Self::DeprecatedLocal,
            RevTableType::Remote => Self::DeprecatedRemote,
        }
    }
}

View File

@ -5,12 +5,12 @@ use diesel::{sql_types::Integer, update, SqliteConnection};
use flowy_database::{
impl_sql_integer_expression, insert_or_ignore_into,
prelude::*,
schema::{rev_table, rev_table::dsl},
schema::{document_rev_table, document_rev_table::dsl},
ConnectionPool,
};
use flowy_error::{internal_error, FlowyError, FlowyResult};
use flowy_sync::{
entities::revision::{RevType, Revision, RevisionRange},
entities::revision::{Revision, RevisionRange},
util::md5,
};
use std::sync::Arc;
@ -25,7 +25,7 @@ impl RevisionDiskCache for SQLiteDocumentRevisionPersistence {
fn create_revision_records(&self, revision_records: Vec<RevisionRecord>) -> Result<(), Self::Error> {
let conn = self.pool.get().map_err(internal_error)?;
let _ = TextRevisionSql::create(revision_records, &*conn)?;
let _ = DocumentRevisionSql::create(revision_records, &*conn)?;
Ok(())
}
@ -35,7 +35,7 @@ impl RevisionDiskCache for SQLiteDocumentRevisionPersistence {
rev_ids: Option<Vec<i64>>,
) -> Result<Vec<RevisionRecord>, Self::Error> {
let conn = self.pool.get().map_err(internal_error)?;
let records = TextRevisionSql::read(&self.user_id, object_id, rev_ids, &*conn)?;
let records = DocumentRevisionSql::read(&self.user_id, object_id, rev_ids, &*conn)?;
Ok(records)
}
@ -45,7 +45,7 @@ impl RevisionDiskCache for SQLiteDocumentRevisionPersistence {
range: &RevisionRange,
) -> Result<Vec<RevisionRecord>, Self::Error> {
let conn = &*self.pool.get().map_err(internal_error)?;
let revisions = TextRevisionSql::read_with_range(&self.user_id, object_id, range.clone(), conn)?;
let revisions = DocumentRevisionSql::read_with_range(&self.user_id, object_id, range.clone(), conn)?;
Ok(revisions)
}
@ -53,7 +53,7 @@ impl RevisionDiskCache for SQLiteDocumentRevisionPersistence {
let conn = &*self.pool.get().map_err(internal_error)?;
let _ = conn.immediate_transaction::<_, FlowyError, _>(|| {
for changeset in changesets {
let _ = TextRevisionSql::update(changeset, conn)?;
let _ = DocumentRevisionSql::update(changeset, conn)?;
}
Ok(())
})?;
@ -62,7 +62,7 @@ impl RevisionDiskCache for SQLiteDocumentRevisionPersistence {
fn delete_revision_records(&self, object_id: &str, rev_ids: Option<Vec<i64>>) -> Result<(), Self::Error> {
let conn = &*self.pool.get().map_err(internal_error)?;
let _ = TextRevisionSql::delete(object_id, rev_ids, conn)?;
let _ = DocumentRevisionSql::delete(object_id, rev_ids, conn)?;
Ok(())
}
@ -74,8 +74,8 @@ impl RevisionDiskCache for SQLiteDocumentRevisionPersistence {
) -> Result<(), Self::Error> {
let conn = self.pool.get().map_err(internal_error)?;
conn.immediate_transaction::<_, FlowyError, _>(|| {
let _ = TextRevisionSql::delete(object_id, deleted_rev_ids, &*conn)?;
let _ = TextRevisionSql::create(inserted_records, &*conn)?;
let _ = DocumentRevisionSql::delete(object_id, deleted_rev_ids, &*conn)?;
let _ = DocumentRevisionSql::create(inserted_records, &*conn)?;
Ok(())
})
}
@ -90,44 +90,44 @@ impl SQLiteDocumentRevisionPersistence {
}
}
struct TextRevisionSql {}
struct DocumentRevisionSql {}
impl TextRevisionSql {
impl DocumentRevisionSql {
fn create(revision_records: Vec<RevisionRecord>, conn: &SqliteConnection) -> Result<(), FlowyError> {
// Batch insert: https://diesel.rs/guides/all-about-inserts.html
let records = revision_records
.into_iter()
.map(|record| {
tracing::trace!(
"[TextRevisionSql] create revision: {}:{:?}",
"[DocumentRevisionSql] create revision: {}:{:?}",
record.revision.object_id,
record.revision.rev_id
);
let rev_state: TextRevisionState = record.state.into();
let rev_state: RevisionState = record.state.into();
(
dsl::doc_id.eq(record.revision.object_id),
dsl::document_id.eq(record.revision.object_id),
dsl::base_rev_id.eq(record.revision.base_rev_id),
dsl::rev_id.eq(record.revision.rev_id),
dsl::data.eq(record.revision.bytes),
dsl::state.eq(rev_state),
dsl::ty.eq(RevTableType::Local),
)
})
.collect::<Vec<_>>();
let _ = insert_or_ignore_into(dsl::rev_table).values(&records).execute(conn)?;
let _ = insert_or_ignore_into(dsl::document_rev_table)
.values(&records)
.execute(conn)?;
Ok(())
}
fn update(changeset: RevisionChangeset, conn: &SqliteConnection) -> Result<(), FlowyError> {
let state: TextRevisionState = changeset.state.clone().into();
let filter = dsl::rev_table
let state: RevisionState = changeset.state.clone().into();
let filter = dsl::document_rev_table
.filter(dsl::rev_id.eq(changeset.rev_id.as_ref()))
.filter(dsl::doc_id.eq(changeset.object_id));
.filter(dsl::document_id.eq(changeset.object_id));
let _ = update(filter).set(dsl::state.eq(state)).execute(conn)?;
tracing::debug!(
"[TextRevisionSql] update revision:{} state:to {:?}",
"[DocumentRevisionSql] update revision:{} state:to {:?}",
changeset.rev_id,
changeset.state
);
@ -140,11 +140,13 @@ impl TextRevisionSql {
rev_ids: Option<Vec<i64>>,
conn: &SqliteConnection,
) -> Result<Vec<RevisionRecord>, FlowyError> {
let mut sql = dsl::rev_table.filter(dsl::doc_id.eq(object_id)).into_boxed();
let mut sql = dsl::document_rev_table
.filter(dsl::document_id.eq(object_id))
.into_boxed();
if let Some(rev_ids) = rev_ids {
sql = sql.filter(dsl::rev_id.eq_any(rev_ids));
}
let rows = sql.order(dsl::rev_id.asc()).load::<RevisionTable>(conn)?;
let rows = sql.order(dsl::rev_id.asc()).load::<DocumentRevisionTable>(conn)?;
let records = rows
.into_iter()
.map(|row| mk_revision_record_from_table(user_id, row))
@ -159,12 +161,12 @@ impl TextRevisionSql {
range: RevisionRange,
conn: &SqliteConnection,
) -> Result<Vec<RevisionRecord>, FlowyError> {
let rev_tables = dsl::rev_table
let rev_tables = dsl::document_rev_table
.filter(dsl::rev_id.ge(range.start))
.filter(dsl::rev_id.le(range.end))
.filter(dsl::doc_id.eq(object_id))
.filter(dsl::document_id.eq(object_id))
.order(dsl::rev_id.asc())
.load::<RevisionTable>(conn)?;
.load::<DocumentRevisionTable>(conn)?;
let revisions = rev_tables
.into_iter()
@ -174,52 +176,51 @@ impl TextRevisionSql {
}
fn delete(object_id: &str, rev_ids: Option<Vec<i64>>, conn: &SqliteConnection) -> Result<(), FlowyError> {
let mut sql = diesel::delete(dsl::rev_table).into_boxed();
sql = sql.filter(dsl::doc_id.eq(object_id));
let mut sql = diesel::delete(dsl::document_rev_table).into_boxed();
sql = sql.filter(dsl::document_id.eq(object_id));
if let Some(rev_ids) = rev_ids {
tracing::trace!("[TextRevisionSql] Delete revision: {}:{:?}", object_id, rev_ids);
tracing::trace!("[DocumentRevisionSql] Delete revision: {}:{:?}", object_id, rev_ids);
sql = sql.filter(dsl::rev_id.eq_any(rev_ids));
}
let affected_row = sql.execute(conn)?;
tracing::trace!("[TextRevisionSql] Delete {} rows", affected_row);
tracing::trace!("[DocumentRevisionSql] Delete {} rows", affected_row);
Ok(())
}
}
#[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)]
#[table_name = "rev_table"]
struct RevisionTable {
#[table_name = "document_rev_table"]
struct DocumentRevisionTable {
id: i32,
doc_id: String,
document_id: String,
base_rev_id: i64,
rev_id: i64,
data: Vec<u8>,
state: TextRevisionState,
ty: RevTableType, // Deprecated
state: RevisionState,
}
#[derive(Clone, Copy, PartialEq, Eq, Debug, Hash, FromSqlRow, AsExpression)]
#[repr(i32)]
#[sql_type = "Integer"]
enum TextRevisionState {
enum RevisionState {
Sync = 0,
Ack = 1,
}
impl_sql_integer_expression!(TextRevisionState);
impl_rev_state_map!(TextRevisionState);
impl_sql_integer_expression!(RevisionState);
impl_rev_state_map!(RevisionState);
impl std::default::Default for TextRevisionState {
impl std::default::Default for RevisionState {
fn default() -> Self {
TextRevisionState::Sync
RevisionState::Sync
}
}
fn mk_revision_record_from_table(user_id: &str, table: RevisionTable) -> RevisionRecord {
fn mk_revision_record_from_table(user_id: &str, table: DocumentRevisionTable) -> RevisionRecord {
let md5 = md5(&table.data);
let revision = Revision::new(
&table.doc_id,
&table.document_id,
table.base_rev_id,
table.rev_id,
Bytes::from(table.data),
@ -232,49 +233,3 @@ fn mk_revision_record_from_table(user_id: &str, table: RevisionTable) -> Revisio
write_to_disk: false,
}
}
#[derive(Clone, Copy, PartialEq, Eq, Debug, Hash, FromSqlRow, AsExpression)]
#[repr(i32)]
#[sql_type = "Integer"]
pub enum RevTableType {
Local = 0,
Remote = 1,
}
impl_sql_integer_expression!(RevTableType);
impl std::default::Default for RevTableType {
fn default() -> Self {
RevTableType::Local
}
}
impl std::convert::From<i32> for RevTableType {
fn from(value: i32) -> Self {
match value {
0 => RevTableType::Local,
1 => RevTableType::Remote,
o => {
tracing::error!("Unsupported rev type {}, fallback to RevTableType::Local", o);
RevTableType::Local
}
}
}
}
impl std::convert::From<RevType> for RevTableType {
fn from(ty: RevType) -> Self {
match ty {
RevType::DeprecatedLocal => RevTableType::Local,
RevType::DeprecatedRemote => RevTableType::Remote,
}
}
}
impl std::convert::From<RevTableType> for RevType {
fn from(ty: RevTableType) -> Self {
match ty {
RevTableType::Local => RevType::DeprecatedLocal,
RevTableType::Remote => RevType::DeprecatedRemote,
}
}
}

View File

@ -1,8 +1,10 @@
mod delta_document_impl;
mod document_impl;
mod grid_block_impl;
mod grid_impl;
mod grid_view_impl;
pub use delta_document_impl::*;
pub use document_impl::*;
pub use grid_block_impl::*;
pub use grid_impl::*;

View File

@ -86,7 +86,7 @@ impl RevisionManager {
user_id: &str,
object_id: &str,
rev_persistence: RevisionPersistence,
rev_compactor: C,
rev_compress: C,
snapshot_persistence: SP,
) -> Self
where
@ -94,7 +94,7 @@ impl RevisionManager {
C: 'static + RevisionCompress,
{
let rev_id_counter = RevIdCounter::new(0);
let rev_compactor = Arc::new(rev_compactor);
let rev_compress = Arc::new(rev_compress);
let rev_persistence = Arc::new(rev_persistence);
let rev_snapshot = Arc::new(RevisionSnapshotManager::new(user_id, object_id, snapshot_persistence));
#[cfg(feature = "flowy_unit_test")]
@ -106,7 +106,7 @@ impl RevisionManager {
rev_id_counter,
rev_persistence,
rev_snapshot,
rev_compress: rev_compactor,
rev_compress,
#[cfg(feature = "flowy_unit_test")]
rev_ack_notifier: revision_ack_notifier,
}
@ -130,6 +130,18 @@ impl RevisionManager {
B::deserialize_revisions(&self.object_id, revisions)
}
pub async fn load_revisions(&self) -> FlowyResult<Vec<Revision>> {
let revisions = RevisionLoader {
object_id: self.object_id.clone(),
user_id: self.user_id.clone(),
cloud: None,
rev_persistence: self.rev_persistence.clone(),
}
.load_revisions()
.await?;
Ok(revisions)
}
#[tracing::instrument(level = "debug", skip(self, revisions), err)]
pub async fn reset_object(&self, revisions: RepeatedRevision) -> FlowyResult<()> {
let rev_id = pair_rev_id_from_revisions(&revisions).1;
@ -264,4 +276,10 @@ impl RevisionLoader {
Ok((revisions, rev_id))
}
pub async fn load_revisions(&self) -> Result<Vec<Revision>, FlowyError> {
let records = self.rev_persistence.batch_get(&self.object_id)?;
let revisions = records.into_iter().map(|record| record.revision).collect::<_>();
Ok(revisions)
}
}

View File

@ -1,5 +1,5 @@
use crate::cache::{
disk::{RevisionChangeset, RevisionDiskCache, SQLiteDocumentRevisionPersistence},
disk::{RevisionChangeset, RevisionDiskCache, SQLiteDeltaDocumentRevisionPersistence},
memory::RevisionMemoryCacheDelegate,
};
use crate::disk::{RevisionRecord, RevisionState, SQLiteGridBlockRevisionPersistence};
@ -228,7 +228,7 @@ pub fn mk_text_block_revision_disk_cache(
user_id: &str,
pool: Arc<ConnectionPool>,
) -> Arc<dyn RevisionDiskCache<Error = FlowyError>> {
Arc::new(SQLiteDocumentRevisionPersistence::new(user_id, pool))
Arc::new(SQLiteDeltaDocumentRevisionPersistence::new(user_id, pool))
}
pub fn mk_grid_block_revision_disk_cache(

View File

@ -2,7 +2,7 @@ use bytes::Bytes;
use flowy_database::ConnectionPool;
use flowy_document::{
errors::{internal_error, FlowyError},
DocumentCloudService, DocumentConfig, DocumentManager, DocumentUser,
DocumentCloudService, DocumentConfig, DocumentDatabase, DocumentManager, DocumentUser,
};
use flowy_net::ClientServerConfiguration;
use flowy_net::{
@ -25,16 +25,18 @@ impl DocumentDepsResolver {
server_config: &ClientServerConfiguration,
document_config: &DocumentConfig,
) -> Arc<DocumentManager> {
let user = Arc::new(BlockUserImpl(user_session));
let user = Arc::new(BlockUserImpl(user_session.clone()));
let rev_web_socket = Arc::new(DocumentRevisionWebSocket(ws_conn.clone()));
let cloud_service: Arc<dyn DocumentCloudService> = match local_server {
None => Arc::new(DocumentCloudServiceImpl::new(server_config.clone())),
Some(local_server) => local_server,
};
let database = Arc::new(DocumentDatabaseImpl(user_session));
let manager = Arc::new(DocumentManager::new(
cloud_service,
user,
database,
rev_web_socket,
document_config.clone(),
));
@ -64,7 +66,10 @@ impl DocumentUser for BlockUserImpl {
fn token(&self) -> Result<String, FlowyError> {
self.0.token()
}
}
struct DocumentDatabaseImpl(Arc<UserSession>);
impl DocumentDatabase for DocumentDatabaseImpl {
fn db_pool(&self) -> Result<Arc<ConnectionPool>, FlowyError> {
self.0.db_pool()
}

View File

@ -1,7 +1,8 @@
use bytes::Bytes;
use flowy_database::ConnectionPool;
use flowy_document::DocumentManager;
use flowy_folder::entities::{ViewDataTypePB, ViewLayoutTypePB};
use flowy_folder::entities::{ViewDataFormatPB, ViewLayoutTypePB, ViewPB};
use flowy_folder::manager::{ViewDataProcessor, ViewDataProcessorMap};
use flowy_folder::{
errors::{internal_error, FlowyError},
@ -63,16 +64,20 @@ impl FolderDepsResolver {
}
fn make_view_data_processor(
text_block_manager: Arc<DocumentManager>,
document_manager: Arc<DocumentManager>,
grid_manager: Arc<GridManager>,
) -> ViewDataProcessorMap {
let mut map: HashMap<ViewDataTypePB, Arc<dyn ViewDataProcessor + Send + Sync>> = HashMap::new();
let mut map: HashMap<ViewDataFormatPB, Arc<dyn ViewDataProcessor + Send + Sync>> = HashMap::new();
let block_data_impl = DocumentViewDataProcessor(text_block_manager);
map.insert(block_data_impl.data_type(), Arc::new(block_data_impl));
let document_processor = Arc::new(DocumentViewDataProcessor(document_manager));
document_processor.data_types().into_iter().for_each(|data_type| {
map.insert(data_type, document_processor.clone());
});
let grid_data_impl = GridViewDataProcessor(grid_manager);
map.insert(grid_data_impl.data_type(), Arc::new(grid_data_impl));
let grid_data_impl = Arc::new(GridViewDataProcessor(grid_manager));
grid_data_impl.data_types().into_iter().for_each(|data_type| {
map.insert(data_type, grid_data_impl.clone());
});
Arc::new(map)
}
@ -137,30 +142,26 @@ impl WSMessageReceiver for FolderWSMessageReceiverImpl {
struct DocumentViewDataProcessor(Arc<DocumentManager>);
impl ViewDataProcessor for DocumentViewDataProcessor {
fn initialize(&self) -> FutureResult<(), FlowyError> {
let manager = self.0.clone();
FutureResult::new(async move { manager.init() })
}
fn create_container(
fn create_view(
&self,
user_id: &str,
view_id: &str,
layout: ViewLayoutTypePB,
delta_data: Bytes,
view_data: Bytes,
) -> FutureResult<(), FlowyError> {
// Only accept Document type
debug_assert_eq!(layout, ViewLayoutTypePB::Document);
let repeated_revision: RepeatedRevision = Revision::initial_revision(user_id, view_id, delta_data).into();
let repeated_revision: RepeatedRevision = Revision::initial_revision(user_id, view_id, view_data).into();
let view_id = view_id.to_string();
let manager = self.0.clone();
FutureResult::new(async move {
let _ = manager.create_document(view_id, repeated_revision).await?;
Ok(())
})
}
fn close_container(&self, view_id: &str) -> FutureResult<(), FlowyError> {
fn close_view(&self, view_id: &str) -> FutureResult<(), FlowyError> {
let manager = self.0.clone();
let view_id = view_id.to_string();
FutureResult::new(async move {
@ -169,13 +170,13 @@ impl ViewDataProcessor for DocumentViewDataProcessor {
})
}
fn get_view_data(&self, view_id: &str) -> FutureResult<Bytes, FlowyError> {
let view_id = view_id.to_string();
fn get_view_data(&self, view: &ViewPB) -> FutureResult<Bytes, FlowyError> {
let view_id = view.id.clone();
let manager = self.0.clone();
FutureResult::new(async move {
let editor = manager.open_document_editor(view_id).await?;
let delta_bytes = Bytes::from(editor.export().await?);
Ok(delta_bytes)
let document_data = Bytes::from(editor.duplicate().await?);
Ok(document_data)
})
}
@ -184,14 +185,15 @@ impl ViewDataProcessor for DocumentViewDataProcessor {
user_id: &str,
view_id: &str,
layout: ViewLayoutTypePB,
_data_format: ViewDataFormatPB,
) -> FutureResult<Bytes, FlowyError> {
debug_assert_eq!(layout, ViewLayoutTypePB::Document);
let user_id = user_id.to_string();
let view_id = view_id.to_string();
let manager = self.0.clone();
let view_data = self.0.initial_document_content();
let document_content = self.0.initial_document_content();
FutureResult::new(async move {
let delta_data = Bytes::from(view_data);
let delta_data = Bytes::from(document_content);
let repeated_revision: RepeatedRevision =
Revision::initial_revision(&user_id, &view_id, delta_data.clone()).into();
let _ = manager.create_document(view_id, repeated_revision).await?;
@ -210,18 +212,14 @@ impl ViewDataProcessor for DocumentViewDataProcessor {
FutureResult::new(async move { Ok(Bytes::from(data)) })
}
fn data_type(&self) -> ViewDataTypePB {
ViewDataTypePB::Text
fn data_types(&self) -> Vec<ViewDataFormatPB> {
vec![ViewDataFormatPB::DeltaFormat, ViewDataFormatPB::TreeFormat]
}
}
struct GridViewDataProcessor(Arc<GridManager>);
impl ViewDataProcessor for GridViewDataProcessor {
fn initialize(&self) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
fn create_container(
fn create_view(
&self,
user_id: &str,
view_id: &str,
@ -237,7 +235,7 @@ impl ViewDataProcessor for GridViewDataProcessor {
})
}
fn close_container(&self, view_id: &str) -> FutureResult<(), FlowyError> {
fn close_view(&self, view_id: &str) -> FutureResult<(), FlowyError> {
let grid_manager = self.0.clone();
let view_id = view_id.to_string();
FutureResult::new(async move {
@ -246,9 +244,9 @@ impl ViewDataProcessor for GridViewDataProcessor {
})
}
fn get_view_data(&self, view_id: &str) -> FutureResult<Bytes, FlowyError> {
let view_id = view_id.to_string();
fn get_view_data(&self, view: &ViewPB) -> FutureResult<Bytes, FlowyError> {
let grid_manager = self.0.clone();
let view_id = view.id.clone();
FutureResult::new(async move {
let editor = grid_manager.open_grid(view_id).await?;
let delta_bytes = editor.duplicate_grid().await?;
@ -261,7 +259,9 @@ impl ViewDataProcessor for GridViewDataProcessor {
user_id: &str,
view_id: &str,
layout: ViewLayoutTypePB,
data_format: ViewDataFormatPB,
) -> FutureResult<Bytes, FlowyError> {
debug_assert_eq!(data_format, ViewDataFormatPB::DatabaseFormat);
let (build_context, layout) = match layout {
ViewLayoutTypePB::Grid => (make_default_grid(), GridLayout::Table),
ViewLayoutTypePB::Board => (make_default_board(), GridLayout::Board),
@ -308,7 +308,7 @@ impl ViewDataProcessor for GridViewDataProcessor {
})
}
fn data_type(&self) -> ViewDataTypePB {
ViewDataTypePB::Database
fn data_types(&self) -> Vec<ViewDataFormatPB> {
vec![ViewDataFormatPB::DatabaseFormat]
}
}

View File

@ -3,7 +3,10 @@ pub mod module;
pub use flowy_net::get_client_server_configuration;
use crate::deps_resolve::*;
use flowy_document::entities::DocumentVersionPB;
use flowy_document::{DocumentConfig, DocumentManager};
use flowy_folder::entities::ViewDataFormatPB;
use flowy_folder::{errors::FlowyError, manager::FolderManager};
use flowy_grid::manager::GridManager;
use flowy_net::ClientServerConfiguration;
@ -34,7 +37,7 @@ pub struct FlowySDKConfig {
root: String,
log_filter: String,
server_config: ClientServerConfiguration,
document_config: DocumentConfig,
pub document: DocumentConfig,
}
impl fmt::Debug for FlowySDKConfig {
@ -42,23 +45,27 @@ impl fmt::Debug for FlowySDKConfig {
f.debug_struct("FlowySDKConfig")
.field("root", &self.root)
.field("server-config", &self.server_config)
.field("document-config", &self.document_config)
.field("document-config", &self.document)
.finish()
}
}
impl FlowySDKConfig {
pub fn new(root: &str, name: &str, server_config: ClientServerConfiguration, use_new_editor: bool) -> Self {
let document_config = DocumentConfig { use_new_editor };
pub fn new(root: &str, name: &str, server_config: ClientServerConfiguration) -> Self {
FlowySDKConfig {
name: name.to_owned(),
root: root.to_owned(),
log_filter: crate_log_filter("info".to_owned()),
server_config,
document_config,
document: DocumentConfig::default(),
}
}
pub fn with_document_version(mut self, version: DocumentVersionPB) -> Self {
self.document.version = version;
self
}
pub fn log_filter(mut self, level: &str) -> Self {
self.log_filter = crate_log_filter(level.to_owned());
self
@ -91,7 +98,7 @@ fn crate_log_filter(level: String) -> String {
#[derive(Clone)]
pub struct FlowySDK {
#[allow(dead_code)]
config: FlowySDKConfig,
pub config: FlowySDKConfig,
pub user_session: Arc<UserSession>,
pub document_manager: Arc<DocumentManager>,
pub folder_manager: Arc<FolderManager>,
@ -108,14 +115,14 @@ impl FlowySDK {
tracing::debug!("🔥 {:?}", config);
let runtime = tokio_default_runtime().unwrap();
let (local_server, ws_conn) = mk_local_server(&config.server_config);
let (user_session, text_block_manager, folder_manager, local_server, grid_manager) = runtime.block_on(async {
let (user_session, document_manager, folder_manager, local_server, grid_manager) = runtime.block_on(async {
let user_session = mk_user_session(&config, &local_server, &config.server_config);
let document_manager = DocumentDepsResolver::resolve(
local_server.clone(),
ws_conn.clone(),
user_session.clone(),
&config.server_config,
&config.document_config,
&config.document,
);
let grid_manager = GridDepsResolver::resolve(ws_conn.clone(), user_session.clone()).await;
@ -149,16 +156,24 @@ impl FlowySDK {
&folder_manager,
&grid_manager,
&user_session,
&text_block_manager,
&document_manager,
)
}));
_start_listening(&dispatcher, &ws_conn, &user_session, &folder_manager, &grid_manager);
_start_listening(
&config,
&dispatcher,
&ws_conn,
&user_session,
&document_manager,
&folder_manager,
&grid_manager,
);
Self {
config,
user_session,
document_manager: text_block_manager,
document_manager,
folder_manager,
grid_manager,
dispatcher,
@ -173,9 +188,11 @@ impl FlowySDK {
}
fn _start_listening(
config: &FlowySDKConfig,
dispatch: &EventDispatcher,
ws_conn: &Arc<FlowyWebSocketConnect>,
user_session: &Arc<UserSession>,
document_manager: &Arc<DocumentManager>,
folder_manager: &Arc<FolderManager>,
grid_manager: &Arc<GridManager>,
) {
@ -186,15 +203,19 @@ fn _start_listening(
let cloned_folder_manager = folder_manager.clone();
let ws_conn = ws_conn.clone();
let user_session = user_session.clone();
let document_manager = document_manager.clone();
let config = config.clone();
dispatch.spawn(async move {
user_session.init();
listen_on_websocket(ws_conn.clone());
_listen_user_status(
config,
ws_conn.clone(),
subscribe_user_status,
folder_manager.clone(),
grid_manager.clone(),
document_manager,
folder_manager,
grid_manager,
)
.await;
});
@ -220,8 +241,10 @@ fn mk_local_server(
}
async fn _listen_user_status(
config: FlowySDKConfig,
ws_conn: Arc<FlowyWebSocketConnect>,
mut subscribe: broadcast::Receiver<UserStatus>,
document_manager: Arc<DocumentManager>,
folder_manager: Arc<FolderManager>,
grid_manager: Arc<GridManager>,
) {
@ -231,6 +254,7 @@ async fn _listen_user_status(
UserStatus::Login { token, user_id } => {
tracing::trace!("User did login");
let _ = folder_manager.initialize(&user_id, &token).await?;
let _ = document_manager.initialize(&user_id).await?;
let _ = grid_manager.initialize(&user_id, &token).await?;
let _ = ws_conn.start(token, user_id).await?;
}
@ -246,7 +270,15 @@ async fn _listen_user_status(
}
UserStatus::SignUp { profile, ret } => {
tracing::trace!("User did sign up");
let view_data_type = match config.document.version {
DocumentVersionPB::V0 => ViewDataFormatPB::DeltaFormat,
DocumentVersionPB::V1 => ViewDataFormatPB::TreeFormat,
};
let _ = folder_manager
.initialize_with_new_user(&profile.id, &profile.token, view_data_type)
.await?;
let _ = document_manager
.initialize_with_new_user(&profile.id, &profile.token)
.await?;

View File

@ -25,11 +25,16 @@ pub struct ViewTest {
impl ViewTest {
#[allow(dead_code)]
pub async fn new(sdk: &FlowySDKTest, data_type: ViewDataTypePB, layout: ViewLayoutTypePB, data: Vec<u8>) -> Self {
pub async fn new(
sdk: &FlowySDKTest,
data_format: ViewDataFormatPB,
layout: ViewLayoutTypePB,
data: Vec<u8>,
) -> Self {
let workspace = create_workspace(sdk, "Workspace", "").await;
open_workspace(sdk, &workspace.id).await;
let app = create_app(sdk, "App", "AppFlowy GitHub Project", &workspace.id).await;
let view = create_view(sdk, &app.id, data_type, layout, data).await;
let view = create_view(sdk, &app.id, data_format, layout, data).await;
Self {
sdk: sdk.clone(),
workspace,
@ -39,15 +44,19 @@ impl ViewTest {
}
pub async fn new_grid_view(sdk: &FlowySDKTest, data: Vec<u8>) -> Self {
Self::new(sdk, ViewDataTypePB::Database, ViewLayoutTypePB::Grid, data).await
Self::new(sdk, ViewDataFormatPB::DatabaseFormat, ViewLayoutTypePB::Grid, data).await
}
pub async fn new_board_view(sdk: &FlowySDKTest, data: Vec<u8>) -> Self {
Self::new(sdk, ViewDataTypePB::Database, ViewLayoutTypePB::Board, data).await
Self::new(sdk, ViewDataFormatPB::DatabaseFormat, ViewLayoutTypePB::Board, data).await
}
pub async fn new_document_view(sdk: &FlowySDKTest) -> Self {
Self::new(sdk, ViewDataTypePB::Text, ViewLayoutTypePB::Document, vec![]).await
let view_data_format = match sdk.document_version() {
DocumentVersionPB::V0 => ViewDataFormatPB::DeltaFormat,
DocumentVersionPB::V1 => ViewDataFormatPB::TreeFormat,
};
Self::new(sdk, view_data_format, ViewLayoutTypePB::Document, vec![]).await
}
}
@ -97,7 +106,7 @@ async fn create_app(sdk: &FlowySDKTest, name: &str, desc: &str, workspace_id: &s
async fn create_view(
sdk: &FlowySDKTest,
app_id: &str,
data_type: ViewDataTypePB,
data_format: ViewDataFormatPB,
layout: ViewLayoutTypePB,
data: Vec<u8>,
) -> ViewPB {
@ -106,7 +115,7 @@ async fn create_view(
name: "View A".to_string(),
desc: "".to_string(),
thumbnail: Some("http://1.png".to_string()),
data_type,
data_format,
layout,
view_content_data: data,
};

View File

@ -3,6 +3,7 @@ pub mod helper;
use crate::helper::*;
use flowy_document::entities::DocumentVersionPB;
use flowy_net::get_client_server_configuration;
use flowy_sdk::{FlowySDK, FlowySDKConfig};
use flowy_user::entities::UserProfilePB;
@ -28,14 +29,16 @@ impl std::ops::Deref for FlowySDKTest {
impl std::default::Default for FlowySDKTest {
fn default() -> Self {
Self::new(false)
Self::new(DocumentVersionPB::V0)
}
}
impl FlowySDKTest {
pub fn new(use_new_editor: bool) -> Self {
pub fn new(document_version: DocumentVersionPB) -> Self {
let server_config = get_client_server_configuration().unwrap();
let config = FlowySDKConfig::new(&root_dir(), &nanoid!(6), server_config, use_new_editor).log_filter("info");
let config = FlowySDKConfig::new(&root_dir(), &nanoid!(6), server_config)
.with_document_version(document_version)
.log_filter("info");
let sdk = std::thread::spawn(|| FlowySDK::new(config)).join().unwrap();
std::mem::forget(sdk.dispatcher());
Self { inner: sdk }
@ -51,4 +54,8 @@ impl FlowySDKTest {
init_user_setting(self.inner.dispatcher()).await;
context.user_profile
}
pub fn document_version(&self) -> DocumentVersionPB {
self.inner.config.document.version.clone()
}
}