add server document

This commit is contained in:
appflowy 2022-01-12 17:08:50 +08:00
parent 3c819ead49
commit 0fba8d9195
56 changed files with 196 additions and 116 deletions

View File

@ -9,7 +9,7 @@ use crate::services::document::{
persistence::DocumentKVPersistence, persistence::DocumentKVPersistence,
ws_receiver::{make_document_ws_receiver, HttpDocumentCloudPersistence}, ws_receiver::{make_document_ws_receiver, HttpDocumentCloudPersistence},
}; };
use flowy_collaboration::sync::ServerDocumentManager; use flowy_collaboration::server_document::ServerDocumentManager;
use lib_ws::WSModule; use lib_ws::WSModule;
use sqlx::PgPool; use sqlx::PgPool;
use std::sync::Arc; use std::sync::Arc;

View File

@ -14,7 +14,7 @@ use flowy_collaboration::{
ResetDocumentParams, ResetDocumentParams,
Revision as RevisionPB, Revision as RevisionPB,
}, },
sync::ServerDocumentManager, server_document::ServerDocumentManager,
util::make_doc_from_revisions, util::make_doc_from_revisions,
}; };

View File

@ -14,7 +14,7 @@ use flowy_collaboration::{
DocumentId as DocumentIdPB, DocumentId as DocumentIdPB,
ResetDocumentParams as ResetDocumentParamsPB, ResetDocumentParams as ResetDocumentParamsPB,
}, },
sync::ServerDocumentManager, server_document::ServerDocumentManager,
}; };
use std::sync::Arc; use std::sync::Arc;

View File

@ -13,7 +13,7 @@ use flowy_collaboration::{
DocumentClientWSDataType as DocumentClientWSDataTypePB, DocumentClientWSDataType as DocumentClientWSDataTypePB,
Revision as RevisionPB, Revision as RevisionPB,
}, },
sync::{RevisionUser, ServerDocumentManager, SyncResponse}, server_document::{RevisionUser, ServerDocumentManager, SyncResponse},
}; };
use futures::stream::StreamExt; use futures::stream::StreamExt;
use std::sync::Arc; use std::sync::Arc;

View File

@ -18,7 +18,7 @@ use flowy_collaboration::{
RepeatedRevision as RepeatedRevisionPB, RepeatedRevision as RepeatedRevisionPB,
Revision as RevisionPB, Revision as RevisionPB,
}, },
sync::{DocumentCloudPersistence, ServerDocumentManager}, server_document::{DocumentCloudPersistence, ServerDocumentManager},
util::repeated_revision_from_repeated_revision_pb, util::repeated_revision_from_repeated_revision_pb,
}; };
use lib_infra::future::BoxResultFuture; use lib_infra::future::BoxResultFuture;

View File

@ -2,7 +2,7 @@
use crate::util::helper::{ViewTest, *}; use crate::util::helper::{ViewTest, *};
use flowy_collaboration::{ use flowy_collaboration::{
document::{Document, PlainDoc}, client_document::{ClientDocument, PlainDoc},
entities::{ entities::{
doc::{CreateDocParams, DocumentId}, doc::{CreateDocParams, DocumentId},
revision::{md5, RepeatedRevision, Revision}, revision::{md5, RepeatedRevision, Revision},
@ -240,7 +240,7 @@ async fn doc_create() {
let server = TestUserServer::new().await; let server = TestUserServer::new().await;
let doc_id = uuid::Uuid::new_v4().to_string(); let doc_id = uuid::Uuid::new_v4().to_string();
let user_id = "a".to_owned(); let user_id = "a".to_owned();
let mut document = Document::new::<PlainDoc>(); let mut document = ClientDocument::new::<PlainDoc>();
let mut offset = 0; let mut offset = 0;
for i in 0..1000 { for i in 0..1000 {
let content = i.to_string(); let content = i.to_string();

View File

@ -16,7 +16,7 @@ use parking_lot::RwLock;
use backend::services::document::persistence::{read_document, reset_document}; use backend::services::document::persistence::{read_document, reset_document};
use flowy_collaboration::entities::revision::{RepeatedRevision, Revision}; use flowy_collaboration::entities::revision::{RepeatedRevision, Revision};
use flowy_collaboration::protobuf::{RepeatedRevision as RepeatedRevisionPB, DocumentId as DocumentIdPB}; use flowy_collaboration::protobuf::{RepeatedRevision as RepeatedRevisionPB, DocumentId as DocumentIdPB};
use flowy_collaboration::sync::ServerDocumentManager; use flowy_collaboration::server_document::ServerDocumentManager;
use flowy_net::ws::connection::FlowyWebSocketConnect; use flowy_net::ws::connection::FlowyWebSocketConnect;
use lib_ot::core::Interval; use lib_ot::core::Interval;

View File

@ -1,5 +1,5 @@
use crate::document_test::edit_script::{DocScript, DocumentTest}; use crate::document_test::edit_script::{DocScript, DocumentTest};
use flowy_collaboration::document::{Document, NewlineDoc}; use flowy_collaboration::client_document::{ClientDocument, NewlineDoc};
use lib_ot::{core::Interval, rich_text::RichTextAttribute}; use lib_ot::{core::Interval, rich_text::RichTextAttribute};
#[rustfmt::skip] #[rustfmt::skip]
@ -75,7 +75,7 @@ async fn delta_sync_while_editing_with_attribute() {
#[actix_rt::test] #[actix_rt::test]
async fn delta_sync_with_server_push() { async fn delta_sync_with_server_push() {
let test = DocumentTest::new().await; let test = DocumentTest::new().await;
let mut document = Document::new::<NewlineDoc>(); let mut document = ClientDocument::new::<NewlineDoc>();
document.insert(0, "123").unwrap(); document.insert(0, "123").unwrap();
document.insert(3, "456").unwrap(); document.insert(3, "456").unwrap();
let json = document.to_json(); let json = document.to_json();
@ -109,7 +109,7 @@ async fn delta_sync_with_server_push() {
#[actix_rt::test] #[actix_rt::test]
async fn delta_sync_with_server_push_after_reset_document() { async fn delta_sync_with_server_push_after_reset_document() {
let test = DocumentTest::new().await; let test = DocumentTest::new().await;
let mut document = Document::new::<NewlineDoc>(); let mut document = ClientDocument::new::<NewlineDoc>();
document.insert(0, "123").unwrap(); document.insert(0, "123").unwrap();
let json = document.to_json(); let json = document.to_json();
@ -148,7 +148,7 @@ async fn delta_sync_with_server_push_after_reset_document() {
#[actix_rt::test] #[actix_rt::test]
async fn delta_sync_while_local_rev_less_than_server_rev() { async fn delta_sync_while_local_rev_less_than_server_rev() {
let test = DocumentTest::new().await; let test = DocumentTest::new().await;
let mut document = Document::new::<NewlineDoc>(); let mut document = ClientDocument::new::<NewlineDoc>();
document.insert(0, "123").unwrap(); document.insert(0, "123").unwrap();
let json = document.to_json(); let json = document.to_json();
@ -190,7 +190,7 @@ async fn delta_sync_while_local_rev_less_than_server_rev() {
#[actix_rt::test] #[actix_rt::test]
async fn delta_sync_while_local_rev_greater_than_server_rev() { async fn delta_sync_while_local_rev_greater_than_server_rev() {
let test = DocumentTest::new().await; let test = DocumentTest::new().await;
let mut document = Document::new::<NewlineDoc>(); let mut document = ClientDocument::new::<NewlineDoc>();
document.insert(0, "123").unwrap(); document.insert(0, "123").unwrap();
let json = document.to_json(); let json = document.to_json();

View File

@ -8,7 +8,7 @@ use backend_service::{
errors::ServerError, errors::ServerError,
}; };
use flowy_collaboration::{ use flowy_collaboration::{
document::default::initial_delta_string, client_document::default::initial_delta_string,
entities::doc::{CreateDocParams, DocumentId, DocumentInfo}, entities::doc::{CreateDocParams, DocumentId, DocumentInfo},
}; };
use flowy_core_data_model::entities::prelude::*; use flowy_core_data_model::entities::prelude::*;

View File

@ -1,5 +1,5 @@
use chrono::Utc; use chrono::Utc;
use flowy_collaboration::document::default::{initial_delta, initial_read_me}; use flowy_collaboration::client_document::default::{initial_delta, initial_read_me};
use flowy_core_data_model::{entities::view::CreateViewParams, user_default}; use flowy_core_data_model::{entities::view::CreateViewParams, user_default};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use parking_lot::RwLock; use parking_lot::RwLock;

View File

@ -138,7 +138,7 @@ impl ViewController {
}) })
} }
#[tracing::instrument(level = "debug", skip(self,params), fields(doc_id = %params.doc_id), err)] #[tracing::instrument(level = "debug", skip(self, params), err)]
pub(crate) async fn close_view(&self, params: DocumentId) -> Result<(), FlowyError> { pub(crate) async fn close_view(&self, params: DocumentId) -> Result<(), FlowyError> {
let _ = self.document_ctx.controller.close_document(&params.doc_id)?; let _ = self.document_ctx.controller.close_document(&params.doc_id)?;
Ok(()) Ok(())

View File

@ -155,12 +155,15 @@ impl ClientDocumentEditor {
Ok(()) Ok(())
} }
#[tracing::instrument(level = "debug", skip(self))]
pub fn stop(&self) { self.ws_manager.stop(); } pub fn stop(&self) { self.ws_manager.stop(); }
pub(crate) fn ws_handler(&self) -> Arc<dyn DocumentWSReceiver> { self.ws_manager.clone() } pub(crate) fn ws_handler(&self) -> Arc<dyn DocumentWSReceiver> { self.ws_manager.clone() }
} }
impl std::ops::Drop for ClientDocumentEditor {
fn drop(&mut self) { tracing::trace!("{} ClientDocumentEditor was dropped", self.doc_id) }
}
// The edit queue will exit after the EditorCommandSender was dropped. // The edit queue will exit after the EditorCommandSender was dropped.
fn spawn_edit_queue( fn spawn_edit_queue(
user: Arc<dyn DocumentUser>, user: Arc<dyn DocumentUser>,

View File

@ -4,7 +4,7 @@ use crate::{
}; };
use async_stream::stream; use async_stream::stream;
use flowy_collaboration::{ use flowy_collaboration::{
document::{history::UndoResult, Document, NewlineDoc}, client_document::{history::UndoResult, ClientDocument, NewlineDoc},
entities::revision::{RepeatedRevision, RevId, Revision}, entities::revision::{RepeatedRevision, RevId, Revision},
errors::CollaborateError, errors::CollaborateError,
util::make_delta_from_revisions, util::make_delta_from_revisions,
@ -21,7 +21,7 @@ use tokio::sync::{oneshot, RwLock};
// The EditorCommandQueue executes each command that will alter the document in // The EditorCommandQueue executes each command that will alter the document in
// serial. // serial.
pub(crate) struct EditorCommandQueue { pub(crate) struct EditorCommandQueue {
document: Arc<RwLock<Document>>, document: Arc<RwLock<ClientDocument>>,
user: Arc<dyn DocumentUser>, user: Arc<dyn DocumentUser>,
rev_manager: Arc<DocumentRevisionManager>, rev_manager: Arc<DocumentRevisionManager>,
receiver: Option<EditorCommandReceiver>, receiver: Option<EditorCommandReceiver>,
@ -34,7 +34,7 @@ impl EditorCommandQueue {
delta: RichTextDelta, delta: RichTextDelta,
receiver: EditorCommandReceiver, receiver: EditorCommandReceiver,
) -> Self { ) -> Self {
let document = Arc::new(RwLock::new(Document::from_delta(delta))); let document = Arc::new(RwLock::new(ClientDocument::from_delta(delta)));
Self { Self {
document, document,
user, user,

View File

@ -14,7 +14,7 @@ use flowy_collaboration::{
use flowy_error::FlowyResult; use flowy_error::FlowyResult;
use futures_util::{future, stream, stream::StreamExt}; use futures_util::{future, stream, stream::StreamExt};
use lib_infra::future::FutureResult; use lib_infra::future::FutureResult;
use lib_ot::{core::Operation, errors::OTError, rich_text::RichTextDelta}; use lib_ot::{core::Operation, rich_text::RichTextDelta};
use std::{collections::VecDeque, sync::Arc}; use std::{collections::VecDeque, sync::Arc};
use tokio::sync::RwLock; use tokio::sync::RwLock;
@ -26,20 +26,20 @@ pub struct DocumentRevisionManager {
pub(crate) doc_id: String, pub(crate) doc_id: String,
user_id: String, user_id: String,
rev_id_counter: RevIdCounter, rev_id_counter: RevIdCounter,
cache: Arc<DocumentRevisionCache>, revision_cache: Arc<DocumentRevisionCache>,
sync_seq: Arc<RevisionSyncSequence>, revision_sync_seq: Arc<RevisionSyncSequence>,
} }
impl DocumentRevisionManager { impl DocumentRevisionManager {
pub fn new(user_id: &str, doc_id: &str, cache: Arc<DocumentRevisionCache>) -> Self { pub fn new(user_id: &str, doc_id: &str, revision_cache: Arc<DocumentRevisionCache>) -> Self {
let rev_id_counter = RevIdCounter::new(0); let rev_id_counter = RevIdCounter::new(0);
let sync_seq = Arc::new(RevisionSyncSequence::new()); let revision_sync_seq = Arc::new(RevisionSyncSequence::new());
Self { Self {
doc_id: doc_id.to_string(), doc_id: doc_id.to_string(),
user_id: user_id.to_owned(), user_id: user_id.to_owned(),
rev_id_counter, rev_id_counter,
cache, revision_cache,
sync_seq, revision_sync_seq,
} }
} }
@ -48,7 +48,8 @@ impl DocumentRevisionManager {
doc_id: self.doc_id.clone(), doc_id: self.doc_id.clone(),
user_id: self.user_id.clone(), user_id: self.user_id.clone(),
server, server,
cache: self.cache.clone(), revision_cache: self.revision_cache.clone(),
revision_sync_seq: self.revision_sync_seq.clone(),
} }
.load() .load()
.await?; .await?;
@ -61,7 +62,7 @@ impl DocumentRevisionManager {
pub async fn reset_document(&self, revisions: RepeatedRevision) -> FlowyResult<()> { pub async fn reset_document(&self, revisions: RepeatedRevision) -> FlowyResult<()> {
let rev_id = pair_rev_id_from_revisions(&revisions).1; let rev_id = pair_rev_id_from_revisions(&revisions).1;
let _ = self let _ = self
.cache .revision_cache
.reset_with_revisions(&self.doc_id, revisions.into_inner()) .reset_with_revisions(&self.doc_id, revisions.into_inner())
.await?; .await?;
self.rev_id_counter.set(rev_id); self.rev_id_counter.set(rev_id);
@ -73,7 +74,10 @@ impl DocumentRevisionManager {
if revision.delta_data.is_empty() { if revision.delta_data.is_empty() {
return Err(FlowyError::internal().context("Delta data should be empty")); return Err(FlowyError::internal().context("Delta data should be empty"));
} }
let _ = self.cache.add(revision.clone(), RevisionState::Ack, true).await?; let _ = self
.revision_cache
.add(revision.clone(), RevisionState::Ack, true)
.await?;
self.rev_id_counter.set(revision.rev_id); self.rev_id_counter.set(revision.rev_id);
Ok(()) Ok(())
} }
@ -84,15 +88,18 @@ impl DocumentRevisionManager {
return Err(FlowyError::internal().context("Delta data should be empty")); return Err(FlowyError::internal().context("Delta data should be empty"));
} }
let record = self.cache.add(revision.clone(), RevisionState::Local, true).await?; let record = self
self.sync_seq.add_revision(record).await?; .revision_cache
.add(revision.clone(), RevisionState::Local, true)
.await?;
self.revision_sync_seq.add_revision_record(record).await?;
Ok(()) Ok(())
} }
#[tracing::instrument(level = "debug", skip(self), err)] #[tracing::instrument(level = "debug", skip(self), err)]
pub async fn ack_revision(&self, rev_id: i64) -> Result<(), FlowyError> { pub async fn ack_revision(&self, rev_id: i64) -> Result<(), FlowyError> {
if self.sync_seq.ack(&rev_id).await.is_ok() { if self.revision_sync_seq.ack(&rev_id).await.is_ok() {
self.cache.ack(rev_id).await; self.revision_cache.ack(rev_id).await;
} }
Ok(()) Ok(())
} }
@ -109,28 +116,28 @@ impl DocumentRevisionManager {
pub async fn get_revisions_in_range(&self, range: RevisionRange) -> Result<Vec<Revision>, FlowyError> { pub async fn get_revisions_in_range(&self, range: RevisionRange) -> Result<Vec<Revision>, FlowyError> {
debug_assert!(range.doc_id == self.doc_id); debug_assert!(range.doc_id == self.doc_id);
let revisions = self.cache.revisions_in_range(range.clone()).await?; let revisions = self.revision_cache.revisions_in_range(range.clone()).await?;
Ok(revisions) Ok(revisions)
} }
pub fn next_sync_revision(&self) -> FutureResult<Option<Revision>, FlowyError> { pub fn next_sync_revision(&self) -> FutureResult<Option<Revision>, FlowyError> {
let sync_seq = self.sync_seq.clone(); let revision_sync_seq = self.revision_sync_seq.clone();
let cache = self.cache.clone(); let revision_cache = self.revision_cache.clone();
FutureResult::new(async move { FutureResult::new(async move {
match sync_seq.next_sync_revision().await { match revision_sync_seq.next_sync_revision_record().await {
None => match sync_seq.next_sync_rev_id().await { None => match revision_sync_seq.next_sync_rev_id().await {
None => Ok(None), None => Ok(None),
Some(rev_id) => Ok(cache.get(rev_id).await.map(|record| record.revision)), Some(rev_id) => Ok(revision_cache.get(rev_id).await.map(|record| record.revision)),
}, },
Some((_, record)) => Ok(Some(record.revision)), Some((_, record)) => Ok(Some(record.revision)),
} }
}) })
} }
pub async fn latest_revision(&self) -> Revision { self.cache.latest_revision().await } pub async fn latest_revision(&self) -> Revision { self.revision_cache.latest_revision().await }
pub async fn get_revision(&self, rev_id: i64) -> Option<Revision> { pub async fn get_revision(&self, rev_id: i64) -> Option<Revision> {
self.cache.get(rev_id).await.map(|record| record.revision) self.revision_cache.get(rev_id).await.map(|record| record.revision)
} }
} }
@ -152,12 +159,17 @@ impl std::default::Default for RevisionSyncSequence {
impl RevisionSyncSequence { impl RevisionSyncSequence {
fn new() -> Self { RevisionSyncSequence::default() } fn new() -> Self { RevisionSyncSequence::default() }
async fn add_revision(&self, record: RevisionRecord) -> Result<(), OTError> { async fn add_revision_record(&self, record: RevisionRecord) -> FlowyResult<()> {
if !record.state.is_local() {
return Ok(());
}
// The last revision's rev_id must be greater than the new one. // The last revision's rev_id must be greater than the new one.
if let Some(rev_id) = self.local_revs.read().await.back() { if let Some(rev_id) = self.local_revs.read().await.back() {
if *rev_id >= record.revision.rev_id { if *rev_id >= record.revision.rev_id {
return Err(OTError::revision_id_conflict() return Err(
.context(format!("The new revision's id must be greater than {}", rev_id))); FlowyError::internal().context(format!("The new revision's id must be greater than {}", rev_id))
);
} }
} }
self.local_revs.write().await.push_back(record.revision.rev_id); self.local_revs.write().await.push_back(record.revision.rev_id);
@ -181,7 +193,7 @@ impl RevisionSyncSequence {
Ok(()) Ok(())
} }
async fn next_sync_revision(&self) -> Option<(i64, RevisionRecord)> { async fn next_sync_revision_record(&self) -> Option<(i64, RevisionRecord)> {
match self.local_revs.read().await.front() { match self.local_revs.read().await.front() {
None => None, None => None,
Some(rev_id) => self.revs_map.get(rev_id).map(|r| (*r.key(), r.value().clone())), Some(rev_id) => self.revs_map.get(rev_id).map(|r| (*r.key(), r.value().clone())),
@ -195,12 +207,13 @@ struct RevisionLoader {
doc_id: String, doc_id: String,
user_id: String, user_id: String,
server: Arc<dyn RevisionServer>, server: Arc<dyn RevisionServer>,
cache: Arc<DocumentRevisionCache>, revision_cache: Arc<DocumentRevisionCache>,
revision_sync_seq: Arc<RevisionSyncSequence>,
} }
impl RevisionLoader { impl RevisionLoader {
async fn load(&self) -> Result<Vec<Revision>, FlowyError> { async fn load(&self) -> Result<Vec<Revision>, FlowyError> {
let records = self.cache.batch_get(&self.doc_id)?; let records = self.revision_cache.batch_get(&self.doc_id)?;
let revisions: Vec<Revision>; let revisions: Vec<Revision>;
if records.is_empty() { if records.is_empty() {
let doc = self.server.fetch_document(&self.doc_id).await?; let doc = self.server.fetch_document(&self.doc_id).await?;
@ -214,16 +227,24 @@ impl RevisionLoader {
&self.user_id, &self.user_id,
doc_md5, doc_md5,
); );
let _ = self.cache.add(revision.clone(), RevisionState::Ack, true).await?; let _ = self
.revision_cache
.add(revision.clone(), RevisionState::Ack, true)
.await?;
revisions = vec![revision]; revisions = vec![revision];
} else { } else {
// Sync the records if their state is RevisionState::Local.
stream::iter(records.clone()) stream::iter(records.clone())
.filter(|record| future::ready(record.state == RevisionState::Local)) .filter(|record| future::ready(record.state == RevisionState::Local))
.for_each(|record| async move { .for_each(|record| async move {
match self.cache.add(record.revision, record.state, false).await { let f = || async {
// Sync the records if their state is RevisionState::Local.
let _ = self.revision_sync_seq.add_revision_record(record.clone()).await?;
let _ = self.revision_cache.add(record.revision, record.state, false).await?;
Ok::<(), FlowyError>(())
};
match f().await {
Ok(_) => {}, Ok(_) => {},
Err(e) => tracing::error!("{}", e), Err(e) => tracing::error!("[RevisionLoader]: {}", e),
} }
}) })
.await; .await;
@ -274,5 +295,5 @@ impl RevisionSyncSequence {
#[cfg(feature = "flowy_unit_test")] #[cfg(feature = "flowy_unit_test")]
impl DocumentRevisionManager { impl DocumentRevisionManager {
pub fn revision_cache(&self) -> Arc<DocumentRevisionCache> { self.cache.clone() } pub fn revision_cache(&self) -> Arc<DocumentRevisionCache> { self.revision_cache.clone() }
} }

View File

@ -96,7 +96,7 @@ impl DocumentWebSocketManager {
pub(crate) fn stop(&self) { pub(crate) fn stop(&self) {
if self.stop_sync_tx.send(()).is_ok() { if self.stop_sync_tx.send(()).is_ok() {
tracing::debug!("{} stop sync", self.doc_id) tracing::trace!("{} stop sync", self.doc_id)
} }
} }
} }
@ -134,6 +134,10 @@ pub struct DocumentWSStream {
stop_rx: Option<SinkStopRx>, stop_rx: Option<SinkStopRx>,
} }
impl std::ops::Drop for DocumentWSStream {
fn drop(&mut self) { tracing::trace!("{} DocumentWSStream was dropped", self.doc_id) }
}
impl DocumentWSStream { impl DocumentWSStream {
pub fn new( pub fn new(
doc_id: &str, doc_id: &str,
@ -282,6 +286,10 @@ impl DocumentWSSink {
} }
} }
impl std::ops::Drop for DocumentWSSink {
fn drop(&mut self) { tracing::trace!("{} DocumentWSSink was dropped", self.doc_id) }
}
async fn tick(sender: mpsc::Sender<()>) { async fn tick(sender: mpsc::Sender<()>) {
let mut interval = interval(Duration::from_millis(SYNC_INTERVAL_IN_MILLIS)); let mut interval = interval(Duration::from_millis(SYNC_INTERVAL_IN_MILLIS));
while sender.send(()).await.is_ok() { while sender.send(()).await.is_ok() {

View File

@ -1,6 +1,6 @@
#![cfg_attr(rustfmt, rustfmt::skip)] #![cfg_attr(rustfmt, rustfmt::skip)]
use crate::editor::{TestBuilder, TestOp::*}; use crate::editor::{TestBuilder, TestOp::*};
use flowy_collaboration::document::{NewlineDoc, PlainDoc}; use flowy_collaboration::client_document::{NewlineDoc, PlainDoc};
use lib_ot::core::{Interval, OperationTransformable, NEW_LINE, WHITESPACE, FlowyStr}; use lib_ot::core::{Interval, OperationTransformable, NEW_LINE, WHITESPACE, FlowyStr};
use unicode_segmentation::UnicodeSegmentation; use unicode_segmentation::UnicodeSegmentation;
use lib_ot::rich_text::RichTextDelta; use lib_ot::rich_text::RichTextDelta;

View File

@ -5,7 +5,7 @@ mod serde_test;
mod undo_redo_test; mod undo_redo_test;
use derive_more::Display; use derive_more::Display;
use flowy_collaboration::document::{Document, InitialDocumentText}; use flowy_collaboration::client_document::{ClientDocument, InitialDocumentText};
use lib_ot::{ use lib_ot::{
core::*, core::*,
rich_text::{RichTextAttribute, RichTextAttributes, RichTextDelta}, rich_text::{RichTextAttribute, RichTextAttributes, RichTextDelta},
@ -82,7 +82,7 @@ pub enum TestOp {
} }
pub struct TestBuilder { pub struct TestBuilder {
documents: Vec<Document>, documents: Vec<ClientDocument>,
deltas: Vec<Option<RichTextDelta>>, deltas: Vec<Option<RichTextDelta>>,
primes: Vec<Option<RichTextDelta>>, primes: Vec<Option<RichTextDelta>>,
} }
@ -266,7 +266,7 @@ impl TestBuilder {
} }
pub fn run_scripts<C: InitialDocumentText>(mut self, scripts: Vec<TestOp>) { pub fn run_scripts<C: InitialDocumentText>(mut self, scripts: Vec<TestOp>) {
self.documents = vec![Document::new::<C>(), Document::new::<C>()]; self.documents = vec![ClientDocument::new::<C>(), ClientDocument::new::<C>()];
self.primes = vec![None, None]; self.primes = vec![None, None];
self.deltas = vec![None, None]; self.deltas = vec![None, None];
for (_i, op) in scripts.iter().enumerate() { for (_i, op) in scripts.iter().enumerate() {

View File

@ -1,6 +1,6 @@
#![allow(clippy::all)] #![allow(clippy::all)]
use crate::editor::{Rng, TestBuilder, TestOp::*}; use crate::editor::{Rng, TestBuilder, TestOp::*};
use flowy_collaboration::document::{NewlineDoc, PlainDoc}; use flowy_collaboration::client_document::{NewlineDoc, PlainDoc};
use lib_ot::{ use lib_ot::{
core::*, core::*,
rich_text::{AttributeBuilder, RichTextAttribute, RichTextAttributes, RichTextDelta}, rich_text::{AttributeBuilder, RichTextAttribute, RichTextAttributes, RichTextDelta},

View File

@ -1,4 +1,4 @@
use flowy_collaboration::document::{Document, PlainDoc}; use flowy_collaboration::client_document::{ClientDocument, PlainDoc};
use lib_ot::{ use lib_ot::{
core::*, core::*,
rich_text::{AttributeBuilder, RichTextAttribute, RichTextAttributeValue, RichTextDelta}, rich_text::{AttributeBuilder, RichTextAttribute, RichTextAttributeValue, RichTextDelta},
@ -104,10 +104,13 @@ fn delta_serde_null_test() {
#[test] #[test]
fn document_insert_serde_test() { fn document_insert_serde_test() {
let mut document = Document::new::<PlainDoc>(); let mut document = ClientDocument::new::<PlainDoc>();
document.insert(0, "\n").unwrap(); document.insert(0, "\n").unwrap();
document.insert(0, "123").unwrap(); document.insert(0, "123").unwrap();
let json = document.to_json(); let json = document.to_json();
assert_eq!(r#"[{"insert":"123\n"}]"#, json); assert_eq!(r#"[{"insert":"123\n"}]"#, json);
assert_eq!(r#"[{"insert":"123\n"}]"#, Document::from_json(&json).unwrap().to_json()); assert_eq!(
r#"[{"insert":"123\n"}]"#,
ClientDocument::from_json(&json).unwrap().to_json()
);
} }

View File

@ -1,5 +1,5 @@
use crate::editor::{TestBuilder, TestOp::*}; use crate::editor::{TestBuilder, TestOp::*};
use flowy_collaboration::document::{NewlineDoc, PlainDoc, RECORD_THRESHOLD}; use flowy_collaboration::client_document::{NewlineDoc, PlainDoc, RECORD_THRESHOLD};
use lib_ot::core::{Interval, NEW_LINE, WHITESPACE}; use lib_ot::core::{Interval, NEW_LINE, WHITESPACE};
#[test] #[test]

View File

@ -4,7 +4,7 @@ use backend_service::{
response::FlowyResponse, response::FlowyResponse,
}; };
use flowy_collaboration::{ use flowy_collaboration::{
document::default::initial_delta_string, client_document::default::initial_delta_string,
entities::doc::{CreateDocParams, DocumentId, DocumentInfo, ResetDocumentParams}, entities::doc::{CreateDocParams, DocumentId, DocumentInfo, ResetDocumentParams},
}; };
use flowy_error::FlowyError; use flowy_error::FlowyError;

View File

@ -4,14 +4,14 @@ use flowy_collaboration::{
entities::ws::{DocumentClientWSData, DocumentClientWSDataType}, entities::ws::{DocumentClientWSData, DocumentClientWSDataType},
errors::CollaborateError, errors::CollaborateError,
protobuf::DocumentClientWSData as DocumentClientWSDataPB, protobuf::DocumentClientWSData as DocumentClientWSDataPB,
sync::*, server_document::*,
}; };
use lib_ws::{WSModule, WebSocketRawMessage}; use lib_ws::{WSModule, WebSocketRawMessage};
use std::{convert::TryInto, fmt::Debug, sync::Arc}; use std::{convert::TryInto, fmt::Debug, sync::Arc};
use tokio::sync::{mpsc, mpsc::UnboundedSender}; use tokio::sync::{mpsc, mpsc::UnboundedSender};
pub struct LocalDocumentServer { pub struct LocalDocumentServer {
pub doc_manager: Arc<ServerDocumentManager>, doc_manager: Arc<ServerDocumentManager>,
sender: mpsc::UnboundedSender<WebSocketRawMessage>, sender: mpsc::UnboundedSender<WebSocketRawMessage>,
} }

View File

@ -50,22 +50,21 @@ impl std::default::Default for LocalWebSocket {
} }
impl LocalWebSocket { impl LocalWebSocket {
fn restart_ws_receiver(&self) -> mpsc::Receiver<()> { fn cancel_pre_spawn_client(&self) {
if let Some(stop_tx) = self.local_server_stop_tx.read().clone() { if let Some(stop_tx) = self.local_server_stop_tx.read().clone() {
tokio::spawn(async move { tokio::spawn(async move {
let _ = stop_tx.send(()).await; let _ = stop_tx.send(()).await;
}); });
} }
let (stop_tx, stop_rx) = mpsc::channel::<()>(1);
*self.local_server_stop_tx.write() = Some(stop_tx);
stop_rx
} }
fn spawn_client_ws_receiver(&self, _addr: String) { fn spawn_client_ws_receiver(&self, _addr: String) {
let mut ws_receiver = self.ws_sender.subscribe(); let mut ws_receiver = self.ws_sender.subscribe();
let local_server = self.local_server.clone(); let local_server = self.local_server.clone();
let user_id = self.user_id.clone(); let user_id = self.user_id.clone();
let mut stop_rx = self.restart_ws_receiver(); let _ = self.cancel_pre_spawn_client();
let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1);
*self.local_server_stop_tx.write() = Some(stop_tx);
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
tokio::select! { tokio::select! {

View File

@ -4,7 +4,7 @@ use flowy_collaboration::{
entities::doc::DocumentInfo, entities::doc::DocumentInfo,
errors::CollaborateError, errors::CollaborateError,
protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB}, protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB},
sync::*, server_document::*,
util::{make_doc_from_revisions, repeated_revision_from_repeated_revision_pb}, util::{make_doc_from_revisions, repeated_revision_from_repeated_revision_pb},
}; };
use lib_infra::future::BoxResultFuture; use lib_infra::future::BoxResultFuture;

View File

@ -14,7 +14,7 @@ pub fn initial_read_me() -> RichTextDelta {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::document::default::initial_read_me; use crate::client_document::default::initial_read_me;
#[test] #[test]
fn load_read_me() { fn load_read_me() {

View File

@ -1,5 +1,5 @@
use crate::{ use crate::{
document::{ client_document::{
default::initial_delta, default::initial_delta,
history::{History, UndoResult}, history::{History, UndoResult},
view::{View, RECORD_THRESHOLD}, view::{View, RECORD_THRESHOLD},
@ -26,7 +26,7 @@ impl InitialDocumentText for NewlineDoc {
fn initial_delta() -> RichTextDelta { initial_delta() } fn initial_delta() -> RichTextDelta { initial_delta() }
} }
pub struct Document { pub struct ClientDocument {
delta: RichTextDelta, delta: RichTextDelta,
history: History, history: History,
view: View, view: View,
@ -34,11 +34,11 @@ pub struct Document {
notify: Option<mpsc::UnboundedSender<()>>, notify: Option<mpsc::UnboundedSender<()>>,
} }
impl Document { impl ClientDocument {
pub fn new<C: InitialDocumentText>() -> Self { Self::from_delta(C::initial_delta()) } pub fn new<C: InitialDocumentText>() -> Self { Self::from_delta(C::initial_delta()) }
pub fn from_delta(delta: RichTextDelta) -> Self { pub fn from_delta(delta: RichTextDelta) -> Self {
Document { ClientDocument {
delta, delta,
history: History::new(), history: History::new(),
view: View::new(), view: View::new(),
@ -185,7 +185,7 @@ impl Document {
pub fn is_empty<C: InitialDocumentText>(&self) -> bool { self.delta == C::initial_delta() } pub fn is_empty<C: InitialDocumentText>(&self) -> bool { self.delta == C::initial_delta() }
} }
impl Document { impl ClientDocument {
fn invert(&self, delta: &RichTextDelta) -> Result<(RichTextDelta, RichTextDelta), CollaborateError> { fn invert(&self, delta: &RichTextDelta) -> Result<(RichTextDelta, RichTextDelta), CollaborateError> {
// c = a.compose(b) // c = a.compose(b)
// d = b.invert(a) // d = b.invert(a)

View File

@ -1,4 +1,4 @@
use crate::document::DeleteExt; use crate::client_document::DeleteExt;
use lib_ot::{ use lib_ot::{
core::{DeltaBuilder, Interval}, core::{DeltaBuilder, Interval},
rich_text::RichTextDelta, rich_text::RichTextDelta,

View File

@ -1,4 +1,4 @@
use crate::{document::DeleteExt, util::is_newline}; use crate::{client_document::DeleteExt, util::is_newline};
use lib_ot::{ use lib_ot::{
core::{Attributes, DeltaBuilder, DeltaIter, Interval, Utf16CodeUnitMetric, NEW_LINE}, core::{Attributes, DeltaBuilder, DeltaIter, Interval, Utf16CodeUnitMetric, NEW_LINE},
rich_text::{plain_attributes, RichTextDelta}, rich_text::{plain_attributes, RichTextDelta},

View File

@ -4,7 +4,7 @@ use lib_ot::{
}; };
use crate::{ use crate::{
document::{extensions::helper::line_break, FormatExt}, client_document::{extensions::helper::line_break, FormatExt},
util::find_newline, util::find_newline,
}; };

View File

@ -4,7 +4,7 @@ use lib_ot::{
}; };
use crate::{ use crate::{
document::{extensions::helper::line_break, FormatExt}, client_document::{extensions::helper::line_break, FormatExt},
util::find_newline, util::find_newline,
}; };

View File

@ -1,4 +1,4 @@
use crate::{document::InsertExt, util::is_newline}; use crate::{client_document::InsertExt, util::is_newline};
use lib_ot::{ use lib_ot::{
core::{is_empty_line_at_index, DeltaBuilder, DeltaIter}, core::{is_empty_line_at_index, DeltaBuilder, DeltaIter},
rich_text::{attributes_except_header, RichTextAttributeKey, RichTextDelta}, rich_text::{attributes_except_header, RichTextAttributeKey, RichTextDelta},

View File

@ -1,4 +1,4 @@
use crate::{document::InsertExt, util::is_whitespace}; use crate::{client_document::InsertExt, util::is_whitespace};
use lib_ot::{ use lib_ot::{
core::{count_utf16_code_units, DeltaBuilder, DeltaIter}, core::{count_utf16_code_units, DeltaBuilder, DeltaIter},
rich_text::{plain_attributes, RichTextAttribute, RichTextAttributes, RichTextDelta}, rich_text::{plain_attributes, RichTextAttribute, RichTextAttributes, RichTextDelta},

View File

@ -1,4 +1,4 @@
use crate::document::InsertExt; use crate::client_document::InsertExt;
use lib_ot::{ use lib_ot::{
core::{Attributes, DeltaBuilder, DeltaIter, NEW_LINE}, core::{Attributes, DeltaBuilder, DeltaIter, NEW_LINE},
rich_text::{RichTextAttributeKey, RichTextAttributes, RichTextDelta}, rich_text::{RichTextAttributeKey, RichTextAttributes, RichTextDelta},

View File

@ -1,4 +1,4 @@
use crate::document::InsertExt; use crate::client_document::InsertExt;
pub use auto_exit_block::*; pub use auto_exit_block::*;
pub use auto_format::*; pub use auto_format::*;
pub use default_insert::*; pub use default_insert::*;

View File

@ -1,4 +1,4 @@
use crate::{document::InsertExt, util::is_newline}; use crate::{client_document::InsertExt, util::is_newline};
use lib_ot::{ use lib_ot::{
core::{DeltaBuilder, DeltaIter, NEW_LINE}, core::{DeltaBuilder, DeltaIter, NEW_LINE},
rich_text::{ rich_text::{

View File

@ -1,5 +1,5 @@
use crate::{ use crate::{
document::InsertExt, client_document::InsertExt,
util::{contain_newline, is_newline}, util::{contain_newline, is_newline},
}; };
use lib_ot::{ use lib_ot::{

View File

@ -1,4 +1,4 @@
use crate::{document::InsertExt, util::is_newline}; use crate::{client_document::InsertExt, util::is_newline};
use lib_ot::{ use lib_ot::{
core::{DeltaBuilder, DeltaIter, Utf16CodeUnitMetric, NEW_LINE}, core::{DeltaBuilder, DeltaIter, Utf16CodeUnitMetric, NEW_LINE},
rich_text::{RichTextAttributeKey, RichTextAttributes, RichTextDelta}, rich_text::{RichTextAttributeKey, RichTextAttributes, RichTextDelta},

View File

@ -1,12 +1,12 @@
#![allow(clippy::module_inception)] #![allow(clippy::module_inception)]
pub use document::*; pub use document_pad::*;
pub(crate) use extensions::*; pub(crate) use extensions::*;
pub use view::*; pub use view::*;
mod data; mod data;
pub mod default; pub mod default;
mod document; mod document_pad;
mod extensions; mod extensions;
pub mod history; pub mod history;
mod view; mod view;

View File

@ -1,4 +1,4 @@
use crate::document::*; use crate::client_document::*;
use lib_ot::{ use lib_ot::{
core::{trim, Interval}, core::{trim, Interval},
errors::{ErrorBuilder, OTError, OTErrorCode}, errors::{ErrorBuilder, OTError, OTErrorCode},

View File

@ -181,6 +181,15 @@ pub enum RevisionState {
Ack = 1, Ack = 1,
} }
impl RevisionState {
pub fn is_local(&self) -> bool {
match self {
RevisionState::Local => true,
RevisionState::Ack => false,
}
}
}
impl AsRef<RevisionState> for RevisionState { impl AsRef<RevisionState> for RevisionState {
fn as_ref(&self) -> &RevisionState { &self } fn as_ref(&self) -> &RevisionState { &self }
} }

View File

@ -1,8 +1,7 @@
pub mod document; pub mod client_document;
pub mod entities; pub mod entities;
pub mod errors; pub mod errors;
pub mod protobuf; pub mod protobuf;
pub mod sync; pub mod server_document;
pub mod util; pub mod util;
pub use lib_ot::rich_text::RichTextDelta; pub use lib_ot::rich_text::RichTextDelta;

View File

@ -1,9 +1,8 @@
use crate::{ use crate::{
document::Document,
entities::{doc::DocumentInfo, ws::DocumentServerWSDataBuilder}, entities::{doc::DocumentInfo, ws::DocumentServerWSDataBuilder},
errors::{internal_error, CollaborateError, CollaborateResult}, errors::{internal_error, CollaborateError, CollaborateResult},
protobuf::{DocumentClientWSData, RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB}, protobuf::{DocumentClientWSData, RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB},
sync::{RevisionSynchronizer, RevisionUser, SyncResponse}, server_document::{document_pad::ServerDocument, RevisionSynchronizer, RevisionUser, SyncResponse},
}; };
use async_stream::stream; use async_stream::stream;
use dashmap::DashMap; use dashmap::DashMap;
@ -186,7 +185,7 @@ impl OpenDocHandle {
let synchronizer = Arc::new(RevisionSynchronizer::new( let synchronizer = Arc::new(RevisionSynchronizer::new(
&doc.doc_id, &doc.doc_id,
doc.rev_id, doc.rev_id,
Document::from_delta(delta), ServerDocument::from_delta(delta),
persistence, persistence,
)); ));

View File

@ -0,0 +1,39 @@
use crate::{client_document::InitialDocumentText, errors::CollaborateError};
use lib_ot::{core::*, rich_text::RichTextDelta};
pub struct ServerDocument {
delta: RichTextDelta,
}
impl ServerDocument {
pub fn new<C: InitialDocumentText>() -> Self { Self::from_delta(C::initial_delta()) }
pub fn from_delta(delta: RichTextDelta) -> Self { ServerDocument { delta } }
pub fn from_json(json: &str) -> Result<Self, CollaborateError> {
let delta = RichTextDelta::from_json(json)?;
Ok(Self::from_delta(delta))
}
pub fn to_json(&self) -> String { self.delta.to_json() }
pub fn to_bytes(&self) -> Vec<u8> { self.delta.clone().to_bytes().to_vec() }
pub fn to_plain_string(&self) -> String { self.delta.apply("").unwrap() }
pub fn delta(&self) -> &RichTextDelta { &self.delta }
pub fn md5(&self) -> String {
let bytes = self.to_bytes();
format!("{:x}", md5::compute(bytes))
}
pub fn compose_delta(&mut self, delta: RichTextDelta) -> Result<(), CollaborateError> {
// tracing::trace!("{} compose {}", &self.delta.to_json(), delta.to_json());
let composed_delta = self.delta.compose(&delta)?;
self.delta = composed_delta;
Ok(())
}
pub fn is_empty<C: InitialDocumentText>(&self) -> bool { self.delta == C::initial_delta() }
}

View File

@ -0,0 +1,6 @@
mod document_manager;
mod document_pad;
mod revision_sync;
pub use document_manager::*;
pub use revision_sync::*;

View File

@ -1,12 +1,11 @@
use crate::{ use crate::{
document::Document,
entities::{ entities::{
revision::RevisionRange, revision::RevisionRange,
ws::{DocumentServerWSData, DocumentServerWSDataBuilder}, ws::{DocumentServerWSData, DocumentServerWSDataBuilder},
}, },
errors::CollaborateError, errors::CollaborateError,
protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB}, protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB},
sync::DocumentCloudPersistence, server_document::{document_pad::ServerDocument, DocumentCloudPersistence},
util::*, util::*,
}; };
use lib_ot::{core::OperationTransformable, rich_text::RichTextDelta}; use lib_ot::{core::OperationTransformable, rich_text::RichTextDelta};
@ -35,7 +34,7 @@ pub enum SyncResponse {
pub struct RevisionSynchronizer { pub struct RevisionSynchronizer {
pub doc_id: String, pub doc_id: String,
pub rev_id: AtomicI64, pub rev_id: AtomicI64,
document: Arc<RwLock<Document>>, document: Arc<RwLock<ServerDocument>>,
persistence: Arc<dyn DocumentCloudPersistence>, persistence: Arc<dyn DocumentCloudPersistence>,
} }
@ -43,7 +42,7 @@ impl RevisionSynchronizer {
pub fn new( pub fn new(
doc_id: &str, doc_id: &str,
rev_id: i64, rev_id: i64,
document: Document, document: ServerDocument,
persistence: Arc<dyn DocumentCloudPersistence>, persistence: Arc<dyn DocumentCloudPersistence>,
) -> RevisionSynchronizer { ) -> RevisionSynchronizer {
let document = Arc::new(RwLock::new(document)); let document = Arc::new(RwLock::new(document));
@ -100,7 +99,7 @@ impl RevisionSynchronizer {
}, },
Ordering::Equal => { Ordering::Equal => {
// Do nothing // Do nothing
log::warn!("Applied revision rev_id is the same as cur_rev_id"); tracing::warn!("Applied revision rev_id is the same as cur_rev_id");
}, },
Ordering::Greater => { Ordering::Greater => {
// The client document is outdated. Transform the client revision delta and then // The client document is outdated. Transform the client revision delta and then
@ -147,7 +146,7 @@ impl RevisionSynchronizer {
let delta = make_delta_from_revision_pb(revisions)?; let delta = make_delta_from_revision_pb(revisions)?;
let _ = self.persistence.reset_document(&doc_id, repeated_revision).await?; let _ = self.persistence.reset_document(&doc_id, repeated_revision).await?;
*self.document.write() = Document::from_delta(delta); *self.document.write() = ServerDocument::from_delta(delta);
let _ = self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(rev_id)); let _ = self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(rev_id));
Ok(()) Ok(())
} }

View File

@ -1,5 +0,0 @@
mod server;
mod synchronizer;
pub use server::*;
pub use synchronizer::*;

View File

@ -7,7 +7,7 @@ use crate::{
view::{ViewName, ViewThumbnail}, view::{ViewName, ViewThumbnail},
}, },
}; };
use flowy_collaboration::document::default::initial_delta_string; use flowy_collaboration::client_document::default::initial_delta_string;
use flowy_derive::{ProtoBuf, ProtoBuf_Enum}; use flowy_derive::{ProtoBuf, ProtoBuf_Enum};
use std::convert::TryInto; use std::convert::TryInto;

View File

@ -205,12 +205,12 @@ where
T: Attributes, T: Attributes,
{ {
pub fn merge_or_new(&mut self, n: usize, attributes: T) -> Option<Operation<T>> { pub fn merge_or_new(&mut self, n: usize, attributes: T) -> Option<Operation<T>> {
tracing::trace!( // tracing::trace!(
"merge_retain_or_new_op: len: {:?}, l: {} - r: {}", // "merge_retain_or_new_op: len: {:?}, l: {} - r: {}",
n, // n,
self.attributes, // self.attributes,
attributes // attributes
); // );
if self.attributes == attributes { if self.attributes == attributes {
self.n += n; self.n += n;
None None