send document ping

This commit is contained in:
appflowy 2022-01-01 23:09:13 +08:00
parent df5266d7c9
commit 12e8424e8a
22 changed files with 285 additions and 148 deletions

View File

@ -19,7 +19,7 @@ use protobuf::Message;
use std::sync::Arc;
use uuid::Uuid;
#[tracing::instrument(level = "debug", skip(kv_store), err)]
#[tracing::instrument(level = "debug", skip(kv_store, params), err)]
pub(crate) async fn create_document(
kv_store: &Arc<DocumentKVPersistence>,
mut params: CreateDocParams,
@ -50,7 +50,7 @@ pub async fn reset_document(
.transaction(|mut transaction| {
Box::pin(async move {
let _ = transaction.batch_delete_key_start_with(&doc_id).await?;
let items = revisions_to_key_value_items(revisions.into());
let items = revisions_to_key_value_items(revisions.into())?;
let _ = transaction.batch_set(items).await?;
Ok(())
})
@ -82,7 +82,7 @@ impl DocumentKVPersistence {
pub(crate) fn new(kv_store: Arc<KVStore>) -> Self { DocumentKVPersistence { inner: kv_store } }
pub(crate) async fn batch_set_revision(&self, revisions: Vec<Revision>) -> Result<(), ServerError> {
let items = revisions_to_key_value_items(revisions);
let items = revisions_to_key_value_items(revisions)?;
self.inner
.transaction(|mut t| Box::pin(async move { t.batch_set(items).await }))
.await
@ -152,15 +152,19 @@ impl DocumentKVPersistence {
}
#[inline]
fn revisions_to_key_value_items(revisions: Vec<Revision>) -> Vec<KeyValue> {
revisions
.into_iter()
.map(|revision| {
let key = make_revision_key(&revision.doc_id, revision.rev_id);
let value = Bytes::from(revision.write_to_bytes().unwrap());
KeyValue { key, value }
})
.collect::<Vec<KeyValue>>()
fn revisions_to_key_value_items(revisions: Vec<Revision>) -> Result<Vec<KeyValue>, ServerError> {
let mut items = vec![];
for revision in revisions {
let key = make_revision_key(&revision.doc_id, revision.rev_id);
if revision.delta_data.is_empty() {
return Err(ServerError::internal().context("The delta_data of Revision should not be empty"));
}
let value = Bytes::from(revision.write_to_bytes().unwrap());
items.push(KeyValue { key, value });
}
Ok(items)
}
#[inline]
@ -193,6 +197,11 @@ fn make_doc_from_revisions(doc_id: &str, mut revisions: RepeatedRevision) -> Res
for revision in revisions {
base_rev_id = revision.base_rev_id;
rev_id = revision.rev_id;
if revision.delta_data.is_empty() {
tracing::warn!("revision delta_data is empty");
}
let delta = RichTextDelta::from_bytes(revision.delta_data).map_err(internal_error)?;
document_delta = document_delta.compose(&delta).map_err(internal_error)?;
}

View File

@ -87,17 +87,18 @@ impl DocumentWebSocketActor {
match &document_client_data.ty {
DocumentClientWSDataType::ClientPushRev => {
match self
let _ = self
.doc_manager
.apply_revisions(user, document_client_data)
.handle_client_revisions(user, document_client_data)
.await
.map_err(internal_error)
{
Ok(_) => {},
Err(e) => {
tracing::error!("[DocumentWebSocketActor]: process client data failed: {:?}", e);
},
}
.map_err(internal_error)?;
},
DocumentClientWSDataType::ClientPing => {
let _ = self
.doc_manager
.handle_client_ping(user, document_client_data)
.await
.map_err(internal_error)?;
},
}

View File

@ -2,22 +2,20 @@
#![cfg_attr(rustfmt, rustfmt::skip)]
use std::convert::TryInto;
use actix_web::web::Data;
use flowy_document::services::doc::edit::ClientDocEditor as ClientEditDocContext;
use flowy_document::services::doc::edit::ClientDocumentEditor;
use flowy_test::{helper::ViewTest, FlowySDKTest};
use flowy_user::services::user::UserSession;
use futures_util::{stream, stream::StreamExt};
use std::sync::Arc;
use bytes::Bytes;
use tokio::time::{sleep, Duration};
// use crate::helper::*;
use crate::util::helper::{spawn_server, TestServer};
use flowy_collaboration::{entities::doc::DocIdentifier, protobuf::ResetDocumentParams};
use flowy_collaboration::{entities::doc::DocumentId, protobuf::ResetDocumentParams};
use lib_ot::rich_text::{RichTextAttribute, RichTextDelta};
use parking_lot::RwLock;
use backend::services::document::persistence::{DocumentKVPersistence, read_document, reset_document};
use flowy_collaboration::entities::revision::{RepeatedRevision, Revision, RevType};
use flowy_collaboration::entities::revision::{RepeatedRevision, Revision};
use lib_ot::core::Interval;
use flowy_net::services::ws::FlowyWSConnect;
@ -56,7 +54,7 @@ impl DocumentTest {
#[derive(Clone)]
struct ScriptContext {
client_edit_context: Option<Arc<ClientEditDocContext>>,
client_edit_context: Option<Arc<ClientDocumentEditor>>,
client_sdk: FlowySDKTest,
client_user_session: Arc<UserSession>,
ws_conn: Arc<FlowyWSConnect>,
@ -82,11 +80,11 @@ impl ScriptContext {
async fn open_doc(&mut self) {
let doc_id = self.doc_id.clone();
let edit_context = self.client_sdk.document_ctx.open(DocIdentifier { doc_id }).await.unwrap();
let edit_context = self.client_sdk.document_ctx.controller.open(doc_id).await.unwrap();
self.client_edit_context = Some(edit_context);
}
fn client_edit_context(&self) -> Arc<ClientEditDocContext> { self.client_edit_context.as_ref().unwrap().clone() }
fn client_edit_context(&self) -> Arc<ClientDocumentEditor> { self.client_edit_context.as_ref().unwrap().clone() }
}
async fn run_scripts(context: Arc<RwLock<ScriptContext>>, scripts: Vec<DocScript>) {
@ -126,7 +124,7 @@ async fn run_scripts(context: Arc<RwLock<ScriptContext>>, scripts: Vec<DocScript
DocScript::AssertServer(s, rev_id) => {
sleep(Duration::from_millis(100)).await;
let persistence = Data::new(context.read().server.app_ctx.persistence.kv_store());
let doc_identifier: flowy_collaboration::protobuf::DocIdentifier = DocIdentifier {
let doc_identifier: flowy_collaboration::protobuf::DocumentId = DocumentId {
doc_id
}.try_into().unwrap();
@ -144,7 +142,6 @@ async fn run_scripts(context: Arc<RwLock<ScriptContext>>, scripts: Vec<DocScript
base_rev_id,
rev_id,
delta_data,
RevType::Remote,
&user_id,
md5,
);

View File

@ -1,2 +1,2 @@
// mod edit_script;
// mod edit_test;
mod edit_script;
mod edit_test;

View File

@ -10,11 +10,11 @@ import 'dart:core' as $core;
import 'package:protobuf/protobuf.dart' as $pb;
class RevisionState extends $pb.ProtobufEnum {
static const RevisionState StateLocal = RevisionState._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'StateLocal');
static const RevisionState Local = RevisionState._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Local');
static const RevisionState Ack = RevisionState._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Ack');
static const $core.List<RevisionState> values = <RevisionState> [
StateLocal,
Local,
Ack,
];

View File

@ -12,13 +12,13 @@ import 'dart:typed_data' as $typed_data;
const RevisionState$json = const {
'1': 'RevisionState',
'2': const [
const {'1': 'StateLocal', '2': 0},
const {'1': 'Local', '2': 0},
const {'1': 'Ack', '2': 1},
],
};
/// Descriptor for `RevisionState`. Decode as a `google.protobuf.EnumDescriptorProto`.
final $typed_data.Uint8List revisionStateDescriptor = $convert.base64Decode('Cg1SZXZpc2lvblN0YXRlEg4KClN0YXRlTG9jYWwQABIHCgNBY2sQAQ==');
final $typed_data.Uint8List revisionStateDescriptor = $convert.base64Decode('Cg1SZXZpc2lvblN0YXRlEgkKBUxvY2FsEAASBwoDQWNrEAE=');
@$core.Deprecated('Use revTypeDescriptor instead')
const RevType$json = const {
'1': 'RevType',

View File

@ -11,9 +11,11 @@ import 'package:protobuf/protobuf.dart' as $pb;
class DocumentClientWSDataType extends $pb.ProtobufEnum {
static const DocumentClientWSDataType ClientPushRev = DocumentClientWSDataType._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'ClientPushRev');
static const DocumentClientWSDataType ClientPing = DocumentClientWSDataType._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'ClientPing');
static const $core.List<DocumentClientWSDataType> values = <DocumentClientWSDataType> [
ClientPushRev,
ClientPing,
];
static final $core.Map<$core.int, DocumentClientWSDataType> _byValue = $pb.ProtobufEnum.initByValue(values);

View File

@ -13,11 +13,12 @@ const DocumentClientWSDataType$json = const {
'1': 'DocumentClientWSDataType',
'2': const [
const {'1': 'ClientPushRev', '2': 0},
const {'1': 'ClientPing', '2': 1},
],
};
/// Descriptor for `DocumentClientWSDataType`. Decode as a `google.protobuf.EnumDescriptorProto`.
final $typed_data.Uint8List documentClientWSDataTypeDescriptor = $convert.base64Decode('ChhEb2N1bWVudENsaWVudFdTRGF0YVR5cGUSEQoNQ2xpZW50UHVzaFJldhAA');
final $typed_data.Uint8List documentClientWSDataTypeDescriptor = $convert.base64Decode('ChhEb2N1bWVudENsaWVudFdTRGF0YVR5cGUSEQoNQ2xpZW50UHVzaFJldhAAEg4KCkNsaWVudFBpbmcQAQ==');
@$core.Deprecated('Use documentServerWSDataTypeDescriptor instead')
const DocumentServerWSDataType$json = const {
'1': 'DocumentServerWSDataType',

View File

@ -688,6 +688,13 @@ packages:
url: "https://pub.dartlang.org"
source: hosted
version: "0.12.11"
material_color_utilities:
dependency: transitive
description:
name: material_color_utilities
url: "https://pub.dartlang.org"
source: hosted
version: "0.1.2"
meta:
dependency: transitive
description:
@ -1077,7 +1084,7 @@ packages:
name: test_api
url: "https://pub.dartlang.org"
source: hosted
version: "0.4.3"
version: "0.4.8"
textstyle_extensions:
dependency: transitive
description:

View File

@ -62,8 +62,8 @@ impl ViewController {
}
#[tracing::instrument(level = "debug", skip(self, params), fields(name = %params.name), err)]
pub(crate) async fn create_view_from_params(&self, mut params: CreateViewParams) -> Result<View, FlowyError> {
let delta_data = Bytes::from(params.take_view_data());
pub(crate) async fn create_view_from_params(&self, params: CreateViewParams) -> Result<View, FlowyError> {
let delta_data = Bytes::from(params.view_data.clone());
let user_id = self.user.user_id()?;
let repeated_revision: RepeatedRevision =
Revision::initial_revision(&user_id, &params.view_id, delta_data).into();

View File

@ -26,7 +26,7 @@ pub trait RevisionServer: Send + Sync {
}
pub struct RevisionManager {
doc_id: String,
pub(crate) doc_id: String,
user_id: String,
rev_id_counter: RevIdCounter,
cache: Arc<RevisionCache>,
@ -67,6 +67,9 @@ impl RevisionManager {
#[tracing::instrument(level = "debug", skip(self, revision))]
pub async fn add_remote_revision(&self, revision: &Revision) -> Result<(), FlowyError> {
if revision.delta_data.is_empty() {
return Err(FlowyError::internal().context("Delta data should be empty"));
}
self.rev_id_counter.set(revision.rev_id);
let _ = self.cache.add(revision.clone(), RevisionState::Ack, true).await?;
Ok(())
@ -74,6 +77,10 @@ impl RevisionManager {
#[tracing::instrument(level = "debug", skip(self, revision))]
pub async fn add_local_revision(&self, revision: &Revision) -> Result<(), FlowyError> {
if revision.delta_data.is_empty() {
return Err(FlowyError::internal().context("Delta data should be empty"));
}
let record = self.cache.add(revision.clone(), RevisionState::Local, true).await?;
self.sync_seq.add_revision(record).await?;
Ok(())

View File

@ -285,7 +285,12 @@ impl SharedWSSinkDataProvider {
let doc_id = rev.doc_id.clone();
Ok(Some(DocumentClientWSData::from_revisions(&doc_id, vec![rev])))
},
None => Ok(None),
None => {
//
let doc_id = self.rev_manager.doc_id.clone();
let latest_rev_id = self.rev_manager.rev_id();
Ok(Some(DocumentClientWSData::ping(&doc_id, latest_rev_id)))
},
}
},
}

View File

@ -37,9 +37,15 @@ impl MockDocServer {
});
let pb_client_data: flowy_collaboration::protobuf::DocumentClientWSData =
client_data.try_into().unwrap();
self.manager.apply_revisions(user, pb_client_data).await.unwrap();
self.manager
.handle_client_revisions(user, pb_client_data)
.await
.unwrap();
Some(rx)
},
DocumentClientWSDataType::ClientPing => {
todo!()
},
}
}
}

View File

@ -117,6 +117,8 @@ impl RepeatedRevision {
Self { items }
}
pub fn empty() -> Self { RepeatedRevision { items: vec![] } }
pub fn into_inner(self) -> Vec<Revision> { self.items }
}

View File

@ -9,6 +9,7 @@ use std::convert::{TryFrom, TryInto};
#[derive(Debug, Clone, ProtoBuf_Enum, Eq, PartialEq, Hash)]
pub enum DocumentClientWSDataType {
ClientPushRev = 0,
ClientPing = 1,
}
impl DocumentClientWSDataType {
@ -54,6 +55,15 @@ impl DocumentClientWSData {
}
}
pub fn ping(doc_id: &str, rev_id: i64) -> Self {
Self {
doc_id: doc_id.to_owned(),
ty: DocumentClientWSDataType::ClientPing,
revisions: RepeatedRevision::empty(),
id: rev_id.to_string(),
}
}
pub fn id(&self) -> String { self.id.clone() }
}

View File

@ -958,7 +958,7 @@ impl ::protobuf::reflect::ProtobufValue for RevisionRange {
#[derive(Clone,PartialEq,Eq,Debug,Hash)]
pub enum RevisionState {
StateLocal = 0,
Local = 0,
Ack = 1,
}
@ -969,7 +969,7 @@ impl ::protobuf::ProtobufEnum for RevisionState {
fn from_i32(value: i32) -> ::std::option::Option<RevisionState> {
match value {
0 => ::std::option::Option::Some(RevisionState::StateLocal),
0 => ::std::option::Option::Some(RevisionState::Local),
1 => ::std::option::Option::Some(RevisionState::Ack),
_ => ::std::option::Option::None
}
@ -977,7 +977,7 @@ impl ::protobuf::ProtobufEnum for RevisionState {
fn values() -> &'static [Self] {
static values: &'static [RevisionState] = &[
RevisionState::StateLocal,
RevisionState::Local,
RevisionState::Ack,
];
values
@ -996,7 +996,7 @@ impl ::std::marker::Copy for RevisionState {
impl ::std::default::Default for RevisionState {
fn default() -> Self {
RevisionState::StateLocal
RevisionState::Local
}
}
@ -1067,8 +1067,8 @@ static file_descriptor_proto_data: &'static [u8] = b"\
\"\x1d\n\x05RevId\x12\x14\n\x05value\x18\x01\x20\x01(\x03R\x05value\"N\n\
\rRevisionRange\x12\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\x12\x14\
\n\x05start\x18\x02\x20\x01(\x03R\x05start\x12\x10\n\x03end\x18\x03\x20\
\x01(\x03R\x03end*(\n\rRevisionState\x12\x0e\n\nStateLocal\x10\0\x12\x07\
\n\x03Ack\x10\x01*4\n\x07RevType\x12\x13\n\x0fDeprecatedLocal\x10\0\x12\
\x01(\x03R\x03end*#\n\rRevisionState\x12\t\n\x05Local\x10\0\x12\x07\n\
\x03Ack\x10\x01*4\n\x07RevType\x12\x13\n\x0fDeprecatedLocal\x10\0\x12\
\x14\n\x10DeprecatedRemote\x10\x01J\xe8\x07\n\x06\x12\x04\0\0\x1d\x01\n\
\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\n\x01\n\n\n\
\x03\x04\0\x01\x12\x03\x02\x08\x10\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\
@ -1109,16 +1109,16 @@ static file_descriptor_proto_data: &'static [u8] = b"\
\n\x0c\n\x05\x04\x03\x02\x02\x05\x12\x03\x14\x04\t\n\x0c\n\x05\x04\x03\
\x02\x02\x01\x12\x03\x14\n\r\n\x0c\n\x05\x04\x03\x02\x02\x03\x12\x03\x14\
\x10\x11\n\n\n\x02\x05\0\x12\x04\x16\0\x19\x01\n\n\n\x03\x05\0\x01\x12\
\x03\x16\x05\x12\n\x0b\n\x04\x05\0\x02\0\x12\x03\x17\x04\x13\n\x0c\n\x05\
\x05\0\x02\0\x01\x12\x03\x17\x04\x0e\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\
\x17\x11\x12\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x18\x04\x0c\n\x0c\n\x05\
\x05\0\x02\x01\x01\x12\x03\x18\x04\x07\n\x0c\n\x05\x05\0\x02\x01\x02\x12\
\x03\x18\n\x0b\n\n\n\x02\x05\x01\x12\x04\x1a\0\x1d\x01\n\n\n\x03\x05\x01\
\x01\x12\x03\x1a\x05\x0c\n\x0b\n\x04\x05\x01\x02\0\x12\x03\x1b\x04\x18\n\
\x0c\n\x05\x05\x01\x02\0\x01\x12\x03\x1b\x04\x13\n\x0c\n\x05\x05\x01\x02\
\0\x02\x12\x03\x1b\x16\x17\n\x0b\n\x04\x05\x01\x02\x01\x12\x03\x1c\x04\
\x19\n\x0c\n\x05\x05\x01\x02\x01\x01\x12\x03\x1c\x04\x14\n\x0c\n\x05\x05\
\x01\x02\x01\x02\x12\x03\x1c\x17\x18b\x06proto3\
\x03\x16\x05\x12\n\x0b\n\x04\x05\0\x02\0\x12\x03\x17\x04\x0e\n\x0c\n\x05\
\x05\0\x02\0\x01\x12\x03\x17\x04\t\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\
\x17\x0c\r\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x18\x04\x0c\n\x0c\n\x05\x05\
\0\x02\x01\x01\x12\x03\x18\x04\x07\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\
\x18\n\x0b\n\n\n\x02\x05\x01\x12\x04\x1a\0\x1d\x01\n\n\n\x03\x05\x01\x01\
\x12\x03\x1a\x05\x0c\n\x0b\n\x04\x05\x01\x02\0\x12\x03\x1b\x04\x18\n\x0c\
\n\x05\x05\x01\x02\0\x01\x12\x03\x1b\x04\x13\n\x0c\n\x05\x05\x01\x02\0\
\x02\x12\x03\x1b\x16\x17\n\x0b\n\x04\x05\x01\x02\x01\x12\x03\x1c\x04\x19\
\n\x0c\n\x05\x05\x01\x02\x01\x01\x12\x03\x1c\x04\x14\n\x0c\n\x05\x05\x01\
\x02\x01\x02\x12\x03\x1c\x17\x18b\x06proto3\
";
static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;

View File

@ -790,6 +790,7 @@ impl ::protobuf::reflect::ProtobufValue for NewDocumentUser {
#[derive(Clone,PartialEq,Eq,Debug,Hash)]
pub enum DocumentClientWSDataType {
ClientPushRev = 0,
ClientPing = 1,
}
impl ::protobuf::ProtobufEnum for DocumentClientWSDataType {
@ -800,6 +801,7 @@ impl ::protobuf::ProtobufEnum for DocumentClientWSDataType {
fn from_i32(value: i32) -> ::std::option::Option<DocumentClientWSDataType> {
match value {
0 => ::std::option::Option::Some(DocumentClientWSDataType::ClientPushRev),
1 => ::std::option::Option::Some(DocumentClientWSDataType::ClientPing),
_ => ::std::option::Option::None
}
}
@ -807,6 +809,7 @@ impl ::protobuf::ProtobufEnum for DocumentClientWSDataType {
fn values() -> &'static [Self] {
static values: &'static [DocumentClientWSDataType] = &[
DocumentClientWSDataType::ClientPushRev,
DocumentClientWSDataType::ClientPing,
];
values
}
@ -900,55 +903,57 @@ static file_descriptor_proto_data: &'static [u8] = b"\
umentServerWSDataTypeR\x02ty\x12\x12\n\x04data\x18\x03\x20\x01(\x0cR\x04\
data\"f\n\x0fNewDocumentUser\x12\x17\n\x07user_id\x18\x01\x20\x01(\tR\
\x06userId\x12\x15\n\x06doc_id\x18\x02\x20\x01(\tR\x05docId\x12#\n\rrevi\
sion_data\x18\x03\x20\x01(\x0cR\x0crevisionData*-\n\x18DocumentClientWSD\
ataType\x12\x11\n\rClientPushRev\x10\0*`\n\x18DocumentServerWSDataType\
\x12\r\n\tServerAck\x10\0\x12\x11\n\rServerPushRev\x10\x01\x12\x11\n\rSe\
rverPullRev\x10\x02\x12\x0f\n\x0bUserConnect\x10\x03J\x88\x07\n\x06\x12\
\x04\0\0\x1b\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\t\n\x02\x03\0\x12\x03\
\x01\0\x18\n\n\n\x02\x04\0\x12\x04\x03\0\x08\x01\n\n\n\x03\x04\0\x01\x12\
\x03\x03\x08\x1c\n\x0b\n\x04\x04\0\x02\0\x12\x03\x04\x04\x16\n\x0c\n\x05\
\x04\0\x02\0\x05\x12\x03\x04\x04\n\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\
\x04\x0b\x11\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x04\x14\x15\n\x0b\n\x04\
\x04\0\x02\x01\x12\x03\x05\x04$\n\x0c\n\x05\x04\0\x02\x01\x06\x12\x03\
\x05\x04\x1c\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x05\x1d\x1f\n\x0c\n\
\x05\x04\0\x02\x01\x03\x12\x03\x05\"#\n\x0b\n\x04\x04\0\x02\x02\x12\x03\
\x06\x04#\n\x0c\n\x05\x04\0\x02\x02\x06\x12\x03\x06\x04\x14\n\x0c\n\x05\
\x04\0\x02\x02\x01\x12\x03\x06\x15\x1e\n\x0c\n\x05\x04\0\x02\x02\x03\x12\
\x03\x06!\"\n\x0b\n\x04\x04\0\x02\x03\x12\x03\x07\x04\x12\n\x0c\n\x05\
\x04\0\x02\x03\x05\x12\x03\x07\x04\n\n\x0c\n\x05\x04\0\x02\x03\x01\x12\
\x03\x07\x0b\r\n\x0c\n\x05\x04\0\x02\x03\x03\x12\x03\x07\x10\x11\n\n\n\
\x02\x04\x01\x12\x04\t\0\r\x01\n\n\n\x03\x04\x01\x01\x12\x03\t\x08\x1c\n\
\x0b\n\x04\x04\x01\x02\0\x12\x03\n\x04\x16\n\x0c\n\x05\x04\x01\x02\0\x05\
\x12\x03\n\x04\n\n\x0c\n\x05\x04\x01\x02\0\x01\x12\x03\n\x0b\x11\n\x0c\n\
\x05\x04\x01\x02\0\x03\x12\x03\n\x14\x15\n\x0b\n\x04\x04\x01\x02\x01\x12\
\x03\x0b\x04$\n\x0c\n\x05\x04\x01\x02\x01\x06\x12\x03\x0b\x04\x1c\n\x0c\
\n\x05\x04\x01\x02\x01\x01\x12\x03\x0b\x1d\x1f\n\x0c\n\x05\x04\x01\x02\
\x01\x03\x12\x03\x0b\"#\n\x0b\n\x04\x04\x01\x02\x02\x12\x03\x0c\x04\x13\
\n\x0c\n\x05\x04\x01\x02\x02\x05\x12\x03\x0c\x04\t\n\x0c\n\x05\x04\x01\
\x02\x02\x01\x12\x03\x0c\n\x0e\n\x0c\n\x05\x04\x01\x02\x02\x03\x12\x03\
\x0c\x11\x12\n\n\n\x02\x04\x02\x12\x04\x0e\0\x12\x01\n\n\n\x03\x04\x02\
\x01\x12\x03\x0e\x08\x17\n\x0b\n\x04\x04\x02\x02\0\x12\x03\x0f\x04\x17\n\
\x0c\n\x05\x04\x02\x02\0\x05\x12\x03\x0f\x04\n\n\x0c\n\x05\x04\x02\x02\0\
\x01\x12\x03\x0f\x0b\x12\n\x0c\n\x05\x04\x02\x02\0\x03\x12\x03\x0f\x15\
\x16\n\x0b\n\x04\x04\x02\x02\x01\x12\x03\x10\x04\x16\n\x0c\n\x05\x04\x02\
\x02\x01\x05\x12\x03\x10\x04\n\n\x0c\n\x05\x04\x02\x02\x01\x01\x12\x03\
\x10\x0b\x11\n\x0c\n\x05\x04\x02\x02\x01\x03\x12\x03\x10\x14\x15\n\x0b\n\
\x04\x04\x02\x02\x02\x12\x03\x11\x04\x1c\n\x0c\n\x05\x04\x02\x02\x02\x05\
\x12\x03\x11\x04\t\n\x0c\n\x05\x04\x02\x02\x02\x01\x12\x03\x11\n\x17\n\
\x0c\n\x05\x04\x02\x02\x02\x03\x12\x03\x11\x1a\x1b\n\n\n\x02\x05\0\x12\
\x04\x13\0\x15\x01\n\n\n\x03\x05\0\x01\x12\x03\x13\x05\x1d\n\x0b\n\x04\
\x05\0\x02\0\x12\x03\x14\x04\x16\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x14\
\x04\x11\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x14\x14\x15\n\n\n\x02\x05\
\x01\x12\x04\x16\0\x1b\x01\n\n\n\x03\x05\x01\x01\x12\x03\x16\x05\x1d\n\
\x0b\n\x04\x05\x01\x02\0\x12\x03\x17\x04\x12\n\x0c\n\x05\x05\x01\x02\0\
\x01\x12\x03\x17\x04\r\n\x0c\n\x05\x05\x01\x02\0\x02\x12\x03\x17\x10\x11\
\n\x0b\n\x04\x05\x01\x02\x01\x12\x03\x18\x04\x16\n\x0c\n\x05\x05\x01\x02\
\x01\x01\x12\x03\x18\x04\x11\n\x0c\n\x05\x05\x01\x02\x01\x02\x12\x03\x18\
\x14\x15\n\x0b\n\x04\x05\x01\x02\x02\x12\x03\x19\x04\x16\n\x0c\n\x05\x05\
\x01\x02\x02\x01\x12\x03\x19\x04\x11\n\x0c\n\x05\x05\x01\x02\x02\x02\x12\
\x03\x19\x14\x15\n\x0b\n\x04\x05\x01\x02\x03\x12\x03\x1a\x04\x14\n\x0c\n\
\x05\x05\x01\x02\x03\x01\x12\x03\x1a\x04\x0f\n\x0c\n\x05\x05\x01\x02\x03\
\x02\x12\x03\x1a\x12\x13b\x06proto3\
sion_data\x18\x03\x20\x01(\x0cR\x0crevisionData*=\n\x18DocumentClientWSD\
ataType\x12\x11\n\rClientPushRev\x10\0\x12\x0e\n\nClientPing\x10\x01*`\n\
\x18DocumentServerWSDataType\x12\r\n\tServerAck\x10\0\x12\x11\n\rServerP\
ushRev\x10\x01\x12\x11\n\rServerPullRev\x10\x02\x12\x0f\n\x0bUserConnect\
\x10\x03J\xb1\x07\n\x06\x12\x04\0\0\x1c\x01\n\x08\n\x01\x0c\x12\x03\0\0\
\x12\n\t\n\x02\x03\0\x12\x03\x01\0\x18\n\n\n\x02\x04\0\x12\x04\x03\0\x08\
\x01\n\n\n\x03\x04\0\x01\x12\x03\x03\x08\x1c\n\x0b\n\x04\x04\0\x02\0\x12\
\x03\x04\x04\x16\n\x0c\n\x05\x04\0\x02\0\x05\x12\x03\x04\x04\n\n\x0c\n\
\x05\x04\0\x02\0\x01\x12\x03\x04\x0b\x11\n\x0c\n\x05\x04\0\x02\0\x03\x12\
\x03\x04\x14\x15\n\x0b\n\x04\x04\0\x02\x01\x12\x03\x05\x04$\n\x0c\n\x05\
\x04\0\x02\x01\x06\x12\x03\x05\x04\x1c\n\x0c\n\x05\x04\0\x02\x01\x01\x12\
\x03\x05\x1d\x1f\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x05\"#\n\x0b\n\
\x04\x04\0\x02\x02\x12\x03\x06\x04#\n\x0c\n\x05\x04\0\x02\x02\x06\x12\
\x03\x06\x04\x14\n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03\x06\x15\x1e\n\x0c\
\n\x05\x04\0\x02\x02\x03\x12\x03\x06!\"\n\x0b\n\x04\x04\0\x02\x03\x12\
\x03\x07\x04\x12\n\x0c\n\x05\x04\0\x02\x03\x05\x12\x03\x07\x04\n\n\x0c\n\
\x05\x04\0\x02\x03\x01\x12\x03\x07\x0b\r\n\x0c\n\x05\x04\0\x02\x03\x03\
\x12\x03\x07\x10\x11\n\n\n\x02\x04\x01\x12\x04\t\0\r\x01\n\n\n\x03\x04\
\x01\x01\x12\x03\t\x08\x1c\n\x0b\n\x04\x04\x01\x02\0\x12\x03\n\x04\x16\n\
\x0c\n\x05\x04\x01\x02\0\x05\x12\x03\n\x04\n\n\x0c\n\x05\x04\x01\x02\0\
\x01\x12\x03\n\x0b\x11\n\x0c\n\x05\x04\x01\x02\0\x03\x12\x03\n\x14\x15\n\
\x0b\n\x04\x04\x01\x02\x01\x12\x03\x0b\x04$\n\x0c\n\x05\x04\x01\x02\x01\
\x06\x12\x03\x0b\x04\x1c\n\x0c\n\x05\x04\x01\x02\x01\x01\x12\x03\x0b\x1d\
\x1f\n\x0c\n\x05\x04\x01\x02\x01\x03\x12\x03\x0b\"#\n\x0b\n\x04\x04\x01\
\x02\x02\x12\x03\x0c\x04\x13\n\x0c\n\x05\x04\x01\x02\x02\x05\x12\x03\x0c\
\x04\t\n\x0c\n\x05\x04\x01\x02\x02\x01\x12\x03\x0c\n\x0e\n\x0c\n\x05\x04\
\x01\x02\x02\x03\x12\x03\x0c\x11\x12\n\n\n\x02\x04\x02\x12\x04\x0e\0\x12\
\x01\n\n\n\x03\x04\x02\x01\x12\x03\x0e\x08\x17\n\x0b\n\x04\x04\x02\x02\0\
\x12\x03\x0f\x04\x17\n\x0c\n\x05\x04\x02\x02\0\x05\x12\x03\x0f\x04\n\n\
\x0c\n\x05\x04\x02\x02\0\x01\x12\x03\x0f\x0b\x12\n\x0c\n\x05\x04\x02\x02\
\0\x03\x12\x03\x0f\x15\x16\n\x0b\n\x04\x04\x02\x02\x01\x12\x03\x10\x04\
\x16\n\x0c\n\x05\x04\x02\x02\x01\x05\x12\x03\x10\x04\n\n\x0c\n\x05\x04\
\x02\x02\x01\x01\x12\x03\x10\x0b\x11\n\x0c\n\x05\x04\x02\x02\x01\x03\x12\
\x03\x10\x14\x15\n\x0b\n\x04\x04\x02\x02\x02\x12\x03\x11\x04\x1c\n\x0c\n\
\x05\x04\x02\x02\x02\x05\x12\x03\x11\x04\t\n\x0c\n\x05\x04\x02\x02\x02\
\x01\x12\x03\x11\n\x17\n\x0c\n\x05\x04\x02\x02\x02\x03\x12\x03\x11\x1a\
\x1b\n\n\n\x02\x05\0\x12\x04\x13\0\x16\x01\n\n\n\x03\x05\0\x01\x12\x03\
\x13\x05\x1d\n\x0b\n\x04\x05\0\x02\0\x12\x03\x14\x04\x16\n\x0c\n\x05\x05\
\0\x02\0\x01\x12\x03\x14\x04\x11\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x14\
\x14\x15\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x15\x04\x13\n\x0c\n\x05\x05\0\
\x02\x01\x01\x12\x03\x15\x04\x0e\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\
\x15\x11\x12\n\n\n\x02\x05\x01\x12\x04\x17\0\x1c\x01\n\n\n\x03\x05\x01\
\x01\x12\x03\x17\x05\x1d\n\x0b\n\x04\x05\x01\x02\0\x12\x03\x18\x04\x12\n\
\x0c\n\x05\x05\x01\x02\0\x01\x12\x03\x18\x04\r\n\x0c\n\x05\x05\x01\x02\0\
\x02\x12\x03\x18\x10\x11\n\x0b\n\x04\x05\x01\x02\x01\x12\x03\x19\x04\x16\
\n\x0c\n\x05\x05\x01\x02\x01\x01\x12\x03\x19\x04\x11\n\x0c\n\x05\x05\x01\
\x02\x01\x02\x12\x03\x19\x14\x15\n\x0b\n\x04\x05\x01\x02\x02\x12\x03\x1a\
\x04\x16\n\x0c\n\x05\x05\x01\x02\x02\x01\x12\x03\x1a\x04\x11\n\x0c\n\x05\
\x05\x01\x02\x02\x02\x12\x03\x1a\x14\x15\n\x0b\n\x04\x05\x01\x02\x03\x12\
\x03\x1b\x04\x14\n\x0c\n\x05\x05\x01\x02\x03\x01\x12\x03\x1b\x04\x0f\n\
\x0c\n\x05\x05\x01\x02\x03\x02\x12\x03\x1b\x12\x13b\x06proto3\
";
static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;

View File

@ -21,7 +21,7 @@ message RevisionRange {
int64 end = 3;
}
enum RevisionState {
StateLocal = 0;
Local = 0;
Ack = 1;
}
enum RevType {

View File

@ -19,6 +19,7 @@ message NewDocumentUser {
}
enum DocumentClientWSDataType {
ClientPushRev = 0;
ClientPing = 1;
}
enum DocumentServerWSDataType {
ServerAck = 0;

View File

@ -40,16 +40,14 @@ impl ServerDocumentManager {
}
}
pub async fn apply_revisions(
pub async fn handle_client_revisions(
&self,
user: Arc<dyn RevisionUser>,
mut client_data: DocumentClientWSData,
) -> Result<(), CollaborateError> {
let mut pb = client_data.take_revisions();
let cloned_user = user.clone();
let ack_id = client_data.id.clone().parse::<i64>().map_err(|e| {
CollaborateError::internal().context(format!("Parse rev_id from {} failed. {}", &client_data.id, e))
})?;
let ack_id = rev_id_from_str(&client_data.id)?;
let doc_id = client_data.doc_id;
let revisions = spawn_blocking(move || {
@ -81,6 +79,23 @@ impl ServerDocumentManager {
result
}
pub async fn handle_client_ping(
&self,
user: Arc<dyn RevisionUser>,
client_data: DocumentClientWSData,
) -> Result<(), CollaborateError> {
let rev_id = rev_id_from_str(&client_data.id)?;
let doc_id = client_data.doc_id.clone();
match self.get_document_handler(&doc_id).await {
None => Ok(()),
Some(handler) => {
let _ = handler.apply_ping(doc_id.clone(), rev_id, user).await?;
Ok(())
},
}
}
async fn get_document_handler(&self, doc_id: &str) -> Option<Arc<OpenDocHandle>> {
match self.open_doc_map.get(doc_id).map(|ctx| ctx.clone()) {
Some(edit_doc) => Some(edit_doc),
@ -168,6 +183,27 @@ impl OpenDocHandle {
Ok(())
}
#[tracing::instrument(level = "debug", skip(self, user), err)]
async fn apply_ping(
&self,
doc_id: String,
rev_id: i64,
user: Arc<dyn RevisionUser>,
) -> Result<(), CollaborateError> {
let (ret, rx) = oneshot::channel();
self.users.insert(user.user_id(), user.clone());
let persistence = self.persistence.clone();
let msg = DocumentCommand::Ping {
doc_id,
user,
persistence,
rev_id,
ret,
};
let _ = self.send(msg, rx).await?;
Ok(())
}
async fn send<T>(&self, msg: DocumentCommand, rx: oneshot::Receiver<T>) -> CollaborateResult<T> {
let _ = self
.sender
@ -193,6 +229,13 @@ enum DocumentCommand {
persistence: Arc<dyn DocumentPersistence>,
ret: oneshot::Sender<CollaborateResult<()>>,
},
Ping {
doc_id: String,
user: Arc<dyn RevisionUser>,
persistence: Arc<dyn DocumentPersistence>,
rev_id: i64,
ret: oneshot::Sender<CollaborateResult<()>>,
},
}
struct DocumentCommandQueue {
@ -248,7 +291,20 @@ impl DocumentCommandQueue {
.sync_revisions(doc_id, user, revisions, persistence)
.await
.map_err(internal_error);
log::debug!("handle message {:?}", result);
let _ = ret.send(result);
},
DocumentCommand::Ping {
doc_id,
user,
persistence,
rev_id,
ret,
} => {
let result = self
.synchronizer
.pong(doc_id, user, persistence, rev_id)
.await
.map_err(internal_error);
let _ = ret.send(result);
},
}
@ -260,3 +316,11 @@ impl std::ops::Drop for DocumentCommandQueue {
log::debug!("{} DocumentCommandQueue drop", self.doc_id);
}
}
fn rev_id_from_str(s: &str) -> Result<i64, CollaborateError> {
let rev_id = s
.to_owned()
.parse::<i64>()
.map_err(|e| CollaborateError::internal().context(format!("Parse rev_id from {} failed. {}", s, e)))?;
Ok(rev_id)
}

View File

@ -101,24 +101,33 @@ impl RevisionSynchronizer {
// delta.
let from_rev_id = first_revision.rev_id;
let to_rev_id = server_base_rev_id;
let rev_ids: Vec<i64> = (from_rev_id..=to_rev_id).collect();
let revisions = match persistence.get_revisions(&self.doc_id, rev_ids).await {
Ok(revisions) => {
assert_eq!(
revisions.is_empty(),
false,
"revisions should not be empty if the doc exists"
);
revisions
},
Err(e) => {
tracing::error!("{}", e);
vec![]
},
};
let _ = self.push_revisions_to_user(user, persistence, from_rev_id, to_rev_id);
},
}
Ok(())
}
let data = DocumentServerWSDataBuilder::build_push_message(&self.doc_id, revisions);
user.receive(SyncResponse::Push(data));
pub async fn pong(
&self,
doc_id: String,
user: Arc<dyn RevisionUser>,
persistence: Arc<dyn DocumentPersistence>,
rev_id: i64,
) -> Result<(), CollaborateError> {
let server_base_rev_id = self.rev_id.load(SeqCst);
match server_base_rev_id.cmp(&rev_id) {
Ordering::Less => tracing::error!(
"[Pong] Client should not send ping and the server should pull the revisions from the client"
),
Ordering::Equal => tracing::debug!("[Pong]: The document:{} is up to date.", doc_id),
Ordering::Greater => {
// The client document is outdated. Transform the client revision delta and then
// send the prime delta to the client. Client should compose the this prime
// delta.
let from_rev_id = rev_id;
let to_rev_id = server_base_rev_id;
tracing::trace!("[Pong]: Push revisions to user");
let _ = self.push_revisions_to_user(user, persistence, from_rev_id, to_rev_id);
},
}
Ok(())
@ -154,20 +163,6 @@ impl RevisionSynchronizer {
Ok(())
}
// fn mk_revision(&self, base_rev_id: i64, delta: RichTextDelta) -> Revision {
// let delta_data = delta.to_bytes().to_vec();
// let md5 = md5(&delta_data);
// Revision {
// base_rev_id,
// rev_id: self.rev_id.load(SeqCst),
// delta_data,
// md5,
// doc_id: self.doc_id.to_string(),
// ty: RevType::Remote,
// user_id: "".to_string(),
// }
// }
#[allow(dead_code)]
pub(crate) fn rev_id(&self) -> i64 { self.rev_id.load(SeqCst) }
@ -182,6 +177,33 @@ impl RevisionSynchronizer {
false
}
async fn push_revisions_to_user(
&self,
user: Arc<dyn RevisionUser>,
persistence: Arc<dyn DocumentPersistence>,
from: i64,
to: i64,
) {
let rev_ids: Vec<i64> = (from..=to).collect();
let revisions = match persistence.get_revisions(&self.doc_id, rev_ids).await {
Ok(revisions) => {
assert_eq!(
revisions.is_empty(),
false,
"revisions should not be empty if the doc exists"
);
revisions
},
Err(e) => {
tracing::error!("{}", e);
vec![]
},
};
let data = DocumentServerWSDataBuilder::build_push_message(&self.doc_id, revisions);
user.receive(SyncResponse::Push(data));
}
}
#[inline]

View File

@ -97,8 +97,6 @@ impl CreateViewParams {
view_id,
}
}
pub fn take_view_data(&mut self) -> String { std::mem::take(&mut self.view_data) }
}
impl TryInto<CreateViewParams> for CreateViewRequest {