config folder collaboration

This commit is contained in:
appflowy 2022-01-15 23:58:36 +08:00
parent 02201c238c
commit 6bca483c28
31 changed files with 346 additions and 385 deletions

12
backend/Cargo.lock generated
View File

@ -1111,6 +1111,12 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0"
[[package]]
name = "dissimilar"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31ad93652f40969dead8d4bf897a41e9462095152eb21c56e5830537e41179dd"
[[package]]
name = "dotenv"
version = "0.15.0"
@ -1227,6 +1233,8 @@ dependencies = [
"bytes",
"chrono",
"dashmap",
"dissimilar",
"flowy-core-data-model",
"flowy-derive",
"futures",
"lib-infra",
@ -1236,6 +1244,7 @@ dependencies = [
"parking_lot",
"protobuf",
"serde",
"serde_json",
"strum",
"strum_macros",
"tokio",
@ -1289,10 +1298,11 @@ dependencies = [
"chrono",
"derive_more",
"error-code",
"flowy-collaboration",
"flowy-derive",
"log",
"protobuf",
"serde",
"serde_json",
"strum",
"strum_macros",
"unicode-segmentation",

View File

@ -11,7 +11,6 @@ use flowy_collaboration::{
ClientRevisionWSData as ClientRevisionWSDataPB,
ClientRevisionWSDataType as ClientRevisionWSDataTypePB,
},
server_document::ServerDocumentManager,
synchronizer::{RevisionSyncResponse, RevisionUser},
};
use futures::stream::StreamExt;

View File

@ -1,4 +1,4 @@
mod version_1;
pub mod version_1;
mod version_2;
use std::sync::Arc;

View File

@ -4,6 +4,7 @@ use flowy_collaboration::entities::{
revision::{RepeatedRevision, Revision},
};
use flowy_collaboration::client_document::default::initial_delta_string;
use futures::{FutureExt, StreamExt};
use std::{collections::HashSet, sync::Arc};
@ -61,7 +62,13 @@ impl ViewController {
#[tracing::instrument(level = "debug", skip(self, params), fields(name = %params.name), err)]
pub(crate) async fn create_view_from_params(&self, params: CreateViewParams) -> Result<View, FlowyError> {
let delta_data = Bytes::from(params.view_data.clone());
let view_data = if params.view_data.is_empty() {
initial_delta_string()
} else {
params.view_data.clone()
};
let delta_data = Bytes::from(view_data);
let user_id = self.user.user_id()?;
let repeated_revision: RepeatedRevision =
Revision::initial_revision(&user_id, &params.view_id, delta_data).into();
@ -110,22 +117,20 @@ impl ViewController {
})
}
#[tracing::instrument(level = "debug", skip(self, params), fields(doc_id = %params.doc_id), err)]
pub(crate) async fn open_view(&self, params: DocumentId) -> Result<DocumentDelta, FlowyError> {
let doc_id = params.doc_id.clone();
let editor = self.document_ctx.controller.open_document(&params.doc_id).await?;
KV::set_str(LATEST_VIEW_ID, doc_id.clone());
#[tracing::instrument(level = "debug", skip(self), err)]
pub(crate) async fn open_view(&self, doc_id: &str) -> Result<DocumentDelta, FlowyError> {
let editor = self.document_ctx.controller.open_document(doc_id).await?;
KV::set_str(LATEST_VIEW_ID, doc_id.to_owned());
let document_json = editor.document_json().await?;
Ok(DocumentDelta {
doc_id,
doc_id: doc_id.to_string(),
delta_json: document_json,
})
}
#[tracing::instrument(level = "debug", skip(self, params), err)]
pub(crate) async fn close_view(&self, params: DocumentId) -> Result<(), FlowyError> {
let _ = self.document_ctx.controller.close_document(&params.doc_id)?;
#[tracing::instrument(level = "debug", skip(self), err)]
pub(crate) async fn close_view(&self, doc_id: &str) -> Result<(), FlowyError> {
let _ = self.document_ctx.controller.close_document(doc_id)?;
Ok(())
}
@ -140,13 +145,13 @@ impl ViewController {
Ok(())
}
#[tracing::instrument(level = "debug", skip(self, params), fields(doc_id = %params.doc_id), err)]
pub(crate) async fn duplicate_view(&self, params: DocumentId) -> Result<(), FlowyError> {
#[tracing::instrument(level = "debug", skip(self), err)]
pub(crate) async fn duplicate_view(&self, doc_id: &str) -> Result<(), FlowyError> {
let view = self
.persistence
.begin_transaction(|transaction| transaction.read_view(&params.doc_id))?;
.begin_transaction(|transaction| transaction.read_view(doc_id))?;
let editor = self.document_ctx.controller.open_document(&params.doc_id).await?;
let editor = self.document_ctx.controller.open_document(doc_id).await?;
let document_json = editor.document_json().await?;
let duplicate_params = CreateViewParams {
belong_to_id: view.belong_to_id.clone(),

View File

@ -84,7 +84,7 @@ pub(crate) async fn open_view_handler(
controller: Unit<Arc<ViewController>>,
) -> DataResult<DocumentDelta, FlowyError> {
let params: ViewId = data.into_inner().try_into()?;
let doc = controller.open_view(params.into()).await?;
let doc = controller.open_view(&params.view_id).await?;
data_result(doc)
}
@ -93,7 +93,7 @@ pub(crate) async fn close_view_handler(
controller: Unit<Arc<ViewController>>,
) -> Result<(), FlowyError> {
let params: ViewId = data.into_inner().try_into()?;
let _ = controller.close_view(params.into()).await?;
let _ = controller.close_view(&params.view_id).await?;
Ok(())
}
@ -103,7 +103,7 @@ pub(crate) async fn duplicate_view_handler(
controller: Unit<Arc<ViewController>>,
) -> Result<(), FlowyError> {
let params: ViewId = data.into_inner().try_into()?;
let _ = controller.duplicate_view(params.into()).await?;
let _ = controller.duplicate_view(&params.view_id).await?;
Ok(())
}

View File

@ -132,14 +132,20 @@ impl RevisionWSSinkDataProvider for DocumentWSSinkDataProviderAdapter {
async fn transform_pushed_revisions(
revisions: Vec<Revision>,
edit_cmd: &EditorCommandSender,
edit_cmd_tx: &EditorCommandSender,
) -> FlowyResult<TransformDeltas> {
let (ret, rx) = oneshot::channel::<CollaborateResult<TransformDeltas>>();
let _ = edit_cmd.send(EditorCommand::TransformRevision { revisions, ret });
Ok(rx.await.map_err(internal_error)??)
edit_cmd_tx
.send(EditorCommand::TransformRevision { revisions, ret })
.await
.map_err(internal_error)?;
let transform_delta = rx
.await
.map_err(|e| FlowyError::internal().context(format!("transform_pushed_revisions failed: {}", e)))??;
Ok(transform_delta)
}
#[tracing::instrument(level = "debug", skip(edit_cmd_tx, rev_manager, bytes))]
#[tracing::instrument(level = "debug", skip(edit_cmd_tx, rev_manager, bytes), err)]
pub(crate) async fn handle_remote_revision(
edit_cmd_tx: EditorCommandSender,
rev_manager: Arc<RevisionManager>,
@ -173,23 +179,33 @@ pub(crate) async fn handle_remote_revision(
// The server_prime is None means the client local revisions conflict with the
// server, and it needs to override the client delta.
let (ret, rx) = oneshot::channel();
let _ = edit_cmd_tx.send(EditorCommand::OverrideDelta {
revisions,
delta: client_prime,
ret,
});
let _ = rx.await.map_err(internal_error)??;
let _ = edit_cmd_tx
.send(EditorCommand::OverrideDelta {
revisions,
delta: client_prime,
ret,
})
.await;
let _ = rx.await.map_err(|e| {
FlowyError::internal().context(format!("handle EditorCommand::OverrideDelta failed: {}", e))
})??;
Ok(None)
},
Some(server_prime) => {
let (ret, rx) = oneshot::channel();
let _ = edit_cmd_tx.send(EditorCommand::ComposeRemoteDelta {
revisions,
client_delta: client_prime,
server_delta: server_prime,
ret,
});
Ok(rx.await.map_err(internal_error)??)
edit_cmd_tx
.send(EditorCommand::ComposeRemoteDelta {
revisions,
client_delta: client_prime,
server_delta: server_prime,
ret,
})
.await
.map_err(internal_error)?;
let result = rx.await.map_err(|e| {
FlowyError::internal().context(format!("handle EditorCommand::ComposeRemoteDelta failed: {}", e))
})??;
Ok(result)
},
}
}

View File

@ -3,6 +3,7 @@ use lib_ot::{
core::*,
rich_text::{AttributeBuilder, RichTextAttribute, RichTextAttributeValue, RichTextDelta},
};
use lib_ot::rich_text::RichTextOperation;
#[test]
fn operation_insert_serialize_test() {

View File

@ -12,6 +12,8 @@ use std::{
sync::Arc,
};
// For the moment, we use memory to cache the data, it will be implemented with
// other storage. Like the Firestore,Dropbox.etc.
pub trait RevisionCloudStorage: Send + Sync {
fn set_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError>;
fn get_revisions(
@ -28,8 +30,6 @@ pub trait RevisionCloudStorage: Send + Sync {
}
pub(crate) struct LocalDocumentCloudPersistence {
// For the moment, we use memory to cache the data, it will be implemented with other storage.
// Like the Firestore,Dropbox.etc.
storage: Arc<dyn RevisionCloudStorage>,
}

View File

@ -74,6 +74,7 @@ fn crate_log_filter(level: String) -> String {
filters.push(format!("lib_ot={}", level));
filters.push(format!("lib_ws={}", level));
filters.push(format!("lib_infra={}", level));
filters.push(format!("flowy_sync={}", level));
filters.join(",")
}

View File

@ -147,13 +147,13 @@ impl RevisionWSStream {
yield msg
},
None => {
tracing::debug!("[RevisionWSStream:{}] loop exit", object_id);
tracing::debug!("[RevisionWSStream]:{} loop exit", object_id);
break;
},
}
},
_ = stop_rx.recv() => {
tracing::debug!("[RevisionWSStream:{}] loop exit", object_id);
tracing::debug!("[RevisionWSStream]:{} loop exit", object_id);
break
},
};
@ -164,7 +164,7 @@ impl RevisionWSStream {
.for_each(|msg| async {
match self.handle_message(msg).await {
Ok(_) => {},
Err(e) => tracing::error!("[RevisionWSStream:{}] error: {}", self.object_id, e),
Err(e) => tracing::error!("[RevisionWSStream]:{} error: {}", self.object_id, e),
}
})
.await;

12
shared-lib/Cargo.lock generated
View File

@ -530,6 +530,12 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0"
[[package]]
name = "dissimilar"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31ad93652f40969dead8d4bf897a41e9462095152eb21c56e5830537e41179dd"
[[package]]
name = "either"
version = "1.6.1"
@ -631,6 +637,8 @@ dependencies = [
"bytes",
"chrono",
"dashmap",
"dissimilar",
"flowy-core-data-model",
"flowy-derive",
"futures",
"lib-infra",
@ -640,6 +648,7 @@ dependencies = [
"parking_lot",
"protobuf",
"serde",
"serde_json",
"strum",
"strum_macros",
"tokio",
@ -655,10 +664,11 @@ dependencies = [
"chrono",
"derive_more",
"error-code",
"flowy-collaboration",
"flowy-derive",
"log",
"protobuf",
"serde",
"serde_json",
"strum",
"strum_macros",
"unicode-segmentation",

View File

@ -9,12 +9,15 @@ edition = "2018"
lib-ot = { path = "../lib-ot" }
lib-infra = { path = "../lib-infra" }
flowy-derive = { path = "../flowy-derive" }
flowy-core-data-model = { path = "../flowy-core-data-model" }
protobuf = {version = "2.18.0"}
bytes = "1.0"
log = "0.4.14"
md5 = "0.7.0"
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde = { version = "1.0", features = ["derive", "rc"] }
serde_json = {version = "1.0"}
dissimilar = "1.0"
tracing = { version = "0.1", features = ["log"] }
url = "2.2"
strum = "0.21"

View File

@ -1,8 +1,5 @@
use crate::util::find_newline;
use lib_ot::{
core::RichTextOperation,
rich_text::{plain_attributes, AttributeScope, RichTextAttribute, RichTextDelta},
};
use lib_ot::rich_text::{plain_attributes, AttributeScope, RichTextAttribute, RichTextDelta, RichTextOperation};
pub(crate) fn line_break(
op: &RichTextOperation,

View File

@ -0,0 +1,132 @@
use dissimilar::*;
use flowy_core_data_model::entities::{
app::{App, RepeatedApp},
trash::{RepeatedTrash, Trash},
view::{RepeatedView, View},
workspace::{RepeatedWorkspace, Workspace},
};
use lib_ot::core::{
Delta,
FlowyStr,
Operation,
Operation::Retain,
PlainDeltaBuilder,
PlainTextAttributes,
PlainTextOpBuilder,
};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct RootFolder {
workspaces: Vec<Arc<Workspace>>,
trash: Vec<Trash>,
}
impl RootFolder {
pub fn add_workspace(&mut self, workspace: Workspace) -> Option<Delta<PlainTextAttributes>> {
let workspace = Arc::new(workspace);
if self.workspaces.contains(&workspace) {
tracing::warn!("Duplicate workspace");
return None;
}
let old = WorkspacesJson::new(self.workspaces.clone()).to_json().unwrap();
self.workspaces.push(workspace);
let new = WorkspacesJson::new(self.workspaces.clone()).to_json().unwrap();
Some(cal_diff(old, new))
}
pub fn update_workspace(&mut self, workspace_id: &str, name: Option<String>, desc: Option<String>) {
if let Some(mut workspace) = self
.workspaces
.iter_mut()
.find(|workspace| workspace.id == workspace_id)
{
let m_workspace = Arc::make_mut(&mut workspace);
if let Some(name) = name {
m_workspace.name = name;
}
if let Some(desc) = desc {
m_workspace.desc = desc;
}
}
}
pub fn delete_workspace(&mut self, workspace_id: &str) { self.workspaces.retain(|w| w.id != workspace_id) }
}
fn cal_diff(old: String, new: String) -> Delta<PlainTextAttributes> {
let mut chunks = dissimilar::diff(&old, &new);
let mut delta_builder = PlainDeltaBuilder::new();
for chunk in &chunks {
match chunk {
Chunk::Equal(s) => {
delta_builder = delta_builder.retain(FlowyStr::from(*s).utf16_size());
},
Chunk::Delete(s) => {
delta_builder = delta_builder.delete(FlowyStr::from(*s).utf16_size());
},
Chunk::Insert(s) => {
delta_builder = delta_builder.insert(*s);
},
}
}
delta_builder.build()
}
#[derive(Serialize, Deserialize)]
struct WorkspacesJson {
workspaces: Vec<Arc<Workspace>>,
}
impl WorkspacesJson {
fn new(workspaces: Vec<Arc<Workspace>>) -> Self { Self { workspaces } }
fn to_json(self) -> Result<String, String> {
serde_json::to_string(&self).map_err(|e| format!("format workspaces failed: {}", e))
}
}
#[cfg(test)]
mod tests {
use crate::folder::folder_data::RootFolder;
use chrono::Utc;
use flowy_core_data_model::{entities::prelude::Workspace, user_default};
use std::{borrow::Cow, sync::Arc};
#[test]
fn folder_add_workspace_serde_test() {
let mut folder = RootFolder {
workspaces: vec![],
trash: vec![],
};
let time = Utc::now();
let workspace_1 = user_default::create_default_workspace(time);
let delta_1 = folder.add_workspace(workspace_1).unwrap();
println!("{}", delta_1);
let workspace_2 = user_default::create_default_workspace(time);
let delta_2 = folder.add_workspace(workspace_2).unwrap();
println!("{}", delta_2);
}
#[test]
fn serial_folder_test() {
let time = Utc::now();
let workspace = user_default::create_default_workspace(time);
let id = workspace.id.clone();
let mut folder = RootFolder {
workspaces: vec![Arc::new(workspace)],
trash: vec![],
};
let mut cloned = folder.clone();
cloned.update_workspace(&id, Some("123".to_owned()), None);
println!("{}", serde_json::to_string(&folder).unwrap());
println!("{}", serde_json::to_string(&cloned).unwrap());
}
}

View File

@ -0,0 +1,5 @@
use lib_infra::future::BoxResultFuture;
pub trait FolderCloudPersistence: Send + Sync {
// fn read_folder(&self) -> BoxResultFuture<>
}

View File

@ -0,0 +1,2 @@
mod folder_data;
mod folder_manager;

View File

@ -1,6 +1,7 @@
pub mod client_document;
pub mod entities;
pub mod errors;
pub mod folder;
pub mod protobuf;
pub mod server_document;
pub mod synchronizer;

View File

@ -176,28 +176,6 @@ struct OpenDocHandle {
users: DashMap<String, Arc<dyn RevisionUser>>,
}
impl RevisionSyncPersistence for Arc<dyn DocumentCloudPersistence> {
fn read_revisions(
&self,
object_id: &str,
rev_ids: Option<Vec<i64>>,
) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError> {
(**self).read_revisions(object_id, rev_ids)
}
fn save_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> {
(**self).save_revisions(repeated_revision)
}
fn reset_object(
&self,
object_id: &str,
repeated_revision: RepeatedRevisionPB,
) -> BoxResultFuture<(), CollaborateError> {
(**self).reset_document(object_id, repeated_revision)
}
}
impl OpenDocHandle {
fn new(doc: DocumentInfo, persistence: Arc<dyn DocumentCloudPersistence>) -> Result<Self, CollaborateError> {
let doc_id = doc.doc_id.clone();
@ -263,6 +241,28 @@ impl std::ops::Drop for OpenDocHandle {
}
}
impl RevisionSyncPersistence for Arc<dyn DocumentCloudPersistence> {
fn read_revisions(
&self,
object_id: &str,
rev_ids: Option<Vec<i64>>,
) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError> {
(**self).read_revisions(object_id, rev_ids)
}
fn save_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> {
(**self).save_revisions(repeated_revision)
}
fn reset_object(
&self,
object_id: &str,
repeated_revision: RepeatedRevisionPB,
) -> BoxResultFuture<(), CollaborateError> {
(**self).reset_document(object_id, repeated_revision)
}
}
// #[derive(Debug)]
enum DocumentCommand {
ApplyRevisions {

View File

@ -17,8 +17,6 @@ impl ServerDocument {
let doc_id = doc_id.to_owned();
ServerDocument { doc_id, delta }
}
pub fn is_empty<C: InitialDocumentText>(&self) -> bool { self.delta == C::initial_delta() }
}
impl RevisionSyncObject<RichTextAttributes> for ServerDocument {

View File

@ -1,243 +0,0 @@
use crate::{
entities::{
revision::RevisionRange,
ws_data::{ServerRevisionWSData, ServerRevisionWSDataBuilder},
},
errors::CollaborateError,
protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB},
server_document::{document_pad::ServerDocument, DocumentCloudPersistence},
util::*,
};
use lib_infra::future::BoxResultFuture;
use lib_ot::{core::OperationTransformable, rich_text::RichTextDelta};
use parking_lot::RwLock;
use std::{
cmp::Ordering,
fmt::Debug,
sync::{
atomic::{AtomicI64, Ordering::SeqCst},
Arc,
},
time::Duration,
};
pub trait RevisionUser: Send + Sync + Debug {
fn user_id(&self) -> String;
fn receive(&self, resp: RevisionSyncResponse);
}
pub trait RevisionSyncObject {
type SyncObject;
fn read_revisions(&self, rev_ids: Option<Vec<i64>>) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError>;
fn save_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError>;
fn reset_object(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError>;
}
pub enum RevisionSyncResponse {
Pull(ServerRevisionWSData),
Push(ServerRevisionWSData),
Ack(ServerRevisionWSData),
}
pub struct RevisionSynchronizer {
pub doc_id: String,
pub rev_id: AtomicI64,
document: Arc<RwLock<ServerDocument>>,
persistence: Arc<dyn DocumentCloudPersistence>,
}
impl RevisionSynchronizer {
pub fn new(
doc_id: &str,
rev_id: i64,
document: ServerDocument,
persistence: Arc<dyn DocumentCloudPersistence>,
) -> RevisionSynchronizer {
let document = Arc::new(RwLock::new(document));
RevisionSynchronizer {
doc_id: doc_id.to_string(),
rev_id: AtomicI64::new(rev_id),
document,
persistence,
}
}
#[tracing::instrument(level = "debug", skip(self, user, repeated_revision), err)]
pub async fn sync_revisions(
&self,
user: Arc<dyn RevisionUser>,
repeated_revision: RepeatedRevisionPB,
) -> Result<(), CollaborateError> {
let doc_id = self.doc_id.clone();
if repeated_revision.get_items().is_empty() {
// Return all the revisions to client
let revisions = self.persistence.read_revisions(&doc_id, None).await?;
let repeated_revision = repeated_revision_from_revision_pbs(revisions)?;
let data = ServerRevisionWSDataBuilder::build_push_message(&doc_id, repeated_revision);
user.receive(RevisionSyncResponse::Push(data));
return Ok(());
}
let server_base_rev_id = self.rev_id.load(SeqCst);
let first_revision = repeated_revision.get_items().first().unwrap().clone();
if self.is_applied_before(&first_revision, &self.persistence).await {
// Server has received this revision before, so ignore the following revisions
return Ok(());
}
match server_base_rev_id.cmp(&first_revision.rev_id) {
Ordering::Less => {
let server_rev_id = next(server_base_rev_id);
if server_base_rev_id == first_revision.base_rev_id || server_rev_id == first_revision.rev_id {
// The rev is in the right order, just compose it.
for revision in repeated_revision.get_items() {
let _ = self.compose_revision(revision)?;
}
let _ = self.persistence.save_revisions(repeated_revision).await?;
} else {
// The server document is outdated, pull the missing revision from the client.
let range = RevisionRange {
object_id: self.doc_id.clone(),
start: server_rev_id,
end: first_revision.rev_id,
};
let msg = ServerRevisionWSDataBuilder::build_pull_message(&self.doc_id, range);
user.receive(RevisionSyncResponse::Pull(msg));
}
},
Ordering::Equal => {
// Do nothing
tracing::warn!("Applied revision rev_id is the same as cur_rev_id");
},
Ordering::Greater => {
// The client document is outdated. Transform the client revision delta and then
// send the prime delta to the client. Client should compose the this prime
// delta.
let from_rev_id = first_revision.rev_id;
let to_rev_id = server_base_rev_id;
let _ = self.push_revisions_to_user(user, from_rev_id, to_rev_id).await;
},
}
Ok(())
}
#[tracing::instrument(level = "trace", skip(self, user), fields(server_rev_id), err)]
pub async fn pong(&self, user: Arc<dyn RevisionUser>, client_rev_id: i64) -> Result<(), CollaborateError> {
let doc_id = self.doc_id.clone();
let server_rev_id = self.rev_id();
tracing::Span::current().record("server_rev_id", &server_rev_id);
match server_rev_id.cmp(&client_rev_id) {
Ordering::Less => {
tracing::error!("Client should not send ping and the server should pull the revisions from the client")
},
Ordering::Equal => tracing::trace!("{} is up to date.", doc_id),
Ordering::Greater => {
// The client document is outdated. Transform the client revision delta and then
// send the prime delta to the client. Client should compose the this prime
// delta.
let from_rev_id = client_rev_id;
let to_rev_id = server_rev_id;
tracing::trace!("Push revisions to user");
let _ = self.push_revisions_to_user(user, from_rev_id, to_rev_id).await;
},
}
Ok(())
}
#[tracing::instrument(level = "debug", skip(self, repeated_revision), fields(doc_id), err)]
pub async fn reset(&self, repeated_revision: RepeatedRevisionPB) -> Result<(), CollaborateError> {
let doc_id = self.doc_id.clone();
tracing::Span::current().record("doc_id", &doc_id.as_str());
let revisions: Vec<RevisionPB> = repeated_revision.get_items().to_vec();
let (_, rev_id) = pair_rev_id_from_revision_pbs(&revisions);
let delta = make_delta_from_revision_pb(revisions)?;
let _ = self.persistence.reset_document(&doc_id, repeated_revision).await?;
*self.document.write() = ServerDocument::from_delta(delta);
let _ = self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(rev_id));
Ok(())
}
pub fn doc_json(&self) -> String { self.document.read().to_json() }
fn compose_revision(&self, revision: &RevisionPB) -> Result<(), CollaborateError> {
let delta = RichTextDelta::from_bytes(&revision.delta_data)?;
let _ = self.compose_delta(delta)?;
let _ = self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(revision.rev_id));
Ok(())
}
#[tracing::instrument(level = "debug", skip(self, revision))]
fn transform_revision(&self, revision: &RevisionPB) -> Result<(RichTextDelta, RichTextDelta), CollaborateError> {
let cli_delta = RichTextDelta::from_bytes(&revision.delta_data)?;
let result = self.document.read().delta().transform(&cli_delta)?;
Ok(result)
}
fn compose_delta(&self, delta: RichTextDelta) -> Result<(), CollaborateError> {
if delta.is_empty() {
log::warn!("Composed delta is empty");
}
match self.document.try_write_for(Duration::from_millis(300)) {
None => log::error!("Failed to acquire write lock of document"),
Some(mut write_guard) => {
let _ = write_guard.compose_delta(delta);
},
}
Ok(())
}
pub(crate) fn rev_id(&self) -> i64 { self.rev_id.load(SeqCst) }
async fn is_applied_before(
&self,
new_revision: &RevisionPB,
persistence: &Arc<dyn DocumentCloudPersistence>,
) -> bool {
let rev_ids = Some(vec![new_revision.rev_id]);
if let Ok(revisions) = persistence.read_revisions(&self.doc_id, rev_ids).await {
if let Some(revision) = revisions.first() {
if revision.md5 == new_revision.md5 {
return true;
}
}
};
false
}
async fn push_revisions_to_user(&self, user: Arc<dyn RevisionUser>, from: i64, to: i64) {
let rev_ids: Vec<i64> = (from..=to).collect();
let revisions = match self.persistence.read_revisions(&self.doc_id, Some(rev_ids)).await {
Ok(revisions) => {
assert_eq!(
revisions.is_empty(),
false,
"revisions should not be empty if the doc exists"
);
revisions
},
Err(e) => {
tracing::error!("{}", e);
vec![]
},
};
tracing::debug!("Push revision: {} -> {} to client", from, to);
match repeated_revision_from_revision_pbs(revisions) {
Ok(repeated_revision) => {
let data = ServerRevisionWSDataBuilder::build_push_message(&self.doc_id, repeated_revision);
user.receive(RevisionSyncResponse::Push(data));
},
Err(e) => tracing::error!("{}", e),
}
}
}
#[inline]
fn next(rev_id: i64) -> i64 { rev_id + 1 }

View File

@ -8,7 +8,7 @@ use crate::{
util::*,
};
use lib_infra::future::BoxResultFuture;
use lib_ot::core::{Attributes, Delta, OperationTransformable};
use lib_ot::core::{Attributes, Delta};
use parking_lot::RwLock;
use serde::de::DeserializeOwned;
use std::{

View File

@ -14,10 +14,11 @@ strum = "0.21"
strum_macros = "0.21"
derive_more = {version = "0.99", features = ["display"]}
log = "0.4.14"
flowy-collaboration = { path = "../flowy-collaboration" }
uuid = { version = "0.8", features = ["serde", "v4"] }
chrono = { version = "0.4" }
error-code = { path = "../error-code" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
[features]
default = []

View File

@ -8,9 +8,10 @@ use crate::{
},
};
use flowy_derive::ProtoBuf;
use serde::{Deserialize, Serialize};
use std::convert::TryInto;
#[derive(PartialEq, ProtoBuf, Default, Debug, Clone)]
#[derive(PartialEq, ProtoBuf, Default, Debug, Clone, Serialize, Deserialize)]
pub struct App {
#[pb(index = 1)]
pub id: String,
@ -41,7 +42,8 @@ impl App {
pub fn take_belongings(&mut self) -> RepeatedView { std::mem::take(&mut self.belongings) }
}
#[derive(PartialEq, Debug, Default, ProtoBuf, Clone)]
#[derive(PartialEq, Debug, Default, ProtoBuf, Clone, Serialize, Deserialize)]
#[serde(transparent)]
pub struct RepeatedApp {
#[pb(index = 1)]
pub items: Vec<App>,

View File

@ -1,8 +1,47 @@
use crate::{entities::app::App, impl_def_and_def_mut};
use flowy_derive::{ProtoBuf, ProtoBuf_Enum};
use serde::{Deserialize, Serialize};
use std::fmt::Formatter;
#[derive(PartialEq, Debug, ProtoBuf_Enum, Clone)]
#[derive(PartialEq, ProtoBuf, Default, Debug, Clone, Serialize, Deserialize)]
pub struct Trash {
#[pb(index = 1)]
pub id: String,
#[pb(index = 2)]
pub name: String,
#[pb(index = 3)]
pub modified_time: i64,
#[pb(index = 4)]
pub create_time: i64,
#[pb(index = 5)]
pub ty: TrashType,
}
#[derive(PartialEq, Debug, Default, ProtoBuf, Clone)]
pub struct RepeatedTrash {
#[pb(index = 1)]
pub items: Vec<Trash>,
}
impl_def_and_def_mut!(RepeatedTrash, Trash);
impl std::convert::From<App> for Trash {
fn from(app: App) -> Self {
Trash {
id: app.id,
name: app.name,
modified_time: app.modified_time,
create_time: app.create_time,
ty: TrashType::App,
}
}
}
#[derive(PartialEq, Debug, ProtoBuf_Enum, Clone, Serialize, Deserialize)]
pub enum TrashType {
Unknown = 0,
View = 1,
@ -97,41 +136,3 @@ impl std::convert::From<&Trash> for TrashId {
impl std::fmt::Display for TrashId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str(&format!("{:?}:{}", self.ty, self.id)) }
}
#[derive(PartialEq, ProtoBuf, Default, Debug, Clone)]
pub struct Trash {
#[pb(index = 1)]
pub id: String,
#[pb(index = 2)]
pub name: String,
#[pb(index = 3)]
pub modified_time: i64,
#[pb(index = 4)]
pub create_time: i64,
#[pb(index = 5)]
pub ty: TrashType,
}
#[derive(PartialEq, Debug, Default, ProtoBuf, Clone)]
pub struct RepeatedTrash {
#[pb(index = 1)]
pub items: Vec<Trash>,
}
impl_def_and_def_mut!(RepeatedTrash, Trash);
impl std::convert::From<App> for Trash {
fn from(app: App) -> Self {
Trash {
id: app.id,
name: app.name,
modified_time: app.modified_time,
create_time: app.create_time,
ty: TrashType::App,
}
}
}

View File

@ -7,11 +7,11 @@ use crate::{
view::{ViewDesc, ViewIdentify, ViewName, ViewThumbnail},
},
};
use flowy_collaboration::{client_document::default::initial_delta_string, entities::doc::DocumentId};
use flowy_derive::{ProtoBuf, ProtoBuf_Enum};
use serde::{Deserialize, Serialize};
use std::convert::TryInto;
#[derive(PartialEq, ProtoBuf, Default, Debug, Clone)]
#[derive(PartialEq, ProtoBuf, Default, Debug, Clone, Serialize, Deserialize)]
pub struct View {
#[pb(index = 1)]
pub id: String,
@ -41,7 +41,8 @@ pub struct View {
pub create_time: i64,
}
#[derive(PartialEq, Debug, Default, ProtoBuf, Clone)]
#[derive(PartialEq, Debug, Default, ProtoBuf, Clone, Serialize, Deserialize)]
#[serde(transparent)]
pub struct RepeatedView {
#[pb(index = 1)]
pub items: Vec<View>,
@ -61,7 +62,7 @@ impl std::convert::From<View> for Trash {
}
}
#[derive(PartialEq, Debug, ProtoBuf_Enum, Clone)]
#[derive(PartialEq, Debug, ProtoBuf_Enum, Clone, Serialize, Deserialize)]
pub enum ViewType {
Blank = 0,
Doc = 1,
@ -155,7 +156,7 @@ impl TryInto<CreateViewParams> for CreateViewRequest {
fn try_into(self) -> Result<CreateViewParams, Self::Error> {
let name = ViewName::parse(self.name)?.0;
let belong_to_id = AppIdentify::parse(self.belong_to_id)?.0;
let view_data = initial_delta_string();
let view_data = "".to_string();
let view_id = uuid::Uuid::new_v4().to_string();
let thumbnail = match self.thumbnail {
None => "".to_string(),
@ -190,14 +191,6 @@ impl std::convert::From<String> for ViewId {
fn from(view_id: String) -> Self { ViewId { view_id } }
}
impl std::convert::From<ViewId> for DocumentId {
fn from(identifier: ViewId) -> Self {
DocumentId {
doc_id: identifier.view_id,
}
}
}
impl TryInto<ViewId> for QueryViewRequest {
type Error = ErrorCode;
fn try_into(self) -> Result<ViewId, Self::Error> {

View File

@ -5,9 +5,10 @@ use crate::{
parser::workspace::{WorkspaceDesc, WorkspaceIdentify, WorkspaceName},
};
use flowy_derive::ProtoBuf;
use serde::{Deserialize, Serialize};
use std::convert::TryInto;
#[derive(PartialEq, ProtoBuf, Default, Debug, Clone)]
#[derive(PartialEq, ProtoBuf, Default, Debug, Clone, Serialize, Deserialize)]
pub struct Workspace {
#[pb(index = 1)]
pub id: String,

View File

@ -1,4 +1,6 @@
use crate::core::{trim, Attributes, Delta};
use crate::core::{trim, Attributes, Delta, PlainTextAttributes};
pub type PlainDeltaBuilder = DeltaBuilder<PlainTextAttributes>;
pub struct DeltaBuilder<T: Attributes> {
delta: Delta<T>,

View File

@ -13,7 +13,7 @@ use std::{
str::FromStr,
};
// TODO: optimize the memory usage with Arc_mut or Cow
// TODO: optimize the memory usage with Arc::make_mut or Cow
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Delta<T: Attributes> {
pub ops: Vec<Operation<T>>,

View File

@ -1,9 +1,10 @@
use crate::{
core::{Attributes, Operation},
core::{Attributes, Operation, PlainTextAttributes},
rich_text::RichTextAttributes,
};
pub type RichTextOpBuilder = OpBuilder<RichTextAttributes>;
pub type PlainTextOpBuilder = OpBuilder<PlainTextAttributes>;
pub struct OpBuilder<T: Attributes> {
ty: Operation<T>,

View File

@ -1,12 +1,13 @@
use crate::{
core::{FlowyStr, Interval, OpBuilder, OperationTransformable},
errors::OTError,
rich_text::{RichTextAttribute, RichTextAttributes},
};
use serde::__private::Formatter;
use std::{
cmp::min,
fmt,
fmt::Debug,
fmt::{Debug, Display},
ops::{Deref, DerefMut},
};
@ -19,13 +20,6 @@ pub trait Attributes: fmt::Display + Eq + PartialEq + Default + Clone + Debug +
fn extend_other(&mut self, other: Self);
}
pub type RichTextOperation = Operation<RichTextAttributes>;
impl RichTextOperation {
pub fn contain_attribute(&self, attribute: &RichTextAttribute) -> bool {
self.get_attributes().contains_key(&attribute.key)
}
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum Operation<T: Attributes> {
Delete(usize),
@ -328,3 +322,25 @@ where
}
}
}
#[derive(Debug, Clone, Eq, PartialEq, Default)]
pub struct PlainTextAttributes();
impl fmt::Display for PlainTextAttributes {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str("PlainTextAttributes") }
}
impl Attributes for PlainTextAttributes {
fn is_empty(&self) -> bool { true }
fn remove_empty(&mut self) {}
fn extend_other(&mut self, _other: Self) {}
}
impl OperationTransformable for PlainTextAttributes {
fn compose(&self, _other: &Self) -> Result<Self, OTError> { Ok(self.clone()) }
fn transform(&self, other: &Self) -> Result<(Self, Self), OTError> { Ok((self.clone(), other.clone())) }
fn invert(&self, _other: &Self) -> Self { self.clone() }
}

View File

@ -1,7 +1,7 @@
#![allow(non_snake_case)]
use crate::{
block_attribute,
core::{Attributes, OperationTransformable, RichTextOperation},
core::{Attributes, Operation, OperationTransformable},
errors::OTError,
ignore_attribute,
inline_attribute,
@ -16,6 +16,13 @@ use std::{
};
use strum_macros::Display;
pub type RichTextOperation = Operation<RichTextAttributes>;
impl RichTextOperation {
pub fn contain_attribute(&self, attribute: &RichTextAttribute) -> bool {
self.get_attributes().contains_key(&attribute.key)
}
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct RichTextAttributes {
pub(crate) inner: HashMap<RichTextAttributeKey, RichTextAttributeValue>,