diff --git a/frontend/rust-lib/flowy-net/src/local_server/persistence.rs b/frontend/rust-lib/flowy-net/src/local_server/persistence.rs index 4cd305f2a6..17008481b5 100644 --- a/frontend/rust-lib/flowy-net/src/local_server/persistence.rs +++ b/frontend/rust-lib/flowy-net/src/local_server/persistence.rs @@ -1,7 +1,7 @@ +use flowy_sync::entities::revision::{RepeatedRevision, Revision}; use flowy_sync::{ entities::{folder::FolderInfo, text_block::DocumentPB}, errors::CollaborateError, - protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB}, server_document::*, server_folder::FolderCloudPersistence, util::{make_document_from_revision_pbs, make_folder_from_revisions_pb}, @@ -15,17 +15,17 @@ use std::{ // For the moment, we use memory to cache the data, it will be implemented with // other storage. Like the Firestore,Dropbox.etc. pub trait RevisionCloudStorage: Send + Sync { - fn set_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError>; + fn set_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError>; fn get_revisions( &self, object_id: &str, rev_ids: Option>, - ) -> BoxResultFuture; + ) -> BoxResultFuture; fn reset_object( &self, object_id: &str, - repeated_revision: RepeatedRevisionPB, + repeated_revision: RepeatedRevision, ) -> BoxResultFuture<(), CollaborateError>; } @@ -64,7 +64,7 @@ impl FolderCloudPersistence for LocalTextBlockCloudPersistence { &self, _user_id: &str, folder_id: &str, - repeated_revision: RepeatedRevisionPB, + repeated_revision: RepeatedRevision, ) -> BoxResultFuture, CollaborateError> { let folder_id = folder_id.to_owned(); let storage = self.storage.clone(); @@ -74,7 +74,7 @@ impl FolderCloudPersistence for LocalTextBlockCloudPersistence { }) } - fn save_folder_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> { + fn save_folder_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError> { let storage = self.storage.clone(); Box::pin(async move { let _ = storage.set_revisions(repeated_revision).await?; @@ -86,20 +86,19 @@ impl FolderCloudPersistence for LocalTextBlockCloudPersistence { &self, folder_id: &str, rev_ids: Option>, - ) -> BoxResultFuture, CollaborateError> { + ) -> BoxResultFuture, CollaborateError> { let folder_id = folder_id.to_owned(); let storage = self.storage.clone(); Box::pin(async move { - let mut repeated_revision = storage.get_revisions(&folder_id, rev_ids).await?; - let revisions: Vec = repeated_revision.take_items().into(); - Ok(revisions) + let repeated_revision = storage.get_revisions(&folder_id, rev_ids).await?; + Ok(repeated_revision.into_inner()) }) } fn reset_folder( &self, folder_id: &str, - repeated_revision: RepeatedRevisionPB, + repeated_revision: RepeatedRevision, ) -> BoxResultFuture<(), CollaborateError> { let storage = self.storage.clone(); let folder_id = folder_id.to_owned(); @@ -126,7 +125,7 @@ impl TextBlockCloudPersistence for LocalTextBlockCloudPersistence { fn create_text_block( &self, doc_id: &str, - repeated_revision: RepeatedRevisionPB, + repeated_revision: RepeatedRevision, ) -> BoxResultFuture, CollaborateError> { let doc_id = doc_id.to_owned(); let storage = self.storage.clone(); @@ -140,20 +139,16 @@ impl TextBlockCloudPersistence for LocalTextBlockCloudPersistence { &self, doc_id: &str, rev_ids: Option>, - ) -> BoxResultFuture, CollaborateError> { + ) -> BoxResultFuture, CollaborateError> { let doc_id = doc_id.to_owned(); let storage = self.storage.clone(); Box::pin(async move { - let mut repeated_revision = storage.get_revisions(&doc_id, rev_ids).await?; - let revisions: Vec = repeated_revision.take_items().into(); - Ok(revisions) + let repeated_revision = storage.get_revisions(&doc_id, rev_ids).await?; + Ok(repeated_revision.into_inner()) }) } - fn save_text_block_revisions( - &self, - repeated_revision: RepeatedRevisionPB, - ) -> BoxResultFuture<(), CollaborateError> { + fn save_text_block_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError> { let storage = self.storage.clone(); Box::pin(async move { let _ = storage.set_revisions(repeated_revision).await?; @@ -161,7 +156,7 @@ impl TextBlockCloudPersistence for LocalTextBlockCloudPersistence { }) } - fn reset_text_block(&self, doc_id: &str, revisions: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> { + fn reset_text_block(&self, doc_id: &str, revisions: RepeatedRevision) -> BoxResultFuture<(), CollaborateError> { let storage = self.storage.clone(); let doc_id = doc_id.to_owned(); Box::pin(async move { @@ -174,7 +169,7 @@ impl TextBlockCloudPersistence for LocalTextBlockCloudPersistence { #[derive(Default)] struct MemoryDocumentCloudStorage {} impl RevisionCloudStorage for MemoryDocumentCloudStorage { - fn set_revisions(&self, _repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> { + fn set_revisions(&self, _repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError> { Box::pin(async move { Ok(()) }) } @@ -182,9 +177,9 @@ impl RevisionCloudStorage for MemoryDocumentCloudStorage { &self, _doc_id: &str, _rev_ids: Option>, - ) -> BoxResultFuture { + ) -> BoxResultFuture { Box::pin(async move { - let repeated_revisions = RepeatedRevisionPB::new(); + let repeated_revisions = RepeatedRevision::default(); Ok(repeated_revisions) }) } @@ -192,7 +187,7 @@ impl RevisionCloudStorage for MemoryDocumentCloudStorage { fn reset_object( &self, _doc_id: &str, - _repeated_revision: RepeatedRevisionPB, + _repeated_revision: RepeatedRevision, ) -> BoxResultFuture<(), CollaborateError> { Box::pin(async move { Ok(()) }) } diff --git a/shared-lib/flowy-derive/src/proto_buf/enum_serde.rs b/shared-lib/flowy-derive/src/proto_buf/enum_serde.rs index 902af74ee8..fb0870fde8 100644 --- a/shared-lib/flowy-derive/src/proto_buf/enum_serde.rs +++ b/shared-lib/flowy-derive/src/proto_buf/enum_serde.rs @@ -28,9 +28,9 @@ pub fn make_enum_token_stream(_ctxt: &Ctxt, cont: &ASTContainer) -> Option for #enum_ident { - fn into(self) -> crate::protobuf::#pb_enum { - match self { + impl std::convert::From<#enum_ident> for crate::protobuf::#pb_enum{ + fn from(o: #enum_ident) -> crate::protobuf::#pb_enum { + match o { #(#build_to_pb_enum)* } } diff --git a/shared-lib/flowy-derive/src/proto_buf/serialize.rs b/shared-lib/flowy-derive/src/proto_buf/serialize.rs index 0bf1469a3c..a43f337f8f 100644 --- a/shared-lib/flowy-derive/src/proto_buf/serialize.rs +++ b/shared-lib/flowy-derive/src/proto_buf/serialize.rs @@ -25,8 +25,8 @@ pub fn make_se_token_stream(ctxt: &Ctxt, ast: &ASTContainer) -> Option for #struct_ident { - fn into(self) -> crate::protobuf::#pb_ty { + impl std::convert::From<#struct_ident> for crate::protobuf::#pb_ty { + fn from(mut o: #struct_ident) -> crate::protobuf::#pb_ty { let mut pb = crate::protobuf::#pb_ty::new(); #(#build_set_pb_fields)* pb @@ -40,7 +40,7 @@ pub fn make_se_token_stream(ctxt: &Ctxt, ast: &ASTContainer) -> Option Option { if let Some(func) = &field.attrs.serialize_with() { let member = &field.member; - Some(quote! { pb.#member=self.#func(); }) + Some(quote! { pb.#member=o.#func(); }) } else if field.attrs.is_one_of() { token_stream_for_one_of(ctxt, field) } else { @@ -65,19 +65,19 @@ fn token_stream_for_one_of(ctxt: &Ctxt, field: &ASTField) -> Option match ident_category(bracketed_ty_info.unwrap().ident) { TypeCategory::Protobuf => Some(quote! { - match self.#member { + match o.#member { Some(s) => { pb.#set_func(s.into()) } None => {} } }), TypeCategory::Enum => Some(quote! { - match self.#member { + match o.#member { Some(s) => { pb.#set_func(s.into()) } None => {} } }), _ => Some(quote! { - match self.#member { + match o.#member { Some(ref s) => { pb.#set_func(s.clone()) } None => {} } @@ -99,18 +99,16 @@ fn gen_token_stream(ctxt: &Ctxt, member: &syn::Member, ty: &syn::Type, is_option TypeCategory::Str => { if is_option { Some(quote! { - match self.#member { + match o.#member { Some(ref s) => { pb.#member = s.to_string().clone(); } None => { pb.#member = String::new(); } } }) } else { - Some(quote! { pb.#member = self.#member.clone(); }) + Some(quote! { pb.#member = o.#member.clone(); }) } } - TypeCategory::Protobuf => { - Some(quote! { pb.#member = ::protobuf::SingularPtrField::some(self.#member.into()); }) - } + TypeCategory::Protobuf => Some(quote! { pb.#member = ::protobuf::SingularPtrField::some(o.#member.into()); }), TypeCategory::Opt => gen_token_stream(ctxt, member, ty_info.bracket_ty_info.unwrap().ty, true), TypeCategory::Enum => { // let pb_enum_ident = format_ident!("{}", ty_info.ident.to_string()); @@ -118,10 +116,10 @@ fn gen_token_stream(ctxt: &Ctxt, member: &syn::Member, ty: &syn::Type, is_option // flowy_protobuf::#pb_enum_ident::from_i32(self.#member.value()).unwrap(); // }) Some(quote! { - pb.#member = self.#member.into(); + pb.#member = o.#member.into(); }) } - _ => Some(quote! { pb.#member = self.#member; }), + _ => Some(quote! { pb.#member = o.#member; }), } } @@ -138,15 +136,15 @@ fn token_stream_for_vec(ctxt: &Ctxt, member: &syn::Member, ty: &syn::Type) -> Op match ident_category(ty_info.ident) { TypeCategory::Protobuf => Some(quote! { pb.#member = ::protobuf::RepeatedField::from_vec( - self.#member + o.#member .into_iter() .map(|m| m.into()) .collect()); }), - TypeCategory::Bytes => Some(quote! { pb.#member = self.#member.clone(); }), + TypeCategory::Bytes => Some(quote! { pb.#member = o.#member.clone(); }), _ => Some(quote! { - pb.#member = ::protobuf::RepeatedField::from_vec(self.#member.clone()); + pb.#member = ::protobuf::RepeatedField::from_vec(o.#member.clone()); }), } } @@ -165,14 +163,14 @@ fn token_stream_for_map(ctxt: &Ctxt, member: &syn::Member, ty: &syn::Type) -> Op match ident_category(ty_info.ident) { TypeCategory::Protobuf => Some(quote! { let mut m: std::collections::HashMap = std::collections::HashMap::new(); - self.#member.into_iter().for_each(|(k,v)| { + o.#member.into_iter().for_each(|(k,v)| { m.insert(k.clone(), v.into()); }); pb.#member = m; }), _ => Some(quote! { let mut m: std::collections::HashMap = std::collections::HashMap::new(); - self.#member.iter().for_each(|(k,v)| { + o.#member.iter().for_each(|(k,v)| { m.insert(k.clone(), v.clone()); }); pb.#member = m; diff --git a/shared-lib/flowy-sync/src/server_document/document_manager.rs b/shared-lib/flowy-sync/src/server_document/document_manager.rs index 9a5499f862..59d3dd3c97 100644 --- a/shared-lib/flowy-sync/src/server_document/document_manager.rs +++ b/shared-lib/flowy-sync/src/server_document/document_manager.rs @@ -1,7 +1,8 @@ +use crate::entities::revision::{RepeatedRevision, Revision}; use crate::{ entities::{text_block::DocumentPB, ws_data::ServerRevisionWSDataBuilder}, errors::{internal_error, CollaborateError, CollaborateResult}, - protobuf::{ClientRevisionWSData, RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB}, + protobuf::ClientRevisionWSData, server_document::document_pad::ServerDocument, synchronizer::{RevisionSyncPersistence, RevisionSyncResponse, RevisionSynchronizer, RevisionUser}, util::rev_id_from_str, @@ -23,22 +24,21 @@ pub trait TextBlockCloudPersistence: Send + Sync + Debug { fn create_text_block( &self, doc_id: &str, - repeated_revision: RepeatedRevisionPB, + repeated_revision: RepeatedRevision, ) -> BoxResultFuture, CollaborateError>; fn read_text_block_revisions( &self, doc_id: &str, rev_ids: Option>, - ) -> BoxResultFuture, CollaborateError>; + ) -> BoxResultFuture, CollaborateError>; - fn save_text_block_revisions(&self, repeated_revision: RepeatedRevisionPB) - -> BoxResultFuture<(), CollaborateError>; + fn save_text_block_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError>; fn reset_text_block( &self, doc_id: &str, - repeated_revision: RepeatedRevisionPB, + repeated_revision: RepeatedRevision, ) -> BoxResultFuture<(), CollaborateError>; } @@ -47,18 +47,18 @@ impl RevisionSyncPersistence for Arc { &self, object_id: &str, rev_ids: Option>, - ) -> BoxResultFuture, CollaborateError> { + ) -> BoxResultFuture, CollaborateError> { (**self).read_text_block_revisions(object_id, rev_ids) } - fn save_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> { + fn save_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError> { (**self).save_text_block_revisions(repeated_revision) } fn reset_object( &self, object_id: &str, - repeated_revision: RepeatedRevisionPB, + repeated_revision: RepeatedRevision, ) -> BoxResultFuture<(), CollaborateError> { (**self).reset_text_block(object_id, repeated_revision) } @@ -82,7 +82,7 @@ impl ServerDocumentManager { user: Arc, mut client_data: ClientRevisionWSData, ) -> Result<(), CollaborateError> { - let repeated_revision = client_data.take_revisions(); + let repeated_revision: RepeatedRevision = client_data.take_revisions().into(); let cloned_user = user.clone(); let ack_id = rev_id_from_str(&client_data.data_id)?; let object_id = client_data.object_id; @@ -131,9 +131,10 @@ impl ServerDocumentManager { pub async fn handle_document_reset( &self, doc_id: &str, - mut repeated_revision: RepeatedRevisionPB, + mut repeated_revision: RepeatedRevision, ) -> Result<(), CollaborateError> { - repeated_revision.mut_items().sort_by(|a, b| a.rev_id.cmp(&b.rev_id)); + repeated_revision.sort_by(|a, b| a.rev_id.cmp(&b.rev_id)); + match self.get_document_handler(doc_id).await { None => { tracing::warn!("Document:{} doesn't exist, ignore document reset", doc_id); @@ -166,7 +167,7 @@ impl ServerDocumentManager { async fn create_document( &self, doc_id: &str, - repeated_revision: RepeatedRevisionPB, + repeated_revision: RepeatedRevision, ) -> Result, CollaborateError> { match self.persistence.create_text_block(doc_id, repeated_revision).await? { None => Err(CollaborateError::internal().context("Create document info from revisions failed")), @@ -229,7 +230,7 @@ impl OpenDocumentHandler { async fn apply_revisions( &self, user: Arc, - repeated_revision: RepeatedRevisionPB, + repeated_revision: RepeatedRevision, ) -> Result<(), CollaborateError> { let (ret, rx) = oneshot::channel(); self.users.insert(user.user_id(), user.clone()); @@ -252,7 +253,7 @@ impl OpenDocumentHandler { } #[tracing::instrument(level = "debug", skip(self, repeated_revision), err)] - async fn apply_document_reset(&self, repeated_revision: RepeatedRevisionPB) -> Result<(), CollaborateError> { + async fn apply_document_reset(&self, repeated_revision: RepeatedRevision) -> Result<(), CollaborateError> { let (ret, rx) = oneshot::channel(); let msg = DocumentCommand::Reset { repeated_revision, ret }; let result = self.send(msg, rx).await?; @@ -279,7 +280,7 @@ impl std::ops::Drop for OpenDocumentHandler { enum DocumentCommand { ApplyRevisions { user: Arc, - repeated_revision: RepeatedRevisionPB, + repeated_revision: RepeatedRevision, ret: oneshot::Sender>, }, Ping { @@ -288,7 +289,7 @@ enum DocumentCommand { ret: oneshot::Sender>, }, Reset { - repeated_revision: RepeatedRevisionPB, + repeated_revision: RepeatedRevision, ret: oneshot::Sender>, }, } diff --git a/shared-lib/flowy-sync/src/server_folder/folder_manager.rs b/shared-lib/flowy-sync/src/server_folder/folder_manager.rs index 0a1a5e117a..16b753f50b 100644 --- a/shared-lib/flowy-sync/src/server_folder/folder_manager.rs +++ b/shared-lib/flowy-sync/src/server_folder/folder_manager.rs @@ -1,10 +1,11 @@ +use crate::entities::revision::{RepeatedRevision, Revision}; use crate::{ entities::{ folder::{FolderDelta, FolderInfo}, ws_data::ServerRevisionWSDataBuilder, }, errors::{internal_error, CollaborateError, CollaborateResult}, - protobuf::{ClientRevisionWSData, RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB}, + protobuf::ClientRevisionWSData, server_folder::folder_pad::ServerFolder, synchronizer::{RevisionSyncPersistence, RevisionSyncResponse, RevisionSynchronizer, RevisionUser}, util::rev_id_from_str, @@ -26,21 +27,21 @@ pub trait FolderCloudPersistence: Send + Sync + Debug { &self, user_id: &str, folder_id: &str, - repeated_revision: RepeatedRevisionPB, + repeated_revision: RepeatedRevision, ) -> BoxResultFuture, CollaborateError>; - fn save_folder_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError>; + fn save_folder_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError>; fn read_folder_revisions( &self, folder_id: &str, rev_ids: Option>, - ) -> BoxResultFuture, CollaborateError>; + ) -> BoxResultFuture, CollaborateError>; fn reset_folder( &self, folder_id: &str, - repeated_revision: RepeatedRevisionPB, + repeated_revision: RepeatedRevision, ) -> BoxResultFuture<(), CollaborateError>; } @@ -49,18 +50,18 @@ impl RevisionSyncPersistence for Arc { &self, object_id: &str, rev_ids: Option>, - ) -> BoxResultFuture, CollaborateError> { + ) -> BoxResultFuture, CollaborateError> { (**self).read_folder_revisions(object_id, rev_ids) } - fn save_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> { + fn save_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError> { (**self).save_folder_revisions(repeated_revision) } fn reset_object( &self, object_id: &str, - repeated_revision: RepeatedRevisionPB, + repeated_revision: RepeatedRevision, ) -> BoxResultFuture<(), CollaborateError> { (**self).reset_folder(object_id, repeated_revision) } @@ -84,7 +85,7 @@ impl ServerFolderManager { user: Arc, mut client_data: ClientRevisionWSData, ) -> Result<(), CollaborateError> { - let repeated_revision = client_data.take_revisions(); + let repeated_revision: RepeatedRevision = client_data.take_revisions().into(); let cloned_user = user.clone(); let ack_id = rev_id_from_str(&client_data.data_id)?; let folder_id = client_data.object_id; @@ -167,7 +168,7 @@ impl ServerFolderManager { &self, user_id: &str, folder_id: &str, - repeated_revision: RepeatedRevisionPB, + repeated_revision: RepeatedRevision, ) -> Result, CollaborateError> { match self .persistence @@ -221,7 +222,7 @@ impl OpenFolderHandler { async fn apply_revisions( &self, user: Arc, - repeated_revision: RepeatedRevisionPB, + repeated_revision: RepeatedRevision, ) -> CollaborateResult<()> { let (ret, rx) = oneshot::channel(); let msg = FolderCommand::ApplyRevisions { @@ -258,7 +259,7 @@ impl std::ops::Drop for OpenFolderHandler { enum FolderCommand { ApplyRevisions { user: Arc, - repeated_revision: RepeatedRevisionPB, + repeated_revision: RepeatedRevision, ret: oneshot::Sender>, }, Ping { diff --git a/shared-lib/flowy-sync/src/synchronizer.rs b/shared-lib/flowy-sync/src/synchronizer.rs index 1305c471b2..eca422296a 100644 --- a/shared-lib/flowy-sync/src/synchronizer.rs +++ b/shared-lib/flowy-sync/src/synchronizer.rs @@ -1,10 +1,11 @@ +use crate::entities::revision::{RepeatedRevision, Revision}; use crate::{ entities::{ revision::RevisionRange, ws_data::{ServerRevisionWSData, ServerRevisionWSDataBuilder}, }, errors::CollaborateError, - protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB}, + protobuf::Revision as RevisionPB, util::*, }; use lib_infra::future::BoxResultFuture; @@ -31,14 +32,14 @@ pub trait RevisionSyncPersistence: Send + Sync + 'static { &self, object_id: &str, rev_ids: Option>, - ) -> BoxResultFuture, CollaborateError>; + ) -> BoxResultFuture, CollaborateError>; - fn save_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError>; + fn save_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError>; fn reset_object( &self, object_id: &str, - repeated_revision: RepeatedRevisionPB, + repeated_revision: RepeatedRevision, ) -> BoxResultFuture<(), CollaborateError>; } @@ -87,20 +88,20 @@ where pub async fn sync_revisions( &self, user: Arc, - repeated_revision: RepeatedRevisionPB, + repeated_revision: RepeatedRevision, ) -> Result<(), CollaborateError> { let object_id = self.object_id.clone(); - if repeated_revision.get_items().is_empty() { + if repeated_revision.is_empty() { // Return all the revisions to client let revisions = self.persistence.read_revisions(&object_id, None).await?; - let repeated_revision = repeated_revision_from_revision_pbs(revisions)?; + let repeated_revision = RepeatedRevision::from(revisions); let data = ServerRevisionWSDataBuilder::build_push_message(&object_id, repeated_revision); user.receive(RevisionSyncResponse::Push(data)); return Ok(()); } let server_base_rev_id = self.rev_id.load(SeqCst); - let first_revision = repeated_revision.get_items().first().unwrap().clone(); + let first_revision = repeated_revision.first().unwrap().clone(); if self.is_applied_before(&first_revision, &self.persistence).await { // Server has received this revision before, so ignore the following revisions return Ok(()); @@ -111,7 +112,7 @@ where let server_rev_id = next(server_base_rev_id); if server_base_rev_id == first_revision.base_rev_id || server_rev_id == first_revision.rev_id { // The rev is in the right order, just compose it. - for revision in repeated_revision.get_items() { + for revision in repeated_revision.iter() { let _ = self.compose_revision(revision)?; } let _ = self.persistence.save_revisions(repeated_revision).await?; @@ -165,10 +166,10 @@ where } #[tracing::instrument(level = "debug", skip(self, repeated_revision), fields(object_id), err)] - pub async fn reset(&self, repeated_revision: RepeatedRevisionPB) -> Result<(), CollaborateError> { + pub async fn reset(&self, repeated_revision: RepeatedRevision) -> Result<(), CollaborateError> { let object_id = self.object_id.clone(); tracing::Span::current().record("object_id", &object_id.as_str()); - let revisions: Vec = repeated_revision.get_items().to_vec(); + let revisions: Vec = repeated_revision.clone().into_inner(); let (_, rev_id) = pair_rev_id_from_revision_pbs(&revisions); let delta = make_delta_from_revision_pb(revisions)?; let _ = self.persistence.reset_object(&object_id, repeated_revision).await?; @@ -181,7 +182,7 @@ where self.object.read().to_json() } - fn compose_revision(&self, revision: &RevisionPB) -> Result<(), CollaborateError> { + fn compose_revision(&self, revision: &Revision) -> Result<(), CollaborateError> { let delta = Delta::::from_bytes(&revision.delta_data)?; let _ = self.compose_delta(delta)?; let _ = self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(revision.rev_id)); @@ -213,11 +214,7 @@ where self.rev_id.load(SeqCst) } - async fn is_applied_before( - &self, - new_revision: &RevisionPB, - persistence: &Arc, - ) -> bool { + async fn is_applied_before(&self, new_revision: &Revision, persistence: &Arc) -> bool { let rev_ids = Some(vec![new_revision.rev_id]); if let Ok(revisions) = persistence.read_revisions(&self.object_id, rev_ids).await { if let Some(revision) = revisions.first() { @@ -243,13 +240,10 @@ where tracing::trace!("{}: can not read the revisions in range {:?}", self.object_id, rev_ids); // assert_eq!(revisions.is_empty(), rev_ids.is_empty(),); } - match repeated_revision_from_revision_pbs(revisions) { - Ok(repeated_revision) => { - let data = ServerRevisionWSDataBuilder::build_push_message(&self.object_id, repeated_revision); - user.receive(RevisionSyncResponse::Push(data)); - } - Err(e) => tracing::error!("{}", e), - } + + let repeated_revision = RepeatedRevision::from(revisions); + let data = ServerRevisionWSDataBuilder::build_push_message(&self.object_id, repeated_revision); + user.receive(RevisionSyncResponse::Push(data)); } Err(e) => { tracing::error!("{}", e); diff --git a/shared-lib/flowy-sync/src/util.rs b/shared-lib/flowy-sync/src/util.rs index e0c8e0a5fa..c4504a806a 100644 --- a/shared-lib/flowy-sync/src/util.rs +++ b/shared-lib/flowy-sync/src/util.rs @@ -5,7 +5,6 @@ use crate::{ text_block::DocumentPB, }, errors::{CollaborateError, CollaborateResult}, - protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB}, }; use dissimilar::Chunk; use lib_ot::core::{DeltaBuilder, FlowyStr}; @@ -14,10 +13,7 @@ use lib_ot::{ rich_text::RichTextDelta, }; use serde::de::DeserializeOwned; -use std::{ - convert::TryInto, - sync::atomic::{AtomicI64, Ordering::SeqCst}, -}; +use std::sync::atomic::{AtomicI64, Ordering::SeqCst}; #[inline] pub fn find_newline(s: &str) -> Option { @@ -85,7 +81,7 @@ where Ok(delta) } -pub fn make_delta_from_revision_pb(revisions: Vec) -> CollaborateResult> +pub fn make_delta_from_revision_pb(revisions: Vec) -> CollaborateResult> where T: Attributes + DeserializeOwned, { @@ -100,29 +96,7 @@ where Ok(new_delta) } -pub fn repeated_revision_from_revision_pbs(revisions: Vec) -> CollaborateResult { - let repeated_revision_pb = repeated_revision_pb_from_revisions(revisions); - - // let repeated_revision: RepeatedRevision = revisions.into_iter().map(Revision::); - - repeated_revision_from_repeated_revision_pb(repeated_revision_pb) -} - -pub fn repeated_revision_pb_from_revisions(revisions: Vec) -> RepeatedRevisionPB { - let mut repeated_revision_pb = RepeatedRevisionPB::new(); - repeated_revision_pb.set_items(revisions.into()); - repeated_revision_pb -} - -pub fn repeated_revision_from_repeated_revision_pb( - repeated_revision: RepeatedRevisionPB, -) -> CollaborateResult { - repeated_revision - .try_into() - .map_err(|e| CollaborateError::internal().context(format!("Cast repeated revision failed: {:?}", e))) -} - -pub fn pair_rev_id_from_revision_pbs(revisions: &[RevisionPB]) -> (i64, i64) { +pub fn pair_rev_id_from_revision_pbs(revisions: &[Revision]) -> (i64, i64) { let mut rev_id = 0; revisions.iter().for_each(|revision| { if rev_id < revision.rev_id { @@ -155,9 +129,9 @@ pub fn pair_rev_id_from_revisions(revisions: &[Revision]) -> (i64, i64) { #[inline] pub fn make_folder_from_revisions_pb( folder_id: &str, - mut revisions: RepeatedRevisionPB, + revisions: RepeatedRevision, ) -> Result, CollaborateError> { - let revisions = revisions.take_items(); + let revisions = revisions.into_inner(); if revisions.is_empty() { return Ok(None); } @@ -187,9 +161,9 @@ pub fn make_folder_from_revisions_pb( #[inline] pub fn make_document_from_revision_pbs( doc_id: &str, - mut revisions: RepeatedRevisionPB, + revisions: RepeatedRevision, ) -> Result, CollaborateError> { - let revisions = revisions.take_items(); + let revisions = revisions.into_inner(); if revisions.is_empty() { return Ok(None); }