chore: remove pb reference in util.rs

This commit is contained in:
appflowy
2022-07-19 13:07:30 +08:00
parent e7c672cb7e
commit 0ecff26636
7 changed files with 95 additions and 132 deletions

View File

@ -1,7 +1,7 @@
use flowy_sync::entities::revision::{RepeatedRevision, Revision};
use flowy_sync::{ use flowy_sync::{
entities::{folder::FolderInfo, text_block::DocumentPB}, entities::{folder::FolderInfo, text_block::DocumentPB},
errors::CollaborateError, errors::CollaborateError,
protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB},
server_document::*, server_document::*,
server_folder::FolderCloudPersistence, server_folder::FolderCloudPersistence,
util::{make_document_from_revision_pbs, make_folder_from_revisions_pb}, util::{make_document_from_revision_pbs, make_folder_from_revisions_pb},
@ -15,17 +15,17 @@ use std::{
// For the moment, we use memory to cache the data, it will be implemented with // For the moment, we use memory to cache the data, it will be implemented with
// other storage. Like the Firestore,Dropbox.etc. // other storage. Like the Firestore,Dropbox.etc.
pub trait RevisionCloudStorage: Send + Sync { pub trait RevisionCloudStorage: Send + Sync {
fn set_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError>; fn set_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError>;
fn get_revisions( fn get_revisions(
&self, &self,
object_id: &str, object_id: &str,
rev_ids: Option<Vec<i64>>, rev_ids: Option<Vec<i64>>,
) -> BoxResultFuture<RepeatedRevisionPB, CollaborateError>; ) -> BoxResultFuture<RepeatedRevision, CollaborateError>;
fn reset_object( fn reset_object(
&self, &self,
object_id: &str, object_id: &str,
repeated_revision: RepeatedRevisionPB, repeated_revision: RepeatedRevision,
) -> BoxResultFuture<(), CollaborateError>; ) -> BoxResultFuture<(), CollaborateError>;
} }
@ -64,7 +64,7 @@ impl FolderCloudPersistence for LocalTextBlockCloudPersistence {
&self, &self,
_user_id: &str, _user_id: &str,
folder_id: &str, folder_id: &str,
repeated_revision: RepeatedRevisionPB, repeated_revision: RepeatedRevision,
) -> BoxResultFuture<Option<FolderInfo>, CollaborateError> { ) -> BoxResultFuture<Option<FolderInfo>, CollaborateError> {
let folder_id = folder_id.to_owned(); let folder_id = folder_id.to_owned();
let storage = self.storage.clone(); let storage = self.storage.clone();
@ -74,7 +74,7 @@ impl FolderCloudPersistence for LocalTextBlockCloudPersistence {
}) })
} }
fn save_folder_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> { fn save_folder_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError> {
let storage = self.storage.clone(); let storage = self.storage.clone();
Box::pin(async move { Box::pin(async move {
let _ = storage.set_revisions(repeated_revision).await?; let _ = storage.set_revisions(repeated_revision).await?;
@ -86,20 +86,19 @@ impl FolderCloudPersistence for LocalTextBlockCloudPersistence {
&self, &self,
folder_id: &str, folder_id: &str,
rev_ids: Option<Vec<i64>>, rev_ids: Option<Vec<i64>>,
) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError> { ) -> BoxResultFuture<Vec<Revision>, CollaborateError> {
let folder_id = folder_id.to_owned(); let folder_id = folder_id.to_owned();
let storage = self.storage.clone(); let storage = self.storage.clone();
Box::pin(async move { Box::pin(async move {
let mut repeated_revision = storage.get_revisions(&folder_id, rev_ids).await?; let repeated_revision = storage.get_revisions(&folder_id, rev_ids).await?;
let revisions: Vec<RevisionPB> = repeated_revision.take_items().into(); Ok(repeated_revision.into_inner())
Ok(revisions)
}) })
} }
fn reset_folder( fn reset_folder(
&self, &self,
folder_id: &str, folder_id: &str,
repeated_revision: RepeatedRevisionPB, repeated_revision: RepeatedRevision,
) -> BoxResultFuture<(), CollaborateError> { ) -> BoxResultFuture<(), CollaborateError> {
let storage = self.storage.clone(); let storage = self.storage.clone();
let folder_id = folder_id.to_owned(); let folder_id = folder_id.to_owned();
@ -126,7 +125,7 @@ impl TextBlockCloudPersistence for LocalTextBlockCloudPersistence {
fn create_text_block( fn create_text_block(
&self, &self,
doc_id: &str, doc_id: &str,
repeated_revision: RepeatedRevisionPB, repeated_revision: RepeatedRevision,
) -> BoxResultFuture<Option<DocumentPB>, CollaborateError> { ) -> BoxResultFuture<Option<DocumentPB>, CollaborateError> {
let doc_id = doc_id.to_owned(); let doc_id = doc_id.to_owned();
let storage = self.storage.clone(); let storage = self.storage.clone();
@ -140,20 +139,16 @@ impl TextBlockCloudPersistence for LocalTextBlockCloudPersistence {
&self, &self,
doc_id: &str, doc_id: &str,
rev_ids: Option<Vec<i64>>, rev_ids: Option<Vec<i64>>,
) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError> { ) -> BoxResultFuture<Vec<Revision>, CollaborateError> {
let doc_id = doc_id.to_owned(); let doc_id = doc_id.to_owned();
let storage = self.storage.clone(); let storage = self.storage.clone();
Box::pin(async move { Box::pin(async move {
let mut repeated_revision = storage.get_revisions(&doc_id, rev_ids).await?; let repeated_revision = storage.get_revisions(&doc_id, rev_ids).await?;
let revisions: Vec<RevisionPB> = repeated_revision.take_items().into(); Ok(repeated_revision.into_inner())
Ok(revisions)
}) })
} }
fn save_text_block_revisions( fn save_text_block_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError> {
&self,
repeated_revision: RepeatedRevisionPB,
) -> BoxResultFuture<(), CollaborateError> {
let storage = self.storage.clone(); let storage = self.storage.clone();
Box::pin(async move { Box::pin(async move {
let _ = storage.set_revisions(repeated_revision).await?; let _ = storage.set_revisions(repeated_revision).await?;
@ -161,7 +156,7 @@ impl TextBlockCloudPersistence for LocalTextBlockCloudPersistence {
}) })
} }
fn reset_text_block(&self, doc_id: &str, revisions: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> { fn reset_text_block(&self, doc_id: &str, revisions: RepeatedRevision) -> BoxResultFuture<(), CollaborateError> {
let storage = self.storage.clone(); let storage = self.storage.clone();
let doc_id = doc_id.to_owned(); let doc_id = doc_id.to_owned();
Box::pin(async move { Box::pin(async move {
@ -174,7 +169,7 @@ impl TextBlockCloudPersistence for LocalTextBlockCloudPersistence {
#[derive(Default)] #[derive(Default)]
struct MemoryDocumentCloudStorage {} struct MemoryDocumentCloudStorage {}
impl RevisionCloudStorage for MemoryDocumentCloudStorage { impl RevisionCloudStorage for MemoryDocumentCloudStorage {
fn set_revisions(&self, _repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> { fn set_revisions(&self, _repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError> {
Box::pin(async move { Ok(()) }) Box::pin(async move { Ok(()) })
} }
@ -182,9 +177,9 @@ impl RevisionCloudStorage for MemoryDocumentCloudStorage {
&self, &self,
_doc_id: &str, _doc_id: &str,
_rev_ids: Option<Vec<i64>>, _rev_ids: Option<Vec<i64>>,
) -> BoxResultFuture<RepeatedRevisionPB, CollaborateError> { ) -> BoxResultFuture<RepeatedRevision, CollaborateError> {
Box::pin(async move { Box::pin(async move {
let repeated_revisions = RepeatedRevisionPB::new(); let repeated_revisions = RepeatedRevision::default();
Ok(repeated_revisions) Ok(repeated_revisions)
}) })
} }
@ -192,7 +187,7 @@ impl RevisionCloudStorage for MemoryDocumentCloudStorage {
fn reset_object( fn reset_object(
&self, &self,
_doc_id: &str, _doc_id: &str,
_repeated_revision: RepeatedRevisionPB, _repeated_revision: RepeatedRevision,
) -> BoxResultFuture<(), CollaborateError> { ) -> BoxResultFuture<(), CollaborateError> {
Box::pin(async move { Ok(()) }) Box::pin(async move { Ok(()) })
} }

View File

@ -28,9 +28,9 @@ pub fn make_enum_token_stream(_ctxt: &Ctxt, cont: &ASTContainer) -> Option<Token
} }
} }
impl std::convert::Into<crate::protobuf::#pb_enum> for #enum_ident { impl std::convert::From<#enum_ident> for crate::protobuf::#pb_enum{
fn into(self) -> crate::protobuf::#pb_enum { fn from(o: #enum_ident) -> crate::protobuf::#pb_enum {
match self { match o {
#(#build_to_pb_enum)* #(#build_to_pb_enum)*
} }
} }

View File

@ -25,8 +25,8 @@ pub fn make_se_token_stream(ctxt: &Ctxt, ast: &ASTContainer) -> Option<TokenStre
} }
} }
impl std::convert::Into<crate::protobuf::#pb_ty> for #struct_ident { impl std::convert::From<#struct_ident> for crate::protobuf::#pb_ty {
fn into(self) -> crate::protobuf::#pb_ty { fn from(mut o: #struct_ident) -> crate::protobuf::#pb_ty {
let mut pb = crate::protobuf::#pb_ty::new(); let mut pb = crate::protobuf::#pb_ty::new();
#(#build_set_pb_fields)* #(#build_set_pb_fields)*
pb pb
@ -40,7 +40,7 @@ pub fn make_se_token_stream(ctxt: &Ctxt, ast: &ASTContainer) -> Option<TokenStre
fn se_token_stream_for_field(ctxt: &Ctxt, field: &ASTField, _take: bool) -> Option<TokenStream> { fn se_token_stream_for_field(ctxt: &Ctxt, field: &ASTField, _take: bool) -> Option<TokenStream> {
if let Some(func) = &field.attrs.serialize_with() { if let Some(func) = &field.attrs.serialize_with() {
let member = &field.member; let member = &field.member;
Some(quote! { pb.#member=self.#func(); }) Some(quote! { pb.#member=o.#func(); })
} else if field.attrs.is_one_of() { } else if field.attrs.is_one_of() {
token_stream_for_one_of(ctxt, field) token_stream_for_one_of(ctxt, field)
} else { } else {
@ -65,19 +65,19 @@ fn token_stream_for_one_of(ctxt: &Ctxt, field: &ASTField) -> Option<TokenStream>
match ident_category(bracketed_ty_info.unwrap().ident) { match ident_category(bracketed_ty_info.unwrap().ident) {
TypeCategory::Protobuf => Some(quote! { TypeCategory::Protobuf => Some(quote! {
match self.#member { match o.#member {
Some(s) => { pb.#set_func(s.into()) } Some(s) => { pb.#set_func(s.into()) }
None => {} None => {}
} }
}), }),
TypeCategory::Enum => Some(quote! { TypeCategory::Enum => Some(quote! {
match self.#member { match o.#member {
Some(s) => { pb.#set_func(s.into()) } Some(s) => { pb.#set_func(s.into()) }
None => {} None => {}
} }
}), }),
_ => Some(quote! { _ => Some(quote! {
match self.#member { match o.#member {
Some(ref s) => { pb.#set_func(s.clone()) } Some(ref s) => { pb.#set_func(s.clone()) }
None => {} None => {}
} }
@ -99,18 +99,16 @@ fn gen_token_stream(ctxt: &Ctxt, member: &syn::Member, ty: &syn::Type, is_option
TypeCategory::Str => { TypeCategory::Str => {
if is_option { if is_option {
Some(quote! { Some(quote! {
match self.#member { match o.#member {
Some(ref s) => { pb.#member = s.to_string().clone(); } Some(ref s) => { pb.#member = s.to_string().clone(); }
None => { pb.#member = String::new(); } None => { pb.#member = String::new(); }
} }
}) })
} else { } else {
Some(quote! { pb.#member = self.#member.clone(); }) Some(quote! { pb.#member = o.#member.clone(); })
} }
} }
TypeCategory::Protobuf => { TypeCategory::Protobuf => Some(quote! { pb.#member = ::protobuf::SingularPtrField::some(o.#member.into()); }),
Some(quote! { pb.#member = ::protobuf::SingularPtrField::some(self.#member.into()); })
}
TypeCategory::Opt => gen_token_stream(ctxt, member, ty_info.bracket_ty_info.unwrap().ty, true), TypeCategory::Opt => gen_token_stream(ctxt, member, ty_info.bracket_ty_info.unwrap().ty, true),
TypeCategory::Enum => { TypeCategory::Enum => {
// let pb_enum_ident = format_ident!("{}", ty_info.ident.to_string()); // let pb_enum_ident = format_ident!("{}", ty_info.ident.to_string());
@ -118,10 +116,10 @@ fn gen_token_stream(ctxt: &Ctxt, member: &syn::Member, ty: &syn::Type, is_option
// flowy_protobuf::#pb_enum_ident::from_i32(self.#member.value()).unwrap(); // flowy_protobuf::#pb_enum_ident::from_i32(self.#member.value()).unwrap();
// }) // })
Some(quote! { Some(quote! {
pb.#member = self.#member.into(); pb.#member = o.#member.into();
}) })
} }
_ => Some(quote! { pb.#member = self.#member; }), _ => Some(quote! { pb.#member = o.#member; }),
} }
} }
@ -138,15 +136,15 @@ fn token_stream_for_vec(ctxt: &Ctxt, member: &syn::Member, ty: &syn::Type) -> Op
match ident_category(ty_info.ident) { match ident_category(ty_info.ident) {
TypeCategory::Protobuf => Some(quote! { TypeCategory::Protobuf => Some(quote! {
pb.#member = ::protobuf::RepeatedField::from_vec( pb.#member = ::protobuf::RepeatedField::from_vec(
self.#member o.#member
.into_iter() .into_iter()
.map(|m| m.into()) .map(|m| m.into())
.collect()); .collect());
}), }),
TypeCategory::Bytes => Some(quote! { pb.#member = self.#member.clone(); }), TypeCategory::Bytes => Some(quote! { pb.#member = o.#member.clone(); }),
_ => Some(quote! { _ => Some(quote! {
pb.#member = ::protobuf::RepeatedField::from_vec(self.#member.clone()); pb.#member = ::protobuf::RepeatedField::from_vec(o.#member.clone());
}), }),
} }
} }
@ -165,14 +163,14 @@ fn token_stream_for_map(ctxt: &Ctxt, member: &syn::Member, ty: &syn::Type) -> Op
match ident_category(ty_info.ident) { match ident_category(ty_info.ident) {
TypeCategory::Protobuf => Some(quote! { TypeCategory::Protobuf => Some(quote! {
let mut m: std::collections::HashMap<String, crate::protobuf::#value_ty> = std::collections::HashMap::new(); let mut m: std::collections::HashMap<String, crate::protobuf::#value_ty> = std::collections::HashMap::new();
self.#member.into_iter().for_each(|(k,v)| { o.#member.into_iter().for_each(|(k,v)| {
m.insert(k.clone(), v.into()); m.insert(k.clone(), v.into());
}); });
pb.#member = m; pb.#member = m;
}), }),
_ => Some(quote! { _ => Some(quote! {
let mut m: std::collections::HashMap<String, #value_ty> = std::collections::HashMap::new(); let mut m: std::collections::HashMap<String, #value_ty> = std::collections::HashMap::new();
self.#member.iter().for_each(|(k,v)| { o.#member.iter().for_each(|(k,v)| {
m.insert(k.clone(), v.clone()); m.insert(k.clone(), v.clone());
}); });
pb.#member = m; pb.#member = m;

View File

@ -1,7 +1,8 @@
use crate::entities::revision::{RepeatedRevision, Revision};
use crate::{ use crate::{
entities::{text_block::DocumentPB, ws_data::ServerRevisionWSDataBuilder}, entities::{text_block::DocumentPB, ws_data::ServerRevisionWSDataBuilder},
errors::{internal_error, CollaborateError, CollaborateResult}, errors::{internal_error, CollaborateError, CollaborateResult},
protobuf::{ClientRevisionWSData, RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB}, protobuf::ClientRevisionWSData,
server_document::document_pad::ServerDocument, server_document::document_pad::ServerDocument,
synchronizer::{RevisionSyncPersistence, RevisionSyncResponse, RevisionSynchronizer, RevisionUser}, synchronizer::{RevisionSyncPersistence, RevisionSyncResponse, RevisionSynchronizer, RevisionUser},
util::rev_id_from_str, util::rev_id_from_str,
@ -23,22 +24,21 @@ pub trait TextBlockCloudPersistence: Send + Sync + Debug {
fn create_text_block( fn create_text_block(
&self, &self,
doc_id: &str, doc_id: &str,
repeated_revision: RepeatedRevisionPB, repeated_revision: RepeatedRevision,
) -> BoxResultFuture<Option<DocumentPB>, CollaborateError>; ) -> BoxResultFuture<Option<DocumentPB>, CollaborateError>;
fn read_text_block_revisions( fn read_text_block_revisions(
&self, &self,
doc_id: &str, doc_id: &str,
rev_ids: Option<Vec<i64>>, rev_ids: Option<Vec<i64>>,
) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError>; ) -> BoxResultFuture<Vec<Revision>, CollaborateError>;
fn save_text_block_revisions(&self, repeated_revision: RepeatedRevisionPB) fn save_text_block_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError>;
-> BoxResultFuture<(), CollaborateError>;
fn reset_text_block( fn reset_text_block(
&self, &self,
doc_id: &str, doc_id: &str,
repeated_revision: RepeatedRevisionPB, repeated_revision: RepeatedRevision,
) -> BoxResultFuture<(), CollaborateError>; ) -> BoxResultFuture<(), CollaborateError>;
} }
@ -47,18 +47,18 @@ impl RevisionSyncPersistence for Arc<dyn TextBlockCloudPersistence> {
&self, &self,
object_id: &str, object_id: &str,
rev_ids: Option<Vec<i64>>, rev_ids: Option<Vec<i64>>,
) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError> { ) -> BoxResultFuture<Vec<Revision>, CollaborateError> {
(**self).read_text_block_revisions(object_id, rev_ids) (**self).read_text_block_revisions(object_id, rev_ids)
} }
fn save_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> { fn save_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError> {
(**self).save_text_block_revisions(repeated_revision) (**self).save_text_block_revisions(repeated_revision)
} }
fn reset_object( fn reset_object(
&self, &self,
object_id: &str, object_id: &str,
repeated_revision: RepeatedRevisionPB, repeated_revision: RepeatedRevision,
) -> BoxResultFuture<(), CollaborateError> { ) -> BoxResultFuture<(), CollaborateError> {
(**self).reset_text_block(object_id, repeated_revision) (**self).reset_text_block(object_id, repeated_revision)
} }
@ -82,7 +82,7 @@ impl ServerDocumentManager {
user: Arc<dyn RevisionUser>, user: Arc<dyn RevisionUser>,
mut client_data: ClientRevisionWSData, mut client_data: ClientRevisionWSData,
) -> Result<(), CollaborateError> { ) -> Result<(), CollaborateError> {
let repeated_revision = client_data.take_revisions(); let repeated_revision: RepeatedRevision = client_data.take_revisions().into();
let cloned_user = user.clone(); let cloned_user = user.clone();
let ack_id = rev_id_from_str(&client_data.data_id)?; let ack_id = rev_id_from_str(&client_data.data_id)?;
let object_id = client_data.object_id; let object_id = client_data.object_id;
@ -131,9 +131,10 @@ impl ServerDocumentManager {
pub async fn handle_document_reset( pub async fn handle_document_reset(
&self, &self,
doc_id: &str, doc_id: &str,
mut repeated_revision: RepeatedRevisionPB, mut repeated_revision: RepeatedRevision,
) -> Result<(), CollaborateError> { ) -> Result<(), CollaborateError> {
repeated_revision.mut_items().sort_by(|a, b| a.rev_id.cmp(&b.rev_id)); repeated_revision.sort_by(|a, b| a.rev_id.cmp(&b.rev_id));
match self.get_document_handler(doc_id).await { match self.get_document_handler(doc_id).await {
None => { None => {
tracing::warn!("Document:{} doesn't exist, ignore document reset", doc_id); tracing::warn!("Document:{} doesn't exist, ignore document reset", doc_id);
@ -166,7 +167,7 @@ impl ServerDocumentManager {
async fn create_document( async fn create_document(
&self, &self,
doc_id: &str, doc_id: &str,
repeated_revision: RepeatedRevisionPB, repeated_revision: RepeatedRevision,
) -> Result<Arc<OpenDocumentHandler>, CollaborateError> { ) -> Result<Arc<OpenDocumentHandler>, CollaborateError> {
match self.persistence.create_text_block(doc_id, repeated_revision).await? { match self.persistence.create_text_block(doc_id, repeated_revision).await? {
None => Err(CollaborateError::internal().context("Create document info from revisions failed")), None => Err(CollaborateError::internal().context("Create document info from revisions failed")),
@ -229,7 +230,7 @@ impl OpenDocumentHandler {
async fn apply_revisions( async fn apply_revisions(
&self, &self,
user: Arc<dyn RevisionUser>, user: Arc<dyn RevisionUser>,
repeated_revision: RepeatedRevisionPB, repeated_revision: RepeatedRevision,
) -> Result<(), CollaborateError> { ) -> Result<(), CollaborateError> {
let (ret, rx) = oneshot::channel(); let (ret, rx) = oneshot::channel();
self.users.insert(user.user_id(), user.clone()); self.users.insert(user.user_id(), user.clone());
@ -252,7 +253,7 @@ impl OpenDocumentHandler {
} }
#[tracing::instrument(level = "debug", skip(self, repeated_revision), err)] #[tracing::instrument(level = "debug", skip(self, repeated_revision), err)]
async fn apply_document_reset(&self, repeated_revision: RepeatedRevisionPB) -> Result<(), CollaborateError> { async fn apply_document_reset(&self, repeated_revision: RepeatedRevision) -> Result<(), CollaborateError> {
let (ret, rx) = oneshot::channel(); let (ret, rx) = oneshot::channel();
let msg = DocumentCommand::Reset { repeated_revision, ret }; let msg = DocumentCommand::Reset { repeated_revision, ret };
let result = self.send(msg, rx).await?; let result = self.send(msg, rx).await?;
@ -279,7 +280,7 @@ impl std::ops::Drop for OpenDocumentHandler {
enum DocumentCommand { enum DocumentCommand {
ApplyRevisions { ApplyRevisions {
user: Arc<dyn RevisionUser>, user: Arc<dyn RevisionUser>,
repeated_revision: RepeatedRevisionPB, repeated_revision: RepeatedRevision,
ret: oneshot::Sender<CollaborateResult<()>>, ret: oneshot::Sender<CollaborateResult<()>>,
}, },
Ping { Ping {
@ -288,7 +289,7 @@ enum DocumentCommand {
ret: oneshot::Sender<CollaborateResult<()>>, ret: oneshot::Sender<CollaborateResult<()>>,
}, },
Reset { Reset {
repeated_revision: RepeatedRevisionPB, repeated_revision: RepeatedRevision,
ret: oneshot::Sender<CollaborateResult<()>>, ret: oneshot::Sender<CollaborateResult<()>>,
}, },
} }

View File

@ -1,10 +1,11 @@
use crate::entities::revision::{RepeatedRevision, Revision};
use crate::{ use crate::{
entities::{ entities::{
folder::{FolderDelta, FolderInfo}, folder::{FolderDelta, FolderInfo},
ws_data::ServerRevisionWSDataBuilder, ws_data::ServerRevisionWSDataBuilder,
}, },
errors::{internal_error, CollaborateError, CollaborateResult}, errors::{internal_error, CollaborateError, CollaborateResult},
protobuf::{ClientRevisionWSData, RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB}, protobuf::ClientRevisionWSData,
server_folder::folder_pad::ServerFolder, server_folder::folder_pad::ServerFolder,
synchronizer::{RevisionSyncPersistence, RevisionSyncResponse, RevisionSynchronizer, RevisionUser}, synchronizer::{RevisionSyncPersistence, RevisionSyncResponse, RevisionSynchronizer, RevisionUser},
util::rev_id_from_str, util::rev_id_from_str,
@ -26,21 +27,21 @@ pub trait FolderCloudPersistence: Send + Sync + Debug {
&self, &self,
user_id: &str, user_id: &str,
folder_id: &str, folder_id: &str,
repeated_revision: RepeatedRevisionPB, repeated_revision: RepeatedRevision,
) -> BoxResultFuture<Option<FolderInfo>, CollaborateError>; ) -> BoxResultFuture<Option<FolderInfo>, CollaborateError>;
fn save_folder_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError>; fn save_folder_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError>;
fn read_folder_revisions( fn read_folder_revisions(
&self, &self,
folder_id: &str, folder_id: &str,
rev_ids: Option<Vec<i64>>, rev_ids: Option<Vec<i64>>,
) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError>; ) -> BoxResultFuture<Vec<Revision>, CollaborateError>;
fn reset_folder( fn reset_folder(
&self, &self,
folder_id: &str, folder_id: &str,
repeated_revision: RepeatedRevisionPB, repeated_revision: RepeatedRevision,
) -> BoxResultFuture<(), CollaborateError>; ) -> BoxResultFuture<(), CollaborateError>;
} }
@ -49,18 +50,18 @@ impl RevisionSyncPersistence for Arc<dyn FolderCloudPersistence> {
&self, &self,
object_id: &str, object_id: &str,
rev_ids: Option<Vec<i64>>, rev_ids: Option<Vec<i64>>,
) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError> { ) -> BoxResultFuture<Vec<Revision>, CollaborateError> {
(**self).read_folder_revisions(object_id, rev_ids) (**self).read_folder_revisions(object_id, rev_ids)
} }
fn save_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> { fn save_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError> {
(**self).save_folder_revisions(repeated_revision) (**self).save_folder_revisions(repeated_revision)
} }
fn reset_object( fn reset_object(
&self, &self,
object_id: &str, object_id: &str,
repeated_revision: RepeatedRevisionPB, repeated_revision: RepeatedRevision,
) -> BoxResultFuture<(), CollaborateError> { ) -> BoxResultFuture<(), CollaborateError> {
(**self).reset_folder(object_id, repeated_revision) (**self).reset_folder(object_id, repeated_revision)
} }
@ -84,7 +85,7 @@ impl ServerFolderManager {
user: Arc<dyn RevisionUser>, user: Arc<dyn RevisionUser>,
mut client_data: ClientRevisionWSData, mut client_data: ClientRevisionWSData,
) -> Result<(), CollaborateError> { ) -> Result<(), CollaborateError> {
let repeated_revision = client_data.take_revisions(); let repeated_revision: RepeatedRevision = client_data.take_revisions().into();
let cloned_user = user.clone(); let cloned_user = user.clone();
let ack_id = rev_id_from_str(&client_data.data_id)?; let ack_id = rev_id_from_str(&client_data.data_id)?;
let folder_id = client_data.object_id; let folder_id = client_data.object_id;
@ -167,7 +168,7 @@ impl ServerFolderManager {
&self, &self,
user_id: &str, user_id: &str,
folder_id: &str, folder_id: &str,
repeated_revision: RepeatedRevisionPB, repeated_revision: RepeatedRevision,
) -> Result<Arc<OpenFolderHandler>, CollaborateError> { ) -> Result<Arc<OpenFolderHandler>, CollaborateError> {
match self match self
.persistence .persistence
@ -221,7 +222,7 @@ impl OpenFolderHandler {
async fn apply_revisions( async fn apply_revisions(
&self, &self,
user: Arc<dyn RevisionUser>, user: Arc<dyn RevisionUser>,
repeated_revision: RepeatedRevisionPB, repeated_revision: RepeatedRevision,
) -> CollaborateResult<()> { ) -> CollaborateResult<()> {
let (ret, rx) = oneshot::channel(); let (ret, rx) = oneshot::channel();
let msg = FolderCommand::ApplyRevisions { let msg = FolderCommand::ApplyRevisions {
@ -258,7 +259,7 @@ impl std::ops::Drop for OpenFolderHandler {
enum FolderCommand { enum FolderCommand {
ApplyRevisions { ApplyRevisions {
user: Arc<dyn RevisionUser>, user: Arc<dyn RevisionUser>,
repeated_revision: RepeatedRevisionPB, repeated_revision: RepeatedRevision,
ret: oneshot::Sender<CollaborateResult<()>>, ret: oneshot::Sender<CollaborateResult<()>>,
}, },
Ping { Ping {

View File

@ -1,10 +1,11 @@
use crate::entities::revision::{RepeatedRevision, Revision};
use crate::{ use crate::{
entities::{ entities::{
revision::RevisionRange, revision::RevisionRange,
ws_data::{ServerRevisionWSData, ServerRevisionWSDataBuilder}, ws_data::{ServerRevisionWSData, ServerRevisionWSDataBuilder},
}, },
errors::CollaborateError, errors::CollaborateError,
protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB}, protobuf::Revision as RevisionPB,
util::*, util::*,
}; };
use lib_infra::future::BoxResultFuture; use lib_infra::future::BoxResultFuture;
@ -31,14 +32,14 @@ pub trait RevisionSyncPersistence: Send + Sync + 'static {
&self, &self,
object_id: &str, object_id: &str,
rev_ids: Option<Vec<i64>>, rev_ids: Option<Vec<i64>>,
) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError>; ) -> BoxResultFuture<Vec<Revision>, CollaborateError>;
fn save_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError>; fn save_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError>;
fn reset_object( fn reset_object(
&self, &self,
object_id: &str, object_id: &str,
repeated_revision: RepeatedRevisionPB, repeated_revision: RepeatedRevision,
) -> BoxResultFuture<(), CollaborateError>; ) -> BoxResultFuture<(), CollaborateError>;
} }
@ -87,20 +88,20 @@ where
pub async fn sync_revisions( pub async fn sync_revisions(
&self, &self,
user: Arc<dyn RevisionUser>, user: Arc<dyn RevisionUser>,
repeated_revision: RepeatedRevisionPB, repeated_revision: RepeatedRevision,
) -> Result<(), CollaborateError> { ) -> Result<(), CollaborateError> {
let object_id = self.object_id.clone(); let object_id = self.object_id.clone();
if repeated_revision.get_items().is_empty() { if repeated_revision.is_empty() {
// Return all the revisions to client // Return all the revisions to client
let revisions = self.persistence.read_revisions(&object_id, None).await?; let revisions = self.persistence.read_revisions(&object_id, None).await?;
let repeated_revision = repeated_revision_from_revision_pbs(revisions)?; let repeated_revision = RepeatedRevision::from(revisions);
let data = ServerRevisionWSDataBuilder::build_push_message(&object_id, repeated_revision); let data = ServerRevisionWSDataBuilder::build_push_message(&object_id, repeated_revision);
user.receive(RevisionSyncResponse::Push(data)); user.receive(RevisionSyncResponse::Push(data));
return Ok(()); return Ok(());
} }
let server_base_rev_id = self.rev_id.load(SeqCst); let server_base_rev_id = self.rev_id.load(SeqCst);
let first_revision = repeated_revision.get_items().first().unwrap().clone(); let first_revision = repeated_revision.first().unwrap().clone();
if self.is_applied_before(&first_revision, &self.persistence).await { if self.is_applied_before(&first_revision, &self.persistence).await {
// Server has received this revision before, so ignore the following revisions // Server has received this revision before, so ignore the following revisions
return Ok(()); return Ok(());
@ -111,7 +112,7 @@ where
let server_rev_id = next(server_base_rev_id); let server_rev_id = next(server_base_rev_id);
if server_base_rev_id == first_revision.base_rev_id || server_rev_id == first_revision.rev_id { if server_base_rev_id == first_revision.base_rev_id || server_rev_id == first_revision.rev_id {
// The rev is in the right order, just compose it. // The rev is in the right order, just compose it.
for revision in repeated_revision.get_items() { for revision in repeated_revision.iter() {
let _ = self.compose_revision(revision)?; let _ = self.compose_revision(revision)?;
} }
let _ = self.persistence.save_revisions(repeated_revision).await?; let _ = self.persistence.save_revisions(repeated_revision).await?;
@ -165,10 +166,10 @@ where
} }
#[tracing::instrument(level = "debug", skip(self, repeated_revision), fields(object_id), err)] #[tracing::instrument(level = "debug", skip(self, repeated_revision), fields(object_id), err)]
pub async fn reset(&self, repeated_revision: RepeatedRevisionPB) -> Result<(), CollaborateError> { pub async fn reset(&self, repeated_revision: RepeatedRevision) -> Result<(), CollaborateError> {
let object_id = self.object_id.clone(); let object_id = self.object_id.clone();
tracing::Span::current().record("object_id", &object_id.as_str()); tracing::Span::current().record("object_id", &object_id.as_str());
let revisions: Vec<RevisionPB> = repeated_revision.get_items().to_vec(); let revisions: Vec<Revision> = repeated_revision.clone().into_inner();
let (_, rev_id) = pair_rev_id_from_revision_pbs(&revisions); let (_, rev_id) = pair_rev_id_from_revision_pbs(&revisions);
let delta = make_delta_from_revision_pb(revisions)?; let delta = make_delta_from_revision_pb(revisions)?;
let _ = self.persistence.reset_object(&object_id, repeated_revision).await?; let _ = self.persistence.reset_object(&object_id, repeated_revision).await?;
@ -181,7 +182,7 @@ where
self.object.read().to_json() self.object.read().to_json()
} }
fn compose_revision(&self, revision: &RevisionPB) -> Result<(), CollaborateError> { fn compose_revision(&self, revision: &Revision) -> Result<(), CollaborateError> {
let delta = Delta::<T>::from_bytes(&revision.delta_data)?; let delta = Delta::<T>::from_bytes(&revision.delta_data)?;
let _ = self.compose_delta(delta)?; let _ = self.compose_delta(delta)?;
let _ = self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(revision.rev_id)); let _ = self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(revision.rev_id));
@ -213,11 +214,7 @@ where
self.rev_id.load(SeqCst) self.rev_id.load(SeqCst)
} }
async fn is_applied_before( async fn is_applied_before(&self, new_revision: &Revision, persistence: &Arc<dyn RevisionSyncPersistence>) -> bool {
&self,
new_revision: &RevisionPB,
persistence: &Arc<dyn RevisionSyncPersistence>,
) -> bool {
let rev_ids = Some(vec![new_revision.rev_id]); let rev_ids = Some(vec![new_revision.rev_id]);
if let Ok(revisions) = persistence.read_revisions(&self.object_id, rev_ids).await { if let Ok(revisions) = persistence.read_revisions(&self.object_id, rev_ids).await {
if let Some(revision) = revisions.first() { if let Some(revision) = revisions.first() {
@ -243,14 +240,11 @@ where
tracing::trace!("{}: can not read the revisions in range {:?}", self.object_id, rev_ids); tracing::trace!("{}: can not read the revisions in range {:?}", self.object_id, rev_ids);
// assert_eq!(revisions.is_empty(), rev_ids.is_empty(),); // assert_eq!(revisions.is_empty(), rev_ids.is_empty(),);
} }
match repeated_revision_from_revision_pbs(revisions) {
Ok(repeated_revision) => { let repeated_revision = RepeatedRevision::from(revisions);
let data = ServerRevisionWSDataBuilder::build_push_message(&self.object_id, repeated_revision); let data = ServerRevisionWSDataBuilder::build_push_message(&self.object_id, repeated_revision);
user.receive(RevisionSyncResponse::Push(data)); user.receive(RevisionSyncResponse::Push(data));
} }
Err(e) => tracing::error!("{}", e),
}
}
Err(e) => { Err(e) => {
tracing::error!("{}", e); tracing::error!("{}", e);
} }

View File

@ -5,7 +5,6 @@ use crate::{
text_block::DocumentPB, text_block::DocumentPB,
}, },
errors::{CollaborateError, CollaborateResult}, errors::{CollaborateError, CollaborateResult},
protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB},
}; };
use dissimilar::Chunk; use dissimilar::Chunk;
use lib_ot::core::{DeltaBuilder, FlowyStr}; use lib_ot::core::{DeltaBuilder, FlowyStr};
@ -14,10 +13,7 @@ use lib_ot::{
rich_text::RichTextDelta, rich_text::RichTextDelta,
}; };
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use std::{ use std::sync::atomic::{AtomicI64, Ordering::SeqCst};
convert::TryInto,
sync::atomic::{AtomicI64, Ordering::SeqCst},
};
#[inline] #[inline]
pub fn find_newline(s: &str) -> Option<usize> { pub fn find_newline(s: &str) -> Option<usize> {
@ -85,7 +81,7 @@ where
Ok(delta) Ok(delta)
} }
pub fn make_delta_from_revision_pb<T>(revisions: Vec<RevisionPB>) -> CollaborateResult<Delta<T>> pub fn make_delta_from_revision_pb<T>(revisions: Vec<Revision>) -> CollaborateResult<Delta<T>>
where where
T: Attributes + DeserializeOwned, T: Attributes + DeserializeOwned,
{ {
@ -100,29 +96,7 @@ where
Ok(new_delta) Ok(new_delta)
} }
pub fn repeated_revision_from_revision_pbs(revisions: Vec<RevisionPB>) -> CollaborateResult<RepeatedRevision> { pub fn pair_rev_id_from_revision_pbs(revisions: &[Revision]) -> (i64, i64) {
let repeated_revision_pb = repeated_revision_pb_from_revisions(revisions);
// let repeated_revision: RepeatedRevision = revisions.into_iter().map(Revision::);
repeated_revision_from_repeated_revision_pb(repeated_revision_pb)
}
pub fn repeated_revision_pb_from_revisions(revisions: Vec<RevisionPB>) -> RepeatedRevisionPB {
let mut repeated_revision_pb = RepeatedRevisionPB::new();
repeated_revision_pb.set_items(revisions.into());
repeated_revision_pb
}
pub fn repeated_revision_from_repeated_revision_pb(
repeated_revision: RepeatedRevisionPB,
) -> CollaborateResult<RepeatedRevision> {
repeated_revision
.try_into()
.map_err(|e| CollaborateError::internal().context(format!("Cast repeated revision failed: {:?}", e)))
}
pub fn pair_rev_id_from_revision_pbs(revisions: &[RevisionPB]) -> (i64, i64) {
let mut rev_id = 0; let mut rev_id = 0;
revisions.iter().for_each(|revision| { revisions.iter().for_each(|revision| {
if rev_id < revision.rev_id { if rev_id < revision.rev_id {
@ -155,9 +129,9 @@ pub fn pair_rev_id_from_revisions(revisions: &[Revision]) -> (i64, i64) {
#[inline] #[inline]
pub fn make_folder_from_revisions_pb( pub fn make_folder_from_revisions_pb(
folder_id: &str, folder_id: &str,
mut revisions: RepeatedRevisionPB, revisions: RepeatedRevision,
) -> Result<Option<FolderInfo>, CollaborateError> { ) -> Result<Option<FolderInfo>, CollaborateError> {
let revisions = revisions.take_items(); let revisions = revisions.into_inner();
if revisions.is_empty() { if revisions.is_empty() {
return Ok(None); return Ok(None);
} }
@ -187,9 +161,9 @@ pub fn make_folder_from_revisions_pb(
#[inline] #[inline]
pub fn make_document_from_revision_pbs( pub fn make_document_from_revision_pbs(
doc_id: &str, doc_id: &str,
mut revisions: RepeatedRevisionPB, revisions: RepeatedRevision,
) -> Result<Option<DocumentPB>, CollaborateError> { ) -> Result<Option<DocumentPB>, CollaborateError> {
let revisions = revisions.take_items(); let revisions = revisions.into_inner();
if revisions.is_empty() { if revisions.is_empty() {
return Ok(None); return Ok(None);
} }