send revsion to server

This commit is contained in:
appflowy 2021-09-23 15:49:10 +08:00
parent 302631f808
commit 1d9d776e3a
32 changed files with 690 additions and 190 deletions

View File

@ -2,5 +2,6 @@
export './ws.pb.dart';
export './observable.pb.dart';
export './errors.pb.dart';
export './revision.pb.dart';
export './event.pb.dart';
export './doc.pb.dart';

View File

@ -0,0 +1,101 @@
///
// Generated code. Do not modify.
// source: revision.proto
//
// @dart = 2.12
// ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields
import 'dart:core' as $core;
import 'package:fixnum/fixnum.dart' as $fixnum;
import 'package:protobuf/protobuf.dart' as $pb;
class Revision extends $pb.GeneratedMessage {
static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'Revision', createEmptyInstance: create)
..aInt64(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'baseRevId')
..aInt64(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'revId')
..a<$core.List<$core.int>>(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'delta', $pb.PbFieldType.OY)
..aOS(4, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'md5')
..hasRequiredFields = false
;
Revision._() : super();
factory Revision({
$fixnum.Int64? baseRevId,
$fixnum.Int64? revId,
$core.List<$core.int>? delta,
$core.String? md5,
}) {
final _result = create();
if (baseRevId != null) {
_result.baseRevId = baseRevId;
}
if (revId != null) {
_result.revId = revId;
}
if (delta != null) {
_result.delta = delta;
}
if (md5 != null) {
_result.md5 = md5;
}
return _result;
}
factory Revision.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r);
factory Revision.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r);
@$core.Deprecated(
'Using this can add significant overhead to your binary. '
'Use [GeneratedMessageGenericExtensions.deepCopy] instead. '
'Will be removed in next major version')
Revision clone() => Revision()..mergeFromMessage(this);
@$core.Deprecated(
'Using this can add significant overhead to your binary. '
'Use [GeneratedMessageGenericExtensions.rebuild] instead. '
'Will be removed in next major version')
Revision copyWith(void Function(Revision) updates) => super.copyWith((message) => updates(message as Revision)) as Revision; // ignore: deprecated_member_use
$pb.BuilderInfo get info_ => _i;
@$core.pragma('dart2js:noInline')
static Revision create() => Revision._();
Revision createEmptyInstance() => create();
static $pb.PbList<Revision> createRepeated() => $pb.PbList<Revision>();
@$core.pragma('dart2js:noInline')
static Revision getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor<Revision>(create);
static Revision? _defaultInstance;
@$pb.TagNumber(1)
$fixnum.Int64 get baseRevId => $_getI64(0);
@$pb.TagNumber(1)
set baseRevId($fixnum.Int64 v) { $_setInt64(0, v); }
@$pb.TagNumber(1)
$core.bool hasBaseRevId() => $_has(0);
@$pb.TagNumber(1)
void clearBaseRevId() => clearField(1);
@$pb.TagNumber(2)
$fixnum.Int64 get revId => $_getI64(1);
@$pb.TagNumber(2)
set revId($fixnum.Int64 v) { $_setInt64(1, v); }
@$pb.TagNumber(2)
$core.bool hasRevId() => $_has(1);
@$pb.TagNumber(2)
void clearRevId() => clearField(2);
@$pb.TagNumber(3)
$core.List<$core.int> get delta => $_getN(2);
@$pb.TagNumber(3)
set delta($core.List<$core.int> v) { $_setBytes(2, v); }
@$pb.TagNumber(3)
$core.bool hasDelta() => $_has(2);
@$pb.TagNumber(3)
void clearDelta() => clearField(3);
@$pb.TagNumber(4)
$core.String get md5 => $_getSZ(3);
@$pb.TagNumber(4)
set md5($core.String v) { $_setString(3, v); }
@$pb.TagNumber(4)
$core.bool hasMd5() => $_has(3);
@$pb.TagNumber(4)
void clearMd5() => clearField(4);
}

View File

@ -0,0 +1,7 @@
///
// Generated code. Do not modify.
// source: revision.proto
//
// @dart = 2.12
// ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields

View File

@ -0,0 +1,23 @@
///
// Generated code. Do not modify.
// source: revision.proto
//
// @dart = 2.12
// ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields,deprecated_member_use_from_same_package
import 'dart:core' as $core;
import 'dart:convert' as $convert;
import 'dart:typed_data' as $typed_data;
@$core.Deprecated('Use revisionDescriptor instead')
const Revision$json = const {
'1': 'Revision',
'2': const [
const {'1': 'base_rev_id', '3': 1, '4': 1, '5': 3, '10': 'baseRevId'},
const {'1': 'rev_id', '3': 2, '4': 1, '5': 3, '10': 'revId'},
const {'1': 'delta', '3': 3, '4': 1, '5': 12, '10': 'delta'},
const {'1': 'md5', '3': 4, '4': 1, '5': 9, '10': 'md5'},
],
};
/// Descriptor for `Revision`. Decode as a `google.protobuf.DescriptorProto`.
final $typed_data.Uint8List revisionDescriptor = $convert.base64Decode('CghSZXZpc2lvbhIeCgtiYXNlX3Jldl9pZBgBIAEoA1IJYmFzZVJldklkEhUKBnJldl9pZBgCIAEoA1IFcmV2SWQSFAoFZGVsdGEYAyABKAxSBWRlbHRhEhAKA21kNRgEIAEoCVIDbWQ1');

View File

@ -0,0 +1,9 @@
///
// Generated code. Do not modify.
// source: revision.proto
//
// @dart = 2.12
// ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields,deprecated_member_use_from_same_package
export 'revision.pb.dart';

View File

@ -1,7 +1,7 @@
-- Your SQL goes here
CREATE TABLE op_table (
base_rev BIGINT NOT NULL DEFAULT 0,
rev BIGINT NOT NULL PRIMARY KEY,
base_rev_id BIGINT NOT NULL DEFAULT 0,
rev_id BIGINT NOT NULL PRIMARY KEY,
data BLOB NOT NULL DEFAULT (x''),
md5 TEXT NOT NULL DEFAULT '',
state INTEGER NOT NULL DEFAULT 0

View File

@ -22,9 +22,9 @@ table! {
}
table! {
op_table (rev) {
base_rev -> BigInt,
rev -> BigInt,
op_table (rev_id) {
base_rev_id -> BigInt,
rev_id -> BigInt,
data -> Binary,
md5 -> Text,
state -> Integer,

View File

@ -60,6 +60,7 @@ pub fn category_from_str(type_str: &str) -> TypeCategory {
| "UpdateDocParams"
| "DocDelta"
| "QueryDocParams"
| "Revision"
| "WsDocumentData"
| "DocError"
| "FFIRequest"

View File

@ -35,6 +35,8 @@ url = "2.2"
serde = { version = "1.0", features = ["derive"] }
serde_json = {version = "1.0"}
chrono = "0.4.19"
futures-core = { version = "0.3", default-features = false }
md5 = "0.7.0"
[dev-dependencies]
flowy-test = { path = "../flowy-test" }

View File

@ -1,4 +1,6 @@
mod doc;
pub mod parser;
mod revision;
pub use doc::*;
pub use revision::*;

View File

@ -0,0 +1,27 @@
use flowy_derive::ProtoBuf;
#[derive(Debug, Clone, Default, ProtoBuf)]
pub struct Revision {
#[pb(index = 1)]
pub base_rev_id: i64,
#[pb(index = 2)]
pub rev_id: i64,
#[pb(index = 3)]
pub delta: Vec<u8>,
#[pb(index = 4)]
pub md5: String,
}
impl Revision {
pub fn new(base_rev_id: i64, rev_id: i64, delta: Vec<u8>, md5: String) -> Revision {
Self {
base_rev_id,
rev_id,
delta,
md5,
}
}
}

View File

@ -104,6 +104,10 @@ impl std::convert::From<serde_json::Error> for DocError {
fn from(error: serde_json::Error) -> Self { DocError::internal().context(error) }
}
impl std::convert::From<protobuf::ProtobufError> for DocError {
fn from(e: protobuf::ProtobufError) -> Self { DocError::internal().context(e) }
}
// impl std::convert::From<::r2d2::Error> for DocError {
// fn from(error: r2d2::Error) -> Self {
// ErrorBuilder::new(ErrorCode::InternalError).error(error).build() } }

View File

@ -1,6 +1,6 @@
use std::sync::Arc;
use bytes::Bytes;
use diesel::SqliteConnection;
use parking_lot::RwLock;

View File

@ -9,6 +9,9 @@ pub use observable::*;
mod errors;
pub use errors::*;
mod revision;
pub use revision::*;
mod event;
pub use event::*;

View File

@ -0,0 +1,327 @@
// This file is generated by rust-protobuf 2.22.1. Do not edit
// @generated
// https://github.com/rust-lang/rust-clippy/issues/702
#![allow(unknown_lints)]
#![allow(clippy::all)]
#![allow(unused_attributes)]
#![cfg_attr(rustfmt, rustfmt::skip)]
#![allow(box_pointers)]
#![allow(dead_code)]
#![allow(missing_docs)]
#![allow(non_camel_case_types)]
#![allow(non_snake_case)]
#![allow(non_upper_case_globals)]
#![allow(trivial_casts)]
#![allow(unused_imports)]
#![allow(unused_results)]
//! Generated file from `revision.proto`
/// Generated files are compatible only with the same version
/// of protobuf runtime.
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_22_1;
#[derive(PartialEq,Clone,Default)]
pub struct Revision {
// message fields
pub base_rev_id: i64,
pub rev_id: i64,
pub delta: ::std::vec::Vec<u8>,
pub md5: ::std::string::String,
// special fields
pub unknown_fields: ::protobuf::UnknownFields,
pub cached_size: ::protobuf::CachedSize,
}
impl<'a> ::std::default::Default for &'a Revision {
fn default() -> &'a Revision {
<Revision as ::protobuf::Message>::default_instance()
}
}
impl Revision {
pub fn new() -> Revision {
::std::default::Default::default()
}
// int64 base_rev_id = 1;
pub fn get_base_rev_id(&self) -> i64 {
self.base_rev_id
}
pub fn clear_base_rev_id(&mut self) {
self.base_rev_id = 0;
}
// Param is passed by value, moved
pub fn set_base_rev_id(&mut self, v: i64) {
self.base_rev_id = v;
}
// int64 rev_id = 2;
pub fn get_rev_id(&self) -> i64 {
self.rev_id
}
pub fn clear_rev_id(&mut self) {
self.rev_id = 0;
}
// Param is passed by value, moved
pub fn set_rev_id(&mut self, v: i64) {
self.rev_id = v;
}
// bytes delta = 3;
pub fn get_delta(&self) -> &[u8] {
&self.delta
}
pub fn clear_delta(&mut self) {
self.delta.clear();
}
// Param is passed by value, moved
pub fn set_delta(&mut self, v: ::std::vec::Vec<u8>) {
self.delta = v;
}
// Mutable pointer to the field.
// If field is not initialized, it is initialized with default value first.
pub fn mut_delta(&mut self) -> &mut ::std::vec::Vec<u8> {
&mut self.delta
}
// Take field
pub fn take_delta(&mut self) -> ::std::vec::Vec<u8> {
::std::mem::replace(&mut self.delta, ::std::vec::Vec::new())
}
// string md5 = 4;
pub fn get_md5(&self) -> &str {
&self.md5
}
pub fn clear_md5(&mut self) {
self.md5.clear();
}
// Param is passed by value, moved
pub fn set_md5(&mut self, v: ::std::string::String) {
self.md5 = v;
}
// Mutable pointer to the field.
// If field is not initialized, it is initialized with default value first.
pub fn mut_md5(&mut self) -> &mut ::std::string::String {
&mut self.md5
}
// Take field
pub fn take_md5(&mut self) -> ::std::string::String {
::std::mem::replace(&mut self.md5, ::std::string::String::new())
}
}
impl ::protobuf::Message for Revision {
fn is_initialized(&self) -> bool {
true
}
fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> {
while !is.eof()? {
let (field_number, wire_type) = is.read_tag_unpack()?;
match field_number {
1 => {
if wire_type != ::protobuf::wire_format::WireTypeVarint {
return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type));
}
let tmp = is.read_int64()?;
self.base_rev_id = tmp;
},
2 => {
if wire_type != ::protobuf::wire_format::WireTypeVarint {
return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type));
}
let tmp = is.read_int64()?;
self.rev_id = tmp;
},
3 => {
::protobuf::rt::read_singular_proto3_bytes_into(wire_type, is, &mut self.delta)?;
},
4 => {
::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.md5)?;
},
_ => {
::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
},
};
}
::std::result::Result::Ok(())
}
// Compute sizes of nested messages
#[allow(unused_variables)]
fn compute_size(&self) -> u32 {
let mut my_size = 0;
if self.base_rev_id != 0 {
my_size += ::protobuf::rt::value_size(1, self.base_rev_id, ::protobuf::wire_format::WireTypeVarint);
}
if self.rev_id != 0 {
my_size += ::protobuf::rt::value_size(2, self.rev_id, ::protobuf::wire_format::WireTypeVarint);
}
if !self.delta.is_empty() {
my_size += ::protobuf::rt::bytes_size(3, &self.delta);
}
if !self.md5.is_empty() {
my_size += ::protobuf::rt::string_size(4, &self.md5);
}
my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
self.cached_size.set(my_size);
my_size
}
fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
if self.base_rev_id != 0 {
os.write_int64(1, self.base_rev_id)?;
}
if self.rev_id != 0 {
os.write_int64(2, self.rev_id)?;
}
if !self.delta.is_empty() {
os.write_bytes(3, &self.delta)?;
}
if !self.md5.is_empty() {
os.write_string(4, &self.md5)?;
}
os.write_unknown_fields(self.get_unknown_fields())?;
::std::result::Result::Ok(())
}
fn get_cached_size(&self) -> u32 {
self.cached_size.get()
}
fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
&self.unknown_fields
}
fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
&mut self.unknown_fields
}
fn as_any(&self) -> &dyn (::std::any::Any) {
self as &dyn (::std::any::Any)
}
fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
self as &mut dyn (::std::any::Any)
}
fn into_any(self: ::std::boxed::Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
self
}
fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
Self::descriptor_static()
}
fn new() -> Revision {
Revision::new()
}
fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT;
descriptor.get(|| {
let mut fields = ::std::vec::Vec::new();
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeInt64>(
"base_rev_id",
|m: &Revision| { &m.base_rev_id },
|m: &mut Revision| { &mut m.base_rev_id },
));
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeInt64>(
"rev_id",
|m: &Revision| { &m.rev_id },
|m: &mut Revision| { &mut m.rev_id },
));
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>(
"delta",
|m: &Revision| { &m.delta },
|m: &mut Revision| { &mut m.delta },
));
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
"md5",
|m: &Revision| { &m.md5 },
|m: &mut Revision| { &mut m.md5 },
));
::protobuf::reflect::MessageDescriptor::new_pb_name::<Revision>(
"Revision",
fields,
file_descriptor_proto()
)
})
}
fn default_instance() -> &'static Revision {
static instance: ::protobuf::rt::LazyV2<Revision> = ::protobuf::rt::LazyV2::INIT;
instance.get(Revision::new)
}
}
impl ::protobuf::Clear for Revision {
fn clear(&mut self) {
self.base_rev_id = 0;
self.rev_id = 0;
self.delta.clear();
self.md5.clear();
self.unknown_fields.clear();
}
}
impl ::std::fmt::Debug for Revision {
fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
::protobuf::text_format::fmt(self, f)
}
}
impl ::protobuf::reflect::ProtobufValue for Revision {
fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
::protobuf::reflect::ReflectValueRef::Message(self)
}
}
static file_descriptor_proto_data: &'static [u8] = b"\
\n\x0erevision.proto\"i\n\x08Revision\x12\x1e\n\x0bbase_rev_id\x18\x01\
\x20\x01(\x03R\tbaseRevId\x12\x15\n\x06rev_id\x18\x02\x20\x01(\x03R\x05r\
evId\x12\x14\n\x05delta\x18\x03\x20\x01(\x0cR\x05delta\x12\x10\n\x03md5\
\x18\x04\x20\x01(\tR\x03md5J\x86\x02\n\x06\x12\x04\0\0\x06\x01\n\x08\n\
\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x01\0\x06\x01\n\n\n\x03\
\x04\0\x01\x12\x03\x01\x08\x10\n\x0b\n\x04\x04\0\x02\0\x12\x03\x02\x04\
\x1a\n\x0c\n\x05\x04\0\x02\0\x05\x12\x03\x02\x04\t\n\x0c\n\x05\x04\0\x02\
\0\x01\x12\x03\x02\n\x15\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x02\x18\x19\
\n\x0b\n\x04\x04\0\x02\x01\x12\x03\x03\x04\x15\n\x0c\n\x05\x04\0\x02\x01\
\x05\x12\x03\x03\x04\t\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x03\n\x10\n\
\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x03\x13\x14\n\x0b\n\x04\x04\0\x02\
\x02\x12\x03\x04\x04\x14\n\x0c\n\x05\x04\0\x02\x02\x05\x12\x03\x04\x04\t\
\n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03\x04\n\x0f\n\x0c\n\x05\x04\0\x02\
\x02\x03\x12\x03\x04\x12\x13\n\x0b\n\x04\x04\0\x02\x03\x12\x03\x05\x04\
\x13\n\x0c\n\x05\x04\0\x02\x03\x05\x12\x03\x05\x04\n\n\x0c\n\x05\x04\0\
\x02\x03\x01\x12\x03\x05\x0b\x0e\n\x0c\n\x05\x04\0\x02\x03\x03\x12\x03\
\x05\x11\x12b\x06proto3\
";
static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;
fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto {
::protobuf::Message::parse_from_bytes(file_descriptor_proto_data).unwrap()
}
pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto {
file_descriptor_proto_lazy.get(|| {
parse_descriptor_proto()
})
}

View File

@ -0,0 +1,7 @@
syntax = "proto3";
message Revision {
int64 base_rev_id = 1;
int64 rev_id = 2;
bytes delta = 3;
string md5 = 4;
}

View File

@ -1,19 +1,10 @@
use std::{convert::TryInto, fmt::Debug, sync::Arc};
use std::sync::Arc;
use bytes::Bytes;
use dashmap::DashMap;
use parking_lot::RwLock;
use flowy_database::ConnectionPool;
use flowy_ot::{core::Delta, errors::OTError};
use crate::{
entities::doc::Doc,
errors::DocError,
services::{
doc::edit_context::{DocId, EditDocContext},
ws::WsManager,
},
services::doc::edit_context::{DocId, EditDocContext},
};
pub(crate) struct DocCache {

View File

@ -2,19 +2,15 @@ use crate::{
entities::doc::{CreateDocParams, Doc, DocDelta, QueryDocParams, UpdateDocParams},
errors::{internal_error, DocError},
module::DocumentUser,
services::{
cache::DocCache,
doc::edit_context::{DocId, EditDocContext, EditDocPersistence},
server::Server,
ws::WsManager,
},
sql_tables::doc::{DocTable, DocTableChangeset, DocTableSql, OpTableSql},
services::{cache::DocCache, doc::edit_context::EditDocContext, server::Server, ws::WsManager},
sql_tables::doc::{DocTable, DocTableSql, OpTableSql},
};
use bytes::Bytes;
use flowy_database::{ConnectionPool, SqliteConnection};
use flowy_infra::future::ClosureFuture;
use parking_lot::RwLock;
use std::sync::Arc;
use tokio::task::JoinHandle;
pub(crate) struct DocController {
server: Server,
@ -107,29 +103,17 @@ impl DocController {
}
#[tracing::instrument(level = "debug", skip(self, pool), err)]
fn read_doc_from_server(
&self,
params: QueryDocParams,
pool: Arc<ConnectionPool>,
) -> Result<JoinHandle<Result<Arc<EditDocContext>, DocError>>, DocError> {
async fn read_doc_from_server(&self, params: QueryDocParams, pool: Arc<ConnectionPool>) -> Result<Arc<EditDocContext>, DocError> {
let token = self.user.token()?;
let server = self.server.clone();
let doc_sql = self.doc_sql.clone();
let op_sql = self.op_sql.clone();
let ws = self.ws.clone();
let cache = self.cache.clone();
Ok(tokio::spawn(async move {
match server.read_doc(&token, params).await? {
None => Err(DocError::not_found()),
Some(doc) => {
let doc_table = DocTable::new(doc.clone());
let _ = doc_sql.create_doc_table(doc_table, &*(pool.get().map_err(internal_error)?))?;
let edit_doc_ctx = make_edit_context(ws, cache, op_sql, doc)?;
Ok(edit_doc_ctx)
},
}
}))
match self.server.read_doc(&token, params).await? {
None => Err(DocError::not_found()),
Some(doc) => {
let edit = self.make_edit_context(doc.clone())?;
let conn = &*(pool.get().map_err(internal_error)?);
let _ = self.doc_sql.create_doc_table(doc.into(), conn)?;
Ok(edit)
},
}
}
#[tracing::instrument(level = "debug", skip(self), err)]
@ -149,44 +133,26 @@ impl DocController {
}
async fn _open(&self, params: QueryDocParams, pool: Arc<ConnectionPool>) -> Result<Arc<EditDocContext>, DocError> {
match self.doc_sql.read_doc_table(&params.doc_id, &*(pool.get().map_err(internal_error)?)) {
Ok(doc_table) => {
let edit_doc_ctx = make_edit_context(self.ws.clone(), self.cache.clone(), self.op_sql.clone(), doc_table.into())?;
Ok(edit_doc_ctx)
},
match self.doc_sql.read_doc_table(&params.doc_id, pool.clone()) {
Ok(doc_table) => Ok(self.make_edit_context(doc_table.into())?),
Err(error) => {
if error.is_record_not_found() {
log::debug!("Doc:{} don't exist, reading from server", params.doc_id);
match self.read_doc_from_server(params, pool)?.await.map_err(internal_error)? {
Ok(edit_doc_ctx) => Ok(edit_doc_ctx),
Err(error) => Err(error),
}
Ok(self.read_doc_from_server(params, pool.clone()).await?)
} else {
return Err(error);
}
},
}
}
}
fn make_edit_context(
ws: Arc<RwLock<WsManager>>,
cache: Arc<DocCache>,
op_sql: Arc<OpTableSql>,
doc: Doc,
) -> Result<Arc<EditDocContext>, DocError> {
// Opti: require upgradable_read lock and then upgrade to write lock using
// RwLockUpgradableReadGuard::upgrade(xx)
let edit_doc_ctx = Arc::new(EditDocContext::new(doc, ws.read().sender.clone(), op_sql)?);
ws.write().register_handler(edit_doc_ctx.id.as_ref(), edit_doc_ctx.clone());
cache.set(edit_doc_ctx.clone());
Ok(edit_doc_ctx)
}
impl EditDocPersistence for DocController {
fn save(&self, params: UpdateDocParams, pool: Arc<ConnectionPool>) -> Result<(), DocError> {
let changeset = DocTableChangeset::new(params.clone());
let _ = self.doc_sql.update_doc_table(changeset, &*(pool.get().map_err(internal_error)?))?;
Ok(())
fn make_edit_context(&self, doc: Doc) -> Result<Arc<EditDocContext>, DocError> {
// Opti: require upgradable_read lock and then upgrade to write lock using
// RwLockUpgradableReadGuard::upgrade(xx) of ws
let sender = self.ws.read().sender.clone();
let edit_ctx = Arc::new(EditDocContext::new(doc, sender, self.op_sql.clone())?);
self.ws.write().register_handler(edit_ctx.id.as_ref(), edit_ctx.clone());
self.cache.set(edit_ctx.clone());
Ok(edit_ctx)
}
}

View File

@ -4,7 +4,6 @@ use crate::{
};
use bytes::Bytes;
use flowy_ot::core::*;
use std::convert::TryInto;
pub trait DocumentData {
fn into_string(self) -> Result<String, DocError>;
@ -24,24 +23,10 @@ impl CustomDocument for FlowyDoc {
fn init_delta() -> Delta { DeltaBuilder::new().insert("\n").build() }
}
#[derive(Debug, Clone)]
pub struct RevId(pub usize);
#[derive(Debug, Clone)]
pub struct Revision {
rev_id: RevId,
pub delta: Delta,
}
impl Revision {
pub fn new(rev_id: RevId, delta: Delta) -> Revision { Self { rev_id, delta } }
}
pub struct Document {
delta: Delta,
history: History,
view: View,
rev_id_counter: usize,
last_edit_time: usize,
}
@ -53,7 +38,6 @@ impl Document {
delta,
history: History::new(),
view: View::new(),
rev_id_counter: 1,
last_edit_time: 0,
}
}
@ -67,16 +51,11 @@ impl Document {
pub fn to_bytes(&self) -> Vec<u8> { self.delta.clone().into_bytes() }
pub fn to_string(&self) -> String { self.delta.apply("").unwrap() }
pub fn to_plain_string(&self) -> String { self.delta.apply("").unwrap() }
pub fn apply_delta(&mut self, data: Bytes) -> Result<(), DocError> {
let new_delta = Delta::from_bytes(data.to_vec())?;
log::debug!("Apply delta: {}", new_delta);
let rev_id = self.next_rev_id();
let revision = Revision::new(rev_id, new_delta.clone());
let _ = self.add_delta(&new_delta)?;
log::debug!("Document: {}", self.to_json());
Ok(())
@ -173,7 +152,6 @@ impl Document {
fn add_delta(&mut self, delta: &Delta) -> Result<(), DocError> {
let composed_delta = self.delta.compose(delta)?;
let mut undo_delta = delta.invert(&self.delta);
self.rev_id_counter += 1;
let now = chrono::Utc::now().timestamp_millis() as usize;
if now - self.last_edit_time < RECORD_THRESHOLD {
@ -206,9 +184,6 @@ impl Document {
let inverted_delta = change.invert(&self.delta);
Ok((new_delta, inverted_delta))
}
#[allow(dead_code)]
fn next_rev_id(&self) -> RevId { RevId(self.rev_id_counter) }
}
fn validate_interval(delta: &Delta, interval: &Interval) -> Result<(), DocError> {

View File

@ -1,40 +1,30 @@
use crate::{
entities::{
doc::{Doc, UpdateDocParams},
doc::{Doc, Revision},
ws::{WsDocumentData, WsSource},
},
errors::DocError,
errors::{internal_error, DocError},
services::{
doc::Document,
ws::{WsHandler, WsSender},
},
sql_tables::doc::OpTableSql,
sql_tables::doc::{OpState, OpTable, OpTableSql},
};
use bytes::Bytes;
use flowy_database::ConnectionPool;
use flowy_ot::core::Delta;
use parking_lot::RwLock;
use std::{convert::TryInto, sync::Arc};
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct DocId(pub(crate) String);
impl AsRef<str> for DocId {
fn as_ref(&self) -> &str { &self.0 }
}
impl<T> std::convert::From<T> for DocId
where
T: ToString,
{
fn from(s: T) -> Self { DocId(s.to_string()) }
}
pub(crate) trait EditDocPersistence: Send + Sync {
fn save(&self, params: UpdateDocParams, pool: Arc<ConnectionPool>) -> Result<(), DocError>;
}
use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock};
use std::{
convert::TryInto,
sync::{
atomic::{AtomicI64, AtomicUsize, Ordering::SeqCst},
Arc,
},
};
pub(crate) struct EditDocContext {
pub(crate) id: DocId,
pub(crate) revision: i64,
pub(crate) rev_counter: RevCounter,
document: RwLock<Document>,
ws_sender: Arc<dyn WsSender>,
op_sql: Arc<OpTableSql>,
@ -43,13 +33,13 @@ pub(crate) struct EditDocContext {
impl EditDocContext {
pub(crate) fn new(doc: Doc, ws_sender: Arc<dyn WsSender>, op_sql: Arc<OpTableSql>) -> Result<Self, DocError> {
let id: DocId = doc.id.into();
let revision = doc.revision;
let rev_counter = RevCounter::new(doc.revision);
let delta: Delta = doc.data.try_into()?;
let document = RwLock::new(Document::from_delta(delta));
Ok(Self {
id,
revision,
rev_counter,
document,
ws_sender,
op_sql,
@ -58,33 +48,57 @@ impl EditDocContext {
pub(crate) fn doc(&self) -> Doc {
Doc {
id: self.id.0.clone(),
id: self.id.clone().into(),
data: self.document.read().to_bytes(),
revision: self.revision,
revision: self.rev_counter.value(),
}
}
pub(crate) fn apply_delta(&self, data: Bytes, pool: Arc<ConnectionPool>) -> Result<(), DocError> {
let mut write_guard = self.document.write();
let _ = write_guard.apply_delta(data.clone())?;
let mut guard = self.document.write();
let base_rev_id = self.rev_counter.value();
let rev_id = self.rev_counter.next();
let _ = guard.apply_delta(data.clone())?;
let json = guard.to_json();
drop(guard);
match self.ws_sender.send_data(data) {
Ok(_) => {},
// Opti: it is necessary to save the rev if send success?
let md5 = format!("{:x}", md5::compute(json));
let revision = Revision::new(base_rev_id, rev_id, data.to_vec(), md5);
self.save_revision(revision.clone(), pool.clone());
match self.ws_sender.send_data(revision.try_into()?) {
Ok(_) => {
// TODO: remove the rev if send success
// let _ = self.delete_revision(rev_id, pool)?;
},
Err(e) => {
// TODO: save to local and retry
log::error!("Send delta failed: {:?}", e);
},
}
Ok(())
}
}
// Opti: strategy to save the document
let save = UpdateDocParams {
doc_id: self.id.0.clone(),
data: write_guard.to_bytes(),
};
// let _ = self.persistence.save(save, pool)?;
impl EditDocContext {
fn save_revision(&self, revision: Revision, pool: Arc<ConnectionPool>) -> Result<(), DocError> {
let conn = &*pool.get().map_err(internal_error)?;
conn.immediate_transaction::<_, DocError, _>(|| {
let op_table: OpTable = revision.into();
let _ = self.op_sql.create_op_table(op_table, conn)?;
Ok(())
})?;
Ok(())
}
fn delete_revision(&self, rev_id: i64, pool: Arc<ConnectionPool>) -> Result<(), DocError> {
let conn = &*pool.get().map_err(internal_error)?;
conn.immediate_transaction::<_, DocError, _>(|| {
let _ = self.op_sql.delete_op_table(rev_id, conn)?;
Ok(())
})?;
Ok(())
}
}
impl WsHandler for EditDocContext {
@ -94,3 +108,33 @@ impl WsHandler for EditDocContext {
}
}
}
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct DocId(pub(crate) String);
impl AsRef<str> for DocId {
fn as_ref(&self) -> &str { &self.0 }
}
impl<T> std::convert::From<T> for DocId
where
T: ToString,
{
fn from(s: T) -> Self { DocId(s.to_string()) }
}
impl std::convert::Into<String> for DocId {
fn into(self) -> String { self.0.clone() }
}
#[derive(Debug)]
pub struct RevCounter(pub AtomicI64);
impl RevCounter {
pub fn new(n: i64) -> Self { Self(AtomicI64::new(n)) }
pub fn next(&self) -> i64 {
let _ = self.0.fetch_add(1, SeqCst);
self.value()
}
pub fn value(&self) -> i64 { self.0.load(SeqCst) }
}

View File

@ -17,7 +17,7 @@ impl OpTableSql {
}
pub(crate) fn update_op_table(&self, changeset: OpChangeset, conn: &SqliteConnection) -> Result<(), DocError> {
let filter = dsl::op_table.filter(op_table::dsl::rev.eq(changeset.rev));
let filter = dsl::op_table.filter(op_table::dsl::rev_id.eq(changeset.rev_id));
let affected_row = diesel::update(filter).set(changeset).execute(conn)?;
debug_assert_eq!(affected_row, 1);
Ok(())
@ -28,8 +28,8 @@ impl OpTableSql {
Ok(ops)
}
pub(crate) fn delete_op_table(&self, rev: i64, conn: &SqliteConnection) -> Result<(), DocError> {
let filter = dsl::op_table.filter(op_table::dsl::rev.eq(rev));
pub(crate) fn delete_op_table(&self, rev_id: i64, conn: &SqliteConnection) -> Result<(), DocError> {
let filter = dsl::op_table.filter(op_table::dsl::rev_id.eq(rev_id));
let affected_row = diesel::delete(filter).execute(conn)?;
debug_assert_eq!(affected_row, 1);
Ok(())

View File

@ -1,12 +1,13 @@
use crate::entities::doc::Revision;
use diesel::sql_types::Integer;
use flowy_database::schema::op_table;
#[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)]
#[table_name = "op_table"]
#[primary_key(rev)]
#[primary_key(rev_id)]
pub(crate) struct OpTable {
pub(crate) base_rev: i64,
pub(crate) rev: i64,
pub(crate) base_rev_id: i64,
pub(crate) rev_id: i64,
pub(crate) data: Vec<u8>,
pub(crate) md5: String,
pub(crate) state: OpState,
@ -47,8 +48,20 @@ impl_sql_integer_expression!(OpState);
#[derive(AsChangeset, Identifiable, Default, Debug)]
#[table_name = "op_table"]
#[primary_key(rev)]
#[primary_key(rev_id)]
pub(crate) struct OpChangeset {
pub(crate) rev: i64,
pub(crate) rev_id: i64,
pub(crate) state: Option<OpState>,
}
impl std::convert::Into<OpTable> for Revision {
fn into(self) -> OpTable {
OpTable {
base_rev_id: self.base_rev_id,
rev_id: self.rev_id,
data: self.delta,
md5: self.md5,
state: OpState::Local,
}
}
}

View File

@ -1,12 +1,14 @@
use crate::{
errors::DocError,
errors::{internal_error, DocError},
sql_tables::doc::{DocTable, DocTableChangeset},
};
use flowy_database::{
prelude::*,
schema::{doc_table, doc_table::dsl},
ConnectionPool,
SqliteConnection,
};
use std::sync::Arc;
pub struct DocTableSql {}
@ -21,7 +23,8 @@ impl DocTableSql {
Ok(())
}
pub(crate) fn read_doc_table(&self, doc_id: &str, conn: &SqliteConnection) -> Result<DocTable, DocError> {
pub(crate) fn read_doc_table(&self, doc_id: &str, pool: Arc<ConnectionPool>) -> Result<DocTable, DocError> {
let conn = &*pool.get().map_err(internal_error)?;
let doc_table = dsl::doc_table.filter(doc_table::id.eq(doc_id)).first::<DocTable>(conn)?;
Ok(doc_table)

View File

@ -44,3 +44,13 @@ impl std::convert::Into<Doc> for DocTable {
}
}
}
impl std::convert::From<Doc> for DocTable {
fn from(doc: Doc) -> Self {
Self {
id: doc.id,
data: doc.data,
revision: doc.revision,
}
}
}

View File

@ -174,7 +174,7 @@ impl TestBuilder {
std::thread::sleep(Duration::from_millis(*mills_sec as u64));
},
TestOp::AssertStr(delta_i, expected) => {
assert_eq!(&self.documents[*delta_i].to_string(), expected);
assert_eq!(&self.documents[*delta_i].to_plain_string(), expected);
},
TestOp::AssertOpsJson(delta_i, expected) => {
@ -199,10 +199,6 @@ impl TestBuilder {
}
}
pub fn debug_print_delta(delta: &Delta) {
eprintln!("😁 {}", serde_json::to_string(delta).unwrap());
}
pub struct Rng(StdRng);
impl Default for Rng {
@ -210,6 +206,7 @@ impl Default for Rng {
}
impl Rng {
#[allow(dead_code)]
pub fn from_seed(seed: [u8; 32]) -> Self { Rng(StdRng::from_seed(seed)) }
pub fn gen_string(&mut self, len: usize) -> String { (0..len).map(|_| self.0.gen::<char>()).collect() }

View File

@ -8,12 +8,12 @@ use std::{
};
#[pin_project]
pub struct RequestFuture<T> {
pub struct ClosureFuture<T> {
#[pin]
pub fut: Pin<Box<dyn Future<Output = T> + Sync + Send>>,
}
impl<T> Future for RequestFuture<T>
impl<T> Future for ClosureFuture<T>
where
T: Send + Sync,
{

View File

@ -5,7 +5,7 @@ use flowy_document::{
prelude::{WsManager, WsSender, WS_ID},
};
use flowy_user::services::user::UserSession;
use flowy_user::{errors::ErrorCode, services::user::UserSession};
use flowy_ws::{WsMessage, WsMessageHandler};
use parking_lot::RwLock;
use std::{path::Path, sync::Arc};
@ -51,9 +51,19 @@ impl DocumentUser for DocumentUserImpl {
Ok(doc_dir)
}
fn user_id(&self) -> Result<String, DocError> { self.user.user_id().map_err(|e| DocError::internal().context(e)) }
fn user_id(&self) -> Result<String, DocError> {
self.user.user_id().map_err(|e| match e.code {
ErrorCode::InternalError => DocError::internal().context(e.msg),
_ => DocError::internal().context(e),
})
}
fn token(&self) -> Result<String, DocError> { self.user.token().map_err(|e| DocError::internal().context(e)) }
fn token(&self) -> Result<String, DocError> {
self.user.token().map_err(|e| match e.code {
ErrorCode::InternalError => DocError::internal().context(e.msg),
_ => DocError::internal().context(e),
})
}
}
struct WsSenderImpl {

View File

@ -49,32 +49,6 @@ impl ConnectionPool {
}
}
#[derive(Default, Debug, Clone)]
pub struct ConnCounter(Arc<ConnCounterInner>);
impl std::ops::Deref for ConnCounter {
type Target = ConnCounterInner;
fn deref(&self) -> &Self::Target { &*self.0 }
}
#[derive(Default, Debug)]
pub struct ConnCounterInner {
max_number: AtomicUsize,
current_number: AtomicUsize,
}
impl ConnCounterInner {
pub fn get_max_num(&self) -> usize { self.max_number.load(SeqCst) }
pub fn reset(&self) {
// reset max_number to current_number
let _ = self
.max_number
.fetch_update(SeqCst, SeqCst, |_| Some(self.current_number.load(SeqCst)));
}
}
pub type OnExecFunc = Box<dyn Fn() -> Box<dyn Fn(&SqliteConnection, &str)> + Send + Sync>;
pub struct PoolConfig {

View File

@ -110,7 +110,12 @@ impl std::convert::From<::r2d2::Error> for UserError {
}
impl std::convert::From<flowy_ws::errors::WsError> for UserError {
fn from(error: flowy_ws::errors::WsError) -> Self { UserError::internal().context(error) }
fn from(error: flowy_ws::errors::WsError) -> Self {
match error.code {
flowy_ws::errors::ErrorCode::InternalError => UserError::internal().context(error.msg),
_ => UserError::internal().context(error),
}
}
}
// use diesel::result::{Error, DatabaseErrorKind};

View File

@ -3,7 +3,6 @@ use crate::{
app::{App, ColorStyle, UpdateAppParams},
view::RepeatedView,
},
impl_sql_binary_expression,
sql_tables::workspace::WorkspaceTable,
};
use diesel::sql_types::Binary;

View File

@ -1,6 +1,5 @@
use crate::{
entities::view::{RepeatedView, UpdateViewParams, View, ViewType},
impl_sql_integer_expression,
sql_tables::app::AppTable,
};
use diesel::sql_types::Integer;

View File

@ -8,10 +8,10 @@ use url::ParseError;
#[derive(Debug, Default, Clone, ProtoBuf)]
pub struct WsError {
#[pb(index = 1)]
code: ErrorCode,
pub code: ErrorCode,
#[pb(index = 2)]
msg: String,
pub msg: String,
}
macro_rules! static_user_error {