save the delta continuely when typing

This commit is contained in:
appflowy 2021-09-21 15:07:07 +08:00
parent 749b043a99
commit 526f1408bc
46 changed files with 886 additions and 365 deletions

View File

@ -20,7 +20,7 @@ class DocEditBloc extends Bloc<DocEditEvent, DocEditState> {
iDocImpl.applyChangeset(json: changeset.data);
},
save: (Save save) async* {
iDocImpl.saveDoc(json: save.data);
// no need to save
});
}
}

View File

@ -8,7 +8,7 @@ import 'package:flowy_log/flowy_log.dart';
import 'package:flowy_sdk/protobuf/flowy-document/doc.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-workspace/errors.pb.dart';
class FlowyDoc implements EditorChangesetSender {
class FlowyDoc implements EditorDeltaSender {
final Doc doc;
final IDoc iDocImpl;
Document data;
@ -19,7 +19,7 @@ class FlowyDoc implements EditorChangesetSender {
String get id => doc.id;
@override
void sendDelta(Delta changeset, Delta delta) async {
void sendNewDelta(Delta changeset, Delta delta) async {
final json = jsonEncode(changeset.toJson());
Log.debug("Send json: $json");
final result = await iDocImpl.applyChangeset(json: json);
@ -41,7 +41,6 @@ class FlowyDoc implements EditorChangesetSender {
abstract class IDoc {
Future<Either<Doc, WorkspaceError>> readDoc();
Future<Either<Unit, WorkspaceError>> saveDoc({String? json});
Future<Either<Doc, WorkspaceError>> applyChangeset({String? json});
Future<Either<Unit, WorkspaceError>> closeDoc();
}

View File

@ -89,9 +89,6 @@ class HomeDepsResolver {
(docId, _) => DocEditBloc(getIt<IDoc>(param1: docId)));
// editor
getIt.registerFactoryParam<EditorPersistence, String, void>(
(docId, _) => EditorPersistenceImpl(repo: DocRepository(docId: docId)));
getIt.registerFactoryParam<ViewListBloc, List<View>, void>(
(views, _) => ViewListBloc(views: views));

View File

@ -2,11 +2,9 @@ import 'dart:convert';
import 'dart:typed_data';
import 'package:dartz/dartz.dart';
import 'package:flowy_editor/flowy_editor.dart';
import 'package:app_flowy/workspace/domain/i_doc.dart';
import 'package:app_flowy/workspace/infrastructure/repos/doc_repo.dart';
import 'package:flowy_editor/src/model/quill_delta.dart';
import 'package:flowy_log/flowy_log.dart';
import 'package:flowy_sdk/protobuf/flowy-document/doc.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-workspace/errors.pb.dart';
@ -26,12 +24,6 @@ class IDocImpl extends IDoc {
return docOrFail;
}
@override
Future<Either<Unit, WorkspaceError>> saveDoc({String? json}) {
Log.debug("Saving doc");
return repo.saveDoc(data: _encodeText(json));
}
@override
Future<Either<Doc, WorkspaceError>> applyChangeset({String? json}) {
return repo.applyChangeset(data: _encodeText(json));
@ -42,24 +34,3 @@ Uint8List _encodeText(String? json) {
final data = utf8.encode(json ?? "");
return Uint8List.fromList(data);
}
class EditorPersistenceImpl implements EditorPersistence {
DocRepository repo;
EditorPersistenceImpl({
required this.repo,
});
@override
Future<bool> save(List<dynamic> jsonList) async {
Log.debug("Saving doc");
final json = jsonEncode(jsonList);
final data = utf8.encode(json);
return repo.saveDoc(data: Uint8List.fromList(data)).then((result) {
return result.fold(
(l) => true,
(r) => false,
);
});
}
}

View File

@ -18,13 +18,6 @@ class DocRepository {
return WorkspaceEventOpenView(request).send();
}
Future<Either<Unit, WorkspaceError>> saveDoc({required Uint8List data}) {
final request = SaveViewDataRequest.create()
..viewId = docId
..data = data;
return WorkspaceEventSaveViewData(request).send();
}
Future<Either<Doc, WorkspaceError>> applyChangeset(
{required Uint8List data}) {
final request = ApplyChangesetRequest.create()

View File

@ -14,12 +14,12 @@ class DocPage extends StatelessWidget {
final FlowyDoc doc;
DocPage({Key? key, required this.doc}) : super(key: key) {
// getIt<EditorChangesetSender>(param1: doc.id))
// getIt<EditorDeltaSender>(param1: doc.id))
controller = EditorController(
document: doc.data,
selection: const TextSelection.collapsed(offset: 0),
persistence: getIt<EditorPersistence>(param1: doc.id));
document: doc.data,
selection: const TextSelection.collapsed(offset: 0),
);
}
@override

View File

@ -14,13 +14,13 @@ import 'node/line.dart';
import 'node/node.dart';
import 'package:flowy_log/flowy_log.dart';
abstract class EditorChangesetSender {
void sendDelta(Delta changeset, Delta delta);
abstract class EditorDeltaSender {
void sendNewDelta(Delta changeset, Delta delta);
}
/// The rich text document
class Document {
EditorChangesetSender? sender;
EditorDeltaSender? sender;
Document({this.sender}) : _delta = Delta()..insert('\n') {
_loadDocument(_delta);
}
@ -171,7 +171,7 @@ class Document {
_delta = _delta.compose(delta);
sender?.sendDelta(changeset, _delta);
sender?.sendNewDelta(changeset, _delta);
} catch (e) {
throw '_delta compose failed';
}

View File

@ -10,20 +10,14 @@ import '../model/document/document.dart';
import '../model/document/style.dart';
import '../model/document/node/embed.dart';
abstract class EditorPersistence {
Future<bool> save(List<dynamic> jsonList);
}
class EditorController extends ChangeNotifier {
final Document document;
TextSelection selection;
final EditorPersistence? persistence;
Style toggledStyle = Style();
EditorController({
required this.document,
required this.selection,
this.persistence,
});
// item1: Document state before [change].
@ -59,9 +53,7 @@ class EditorController extends ChangeNotifier {
}
void save() {
if (persistence != null) {
persistence!.save(document.toDelta().toJson());
}
// no need to save, deprecated
}
@override

View File

@ -254,23 +254,6 @@ class WorkspaceEventOpenView {
}
}
class WorkspaceEventSaveViewData {
SaveViewDataRequest request;
WorkspaceEventSaveViewData(this.request);
Future<Either<Unit, WorkspaceError>> send() {
final request = FFIRequest.create()
..event = WorkspaceEvent.SaveViewData.toString()
..payload = requestToBytes(this.request);
return Dispatch.asyncRequest(request)
.then((bytesResult) => bytesResult.fold(
(bytes) => left(unit),
(errBytes) => right(WorkspaceError.fromBuffer(errBytes)),
));
}
}
class WorkspaceEventApplyChangeset {
ApplyChangesetRequest request;
WorkspaceEventApplyChangeset(this.request);

View File

@ -1,4 +1,5 @@
// Auto-generated, do not edit
export './ws.pb.dart';
export './observable.pb.dart';
export './errors.pb.dart';
export './event.pb.dart';

View File

@ -0,0 +1,90 @@
///
// Generated code. Do not modify.
// source: ws.proto
//
// @dart = 2.12
// ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields
import 'dart:core' as $core;
import 'package:protobuf/protobuf.dart' as $pb;
import 'ws.pbenum.dart';
export 'ws.pbenum.dart';
class WsDocumentData extends $pb.GeneratedMessage {
static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'WsDocumentData', createEmptyInstance: create)
..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'id')
..e<WsSource>(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'source', $pb.PbFieldType.OE, defaultOrMaker: WsSource.Delta, valueOf: WsSource.valueOf, enumValues: WsSource.values)
..a<$core.List<$core.int>>(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'data', $pb.PbFieldType.OY)
..hasRequiredFields = false
;
WsDocumentData._() : super();
factory WsDocumentData({
$core.String? id,
WsSource? source,
$core.List<$core.int>? data,
}) {
final _result = create();
if (id != null) {
_result.id = id;
}
if (source != null) {
_result.source = source;
}
if (data != null) {
_result.data = data;
}
return _result;
}
factory WsDocumentData.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r);
factory WsDocumentData.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r);
@$core.Deprecated(
'Using this can add significant overhead to your binary. '
'Use [GeneratedMessageGenericExtensions.deepCopy] instead. '
'Will be removed in next major version')
WsDocumentData clone() => WsDocumentData()..mergeFromMessage(this);
@$core.Deprecated(
'Using this can add significant overhead to your binary. '
'Use [GeneratedMessageGenericExtensions.rebuild] instead. '
'Will be removed in next major version')
WsDocumentData copyWith(void Function(WsDocumentData) updates) => super.copyWith((message) => updates(message as WsDocumentData)) as WsDocumentData; // ignore: deprecated_member_use
$pb.BuilderInfo get info_ => _i;
@$core.pragma('dart2js:noInline')
static WsDocumentData create() => WsDocumentData._();
WsDocumentData createEmptyInstance() => create();
static $pb.PbList<WsDocumentData> createRepeated() => $pb.PbList<WsDocumentData>();
@$core.pragma('dart2js:noInline')
static WsDocumentData getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor<WsDocumentData>(create);
static WsDocumentData? _defaultInstance;
@$pb.TagNumber(1)
$core.String get id => $_getSZ(0);
@$pb.TagNumber(1)
set id($core.String v) { $_setString(0, v); }
@$pb.TagNumber(1)
$core.bool hasId() => $_has(0);
@$pb.TagNumber(1)
void clearId() => clearField(1);
@$pb.TagNumber(2)
WsSource get source => $_getN(1);
@$pb.TagNumber(2)
set source(WsSource v) { setField(2, v); }
@$pb.TagNumber(2)
$core.bool hasSource() => $_has(1);
@$pb.TagNumber(2)
void clearSource() => clearField(2);
@$pb.TagNumber(3)
$core.List<$core.int> get data => $_getN(2);
@$pb.TagNumber(3)
set data($core.List<$core.int> v) { $_setBytes(2, v); }
@$pb.TagNumber(3)
$core.bool hasData() => $_has(2);
@$pb.TagNumber(3)
void clearData() => clearField(3);
}

View File

@ -0,0 +1,24 @@
///
// Generated code. Do not modify.
// source: ws.proto
//
// @dart = 2.12
// ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields
// ignore_for_file: UNDEFINED_SHOWN_NAME
import 'dart:core' as $core;
import 'package:protobuf/protobuf.dart' as $pb;
class WsSource extends $pb.ProtobufEnum {
static const WsSource Delta = WsSource._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Delta');
static const $core.List<WsSource> values = <WsSource> [
Delta,
];
static final $core.Map<$core.int, WsSource> _byValue = $pb.ProtobufEnum.initByValue(values);
static WsSource? valueOf($core.int value) => _byValue[value];
const WsSource._($core.int v, $core.String n) : super(v, n);
}

View File

@ -0,0 +1,32 @@
///
// Generated code. Do not modify.
// source: ws.proto
//
// @dart = 2.12
// ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields,deprecated_member_use_from_same_package
import 'dart:core' as $core;
import 'dart:convert' as $convert;
import 'dart:typed_data' as $typed_data;
@$core.Deprecated('Use wsSourceDescriptor instead')
const WsSource$json = const {
'1': 'WsSource',
'2': const [
const {'1': 'Delta', '2': 0},
],
};
/// Descriptor for `WsSource`. Decode as a `google.protobuf.EnumDescriptorProto`.
final $typed_data.Uint8List wsSourceDescriptor = $convert.base64Decode('CghXc1NvdXJjZRIJCgVEZWx0YRAA');
@$core.Deprecated('Use wsDocumentDataDescriptor instead')
const WsDocumentData$json = const {
'1': 'WsDocumentData',
'2': const [
const {'1': 'id', '3': 1, '4': 1, '5': 9, '10': 'id'},
const {'1': 'source', '3': 2, '4': 1, '5': 14, '6': '.WsSource', '10': 'source'},
const {'1': 'data', '3': 3, '4': 1, '5': 12, '10': 'data'},
],
};
/// Descriptor for `WsDocumentData`. Decode as a `google.protobuf.DescriptorProto`.
final $typed_data.Uint8List wsDocumentDataDescriptor = $convert.base64Decode('Cg5Xc0RvY3VtZW50RGF0YRIOCgJpZBgBIAEoCVICaWQSIQoGc291cmNlGAIgASgOMgkuV3NTb3VyY2VSBnNvdXJjZRISCgRkYXRhGAMgASgMUgRkYXRh');

View File

@ -0,0 +1,9 @@
///
// Generated code. Do not modify.
// source: ws.proto
//
// @dart = 2.12
// ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields,deprecated_member_use_from_same_package
export 'ws.pb.dart';

View File

@ -25,8 +25,7 @@ class WorkspaceEvent extends $pb.ProtobufEnum {
static const WorkspaceEvent UpdateView = WorkspaceEvent._(203, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'UpdateView');
static const WorkspaceEvent DeleteView = WorkspaceEvent._(204, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'DeleteView');
static const WorkspaceEvent OpenView = WorkspaceEvent._(205, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'OpenView');
static const WorkspaceEvent SaveViewData = WorkspaceEvent._(206, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'SaveViewData');
static const WorkspaceEvent ApplyChangeset = WorkspaceEvent._(207, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'ApplyChangeset');
static const WorkspaceEvent ApplyChangeset = WorkspaceEvent._(206, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'ApplyChangeset');
static const $core.List<WorkspaceEvent> values = <WorkspaceEvent> [
CreateWorkspace,
@ -44,7 +43,6 @@ class WorkspaceEvent extends $pb.ProtobufEnum {
UpdateView,
DeleteView,
OpenView,
SaveViewData,
ApplyChangeset,
];

View File

@ -27,10 +27,9 @@ const WorkspaceEvent$json = const {
const {'1': 'UpdateView', '2': 203},
const {'1': 'DeleteView', '2': 204},
const {'1': 'OpenView', '2': 205},
const {'1': 'SaveViewData', '2': 206},
const {'1': 'ApplyChangeset', '2': 207},
const {'1': 'ApplyChangeset', '2': 206},
],
};
/// Descriptor for `WorkspaceEvent`. Decode as a `google.protobuf.EnumDescriptorProto`.
final $typed_data.Uint8List workspaceEventDescriptor = $convert.base64Decode('Cg5Xb3Jrc3BhY2VFdmVudBITCg9DcmVhdGVXb3Jrc3BhY2UQABIUChBSZWFkQ3VyV29ya3NwYWNlEAESEgoOUmVhZFdvcmtzcGFjZXMQAhITCg9EZWxldGVXb3Jrc3BhY2UQAxIRCg1PcGVuV29ya3NwYWNlEAQSFQoRUmVhZFdvcmtzcGFjZUFwcHMQBRINCglDcmVhdGVBcHAQZRINCglEZWxldGVBcHAQZhILCgdSZWFkQXBwEGcSDQoJVXBkYXRlQXBwEGgSDwoKQ3JlYXRlVmlldxDJARINCghSZWFkVmlldxDKARIPCgpVcGRhdGVWaWV3EMsBEg8KCkRlbGV0ZVZpZXcQzAESDQoIT3BlblZpZXcQzQESEQoMU2F2ZVZpZXdEYXRhEM4BEhMKDkFwcGx5Q2hhbmdlc2V0EM8B');
final $typed_data.Uint8List workspaceEventDescriptor = $convert.base64Decode('Cg5Xb3Jrc3BhY2VFdmVudBITCg9DcmVhdGVXb3Jrc3BhY2UQABIUChBSZWFkQ3VyV29ya3NwYWNlEAESEgoOUmVhZFdvcmtzcGFjZXMQAhITCg9EZWxldGVXb3Jrc3BhY2UQAxIRCg1PcGVuV29ya3NwYWNlEAQSFQoRUmVhZFdvcmtzcGFjZUFwcHMQBRINCglDcmVhdGVBcHAQZRINCglEZWxldGVBcHAQZhILCgdSZWFkQXBwEGcSDQoJVXBkYXRlQXBwEGgSDwoKQ3JlYXRlVmlldxDJARINCghSZWFkVmlldxDKARIPCgpVcGRhdGVWaWV3EMsBEg8KCkRlbGV0ZVZpZXcQzAESDQoIT3BlblZpZXcQzQESEwoOQXBwbHlDaGFuZ2VzZXQQzgE=');

View File

@ -60,6 +60,7 @@ pub fn category_from_str(type_str: &str) -> TypeCategory {
| "SaveDocParams"
| "ApplyChangesetParams"
| "QueryDocParams"
| "WsDocumentData"
| "DocError"
| "FFIRequest"
| "FFIResponse"
@ -79,6 +80,7 @@ pub fn category_from_str(type_str: &str) -> TypeCategory {
| "WorkspaceEvent"
| "ErrorCode"
| "WorkspaceObservable"
| "WsSource"
| "DocObservable"
| "FFIStatusCode"
| "UserEvent"

View File

@ -37,7 +37,7 @@ pub struct ApplyChangesetParams {
pub id: String,
#[pb(index = 2)]
pub data: Vec<u8>,
pub data: Vec<u8>, // Delta
}
#[derive(ProtoBuf, Default, Debug, Clone)]

View File

@ -1 +1,2 @@
pub mod doc;
pub mod ws;

View File

@ -0,0 +1,3 @@
mod ws;
pub use ws::*;

View File

@ -0,0 +1,22 @@
use flowy_derive::{ProtoBuf, ProtoBuf_Enum};
#[derive(Debug, Clone, ProtoBuf_Enum, Eq, PartialEq, Hash)]
pub enum WsSource {
Delta = 0,
}
impl std::default::Default for WsSource {
fn default() -> Self { WsSource::Delta }
}
#[derive(ProtoBuf, Default, Debug, Clone)]
pub struct WsDocumentData {
#[pb(index = 1)]
pub id: String,
#[pb(index = 2)]
pub source: WsSource,
#[pb(index = 3)]
pub data: Vec<u8>, // Delta
}

View File

@ -12,6 +12,6 @@ extern crate flowy_database;
pub mod prelude {
pub use crate::{
module::*,
services::{server::*, ws_document::*},
services::{server::*, ws::*},
};
}

View File

@ -1,19 +1,12 @@
use crate::{
errors::DocError,
services::{doc_cache::OpenedDocumentCache, server::construct_doc_server},
};
use crate::{
entities::doc::{ApplyChangesetParams, CreateDocParams, Doc, QueryDocParams, SaveDocParams},
errors::internal_error,
services::{doc_controller::DocController, ws_document::WsDocument},
errors::{internal_error, DocError},
services::{doc_controller::DocController, open_doc::OpenedDocManager, server::construct_doc_server, ws::WsManager},
};
use diesel::SqliteConnection;
use flowy_database::ConnectionPool;
use parking_lot::RwLock;
use std::{sync::Arc};
use std::sync::Arc;
pub trait DocumentUser: Send + Sync {
fn user_dir(&self) -> Result<String, DocError>;
@ -23,16 +16,16 @@ pub trait DocumentUser: Send + Sync {
pub struct FlowyDocument {
controller: Arc<DocController>,
ws: Arc<RwLock<WsDocument>>,
cache: Arc<OpenedDocumentCache>,
doc_manager: Arc<OpenedDocManager>,
}
impl FlowyDocument {
pub fn new(user: Arc<dyn DocumentUser>, ws: Arc<RwLock<WsDocument>>) -> FlowyDocument {
pub fn new(user: Arc<dyn DocumentUser>, ws_manager: Arc<RwLock<WsManager>>) -> FlowyDocument {
let server = construct_doc_server();
let cache = Arc::new(OpenedDocumentCache::new());
let controller = Arc::new(DocController::new(server.clone(), user.clone()));
Self { controller, cache, ws }
let doc_manager = Arc::new(OpenedDocManager::new(ws_manager, controller.clone()));
Self { controller, doc_manager }
}
pub fn create(&self, params: CreateDocParams, conn: &SqliteConnection) -> Result<(), DocError> {
@ -41,20 +34,20 @@ impl FlowyDocument {
}
pub fn delete(&self, params: QueryDocParams, conn: &SqliteConnection) -> Result<(), DocError> {
let _ = self.cache.close(&params.doc_id)?;
let _ = self.doc_manager.close(&params.doc_id)?;
let _ = self.controller.delete(params.into(), conn)?;
Ok(())
}
pub async fn open(&self, params: QueryDocParams, pool: Arc<ConnectionPool>) -> Result<Doc, DocError> {
let doc = match self.cache.is_opened(&params.doc_id) {
let doc = match self.doc_manager.is_opened(&params.doc_id) {
true => {
let data = self.cache.read_doc(&params.doc_id).await?;
let data = self.doc_manager.read_doc(&params.doc_id).await?;
Doc { id: params.doc_id, data }
},
false => {
let doc = self.controller.open(params, pool).await?;
let _ = self.cache.open(&doc.id, doc.data.clone())?;
let _ = self.doc_manager.open(&doc.id, doc.data.clone())?;
doc
},
};
@ -62,21 +55,9 @@ impl FlowyDocument {
Ok(doc)
}
pub async fn update(&self, params: SaveDocParams, pool: Arc<ConnectionPool>) -> Result<(), DocError> {
let _ = self.controller.update(params, &*pool.get().map_err(internal_error)?)?;
Ok(())
}
pub async fn apply_changeset(&self, params: ApplyChangesetParams) -> Result<Doc, DocError> {
let _ = self
.cache
.mut_doc(&params.id, |doc| {
let _ = doc.apply_changeset(params.data.clone())?;
Ok(())
})
.await?;
let data = self.cache.read_doc(&params.id).await?;
pub async fn apply_changeset(&self, params: ApplyChangesetParams, pool: Arc<ConnectionPool>) -> Result<Doc, DocError> {
let _ = self.doc_manager.apply_changeset(&params.id, params.data, pool).await?;
let data = self.doc_manager.read_doc(&params.id).await?;
let doc = Doc { id: params.id, data };
Ok(doc)
}

View File

@ -1,5 +1,8 @@
// Auto-generated, do not edit
mod ws;
pub use ws::*;
mod observable;
pub use observable::*;

View File

@ -0,0 +1,335 @@
// This file is generated by rust-protobuf 2.22.1. Do not edit
// @generated
// https://github.com/rust-lang/rust-clippy/issues/702
#![allow(unknown_lints)]
#![allow(clippy::all)]
#![allow(unused_attributes)]
#![cfg_attr(rustfmt, rustfmt::skip)]
#![allow(box_pointers)]
#![allow(dead_code)]
#![allow(missing_docs)]
#![allow(non_camel_case_types)]
#![allow(non_snake_case)]
#![allow(non_upper_case_globals)]
#![allow(trivial_casts)]
#![allow(unused_imports)]
#![allow(unused_results)]
//! Generated file from `ws.proto`
/// Generated files are compatible only with the same version
/// of protobuf runtime.
// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_22_1;
#[derive(PartialEq,Clone,Default)]
pub struct WsDocumentData {
// message fields
pub id: ::std::string::String,
pub source: WsSource,
pub data: ::std::vec::Vec<u8>,
// special fields
pub unknown_fields: ::protobuf::UnknownFields,
pub cached_size: ::protobuf::CachedSize,
}
impl<'a> ::std::default::Default for &'a WsDocumentData {
fn default() -> &'a WsDocumentData {
<WsDocumentData as ::protobuf::Message>::default_instance()
}
}
impl WsDocumentData {
pub fn new() -> WsDocumentData {
::std::default::Default::default()
}
// string id = 1;
pub fn get_id(&self) -> &str {
&self.id
}
pub fn clear_id(&mut self) {
self.id.clear();
}
// Param is passed by value, moved
pub fn set_id(&mut self, v: ::std::string::String) {
self.id = v;
}
// Mutable pointer to the field.
// If field is not initialized, it is initialized with default value first.
pub fn mut_id(&mut self) -> &mut ::std::string::String {
&mut self.id
}
// Take field
pub fn take_id(&mut self) -> ::std::string::String {
::std::mem::replace(&mut self.id, ::std::string::String::new())
}
// .WsSource source = 2;
pub fn get_source(&self) -> WsSource {
self.source
}
pub fn clear_source(&mut self) {
self.source = WsSource::Delta;
}
// Param is passed by value, moved
pub fn set_source(&mut self, v: WsSource) {
self.source = v;
}
// bytes data = 3;
pub fn get_data(&self) -> &[u8] {
&self.data
}
pub fn clear_data(&mut self) {
self.data.clear();
}
// Param is passed by value, moved
pub fn set_data(&mut self, v: ::std::vec::Vec<u8>) {
self.data = v;
}
// Mutable pointer to the field.
// If field is not initialized, it is initialized with default value first.
pub fn mut_data(&mut self) -> &mut ::std::vec::Vec<u8> {
&mut self.data
}
// Take field
pub fn take_data(&mut self) -> ::std::vec::Vec<u8> {
::std::mem::replace(&mut self.data, ::std::vec::Vec::new())
}
}
impl ::protobuf::Message for WsDocumentData {
fn is_initialized(&self) -> bool {
true
}
fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> {
while !is.eof()? {
let (field_number, wire_type) = is.read_tag_unpack()?;
match field_number {
1 => {
::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.id)?;
},
2 => {
::protobuf::rt::read_proto3_enum_with_unknown_fields_into(wire_type, is, &mut self.source, 2, &mut self.unknown_fields)?
},
3 => {
::protobuf::rt::read_singular_proto3_bytes_into(wire_type, is, &mut self.data)?;
},
_ => {
::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
},
};
}
::std::result::Result::Ok(())
}
// Compute sizes of nested messages
#[allow(unused_variables)]
fn compute_size(&self) -> u32 {
let mut my_size = 0;
if !self.id.is_empty() {
my_size += ::protobuf::rt::string_size(1, &self.id);
}
if self.source != WsSource::Delta {
my_size += ::protobuf::rt::enum_size(2, self.source);
}
if !self.data.is_empty() {
my_size += ::protobuf::rt::bytes_size(3, &self.data);
}
my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
self.cached_size.set(my_size);
my_size
}
fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
if !self.id.is_empty() {
os.write_string(1, &self.id)?;
}
if self.source != WsSource::Delta {
os.write_enum(2, ::protobuf::ProtobufEnum::value(&self.source))?;
}
if !self.data.is_empty() {
os.write_bytes(3, &self.data)?;
}
os.write_unknown_fields(self.get_unknown_fields())?;
::std::result::Result::Ok(())
}
fn get_cached_size(&self) -> u32 {
self.cached_size.get()
}
fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
&self.unknown_fields
}
fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
&mut self.unknown_fields
}
fn as_any(&self) -> &dyn (::std::any::Any) {
self as &dyn (::std::any::Any)
}
fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
self as &mut dyn (::std::any::Any)
}
fn into_any(self: ::std::boxed::Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
self
}
fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
Self::descriptor_static()
}
fn new() -> WsDocumentData {
WsDocumentData::new()
}
fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT;
descriptor.get(|| {
let mut fields = ::std::vec::Vec::new();
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
"id",
|m: &WsDocumentData| { &m.id },
|m: &mut WsDocumentData| { &mut m.id },
));
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeEnum<WsSource>>(
"source",
|m: &WsDocumentData| { &m.source },
|m: &mut WsDocumentData| { &mut m.source },
));
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>(
"data",
|m: &WsDocumentData| { &m.data },
|m: &mut WsDocumentData| { &mut m.data },
));
::protobuf::reflect::MessageDescriptor::new_pb_name::<WsDocumentData>(
"WsDocumentData",
fields,
file_descriptor_proto()
)
})
}
fn default_instance() -> &'static WsDocumentData {
static instance: ::protobuf::rt::LazyV2<WsDocumentData> = ::protobuf::rt::LazyV2::INIT;
instance.get(WsDocumentData::new)
}
}
impl ::protobuf::Clear for WsDocumentData {
fn clear(&mut self) {
self.id.clear();
self.source = WsSource::Delta;
self.data.clear();
self.unknown_fields.clear();
}
}
impl ::std::fmt::Debug for WsDocumentData {
fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
::protobuf::text_format::fmt(self, f)
}
}
impl ::protobuf::reflect::ProtobufValue for WsDocumentData {
fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
::protobuf::reflect::ReflectValueRef::Message(self)
}
}
#[derive(Clone,PartialEq,Eq,Debug,Hash)]
pub enum WsSource {
Delta = 0,
}
impl ::protobuf::ProtobufEnum for WsSource {
fn value(&self) -> i32 {
*self as i32
}
fn from_i32(value: i32) -> ::std::option::Option<WsSource> {
match value {
0 => ::std::option::Option::Some(WsSource::Delta),
_ => ::std::option::Option::None
}
}
fn values() -> &'static [Self] {
static values: &'static [WsSource] = &[
WsSource::Delta,
];
values
}
fn enum_descriptor_static() -> &'static ::protobuf::reflect::EnumDescriptor {
static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::EnumDescriptor> = ::protobuf::rt::LazyV2::INIT;
descriptor.get(|| {
::protobuf::reflect::EnumDescriptor::new_pb_name::<WsSource>("WsSource", file_descriptor_proto())
})
}
}
impl ::std::marker::Copy for WsSource {
}
impl ::std::default::Default for WsSource {
fn default() -> Self {
WsSource::Delta
}
}
impl ::protobuf::reflect::ProtobufValue for WsSource {
fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
::protobuf::reflect::ReflectValueRef::Enum(::protobuf::ProtobufEnum::descriptor(self))
}
}
static file_descriptor_proto_data: &'static [u8] = b"\
\n\x08ws.proto\"W\n\x0eWsDocumentData\x12\x0e\n\x02id\x18\x01\x20\x01(\t\
R\x02id\x12!\n\x06source\x18\x02\x20\x01(\x0e2\t.WsSourceR\x06source\x12\
\x12\n\x04data\x18\x03\x20\x01(\x0cR\x04data*\x15\n\x08WsSource\x12\t\n\
\x05Delta\x10\0J\x90\x02\n\x06\x12\x04\0\0\t\x01\n\x08\n\x01\x0c\x12\x03\
\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x06\x01\n\n\n\x03\x04\0\x01\x12\
\x03\x02\x08\x16\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x12\n\x0c\n\x05\
\x04\0\x02\0\x05\x12\x03\x03\x04\n\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\
\x03\x0b\r\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x10\x11\n\x0b\n\x04\
\x04\0\x02\x01\x12\x03\x04\x04\x18\n\x0c\n\x05\x04\0\x02\x01\x06\x12\x03\
\x04\x04\x0c\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\r\x13\n\x0c\n\x05\
\x04\0\x02\x01\x03\x12\x03\x04\x16\x17\n\x0b\n\x04\x04\0\x02\x02\x12\x03\
\x05\x04\x13\n\x0c\n\x05\x04\0\x02\x02\x05\x12\x03\x05\x04\t\n\x0c\n\x05\
\x04\0\x02\x02\x01\x12\x03\x05\n\x0e\n\x0c\n\x05\x04\0\x02\x02\x03\x12\
\x03\x05\x11\x12\n\n\n\x02\x05\0\x12\x04\x07\0\t\x01\n\n\n\x03\x05\0\x01\
\x12\x03\x07\x05\r\n\x0b\n\x04\x05\0\x02\0\x12\x03\x08\x04\x0e\n\x0c\n\
\x05\x05\0\x02\0\x01\x12\x03\x08\x04\t\n\x0c\n\x05\x05\0\x02\0\x02\x12\
\x03\x08\x0c\rb\x06proto3\
";
static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;
fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto {
::protobuf::Message::parse_from_bytes(file_descriptor_proto_data).unwrap()
}
pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto {
file_descriptor_proto_lazy.get(|| {
parse_descriptor_proto()
})
}

View File

@ -0,0 +1,10 @@
syntax = "proto3";
message WsDocumentData {
string id = 1;
WsSource source = 2;
bytes data = 3;
}
enum WsSource {
Delta = 0;
}

View File

@ -1,89 +0,0 @@
use crate::errors::DocError;
use dashmap::DashMap;
use flowy_ot::{client::Document, core::Delta, errors::OTError};
use std::convert::TryInto;
use tokio::sync::RwLock;
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct DocId(pub(crate) String);
pub struct OpenDocument {
document: Document,
}
impl<T> std::convert::From<T> for DocId
where
T: ToString,
{
fn from(s: T) -> Self { DocId(s.to_string()) }
}
pub(crate) struct OpenedDocumentCache {
inner: DashMap<DocId, RwLock<OpenDocument>>,
}
impl OpenedDocumentCache {
pub(crate) fn new() -> Self { Self { inner: DashMap::new() } }
pub(crate) fn open<T, D>(&self, id: T, data: D) -> Result<(), DocError>
where
T: Into<DocId>,
D: TryInto<Delta, Error = OTError>,
{
let doc_id = id.into();
let delta = data.try_into()?;
let document = Document::from_delta(delta);
let doc_info = OpenDocument { document };
self.inner.insert(doc_id, RwLock::new(doc_info));
Ok(())
}
pub(crate) fn is_opened<T>(&self, id: T) -> bool
where
T: Into<DocId>,
{
let doc_id = id.into();
self.inner.get(&doc_id).is_some()
}
pub(crate) async fn mut_doc<T, F>(&self, id: T, f: F) -> Result<(), DocError>
where
T: Into<DocId>,
F: FnOnce(&mut Document) -> Result<(), DocError>,
{
let doc_id = id.into();
match self.inner.get(&doc_id) {
None => Err(doc_not_found()),
Some(doc_info) => {
let mut write_guard = doc_info.write().await;
f(&mut write_guard.document)
},
}
}
pub(crate) async fn read_doc<T>(&self, id: T) -> Result<Vec<u8>, DocError>
where
T: Into<DocId> + Clone,
{
if self.is_opened(id.clone()) {
return Err(doc_not_found());
}
let doc_id = id.into();
let doc_info = self.inner.get(&doc_id).unwrap();
let write_guard = doc_info.read().await;
let doc = &(*write_guard).document;
Ok(doc.to_bytes())
}
pub(crate) fn close<T>(&self, id: T) -> Result<(), DocError>
where
T: Into<DocId>,
{
let doc_id = id.into();
self.inner.remove(&doc_id);
Ok(())
}
}
fn doc_not_found() -> DocError { DocError::not_found().context("Doc is close or you should call open first") }

View File

@ -7,7 +7,7 @@ use crate::{
};
use flowy_database::{ConnectionPool, SqliteConnection};
use crate::errors::internal_error;
use crate::{errors::internal_error, services::open_doc::OpenedDocPersistence};
use std::sync::Arc;
use tokio::task::JoinHandle;
@ -33,15 +33,6 @@ impl DocController {
Ok(())
}
#[tracing::instrument(level = "debug", skip(self, conn, params), err)]
pub(crate) fn update(&self, params: SaveDocParams, conn: &SqliteConnection) -> Result<(), DocError> {
let changeset = DocTableChangeset::new(params.clone());
let _ = self.sql.update_doc_table(changeset, &*conn)?;
let _ = self.update_doc_on_server(params)?;
Ok(())
}
#[tracing::instrument(level = "debug", skip(self, pool), err)]
pub(crate) async fn open(&self, params: QueryDocParams, pool: Arc<ConnectionPool>) -> Result<Doc, DocError> {
match self._open(params.clone(), pool.clone()) {
@ -139,3 +130,13 @@ impl DocController {
}
}
}
impl OpenedDocPersistence for DocController {
fn save(&self, params: SaveDocParams, pool: Arc<ConnectionPool>) -> Result<(), DocError> {
let changeset = DocTableChangeset::new(params.clone());
let _ = self.sql.update_doc_table(changeset, &*(pool.get().map_err(internal_error)?))?;
let _ = self.update_doc_on_server(params)?;
Ok(())
}
}

View File

@ -1,4 +1,4 @@
pub(crate) mod doc_cache;
pub(crate) mod doc_controller;
pub(crate) mod open_doc;
pub mod server;
pub mod ws_document;
pub mod ws;

View File

@ -0,0 +1,88 @@
use crate::{
errors::DocError,
services::{
open_doc::{DocId, OpenedDoc, OpenedDocPersistence},
ws::WsManager,
},
};
use dashmap::DashMap;
use flowy_database::ConnectionPool;
use flowy_ot::{core::Delta, errors::OTError};
use parking_lot::RwLock;
use std::{convert::TryInto, fmt::Debug, sync::Arc};
pub(crate) struct OpenedDocManager {
doc_map: DashMap<DocId, Arc<OpenedDoc>>,
ws_manager: Arc<RwLock<WsManager>>,
persistence: Arc<dyn OpenedDocPersistence>,
}
impl OpenedDocManager {
pub(crate) fn new(ws_manager: Arc<RwLock<WsManager>>, persistence: Arc<dyn OpenedDocPersistence>) -> Self {
Self {
doc_map: DashMap::new(),
ws_manager,
persistence,
}
}
#[tracing::instrument(level = "debug", skip(self, data), err)]
pub(crate) fn open<T, D>(&self, id: T, data: D) -> Result<(), DocError>
where
T: Into<DocId> + Debug,
D: TryInto<Delta, Error = OTError>,
{
let doc = Arc::new(OpenedDoc::new(id.into(), data.try_into()?, self.persistence.clone()));
self.ws_manager.write().register_handler(doc.id.as_ref(), doc.clone());
self.doc_map.insert(doc.id.clone(), doc.clone());
Ok(())
}
pub(crate) fn is_opened<T>(&self, id: T) -> bool
where
T: Into<DocId>,
{
let doc_id = id.into();
self.doc_map.get(&doc_id).is_some()
}
#[tracing::instrument(level = "debug", skip(self, changeset, pool), err)]
pub(crate) async fn apply_changeset<T>(&self, id: T, changeset: Vec<u8>, pool: Arc<ConnectionPool>) -> Result<(), DocError>
where
T: Into<DocId> + Debug,
{
let id = id.into();
match self.doc_map.get(&id) {
None => Err(doc_not_found()),
Some(doc) => {
let _ = doc.apply_delta(changeset, pool)?;
Ok(())
},
}
}
pub(crate) async fn read_doc<T>(&self, id: T) -> Result<Vec<u8>, DocError>
where
T: Into<DocId> + Clone,
{
if !self.is_opened(id.clone()) {
return Err(doc_not_found());
}
let doc_id = id.into();
let doc = self.doc_map.get(&doc_id).unwrap();
Ok(doc.data())
}
pub(crate) fn close<T>(&self, id: T) -> Result<(), DocError>
where
T: Into<DocId>,
{
let doc_id = id.into();
self.doc_map.remove(&doc_id);
self.ws_manager.write().remove_handler(doc_id.as_ref());
Ok(())
}
}
fn doc_not_found() -> DocError { DocError::not_found().context("Doc is close or you should call open first") }

View File

@ -0,0 +1,5 @@
mod manager;
mod open_doc;
pub use manager::*;
pub use open_doc::*;

View File

@ -0,0 +1,68 @@
use crate::{
entities::{
doc::SaveDocParams,
ws::{WsDocumentData, WsSource},
},
errors::DocError,
services::ws::WsHandler,
};
use flowy_database::ConnectionPool;
use flowy_ot::{client::Document, core::Delta};
use parking_lot::RwLock;
use std::sync::Arc;
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct DocId(pub(crate) String);
impl AsRef<str> for DocId {
fn as_ref(&self) -> &str { &self.0 }
}
impl<T> std::convert::From<T> for DocId
where
T: ToString,
{
fn from(s: T) -> Self { DocId(s.to_string()) }
}
pub(crate) trait OpenedDocPersistence: Send + Sync {
fn save(&self, params: SaveDocParams, pool: Arc<ConnectionPool>) -> Result<(), DocError>;
}
pub(crate) struct OpenedDoc {
pub(crate) id: DocId,
document: RwLock<Document>,
persistence: Arc<dyn OpenedDocPersistence>,
}
impl OpenedDoc {
pub(crate) fn new(id: DocId, delta: Delta, persistence: Arc<dyn OpenedDocPersistence>) -> Self {
Self {
id,
document: RwLock::new(Document::from_delta(delta)),
persistence,
}
}
pub(crate) fn data(&self) -> Vec<u8> { self.document.read().to_bytes() }
pub(crate) fn apply_delta(&self, data: Vec<u8>, pool: Arc<ConnectionPool>) -> Result<(), DocError> {
let mut write_guard = self.document.write();
let _ = write_guard.apply_changeset(data)?;
// Opti: strategy to save the document
let mut save = SaveDocParams {
id: self.id.0.clone(),
data: write_guard.to_bytes(),
};
let _ = self.persistence.save(save, pool)?;
Ok(())
}
}
impl WsHandler for OpenedDoc {
fn receive(&self, data: WsDocumentData) {
match data.source {
WsSource::Delta => {},
}
}
}

View File

@ -0,0 +1,2 @@
mod ws_manager;
pub use ws_manager::*;

View File

@ -0,0 +1,62 @@
use crate::{entities::ws::WsDocumentData, errors::DocError};
use bytes::Bytes;
use lazy_static::lazy_static;
use std::{collections::HashMap, convert::TryInto, sync::Arc};
pub trait WsSender: Send + Sync {
fn send_data(&self, data: Bytes) -> Result<(), DocError>;
}
lazy_static! {
pub static ref WS_ID: String = "Document".to_string();
}
pub struct WsManager {
sender: Box<dyn WsSender>,
doc_handlers: HashMap<String, Arc<dyn WsHandler>>,
}
impl WsManager {
pub fn new(sender: Box<dyn WsSender>) -> Self {
Self {
sender,
doc_handlers: HashMap::new(),
}
}
pub(crate) fn register_handler(&mut self, id: &str, handler: Arc<dyn WsHandler>) {
if self.doc_handlers.contains_key(id) {
log::error!("Duplicate handler registered for {:?}", id);
}
self.doc_handlers.insert(id.to_string(), handler);
}
pub(crate) fn remove_handler(&mut self, id: &str) { self.doc_handlers.remove(id); }
pub fn receive_data(&self, data: Bytes) {
let data: WsDocumentData = data.try_into().unwrap();
match self.doc_handlers.get(&data.id) {
None => {
log::error!("Can't find any source handler for {:?}", data.id);
},
Some(handler) => {
handler.receive(data);
},
}
}
pub fn send_data(&self, data: WsDocumentData) {
let bytes: Bytes = data.try_into().unwrap();
match self.sender.send_data(bytes) {
Ok(_) => {},
Err(e) => {
log::error!("WsDocument send message failed: {:?}", e);
},
}
}
}
pub(crate) trait WsHandler: Send + Sync {
fn receive(&self, data: WsDocumentData);
}

View File

@ -1,53 +0,0 @@
use crate::errors::DocError;
use bytes::Bytes;
use lazy_static::lazy_static;
use std::{convert::TryFrom, sync::Arc};
pub struct WsDocumentMessage(pub Bytes);
pub trait WsSender: Send + Sync {
fn send_msg(&self, msg: WsDocumentMessage) -> Result<(), DocError>;
}
lazy_static! {
pub static ref WS_ID: String = "Document".to_string();
}
pub struct WsDocument {
sender: Arc<dyn WsSender>,
}
impl WsDocument {
pub fn new(sender: Arc<dyn WsSender>) -> Self { Self { sender } }
pub fn receive_msg(&self, _msg: WsDocumentMessage) { unimplemented!() }
pub fn send_msg(&self, _msg: WsDocumentMessage) { unimplemented!() }
}
pub enum WsSource {
Delta,
}
impl AsRef<str> for WsSource {
fn as_ref(&self) -> &str {
match self {
WsSource::Delta => "delta",
}
}
}
impl ToString for WsSource {
fn to_string(&self) -> String {
match self {
WsSource::Delta => self.as_ref().to_string(),
}
}
}
impl TryFrom<String> for WsSource {
type Error = DocError;
fn try_from(value: String) -> Result<Self, Self::Error> {
match value.as_str() {
"delta" => Ok(WsSource::Delta),
_ => Err(DocError::internal().context(format!("Deserialize WsSource failed. Unknown type: {}", &value))),
}
}
}

View File

@ -10,7 +10,11 @@ const LEVEL: &str = "level";
const TIME: &str = "time";
const MESSAGE: &str = "msg";
const LOG_MODULE_PATH: &str = "log.module_path";
const LOG_TARGET_PATH: &str = "log.target";
const FLOWY_RESERVED_FIELDS: [&str; 3] = [LEVEL, TIME, MESSAGE];
const IGNORE_FIELDS: [&str; 2] = [LOG_MODULE_PATH, LOG_TARGET_PATH];
pub struct FlowyFormattingLayer<W: MakeWriter + 'static> {
make_writer: W,
@ -29,11 +33,11 @@ impl<W: MakeWriter + 'static> FlowyFormattingLayer<W> {
&self,
map_serializer: &mut impl SerializeMap<Error = serde_json::Error>,
message: &str,
level: &Level,
_level: &Level,
) -> Result<(), std::io::Error> {
map_serializer.serialize_entry(MESSAGE, &message)?;
map_serializer.serialize_entry(LEVEL, &format!("{}", level))?;
map_serializer.serialize_entry(TIME, &chrono::Utc::now().timestamp())?;
// map_serializer.serialize_entry(LEVEL, &format!("{}", level))?;
// map_serializer.serialize_entry(TIME, &chrono::Utc::now().timestamp())?;
Ok(())
}
@ -57,7 +61,7 @@ impl<W: MakeWriter + 'static> FlowyFormattingLayer<W> {
let extensions = span.extensions();
if let Some(visitor) = extensions.get::<JsonStorage>() {
for (key, value) in visitor.values() {
if !FLOWY_RESERVED_FIELDS.contains(key) {
if !FLOWY_RESERVED_FIELDS.contains(key) && !IGNORE_FIELDS.contains(key) {
map_serializer.serialize_entry(key, value)?;
} else {
tracing::debug!("{} is a reserved field in the bunyan log format. Skipping it.", key);
@ -155,15 +159,15 @@ where
map_serializer.serialize_entry("target", event.metadata().target())?;
}
map_serializer.serialize_entry("line", &event.metadata().line())?;
map_serializer.serialize_entry("file", &event.metadata().file())?;
// map_serializer.serialize_entry("line", &event.metadata().line())?;
// map_serializer.serialize_entry("file", &event.metadata().file())?;
// Add all the other fields associated with the event, expect the message we
// already used.
for (key, value) in event_visitor
.values()
.iter()
.filter(|(&key, _)| key != "message" && !FLOWY_RESERVED_FIELDS.contains(&key))
.filter(|(&key, _)| key != "message" && !FLOWY_RESERVED_FIELDS.contains(&key) && !IGNORE_FIELDS.contains(&key))
{
map_serializer.serialize_entry(key, value)?;
}
@ -173,7 +177,7 @@ where
let extensions = span.extensions();
if let Some(visitor) = extensions.get::<JsonStorage>() {
for (key, value) in visitor.values() {
if !FLOWY_RESERVED_FIELDS.contains(key) {
if !FLOWY_RESERVED_FIELDS.contains(key) && !IGNORE_FIELDS.contains(key) {
map_serializer.serialize_entry(key, value)?;
} else {
tracing::debug!("{} is a reserved field in the flowy log format. Skipping it.", key);

View File

@ -60,9 +60,9 @@ impl Document {
T: TryInto<Delta, Error = OTError>,
{
let new_delta: Delta = changeset.try_into()?;
log::debug!("Apply delta: {}", new_delta);
log::debug!("Delta changeset: {}", new_delta);
self.add_delta(&new_delta);
log::debug!("Current delta: {}", self.to_json());
log::debug!("Document: {}", self.to_json());
Ok(())
}

View File

@ -156,7 +156,13 @@ pub struct Retain {
}
impl fmt::Display for Retain {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.write_fmt(format_args!("retain: {}, attributes: {}", self.n, self.attributes)) }
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
if self.attributes.is_empty() {
f.write_fmt(format_args!("retain: {}", self.n))
} else {
f.write_fmt(format_args!("retain: {}, attributes: {}", self.n, self.attributes))
}
}
}
impl Retain {
@ -212,7 +218,11 @@ impl fmt::Display for Insert {
}
}
f.write_fmt(format_args!("insert: {}, attributes: {}", s, self.attributes))
if self.attributes.is_empty() {
f.write_fmt(format_args!("insert: {}", s))
} else {
f.write_fmt(format_args!("insert: {}, attributes: {}", s, self.attributes))
}
}
}

View File

@ -2,8 +2,9 @@ use bytes::Bytes;
use flowy_document::{
errors::DocError,
module::DocumentUser,
prelude::{WsDocument, WsDocumentMessage, WsSender, WS_ID},
prelude::{WsManager, WsSender, WS_ID},
};
use flowy_user::services::user::UserSession;
use flowy_ws::{WsMessage, WsMessageHandler};
use parking_lot::RwLock;
@ -16,25 +17,25 @@ pub struct DocumentDepsResolver {
impl DocumentDepsResolver {
pub fn new(user_session: Arc<UserSession>) -> Self { Self { user_session } }
pub fn split_into(self) -> (Arc<dyn DocumentUser>, Arc<RwLock<WsDocument>>) {
pub fn split_into(self) -> (Arc<dyn DocumentUser>, Arc<RwLock<WsManager>>) {
let user = Arc::new(DocumentUserImpl {
user: self.user_session.clone(),
});
let sender = Arc::new(WsSenderImpl {
let sender = Box::new(WsSenderImpl {
user: self.user_session.clone(),
});
let ws = Arc::new(RwLock::new(WsDocument::new(sender.clone())));
let ws_manager = Arc::new(RwLock::new(WsManager::new(sender)));
let ws_handler = Arc::new(WsDocumentResolver {
user: self.user_session.clone(),
inner: ws.clone(),
inner: ws_manager.clone(),
});
self.user_session.add_ws_handler(ws_handler);
(user, ws)
(user, ws_manager)
}
}
@ -63,10 +64,10 @@ struct WsSenderImpl {
}
impl WsSender for WsSenderImpl {
fn send_msg(&self, msg: WsDocumentMessage) -> Result<(), DocError> {
fn send_data(&self, data: Bytes) -> Result<(), DocError> {
let msg = WsMessage {
source: WS_ID.clone(),
data: msg.0.to_vec(),
data: data.to_vec(),
};
let _ = self.user.send_ws_msg(msg).map_err(|e| DocError::internal().context(e))?;
Ok(())
@ -75,15 +76,14 @@ impl WsSender for WsSenderImpl {
struct WsDocumentResolver {
user: Arc<UserSession>,
inner: Arc<RwLock<WsDocument>>,
inner: Arc<RwLock<WsManager>>,
}
impl WsMessageHandler for WsDocumentResolver {
fn source(&self) -> String { WS_ID.clone() }
fn receive_message(&self, msg: WsMessage) {
let msg = WsDocumentMessage(Bytes::from(msg.data));
self.inner.read().receive_msg(msg);
let data = Bytes::from(msg.data);
self.inner.read().receive_data(data);
}
}

View File

@ -27,7 +27,7 @@ fn build_workspace_module(user_session: Arc<UserSession>) -> Module {
fn build_document_module(user_session: Arc<UserSession>) -> Arc<FlowyDocument> {
let document_deps = DocumentDepsResolver::new(user_session.clone());
let (user, ws) = document_deps.split_into();
let document = Arc::new(FlowyDocument::new(user, ws));
let (user, ws_manager) = document_deps.split_into();
let document = Arc::new(FlowyDocument::new(user, ws_manager));
document
}

View File

@ -49,9 +49,6 @@ pub enum WorkspaceEvent {
#[event(input = "OpenViewRequest", output = "Doc")]
OpenView = 205,
#[event(input = "SaveViewDataRequest")]
SaveViewData = 206,
#[event(input = "ApplyChangesetRequest", output = "Doc")]
ApplyChangeset = 207,
ApplyChangeset = 206,
}

View File

@ -55,15 +55,6 @@ pub(crate) async fn update_view_handler(
Ok(())
}
#[tracing::instrument(skip(data, controller), err)]
pub(crate) async fn update_view_data_handler(
data: Data<SaveViewDataRequest>,
controller: Unit<Arc<ViewController>>,
) -> Result<(), WorkspaceError> {
let params: SaveDocParams = data.into_inner().try_into()?;
let _ = controller.update_view_data(params).await?;
Ok(())
}
#[tracing::instrument(skip(data, controller), err)]
pub(crate) async fn apply_changeset_handler(

View File

@ -66,7 +66,6 @@ pub fn create(user: Arc<dyn WorkspaceUser>, database: Arc<dyn WorkspaceDatabase>
.event(WorkspaceEvent::UpdateView, update_view_handler)
.event(WorkspaceEvent::DeleteView, delete_view_handler)
.event(WorkspaceEvent::OpenView, open_view_handler)
.event(WorkspaceEvent::SaveViewData, update_view_data_handler)
.event(WorkspaceEvent::ApplyChangeset, apply_changeset_handler);
module

View File

@ -40,8 +40,7 @@ pub enum WorkspaceEvent {
UpdateView = 203,
DeleteView = 204,
OpenView = 205,
SaveViewData = 206,
ApplyChangeset = 207,
ApplyChangeset = 206,
}
impl ::protobuf::ProtobufEnum for WorkspaceEvent {
@ -66,8 +65,7 @@ impl ::protobuf::ProtobufEnum for WorkspaceEvent {
203 => ::std::option::Option::Some(WorkspaceEvent::UpdateView),
204 => ::std::option::Option::Some(WorkspaceEvent::DeleteView),
205 => ::std::option::Option::Some(WorkspaceEvent::OpenView),
206 => ::std::option::Option::Some(WorkspaceEvent::SaveViewData),
207 => ::std::option::Option::Some(WorkspaceEvent::ApplyChangeset),
206 => ::std::option::Option::Some(WorkspaceEvent::ApplyChangeset),
_ => ::std::option::Option::None
}
}
@ -89,7 +87,6 @@ impl ::protobuf::ProtobufEnum for WorkspaceEvent {
WorkspaceEvent::UpdateView,
WorkspaceEvent::DeleteView,
WorkspaceEvent::OpenView,
WorkspaceEvent::SaveViewData,
WorkspaceEvent::ApplyChangeset,
];
values
@ -119,51 +116,49 @@ impl ::protobuf::reflect::ProtobufValue for WorkspaceEvent {
}
static file_descriptor_proto_data: &'static [u8] = b"\
\n\x0bevent.proto*\xc1\x02\n\x0eWorkspaceEvent\x12\x13\n\x0fCreateWorksp\
\n\x0bevent.proto*\xae\x02\n\x0eWorkspaceEvent\x12\x13\n\x0fCreateWorksp\
ace\x10\0\x12\x14\n\x10ReadCurWorkspace\x10\x01\x12\x12\n\x0eReadWorkspa\
ces\x10\x02\x12\x13\n\x0fDeleteWorkspace\x10\x03\x12\x11\n\rOpenWorkspac\
e\x10\x04\x12\x15\n\x11ReadWorkspaceApps\x10\x05\x12\r\n\tCreateApp\x10e\
\x12\r\n\tDeleteApp\x10f\x12\x0b\n\x07ReadApp\x10g\x12\r\n\tUpdateApp\
\x10h\x12\x0f\n\nCreateView\x10\xc9\x01\x12\r\n\x08ReadView\x10\xca\x01\
\x12\x0f\n\nUpdateView\x10\xcb\x01\x12\x0f\n\nDeleteView\x10\xcc\x01\x12\
\r\n\x08OpenView\x10\xcd\x01\x12\x11\n\x0cSaveViewData\x10\xce\x01\x12\
\x13\n\x0eApplyChangeset\x10\xcf\x01J\xe3\x05\n\x06\x12\x04\0\0\x14\x01\
\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x05\0\x12\x04\x02\0\x14\x01\n\
\n\n\x03\x05\0\x01\x12\x03\x02\x05\x13\n\x0b\n\x04\x05\0\x02\0\x12\x03\
\x03\x04\x18\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x03\x04\x13\n\x0c\n\x05\
\x05\0\x02\0\x02\x12\x03\x03\x16\x17\n\x0b\n\x04\x05\0\x02\x01\x12\x03\
\x04\x04\x19\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\x04\x04\x14\n\x0c\n\
\x05\x05\0\x02\x01\x02\x12\x03\x04\x17\x18\n\x0b\n\x04\x05\0\x02\x02\x12\
\x03\x05\x04\x17\n\x0c\n\x05\x05\0\x02\x02\x01\x12\x03\x05\x04\x12\n\x0c\
\n\x05\x05\0\x02\x02\x02\x12\x03\x05\x15\x16\n\x0b\n\x04\x05\0\x02\x03\
\x12\x03\x06\x04\x18\n\x0c\n\x05\x05\0\x02\x03\x01\x12\x03\x06\x04\x13\n\
\x0c\n\x05\x05\0\x02\x03\x02\x12\x03\x06\x16\x17\n\x0b\n\x04\x05\0\x02\
\x04\x12\x03\x07\x04\x16\n\x0c\n\x05\x05\0\x02\x04\x01\x12\x03\x07\x04\
\x11\n\x0c\n\x05\x05\0\x02\x04\x02\x12\x03\x07\x14\x15\n\x0b\n\x04\x05\0\
\x02\x05\x12\x03\x08\x04\x1a\n\x0c\n\x05\x05\0\x02\x05\x01\x12\x03\x08\
\x04\x15\n\x0c\n\x05\x05\0\x02\x05\x02\x12\x03\x08\x18\x19\n\x0b\n\x04\
\x05\0\x02\x06\x12\x03\t\x04\x14\n\x0c\n\x05\x05\0\x02\x06\x01\x12\x03\t\
\x04\r\n\x0c\n\x05\x05\0\x02\x06\x02\x12\x03\t\x10\x13\n\x0b\n\x04\x05\0\
\x02\x07\x12\x03\n\x04\x14\n\x0c\n\x05\x05\0\x02\x07\x01\x12\x03\n\x04\r\
\n\x0c\n\x05\x05\0\x02\x07\x02\x12\x03\n\x10\x13\n\x0b\n\x04\x05\0\x02\
\x08\x12\x03\x0b\x04\x12\n\x0c\n\x05\x05\0\x02\x08\x01\x12\x03\x0b\x04\
\x0b\n\x0c\n\x05\x05\0\x02\x08\x02\x12\x03\x0b\x0e\x11\n\x0b\n\x04\x05\0\
\x02\t\x12\x03\x0c\x04\x14\n\x0c\n\x05\x05\0\x02\t\x01\x12\x03\x0c\x04\r\
\n\x0c\n\x05\x05\0\x02\t\x02\x12\x03\x0c\x10\x13\n\x0b\n\x04\x05\0\x02\n\
\x12\x03\r\x04\x15\n\x0c\n\x05\x05\0\x02\n\x01\x12\x03\r\x04\x0e\n\x0c\n\
\x05\x05\0\x02\n\x02\x12\x03\r\x11\x14\n\x0b\n\x04\x05\0\x02\x0b\x12\x03\
\x0e\x04\x13\n\x0c\n\x05\x05\0\x02\x0b\x01\x12\x03\x0e\x04\x0c\n\x0c\n\
\x05\x05\0\x02\x0b\x02\x12\x03\x0e\x0f\x12\n\x0b\n\x04\x05\0\x02\x0c\x12\
\x03\x0f\x04\x15\n\x0c\n\x05\x05\0\x02\x0c\x01\x12\x03\x0f\x04\x0e\n\x0c\
\n\x05\x05\0\x02\x0c\x02\x12\x03\x0f\x11\x14\n\x0b\n\x04\x05\0\x02\r\x12\
\x03\x10\x04\x15\n\x0c\n\x05\x05\0\x02\r\x01\x12\x03\x10\x04\x0e\n\x0c\n\
\x05\x05\0\x02\r\x02\x12\x03\x10\x11\x14\n\x0b\n\x04\x05\0\x02\x0e\x12\
\x03\x11\x04\x13\n\x0c\n\x05\x05\0\x02\x0e\x01\x12\x03\x11\x04\x0c\n\x0c\
\n\x05\x05\0\x02\x0e\x02\x12\x03\x11\x0f\x12\n\x0b\n\x04\x05\0\x02\x0f\
\x12\x03\x12\x04\x17\n\x0c\n\x05\x05\0\x02\x0f\x01\x12\x03\x12\x04\x10\n\
\x0c\n\x05\x05\0\x02\x0f\x02\x12\x03\x12\x13\x16\n\x0b\n\x04\x05\0\x02\
\x10\x12\x03\x13\x04\x19\n\x0c\n\x05\x05\0\x02\x10\x01\x12\x03\x13\x04\
\x12\n\x0c\n\x05\x05\0\x02\x10\x02\x12\x03\x13\x15\x18b\x06proto3\
\r\n\x08OpenView\x10\xcd\x01\x12\x13\n\x0eApplyChangeset\x10\xce\x01J\
\xba\x05\n\x06\x12\x04\0\0\x13\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\
\x02\x05\0\x12\x04\x02\0\x13\x01\n\n\n\x03\x05\0\x01\x12\x03\x02\x05\x13\
\n\x0b\n\x04\x05\0\x02\0\x12\x03\x03\x04\x18\n\x0c\n\x05\x05\0\x02\0\x01\
\x12\x03\x03\x04\x13\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x03\x16\x17\n\
\x0b\n\x04\x05\0\x02\x01\x12\x03\x04\x04\x19\n\x0c\n\x05\x05\0\x02\x01\
\x01\x12\x03\x04\x04\x14\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\x04\x17\
\x18\n\x0b\n\x04\x05\0\x02\x02\x12\x03\x05\x04\x17\n\x0c\n\x05\x05\0\x02\
\x02\x01\x12\x03\x05\x04\x12\n\x0c\n\x05\x05\0\x02\x02\x02\x12\x03\x05\
\x15\x16\n\x0b\n\x04\x05\0\x02\x03\x12\x03\x06\x04\x18\n\x0c\n\x05\x05\0\
\x02\x03\x01\x12\x03\x06\x04\x13\n\x0c\n\x05\x05\0\x02\x03\x02\x12\x03\
\x06\x16\x17\n\x0b\n\x04\x05\0\x02\x04\x12\x03\x07\x04\x16\n\x0c\n\x05\
\x05\0\x02\x04\x01\x12\x03\x07\x04\x11\n\x0c\n\x05\x05\0\x02\x04\x02\x12\
\x03\x07\x14\x15\n\x0b\n\x04\x05\0\x02\x05\x12\x03\x08\x04\x1a\n\x0c\n\
\x05\x05\0\x02\x05\x01\x12\x03\x08\x04\x15\n\x0c\n\x05\x05\0\x02\x05\x02\
\x12\x03\x08\x18\x19\n\x0b\n\x04\x05\0\x02\x06\x12\x03\t\x04\x14\n\x0c\n\
\x05\x05\0\x02\x06\x01\x12\x03\t\x04\r\n\x0c\n\x05\x05\0\x02\x06\x02\x12\
\x03\t\x10\x13\n\x0b\n\x04\x05\0\x02\x07\x12\x03\n\x04\x14\n\x0c\n\x05\
\x05\0\x02\x07\x01\x12\x03\n\x04\r\n\x0c\n\x05\x05\0\x02\x07\x02\x12\x03\
\n\x10\x13\n\x0b\n\x04\x05\0\x02\x08\x12\x03\x0b\x04\x12\n\x0c\n\x05\x05\
\0\x02\x08\x01\x12\x03\x0b\x04\x0b\n\x0c\n\x05\x05\0\x02\x08\x02\x12\x03\
\x0b\x0e\x11\n\x0b\n\x04\x05\0\x02\t\x12\x03\x0c\x04\x14\n\x0c\n\x05\x05\
\0\x02\t\x01\x12\x03\x0c\x04\r\n\x0c\n\x05\x05\0\x02\t\x02\x12\x03\x0c\
\x10\x13\n\x0b\n\x04\x05\0\x02\n\x12\x03\r\x04\x15\n\x0c\n\x05\x05\0\x02\
\n\x01\x12\x03\r\x04\x0e\n\x0c\n\x05\x05\0\x02\n\x02\x12\x03\r\x11\x14\n\
\x0b\n\x04\x05\0\x02\x0b\x12\x03\x0e\x04\x13\n\x0c\n\x05\x05\0\x02\x0b\
\x01\x12\x03\x0e\x04\x0c\n\x0c\n\x05\x05\0\x02\x0b\x02\x12\x03\x0e\x0f\
\x12\n\x0b\n\x04\x05\0\x02\x0c\x12\x03\x0f\x04\x15\n\x0c\n\x05\x05\0\x02\
\x0c\x01\x12\x03\x0f\x04\x0e\n\x0c\n\x05\x05\0\x02\x0c\x02\x12\x03\x0f\
\x11\x14\n\x0b\n\x04\x05\0\x02\r\x12\x03\x10\x04\x15\n\x0c\n\x05\x05\0\
\x02\r\x01\x12\x03\x10\x04\x0e\n\x0c\n\x05\x05\0\x02\r\x02\x12\x03\x10\
\x11\x14\n\x0b\n\x04\x05\0\x02\x0e\x12\x03\x11\x04\x13\n\x0c\n\x05\x05\0\
\x02\x0e\x01\x12\x03\x11\x04\x0c\n\x0c\n\x05\x05\0\x02\x0e\x02\x12\x03\
\x11\x0f\x12\n\x0b\n\x04\x05\0\x02\x0f\x12\x03\x12\x04\x19\n\x0c\n\x05\
\x05\0\x02\x0f\x01\x12\x03\x12\x04\x12\n\x0c\n\x05\x05\0\x02\x0f\x02\x12\
\x03\x12\x15\x18b\x06proto3\
";
static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;

View File

@ -16,6 +16,5 @@ enum WorkspaceEvent {
UpdateView = 203;
DeleteView = 204;
OpenView = 205;
SaveViewData = 206;
ApplyChangeset = 207;
ApplyChangeset = 206;
}

View File

@ -76,6 +76,7 @@ impl ViewController {
Ok(view)
}
#[tracing::instrument(level = "debug", skip(self), err)]
pub(crate) async fn open_view(&self, params: QueryDocParams) -> Result<Doc, WorkspaceError> {
let doc = self.document.open(params, self.database.db_pool()?).await?;
Ok(doc)
@ -124,13 +125,9 @@ impl ViewController {
Ok(())
}
pub(crate) async fn update_view_data(&self, params: SaveDocParams) -> Result<(), WorkspaceError> {
let _ = self.document.update(params, self.database.db_pool()?).await?;
Ok(())
}
pub(crate) async fn apply_changeset(&self, params: ApplyChangesetParams) -> Result<Doc, WorkspaceError> {
let doc = self.document.apply_changeset(params).await?;
let pool = self.database.db_pool()?;
let doc = self.document.apply_changeset(params, pool).await?;
Ok(doc)
}
}