config virtual net

This commit is contained in:
appflowy
2021-12-26 23:59:45 +08:00
parent bbc9190bc0
commit a0e6c61f50
28 changed files with 463 additions and 447 deletions

View File

@ -3,7 +3,7 @@ use crate::services::{
persistence::{create_doc, read_doc}, persistence::{create_doc, read_doc},
ws_actor::{DocumentWebSocketActor, WSActorMessage}, ws_actor::{DocumentWebSocketActor, WSActorMessage},
}, },
web_socket::{WebSocketReceiver, WSClientData}, web_socket::{WSClientData, WebSocketReceiver},
}; };
use crate::context::FlowyPersistence; use crate::context::FlowyPersistence;
@ -18,13 +18,13 @@ use flowy_collaboration::{
}; };
use lib_infra::future::FutureResultSend; use lib_infra::future::FutureResultSend;
use flowy_collaboration::sync::{DocumentPersistence, ServerDocumentManager};
use std::{ use std::{
convert::TryInto, convert::TryInto,
fmt::{Debug, Formatter}, fmt::{Debug, Formatter},
sync::Arc, sync::Arc,
}; };
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use flowy_collaboration::sync::{DocumentPersistence, ServerDocumentManager};
pub fn make_document_ws_receiver(persistence: Arc<FlowyPersistence>) -> Arc<DocumentWebSocketReceiver> { pub fn make_document_ws_receiver(persistence: Arc<FlowyPersistence>) -> Arc<DocumentWebSocketReceiver> {
let document_persistence = Arc::new(DocumentPersistenceImpl(persistence.clone())); let document_persistence = Arc::new(DocumentPersistenceImpl(persistence.clone()));
@ -95,13 +95,13 @@ impl DocumentPersistence for DocumentPersistenceImpl {
}) })
} }
fn create_doc(&self, revision: Revision) -> FutureResultSend<DocumentInfo, CollaborateError> { fn create_doc(&self, doc_id: &str, revisions: Vec<Revision>) -> FutureResultSend<DocumentInfo, CollaborateError> {
let kv_store = self.0.kv_store(); let kv_store = self.0.kv_store();
let doc_id = doc_id.to_owned();
FutureResultSend::new(async move { FutureResultSend::new(async move {
let doc: DocumentInfo = revision.clone().try_into()?; let doc = DocumentInfo::from_revisions(&doc_id, revisions.clone())?;
let doc_id = revision.doc_id.clone(); let doc_id = doc_id.to_owned();
let revisions = RepeatedRevision { items: vec![revision] }; let revisions = RepeatedRevision::new(revisions);
let params = CreateDocParams { id: doc_id, revisions }; let params = CreateDocParams { id: doc_id, revisions };
let pb_params: flowy_collaboration::protobuf::CreateDocParams = params.try_into().unwrap(); let pb_params: flowy_collaboration::protobuf::CreateDocParams = params.try_into().unwrap();
let _ = create_doc(&kv_store, pb_params) let _ = create_doc(&kv_store, pb_params)
@ -125,6 +125,19 @@ impl DocumentPersistence for DocumentPersistenceImpl {
FutureResultSend::new(async move { f().await.map_err(server_error_to_collaborate_error) }) FutureResultSend::new(async move { f().await.map_err(server_error_to_collaborate_error) })
} }
fn get_doc_revisions(&self, doc_id: &str) -> FutureResultSend<Vec<Revision>, CollaborateError> {
let kv_store = self.0.kv_store();
let doc_id = doc_id.to_owned();
let f = || async move {
let mut pb = kv_store.get_doc_revisions(&doc_id).await?;
let repeated_revision: RepeatedRevision = (&mut pb).try_into()?;
let revisions = repeated_revision.into_inner();
Ok(revisions)
};
FutureResultSend::new(async move { f().await.map_err(server_error_to_collaborate_error) })
}
} }
fn server_error_to_collaborate_error(error: ServerError) -> CollaborateError { fn server_error_to_collaborate_error(error: ServerError) -> CollaborateError {

View File

@ -49,6 +49,11 @@ impl DocumentKVPersistence {
Ok(()) Ok(())
} }
pub(crate) async fn get_doc_revisions(&self, doc_id: &str) -> Result<RepeatedRevision, ServerError> {
let items = self.inner.batch_get_start_with(doc_id).await?;
Ok(key_value_items_to_revisions(items))
}
pub(crate) async fn batch_get_revisions<T: Into<Option<Vec<i64>>>>( pub(crate) async fn batch_get_revisions<T: Into<Option<Vec<i64>>>>(
&self, &self,
doc_id: &str, doc_id: &str,
@ -56,7 +61,7 @@ impl DocumentKVPersistence {
) -> Result<RepeatedRevision, ServerError> { ) -> Result<RepeatedRevision, ServerError> {
let rev_ids = rev_ids.into(); let rev_ids = rev_ids.into();
let items = match rev_ids { let items = match rev_ids {
None => self.inner.batch_get_key_start_with(doc_id).await?, None => self.inner.batch_get_start_with(doc_id).await?,
Some(rev_ids) => { Some(rev_ids) => {
let keys = rev_ids let keys = rev_ids
.into_iter() .into_iter()
@ -66,17 +71,7 @@ impl DocumentKVPersistence {
}, },
}; };
let mut revisions = items Ok(key_value_items_to_revisions(items))
.into_iter()
.filter_map(|kv| parse_from_bytes::<Revision>(&kv.value).ok())
.collect::<Vec<Revision>>();
// TODO: optimize sort
revisions.sort_by(|a, b| a.rev_id.cmp(&b.rev_id));
let mut repeated_revision = RepeatedRevision::new();
repeated_revision.set_items(revisions.into());
Ok(repeated_revision)
} }
pub(crate) async fn batch_delete_revisions<T: Into<Option<Vec<i64>>>>( pub(crate) async fn batch_delete_revisions<T: Into<Option<Vec<i64>>>>(
@ -101,5 +96,18 @@ impl DocumentKVPersistence {
} }
} }
#[inline]
fn key_value_items_to_revisions(items: Vec<KeyValue>) -> RepeatedRevision {
let mut revisions = items
.into_iter()
.filter_map(|kv| parse_from_bytes::<Revision>(&kv.value).ok())
.collect::<Vec<Revision>>();
revisions.sort_by(|a, b| a.rev_id.cmp(&b.rev_id));
let mut repeated_revision = RepeatedRevision::new();
repeated_revision.set_items(revisions.into());
repeated_revision
}
#[inline] #[inline]
fn make_revision_key(doc_id: &str, rev_id: i64) -> String { format!("{}:{}", doc_id, rev_id) } fn make_revision_key(doc_id: &str, rev_id: i64) -> String { format!("{}:{}", doc_id, rev_id) }

View File

@ -57,7 +57,7 @@ fn make_doc_from_revisions(doc_id: &str, mut revisions: RepeatedRevision) -> Res
if revisions.is_empty() { if revisions.is_empty() {
return Err(ServerError::record_not_found().context(format!("{} not exist", doc_id))); return Err(ServerError::record_not_found().context(format!("{} not exist", doc_id)));
} }
let mut document_delta = RichTextDelta::new(); let mut document_delta = RichTextDelta::new();
let mut base_rev_id = 0; let mut base_rev_id = 0;
let mut rev_id = 0; let mut rev_id = 0;
@ -70,7 +70,7 @@ fn make_doc_from_revisions(doc_id: &str, mut revisions: RepeatedRevision) -> Res
} }
let text = document_delta.to_json(); let text = document_delta.to_json();
let mut document_info = DocumentInfo::new(); let mut document_info = DocumentInfo::new();
document_info.set_id(doc_id.to_owned()); document_info.set_doc_id(doc_id.to_owned());
document_info.set_text(text); document_info.set_text(text);
document_info.set_base_rev_id(base_rev_id); document_info.set_base_rev_id(base_rev_id);
document_info.set_rev_id(rev_id); document_info.set_rev_id(rev_id);

View File

@ -6,8 +6,9 @@ use crate::{
use actix_rt::task::spawn_blocking; use actix_rt::task::spawn_blocking;
use async_stream::stream; use async_stream::stream;
use backend_service::errors::{internal_error, Result, ServerError}; use backend_service::errors::{internal_error, Result, ServerError};
use flowy_collaboration::{ use flowy_collaboration::{
protobuf::{DocumentClientWSData, DocumentClientWSDataType, RepeatedRevision, Revision}, protobuf::{DocumentClientWSData, DocumentClientWSDataType, Revision},
sync::{RevisionUser, ServerDocumentManager, SyncResponse}, sync::{RevisionUser, ServerDocumentManager, SyncResponse},
}; };
use futures::stream::StreamExt; use futures::stream::StreamExt;
@ -67,18 +68,15 @@ impl DocumentWebSocketActor {
async fn handle_client_data(&self, client_data: WSClientData, persistence: Arc<FlowyPersistence>) -> Result<()> { async fn handle_client_data(&self, client_data: WSClientData, persistence: Arc<FlowyPersistence>) -> Result<()> {
let WSClientData { user, socket, data } = client_data; let WSClientData { user, socket, data } = client_data;
let document_data = spawn_blocking(move || { let document_client_data = spawn_blocking(move || parse_from_bytes::<DocumentClientWSData>(&data))
let document_data: DocumentClientWSData = parse_from_bytes(&data)?; .await
Result::Ok(document_data) .map_err(internal_error)??;
})
.await
.map_err(internal_error)??;
tracing::debug!( tracing::debug!(
"[HTTP_SERVER_WS]: receive client data: {}:{}, {:?}", "[DocumentWebSocketActor]: receive client data: {}:{}, {:?}",
document_data.doc_id, document_client_data.doc_id,
document_data.id, document_client_data.id,
document_data.ty document_client_data.ty
); );
let user = Arc::new(ServerDocUser { let user = Arc::new(ServerDocUser {
@ -87,33 +85,26 @@ impl DocumentWebSocketActor {
persistence, persistence,
}); });
match match &document_data.ty { match self.handle_revision(user, document_client_data).await {
DocumentClientWSDataType::ClientPushRev => self.handle_pushed_rev(user, document_data.data).await,
} {
Ok(_) => {}, Ok(_) => {},
Err(e) => { Err(e) => {
tracing::error!("[HTTP_SERVER_WS]: process client data error {:?}", e); tracing::error!("[DocumentWebSocketActor]: process client data error {:?}", e);
}, },
} }
Ok(()) Ok(())
} }
async fn handle_pushed_rev(&self, user: Arc<ServerDocUser>, data: Vec<u8>) -> Result<()> { async fn handle_revision(&self, user: Arc<ServerDocUser>, client_data: DocumentClientWSData) -> Result<()> {
let repeated_revision = spawn_blocking(move || parse_from_bytes::<RepeatedRevision>(&data)) match &client_data.ty {
.await DocumentClientWSDataType::ClientPushRev => {
.map_err(internal_error)??; let _ = self
self.handle_revision(user, repeated_revision).await .doc_manager
} .apply_revisions(user, client_data)
.await
.map_err(internal_error)?;
},
}
async fn handle_revision(&self, user: Arc<ServerDocUser>, mut revisions: RepeatedRevision) -> Result<()> {
let repeated_revision: flowy_collaboration::entities::revision::RepeatedRevision =
(&mut revisions).try_into().map_err(internal_error)?;
let revisions = repeated_revision.into_inner();
let _ = self
.doc_manager
.apply_revisions(user, revisions)
.await
.map_err(internal_error)?;
Ok(()) Ok(())
} }
} }

View File

@ -158,9 +158,9 @@ impl KVStore for PostgresKV {
}) })
} }
fn batch_get_key_start_with(&self, prefix: &str) -> FutureResultSend<Vec<KeyValue>, ServerError> { fn batch_get_start_with(&self, key: &str) -> FutureResultSend<Vec<KeyValue>, ServerError> {
let pg_pool = self.pg_pool.clone(); let pg_pool = self.pg_pool.clone();
let prefix = prefix.to_owned(); let prefix = key.to_owned();
FutureResultSend::new(async move { FutureResultSend::new(async move {
let mut transaction = pg_pool let mut transaction = pg_pool
.begin() .begin()

View File

@ -19,7 +19,7 @@ pub trait KVStore: Send + Sync {
fn delete(&self, key: &str) -> FutureResultSend<(), ServerError>; fn delete(&self, key: &str) -> FutureResultSend<(), ServerError>;
fn batch_set(&self, kvs: Vec<KeyValue>) -> FutureResultSend<(), ServerError>; fn batch_set(&self, kvs: Vec<KeyValue>) -> FutureResultSend<(), ServerError>;
fn batch_get(&self, keys: Vec<String>) -> FutureResultSend<Vec<KeyValue>, ServerError>; fn batch_get(&self, keys: Vec<String>) -> FutureResultSend<Vec<KeyValue>, ServerError>;
fn batch_get_key_start_with(&self, keyword: &str) -> FutureResultSend<Vec<KeyValue>, ServerError>; fn batch_get_start_with(&self, key: &str) -> FutureResultSend<Vec<KeyValue>, ServerError>;
fn batch_delete(&self, keys: Vec<String>) -> FutureResultSend<(), ServerError>; fn batch_delete(&self, keys: Vec<String>) -> FutureResultSend<(), ServerError>;
fn batch_delete_key_start_with(&self, keyword: &str) -> FutureResultSend<(), ServerError>; fn batch_delete_key_start_with(&self, keyword: &str) -> FutureResultSend<(), ServerError>;

View File

@ -258,7 +258,7 @@ async fn doc_create() {
let params = CreateDocParams { let params = CreateDocParams {
id: doc_id.clone(), id: doc_id.clone(),
revisions: RepeatedRevision { items: revisions }, revisions: RepeatedRevision::new(revisions),
}; };
server.create_doc(params).await; server.create_doc(params).await;

View File

@ -211,7 +211,7 @@ packages:
name: vector_math name: vector_math
url: "https://pub.dartlang.org" url: "https://pub.dartlang.org"
source: hosted source: hosted
version: "2.1.0" version: "2.1.1"
xml: xml:
dependency: transitive dependency: transitive
description: description:
@ -220,5 +220,5 @@ packages:
source: hosted source: hosted
version: "5.2.0" version: "5.2.0"
sdks: sdks:
dart: ">=2.13.0 <3.0.0" dart: ">=2.14.0 <3.0.0"
flutter: ">=1.24.0-7.0" flutter: ">=1.24.0-7.0"

View File

@ -77,7 +77,7 @@ class CreateDocParams extends $pb.GeneratedMessage {
class DocumentInfo extends $pb.GeneratedMessage { class DocumentInfo extends $pb.GeneratedMessage {
static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'DocumentInfo', createEmptyInstance: create) static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'DocumentInfo', createEmptyInstance: create)
..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'id') ..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'docId')
..aOS(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'text') ..aOS(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'text')
..aInt64(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'revId') ..aInt64(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'revId')
..aInt64(4, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'baseRevId') ..aInt64(4, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'baseRevId')
@ -86,14 +86,14 @@ class DocumentInfo extends $pb.GeneratedMessage {
DocumentInfo._() : super(); DocumentInfo._() : super();
factory DocumentInfo({ factory DocumentInfo({
$core.String? id, $core.String? docId,
$core.String? text, $core.String? text,
$fixnum.Int64? revId, $fixnum.Int64? revId,
$fixnum.Int64? baseRevId, $fixnum.Int64? baseRevId,
}) { }) {
final _result = create(); final _result = create();
if (id != null) { if (docId != null) {
_result.id = id; _result.docId = docId;
} }
if (text != null) { if (text != null) {
_result.text = text; _result.text = text;
@ -128,13 +128,13 @@ class DocumentInfo extends $pb.GeneratedMessage {
static DocumentInfo? _defaultInstance; static DocumentInfo? _defaultInstance;
@$pb.TagNumber(1) @$pb.TagNumber(1)
$core.String get id => $_getSZ(0); $core.String get docId => $_getSZ(0);
@$pb.TagNumber(1) @$pb.TagNumber(1)
set id($core.String v) { $_setString(0, v); } set docId($core.String v) { $_setString(0, v); }
@$pb.TagNumber(1) @$pb.TagNumber(1)
$core.bool hasId() => $_has(0); $core.bool hasDocId() => $_has(0);
@$pb.TagNumber(1) @$pb.TagNumber(1)
void clearId() => clearField(1); void clearDocId() => clearField(1);
@$pb.TagNumber(2) @$pb.TagNumber(2)
$core.String get text => $_getSZ(1); $core.String get text => $_getSZ(1);

View File

@ -23,7 +23,7 @@ final $typed_data.Uint8List createDocParamsDescriptor = $convert.base64Decode('C
const DocumentInfo$json = const { const DocumentInfo$json = const {
'1': 'DocumentInfo', '1': 'DocumentInfo',
'2': const [ '2': const [
const {'1': 'id', '3': 1, '4': 1, '5': 9, '10': 'id'}, const {'1': 'doc_id', '3': 1, '4': 1, '5': 9, '10': 'docId'},
const {'1': 'text', '3': 2, '4': 1, '5': 9, '10': 'text'}, const {'1': 'text', '3': 2, '4': 1, '5': 9, '10': 'text'},
const {'1': 'rev_id', '3': 3, '4': 1, '5': 3, '10': 'revId'}, const {'1': 'rev_id', '3': 3, '4': 1, '5': 3, '10': 'revId'},
const {'1': 'base_rev_id', '3': 4, '4': 1, '5': 3, '10': 'baseRevId'}, const {'1': 'base_rev_id', '3': 4, '4': 1, '5': 3, '10': 'baseRevId'},
@ -31,7 +31,7 @@ const DocumentInfo$json = const {
}; };
/// Descriptor for `DocumentInfo`. Decode as a `google.protobuf.DescriptorProto`. /// Descriptor for `DocumentInfo`. Decode as a `google.protobuf.DescriptorProto`.
final $typed_data.Uint8List documentInfoDescriptor = $convert.base64Decode('CgxEb2N1bWVudEluZm8SDgoCaWQYASABKAlSAmlkEhIKBHRleHQYAiABKAlSBHRleHQSFQoGcmV2X2lkGAMgASgDUgVyZXZJZBIeCgtiYXNlX3Jldl9pZBgEIAEoA1IJYmFzZVJldklk'); final $typed_data.Uint8List documentInfoDescriptor = $convert.base64Decode('CgxEb2N1bWVudEluZm8SFQoGZG9jX2lkGAEgASgJUgVkb2NJZBISCgR0ZXh0GAIgASgJUgR0ZXh0EhUKBnJldl9pZBgDIAEoA1IFcmV2SWQSHgoLYmFzZV9yZXZfaWQYBCABKANSCWJhc2VSZXZJZA==');
@$core.Deprecated('Use resetDocumentParamsDescriptor instead') @$core.Deprecated('Use resetDocumentParamsDescriptor instead')
const ResetDocumentParams$json = const { const ResetDocumentParams$json = const {
'1': 'ResetDocumentParams', '1': 'ResetDocumentParams',

View File

@ -9,6 +9,8 @@ import 'dart:core' as $core;
import 'package:protobuf/protobuf.dart' as $pb; import 'package:protobuf/protobuf.dart' as $pb;
import 'revision.pb.dart' as $0;
import 'ws.pbenum.dart'; import 'ws.pbenum.dart';
export 'ws.pbenum.dart'; export 'ws.pbenum.dart';
@ -17,7 +19,7 @@ class DocumentClientWSData extends $pb.GeneratedMessage {
static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'DocumentClientWSData', createEmptyInstance: create) static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'DocumentClientWSData', createEmptyInstance: create)
..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'docId') ..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'docId')
..e<DocumentClientWSDataType>(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'ty', $pb.PbFieldType.OE, defaultOrMaker: DocumentClientWSDataType.ClientPushRev, valueOf: DocumentClientWSDataType.valueOf, enumValues: DocumentClientWSDataType.values) ..e<DocumentClientWSDataType>(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'ty', $pb.PbFieldType.OE, defaultOrMaker: DocumentClientWSDataType.ClientPushRev, valueOf: DocumentClientWSDataType.valueOf, enumValues: DocumentClientWSDataType.values)
..a<$core.List<$core.int>>(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'data', $pb.PbFieldType.OY) ..aOM<$0.RepeatedRevision>(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'revisions', subBuilder: $0.RepeatedRevision.create)
..aOS(4, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'id') ..aOS(4, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'id')
..hasRequiredFields = false ..hasRequiredFields = false
; ;
@ -26,7 +28,7 @@ class DocumentClientWSData extends $pb.GeneratedMessage {
factory DocumentClientWSData({ factory DocumentClientWSData({
$core.String? docId, $core.String? docId,
DocumentClientWSDataType? ty, DocumentClientWSDataType? ty,
$core.List<$core.int>? data, $0.RepeatedRevision? revisions,
$core.String? id, $core.String? id,
}) { }) {
final _result = create(); final _result = create();
@ -36,8 +38,8 @@ class DocumentClientWSData extends $pb.GeneratedMessage {
if (ty != null) { if (ty != null) {
_result.ty = ty; _result.ty = ty;
} }
if (data != null) { if (revisions != null) {
_result.data = data; _result.revisions = revisions;
} }
if (id != null) { if (id != null) {
_result.id = id; _result.id = id;
@ -84,13 +86,15 @@ class DocumentClientWSData extends $pb.GeneratedMessage {
void clearTy() => clearField(2); void clearTy() => clearField(2);
@$pb.TagNumber(3) @$pb.TagNumber(3)
$core.List<$core.int> get data => $_getN(2); $0.RepeatedRevision get revisions => $_getN(2);
@$pb.TagNumber(3) @$pb.TagNumber(3)
set data($core.List<$core.int> v) { $_setBytes(2, v); } set revisions($0.RepeatedRevision v) { setField(3, v); }
@$pb.TagNumber(3) @$pb.TagNumber(3)
$core.bool hasData() => $_has(2); $core.bool hasRevisions() => $_has(2);
@$pb.TagNumber(3) @$pb.TagNumber(3)
void clearData() => clearField(3); void clearRevisions() => clearField(3);
@$pb.TagNumber(3)
$0.RepeatedRevision ensureRevisions() => $_ensure(2);
@$pb.TagNumber(4) @$pb.TagNumber(4)
$core.String get id => $_getSZ(3); $core.String get id => $_getSZ(3);
@ -107,7 +111,6 @@ class DocumentServerWSData extends $pb.GeneratedMessage {
..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'docId') ..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'docId')
..e<DocumentServerWSDataType>(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'ty', $pb.PbFieldType.OE, defaultOrMaker: DocumentServerWSDataType.ServerAck, valueOf: DocumentServerWSDataType.valueOf, enumValues: DocumentServerWSDataType.values) ..e<DocumentServerWSDataType>(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'ty', $pb.PbFieldType.OE, defaultOrMaker: DocumentServerWSDataType.ServerAck, valueOf: DocumentServerWSDataType.valueOf, enumValues: DocumentServerWSDataType.values)
..a<$core.List<$core.int>>(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'data', $pb.PbFieldType.OY) ..a<$core.List<$core.int>>(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'data', $pb.PbFieldType.OY)
..aOS(4, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'id')
..hasRequiredFields = false ..hasRequiredFields = false
; ;
@ -116,7 +119,6 @@ class DocumentServerWSData extends $pb.GeneratedMessage {
$core.String? docId, $core.String? docId,
DocumentServerWSDataType? ty, DocumentServerWSDataType? ty,
$core.List<$core.int>? data, $core.List<$core.int>? data,
$core.String? id,
}) { }) {
final _result = create(); final _result = create();
if (docId != null) { if (docId != null) {
@ -128,9 +130,6 @@ class DocumentServerWSData extends $pb.GeneratedMessage {
if (data != null) { if (data != null) {
_result.data = data; _result.data = data;
} }
if (id != null) {
_result.id = id;
}
return _result; return _result;
} }
factory DocumentServerWSData.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r); factory DocumentServerWSData.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r);
@ -180,15 +179,6 @@ class DocumentServerWSData extends $pb.GeneratedMessage {
$core.bool hasData() => $_has(2); $core.bool hasData() => $_has(2);
@$pb.TagNumber(3) @$pb.TagNumber(3)
void clearData() => clearField(3); void clearData() => clearField(3);
@$pb.TagNumber(4)
$core.String get id => $_getSZ(3);
@$pb.TagNumber(4)
set id($core.String v) { $_setString(3, v); }
@$pb.TagNumber(4)
$core.bool hasId() => $_has(3);
@$pb.TagNumber(4)
void clearId() => clearField(4);
} }
class NewDocumentUser extends $pb.GeneratedMessage { class NewDocumentUser extends $pb.GeneratedMessage {

View File

@ -37,13 +37,13 @@ const DocumentClientWSData$json = const {
'2': const [ '2': const [
const {'1': 'doc_id', '3': 1, '4': 1, '5': 9, '10': 'docId'}, const {'1': 'doc_id', '3': 1, '4': 1, '5': 9, '10': 'docId'},
const {'1': 'ty', '3': 2, '4': 1, '5': 14, '6': '.DocumentClientWSDataType', '10': 'ty'}, const {'1': 'ty', '3': 2, '4': 1, '5': 14, '6': '.DocumentClientWSDataType', '10': 'ty'},
const {'1': 'data', '3': 3, '4': 1, '5': 12, '10': 'data'}, const {'1': 'revisions', '3': 3, '4': 1, '5': 11, '6': '.RepeatedRevision', '10': 'revisions'},
const {'1': 'id', '3': 4, '4': 1, '5': 9, '10': 'id'}, const {'1': 'id', '3': 4, '4': 1, '5': 9, '10': 'id'},
], ],
}; };
/// Descriptor for `DocumentClientWSData`. Decode as a `google.protobuf.DescriptorProto`. /// Descriptor for `DocumentClientWSData`. Decode as a `google.protobuf.DescriptorProto`.
final $typed_data.Uint8List documentClientWSDataDescriptor = $convert.base64Decode('ChREb2N1bWVudENsaWVudFdTRGF0YRIVCgZkb2NfaWQYASABKAlSBWRvY0lkEikKAnR5GAIgASgOMhkuRG9jdW1lbnRDbGllbnRXU0RhdGFUeXBlUgJ0eRISCgRkYXRhGAMgASgMUgRkYXRhEg4KAmlkGAQgASgJUgJpZA=='); final $typed_data.Uint8List documentClientWSDataDescriptor = $convert.base64Decode('ChREb2N1bWVudENsaWVudFdTRGF0YRIVCgZkb2NfaWQYASABKAlSBWRvY0lkEikKAnR5GAIgASgOMhkuRG9jdW1lbnRDbGllbnRXU0RhdGFUeXBlUgJ0eRIvCglyZXZpc2lvbnMYAyABKAsyES5SZXBlYXRlZFJldmlzaW9uUglyZXZpc2lvbnMSDgoCaWQYBCABKAlSAmlk');
@$core.Deprecated('Use documentServerWSDataDescriptor instead') @$core.Deprecated('Use documentServerWSDataDescriptor instead')
const DocumentServerWSData$json = const { const DocumentServerWSData$json = const {
'1': 'DocumentServerWSData', '1': 'DocumentServerWSData',
@ -51,12 +51,11 @@ const DocumentServerWSData$json = const {
const {'1': 'doc_id', '3': 1, '4': 1, '5': 9, '10': 'docId'}, const {'1': 'doc_id', '3': 1, '4': 1, '5': 9, '10': 'docId'},
const {'1': 'ty', '3': 2, '4': 1, '5': 14, '6': '.DocumentServerWSDataType', '10': 'ty'}, const {'1': 'ty', '3': 2, '4': 1, '5': 14, '6': '.DocumentServerWSDataType', '10': 'ty'},
const {'1': 'data', '3': 3, '4': 1, '5': 12, '10': 'data'}, const {'1': 'data', '3': 3, '4': 1, '5': 12, '10': 'data'},
const {'1': 'id', '3': 4, '4': 1, '5': 9, '10': 'id'},
], ],
}; };
/// Descriptor for `DocumentServerWSData`. Decode as a `google.protobuf.DescriptorProto`. /// Descriptor for `DocumentServerWSData`. Decode as a `google.protobuf.DescriptorProto`.
final $typed_data.Uint8List documentServerWSDataDescriptor = $convert.base64Decode('ChREb2N1bWVudFNlcnZlcldTRGF0YRIVCgZkb2NfaWQYASABKAlSBWRvY0lkEikKAnR5GAIgASgOMhkuRG9jdW1lbnRTZXJ2ZXJXU0RhdGFUeXBlUgJ0eRISCgRkYXRhGAMgASgMUgRkYXRhEg4KAmlkGAQgASgJUgJpZA=='); final $typed_data.Uint8List documentServerWSDataDescriptor = $convert.base64Decode('ChREb2N1bWVudFNlcnZlcldTRGF0YRIVCgZkb2NfaWQYASABKAlSBWRvY0lkEikKAnR5GAIgASgOMhkuRG9jdW1lbnRTZXJ2ZXJXU0RhdGFUeXBlUgJ0eRISCgRkYXRhGAMgASgMUgRkYXRh');
@$core.Deprecated('Use newDocumentUserDescriptor instead') @$core.Deprecated('Use newDocumentUserDescriptor instead')
const NewDocumentUser$json = const { const NewDocumentUser$json = const {
'1': 'NewDocumentUser', '1': 'NewDocumentUser',

View File

@ -115,7 +115,7 @@ impl RevisionLoader {
let delta_data = Bytes::from(doc.text.clone()); let delta_data = Bytes::from(doc.text.clone());
let doc_md5 = md5(&delta_data); let doc_md5 = md5(&delta_data);
let revision = Revision::new( let revision = Revision::new(
&doc.id, &doc.doc_id,
doc.base_rev_id, doc.base_rev_id,
doc.rev_id, doc.rev_id,
delta_data, delta_data,
@ -158,7 +158,7 @@ fn mk_doc_from_revisions(doc_id: &str, revisions: Vec<Revision>) -> FlowyResult<
correct_delta_if_need(&mut delta); correct_delta_if_need(&mut delta);
Result::<DocumentInfo, FlowyError>::Ok(DocumentInfo { Result::<DocumentInfo, FlowyError>::Ok(DocumentInfo {
id: doc_id.to_owned(), doc_id: doc_id.to_owned(),
text: delta.to_json(), text: delta.to_json(),
rev_id, rev_id,
base_rev_id, base_rev_id,

View File

@ -5,7 +5,7 @@ use crate::services::{
use async_stream::stream; use async_stream::stream;
use bytes::Bytes; use bytes::Bytes;
use flowy_collaboration::entities::{ use flowy_collaboration::entities::{
revision::RevisionRange, revision::{RevId, RevisionRange},
ws::{DocumentClientWSData, DocumentServerWSData, DocumentServerWSDataType, NewDocumentUser}, ws::{DocumentClientWSData, DocumentServerWSData, DocumentServerWSDataType, NewDocumentUser},
}; };
use flowy_error::{internal_error, FlowyError, FlowyResult}; use flowy_error::{internal_error, FlowyError, FlowyResult};
@ -172,12 +172,7 @@ impl DocumentWSStream {
} }
async fn handle_message(&self, msg: DocumentServerWSData) -> FlowyResult<()> { async fn handle_message(&self, msg: DocumentServerWSData) -> FlowyResult<()> {
let DocumentServerWSData { let DocumentServerWSData { doc_id: _, ty, data } = msg;
doc_id: _,
ty,
data,
id,
} = msg;
let bytes = spawn_blocking(move || Bytes::from(data)) let bytes = spawn_blocking(move || Bytes::from(data))
.await .await
.map_err(internal_error)?; .map_err(internal_error)?;
@ -186,14 +181,14 @@ impl DocumentWSStream {
match ty { match ty {
DocumentServerWSDataType::ServerPushRev => { DocumentServerWSDataType::ServerPushRev => {
let _ = self.consumer.receive_push_revision(bytes).await?; let _ = self.consumer.receive_push_revision(bytes).await?;
let _ = self.consumer.receive_ack(id, ty).await;
}, },
DocumentServerWSDataType::ServerPullRev => { DocumentServerWSDataType::ServerPullRev => {
let range = RevisionRange::try_from(bytes)?; let range = RevisionRange::try_from(bytes)?;
let _ = self.consumer.pull_revisions_in_range(range).await?; let _ = self.consumer.pull_revisions_in_range(range).await?;
}, },
DocumentServerWSDataType::ServerAck => { DocumentServerWSDataType::ServerAck => {
let _ = self.consumer.receive_ack(id, ty).await; let rev_id = RevId::try_from(bytes).unwrap().value;
let _ = self.consumer.receive_ack(rev_id.to_string(), ty).await;
}, },
DocumentServerWSDataType::UserConnect => { DocumentServerWSDataType::UserConnect => {
let new_user = NewDocumentUser::try_from(bytes)?; let new_user = NewDocumentUser::try_from(bytes)?;

View File

@ -1,10 +1,5 @@
use crate::services::doc::{ use crate::services::doc::{
web_socket::{ web_socket::{DocumentWSSinkDataProvider, DocumentWSSteamConsumer, HttpWebSocketManager},
local_ws_impl::LocalWebSocketManager,
DocumentWSSinkDataProvider,
DocumentWSSteamConsumer,
HttpWebSocketManager,
},
DocumentMD5, DocumentMD5,
DocumentWSReceiver, DocumentWSReceiver,
DocumentWebSocket, DocumentWebSocket,
@ -40,6 +35,33 @@ pub(crate) async fn make_document_ws_manager(
rev_manager: Arc<RevisionManager>, rev_manager: Arc<RevisionManager>,
ws: Arc<dyn DocumentWebSocket>, ws: Arc<dyn DocumentWebSocket>,
) -> Arc<dyn DocumentWebSocketManager> { ) -> Arc<dyn DocumentWebSocketManager> {
// if cfg!(feature = "http_server") {
// let shared_sink =
// Arc::new(SharedWSSinkDataProvider::new(rev_manager.clone()));
// let ws_stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter {
// doc_id: doc_id.clone(),
// user_id: user_id.clone(),
// editor_edit_queue: editor_edit_queue.clone(),
// rev_manager: rev_manager.clone(),
// shared_sink: shared_sink.clone(),
// });
// let ws_stream_provider =
// DocumentWSSinkDataProviderAdapter(shared_sink.clone());
// let ws_manager = Arc::new(HttpWebSocketManager::new(
// &doc_id,
// ws.clone(),
// Arc::new(ws_stream_provider),
// ws_stream_consumer,
// ));
// notify_user_has_connected(&user_id, &doc_id, rev_manager.clone(),
// shared_sink).await; listen_document_ws_state(&user_id, &doc_id,
// ws_manager.scribe_state(), rev_manager.clone());
//
// Arc::new(ws_manager)
// } else {
// Arc::new(Arc::new(LocalWebSocketManager {}))
// }
let shared_sink = Arc::new(SharedWSSinkDataProvider::new(rev_manager.clone())); let shared_sink = Arc::new(SharedWSSinkDataProvider::new(rev_manager.clone()));
let ws_stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter { let ws_stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter {
doc_id: doc_id.clone(), doc_id: doc_id.clone(),
@ -124,7 +146,8 @@ impl DocumentWSSteamConsumer for DocumentWebSocketSteamConsumerAdapter {
if let Some(server_composed_revision) = if let Some(server_composed_revision) =
handle_push_rev(&doc_id, &user_id, edit_cmd_tx, rev_manager, bytes).await? handle_push_rev(&doc_id, &user_id, edit_cmd_tx, rev_manager, bytes).await?
{ {
shared_sink.push_back(server_composed_revision.into()).await; let data = DocumentClientWSData::from_revisions(&doc_id, vec![server_composed_revision]);
shared_sink.push_back(data).await;
} }
Ok(()) Ok(())
}) })
@ -143,15 +166,11 @@ impl DocumentWSSteamConsumer for DocumentWebSocketSteamConsumerAdapter {
fn pull_revisions_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError> { fn pull_revisions_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError> {
let rev_manager = self.rev_manager.clone(); let rev_manager = self.rev_manager.clone();
let shared_sink = self.shared_sink.clone(); let shared_sink = self.shared_sink.clone();
let doc_id = self.doc_id.clone();
FutureResult::new(async move { FutureResult::new(async move {
let data = rev_manager let revisions = rev_manager.get_revisions_in_range(range).await?;
.get_revisions_in_range(range) let data = DocumentClientWSData::from_revisions(&doc_id, revisions);
.await? shared_sink.push_back(data).await;
.into_iter()
.map(|revision| revision.into())
.collect::<Vec<DocumentClientWSData>>();
shared_sink.append(data).await;
Ok(()) Ok(())
}) })
} }
@ -260,18 +279,11 @@ impl SharedWSSinkDataProvider {
} }
} }
// TODO: return Option<&DocumentWSData> would be better #[allow(dead_code)]
pub(crate) async fn front(&self) -> Option<DocumentClientWSData> { self.shared.read().await.front().cloned() }
pub(crate) async fn push_front(&self, data: DocumentClientWSData) { self.shared.write().await.push_front(data); } pub(crate) async fn push_front(&self, data: DocumentClientWSData) { self.shared.write().await.push_front(data); }
async fn push_back(&self, data: DocumentClientWSData) { self.shared.write().await.push_back(data); } async fn push_back(&self, data: DocumentClientWSData) { self.shared.write().await.push_back(data); }
async fn append(&self, data: Vec<DocumentClientWSData>) {
let mut buf: VecDeque<_> = data.into_iter().collect();
self.shared.write().await.append(&mut buf);
}
async fn next(&self) -> FlowyResult<Option<DocumentClientWSData>> { async fn next(&self) -> FlowyResult<Option<DocumentClientWSData>> {
let source_ty = self.source_ty.read().await.clone(); let source_ty = self.source_ty.read().await.clone();
match source_ty { match source_ty {
@ -281,7 +293,7 @@ impl SharedWSSinkDataProvider {
Ok(None) Ok(None)
}, },
Some(data) => { Some(data) => {
tracing::debug!("[DocumentSinkDataProvider]: {}:{:?}", data.doc_id, data.ty); tracing::debug!("[SharedWSSinkDataProvider]: {}:{:?}", data.doc_id, data.ty);
Ok(Some(data.clone())) Ok(Some(data.clone()))
}, },
}, },
@ -293,8 +305,9 @@ impl SharedWSSinkDataProvider {
match self.rev_manager.next_sync_revision().await? { match self.rev_manager.next_sync_revision().await? {
Some(rev) => { Some(rev) => {
tracing::debug!("[DocumentSinkDataProvider]: {}:{:?}", rev.doc_id, rev.rev_id); tracing::debug!("[SharedWSSinkDataProvider]: {}:{:?}", rev.doc_id, rev.rev_id);
Ok(Some(rev.into())) let doc_id = rev.doc_id.clone();
Ok(Some(DocumentClientWSData::from_revisions(&doc_id, vec![rev])))
}, },
None => Ok(None), None => Ok(None),
} }
@ -310,10 +323,11 @@ impl SharedWSSinkDataProvider {
let should_pop = match self.shared.read().await.front() { let should_pop = match self.shared.read().await.front() {
None => false, None => false,
Some(val) => { Some(val) => {
if val.id == id { let expected_id = val.id();
if expected_id == id {
true true
} else { } else {
tracing::error!("The front element's {} is not equal to the {}", val.id, id); tracing::error!("The front element's {} is not equal to the {}", expected_id, id);
false false
} }
}, },

View File

@ -15,7 +15,7 @@ impl DocumentServerAPI for DocServerMock {
fn read_doc(&self, _token: &str, params: DocIdentifier) -> FutureResult<Option<DocumentInfo>, FlowyError> { fn read_doc(&self, _token: &str, params: DocIdentifier) -> FutureResult<Option<DocumentInfo>, FlowyError> {
let doc = DocumentInfo { let doc = DocumentInfo {
id: params.doc_id, doc_id: params.doc_id,
text: initial_delta_string(), text: initial_delta_string(),
rev_id: 0, rev_id: 0,
base_rev_id: 0, base_rev_id: 0,

View File

@ -1,11 +1,11 @@
use bytes::Bytes; use bytes::Bytes;
use dashmap::DashMap; use dashmap::DashMap;
use flowy_collaboration::{entities::prelude::*, errors::CollaborateError, sync::*}; use flowy_collaboration::{entities::prelude::*, errors::CollaborateError, sync::*};
use flowy_net::services::ws::*; // use flowy_net::services::ws::*;
use lib_infra::future::FutureResultSend; use lib_infra::future::FutureResultSend;
use lib_ws::{WSModule, WebSocketRawMessage}; use lib_ws::{WSModule, WebSocketRawMessage};
use std::{ use std::{
convert::{TryFrom, TryInto}, convert::TryInto,
fmt::{Debug, Formatter}, fmt::{Debug, Formatter},
sync::Arc, sync::Arc,
}; };
@ -24,21 +24,20 @@ impl std::default::Default for MockDocServer {
} }
impl MockDocServer { impl MockDocServer {
pub async fn handle_ws_data(&self, ws_data: DocumentClientWSData) -> Option<mpsc::Receiver<WebSocketRawMessage>> { pub async fn handle_client_data(
let bytes = Bytes::from(ws_data.data); &self,
match ws_data.ty { client_data: DocumentClientWSData,
) -> Option<mpsc::Receiver<WebSocketRawMessage>> {
match client_data.ty {
DocumentClientWSDataType::ClientPushRev => { DocumentClientWSDataType::ClientPushRev => {
let revisions = RepeatedRevision::try_from(bytes).unwrap().into_inner();
if revisions.is_empty() {
return None;
}
let first_revision = revisions.first().unwrap();
let (tx, rx) = mpsc::channel(1); let (tx, rx) = mpsc::channel(1);
let user = Arc::new(MockDocUser { let user = Arc::new(MockDocUser {
user_id: first_revision.user_id.clone(), user_id: "fake_user_id".to_owned(),
tx, tx,
}); });
self.manager.apply_revisions(user, revisions).await.unwrap(); let pb_client_data: flowy_collaboration::protobuf::DocumentClientWSData =
client_data.try_into().unwrap();
self.manager.apply_revisions(user, pb_client_data).await.unwrap();
Some(rx) Some(rx)
}, },
} }
@ -79,16 +78,16 @@ impl DocumentPersistence for MockDocServerPersistence {
}) })
} }
fn create_doc(&self, revision: Revision) -> FutureResultSend<DocumentInfo, CollaborateError> { fn create_doc(&self, doc_id: &str, revisions: Vec<Revision>) -> FutureResultSend<DocumentInfo, CollaborateError> {
FutureResultSend::new(async move { let doc_id = doc_id.to_owned();
let document_info: DocumentInfo = revision.try_into().unwrap(); FutureResultSend::new(async move { DocumentInfo::from_revisions(&doc_id, revisions) })
Ok(document_info)
})
} }
fn get_revisions(&self, _doc_id: &str, _rev_ids: Vec<i64>) -> FutureResultSend<Vec<Revision>, CollaborateError> { fn get_revisions(&self, _doc_id: &str, _rev_ids: Vec<i64>) -> FutureResultSend<Vec<Revision>, CollaborateError> {
unimplemented!() FutureResultSend::new(async move { Ok(vec![]) })
} }
fn get_doc_revisions(&self, _doc_id: &str) -> FutureResultSend<Vec<Revision>, CollaborateError> { unimplemented!() }
} }
#[derive(Debug)] #[derive(Debug)]

View File

@ -47,7 +47,7 @@ impl FlowyWebSocket for MockWebSocket {
} else { } else {
let ws_data = DocumentClientWSData::try_from(Bytes::from(message.data.clone())).unwrap(); let ws_data = DocumentClientWSData::try_from(Bytes::from(message.data.clone())).unwrap();
if let Some(mut rx) = server.handle_ws_data(ws_data).await { if let Some(mut rx) = server.handle_client_data(ws_data).await {
let new_ws_message = rx.recv().await.unwrap(); let new_ws_message = rx.recv().await.unwrap();
match receivers.get(&new_ws_message.module) { match receivers.get(&new_ws_message.module) {
None => tracing::error!("Can't find any handler for message: {:?}", new_ws_message), None => tracing::error!("Can't find any handler for message: {:?}", new_ws_message),

View File

@ -1,9 +1,9 @@
use crate::{ use crate::{
entities::revision::{RepeatedRevision, Revision}, entities::revision::{RepeatedRevision, Revision},
errors::CollaborateError, errors::{internal_error, CollaborateError},
}; };
use flowy_derive::ProtoBuf; use flowy_derive::ProtoBuf;
use lib_ot::{errors::OTError, rich_text::RichTextDelta}; use lib_ot::{core::OperationTransformable, errors::OTError, rich_text::RichTextDelta};
#[derive(ProtoBuf, Default, Debug, Clone)] #[derive(ProtoBuf, Default, Debug, Clone)]
pub struct CreateDocParams { pub struct CreateDocParams {
@ -17,7 +17,7 @@ pub struct CreateDocParams {
#[derive(ProtoBuf, Default, Debug, Clone, Eq, PartialEq)] #[derive(ProtoBuf, Default, Debug, Clone, Eq, PartialEq)]
pub struct DocumentInfo { pub struct DocumentInfo {
#[pb(index = 1)] #[pb(index = 1)]
pub id: String, pub doc_id: String,
#[pb(index = 2)] #[pb(index = 2)]
pub text: String, pub text: String,
@ -34,6 +34,27 @@ impl DocumentInfo {
let delta = RichTextDelta::from_bytes(&self.text)?; let delta = RichTextDelta::from_bytes(&self.text)?;
Ok(delta) Ok(delta)
} }
pub fn from_revisions(doc_id: &str, revisions: Vec<Revision>) -> Result<Self, CollaborateError> {
let mut document_delta = RichTextDelta::new();
let mut base_rev_id = 0;
let mut rev_id = 0;
for revision in revisions {
base_rev_id = revision.base_rev_id;
rev_id = revision.rev_id;
let delta = RichTextDelta::from_bytes(revision.delta_data).map_err(internal_error)?;
document_delta = document_delta.compose(&delta).map_err(internal_error)?;
}
let text = document_delta.to_json();
Ok(DocumentInfo {
doc_id: doc_id.to_string(),
text,
rev_id,
base_rev_id,
})
}
} }
impl std::convert::TryFrom<Revision> for DocumentInfo { impl std::convert::TryFrom<Revision> for DocumentInfo {
@ -49,7 +70,7 @@ impl std::convert::TryFrom<Revision> for DocumentInfo {
let doc_json = delta.to_json(); let doc_json = delta.to_json();
Ok(DocumentInfo { Ok(DocumentInfo {
id: revision.doc_id, doc_id: revision.doc_id,
text: doc_json, text: doc_json,
rev_id: revision.rev_id, rev_id: revision.rev_id,
base_rev_id: revision.base_rev_id, base_rev_id: revision.base_rev_id,

View File

@ -100,7 +100,7 @@ impl std::fmt::Debug for Revision {
#[derive(PartialEq, Debug, Default, ProtoBuf, Clone)] #[derive(PartialEq, Debug, Default, ProtoBuf, Clone)]
pub struct RepeatedRevision { pub struct RepeatedRevision {
#[pb(index = 1)] #[pb(index = 1)]
pub items: Vec<Revision>, items: Vec<Revision>,
} }
impl std::ops::Deref for RepeatedRevision { impl std::ops::Deref for RepeatedRevision {
@ -114,6 +114,16 @@ impl std::ops::DerefMut for RepeatedRevision {
} }
impl RepeatedRevision { impl RepeatedRevision {
pub fn new(items: Vec<Revision>) -> Self {
if cfg!(debug_assertions) {
let mut sorted_items = items.clone();
sorted_items.sort_by(|a, b| a.rev_id.cmp(&b.rev_id));
assert_eq!(sorted_items, items, "The items passed in should be sorted")
}
Self { items }
}
pub fn into_inner(self) -> Vec<Revision> { self.items } pub fn into_inner(self) -> Vec<Revision> { self.items }
} }

View File

@ -1,5 +1,5 @@
use crate::{ use crate::{
entities::revision::{RepeatedRevision, Revision, RevisionRange}, entities::revision::{RepeatedRevision, RevId, Revision, RevisionRange},
errors::CollaborateError, errors::CollaborateError,
}; };
use bytes::Bytes; use bytes::Bytes;
@ -33,24 +33,28 @@ pub struct DocumentClientWSData {
pub ty: DocumentClientWSDataType, pub ty: DocumentClientWSDataType,
#[pb(index = 3)] #[pb(index = 3)]
pub data: Vec<u8>, pub revisions: RepeatedRevision,
#[pb(index = 4)] #[pb(index = 4)]
pub id: String, id: String,
} }
impl std::convert::From<Revision> for DocumentClientWSData { impl DocumentClientWSData {
fn from(revision: Revision) -> Self { pub fn from_revisions(doc_id: &str, revisions: Vec<Revision>) -> Self {
let doc_id = revision.doc_id.clone(); let rev_id = match revisions.first() {
let rev_id = revision.rev_id; None => 0,
let bytes: Bytes = revision.try_into().unwrap(); Some(revision) => revision.rev_id,
};
Self { Self {
doc_id, doc_id: doc_id.to_owned(),
ty: DocumentClientWSDataType::ClientPushRev, ty: DocumentClientWSDataType::ClientPushRev,
data: bytes.to_vec(), revisions: RepeatedRevision::new(revisions),
id: rev_id.to_string(), id: rev_id.to_string(),
} }
} }
pub fn id(&self) -> String { self.id.clone() }
} }
#[derive(Debug, Clone, ProtoBuf_Enum, Eq, PartialEq, Hash)] #[derive(Debug, Clone, ProtoBuf_Enum, Eq, PartialEq, Hash)]
@ -75,57 +79,38 @@ pub struct DocumentServerWSData {
#[pb(index = 3)] #[pb(index = 3)]
pub data: Vec<u8>, pub data: Vec<u8>,
#[pb(index = 4)]
pub id: String,
} }
pub struct DocumentServerWSDataBuilder(); pub struct DocumentServerWSDataBuilder();
impl DocumentServerWSDataBuilder { impl DocumentServerWSDataBuilder {
// DocumentWSDataType::PushRev -> Revision pub fn build_push_message(doc_id: &str, revisions: Vec<Revision>) -> DocumentServerWSData {
pub fn build_push_message(doc_id: &str, revisions: Vec<Revision>, id: &str) -> DocumentServerWSData { let repeated_revision = RepeatedRevision::new(revisions);
let repeated_revision = RepeatedRevision { items: revisions };
let bytes: Bytes = repeated_revision.try_into().unwrap(); let bytes: Bytes = repeated_revision.try_into().unwrap();
DocumentServerWSData { DocumentServerWSData {
doc_id: doc_id.to_string(), doc_id: doc_id.to_string(),
ty: DocumentServerWSDataType::ServerPushRev, ty: DocumentServerWSDataType::ServerPushRev,
data: bytes.to_vec(), data: bytes.to_vec(),
id: id.to_string(),
} }
} }
// DocumentWSDataType::PullRev -> RevisionRange pub fn build_pull_message(doc_id: &str, range: RevisionRange) -> DocumentServerWSData {
pub fn build_pull_message(doc_id: &str, range: RevisionRange, rev_id: i64) -> DocumentServerWSData {
let bytes: Bytes = range.try_into().unwrap(); let bytes: Bytes = range.try_into().unwrap();
DocumentServerWSData { DocumentServerWSData {
doc_id: doc_id.to_string(), doc_id: doc_id.to_string(),
ty: DocumentServerWSDataType::ServerPullRev, ty: DocumentServerWSDataType::ServerPullRev,
data: bytes.to_vec(), data: bytes.to_vec(),
id: rev_id.to_string(),
} }
} }
// DocumentWSDataType::Ack -> RevId pub fn build_ack_message(doc_id: &str, rev_id: i64) -> DocumentServerWSData {
pub fn build_ack_message(doc_id: &str, id: &str) -> DocumentServerWSData { let rev_id: RevId = rev_id.into();
let bytes: Bytes = rev_id.try_into().unwrap();
DocumentServerWSData { DocumentServerWSData {
doc_id: doc_id.to_string(), doc_id: doc_id.to_string(),
ty: DocumentServerWSDataType::ServerAck, ty: DocumentServerWSDataType::ServerAck,
data: vec![], data: bytes.to_vec(),
id: id.to_string(),
} }
} }
// DocumentWSDataType::UserConnect -> DocumentConnected
// pub fn build_new_document_user_message(doc_id: &str, new_document_user:
// NewDocumentUser) -> DocumentServerWSData { let id =
// new_document_user.user_id.clone(); let bytes: Bytes =
// new_document_user.try_into().unwrap(); DocumentServerWSData {
// doc_id: doc_id.to_string(),
// ty: DocumentServerWSDataType::UserConnect,
// data: bytes.to_vec(),
// id,
// }
// }
} }
#[derive(ProtoBuf, Default, Debug, Clone)] #[derive(ProtoBuf, Default, Debug, Clone)]

View File

@ -242,7 +242,7 @@ impl ::protobuf::reflect::ProtobufValue for CreateDocParams {
#[derive(PartialEq,Clone,Default)] #[derive(PartialEq,Clone,Default)]
pub struct DocumentInfo { pub struct DocumentInfo {
// message fields // message fields
pub id: ::std::string::String, pub doc_id: ::std::string::String,
pub text: ::std::string::String, pub text: ::std::string::String,
pub rev_id: i64, pub rev_id: i64,
pub base_rev_id: i64, pub base_rev_id: i64,
@ -262,30 +262,30 @@ impl DocumentInfo {
::std::default::Default::default() ::std::default::Default::default()
} }
// string id = 1; // string doc_id = 1;
pub fn get_id(&self) -> &str { pub fn get_doc_id(&self) -> &str {
&self.id &self.doc_id
} }
pub fn clear_id(&mut self) { pub fn clear_doc_id(&mut self) {
self.id.clear(); self.doc_id.clear();
} }
// Param is passed by value, moved // Param is passed by value, moved
pub fn set_id(&mut self, v: ::std::string::String) { pub fn set_doc_id(&mut self, v: ::std::string::String) {
self.id = v; self.doc_id = v;
} }
// Mutable pointer to the field. // Mutable pointer to the field.
// If field is not initialized, it is initialized with default value first. // If field is not initialized, it is initialized with default value first.
pub fn mut_id(&mut self) -> &mut ::std::string::String { pub fn mut_doc_id(&mut self) -> &mut ::std::string::String {
&mut self.id &mut self.doc_id
} }
// Take field // Take field
pub fn take_id(&mut self) -> ::std::string::String { pub fn take_doc_id(&mut self) -> ::std::string::String {
::std::mem::replace(&mut self.id, ::std::string::String::new()) ::std::mem::replace(&mut self.doc_id, ::std::string::String::new())
} }
// string text = 2; // string text = 2;
@ -355,7 +355,7 @@ impl ::protobuf::Message for DocumentInfo {
let (field_number, wire_type) = is.read_tag_unpack()?; let (field_number, wire_type) = is.read_tag_unpack()?;
match field_number { match field_number {
1 => { 1 => {
::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.id)?; ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.doc_id)?;
}, },
2 => { 2 => {
::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.text)?; ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.text)?;
@ -386,8 +386,8 @@ impl ::protobuf::Message for DocumentInfo {
#[allow(unused_variables)] #[allow(unused_variables)]
fn compute_size(&self) -> u32 { fn compute_size(&self) -> u32 {
let mut my_size = 0; let mut my_size = 0;
if !self.id.is_empty() { if !self.doc_id.is_empty() {
my_size += ::protobuf::rt::string_size(1, &self.id); my_size += ::protobuf::rt::string_size(1, &self.doc_id);
} }
if !self.text.is_empty() { if !self.text.is_empty() {
my_size += ::protobuf::rt::string_size(2, &self.text); my_size += ::protobuf::rt::string_size(2, &self.text);
@ -404,8 +404,8 @@ impl ::protobuf::Message for DocumentInfo {
} }
fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> { fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
if !self.id.is_empty() { if !self.doc_id.is_empty() {
os.write_string(1, &self.id)?; os.write_string(1, &self.doc_id)?;
} }
if !self.text.is_empty() { if !self.text.is_empty() {
os.write_string(2, &self.text)?; os.write_string(2, &self.text)?;
@ -455,9 +455,9 @@ impl ::protobuf::Message for DocumentInfo {
descriptor.get(|| { descriptor.get(|| {
let mut fields = ::std::vec::Vec::new(); let mut fields = ::std::vec::Vec::new();
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
"id", "doc_id",
|m: &DocumentInfo| { &m.id }, |m: &DocumentInfo| { &m.doc_id },
|m: &mut DocumentInfo| { &mut m.id }, |m: &mut DocumentInfo| { &mut m.doc_id },
)); ));
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
"text", "text",
@ -490,7 +490,7 @@ impl ::protobuf::Message for DocumentInfo {
impl ::protobuf::Clear for DocumentInfo { impl ::protobuf::Clear for DocumentInfo {
fn clear(&mut self) { fn clear(&mut self) {
self.id.clear(); self.doc_id.clear();
self.text.clear(); self.text.clear();
self.rev_id = 0; self.rev_id = 0;
self.base_rev_id = 0; self.base_rev_id = 0;
@ -1325,64 +1325,64 @@ impl ::protobuf::reflect::ProtobufValue for DocIdentifier {
static file_descriptor_proto_data: &'static [u8] = b"\ static file_descriptor_proto_data: &'static [u8] = b"\
\n\tdoc.proto\x1a\x0erevision.proto\"R\n\x0fCreateDocParams\x12\x0e\n\ \n\tdoc.proto\x1a\x0erevision.proto\"R\n\x0fCreateDocParams\x12\x0e\n\
\x02id\x18\x01\x20\x01(\tR\x02id\x12/\n\trevisions\x18\x02\x20\x01(\x0b2\ \x02id\x18\x01\x20\x01(\tR\x02id\x12/\n\trevisions\x18\x02\x20\x01(\x0b2\
\x11.RepeatedRevisionR\trevisions\"i\n\x0cDocumentInfo\x12\x0e\n\x02id\ \x11.RepeatedRevisionR\trevisions\"p\n\x0cDocumentInfo\x12\x15\n\x06doc_\
\x18\x01\x20\x01(\tR\x02id\x12\x12\n\x04text\x18\x02\x20\x01(\tR\x04text\ id\x18\x01\x20\x01(\tR\x05docId\x12\x12\n\x04text\x18\x02\x20\x01(\tR\
\x12\x15\n\x06rev_id\x18\x03\x20\x01(\x03R\x05revId\x12\x1e\n\x0bbase_re\ \x04text\x12\x15\n\x06rev_id\x18\x03\x20\x01(\x03R\x05revId\x12\x1e\n\
v_id\x18\x04\x20\x01(\x03R\tbaseRevId\"]\n\x13ResetDocumentParams\x12\ \x0bbase_rev_id\x18\x04\x20\x01(\x03R\tbaseRevId\"]\n\x13ResetDocumentPa\
\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\x12/\n\trevisions\x18\x02\ rams\x12\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\x12/\n\trevisions\
\x20\x01(\x0b2\x11.RepeatedRevisionR\trevisions\":\n\rDocumentDelta\x12\ \x18\x02\x20\x01(\x0b2\x11.RepeatedRevisionR\trevisions\":\n\rDocumentDe\
\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\x12\x12\n\x04text\x18\x02\ lta\x12\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\x12\x12\n\x04text\
\x20\x01(\tR\x04text\"S\n\nNewDocUser\x12\x17\n\x07user_id\x18\x01\x20\ \x18\x02\x20\x01(\tR\x04text\"S\n\nNewDocUser\x12\x17\n\x07user_id\x18\
\x01(\tR\x06userId\x12\x15\n\x06rev_id\x18\x02\x20\x01(\x03R\x05revId\ \x01\x20\x01(\tR\x06userId\x12\x15\n\x06rev_id\x18\x02\x20\x01(\x03R\x05\
\x12\x15\n\x06doc_id\x18\x03\x20\x01(\tR\x05docId\"&\n\rDocIdentifier\ revId\x12\x15\n\x06doc_id\x18\x03\x20\x01(\tR\x05docId\"&\n\rDocIdentifi\
\x12\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docIdJ\xaf\x07\n\x06\x12\x04\ er\x12\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docIdJ\xaf\x07\n\x06\x12\
\0\0\x1c\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\t\n\x02\x03\0\x12\x03\x01\ \x04\0\0\x1c\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\t\n\x02\x03\0\x12\x03\
\0\x18\n\n\n\x02\x04\0\x12\x04\x03\0\x06\x01\n\n\n\x03\x04\0\x01\x12\x03\ \x01\0\x18\n\n\n\x02\x04\0\x12\x04\x03\0\x06\x01\n\n\n\x03\x04\0\x01\x12\
\x03\x08\x17\n\x0b\n\x04\x04\0\x02\0\x12\x03\x04\x04\x12\n\x0c\n\x05\x04\ \x03\x03\x08\x17\n\x0b\n\x04\x04\0\x02\0\x12\x03\x04\x04\x12\n\x0c\n\x05\
\0\x02\0\x05\x12\x03\x04\x04\n\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x04\ \x04\0\x02\0\x05\x12\x03\x04\x04\n\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\
\x0b\r\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x04\x10\x11\n\x0b\n\x04\x04\0\ \x04\x0b\r\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x04\x10\x11\n\x0b\n\x04\
\x02\x01\x12\x03\x05\x04#\n\x0c\n\x05\x04\0\x02\x01\x06\x12\x03\x05\x04\ \x04\0\x02\x01\x12\x03\x05\x04#\n\x0c\n\x05\x04\0\x02\x01\x06\x12\x03\
\x14\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x05\x15\x1e\n\x0c\n\x05\x04\0\ \x05\x04\x14\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x05\x15\x1e\n\x0c\n\
\x02\x01\x03\x12\x03\x05!\"\n\n\n\x02\x04\x01\x12\x04\x07\0\x0c\x01\n\n\ \x05\x04\0\x02\x01\x03\x12\x03\x05!\"\n\n\n\x02\x04\x01\x12\x04\x07\0\
\n\x03\x04\x01\x01\x12\x03\x07\x08\x14\n\x0b\n\x04\x04\x01\x02\0\x12\x03\ \x0c\x01\n\n\n\x03\x04\x01\x01\x12\x03\x07\x08\x14\n\x0b\n\x04\x04\x01\
\x08\x04\x12\n\x0c\n\x05\x04\x01\x02\0\x05\x12\x03\x08\x04\n\n\x0c\n\x05\ \x02\0\x12\x03\x08\x04\x16\n\x0c\n\x05\x04\x01\x02\0\x05\x12\x03\x08\x04\
\x04\x01\x02\0\x01\x12\x03\x08\x0b\r\n\x0c\n\x05\x04\x01\x02\0\x03\x12\ \n\n\x0c\n\x05\x04\x01\x02\0\x01\x12\x03\x08\x0b\x11\n\x0c\n\x05\x04\x01\
\x03\x08\x10\x11\n\x0b\n\x04\x04\x01\x02\x01\x12\x03\t\x04\x14\n\x0c\n\ \x02\0\x03\x12\x03\x08\x14\x15\n\x0b\n\x04\x04\x01\x02\x01\x12\x03\t\x04\
\x05\x04\x01\x02\x01\x05\x12\x03\t\x04\n\n\x0c\n\x05\x04\x01\x02\x01\x01\ \x14\n\x0c\n\x05\x04\x01\x02\x01\x05\x12\x03\t\x04\n\n\x0c\n\x05\x04\x01\
\x12\x03\t\x0b\x0f\n\x0c\n\x05\x04\x01\x02\x01\x03\x12\x03\t\x12\x13\n\ \x02\x01\x01\x12\x03\t\x0b\x0f\n\x0c\n\x05\x04\x01\x02\x01\x03\x12\x03\t\
\x0b\n\x04\x04\x01\x02\x02\x12\x03\n\x04\x15\n\x0c\n\x05\x04\x01\x02\x02\ \x12\x13\n\x0b\n\x04\x04\x01\x02\x02\x12\x03\n\x04\x15\n\x0c\n\x05\x04\
\x05\x12\x03\n\x04\t\n\x0c\n\x05\x04\x01\x02\x02\x01\x12\x03\n\n\x10\n\ \x01\x02\x02\x05\x12\x03\n\x04\t\n\x0c\n\x05\x04\x01\x02\x02\x01\x12\x03\
\x0c\n\x05\x04\x01\x02\x02\x03\x12\x03\n\x13\x14\n\x0b\n\x04\x04\x01\x02\ \n\n\x10\n\x0c\n\x05\x04\x01\x02\x02\x03\x12\x03\n\x13\x14\n\x0b\n\x04\
\x03\x12\x03\x0b\x04\x1a\n\x0c\n\x05\x04\x01\x02\x03\x05\x12\x03\x0b\x04\ \x04\x01\x02\x03\x12\x03\x0b\x04\x1a\n\x0c\n\x05\x04\x01\x02\x03\x05\x12\
\t\n\x0c\n\x05\x04\x01\x02\x03\x01\x12\x03\x0b\n\x15\n\x0c\n\x05\x04\x01\ \x03\x0b\x04\t\n\x0c\n\x05\x04\x01\x02\x03\x01\x12\x03\x0b\n\x15\n\x0c\n\
\x02\x03\x03\x12\x03\x0b\x18\x19\n\n\n\x02\x04\x02\x12\x04\r\0\x10\x01\n\ \x05\x04\x01\x02\x03\x03\x12\x03\x0b\x18\x19\n\n\n\x02\x04\x02\x12\x04\r\
\n\n\x03\x04\x02\x01\x12\x03\r\x08\x1b\n\x0b\n\x04\x04\x02\x02\0\x12\x03\ \0\x10\x01\n\n\n\x03\x04\x02\x01\x12\x03\r\x08\x1b\n\x0b\n\x04\x04\x02\
\x0e\x04\x16\n\x0c\n\x05\x04\x02\x02\0\x05\x12\x03\x0e\x04\n\n\x0c\n\x05\ \x02\0\x12\x03\x0e\x04\x16\n\x0c\n\x05\x04\x02\x02\0\x05\x12\x03\x0e\x04\
\x04\x02\x02\0\x01\x12\x03\x0e\x0b\x11\n\x0c\n\x05\x04\x02\x02\0\x03\x12\ \n\n\x0c\n\x05\x04\x02\x02\0\x01\x12\x03\x0e\x0b\x11\n\x0c\n\x05\x04\x02\
\x03\x0e\x14\x15\n\x0b\n\x04\x04\x02\x02\x01\x12\x03\x0f\x04#\n\x0c\n\ \x02\0\x03\x12\x03\x0e\x14\x15\n\x0b\n\x04\x04\x02\x02\x01\x12\x03\x0f\
\x05\x04\x02\x02\x01\x06\x12\x03\x0f\x04\x14\n\x0c\n\x05\x04\x02\x02\x01\ \x04#\n\x0c\n\x05\x04\x02\x02\x01\x06\x12\x03\x0f\x04\x14\n\x0c\n\x05\
\x01\x12\x03\x0f\x15\x1e\n\x0c\n\x05\x04\x02\x02\x01\x03\x12\x03\x0f!\"\ \x04\x02\x02\x01\x01\x12\x03\x0f\x15\x1e\n\x0c\n\x05\x04\x02\x02\x01\x03\
\n\n\n\x02\x04\x03\x12\x04\x11\0\x14\x01\n\n\n\x03\x04\x03\x01\x12\x03\ \x12\x03\x0f!\"\n\n\n\x02\x04\x03\x12\x04\x11\0\x14\x01\n\n\n\x03\x04\
\x11\x08\x15\n\x0b\n\x04\x04\x03\x02\0\x12\x03\x12\x04\x16\n\x0c\n\x05\ \x03\x01\x12\x03\x11\x08\x15\n\x0b\n\x04\x04\x03\x02\0\x12\x03\x12\x04\
\x04\x03\x02\0\x05\x12\x03\x12\x04\n\n\x0c\n\x05\x04\x03\x02\0\x01\x12\ \x16\n\x0c\n\x05\x04\x03\x02\0\x05\x12\x03\x12\x04\n\n\x0c\n\x05\x04\x03\
\x03\x12\x0b\x11\n\x0c\n\x05\x04\x03\x02\0\x03\x12\x03\x12\x14\x15\n\x0b\ \x02\0\x01\x12\x03\x12\x0b\x11\n\x0c\n\x05\x04\x03\x02\0\x03\x12\x03\x12\
\n\x04\x04\x03\x02\x01\x12\x03\x13\x04\x14\n\x0c\n\x05\x04\x03\x02\x01\ \x14\x15\n\x0b\n\x04\x04\x03\x02\x01\x12\x03\x13\x04\x14\n\x0c\n\x05\x04\
\x05\x12\x03\x13\x04\n\n\x0c\n\x05\x04\x03\x02\x01\x01\x12\x03\x13\x0b\ \x03\x02\x01\x05\x12\x03\x13\x04\n\n\x0c\n\x05\x04\x03\x02\x01\x01\x12\
\x0f\n\x0c\n\x05\x04\x03\x02\x01\x03\x12\x03\x13\x12\x13\n\n\n\x02\x04\ \x03\x13\x0b\x0f\n\x0c\n\x05\x04\x03\x02\x01\x03\x12\x03\x13\x12\x13\n\n\
\x04\x12\x04\x15\0\x19\x01\n\n\n\x03\x04\x04\x01\x12\x03\x15\x08\x12\n\ \n\x02\x04\x04\x12\x04\x15\0\x19\x01\n\n\n\x03\x04\x04\x01\x12\x03\x15\
\x0b\n\x04\x04\x04\x02\0\x12\x03\x16\x04\x17\n\x0c\n\x05\x04\x04\x02\0\ \x08\x12\n\x0b\n\x04\x04\x04\x02\0\x12\x03\x16\x04\x17\n\x0c\n\x05\x04\
\x05\x12\x03\x16\x04\n\n\x0c\n\x05\x04\x04\x02\0\x01\x12\x03\x16\x0b\x12\ \x04\x02\0\x05\x12\x03\x16\x04\n\n\x0c\n\x05\x04\x04\x02\0\x01\x12\x03\
\n\x0c\n\x05\x04\x04\x02\0\x03\x12\x03\x16\x15\x16\n\x0b\n\x04\x04\x04\ \x16\x0b\x12\n\x0c\n\x05\x04\x04\x02\0\x03\x12\x03\x16\x15\x16\n\x0b\n\
\x02\x01\x12\x03\x17\x04\x15\n\x0c\n\x05\x04\x04\x02\x01\x05\x12\x03\x17\ \x04\x04\x04\x02\x01\x12\x03\x17\x04\x15\n\x0c\n\x05\x04\x04\x02\x01\x05\
\x04\t\n\x0c\n\x05\x04\x04\x02\x01\x01\x12\x03\x17\n\x10\n\x0c\n\x05\x04\ \x12\x03\x17\x04\t\n\x0c\n\x05\x04\x04\x02\x01\x01\x12\x03\x17\n\x10\n\
\x04\x02\x01\x03\x12\x03\x17\x13\x14\n\x0b\n\x04\x04\x04\x02\x02\x12\x03\ \x0c\n\x05\x04\x04\x02\x01\x03\x12\x03\x17\x13\x14\n\x0b\n\x04\x04\x04\
\x18\x04\x16\n\x0c\n\x05\x04\x04\x02\x02\x05\x12\x03\x18\x04\n\n\x0c\n\ \x02\x02\x12\x03\x18\x04\x16\n\x0c\n\x05\x04\x04\x02\x02\x05\x12\x03\x18\
\x05\x04\x04\x02\x02\x01\x12\x03\x18\x0b\x11\n\x0c\n\x05\x04\x04\x02\x02\ \x04\n\n\x0c\n\x05\x04\x04\x02\x02\x01\x12\x03\x18\x0b\x11\n\x0c\n\x05\
\x03\x12\x03\x18\x14\x15\n\n\n\x02\x04\x05\x12\x04\x1a\0\x1c\x01\n\n\n\ \x04\x04\x02\x02\x03\x12\x03\x18\x14\x15\n\n\n\x02\x04\x05\x12\x04\x1a\0\
\x03\x04\x05\x01\x12\x03\x1a\x08\x15\n\x0b\n\x04\x04\x05\x02\0\x12\x03\ \x1c\x01\n\n\n\x03\x04\x05\x01\x12\x03\x1a\x08\x15\n\x0b\n\x04\x04\x05\
\x1b\x04\x16\n\x0c\n\x05\x04\x05\x02\0\x05\x12\x03\x1b\x04\n\n\x0c\n\x05\ \x02\0\x12\x03\x1b\x04\x16\n\x0c\n\x05\x04\x05\x02\0\x05\x12\x03\x1b\x04\
\x04\x05\x02\0\x01\x12\x03\x1b\x0b\x11\n\x0c\n\x05\x04\x05\x02\0\x03\x12\ \n\n\x0c\n\x05\x04\x05\x02\0\x01\x12\x03\x1b\x0b\x11\n\x0c\n\x05\x04\x05\
\x03\x1b\x14\x15b\x06proto3\ \x02\0\x03\x12\x03\x1b\x14\x15b\x06proto3\
"; ";
static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;

View File

@ -28,7 +28,7 @@ pub struct DocumentClientWSData {
// message fields // message fields
pub doc_id: ::std::string::String, pub doc_id: ::std::string::String,
pub ty: DocumentClientWSDataType, pub ty: DocumentClientWSDataType,
pub data: ::std::vec::Vec<u8>, pub revisions: ::protobuf::SingularPtrField<super::revision::RepeatedRevision>,
pub id: ::std::string::String, pub id: ::std::string::String,
// special fields // special fields
pub unknown_fields: ::protobuf::UnknownFields, pub unknown_fields: ::protobuf::UnknownFields,
@ -87,30 +87,37 @@ impl DocumentClientWSData {
self.ty = v; self.ty = v;
} }
// bytes data = 3; // .RepeatedRevision revisions = 3;
pub fn get_data(&self) -> &[u8] { pub fn get_revisions(&self) -> &super::revision::RepeatedRevision {
&self.data self.revisions.as_ref().unwrap_or_else(|| <super::revision::RepeatedRevision as ::protobuf::Message>::default_instance())
} }
pub fn clear_data(&mut self) { pub fn clear_revisions(&mut self) {
self.data.clear(); self.revisions.clear();
}
pub fn has_revisions(&self) -> bool {
self.revisions.is_some()
} }
// Param is passed by value, moved // Param is passed by value, moved
pub fn set_data(&mut self, v: ::std::vec::Vec<u8>) { pub fn set_revisions(&mut self, v: super::revision::RepeatedRevision) {
self.data = v; self.revisions = ::protobuf::SingularPtrField::some(v);
} }
// Mutable pointer to the field. // Mutable pointer to the field.
// If field is not initialized, it is initialized with default value first. // If field is not initialized, it is initialized with default value first.
pub fn mut_data(&mut self) -> &mut ::std::vec::Vec<u8> { pub fn mut_revisions(&mut self) -> &mut super::revision::RepeatedRevision {
&mut self.data if self.revisions.is_none() {
self.revisions.set_default();
}
self.revisions.as_mut().unwrap()
} }
// Take field // Take field
pub fn take_data(&mut self) -> ::std::vec::Vec<u8> { pub fn take_revisions(&mut self) -> super::revision::RepeatedRevision {
::std::mem::replace(&mut self.data, ::std::vec::Vec::new()) self.revisions.take().unwrap_or_else(|| super::revision::RepeatedRevision::new())
} }
// string id = 4; // string id = 4;
@ -142,6 +149,11 @@ impl DocumentClientWSData {
impl ::protobuf::Message for DocumentClientWSData { impl ::protobuf::Message for DocumentClientWSData {
fn is_initialized(&self) -> bool { fn is_initialized(&self) -> bool {
for v in &self.revisions {
if !v.is_initialized() {
return false;
}
};
true true
} }
@ -156,7 +168,7 @@ impl ::protobuf::Message for DocumentClientWSData {
::protobuf::rt::read_proto3_enum_with_unknown_fields_into(wire_type, is, &mut self.ty, 2, &mut self.unknown_fields)? ::protobuf::rt::read_proto3_enum_with_unknown_fields_into(wire_type, is, &mut self.ty, 2, &mut self.unknown_fields)?
}, },
3 => { 3 => {
::protobuf::rt::read_singular_proto3_bytes_into(wire_type, is, &mut self.data)?; ::protobuf::rt::read_singular_message_into(wire_type, is, &mut self.revisions)?;
}, },
4 => { 4 => {
::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.id)?; ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.id)?;
@ -179,8 +191,9 @@ impl ::protobuf::Message for DocumentClientWSData {
if self.ty != DocumentClientWSDataType::ClientPushRev { if self.ty != DocumentClientWSDataType::ClientPushRev {
my_size += ::protobuf::rt::enum_size(2, self.ty); my_size += ::protobuf::rt::enum_size(2, self.ty);
} }
if !self.data.is_empty() { if let Some(ref v) = self.revisions.as_ref() {
my_size += ::protobuf::rt::bytes_size(3, &self.data); let len = v.compute_size();
my_size += 1 + ::protobuf::rt::compute_raw_varint32_size(len) + len;
} }
if !self.id.is_empty() { if !self.id.is_empty() {
my_size += ::protobuf::rt::string_size(4, &self.id); my_size += ::protobuf::rt::string_size(4, &self.id);
@ -197,8 +210,10 @@ impl ::protobuf::Message for DocumentClientWSData {
if self.ty != DocumentClientWSDataType::ClientPushRev { if self.ty != DocumentClientWSDataType::ClientPushRev {
os.write_enum(2, ::protobuf::ProtobufEnum::value(&self.ty))?; os.write_enum(2, ::protobuf::ProtobufEnum::value(&self.ty))?;
} }
if !self.data.is_empty() { if let Some(ref v) = self.revisions.as_ref() {
os.write_bytes(3, &self.data)?; os.write_tag(3, ::protobuf::wire_format::WireTypeLengthDelimited)?;
os.write_raw_varint32(v.get_cached_size())?;
v.write_to_with_cached_sizes(os)?;
} }
if !self.id.is_empty() { if !self.id.is_empty() {
os.write_string(4, &self.id)?; os.write_string(4, &self.id)?;
@ -251,10 +266,10 @@ impl ::protobuf::Message for DocumentClientWSData {
|m: &DocumentClientWSData| { &m.ty }, |m: &DocumentClientWSData| { &m.ty },
|m: &mut DocumentClientWSData| { &mut m.ty }, |m: &mut DocumentClientWSData| { &mut m.ty },
)); ));
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>( fields.push(::protobuf::reflect::accessor::make_singular_ptr_field_accessor::<_, ::protobuf::types::ProtobufTypeMessage<super::revision::RepeatedRevision>>(
"data", "revisions",
|m: &DocumentClientWSData| { &m.data }, |m: &DocumentClientWSData| { &m.revisions },
|m: &mut DocumentClientWSData| { &mut m.data }, |m: &mut DocumentClientWSData| { &mut m.revisions },
)); ));
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
"id", "id",
@ -279,7 +294,7 @@ impl ::protobuf::Clear for DocumentClientWSData {
fn clear(&mut self) { fn clear(&mut self) {
self.doc_id.clear(); self.doc_id.clear();
self.ty = DocumentClientWSDataType::ClientPushRev; self.ty = DocumentClientWSDataType::ClientPushRev;
self.data.clear(); self.revisions.clear();
self.id.clear(); self.id.clear();
self.unknown_fields.clear(); self.unknown_fields.clear();
} }
@ -303,7 +318,6 @@ pub struct DocumentServerWSData {
pub doc_id: ::std::string::String, pub doc_id: ::std::string::String,
pub ty: DocumentServerWSDataType, pub ty: DocumentServerWSDataType,
pub data: ::std::vec::Vec<u8>, pub data: ::std::vec::Vec<u8>,
pub id: ::std::string::String,
// special fields // special fields
pub unknown_fields: ::protobuf::UnknownFields, pub unknown_fields: ::protobuf::UnknownFields,
pub cached_size: ::protobuf::CachedSize, pub cached_size: ::protobuf::CachedSize,
@ -386,32 +400,6 @@ impl DocumentServerWSData {
pub fn take_data(&mut self) -> ::std::vec::Vec<u8> { pub fn take_data(&mut self) -> ::std::vec::Vec<u8> {
::std::mem::replace(&mut self.data, ::std::vec::Vec::new()) ::std::mem::replace(&mut self.data, ::std::vec::Vec::new())
} }
// string id = 4;
pub fn get_id(&self) -> &str {
&self.id
}
pub fn clear_id(&mut self) {
self.id.clear();
}
// Param is passed by value, moved
pub fn set_id(&mut self, v: ::std::string::String) {
self.id = v;
}
// Mutable pointer to the field.
// If field is not initialized, it is initialized with default value first.
pub fn mut_id(&mut self) -> &mut ::std::string::String {
&mut self.id
}
// Take field
pub fn take_id(&mut self) -> ::std::string::String {
::std::mem::replace(&mut self.id, ::std::string::String::new())
}
} }
impl ::protobuf::Message for DocumentServerWSData { impl ::protobuf::Message for DocumentServerWSData {
@ -432,9 +420,6 @@ impl ::protobuf::Message for DocumentServerWSData {
3 => { 3 => {
::protobuf::rt::read_singular_proto3_bytes_into(wire_type, is, &mut self.data)?; ::protobuf::rt::read_singular_proto3_bytes_into(wire_type, is, &mut self.data)?;
}, },
4 => {
::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.id)?;
},
_ => { _ => {
::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
}, },
@ -456,9 +441,6 @@ impl ::protobuf::Message for DocumentServerWSData {
if !self.data.is_empty() { if !self.data.is_empty() {
my_size += ::protobuf::rt::bytes_size(3, &self.data); my_size += ::protobuf::rt::bytes_size(3, &self.data);
} }
if !self.id.is_empty() {
my_size += ::protobuf::rt::string_size(4, &self.id);
}
my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
self.cached_size.set(my_size); self.cached_size.set(my_size);
my_size my_size
@ -474,9 +456,6 @@ impl ::protobuf::Message for DocumentServerWSData {
if !self.data.is_empty() { if !self.data.is_empty() {
os.write_bytes(3, &self.data)?; os.write_bytes(3, &self.data)?;
} }
if !self.id.is_empty() {
os.write_string(4, &self.id)?;
}
os.write_unknown_fields(self.get_unknown_fields())?; os.write_unknown_fields(self.get_unknown_fields())?;
::std::result::Result::Ok(()) ::std::result::Result::Ok(())
} }
@ -530,11 +509,6 @@ impl ::protobuf::Message for DocumentServerWSData {
|m: &DocumentServerWSData| { &m.data }, |m: &DocumentServerWSData| { &m.data },
|m: &mut DocumentServerWSData| { &mut m.data }, |m: &mut DocumentServerWSData| { &mut m.data },
)); ));
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
"id",
|m: &DocumentServerWSData| { &m.id },
|m: &mut DocumentServerWSData| { &mut m.id },
));
::protobuf::reflect::MessageDescriptor::new_pb_name::<DocumentServerWSData>( ::protobuf::reflect::MessageDescriptor::new_pb_name::<DocumentServerWSData>(
"DocumentServerWSData", "DocumentServerWSData",
fields, fields,
@ -554,7 +528,6 @@ impl ::protobuf::Clear for DocumentServerWSData {
self.doc_id.clear(); self.doc_id.clear();
self.ty = DocumentServerWSDataType::ServerAck; self.ty = DocumentServerWSDataType::ServerAck;
self.data.clear(); self.data.clear();
self.id.clear();
self.unknown_fields.clear(); self.unknown_fields.clear();
} }
} }
@ -918,66 +891,64 @@ impl ::protobuf::reflect::ProtobufValue for DocumentServerWSDataType {
} }
static file_descriptor_proto_data: &'static [u8] = b"\ static file_descriptor_proto_data: &'static [u8] = b"\
\n\x08ws.proto\"|\n\x14DocumentClientWSData\x12\x15\n\x06doc_id\x18\x01\ \n\x08ws.proto\x1a\x0erevision.proto\"\x99\x01\n\x14DocumentClientWSData\
\x20\x01(\tR\x05docId\x12)\n\x02ty\x18\x02\x20\x01(\x0e2\x19.DocumentCli\ \x12\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\x12)\n\x02ty\x18\x02\
entWSDataTypeR\x02ty\x12\x12\n\x04data\x18\x03\x20\x01(\x0cR\x04data\x12\ \x20\x01(\x0e2\x19.DocumentClientWSDataTypeR\x02ty\x12/\n\trevisions\x18\
\x0e\n\x02id\x18\x04\x20\x01(\tR\x02id\"|\n\x14DocumentServerWSData\x12\ \x03\x20\x01(\x0b2\x11.RepeatedRevisionR\trevisions\x12\x0e\n\x02id\x18\
\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\x12)\n\x02ty\x18\x02\x20\ \x04\x20\x01(\tR\x02id\"l\n\x14DocumentServerWSData\x12\x15\n\x06doc_id\
\x01(\x0e2\x19.DocumentServerWSDataTypeR\x02ty\x12\x12\n\x04data\x18\x03\ \x18\x01\x20\x01(\tR\x05docId\x12)\n\x02ty\x18\x02\x20\x01(\x0e2\x19.Doc\
\x20\x01(\x0cR\x04data\x12\x0e\n\x02id\x18\x04\x20\x01(\tR\x02id\"f\n\ umentServerWSDataTypeR\x02ty\x12\x12\n\x04data\x18\x03\x20\x01(\x0cR\x04\
\x0fNewDocumentUser\x12\x17\n\x07user_id\x18\x01\x20\x01(\tR\x06userId\ data\"f\n\x0fNewDocumentUser\x12\x17\n\x07user_id\x18\x01\x20\x01(\tR\
\x12\x15\n\x06doc_id\x18\x02\x20\x01(\tR\x05docId\x12#\n\rrevision_data\ \x06userId\x12\x15\n\x06doc_id\x18\x02\x20\x01(\tR\x05docId\x12#\n\rrevi\
\x18\x03\x20\x01(\x0cR\x0crevisionData*-\n\x18DocumentClientWSDataType\ sion_data\x18\x03\x20\x01(\x0cR\x0crevisionData*-\n\x18DocumentClientWSD\
\x12\x11\n\rClientPushRev\x10\0*`\n\x18DocumentServerWSDataType\x12\r\n\ ataType\x12\x11\n\rClientPushRev\x10\0*`\n\x18DocumentServerWSDataType\
\tServerAck\x10\0\x12\x11\n\rServerPushRev\x10\x01\x12\x11\n\rServerPull\ \x12\r\n\tServerAck\x10\0\x12\x11\n\rServerPushRev\x10\x01\x12\x11\n\rSe\
Rev\x10\x02\x12\x0f\n\x0bUserConnect\x10\x03J\xb4\x07\n\x06\x12\x04\0\0\ rverPullRev\x10\x02\x12\x0f\n\x0bUserConnect\x10\x03J\x88\x07\n\x06\x12\
\x1b\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\ \x04\0\0\x1b\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\t\n\x02\x03\0\x12\x03\
\x07\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x1c\n\x0b\n\x04\x04\0\x02\0\ \x01\0\x18\n\n\n\x02\x04\0\x12\x04\x03\0\x08\x01\n\n\n\x03\x04\0\x01\x12\
\x12\x03\x03\x04\x16\n\x0c\n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\n\n\x0c\ \x03\x03\x08\x1c\n\x0b\n\x04\x04\0\x02\0\x12\x03\x04\x04\x16\n\x0c\n\x05\
\n\x05\x04\0\x02\0\x01\x12\x03\x03\x0b\x11\n\x0c\n\x05\x04\0\x02\0\x03\ \x04\0\x02\0\x05\x12\x03\x04\x04\n\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\
\x12\x03\x03\x14\x15\n\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04$\n\x0c\n\ \x04\x0b\x11\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x04\x14\x15\n\x0b\n\x04\
\x05\x04\0\x02\x01\x06\x12\x03\x04\x04\x1c\n\x0c\n\x05\x04\0\x02\x01\x01\ \x04\0\x02\x01\x12\x03\x05\x04$\n\x0c\n\x05\x04\0\x02\x01\x06\x12\x03\
\x12\x03\x04\x1d\x1f\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04\"#\n\x0b\ \x05\x04\x1c\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x05\x1d\x1f\n\x0c\n\
\n\x04\x04\0\x02\x02\x12\x03\x05\x04\x13\n\x0c\n\x05\x04\0\x02\x02\x05\ \x05\x04\0\x02\x01\x03\x12\x03\x05\"#\n\x0b\n\x04\x04\0\x02\x02\x12\x03\
\x12\x03\x05\x04\t\n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03\x05\n\x0e\n\x0c\ \x06\x04#\n\x0c\n\x05\x04\0\x02\x02\x06\x12\x03\x06\x04\x14\n\x0c\n\x05\
\n\x05\x04\0\x02\x02\x03\x12\x03\x05\x11\x12\n\x0b\n\x04\x04\0\x02\x03\ \x04\0\x02\x02\x01\x12\x03\x06\x15\x1e\n\x0c\n\x05\x04\0\x02\x02\x03\x12\
\x12\x03\x06\x04\x12\n\x0c\n\x05\x04\0\x02\x03\x05\x12\x03\x06\x04\n\n\ \x03\x06!\"\n\x0b\n\x04\x04\0\x02\x03\x12\x03\x07\x04\x12\n\x0c\n\x05\
\x0c\n\x05\x04\0\x02\x03\x01\x12\x03\x06\x0b\r\n\x0c\n\x05\x04\0\x02\x03\ \x04\0\x02\x03\x05\x12\x03\x07\x04\n\n\x0c\n\x05\x04\0\x02\x03\x01\x12\
\x03\x12\x03\x06\x10\x11\n\n\n\x02\x04\x01\x12\x04\x08\0\r\x01\n\n\n\x03\ \x03\x07\x0b\r\n\x0c\n\x05\x04\0\x02\x03\x03\x12\x03\x07\x10\x11\n\n\n\
\x04\x01\x01\x12\x03\x08\x08\x1c\n\x0b\n\x04\x04\x01\x02\0\x12\x03\t\x04\ \x02\x04\x01\x12\x04\t\0\r\x01\n\n\n\x03\x04\x01\x01\x12\x03\t\x08\x1c\n\
\x16\n\x0c\n\x05\x04\x01\x02\0\x05\x12\x03\t\x04\n\n\x0c\n\x05\x04\x01\ \x0b\n\x04\x04\x01\x02\0\x12\x03\n\x04\x16\n\x0c\n\x05\x04\x01\x02\0\x05\
\x02\0\x01\x12\x03\t\x0b\x11\n\x0c\n\x05\x04\x01\x02\0\x03\x12\x03\t\x14\ \x12\x03\n\x04\n\n\x0c\n\x05\x04\x01\x02\0\x01\x12\x03\n\x0b\x11\n\x0c\n\
\x15\n\x0b\n\x04\x04\x01\x02\x01\x12\x03\n\x04$\n\x0c\n\x05\x04\x01\x02\ \x05\x04\x01\x02\0\x03\x12\x03\n\x14\x15\n\x0b\n\x04\x04\x01\x02\x01\x12\
\x01\x06\x12\x03\n\x04\x1c\n\x0c\n\x05\x04\x01\x02\x01\x01\x12\x03\n\x1d\ \x03\x0b\x04$\n\x0c\n\x05\x04\x01\x02\x01\x06\x12\x03\x0b\x04\x1c\n\x0c\
\x1f\n\x0c\n\x05\x04\x01\x02\x01\x03\x12\x03\n\"#\n\x0b\n\x04\x04\x01\ \n\x05\x04\x01\x02\x01\x01\x12\x03\x0b\x1d\x1f\n\x0c\n\x05\x04\x01\x02\
\x02\x02\x12\x03\x0b\x04\x13\n\x0c\n\x05\x04\x01\x02\x02\x05\x12\x03\x0b\ \x01\x03\x12\x03\x0b\"#\n\x0b\n\x04\x04\x01\x02\x02\x12\x03\x0c\x04\x13\
\x04\t\n\x0c\n\x05\x04\x01\x02\x02\x01\x12\x03\x0b\n\x0e\n\x0c\n\x05\x04\ \n\x0c\n\x05\x04\x01\x02\x02\x05\x12\x03\x0c\x04\t\n\x0c\n\x05\x04\x01\
\x01\x02\x02\x03\x12\x03\x0b\x11\x12\n\x0b\n\x04\x04\x01\x02\x03\x12\x03\ \x02\x02\x01\x12\x03\x0c\n\x0e\n\x0c\n\x05\x04\x01\x02\x02\x03\x12\x03\
\x0c\x04\x12\n\x0c\n\x05\x04\x01\x02\x03\x05\x12\x03\x0c\x04\n\n\x0c\n\ \x0c\x11\x12\n\n\n\x02\x04\x02\x12\x04\x0e\0\x12\x01\n\n\n\x03\x04\x02\
\x05\x04\x01\x02\x03\x01\x12\x03\x0c\x0b\r\n\x0c\n\x05\x04\x01\x02\x03\ \x01\x12\x03\x0e\x08\x17\n\x0b\n\x04\x04\x02\x02\0\x12\x03\x0f\x04\x17\n\
\x03\x12\x03\x0c\x10\x11\n\n\n\x02\x04\x02\x12\x04\x0e\0\x12\x01\n\n\n\ \x0c\n\x05\x04\x02\x02\0\x05\x12\x03\x0f\x04\n\n\x0c\n\x05\x04\x02\x02\0\
\x03\x04\x02\x01\x12\x03\x0e\x08\x17\n\x0b\n\x04\x04\x02\x02\0\x12\x03\ \x01\x12\x03\x0f\x0b\x12\n\x0c\n\x05\x04\x02\x02\0\x03\x12\x03\x0f\x15\
\x0f\x04\x17\n\x0c\n\x05\x04\x02\x02\0\x05\x12\x03\x0f\x04\n\n\x0c\n\x05\ \x16\n\x0b\n\x04\x04\x02\x02\x01\x12\x03\x10\x04\x16\n\x0c\n\x05\x04\x02\
\x04\x02\x02\0\x01\x12\x03\x0f\x0b\x12\n\x0c\n\x05\x04\x02\x02\0\x03\x12\ \x02\x01\x05\x12\x03\x10\x04\n\n\x0c\n\x05\x04\x02\x02\x01\x01\x12\x03\
\x03\x0f\x15\x16\n\x0b\n\x04\x04\x02\x02\x01\x12\x03\x10\x04\x16\n\x0c\n\ \x10\x0b\x11\n\x0c\n\x05\x04\x02\x02\x01\x03\x12\x03\x10\x14\x15\n\x0b\n\
\x05\x04\x02\x02\x01\x05\x12\x03\x10\x04\n\n\x0c\n\x05\x04\x02\x02\x01\ \x04\x04\x02\x02\x02\x12\x03\x11\x04\x1c\n\x0c\n\x05\x04\x02\x02\x02\x05\
\x01\x12\x03\x10\x0b\x11\n\x0c\n\x05\x04\x02\x02\x01\x03\x12\x03\x10\x14\ \x12\x03\x11\x04\t\n\x0c\n\x05\x04\x02\x02\x02\x01\x12\x03\x11\n\x17\n\
\x15\n\x0b\n\x04\x04\x02\x02\x02\x12\x03\x11\x04\x1c\n\x0c\n\x05\x04\x02\ \x0c\n\x05\x04\x02\x02\x02\x03\x12\x03\x11\x1a\x1b\n\n\n\x02\x05\0\x12\
\x02\x02\x05\x12\x03\x11\x04\t\n\x0c\n\x05\x04\x02\x02\x02\x01\x12\x03\ \x04\x13\0\x15\x01\n\n\n\x03\x05\0\x01\x12\x03\x13\x05\x1d\n\x0b\n\x04\
\x11\n\x17\n\x0c\n\x05\x04\x02\x02\x02\x03\x12\x03\x11\x1a\x1b\n\n\n\x02\ \x05\0\x02\0\x12\x03\x14\x04\x16\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x14\
\x05\0\x12\x04\x13\0\x15\x01\n\n\n\x03\x05\0\x01\x12\x03\x13\x05\x1d\n\ \x04\x11\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x14\x14\x15\n\n\n\x02\x05\
\x0b\n\x04\x05\0\x02\0\x12\x03\x14\x04\x16\n\x0c\n\x05\x05\0\x02\0\x01\ \x01\x12\x04\x16\0\x1b\x01\n\n\n\x03\x05\x01\x01\x12\x03\x16\x05\x1d\n\
\x12\x03\x14\x04\x11\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x14\x14\x15\n\n\ \x0b\n\x04\x05\x01\x02\0\x12\x03\x17\x04\x12\n\x0c\n\x05\x05\x01\x02\0\
\n\x02\x05\x01\x12\x04\x16\0\x1b\x01\n\n\n\x03\x05\x01\x01\x12\x03\x16\ \x01\x12\x03\x17\x04\r\n\x0c\n\x05\x05\x01\x02\0\x02\x12\x03\x17\x10\x11\
\x05\x1d\n\x0b\n\x04\x05\x01\x02\0\x12\x03\x17\x04\x12\n\x0c\n\x05\x05\ \n\x0b\n\x04\x05\x01\x02\x01\x12\x03\x18\x04\x16\n\x0c\n\x05\x05\x01\x02\
\x01\x02\0\x01\x12\x03\x17\x04\r\n\x0c\n\x05\x05\x01\x02\0\x02\x12\x03\ \x01\x01\x12\x03\x18\x04\x11\n\x0c\n\x05\x05\x01\x02\x01\x02\x12\x03\x18\
\x17\x10\x11\n\x0b\n\x04\x05\x01\x02\x01\x12\x03\x18\x04\x16\n\x0c\n\x05\ \x14\x15\n\x0b\n\x04\x05\x01\x02\x02\x12\x03\x19\x04\x16\n\x0c\n\x05\x05\
\x05\x01\x02\x01\x01\x12\x03\x18\x04\x11\n\x0c\n\x05\x05\x01\x02\x01\x02\ \x01\x02\x02\x01\x12\x03\x19\x04\x11\n\x0c\n\x05\x05\x01\x02\x02\x02\x12\
\x12\x03\x18\x14\x15\n\x0b\n\x04\x05\x01\x02\x02\x12\x03\x19\x04\x16\n\ \x03\x19\x14\x15\n\x0b\n\x04\x05\x01\x02\x03\x12\x03\x1a\x04\x14\n\x0c\n\
\x0c\n\x05\x05\x01\x02\x02\x01\x12\x03\x19\x04\x11\n\x0c\n\x05\x05\x01\ \x05\x05\x01\x02\x03\x01\x12\x03\x1a\x04\x0f\n\x0c\n\x05\x05\x01\x02\x03\
\x02\x02\x02\x12\x03\x19\x14\x15\n\x0b\n\x04\x05\x01\x02\x03\x12\x03\x1a\ \x02\x12\x03\x1a\x12\x13b\x06proto3\
\x04\x14\n\x0c\n\x05\x05\x01\x02\x03\x01\x12\x03\x1a\x04\x0f\n\x0c\n\x05\
\x05\x01\x02\x03\x02\x12\x03\x1a\x12\x13b\x06proto3\
"; ";
static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;

View File

@ -6,7 +6,7 @@ message CreateDocParams {
RepeatedRevision revisions = 2; RepeatedRevision revisions = 2;
} }
message DocumentInfo { message DocumentInfo {
string id = 1; string doc_id = 1;
string text = 2; string text = 2;
int64 rev_id = 3; int64 rev_id = 3;
int64 base_rev_id = 4; int64 base_rev_id = 4;

View File

@ -1,16 +1,16 @@
syntax = "proto3"; syntax = "proto3";
import "revision.proto";
message DocumentClientWSData { message DocumentClientWSData {
string doc_id = 1; string doc_id = 1;
DocumentClientWSDataType ty = 2; DocumentClientWSDataType ty = 2;
bytes data = 3; RepeatedRevision revisions = 3;
string id = 4; string id = 4;
} }
message DocumentServerWSData { message DocumentServerWSData {
string doc_id = 1; string doc_id = 1;
DocumentServerWSDataType ty = 2; DocumentServerWSDataType ty = 2;
bytes data = 3; bytes data = 3;
string id = 4;
} }
message NewDocumentUser { message NewDocumentUser {
string user_id = 1; string user_id = 1;

View File

@ -1,15 +1,20 @@
use crate::{ use crate::{
document::Document, document::Document,
entities::{doc::DocumentInfo, revision::Revision}, entities::{
doc::DocumentInfo,
revision::{RepeatedRevision, Revision},
ws::DocumentServerWSDataBuilder,
},
errors::{internal_error, CollaborateError, CollaborateResult}, errors::{internal_error, CollaborateError, CollaborateResult},
sync::{RevisionSynchronizer, RevisionUser}, protobuf::DocumentClientWSData,
sync::{RevisionSynchronizer, RevisionUser, SyncResponse},
}; };
use async_stream::stream; use async_stream::stream;
use dashmap::DashMap; use dashmap::DashMap;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use lib_infra::future::FutureResultSend; use lib_infra::future::FutureResultSend;
use lib_ot::rich_text::RichTextDelta; use lib_ot::rich_text::RichTextDelta;
use std::{fmt::Debug, sync::Arc}; use std::{convert::TryFrom, fmt::Debug, sync::Arc};
use tokio::{ use tokio::{
sync::{mpsc, oneshot}, sync::{mpsc, oneshot},
task::spawn_blocking, task::spawn_blocking,
@ -17,8 +22,9 @@ use tokio::{
pub trait DocumentPersistence: Send + Sync + Debug { pub trait DocumentPersistence: Send + Sync + Debug {
fn read_doc(&self, doc_id: &str) -> FutureResultSend<DocumentInfo, CollaborateError>; fn read_doc(&self, doc_id: &str) -> FutureResultSend<DocumentInfo, CollaborateError>;
fn create_doc(&self, revision: Revision) -> FutureResultSend<DocumentInfo, CollaborateError>; fn create_doc(&self, doc_id: &str, revisions: Vec<Revision>) -> FutureResultSend<DocumentInfo, CollaborateError>;
fn get_revisions(&self, doc_id: &str, rev_ids: Vec<i64>) -> FutureResultSend<Vec<Revision>, CollaborateError>; fn get_revisions(&self, doc_id: &str, rev_ids: Vec<i64>) -> FutureResultSend<Vec<Revision>, CollaborateError>;
fn get_doc_revisions(&self, doc_id: &str) -> FutureResultSend<Vec<Revision>, CollaborateError>;
} }
pub struct ServerDocumentManager { pub struct ServerDocumentManager {
@ -37,22 +43,43 @@ impl ServerDocumentManager {
pub async fn apply_revisions( pub async fn apply_revisions(
&self, &self,
user: Arc<dyn RevisionUser>, user: Arc<dyn RevisionUser>,
revisions: Vec<Revision>, mut client_data: DocumentClientWSData,
) -> Result<(), CollaborateError> { ) -> Result<(), CollaborateError> {
if revisions.is_empty() { let mut pb = client_data.take_revisions();
return Ok(()); let cloned_user = user.clone();
} let ack_id = client_data.id.clone().parse::<i64>().map_err(|e| {
let revision = revisions.first().unwrap(); CollaborateError::internal().context(format!("Parse rev_id from {} failed. {}", &client_data.id, e))
let handler = match self.get_document_handler(&revision.doc_id).await { })?;
let doc_id = client_data.doc_id;
let revisions = spawn_blocking(move || {
let repeated_revision = RepeatedRevision::try_from(&mut pb)?;
let revisions = repeated_revision.into_inner();
Ok::<Vec<Revision>, CollaborateError>(revisions)
})
.await
.map_err(internal_error)??;
let result = match self.get_document_handler(&doc_id).await {
None => { None => {
// Create the document if it doesn't exist let _ = self.create_document(&doc_id, revisions).await.map_err(internal_error)?;
self.create_document(revision.clone()).await.map_err(internal_error)? Ok(())
},
Some(handler) => {
let _ = handler
.apply_revisions(doc_id.clone(), user, revisions)
.await
.map_err(internal_error)?;
Ok(())
}, },
Some(handler) => handler,
}; };
handler.apply_revisions(user, revisions).await.map_err(internal_error)?; if result.is_ok() {
Ok(()) cloned_user.receive(SyncResponse::Ack(DocumentServerWSDataBuilder::build_ack_message(
&doc_id, ack_id,
)));
}
result
} }
async fn get_document_handler(&self, doc_id: &str) -> Option<Arc<OpenDocHandle>> { async fn get_document_handler(&self, doc_id: &str) -> Option<Arc<OpenDocHandle>> {
@ -75,14 +102,18 @@ impl ServerDocumentManager {
} }
} }
async fn create_document(&self, revision: Revision) -> Result<Arc<OpenDocHandle>, CollaborateError> { async fn create_document(
let doc = self.persistence.create_doc(revision).await?; &self,
doc_id: &str,
revisions: Vec<Revision>,
) -> Result<Arc<OpenDocHandle>, CollaborateError> {
let doc = self.persistence.create_doc(doc_id, revisions).await?;
let handler = self.cache_document(doc).await?; let handler = self.cache_document(doc).await?;
Ok(handler) Ok(handler)
} }
async fn cache_document(&self, doc: DocumentInfo) -> Result<Arc<OpenDocHandle>, CollaborateError> { async fn cache_document(&self, doc: DocumentInfo) -> Result<Arc<OpenDocHandle>, CollaborateError> {
let doc_id = doc.id.clone(); let doc_id = doc.doc_id.clone();
let persistence = self.persistence.clone(); let persistence = self.persistence.clone();
let handle = spawn_blocking(|| OpenDocHandle::new(doc, persistence)) let handle = spawn_blocking(|| OpenDocHandle::new(doc, persistence))
.await .await
@ -114,6 +145,7 @@ impl OpenDocHandle {
async fn apply_revisions( async fn apply_revisions(
&self, &self,
doc_id: String,
user: Arc<dyn RevisionUser>, user: Arc<dyn RevisionUser>,
revisions: Vec<Revision>, revisions: Vec<Revision>,
) -> Result<(), CollaborateError> { ) -> Result<(), CollaborateError> {
@ -121,6 +153,7 @@ impl OpenDocHandle {
let persistence = self.persistence.clone(); let persistence = self.persistence.clone();
self.users.insert(user.user_id(), user.clone()); self.users.insert(user.user_id(), user.clone());
let msg = DocumentCommand::ApplyRevisions { let msg = DocumentCommand::ApplyRevisions {
doc_id,
user, user,
revisions, revisions,
persistence, persistence,
@ -130,12 +163,6 @@ impl OpenDocHandle {
Ok(()) Ok(())
} }
pub async fn document_json(&self) -> CollaborateResult<String> {
let (ret, rx) = oneshot::channel();
let msg = DocumentCommand::GetDocumentJson { ret };
self.send(msg, rx).await?
}
async fn send<T>(&self, msg: DocumentCommand, rx: oneshot::Receiver<T>) -> CollaborateResult<T> { async fn send<T>(&self, msg: DocumentCommand, rx: oneshot::Receiver<T>) -> CollaborateResult<T> {
let _ = self.sender.send(msg).await.map_err(internal_error)?; let _ = self.sender.send(msg).await.map_err(internal_error)?;
let result = rx.await.map_err(internal_error)?; let result = rx.await.map_err(internal_error)?;
@ -146,14 +173,12 @@ impl OpenDocHandle {
#[derive(Debug)] #[derive(Debug)]
enum DocumentCommand { enum DocumentCommand {
ApplyRevisions { ApplyRevisions {
doc_id: String,
user: Arc<dyn RevisionUser>, user: Arc<dyn RevisionUser>,
revisions: Vec<Revision>, revisions: Vec<Revision>,
persistence: Arc<dyn DocumentPersistence>, persistence: Arc<dyn DocumentPersistence>,
ret: oneshot::Sender<CollaborateResult<()>>, ret: oneshot::Sender<CollaborateResult<()>>,
}, },
GetDocumentJson {
ret: oneshot::Sender<CollaborateResult<String>>,
},
} }
struct DocumentCommandQueue { struct DocumentCommandQueue {
@ -166,13 +191,13 @@ impl DocumentCommandQueue {
fn new(receiver: mpsc::Receiver<DocumentCommand>, doc: DocumentInfo) -> Result<Self, CollaborateError> { fn new(receiver: mpsc::Receiver<DocumentCommand>, doc: DocumentInfo) -> Result<Self, CollaborateError> {
let delta = RichTextDelta::from_bytes(&doc.text)?; let delta = RichTextDelta::from_bytes(&doc.text)?;
let synchronizer = Arc::new(RevisionSynchronizer::new( let synchronizer = Arc::new(RevisionSynchronizer::new(
&doc.id, &doc.doc_id,
doc.rev_id, doc.rev_id,
Document::from_delta(delta), Document::from_delta(delta),
)); ));
Ok(Self { Ok(Self {
doc_id: doc.id, doc_id: doc.doc_id,
receiver: Some(receiver), receiver: Some(receiver),
synchronizer, synchronizer,
}) })
@ -198,24 +223,18 @@ impl DocumentCommandQueue {
async fn handle_message(&self, msg: DocumentCommand) { async fn handle_message(&self, msg: DocumentCommand) {
match msg { match msg {
DocumentCommand::ApplyRevisions { DocumentCommand::ApplyRevisions {
doc_id,
user, user,
revisions, revisions,
persistence, persistence,
ret, ret,
} => { } => {
self.synchronizer self.synchronizer
.apply_revisions(user, revisions, persistence) .apply_revisions(doc_id, user, revisions, persistence)
.await .await
.unwrap(); .unwrap();
let _ = ret.send(Ok(())); let _ = ret.send(Ok(()));
}, },
DocumentCommand::GetDocumentJson { ret } => {
let synchronizer = self.synchronizer.clone();
let json = spawn_blocking(move || synchronizer.doc_json())
.await
.map_err(internal_error);
let _ = ret.send(json);
},
} }
} }
} }

View File

@ -6,6 +6,7 @@ use crate::{
}, },
sync::DocumentPersistence, sync::DocumentPersistence,
}; };
use futures::TryFutureExt;
use lib_ot::{core::OperationTransformable, errors::OTError, rich_text::RichTextDelta}; use lib_ot::{core::OperationTransformable, errors::OTError, rich_text::RichTextDelta};
use parking_lot::RwLock; use parking_lot::RwLock;
use std::{ use std::{
@ -49,12 +50,19 @@ impl RevisionSynchronizer {
#[tracing::instrument(level = "debug", skip(self, user, revisions, persistence), err)] #[tracing::instrument(level = "debug", skip(self, user, revisions, persistence), err)]
pub async fn apply_revisions( pub async fn apply_revisions(
&self, &self,
doc_id: String,
user: Arc<dyn RevisionUser>, user: Arc<dyn RevisionUser>,
revisions: Vec<Revision>, revisions: Vec<Revision>,
persistence: Arc<dyn DocumentPersistence>, persistence: Arc<dyn DocumentPersistence>,
) -> Result<(), OTError> { ) -> Result<(), OTError> {
if revisions.is_empty() { if revisions.is_empty() {
tracing::warn!("Receive empty revisions"); // Return all the revisions to client
let revisions = persistence
.get_doc_revisions(&doc_id)
.map_err(|e| OTError::internal().context(e))
.await?;
let data = DocumentServerWSDataBuilder::build_push_message(&doc_id, revisions);
user.receive(SyncResponse::Push(data));
return Ok(()); return Ok(());
} }
@ -83,8 +91,7 @@ impl RevisionSynchronizer {
start: server_rev_id, start: server_rev_id,
end: first_revision.rev_id, end: first_revision.rev_id,
}; };
let msg = let msg = DocumentServerWSDataBuilder::build_pull_message(&self.doc_id, range);
DocumentServerWSDataBuilder::build_pull_message(&self.doc_id, range, first_revision.rev_id);
user.receive(SyncResponse::Pull(msg)); user.receive(SyncResponse::Pull(msg));
} }
}, },
@ -96,7 +103,6 @@ impl RevisionSynchronizer {
// The client document is outdated. Transform the client revision delta and then // The client document is outdated. Transform the client revision delta and then
// send the prime delta to the client. Client should compose the this prime // send the prime delta to the client. Client should compose the this prime
// delta. // delta.
let id = first_revision.rev_id.to_string();
let from_rev_id = first_revision.rev_id; let from_rev_id = first_revision.rev_id;
let to_rev_id = server_base_rev_id; let to_rev_id = server_base_rev_id;
let rev_ids: Vec<i64> = (from_rev_id..=to_rev_id).collect(); let rev_ids: Vec<i64> = (from_rev_id..=to_rev_id).collect();
@ -111,15 +117,10 @@ impl RevisionSynchronizer {
}, },
}; };
let data = DocumentServerWSDataBuilder::build_push_message(&self.doc_id, revisions, &id); let data = DocumentServerWSDataBuilder::build_push_message(&self.doc_id, revisions);
user.receive(SyncResponse::Push(data)); user.receive(SyncResponse::Push(data));
}, },
} }
user.receive(SyncResponse::Ack(DocumentServerWSDataBuilder::build_ack_message(
&first_revision.doc_id,
&first_revision.rev_id.to_string(),
)));
Ok(()) Ok(())
} }

View File

@ -137,7 +137,7 @@ impl WSStream {
} }
impl fmt::Debug for WSStream { impl fmt::Debug for WSStream {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("WsStream").finish() } fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("WSStream").finish() }
} }
impl Future for WSStream { impl Future for WSStream {