Add a document actor and remove the RwLock around the document

This commit is contained in:
appflowy 2021-10-01 19:39:08 +08:00
parent efb2a607e7
commit 957b10fe5e
85 changed files with 1274 additions and 651 deletions

View File

@ -1,18 +1,18 @@
// use crate::helper::*;
use crate::helper::{spawn_server, TestServer};
use std::sync::Arc;
use actix_web::web::Data;
use futures_util::{stream, stream::StreamExt};
use sqlx::PgPool;
use tokio::time::{sleep, Duration};
use backend::service::doc::doc::DocManager;
use flowy_document::{
entities::doc::QueryDocParams,
services::doc::edit_doc_context::EditDocContext as ClientEditDocContext,
};
use flowy_document::{entities::doc::QueryDocParams, services::doc::edit::EditDocContext as ClientEditDocContext};
use flowy_net::config::ServerConfig;
use flowy_test::{workspace::ViewTest, FlowyTest};
use flowy_user::services::user::UserSession;
use futures_util::{stream, stream::StreamExt};
use sqlx::PgPool;
use std::sync::Arc;
use tokio::time::{sleep, Duration};
// use crate::helper::*;
use crate::helper::{spawn_server, TestServer};
pub struct DocumentTest {
server: TestServer,
@ -69,10 +69,10 @@ async fn run_scripts(context: ScriptContext, scripts: Vec<DocScript>) {
let _ = context.user_session.start_ws_connection(&token).await.unwrap();
},
DocScript::SendText(index, s) => {
context.client_edit_context.insert(index, s).unwrap();
context.client_edit_context.insert(index, s).await.unwrap();
},
DocScript::AssertClient(s) => {
let json = context.client_edit_context.doc_json();
let json = context.client_edit_context.doc_json().await.unwrap();
assert_eq(s, &json);
},
DocScript::AssertServer(s) => {

View File

@ -1,4 +1,2 @@
mod model;
pub use model::*;

View File

@ -1,7 +1,7 @@
// Auto-generated, do not edit
// Auto-generated, do not edit
mod ffi_response;
pub use ffi_response::*;
mod ffi_response;
pub use ffi_response::*;
mod ffi_request;
pub use ffi_request::*;
mod ffi_request;
pub use ffi_request::*;

View File

@ -193,7 +193,9 @@ pub enum ASTStyle {
pub fn struct_from_ast<'a>(cx: &Ctxt, fields: &'a syn::Fields) -> (ASTStyle, Vec<ASTField<'a>>) {
match fields {
syn::Fields::Named(fields) => (ASTStyle::Struct, fields_from_ast(cx, &fields.named)),
syn::Fields::Unnamed(fields) if fields.unnamed.len() == 1 => (ASTStyle::NewType, fields_from_ast(cx, &fields.unnamed)),
syn::Fields::Unnamed(fields) if fields.unnamed.len() == 1 => {
(ASTStyle::NewType, fields_from_ast(cx, &fields.unnamed))
},
syn::Fields::Unnamed(fields) => (ASTStyle::Tuple, fields_from_ast(cx, &fields.unnamed)),
syn::Fields::Unit => (ASTStyle::Unit, Vec::new()),
}

View File

@ -91,7 +91,8 @@ impl<'c, T> ASTAttr<'c, T> {
let tokens = obj.into_token_stream();
if self.value.is_some() {
self.cx.error_spanned_by(tokens, format!("duplicate attribute `{}`", self.name));
self.cx
.error_spanned_by(tokens, format!("duplicate attribute `{}`", self.name));
} else {
self.tokens = tokens;
self.value = Some(value);
@ -281,7 +282,11 @@ impl ASTEnumAttrVariant {
pub fn event_error(&self) -> String { self.event_attrs.error_ty.as_ref().unwrap().clone() }
}
fn get_event_attrs_from(ctxt: &Ctxt, variant_attrs: &Vec<syn::Attribute>, enum_attrs: &Vec<syn::Attribute>) -> EventAttrs {
fn get_event_attrs_from(
ctxt: &Ctxt,
variant_attrs: &Vec<syn::Attribute>,
enum_attrs: &Vec<syn::Attribute>,
) -> EventAttrs {
let mut event_attrs = EventAttrs {
input: None,
output: None,
@ -309,7 +314,9 @@ fn get_event_attrs_from(ctxt: &Ctxt, variant_attrs: &Vec<syn::Attribute>, enum_a
if name_value.path == EVENT_INPUT {
if let syn::Lit::Str(s) = &name_value.lit {
let input_type = parse_lit_str(s)
.map_err(|_| ctxt.error_spanned_by(s, format!("failed to parse request deserializer {:?}", s.value())))
.map_err(|_| {
ctxt.error_spanned_by(s, format!("failed to parse request deserializer {:?}", s.value()))
})
.unwrap();
event_attrs.input = Some(input_type);
}
@ -318,7 +325,9 @@ fn get_event_attrs_from(ctxt: &Ctxt, variant_attrs: &Vec<syn::Attribute>, enum_a
if name_value.path == EVENT_OUTPUT {
if let syn::Lit::Str(s) = &name_value.lit {
let output_type = parse_lit_str(s)
.map_err(|_| ctxt.error_spanned_by(s, format!("failed to parse response deserializer {:?}", s.value())))
.map_err(|_| {
ctxt.error_spanned_by(s, format!("failed to parse response deserializer {:?}", s.value()))
})
.unwrap();
event_attrs.output = Some(output_type);
}
@ -342,7 +351,9 @@ fn get_event_attrs_from(ctxt: &Ctxt, variant_attrs: &Vec<syn::Attribute>, enum_a
.collect::<Vec<(&syn::Attribute, Vec<syn::NestedMeta>)>>();
for (attr, nested_metas) in attr_meta_items_info {
nested_metas.iter().for_each(|meta_item| extract_event_attr(attr, meta_item))
nested_metas
.iter()
.for_each(|meta_item| extract_event_attr(attr, meta_item))
}
// eprintln!("😁{:#?}", event_attrs);
@ -381,7 +392,10 @@ fn get_lit_str<'a>(cx: &Ctxt, attr_name: Symbol, lit: &'a syn::Lit) -> Result<&'
} else {
cx.error_spanned_by(
lit,
format!("expected pb {} attribute to be a string: `{} = \"...\"`", attr_name, attr_name),
format!(
"expected pb {} attribute to be a string: `{} = \"...\"`",
attr_name, attr_name
),
);
Err(())
}
@ -390,7 +404,12 @@ fn get_lit_str<'a>(cx: &Ctxt, attr_name: Symbol, lit: &'a syn::Lit) -> Result<&'
fn parse_lit_into_ty(cx: &Ctxt, attr_name: Symbol, lit: &syn::Lit) -> Result<syn::Type, ()> {
let string = get_lit_str(cx, attr_name, lit)?;
parse_lit_str(string).map_err(|_| cx.error_spanned_by(lit, format!("failed to parse type: {} = {:?}", attr_name, string.value())))
parse_lit_str(string).map_err(|_| {
cx.error_spanned_by(
lit,
format!("failed to parse type: {} = {:?}", attr_name, string.value()),
)
})
}
pub fn parse_lit_str<T>(s: &syn::LitStr) -> parse::Result<T>

View File

@ -70,11 +70,4 @@ table! {
}
}
allow_tables_to_appear_in_same_query!(
app_table,
doc_table,
rev_table,
user_table,
view_table,
workspace_table,
);
allow_tables_to_appear_in_same_query!(app_table, doc_table, rev_table, user_table, view_table, workspace_table,);

View File

@ -1,7 +1,9 @@
use proc_macro2::TokenStream;
// #[proc_macro_derive(DartEvent, attributes(event_ty))]
pub fn expand_enum_derive(_input: &syn::DeriveInput) -> Result<TokenStream, Vec<syn::Error>> { Ok(TokenStream::default()) }
pub fn expand_enum_derive(_input: &syn::DeriveInput) -> Result<TokenStream, Vec<syn::Error>> {
Ok(TokenStream::default())
}
// use flowy_ast::{ASTContainer, Ctxt};
// use proc_macro2::TokenStream;

View File

@ -74,8 +74,7 @@ pub fn category_from_str(type_str: &str) -> TypeCategory {
| "UserProfile"
| "UpdateUserRequest"
| "UpdateUserParams"
| "UserError"
=> TypeCategory::Protobuf,
| "UserError" => TypeCategory::Protobuf,
"ViewType"
| "WorkspaceEvent"
| "ErrorCode"
@ -86,8 +85,7 @@ pub fn category_from_str(type_str: &str) -> TypeCategory {
| "DocObservable"
| "FFIStatusCode"
| "UserEvent"
| "UserObservable"
=> TypeCategory::Enum,
| "UserObservable" => TypeCategory::Enum,
"Option" => TypeCategory::Opt,
_ => TypeCategory::Primitive,

View File

@ -16,19 +16,25 @@ mod proto_buf;
#[proc_macro_derive(ProtoBuf, attributes(pb))]
pub fn derive_proto_buf(input: TokenStream) -> TokenStream {
let input = parse_macro_input!(input as DeriveInput);
proto_buf::expand_derive(&input).unwrap_or_else(to_compile_errors).into()
proto_buf::expand_derive(&input)
.unwrap_or_else(to_compile_errors)
.into()
}
#[proc_macro_derive(ProtoBuf_Enum, attributes(pb))]
pub fn derive_proto_buf_enum(input: TokenStream) -> TokenStream {
let input = parse_macro_input!(input as DeriveInput);
proto_buf::expand_enum_derive(&input).unwrap_or_else(to_compile_errors).into()
proto_buf::expand_enum_derive(&input)
.unwrap_or_else(to_compile_errors)
.into()
}
#[proc_macro_derive(Flowy_Event, attributes(event, event_err))]
pub fn derive_dart_event(input: TokenStream) -> TokenStream {
let input = parse_macro_input!(input as DeriveInput);
dart_event::expand_enum_derive(&input).unwrap_or_else(to_compile_errors).into()
dart_event::expand_enum_derive(&input)
.unwrap_or_else(to_compile_errors)
.into()
}
fn to_compile_errors(errors: Vec<syn::Error>) -> proc_macro2::TokenStream {

View File

@ -6,16 +6,20 @@ pub fn make_de_token_steam(ctxt: &Ctxt, ast: &ASTContainer) -> Option<TokenStrea
let pb_ty = ast.attrs.pb_struct_type()?;
let struct_ident = &ast.ident;
let build_take_fields = ast.data.all_fields().filter(|f| !f.attrs.skip_deserializing()).flat_map(|field| {
if let Some(func) = field.attrs.deserialize_with() {
let member = &field.member;
Some(quote! { o.#member=#struct_ident::#func(pb); })
} else if field.attrs.is_one_of() {
token_stream_for_one_of(ctxt, field)
} else {
token_stream_for_field(ctxt, &field.member, &field.ty, false)
}
});
let build_take_fields = ast
.data
.all_fields()
.filter(|f| !f.attrs.skip_deserializing())
.flat_map(|field| {
if let Some(func) = field.attrs.deserialize_with() {
let member = &field.member;
Some(quote! { o.#member=#struct_ident::#func(pb); })
} else if field.attrs.is_one_of() {
token_stream_for_one_of(ctxt, field)
} else {
token_stream_for_field(ctxt, &field.member, &field.ty, false)
}
});
let de_token_stream: TokenStream = quote! {
impl std::convert::TryFrom<bytes::Bytes> for #struct_ident {

View File

@ -3,7 +3,11 @@ mod enum_serde;
mod serialize;
mod util;
use crate::proto_buf::{deserialize::make_de_token_steam, enum_serde::make_enum_token_stream, serialize::make_se_token_stream};
use crate::proto_buf::{
deserialize::make_de_token_steam,
enum_serde::make_enum_token_stream,
serialize::make_se_token_stream,
};
use flowy_ast::*;
use proc_macro2::TokenStream;

View File

@ -93,7 +93,9 @@ fn gen_token_stream(ctxt: &Ctxt, member: &syn::Member, ty: &syn::Type, is_option
Some(quote! { pb.#member = self.#member.clone(); })
}
},
TypeCategory::Protobuf => Some(quote! { pb.#member = ::protobuf::SingularPtrField::some(self.#member.try_into().unwrap()); }),
TypeCategory::Protobuf => {
Some(quote! { pb.#member = ::protobuf::SingularPtrField::some(self.#member.try_into().unwrap()); })
},
TypeCategory::Opt => gen_token_stream(ctxt, member, ty_info.bracket_ty_info.unwrap().ty, true),
TypeCategory::Enum => {
// let pb_enum_ident = format_ident!("{}", ty_info.ident.to_string());

View File

@ -10,13 +10,19 @@ pub struct ModuleDataMap {
impl ModuleDataMap {
#[inline]
pub fn new() -> ModuleDataMap { ModuleDataMap { map: HashMap::default() } }
pub fn new() -> ModuleDataMap {
ModuleDataMap {
map: HashMap::default(),
}
}
pub fn insert<T>(&mut self, val: T) -> Option<T>
where
T: 'static + Send + Sync,
{
self.map.insert(TypeId::of::<T>(), Box::new(val)).and_then(downcast_owned)
self.map
.insert(TypeId::of::<T>(), Box::new(val))
.and_then(downcast_owned)
}
pub fn remove<T>(&mut self) -> Option<T>
@ -37,7 +43,9 @@ impl ModuleDataMap {
where
T: 'static + Send + Sync,
{
self.map.get_mut(&TypeId::of::<T>()).and_then(|boxed| boxed.downcast_mut())
self.map
.get_mut(&TypeId::of::<T>())
.and_then(|boxed| boxed.downcast_mut())
}
pub fn contains<T>(&self) -> bool
@ -50,4 +58,6 @@ impl ModuleDataMap {
pub fn extend(&mut self, other: ModuleDataMap) { self.map.extend(other.map); }
}
fn downcast_owned<T: 'static + Send + Sync>(boxed: Box<dyn Any + Send + Sync>) -> Option<T> { boxed.downcast().ok().map(|boxed| *boxed) }
fn downcast_owned<T: 'static + Send + Sync>(boxed: Box<dyn Any + Send + Sync>) -> Option<T> {
boxed.downcast().ok().map(|boxed| *boxed)
}

View File

@ -16,7 +16,17 @@ use crate::{
module::{container::ModuleDataMap, Unit},
request::{payload::Payload, EventRequest, FromRequest},
response::{EventResponse, Responder},
service::{factory, BoxService, BoxServiceFactory, Handler, HandlerService, Service, ServiceFactory, ServiceRequest, ServiceResponse},
service::{
factory,
BoxService,
BoxServiceFactory,
Handler,
HandlerService,
Service,
ServiceFactory,
ServiceRequest,
ServiceResponse,
},
};
use futures_core::future::BoxFuture;
use std::sync::Arc;
@ -135,7 +145,10 @@ impl ServiceFactory<ModuleRequest> for Module {
let service_map = self.service_map.clone();
let module_data = self.module_data.clone();
Box::pin(async move {
let service = ModuleService { service_map, module_data };
let service = ModuleService {
service_map,
module_data,
};
let module_service = Box::new(service) as Self::Service;
Ok(module_service)
})

View File

@ -109,7 +109,10 @@ impl SystemRunner {
match rt.block_on(stop_rx) {
Ok(code) => {
if code != 0 {
Err(io::Error::new(io::ErrorKind::Other, format!("Non-zero exit code: {}", code)))
Err(io::Error::new(
io::ErrorKind::Other,
format!("Non-zero exit code: {}", code),
))
} else {
Ok(())
}

View File

@ -10,10 +10,18 @@ pub fn tokio_default_runtime() -> io::Result<tokio::runtime::Runtime> {
.enable_io()
.enable_time()
.on_thread_start(move || {
log::trace!("{:?} thread started: thread_id= {}", thread::current(), thread_id::get());
log::trace!(
"{:?} thread started: thread_id= {}",
thread::current(),
thread_id::get()
);
})
.on_thread_stop(move || {
log::trace!("{:?} thread stopping: thread_id= {}", thread::current(), thread_id::get(),);
log::trace!(
"{:?} thread stopping: thread_id= {}",
thread::current(),
thread_id::get(),
);
})
.build()
}

View File

@ -39,6 +39,8 @@ chrono = "0.4.19"
futures-core = { version = "0.3", default-features = false }
md5 = "0.7.0"
byteorder = {version = "1.3.4"}
async-stream = "0.3.2"
futures = "0.3.15"
[dev-dependencies]
flowy-test = { path = "../flowy-test" }

View File

@ -5,6 +5,8 @@ use flowy_dispatch::prelude::{EventResponse, ResponseBuilder};
use flowy_net::errors::ServerError;
use std::{convert::TryInto, fmt};
pub type DocResult<T> = std::result::Result<T, DocError>;
#[derive(Debug, Default, Clone, ProtoBuf)]
pub struct DocError {
#[pb(index = 1)]
@ -27,7 +29,12 @@ macro_rules! static_doc_error {
}
impl DocError {
fn new(code: ErrorCode, msg: &str) -> Self { Self { code, msg: msg.to_owned() } }
fn new(code: ErrorCode, msg: &str) -> Self {
Self {
code,
msg: msg.to_owned(),
}
}
pub fn context<T: Debug>(mut self, error: T) -> Self {
self.msg = format!("{:?}", error);

View File

@ -4,17 +4,17 @@ use diesel::SqliteConnection;
use parking_lot::RwLock;
use flowy_database::ConnectionPool;
use flowy_net::config::ServerConfig;
use crate::{
entities::doc::{CreateDocParams, Doc, DocDelta, QueryDocParams},
errors::DocError,
services::{
doc::{doc_controller::DocController, edit_doc_context::EditDocContext},
doc::{doc_controller::DocController, edit::EditDocContext},
server::construct_doc_server,
ws::WsDocumentManager,
},
};
use flowy_net::config::ServerConfig;
pub trait DocumentUser: Send + Sync {
fn user_dir(&self) -> Result<String, DocError>;
@ -59,7 +59,7 @@ impl FlowyDocument {
pub async fn apply_doc_delta(&self, params: DocDelta) -> Result<Doc, DocError> {
// workaround: compare the rust's delta with flutter's delta. Will be removed
// very soon
let doc = self.doc_ctrl.edit_doc(params.clone())?;
let doc = self.doc_ctrl.edit_doc(params.clone()).await?;
Ok(doc)
}
}

View File

@ -11,4 +11,6 @@ impl std::convert::Into<i32> for DocObservable {
}
#[allow(dead_code)]
pub(crate) fn observable(id: &str, ty: DocObservable) -> NotifyBuilder { NotifyBuilder::new(id, ty, OBSERVABLE_CATEGORY) }
pub(crate) fn observable(id: &str, ty: DocObservable) -> NotifyBuilder {
NotifyBuilder::new(id, ty, OBSERVABLE_CATEGORY)
}

View File

@ -1,4 +1,2 @@
mod model;
pub use model::*;

View File

@ -1,19 +1,19 @@
// Auto-generated, do not edit
// Auto-generated, do not edit
mod ws;
pub use ws::*;
mod ws;
pub use ws::*;
mod observable;
pub use observable::*;
mod observable;
pub use observable::*;
mod errors;
pub use errors::*;
mod errors;
pub use errors::*;
mod revision;
pub use revision::*;
mod revision;
pub use revision::*;
mod event;
pub use event::*;
mod event;
pub use event::*;
mod doc;
pub use doc::*;
mod doc;
pub use doc::*;

View File

@ -4,7 +4,7 @@ use dashmap::DashMap;
use crate::{
errors::DocError,
services::doc::edit_doc_context::{DocId, EditDocContext},
services::doc::edit::{DocId, EditDocContext},
};
pub(crate) struct DocCache {

View File

@ -1,17 +1,19 @@
use std::sync::Arc;
use bytes::Bytes;
use parking_lot::RwLock;
use tokio::time::{interval, Duration};
use flowy_database::{ConnectionPool, SqliteConnection};
use flowy_infra::future::{wrap_future, FnFuture};
use crate::{
entities::doc::{CreateDocParams, Doc, DocDelta, QueryDocParams},
errors::{internal_error, DocError},
module::DocumentUser,
services::{cache::DocCache, doc::edit_doc_context::EditDocContext, server::Server, ws::WsDocumentManager},
services::{cache::DocCache, doc::edit::EditDocContext, server::Server, ws::WsDocumentManager},
sql_tables::doc::{DocTable, DocTableSql},
};
use bytes::Bytes;
use flowy_database::{ConnectionPool, SqliteConnection};
use flowy_infra::future::{wrap_future, FnFuture};
use parking_lot::RwLock;
use std::sync::Arc;
use tokio::time::{interval, Duration};
pub(crate) struct DocController {
server: Server,
@ -79,10 +81,10 @@ impl DocController {
}
#[tracing::instrument(level = "debug", skip(self, delta), err)]
pub(crate) fn edit_doc(&self, delta: DocDelta) -> Result<Doc, DocError> {
pub(crate) async fn edit_doc(&self, delta: DocDelta) -> Result<Doc, DocError> {
let edit_doc_ctx = self.cache.get(&delta.doc_id)?;
let _ = edit_doc_ctx.compose_local_delta(Bytes::from(delta.data))?;
Ok(edit_doc_ctx.doc())
let _ = edit_doc_ctx.compose_local_delta(Bytes::from(delta.data)).await?;
Ok(edit_doc_ctx.doc().await?)
}
}

View File

@ -4,6 +4,7 @@ use crate::{
};
use flowy_ot::core::*;
use tokio::sync::mpsc;
pub trait CustomDocument {
fn init_delta() -> Delta;
@ -24,6 +25,7 @@ pub struct Document {
history: History,
view: View,
last_edit_time: usize,
notify: Option<mpsc::UnboundedSender<()>>,
}
impl Document {
@ -35,6 +37,7 @@ impl Document {
history: History::new(),
view: View::new(),
last_edit_time: 0,
notify: None,
}
}
@ -51,7 +54,19 @@ impl Document {
pub fn delta(&self) -> &Delta { &self.delta }
pub fn set_delta(&mut self, data: Delta) { self.delta = data; }
pub fn set_notify(&mut self, notify: mpsc::UnboundedSender<()>) { self.notify = Some(notify); }
pub fn set_delta(&mut self, data: Delta) {
self.delta = data;
match &self.notify {
None => {},
Some(notify) => {
let notify = notify.clone();
notify.send(());
},
}
}
pub fn compose_delta(&mut self, delta: &Delta) -> Result<(), DocError> {
let composed_delta = self.delta.compose(delta)?;
@ -75,7 +90,7 @@ impl Document {
}
log::trace!("document delta: {}", &composed_delta);
self.delta = composed_delta;
self.set_delta(composed_delta);
Ok(())
}
@ -139,7 +154,7 @@ impl Document {
Some(undo_delta) => {
let (new_delta, inverted_delta) = self.invert_change(&undo_delta)?;
let result = UndoResult::success(new_delta.target_len as usize);
self.delta = new_delta;
self.set_delta(new_delta);
self.history.add_redo(inverted_delta);
Ok(result)
@ -153,7 +168,7 @@ impl Document {
Some(redo_delta) => {
let (new_delta, inverted_delta) = self.invert_change(&redo_delta)?;
let result = UndoResult::success(new_delta.target_len as usize);
self.delta = new_delta;
self.set_delta(new_delta);
self.history.add_undo(inverted_delta);
Ok(result)

View File

@ -0,0 +1,177 @@
use crate::{
errors::{internal_error, DocResult},
services::doc::{edit::DocId, Document, UndoResult},
sql_tables::{DocTableChangeset, DocTableSql},
};
use async_stream::stream;
use flowy_database::ConnectionPool;
use flowy_ot::core::{Attribute, Delta, Interval};
use futures::stream::StreamExt;
use std::{cell::RefCell, sync::Arc};
use tokio::sync::{mpsc, oneshot};
/// One-shot sender used to hand the result of an `EditMsg` back to the caller.
pub type Ret<T> = oneshot::Sender<DocResult<T>>;

/// Messages handled by `DocumentEditActor`. Every variant carries a `ret`
/// channel so the sender can await the outcome of the edit.
pub enum EditMsg {
    /// Compose a full delta into the document.
    Delta {
        delta: Delta,
        ret: Ret<()>,
    },
    /// Insert `data` at `index`; replies with the delta produced by the edit.
    Insert {
        index: usize,
        data: String,
        ret: Ret<Delta>,
    },
    /// Delete the characters covered by `interval`.
    Delete {
        interval: Interval,
        ret: Ret<Delta>,
    },
    /// Apply `attribute` (e.g. bold) over `interval`.
    Format {
        interval: Interval,
        attribute: Attribute,
        ret: Ret<Delta>,
    },
    /// Replace the text in `interval` with `data`.
    Replace {
        interval: Interval,
        data: String,
        ret: Ret<Delta>,
    },
    // Undo/redo queries reply over a plain oneshot<bool> (no error case).
    CanUndo {
        ret: oneshot::Sender<bool>,
    },
    CanRedo {
        ret: oneshot::Sender<bool>,
    },
    Undo {
        ret: Ret<UndoResult>,
    },
    Redo {
        ret: Ret<UndoResult>,
    },
    /// Fetch the current document content as a JSON string.
    Doc {
        ret: Ret<String>,
    },
    /// Persist the current document content to the local DB at `rev_id`.
    SaveRevision {
        rev_id: i64,
        ret: Ret<()>,
    },
}
/// Actor that owns the `Document` and serializes every edit through a
/// message channel — this replaces the previous `RwLock<Document>` design.
pub struct DocumentEditActor {
    // Id of the document this actor edits.
    doc_id: DocId,
    // RefCell (not a lock): the actor processes messages one at a time on a
    // single task (spawned via spawn_local), so no cross-thread sharing occurs.
    document: RefCell<Document>,
    // Sqlite connection pool used when persisting revisions.
    pool: Arc<ConnectionPool>,
    // Taken (set to None) by `run()`; Option lets `run` consume it once.
    receiver: Option<mpsc::UnboundedReceiver<EditMsg>>,
}
impl DocumentEditActor {
    /// Builds the actor for `doc_id`, seeding the in-memory `Document` from
    /// `delta`. The actor does not run until `run()` is awaited/spawned.
    pub fn new(
        doc_id: &str,
        delta: Delta,
        pool: Arc<ConnectionPool>,
        receiver: mpsc::UnboundedReceiver<EditMsg>,
    ) -> Self {
        let doc_id = doc_id.to_string();
        let document = RefCell::new(Document::from_delta(delta));
        Self {
            doc_id,
            document,
            pool,
            receiver: Some(receiver),
        }
    }

    /// Actor main loop: drains the message channel until all senders are
    /// dropped. Must only be called once — the receiver is moved out here.
    pub async fn run(mut self) {
        let mut receiver = self.receiver.take().expect("Should only call once");
        // Adapt the mpsc receiver into a Stream; the loop ends when recv()
        // yields None (channel closed), which terminates the stream.
        let stream = stream! {
            loop {
                match receiver.recv().await {
                    Some(msg) => yield msg,
                    None => break,
                }
            }
        };
        // for_each processes messages strictly in order, one at a time.
        stream.for_each(|msg| self.handle_message(msg)).await;
    }

    /// Dispatches one message to the owned document and replies on the
    /// message's `ret` channel. Send errors are ignored (`let _`): the caller
    /// may have given up awaiting the reply.
    async fn handle_message(&self, msg: EditMsg) {
        match msg {
            EditMsg::Delta { delta, ret } => {
                let result = self.document.borrow_mut().compose_delta(&delta);
                let _ = ret.send(result);
            },
            EditMsg::Insert { index, data, ret } => {
                let delta = self.document.borrow_mut().insert(index, data);
                let _ = ret.send(delta);
            },
            EditMsg::Delete { interval, ret } => {
                let result = self.document.borrow_mut().delete(interval);
                let _ = ret.send(result);
            },
            EditMsg::Format {
                interval,
                attribute,
                ret,
            } => {
                let result = self.document.borrow_mut().format(interval, attribute);
                let _ = ret.send(result);
            },
            EditMsg::Replace { interval, data, ret } => {
                let result = self.document.borrow_mut().replace(interval, data);
                let _ = ret.send(result);
            },
            EditMsg::CanUndo { ret } => {
                let _ = ret.send(self.document.borrow().can_undo());
            },
            EditMsg::CanRedo { ret } => {
                let _ = ret.send(self.document.borrow().can_redo());
            },
            EditMsg::Undo { ret } => {
                let result = self.document.borrow_mut().undo();
                let _ = ret.send(result);
            },
            EditMsg::Redo { ret } => {
                let result = self.document.borrow_mut().redo();
                let _ = ret.send(result);
            },
            EditMsg::Doc { ret } => {
                let data = self.document.borrow().to_json();
                let _ = ret.send(Ok(data));
            },
            EditMsg::SaveRevision { rev_id, ret } => {
                let result = self.save_to_disk(rev_id);
                let _ = ret.send(result);
            },
        }
    }

    /// Serializes the document to JSON and writes it into the doc table at
    /// `rev_id`. NOTE(review): this is a blocking Sqlite call executed on the
    /// actor's task — confirm that is acceptable for the hosting runtime.
    #[tracing::instrument(level = "debug", skip(self, rev_id), err)]
    fn save_to_disk(&self, rev_id: i64) -> DocResult<()> {
        let data = self.document.borrow().to_json();
        let changeset = DocTableChangeset {
            id: self.doc_id.clone(),
            data,
            rev_id,
        };
        let sql = DocTableSql {};
        let conn = self.pool.get().map_err(internal_error)?;
        let _ = sql.update_doc_table(changeset, &*conn)?;
        Ok(())
    }
}
// #[tracing::instrument(level = "debug", skip(self, params), err)]
// fn update_doc_on_server(&self, params: UpdateDocParams) -> Result<(),
// DocError> { let token = self.user.token()?;
// let server = self.server.clone();
// tokio::spawn(async move {
// match server.update_doc(&token, params).await {
// Ok(_) => {},
// Err(e) => {
// // TODO: retry?
// log::error!("Update doc failed: {}", e);
// },
// }
// });
// Ok(())
// }

View File

@ -0,0 +1,231 @@
use crate::{
entities::{
doc::{Doc, RevType, Revision, RevisionRange},
ws::{WsDataType, WsDocumentData},
},
errors::{internal_error, DocError, DocResult},
services::{
doc::{
edit::cache::{DocumentEditActor, EditMsg},
rev_manager::RevisionManager,
UndoResult,
},
util::bytes_to_rev_id,
ws::{WsDocumentHandler, WsDocumentSender},
},
};
use bytes::Bytes;
use flowy_database::ConnectionPool;
use flowy_ot::core::{Attribute, Delta, Interval};
use std::{convert::TryFrom, sync::Arc};
use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
pub type DocId = String;
/// Client-side handle for editing one document. All document mutations are
/// forwarded to a `DocumentEditActor` over the `document` channel, so this
/// type holds no lock around the document state.
pub struct EditDocContext {
    pub doc_id: DocId,
    // Tracks revision ids and syncs revisions over the websocket.
    rev_manager: Arc<RevisionManager>,
    // Sender half of the actor's mailbox; cloneable, many callers may edit.
    document: UnboundedSender<EditMsg>,
    // Sqlite pool; kept so the context can hand it to persistence helpers.
    pool: Arc<ConnectionPool>,
}
impl EditDocContext {
    /// Spawns the edit actor for `doc` on the current LocalSet and wires it
    /// to a `RevisionManager` that syncs revisions through `ws_sender`.
    ///
    /// NOTE(review): `spawn_local` requires a `LocalSet` context — confirm
    /// every caller runs inside one, otherwise this panics at runtime.
    pub(crate) async fn new(
        doc: Doc,
        pool: Arc<ConnectionPool>,
        ws_sender: Arc<dyn WsDocumentSender>,
    ) -> Result<Self, DocError> {
        let delta = Delta::from_bytes(doc.data)?;
        let (sender, receiver) = mpsc::unbounded_channel::<EditMsg>();
        let edit_actor = DocumentEditActor::new(&doc.id, delta, pool.clone(), receiver);
        tokio::task::spawn_local(edit_actor.run());

        let rev_manager = Arc::new(RevisionManager::new(&doc.id, doc.rev_id, pool.clone(), ws_sender));
        let edit_context = Self {
            doc_id: doc.id,
            rev_manager,
            document: sender,
            pool,
        };
        Ok(edit_context)
    }

    /// Inserts `data` at `index`, records a new local revision, then asks the
    /// actor to persist the document at that revision.
    pub async fn insert<T: ToString>(&self, index: usize, data: T) -> Result<(), DocError> {
        let (ret, rx) = oneshot::channel::<DocResult<Delta>>();
        let msg = EditMsg::Insert {
            index,
            data: data.to_string(),
            ret,
        };
        let _ = self.document.send(msg);
        // Outer ? = actor dropped the channel; inner ? = edit itself failed.
        let delta_data = rx.await.map_err(internal_error)??.to_bytes();
        let rev_id = self.mk_revision(&delta_data).await?;
        save(rev_id, self.document.clone()).await
    }

    /// Deletes the text in `interval` and records a revision.
    /// NOTE(review): unlike `insert`, this does not call `save(..)` to persist
    /// to disk — confirm whether that asymmetry is intentional.
    pub async fn delete(&self, interval: Interval) -> Result<(), DocError> {
        let (ret, rx) = oneshot::channel::<DocResult<Delta>>();
        let msg = EditMsg::Delete { interval, ret };
        let _ = self.document.send(msg);
        let delta_data = rx.await.map_err(internal_error)??.to_bytes();
        let _ = self.mk_revision(&delta_data).await?;
        Ok(())
    }

    /// Applies `attribute` over `interval` and records a revision
    /// (no disk save — see note on `delete`).
    pub async fn format(&self, interval: Interval, attribute: Attribute) -> Result<(), DocError> {
        let (ret, rx) = oneshot::channel::<DocResult<Delta>>();
        let msg = EditMsg::Format {
            interval,
            attribute,
            ret,
        };
        let _ = self.document.send(msg);
        let delta_data = rx.await.map_err(internal_error)??.to_bytes();
        let _ = self.mk_revision(&delta_data).await?;
        Ok(())
    }

    /// Replaces the text in `interval` with `data` and records a revision
    /// (no disk save — see note on `delete`).
    /// NOTE(review): `&mut self` appears unnecessary — nothing is mutated here.
    pub async fn replace<T: ToString>(&mut self, interval: Interval, data: T) -> Result<(), DocError> {
        let (ret, rx) = oneshot::channel::<DocResult<Delta>>();
        let msg = EditMsg::Replace {
            interval,
            data: data.to_string(),
            ret,
        };
        let _ = self.document.send(msg);
        let delta_data = rx.await.map_err(internal_error)??.to_bytes();
        let _ = self.mk_revision(&delta_data).await?;
        Ok(())
    }

    /// Whether an undo is available; returns false if the actor is gone.
    pub async fn can_undo(&self) -> bool {
        let (ret, rx) = oneshot::channel::<bool>();
        let msg = EditMsg::CanUndo { ret };
        let _ = self.document.send(msg);
        rx.await.unwrap_or(false)
    }

    /// Whether a redo is available; returns false if the actor is gone.
    pub async fn can_redo(&self) -> bool {
        let (ret, rx) = oneshot::channel::<bool>();
        let msg = EditMsg::CanRedo { ret };
        let _ = self.document.send(msg);
        rx.await.unwrap_or(false)
    }

    /// Undoes the last change.
    /// NOTE(review): undo/redo do not create a revision — confirm whether
    /// they should be synced like other edits.
    pub async fn undo(&self) -> Result<UndoResult, DocError> {
        let (ret, rx) = oneshot::channel::<DocResult<UndoResult>>();
        let msg = EditMsg::Undo { ret };
        let _ = self.document.send(msg);
        rx.await.map_err(internal_error)?
    }

    /// Redoes the last undone change (see note on `undo`).
    pub async fn redo(&self) -> Result<UndoResult, DocError> {
        let (ret, rx) = oneshot::channel::<DocResult<UndoResult>>();
        let msg = EditMsg::Redo { ret };
        let _ = self.document.send(msg);
        rx.await.map_err(internal_error)?
    }

    /// Snapshot of the document: current JSON plus the latest revision id.
    pub async fn doc(&self) -> DocResult<Doc> {
        let (ret, rx) = oneshot::channel::<DocResult<String>>();
        let msg = EditMsg::Doc { ret };
        let _ = self.document.send(msg);
        let data = rx.await.map_err(internal_error)??;
        let rev_id = self.rev_manager.rev_id();
        let id = self.doc_id.clone();
        Ok(Doc { id, data, rev_id })
    }

    /// Wraps `delta_data` in a local `Revision`, registers it with the
    /// revision manager, and returns the new revision id.
    async fn mk_revision(&self, delta_data: &Bytes) -> Result<i64, DocError> {
        let (base_rev_id, rev_id) = self.rev_manager.next_rev_id();
        let delta_data = delta_data.to_vec();
        let revision = Revision::new(base_rev_id, rev_id, delta_data, &self.doc_id, RevType::Local);
        self.rev_manager.add_revision(revision).await;
        Ok(rev_id)
    }

    /// Composes a delta coming from the local client (e.g. Flutter side) into
    /// the document, records the revision, and persists to disk.
    #[tracing::instrument(level = "debug", skip(self, data), err)]
    pub(crate) async fn compose_local_delta(&self, data: Bytes) -> Result<(), DocError> {
        let delta = Delta::from_bytes(&data)?;
        let (ret, rx) = oneshot::channel::<DocResult<()>>();
        let msg = EditMsg::Delta { delta, ret };
        let _ = self.document.send(msg);
        let _ = rx.await.map_err(internal_error)??;

        let rev_id = self.mk_revision(&data).await?;
        save(rev_id, self.document.clone()).await
    }

    /// Test-only accessor: current document content as JSON.
    #[cfg(feature = "flowy_test")]
    pub async fn doc_json(&self) -> DocResult<String> {
        let (ret, rx) = oneshot::channel::<DocResult<String>>();
        let msg = EditMsg::Doc { ret };
        let _ = self.document.send(msg);
        rx.await.map_err(internal_error)?
    }
}
impl WsDocumentHandler for EditDocContext {
    /// Entry point for websocket document messages. The sync callback cannot
    /// await, so the async work is moved into a spawned task; errors are only
    /// logged because there is no caller to propagate them to.
    fn receive(&self, doc_data: WsDocumentData) {
        let document = self.document.clone();
        let rev_manager = self.rev_manager.clone();
        let f = |doc_data: WsDocumentData| async move {
            let bytes = Bytes::from(doc_data.data);
            match doc_data.ty {
                // Server pushed a new revision: compose it into the document.
                WsDataType::PushRev => {
                    let _ = handle_push_rev(bytes, rev_manager, document).await?;
                },
                // Server asks for a range of revisions it is missing.
                WsDataType::PullRev => {
                    let range = RevisionRange::try_from(bytes)?;
                    let _ = rev_manager.send_revisions(range)?;
                },
                // Server acknowledged a revision we sent.
                WsDataType::Acked => {
                    let rev_id = bytes_to_rev_id(bytes.to_vec())?;
                    let _ = rev_manager.ack_rev(rev_id);
                },
                // NOTE(review): conflicts are silently dropped — confirm a
                // resolution path is planned.
                WsDataType::Conflict => {},
            }
            Result::<(), DocError>::Ok(())
        };
        tokio::spawn(async move {
            if let Err(e) = f(doc_data).await {
                log::error!("{:?}", e);
            }
        });
    }
}
/// Asks the edit actor to persist the document at `rev_id` and waits for
/// the actor's reply.
async fn save(rev_id: i64, document: UnboundedSender<EditMsg>) -> DocResult<()> {
    let (ret, rx) = oneshot::channel::<DocResult<()>>();
    let msg = EditMsg::SaveRevision { rev_id, ret };
    let _ = document.send(msg);
    rx.await.map_err(internal_error)?
}
/// Handles a revision pushed by the server: registers it with the revision
/// manager, then — if the manager yields a revision ready to compose —
/// applies its delta to the document and persists on success. On failure the
/// revision is pushed back so it can be retried.
async fn handle_push_rev(
    rev_bytes: Bytes,
    rev_manager: Arc<RevisionManager>,
    document: UnboundedSender<EditMsg>,
) -> DocResult<()> {
    let revision = Revision::try_from(rev_bytes)?;
    let _ = rev_manager.add_revision(revision).await?;
    match rev_manager.next_compose_revision() {
        // Nothing composable yet (e.g. waiting on earlier revisions).
        None => Ok(()),
        Some(revision) => {
            let delta = Delta::from_bytes(&revision.delta_data)?;
            let (ret, rx) = oneshot::channel::<DocResult<()>>();
            let msg = EditMsg::Delta { delta, ret };
            let _ = document.send(msg);
            match rx.await.map_err(internal_error)? {
                // Compose succeeded: persist the document at this revision.
                Ok(_) => save(revision.rev_id, document).await,
                Err(e) => {
                    // Re-queue the revision so it is not lost; caller logs `e`.
                    rev_manager.push_compose_revision(revision);
                    Err(e)
                },
            }
        },
    }
}

View File

// Edit module: `cache` holds the document edit actor (internal only),
// `context` exposes the public `EditDocContext` handle.
mod cache;
mod context;
pub use context::*;

View File

@ -1,179 +0,0 @@
use crate::{
entities::{
doc::{Doc, RevType, Revision, RevisionRange},
ws::{WsDataType, WsDocumentData},
},
errors::*,
services::{
doc::{rev_manager::RevisionManager, Document, UndoResult},
util::bytes_to_rev_id,
ws::{WsDocumentHandler, WsDocumentSender},
},
sql_tables::{doc::DocTableSql, DocTableChangeset},
};
use bytes::Bytes;
use flowy_database::ConnectionPool;
use flowy_ot::core::{Attribute, Delta, Interval};
use parking_lot::RwLock;
use std::{convert::TryFrom, sync::Arc};
pub type DocId = String;
pub struct EditDocContext {
pub doc_id: DocId,
document: Arc<RwLock<Document>>,
rev_manager: Arc<RevisionManager>,
pool: Arc<ConnectionPool>,
}
impl EditDocContext {
    /// Builds the edit context for `doc`: decodes the stored delta bytes into
    /// a `Document` (behind an `RwLock`) and creates a `RevisionManager`
    /// seeded with the document's last known revision id.
    pub(crate) async fn new(
        doc: Doc,
        pool: Arc<ConnectionPool>,
        ws_sender: Arc<dyn WsDocumentSender>,
    ) -> Result<Self, DocError> {
        let delta = Delta::from_bytes(doc.data)?;
        let rev_manager = Arc::new(RevisionManager::new(&doc.id, doc.rev_id, pool.clone(), ws_sender));
        let document = Arc::new(RwLock::new(Document::from_delta(delta)));
        let edit_context = Self {
            doc_id: doc.id,
            document,
            rev_manager,
            pool,
        };
        Ok(edit_context)
    }

    /// Inserts `data` at character `index`, recording the resulting delta as
    /// a new local revision (persisted and sent over the websocket).
    pub fn insert<T: ToString>(&self, index: usize, data: T) -> Result<(), DocError> {
        let delta_data = self.document.write().insert(index, data)?.to_bytes();
        let _ = self.mk_revision(&delta_data)?;
        Ok(())
    }

    /// Deletes the characters covered by `interval` and records the change
    /// as a new local revision.
    pub fn delete(&self, interval: Interval) -> Result<(), DocError> {
        let delta_data = self.document.write().delete(interval)?.to_bytes();
        let _ = self.mk_revision(&delta_data)?;
        Ok(())
    }

    /// Applies `attribute` over `interval` and records the change as a new
    /// local revision.
    pub fn format(&self, interval: Interval, attribute: Attribute) -> Result<(), DocError> {
        let delta_data = self.document.write().format(interval, attribute)?.to_bytes();
        let _ = self.mk_revision(&delta_data)?;
        Ok(())
    }

    /// Replaces the contents of `interval` with `data` and records the change
    /// as a new local revision.
    pub fn replace<T: ToString>(&mut self, interval: Interval, data: T) -> Result<(), DocError> {
        let delta_data = self.document.write().replace(interval, data)?.to_bytes();
        let _ = self.mk_revision(&delta_data)?;
        Ok(())
    }

    pub fn can_undo(&self) -> bool { self.document.read().can_undo() }

    pub fn can_redo(&self) -> bool { self.document.read().can_redo() }

    pub fn undo(&self) -> Result<UndoResult, DocError> { self.document.write().undo() }

    pub fn redo(&self) -> Result<UndoResult, DocError> { self.document.write().redo() }

    /// Snapshot of the document as a `Doc` (JSON body plus current rev id).
    pub fn doc(&self) -> Doc {
        Doc {
            id: self.doc_id.clone(),
            data: self.document.read().to_json(),
            rev_id: self.rev_manager.rev_id(),
        }
    }

    // Builds a local `Revision` from `delta_data`, saves the full document
    // JSON under the new rev id, then hands the revision to the manager.
    // Note the order: the document snapshot is on disk before the revision
    // is queued for sending.
    fn mk_revision(&self, delta_data: &Bytes) -> Result<(), DocError> {
        let (base_rev_id, rev_id) = self.rev_manager.next_rev_id();
        let delta_data = delta_data.to_vec();
        let revision = Revision::new(base_rev_id, rev_id, delta_data, &self.doc_id, RevType::Local);
        let _ = self.save_to_disk(revision.rev_id)?;
        let _ = self.rev_manager.add_revision(revision)?;
        Ok(())
    }

    /// Composes an already-decoded local delta (`data`) into the document and
    /// records it as a new local revision.
    #[tracing::instrument(level = "debug", skip(self, data), err)]
    pub(crate) fn compose_local_delta(&self, data: Bytes) -> Result<(), DocError> {
        let delta = Delta::from_bytes(&data)?;
        self.document.write().compose_delta(&delta)?;
        let _ = self.mk_revision(&data)?;
        Ok(())
    }

    // Pops the next queued remote revision (if any) from the manager and
    // composes it into the document, persisting the result under that
    // revision's id. On failure the manager re-queues the revision.
    #[tracing::instrument(level = "debug", skip(self), err)]
    fn compose_remote_delta(&self) -> Result<(), DocError> {
        self.rev_manager.next_compose_revision(|revision| {
            let delta = Delta::from_bytes(&revision.delta_data)?;
            self.document.write().compose_delta(&delta)?;
            let _ = self.save_to_disk(revision.rev_id)?;
            log::debug!("😁Document: {:?}", self.document.read().to_plain_string());
            Ok(())
        });
        Ok(())
    }

    /// Test-only accessor: the current document rendered as JSON.
    #[cfg(feature = "flowy_test")]
    pub fn doc_json(&self) -> String { self.document.read().to_json() }

    // Persists the current document JSON to the doc table under `rev_id`.
    #[tracing::instrument(level = "debug", skip(self, rev_id), err)]
    fn save_to_disk(&self, rev_id: i64) -> Result<(), DocError> {
        let data = self.document.read().to_json();
        let changeset = DocTableChangeset {
            id: self.doc_id.clone(),
            data,
            rev_id,
        };
        let sql = DocTableSql {};
        let conn = self.pool.get().map_err(internal_error)?;
        let _ = sql.update_doc_table(changeset, &*conn)?;
        Ok(())
    }

    // #[tracing::instrument(level = "debug", skip(self, params), err)]
    // fn update_doc_on_server(&self, params: UpdateDocParams) -> Result<(),
    // DocError> { let token = self.user.token()?;
    //     let server = self.server.clone();
    //     tokio::spawn(async move {
    //         match server.update_doc(&token, params).await {
    //             Ok(_) => {},
    //             Err(e) => {
    //                 // TODO: retry?
    //                 log::error!("Update doc failed: {}", e);
    //             },
    //         }
    //     });
    //     Ok(())
    // }
}
impl WsDocumentHandler for EditDocContext {
    /// Dispatches one incoming websocket document message by its type; any
    /// error raised while handling it is logged and swallowed (the websocket
    /// callback has nowhere to propagate it).
    fn receive(&self, doc_data: WsDocumentData) {
        let ty = doc_data.ty;
        let bytes = Bytes::from(doc_data.data);
        let outcome = (|| -> Result<(), DocError> {
            match ty {
                // Server pushed a new revision: queue it, then try to compose.
                WsDataType::PushRev => {
                    let revision = Revision::try_from(bytes)?;
                    let _ = self.rev_manager.add_revision(revision)?;
                    let _ = self.compose_remote_delta()?;
                },
                // Server asked us to resend a range of revisions.
                WsDataType::PullRev => {
                    let range = RevisionRange::try_from(bytes)?;
                    let _ = self.rev_manager.send_rev_with_range(range)?;
                },
                // Server acknowledged one of our local revisions.
                WsDataType::Acked => {
                    let rev_id = bytes_to_rev_id(bytes.to_vec())?;
                    let _ = self.rev_manager.ack(rev_id);
                },
                // Conflicts are currently ignored here.
                WsDataType::Conflict => {},
            }
            Ok(())
        })();

        if let Err(e) = outcome {
            log::error!("{:?}", e);
        }
    }
}

View File

@ -6,6 +6,11 @@ impl DeleteExt for DefaultDelete {
fn ext_name(&self) -> &str { "DefaultDelete" }
fn apply(&self, _delta: &Delta, interval: Interval) -> Option<Delta> {
Some(DeltaBuilder::new().retain(interval.start).delete(interval.size()).build())
Some(
DeltaBuilder::new()
.retain(interval.start)
.delete(interval.size())
.build(),
)
}
}

View File

@ -21,7 +21,10 @@ impl DeleteExt for PreserveLineFormatOnMerge {
}
iter.seek::<CharMetric>(interval.size() - 1);
let mut new_delta = DeltaBuilder::new().retain(interval.start).delete(interval.size()).build();
let mut new_delta = DeltaBuilder::new()
.retain(interval.start)
.delete(interval.size())
.build();
while iter.has_next() {
match iter.next() {

View File

@ -7,6 +7,6 @@ mod history;
mod view;
pub(crate) mod doc_controller;
pub mod edit_doc_context;
pub mod edit;
pub mod extensions;
mod rev_manager;

View File

@ -1,203 +0,0 @@
use crate::{
entities::doc::{RevType, Revision, RevisionRange},
errors::{internal_error, DocError},
services::{util::RevIdCounter, ws::WsDocumentSender},
sql_tables::{OpTableSql, RevChangeset, RevState},
};
use dashmap::DashSet;
use flowy_database::ConnectionPool;
use parking_lot::RwLock;
use std::{
collections::{HashMap, VecDeque},
sync::Arc,
};
use tokio::{task::JoinHandle, time::Duration};
/// Client-side revision bookkeeping (pre-actor design): caches locally
/// produced revisions until they are batch-saved/acked, and queues remote
/// revisions until they are composed into the document.
pub struct RevisionManager {
    doc_id: String,
    op_sql: Arc<OpTableSql>,
    pool: Arc<ConnectionPool>,
    // Monotonic counter used to issue the next local revision id.
    rev_id_counter: RevIdCounter,
    ws_sender: Arc<dyn WsDocumentSender>,
    // Locally produced revisions keyed by rev_id, awaiting the debounced save.
    rev_cache: Arc<RwLock<HashMap<i64, Revision>>>,
    // rev_ids the server has acknowledged but whose DB state isn't updated yet.
    ack_rev_cache: Arc<DashSet<i64>>,
    // Revisions received from the server, composed in FIFO order.
    remote_rev_cache: RwLock<VecDeque<Revision>>,
    // Handle of the in-flight debounced save task; aborted when a new save
    // is scheduled.
    save_operation: RwLock<Option<JoinHandle<()>>>,
}
impl RevisionManager {
    /// `rev_id` seeds the counter with the document's last known revision id.
    pub fn new(doc_id: &str, rev_id: i64, pool: Arc<ConnectionPool>, ws_sender: Arc<dyn WsDocumentSender>) -> Self {
        let op_sql = Arc::new(OpTableSql {});
        let rev_id_counter = RevIdCounter::new(rev_id);
        let rev_cache = Arc::new(RwLock::new(HashMap::new()));
        let remote_rev_cache = RwLock::new(VecDeque::new());
        let ack_rev_cache = Arc::new(DashSet::new());
        Self {
            doc_id: doc_id.to_owned(),
            op_sql,
            pool,
            rev_id_counter,
            ws_sender,
            rev_cache,
            ack_rev_cache,
            remote_rev_cache,
            save_operation: RwLock::new(None),
        }
    }

    /// Pops the oldest queued remote revision and runs `f` on it. If `f`
    /// fails, the revision is pushed back to the FRONT of the queue so it is
    /// retried before any newer revision (errors are logged here).
    pub fn next_compose_revision<F>(&self, mut f: F)
    where
        F: FnMut(&Revision) -> Result<(), DocError>,
    {
        if let Some(rev) = self.remote_rev_cache.write().pop_front() {
            match f(&rev) {
                Ok(_) => {},
                Err(e) => {
                    log::error!("{}", e);
                    self.remote_rev_cache.write().push_front(rev);
                },
            }
        }
    }

    /// Caches `revision` and (re)schedules the debounced save; a local
    /// revision is additionally sent over the websocket, a remote one is
    /// queued for composing. Websocket send failures are only logged.
    #[tracing::instrument(level = "debug", skip(self, revision))]
    pub fn add_revision(&self, revision: Revision) -> Result<(), DocError> {
        self.rev_cache.write().insert(revision.rev_id, revision.clone());
        self.save_revisions();
        match revision.ty {
            RevType::Local => match self.ws_sender.send(revision.into()) {
                Ok(_) => {},
                Err(e) => {
                    log::error!("Send delta failed: {:?}", e);
                },
            },
            RevType::Remote => {
                self.remote_rev_cache.write().push_back(revision);
            },
        }
        Ok(())
    }

    /// Records a server acknowledgement for `rev_id` and flushes acked state
    /// to the database.
    pub fn ack(&self, rev_id: i64) -> Result<(), DocError> {
        log::debug!("Receive rev_id: {} acked", rev_id);
        self.ack_rev_cache.insert(rev_id);
        self.update_revisions();
        Ok(())
    }

    /// Returns `(current, next)` revision ids, advancing the counter.
    pub fn next_rev_id(&self) -> (i64, i64) {
        let cur = self.rev_id_counter.value();
        let next = self.rev_id_counter.next();
        (cur, next)
    }

    /// Latest issued revision id.
    pub fn rev_id(&self) -> i64 { self.rev_id_counter.value() }

    /// FIXME: unimplemented — always panics after the debug assertion.
    pub fn send_rev_with_range(&self, range: RevisionRange) -> Result<(), DocError> {
        debug_assert!(&range.doc_id == &self.doc_id);
        unimplemented!()
    }

    // Debounced (300 ms) batch save: aborts any in-flight save task and
    // schedules a new one that writes the cached revisions to the revision
    // table in one transaction, then evicts the snapshot of ids taken when
    // the save was scheduled.
    fn save_revisions(&self) {
        let op_sql = self.op_sql.clone();
        let pool = self.pool.clone();
        let mut write_guard = self.save_operation.write();
        if let Some(handler) = write_guard.take() {
            handler.abort();
        }
        let rev_cache = self.rev_cache.clone();
        let ack_rev_cache = self.ack_rev_cache.clone();
        // Snapshot of ids taken NOW; entries inserted during the 300 ms sleep
        // are written (values are re-read after the sleep) but not evicted.
        let ids = self.rev_cache.read().keys().map(|v| v.clone()).collect::<Vec<i64>>();
        *write_guard = Some(tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(300)).await;
            let revisions = rev_cache
                .read()
                .values()
                .map(|v| {
                    // Persist as Acked if the server already confirmed it.
                    let state = match ack_rev_cache.contains(&v.rev_id) {
                        true => RevState::Acked,
                        false => RevState::Local,
                    };
                    (v.clone(), state)
                })
                .collect::<Vec<(Revision, RevState)>>();
            // Write lock is taken after the last await, so the (non-Send)
            // guard never crosses an await point; it is held for the
            // duration of the blocking DB transaction.
            let mut rev_cache_write = rev_cache.write();
            let conn = &*pool.get().map_err(internal_error).unwrap();
            let result = conn.immediate_transaction::<_, DocError, _>(|| {
                let _ = op_sql.create_rev_table(revisions, conn).unwrap();
                Ok(())
            });
            match result {
                Ok(_) => rev_cache_write.retain(|k, _| !ids.contains(k)),
                Err(e) => log::error!("Save revision failed: {:?}", e),
            }
        }));
    }

    // Marks as Acked (in the DB) every acknowledged rev_id that is no longer
    // in the local rev cache (i.e. already flushed), then removes those ids
    // from the ack cache. Uses a bounded read-lock attempt to avoid blocking.
    fn update_revisions(&self) {
        match self.rev_cache.try_read_for(Duration::from_millis(300)) {
            None => log::warn!("try read rev_cache failed"),
            Some(read_guard) => {
                let rev_ids = self
                    .ack_rev_cache
                    .iter()
                    .flat_map(|k| match read_guard.contains_key(&k) {
                        true => None,
                        false => Some(k.clone()),
                    })
                    .collect::<Vec<i64>>();
                log::debug!("Try to update {:?} state", rev_ids);
                if rev_ids.is_empty() {
                    return;
                }
                let conn = &*self.pool.get().map_err(internal_error).unwrap();
                let result = conn.immediate_transaction::<_, DocError, _>(|| {
                    for rev_id in &rev_ids {
                        let changeset = RevChangeset {
                            doc_id: self.doc_id.clone(),
                            rev_id: rev_id.clone(),
                            state: RevState::Acked,
                        };
                        let _ = self.op_sql.update_rev_table(changeset, conn)?;
                    }
                    Ok(())
                });
                match result {
                    Ok(_) => {
                        rev_ids.iter().for_each(|rev_id| {
                            self.ack_rev_cache.remove(rev_id);
                        });
                    },
                    Err(e) => log::error!("Save revision failed: {:?}", e),
                }
            },
        }
    }

    // Deletes one revision row asynchronously (fire-and-forget).
    // NOTE(review): no caller is visible in this file — confirm it is used.
    fn delete_revision(&self, rev_id: i64) {
        let op_sql = self.op_sql.clone();
        let pool = self.pool.clone();
        let doc_id = self.doc_id.clone();
        tokio::spawn(async move {
            let conn = &*pool.get().map_err(internal_error).unwrap();
            let result = conn.immediate_transaction::<_, DocError, _>(|| {
                let _ = op_sql.delete_rev_table(&doc_id, rev_id, conn)?;
                Ok(())
            });
            match result {
                Ok(_) => {},
                Err(e) => log::error!("Delete revision failed: {:?}", e),
            }
        });
    }
}

View File

@ -0,0 +1,5 @@
mod rev_manager;
mod store;
mod util;
pub use rev_manager::*;

View File

@ -0,0 +1,108 @@
use crate::{
entities::doc::{RevType, Revision, RevisionRange},
errors::DocError,
services::{
doc::rev_manager::store::{Store, StoreMsg},
util::RevIdCounter,
ws::WsDocumentSender,
},
};
use flowy_database::ConnectionPool;
use parking_lot::RwLock;
use std::{collections::VecDeque, sync::Arc};
use tokio::sync::{mpsc, oneshot};
/// Client-side revision bookkeeping, actor-style: all persistence is
/// delegated to a background `Store` task via `store_sender`; remote
/// revisions queue in `pending_revs` until they are composed.
pub struct RevisionManager {
    doc_id: String,
    // Monotonic counter used to issue the next local revision id.
    rev_id_counter: RevIdCounter,
    ws_sender: Arc<dyn WsDocumentSender>,
    // Channel into the `Store` actor that owns all DB access.
    store_sender: mpsc::Sender<StoreMsg>,
    // Remote revisions waiting to be composed into the document, FIFO.
    pending_revs: RwLock<VecDeque<Revision>>,
}
// tokio::time::timeout
impl RevisionManager {
    /// Spawns the persistence `Store` actor and connects this manager to it
    /// through a bounded (50 message) channel; `rev_id` seeds the local
    /// revision counter.
    ///
    /// NOTE(review): `tokio::task::spawn_local` panics unless it is called
    /// inside a `LocalSet` / current-thread runtime context — confirm every
    /// caller satisfies that.
    pub fn new(doc_id: &str, rev_id: i64, pool: Arc<ConnectionPool>, ws_sender: Arc<dyn WsDocumentSender>) -> Self {
        let (sender, receiver) = mpsc::channel::<StoreMsg>(50);
        let store = Store::new(doc_id, pool, receiver);
        tokio::task::spawn_local(store.run());
        let doc_id = doc_id.to_string();
        let rev_id_counter = RevIdCounter::new(rev_id);
        let pending_revs = RwLock::new(VecDeque::new());
        Self {
            doc_id,
            rev_id_counter,
            ws_sender,
            pending_revs,
            store_sender: sender,
        }
    }

    /// Re-queues `revision` at the HEAD of the pending queue so a failed
    /// compose is retried before any newer revision.
    pub fn push_compose_revision(&self, revision: Revision) { self.pending_revs.write().push_front(revision); }

    /// Pops the next pending remote revision to compose, if any.
    pub fn next_compose_revision(&self) -> Option<Revision> { self.pending_revs.write().pop_front() }

    /// Forwards `revision` to the store for persistence, then routes it:
    /// a local revision goes out over the websocket (send errors are only
    /// logged), a remote one is queued for composing. A failure to enqueue
    /// on the store channel is silently ignored (`let _`).
    #[tracing::instrument(level = "debug", skip(self, revision))]
    pub async fn add_revision(&self, revision: Revision) -> Result<(), DocError> {
        let msg = StoreMsg::Revision {
            revision: revision.clone(),
        };
        let _ = self.store_sender.send(msg).await;
        match revision.ty {
            RevType::Local => match self.ws_sender.send(revision.into()) {
                Ok(_) => {},
                Err(e) => log::error!("Send delta failed: {:?}", e),
            },
            RevType::Remote => {
                self.pending_revs.write().push_back(revision);
            },
        }
        Ok(())
    }

    /// Notifies the store, fire-and-forget, that the server acked `rev_id`.
    pub fn ack_rev(&self, rev_id: i64) -> Result<(), DocError> {
        let sender = self.store_sender.clone();
        tokio::spawn(async move {
            let _ = sender.send(StoreMsg::AckRevision { rev_id }).await;
        });
        Ok(())
    }

    /// Latest issued revision id.
    pub fn rev_id(&self) -> i64 { self.rev_id_counter.value() }

    /// Returns `(current, next)` revision ids, advancing the counter.
    pub fn next_rev_id(&self) -> (i64, i64) {
        let cur = self.rev_id_counter.value();
        let next = self.rev_id_counter.next();
        (cur, next)
    }

    /// FIXME: incomplete — the oneshot receiver (`_rx`) is dropped
    /// immediately, so the store's reply can never be read, and the function
    /// unconditionally panics via `unimplemented!()`.
    pub fn send_revisions(&self, range: RevisionRange) -> Result<(), DocError> {
        debug_assert!(&range.doc_id == &self.doc_id);
        let (ret, _rx) = oneshot::channel();
        let sender = self.store_sender.clone();
        tokio::spawn(async move {
            let _ = sender.send(StoreMsg::SendRevisions { range, ret }).await;
        });
        unimplemented!()
    }
}

View File

@ -0,0 +1,188 @@
use crate::{
entities::doc::{Revision, RevisionRange},
errors::{internal_error, DocError, DocResult},
services::doc::rev_manager::util::RevisionOperation,
sql_tables::{OpTableSql, RevChangeset, RevState},
};
use async_stream::stream;
use dashmap::DashMap;
use flowy_database::ConnectionPool;
use futures::stream::StreamExt;
use std::{cell::RefCell, sync::Arc, time::Duration};
use tokio::{
sync::{mpsc, oneshot},
task::JoinHandle,
};
/// Messages accepted by the `Store` persistence actor.
pub enum StoreMsg {
    /// Cache a new revision and schedule the debounced batch save.
    Revision {
        revision: Revision,
    },
    /// Mark the revision identified by `rev_id` as acknowledged.
    AckRevision {
        rev_id: i64,
    },
    /// Request the revisions covered by `range`; the result is delivered
    /// through the `ret` oneshot channel.
    SendRevisions {
        range: RevisionRange,
        ret: oneshot::Sender<DocResult<Vec<Revision>>>,
    },
}
/// Single-owner persistence actor for one document's revisions: consumes
/// `StoreMsg`s from a channel and batches writes into the revision table.
pub struct Store {
    doc_id: String,
    op_sql: Arc<OpTableSql>,
    pool: Arc<ConnectionPool>,
    // In-memory revisions awaiting the debounced batch save, keyed by rev_id.
    revs: Arc<DashMap<i64, RevisionOperation>>,
    // Handle of the pending debounced save task. RefCell (not a lock) is
    // viable only because the actor runs on a single local task.
    save_operation: RefCell<Option<JoinHandle<()>>>,
    // Taken exactly once by `run()`.
    receiver: Option<mpsc::Receiver<StoreMsg>>,
}
impl Store {
pub fn new(doc_id: &str, pool: Arc<ConnectionPool>, receiver: mpsc::Receiver<StoreMsg>) -> Store {
let op_sql = Arc::new(OpTableSql {});
let revs = Arc::new(DashMap::new());
let save_operation = RefCell::new(None);
let doc_id = doc_id.to_owned();
Self {
doc_id,
op_sql,
pool,
revs,
save_operation,
receiver: Some(receiver),
}
}
pub async fn run(mut self) {
let mut receiver = self.receiver.take().expect("Should only call once");
let stream = stream! {
loop {
match receiver.recv().await {
Some(msg) => yield msg,
None => break,
}
}
};
stream.for_each(|msg| self.handle_message(msg)).await;
}
async fn handle_message(&self, msg: StoreMsg) {
match msg {
StoreMsg::Revision { revision } => {
self.handle_new_revision(revision);
},
StoreMsg::AckRevision { rev_id } => {
self.handle_revision_acked(rev_id);
},
StoreMsg::SendRevisions { range: _, ret: _ } => {
unimplemented!()
},
}
}
pub fn handle_new_revision(&self, revision: Revision) {
let mut operation = RevisionOperation::new(&revision);
let _receiver = operation.receiver();
self.revs.insert(revision.rev_id, operation);
self.save_revisions();
}
pub fn handle_revision_acked(&self, rev_id: i64) {
match self.revs.get_mut(&rev_id) {
None => {},
Some(mut rev) => rev.value_mut().finish(),
}
}
pub fn revs_in_range(&self, _range: RevisionRange) -> DocResult<Vec<Revision>> { unimplemented!() }
fn save_revisions(&self) {
if let Some(handler) = self.save_operation.borrow_mut().take() {
handler.abort();
}
let revs = self.revs.clone();
let pool = self.pool.clone();
let op_sql = self.op_sql.clone();
*self.save_operation.borrow_mut() = Some(tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(300)).await;
let ids = revs.iter().map(|kv| kv.key().clone()).collect::<Vec<i64>>();
let revisions = revs
.iter()
.map(|kv| ((*kv.value()).clone(), kv.state))
.collect::<Vec<(Revision, RevState)>>();
let conn = &*pool.get().map_err(internal_error).unwrap();
let result = conn.immediate_transaction::<_, DocError, _>(|| {
let _ = op_sql.create_rev_table(revisions, conn).unwrap();
Ok(())
});
match result {
Ok(_) => revs.retain(|k, _| !ids.contains(k)),
Err(e) => log::error!("Save revision failed: {:?}", e),
}
}));
}
fn update_revisions(&self) {
let rev_ids = self
.revs
.iter()
.flat_map(|kv| match kv.state == RevState::Acked {
true => None,
false => Some(kv.key().clone()),
})
.collect::<Vec<i64>>();
if rev_ids.is_empty() {
return;
}
log::debug!("Try to update {:?} state", rev_ids);
match self.update(&rev_ids) {
Ok(_) => {
self.revs.retain(|k, _| !rev_ids.contains(k));
},
Err(e) => log::error!("Save revision failed: {:?}", e),
}
}
fn update(&self, rev_ids: &Vec<i64>) -> Result<(), DocError> {
let conn = &*self.pool.get().map_err(internal_error).unwrap();
let result = conn.immediate_transaction::<_, DocError, _>(|| {
for rev_id in rev_ids {
let changeset = RevChangeset {
doc_id: self.doc_id.clone(),
rev_id: rev_id.clone(),
state: RevState::Acked,
};
let _ = self.op_sql.update_rev_table(changeset, conn)?;
}
Ok(())
});
result
}
// fn delete_revision(&self, rev_id: i64) {
// let op_sql = self.op_sql.clone();
// let pool = self.pool.clone();
// let doc_id = self.doc_id.clone();
// tokio::spawn(async move {
// let conn = &*pool.get().map_err(internal_error).unwrap();
// let result = conn.immediate_transaction::<_, DocError, _>(|| {
// let _ = op_sql.delete_rev_table(&doc_id, rev_id, conn)?;
// Ok(())
// });
//
// match result {
// Ok(_) => {},
// Err(e) => log::error!("Delete revision failed: {:?}", e),
// }
// });
// }
}

View File

@ -0,0 +1,43 @@
use crate::{entities::doc::Revision, errors::DocResult, sql_tables::RevState};
use tokio::sync::oneshot;
pub type Sender = oneshot::Sender<DocResult<()>>;
pub type Receiver = oneshot::Receiver<DocResult<()>>;
/// A revision paired with a oneshot completion channel: `finish()` flips the
/// state to `Acked` and signals the paired receiver.
pub struct RevisionOperation {
    inner: Revision,
    // Completion sender; consumed by `finish()`.
    ret: Option<Sender>,
    // Completion receiver; handed out exactly once via `receiver()`.
    receiver: Option<Receiver>,
    // `Local` until the server acknowledges the revision.
    pub state: RevState,
}
impl RevisionOperation {
    /// Wraps a clone of `revision` together with a fresh oneshot completion
    /// channel; the operation starts in the `Local` (unacked) state.
    pub fn new(revision: &Revision) -> Self {
        let (sender, receiver) = oneshot::channel::<DocResult<()>>();
        Self {
            inner: revision.clone(),
            ret: Some(sender),
            receiver: Some(receiver),
            state: RevState::Local,
        }
    }

    /// Hands out the completion receiver.
    ///
    /// # Panics
    /// Panics if called a second time (the receiver can be taken only once).
    pub fn receiver(&mut self) -> Receiver {
        self.receiver
            .take()
            .expect("Receiver should not be called twice")
    }

    /// Marks the revision acknowledged and fires the completion signal; a
    /// send error (receiver already dropped) is deliberately ignored.
    pub fn finish(&mut self) {
        self.state = RevState::Acked;
        if let Some(sender) = self.ret.take() {
            let _ = sender.send(Ok(()));
        }
    }
}
impl std::ops::Deref for RevisionOperation {
    type Target = Revision;

    /// Exposes the wrapped `Revision` so callers can use the operation as a
    /// revision directly.
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

View File

@ -92,4 +92,6 @@ fn construct_format_exts() -> Vec<FormatExtension> {
]
}
fn construct_delete_exts() -> Vec<DeleteExtension> { vec![Box::new(PreserveLineFormatOnMerge {}), Box::new(DefaultDelete {})] }
fn construct_delete_exts() -> Vec<DeleteExtension> {
vec![Box::new(PreserveLineFormatOnMerge {}), Box::new(DefaultDelete {})]
}

View File

@ -7,13 +7,19 @@ use flowy_infra::future::ResultFuture;
pub struct DocServerMock {}
impl DocumentServerAPI for DocServerMock {
fn create_doc(&self, _token: &str, _params: CreateDocParams) -> ResultFuture<(), DocError> { ResultFuture::new(async { Ok(()) }) }
fn create_doc(&self, _token: &str, _params: CreateDocParams) -> ResultFuture<(), DocError> {
ResultFuture::new(async { Ok(()) })
}
fn read_doc(&self, _token: &str, _params: QueryDocParams) -> ResultFuture<Option<Doc>, DocError> {
ResultFuture::new(async { Ok(None) })
}
fn update_doc(&self, _token: &str, _params: UpdateDocParams) -> ResultFuture<(), DocError> { ResultFuture::new(async { Ok(()) }) }
fn update_doc(&self, _token: &str, _params: UpdateDocParams) -> ResultFuture<(), DocError> {
ResultFuture::new(async { Ok(()) })
}
fn delete_doc(&self, _token: &str, _params: QueryDocParams) -> ResultFuture<(), DocError> { ResultFuture::new(async { Ok(()) }) }
fn delete_doc(&self, _token: &str, _params: QueryDocParams) -> ResultFuture<(), DocError> {
ResultFuture::new(async { Ok(()) })
}
}

View File

@ -54,7 +54,9 @@ impl KV {
let conn = database.get_connection().unwrap();
SqliteConnection::execute(&*conn, KV_SQL).unwrap();
let mut store = KV_HOLDER.write().map_err(|e| format!("KVStore write failed: {:?}", e))?;
let mut store = KV_HOLDER
.write()
.map_err(|e| format!("KVStore write failed: {:?}", e))?;
store.database = Some(database);
Ok(())

View File

@ -1,4 +1,2 @@
mod model;
pub use model::*;

View File

@ -1,4 +1,4 @@
// Auto-generated, do not edit
// Auto-generated, do not edit
mod kv;
pub use kv::*;
mod kv;
pub use kv::*;

View File

@ -98,7 +98,10 @@ impl fmt::Display for Type {
}
}
fn format_span_context<S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>>(span: &SpanRef<S>, ty: Type) -> String {
fn format_span_context<S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>>(
span: &SpanRef<S>,
ty: Type,
) -> String {
format!("[⛳ {} - {}]", span.metadata().name().to_uppercase(), ty)
}
@ -164,11 +167,9 @@ where
// Add all the other fields associated with the event, expect the message we
// already used.
for (key, value) in event_visitor
.values()
.iter()
.filter(|(&key, _)| key != "message" && !FLOWY_RESERVED_FIELDS.contains(&key) && !IGNORE_FIELDS.contains(&key))
{
for (key, value) in event_visitor.values().iter().filter(|(&key, _)| {
key != "message" && !FLOWY_RESERVED_FIELDS.contains(&key) && !IGNORE_FIELDS.contains(&key)
}) {
map_serializer.serialize_entry(key, value)?;
}

View File

@ -1,4 +1,2 @@
mod model;
pub use model::*;

View File

@ -1,4 +1,4 @@
// Auto-generated, do not edit
// Auto-generated, do not edit
mod subject;
pub use subject::*;
mod subject;
pub use subject::*;

View File

@ -171,7 +171,8 @@ lazy_static! {
AttributeKey::Size,
AttributeKey::Background,
]);
static ref INGORE_KEYS: HashSet<AttributeKey> = HashSet::from_iter(vec![AttributeKey::Width, AttributeKey::Height,]);
static ref INGORE_KEYS: HashSet<AttributeKey> =
HashSet::from_iter(vec![AttributeKey::Width, AttributeKey::Height,]);
}
#[derive(Debug, PartialEq, Eq, Clone)]

View File

@ -45,7 +45,11 @@ impl Serialize for Attributes {
Err(e) => log::error!("Serial {:?} failed. {:?}", k, e),
},
AttributeKey::Link | AttributeKey::Color | AttributeKey::Background | AttributeKey::Align | AttributeKey::List => {
AttributeKey::Link
| AttributeKey::Color
| AttributeKey::Background
| AttributeKey::Align
| AttributeKey::List => {
map.serialize_entry(k, v)?;
},
}
@ -102,7 +106,9 @@ impl<'de> Deserialize<'de> for AttributeValue {
struct AttributeValueVisitor;
impl<'de> Visitor<'de> for AttributeValueVisitor {
type Value = AttributeValue;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { formatter.write_str("bool, usize or string") }
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("bool, usize or string")
}
fn visit_bool<E>(self, value: bool) -> Result<Self::Value, E>
where
E: de::Error,

View File

@ -208,7 +208,9 @@ impl OpNewline {
pub fn is_not_found(&self) -> bool { self == &OpNewline::NotFound }
pub fn is_contain(&self) -> bool { self.is_start() || self.is_end() || self.is_equal() || self == &OpNewline::Contain }
pub fn is_contain(&self) -> bool {
self.is_start() || self.is_end() || self.is_equal() || self == &OpNewline::Contain
}
pub fn is_equal(&self) -> bool { self == &OpNewline::Equal }
}

View File

@ -33,7 +33,9 @@ impl Interval {
pub fn contains(&self, val: usize) -> bool { self.start <= val && val < self.end }
pub fn contains_range(&self, start: usize, end: usize) -> bool { !self.intersect(Interval::new(start, end)).is_empty() }
pub fn contains_range(&self, start: usize, end: usize) -> bool {
!self.intersect(Interval::new(start, end)).is_empty()
}
pub fn is_after(&self, val: usize) -> bool { self.start > val }

View File

@ -42,7 +42,9 @@ impl Operation {
pub fn has_attribute(&self) -> bool { !self.get_attributes().is_empty() }
pub fn contain_attribute(&self, attribute: &Attribute) -> bool { self.get_attributes().contains_key(&attribute.key) }
pub fn contain_attribute(&self, attribute: &Attribute) -> bool {
self.get_attributes().contains_key(&attribute.key)
}
pub fn len(&self) -> usize {
match self {
@ -70,7 +72,11 @@ impl Operation {
},
Operation::Insert(insert) => {
let attributes = self.get_attributes();
left = Some(OpBuilder::insert(&insert.s[0..index]).attributes(attributes.clone()).build());
left = Some(
OpBuilder::insert(&insert.s[0..index])
.attributes(attributes.clone())
.build(),
);
right = Some(
OpBuilder::insert(&insert.s[index..insert.num_chars()])
.attributes(attributes)
@ -167,7 +173,12 @@ impl fmt::Display for Retain {
impl Retain {
pub fn merge_or_new(&mut self, n: usize, attributes: Attributes) -> Option<Operation> {
log::trace!("merge_retain_or_new_op: len: {:?}, l: {} - r: {}", n, self.attributes, attributes);
log::trace!(
"merge_retain_or_new_op: len: {:?}, l: {} - r: {}",
n,
self.attributes,
attributes
);
if self.attributes == attributes {
self.n += n;
None

View File

@ -7,7 +7,12 @@ pub struct OTError {
}
impl OTError {
pub fn new(code: OTErrorCode, msg: &str) -> OTError { Self { code, msg: msg.to_owned() } }
pub fn new(code: OTErrorCode, msg: &str) -> OTError {
Self {
code,
msg: msg.to_owned(),
}
}
}
impl fmt::Display for OTError {

View File

@ -29,7 +29,9 @@ impl DocumentDepsResolver {
let ws_manager = Arc::new(RwLock::new(WsDocumentManager::new(sender)));
let ws_handler = Arc::new(WsDocumentReceiver { inner: ws_manager.clone() });
let ws_handler = Arc::new(WsDocumentReceiver {
inner: ws_manager.clone(),
});
self.user_session.add_ws_handler(ws_handler);
@ -74,7 +76,10 @@ struct WsSenderImpl {
impl WsDocumentSender for WsSenderImpl {
fn send(&self, data: WsDocumentData) -> Result<(), DocError> {
let msg: WsMessage = data.into();
let _ = self.user.send_ws_msg(msg).map_err(|e| DocError::internal().context(e))?;
let _ = self
.user
.send_ws_msg(msg)
.map_err(|e| DocError::internal().context(e))?;
Ok(())
}
}

View File

@ -1,4 +1,3 @@
use flowy_database::ConnectionPool;
use flowy_user::services::user::UserSession;
use flowy_workspace::{
@ -32,12 +31,22 @@ impl WorkspaceDepsResolver {
impl WorkspaceDatabase for Resolver {
fn db_pool(&self) -> Result<Arc<ConnectionPool>, WorkspaceError> {
self.user_session.db_pool().map_err(|e| WorkspaceError::internal().context(e))
self.user_session
.db_pool()
.map_err(|e| WorkspaceError::internal().context(e))
}
}
impl WorkspaceUser for Resolver {
fn user_id(&self) -> Result<String, WorkspaceError> { self.user_session.user_id().map_err(|e| WorkspaceError::internal().context(e)) }
fn user_id(&self) -> Result<String, WorkspaceError> {
self.user_session
.user_id()
.map_err(|e| WorkspaceError::internal().context(e))
}
fn token(&self) -> Result<String, WorkspaceError> { self.user_session.token().map_err(|e| WorkspaceError::internal().context(e)) }
fn token(&self) -> Result<String, WorkspaceError> {
self.user_session
.token()
.map_err(|e| WorkspaceError::internal().context(e))
}
}

View File

@ -21,7 +21,10 @@ impl Database {
}
let pool = ConnectionPool::new(pool_config, &uri)?;
Ok(Self { uri, pool: Arc::new(pool) })
Ok(Self {
uri,
pool: Arc::new(pool),
})
}
pub fn get_uri(&self) -> &str { &self.uri }

View File

@ -1,4 +1,10 @@
use error_chain::{error_chain, error_chain_processing, impl_error_chain_kind, impl_error_chain_processed, impl_extract_backtrace};
use error_chain::{
error_chain,
error_chain_processing,
impl_error_chain_kind,
impl_error_chain_processed,
impl_extract_backtrace,
};
error_chain! {
errors {

View File

@ -57,5 +57,7 @@ mod tests {
}
#[quickcheck_macros::quickcheck]
fn valid_emails_are_parsed_successfully(valid_email: ValidEmailFixture) -> bool { UserEmail::parse(valid_email.0).is_ok() }
fn valid_emails_are_parsed_successfully(valid_email: ValidEmailFixture) -> bool {
UserEmail::parse(valid_email.0).is_ok()
}
}

View File

@ -139,6 +139,11 @@ impl TryInto<UpdateUserParams> for UpdateUserRequest {
Some(password) => Some(UserPassword::parse(password).map_err(|e| UserError::code(e))?.0),
};
Ok(UpdateUserParams { id, name, email, password })
Ok(UpdateUserParams {
id,
name,
email,
password,
})
}
}

View File

@ -28,7 +28,10 @@ pub async fn sign_out(session: Unit<Arc<UserSession>>) -> Result<(), UserError>
}
#[tracing::instrument(name = "update_user", skip(data, session))]
pub async fn update_user_handler(data: Data<UpdateUserRequest>, session: Unit<Arc<UserSession>>) -> Result<(), UserError> {
pub async fn update_user_handler(
data: Data<UpdateUserRequest>,
session: Unit<Arc<UserSession>>,
) -> Result<(), UserError> {
let params: UpdateUserParams = data.into_inner().try_into()?;
session.update_user(params).await?;
Ok(())

View File

@ -1,4 +1,2 @@
mod model;
pub use model::*;

View File

@ -1,19 +1,19 @@
// Auto-generated, do not edit
// Auto-generated, do not edit
mod observable;
pub use observable::*;
mod observable;
pub use observable::*;
mod user_table;
pub use user_table::*;
mod user_table;
pub use user_table::*;
mod errors;
pub use errors::*;
mod errors;
pub use errors::*;
mod user_profile;
pub use user_profile::*;
mod user_profile;
pub use user_profile::*;
mod event;
pub use event::*;
mod event;
pub use event::*;
mod auth;
pub use auth::*;
mod auth;
pub use auth::*;

View File

@ -14,7 +14,11 @@ pub(crate) struct UserDB {
}
impl UserDB {
pub(crate) fn new(db_dir: &str) -> Self { Self { db_dir: db_dir.to_owned() } }
pub(crate) fn new(db_dir: &str) -> Self {
Self {
db_dir: db_dir.to_owned(),
}
}
fn open_user_db(&self, user_id: &str) -> Result<(), UserError> {
if user_id.is_empty() {

View File

@ -16,14 +16,13 @@ use flowy_database::{
ExpressionMethods,
UserDatabaseConnection,
};
use flowy_infra::{future::wrap_future, kv::KV};
use flowy_infra::kv::KV;
use flowy_net::config::ServerConfig;
use flowy_sqlite::ConnectionPool;
use flowy_ws::{connect::Retry, WsController, WsMessage, WsMessageHandler, WsSender};
use flowy_ws::{WsController, WsMessage, WsMessageHandler};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::{sync::Arc, time::Duration};
use tokio::task::JoinHandle;
use std::sync::Arc;
pub struct UserSessionConfig {
root_dir: String,

View File

@ -55,7 +55,8 @@ impl TryInto<CreateAppParams> for CreateAppRequest {
let id = WorkspaceId::parse(self.workspace_id).map_err(|e| WorkspaceError::workspace_id().context(e))?;
let color_style = AppColorStyle::parse(self.color_style).map_err(|e| WorkspaceError::color_style().context(e))?;
let color_style =
AppColorStyle::parse(self.color_style).map_err(|e| WorkspaceError::color_style().context(e))?;
Ok(CreateAppParams {
workspace_id: id.0,

View File

@ -18,7 +18,9 @@ impl TryInto<DeleteAppParams> for DeleteAppRequest {
type Error = WorkspaceError;
fn try_into(self) -> Result<DeleteAppParams, Self::Error> {
let app_id = AppId::parse(self.app_id).map_err(|e| WorkspaceError::app_id().context(e))?.0;
let app_id = AppId::parse(self.app_id)
.map_err(|e| WorkspaceError::app_id().context(e))?
.0;
Ok(DeleteAppParams { app_id })
}

View File

@ -69,7 +69,9 @@ impl TryInto<QueryAppParams> for QueryAppRequest {
type Error = WorkspaceError;
fn try_into(self) -> Result<QueryAppParams, Self::Error> {
let app_id = AppId::parse(self.app_id).map_err(|e| WorkspaceError::app_id().context(e))?.0;
let app_id = AppId::parse(self.app_id)
.map_err(|e| WorkspaceError::app_id().context(e))?
.0;
Ok(QueryAppParams {
app_id,

View File

@ -72,11 +72,17 @@ impl TryInto<UpdateAppParams> for UpdateAppRequest {
type Error = WorkspaceError;
fn try_into(self) -> Result<UpdateAppParams, Self::Error> {
let app_id = AppId::parse(self.app_id).map_err(|e| WorkspaceError::app_id().context(e))?.0;
let app_id = AppId::parse(self.app_id)
.map_err(|e| WorkspaceError::app_id().context(e))?
.0;
let name = match self.name {
None => None,
Some(name) => Some(AppName::parse(name).map_err(|e| WorkspaceError::workspace_name().context(e))?.0),
Some(name) => Some(
AppName::parse(name)
.map_err(|e| WorkspaceError::workspace_name().context(e))?
.0,
),
};
let color_style = match self.color_style {

View File

@ -19,7 +19,9 @@ impl TryInto<DeleteViewParams> for DeleteViewRequest {
type Error = WorkspaceError;
fn try_into(self) -> Result<DeleteViewParams, Self::Error> {
let view_id = ViewId::parse(self.view_id).map_err(|e| WorkspaceError::view_id().context(e))?.0;
let view_id = ViewId::parse(self.view_id)
.map_err(|e| WorkspaceError::view_id().context(e))?
.0;
Ok(DeleteViewParams { view_id })
}

View File

@ -68,7 +68,9 @@ impl std::convert::Into<QueryDocParams> for QueryViewParams {
impl TryInto<QueryViewParams> for QueryViewRequest {
type Error = WorkspaceError;
fn try_into(self) -> Result<QueryViewParams, Self::Error> {
let view_id = ViewId::parse(self.view_id).map_err(|e| WorkspaceError::view_id().context(e))?.0;
let view_id = ViewId::parse(self.view_id)
.map_err(|e| WorkspaceError::view_id().context(e))?
.0;
Ok(QueryViewParams {
view_id,
@ -88,7 +90,9 @@ impl std::convert::TryInto<QueryDocParams> for OpenViewRequest {
type Error = WorkspaceError;
fn try_into(self) -> Result<QueryDocParams, Self::Error> {
let view_id = ViewId::parse(self.view_id).map_err(|e| WorkspaceError::view_id().context(e))?.0;
let view_id = ViewId::parse(self.view_id)
.map_err(|e| WorkspaceError::view_id().context(e))?
.0;
Ok(QueryDocParams { doc_id: view_id })
}
}

View File

@ -28,7 +28,12 @@ macro_rules! static_workspace_error {
}
impl WorkspaceError {
pub fn new(code: ErrorCode, msg: &str) -> Self { Self { code, msg: msg.to_owned() } }
pub fn new(code: ErrorCode, msg: &str) -> Self {
Self {
code,
msg: msg.to_owned(),
}
}
static_workspace_error!(workspace_name, ErrorCode::WorkspaceNameInvalid);
static_workspace_error!(workspace_id, ErrorCode::WorkspaceIdInvalid);
@ -99,6 +104,13 @@ pub enum ErrorCode {
RecordNotFound = 1001,
}
pub fn internal_error<T>(e: T) -> WorkspaceError
where
T: std::fmt::Debug,
{
WorkspaceError::internal().context(e)
}
impl std::default::Default for ErrorCode {
fn default() -> Self { ErrorCode::InternalError }
}

View File

@ -28,14 +28,20 @@ pub(crate) async fn create_app_handler(
}
#[tracing::instrument(skip(data, controller))]
pub(crate) async fn delete_app_handler(data: Data<DeleteAppRequest>, controller: Unit<Arc<AppController>>) -> Result<(), WorkspaceError> {
pub(crate) async fn delete_app_handler(
data: Data<DeleteAppRequest>,
controller: Unit<Arc<AppController>>,
) -> Result<(), WorkspaceError> {
let params: DeleteAppParams = data.into_inner().try_into()?;
let _ = controller.delete_app(&params.app_id).await?;
Ok(())
}
#[tracing::instrument(skip(data, controller))]
pub(crate) async fn update_app_handler(data: Data<UpdateAppRequest>, controller: Unit<Arc<AppController>>) -> Result<(), WorkspaceError> {
pub(crate) async fn update_app_handler(
data: Data<UpdateAppRequest>,
controller: Unit<Arc<AppController>>,
) -> Result<(), WorkspaceError> {
let params: UpdateAppParams = data.into_inner().try_into()?;
let _ = controller.update_app(params).await?;
Ok(())

View File

@ -18,13 +18,17 @@ pub(crate) async fn create_workspace_handler(
}
#[tracing::instrument(skip(controller), err)]
pub(crate) async fn read_cur_workspace_handler(controller: Unit<Arc<WorkspaceController>>) -> DataResult<Workspace, WorkspaceError> {
pub(crate) async fn read_cur_workspace_handler(
controller: Unit<Arc<WorkspaceController>>,
) -> DataResult<Workspace, WorkspaceError> {
let workspace = controller.read_cur_workspace().await?;
data_result(workspace)
}
#[tracing::instrument(skip(controller), err)]
pub(crate) async fn read_workspace_apps_handler(controller: Unit<Arc<WorkspaceController>>) -> DataResult<RepeatedApp, WorkspaceError> {
pub(crate) async fn read_workspace_apps_handler(
controller: Unit<Arc<WorkspaceController>>,
) -> DataResult<RepeatedApp, WorkspaceError> {
let repeated_app = controller.read_workspace_apps().await?;
data_result(repeated_app)
}

View File

@ -26,4 +26,6 @@ impl std::convert::Into<i32> for WorkspaceObservable {
fn into(self) -> i32 { self as i32 }
}
pub(crate) fn notify(id: &str, ty: WorkspaceObservable) -> NotifyBuilder { NotifyBuilder::new(id, ty, OBSERVABLE_CATEGORY) }
pub(crate) fn notify(id: &str, ty: WorkspaceObservable) -> NotifyBuilder {
NotifyBuilder::new(id, ty, OBSERVABLE_CATEGORY)
}

View File

@ -1,4 +1,2 @@
mod model;
pub use model::*;

View File

@ -1,49 +1,49 @@
// Auto-generated, do not edit
// Auto-generated, do not edit
mod view_update;
pub use view_update::*;
mod view_update;
pub use view_update::*;
mod view_delete;
pub use view_delete::*;
mod view_delete;
pub use view_delete::*;
mod app_query;
pub use app_query::*;
mod app_query;
pub use app_query::*;
mod workspace_delete;
pub use workspace_delete::*;
mod workspace_delete;
pub use workspace_delete::*;
mod observable;
pub use observable::*;
mod observable;
pub use observable::*;
mod errors;
pub use errors::*;
mod errors;
pub use errors::*;
mod workspace_update;
pub use workspace_update::*;
mod workspace_update;
pub use workspace_update::*;
mod app_create;
pub use app_create::*;
mod app_create;
pub use app_create::*;
mod workspace_query;
pub use workspace_query::*;
mod workspace_query;
pub use workspace_query::*;
mod event;
pub use event::*;
mod event;
pub use event::*;
mod view_create;
pub use view_create::*;
mod view_create;
pub use view_create::*;
mod workspace_user_detail;
pub use workspace_user_detail::*;
mod workspace_user_detail;
pub use workspace_user_detail::*;
mod workspace_create;
pub use workspace_create::*;
mod workspace_create;
pub use workspace_create::*;
mod app_update;
pub use app_update::*;
mod app_update;
pub use app_update::*;
mod view_query;
pub use view_query::*;
mod view_query;
pub use view_query::*;
mod app_delete;
pub use app_delete::*;
mod app_delete;
pub use app_delete::*;

View File

@ -33,7 +33,11 @@ impl WorkspaceServerAPI for WorkspaceServerMock {
ResultFuture::new(async { Ok(workspace) })
}
fn read_workspace(&self, _token: &str, _params: QueryWorkspaceParams) -> ResultFuture<RepeatedWorkspace, WorkspaceError> {
fn read_workspace(
&self,
_token: &str,
_params: QueryWorkspaceParams,
) -> ResultFuture<RepeatedWorkspace, WorkspaceError> {
ResultFuture::new(async {
let repeated_workspace = RepeatedWorkspace { items: vec![] };
Ok(repeated_workspace)
@ -95,7 +99,11 @@ impl WorkspaceServerAPI for WorkspaceServerMock {
ResultFuture::new(async { Ok(None) })
}
fn update_app(&self, _token: &str, _params: UpdateAppParams) -> ResultFuture<(), WorkspaceError> { ResultFuture::new(async { Ok(()) }) }
fn update_app(&self, _token: &str, _params: UpdateAppParams) -> ResultFuture<(), WorkspaceError> {
ResultFuture::new(async { Ok(()) })
}
fn delete_app(&self, _token: &str, _params: DeleteAppParams) -> ResultFuture<(), WorkspaceError> { ResultFuture::new(async { Ok(()) }) }
fn delete_app(&self, _token: &str, _params: DeleteAppParams) -> ResultFuture<(), WorkspaceError> {
ResultFuture::new(async { Ok(()) })
}
}

View File

@ -9,6 +9,7 @@ use crate::{
use crate::{
entities::view::{DeleteViewParams, QueryViewParams, RepeatedView},
errors::internal_error,
module::WorkspaceUser,
observable::WorkspaceObservable,
};
@ -80,7 +81,7 @@ impl ViewController {
#[tracing::instrument(level = "debug", skip(self), err)]
pub(crate) async fn open_view(&self, params: QueryDocParams) -> Result<Doc, WorkspaceError> {
let edit_context = self.document.open(params, self.database.db_pool()?).await?;
Ok(edit_context.doc())
Ok(edit_context.doc().await.map_err(internal_error)?)
}
pub(crate) async fn delete_view(&self, params: DeleteViewParams) -> Result<(), WorkspaceError> {

View File

@ -123,9 +123,13 @@ impl WorkspaceController {
}
}
pub(crate) async fn read_workspaces(&self, params: QueryWorkspaceParams) -> Result<RepeatedWorkspace, WorkspaceError> {
pub(crate) async fn read_workspaces(
&self,
params: QueryWorkspaceParams,
) -> Result<RepeatedWorkspace, WorkspaceError> {
let user_id = self.user.user_id()?;
let workspaces = self.read_local_workspaces(params.workspace_id.clone(), &user_id, &*self.database.db_connection()?)?;
let workspaces =
self.read_local_workspaces(params.workspace_id.clone(), &user_id, &*self.database.db_connection()?)?;
let _ = self.read_workspaces_on_server(user_id.clone(), params.clone());
Ok(workspaces)
}
@ -170,7 +174,12 @@ impl WorkspaceController {
Ok(RepeatedWorkspace { items: workspaces })
}
fn read_local_workspace(&self, workspace_id: String, user_id: &str, conn: &SqliteConnection) -> Result<Workspace, WorkspaceError> {
fn read_local_workspace(
&self,
workspace_id: String,
user_id: &str,
conn: &SqliteConnection,
) -> Result<Workspace, WorkspaceError> {
// Opti: fetch single workspace from local db
let mut repeated_workspace = self.read_local_workspaces(Some(workspace_id.clone()), user_id, conn)?;
if repeated_workspace.is_empty() {
@ -280,7 +289,9 @@ impl WorkspaceController {
Ok(())
})?;
notify(&token, WorkspaceObservable::WorkspaceListUpdated).payload(workspaces).send();
notify(&token, WorkspaceObservable::WorkspaceListUpdated)
.payload(workspaces)
.send();
Result::<(), WorkspaceError>::Ok(())
});
@ -294,7 +305,9 @@ fn set_current_workspace(workspace: &str) { KV::set_str(CURRENT_WORKSPACE_ID, wo
fn get_current_workspace() -> Result<String, WorkspaceError> {
match KV::get_str(CURRENT_WORKSPACE_ID) {
None => Err(WorkspaceError::not_found().context("Current workspace not found or should call open workspace first")),
None => {
Err(WorkspaceError::not_found().context("Current workspace not found or should call open workspace first"))
},
Some(workspace_id) => Ok(workspace_id),
}
}

View File

@ -22,12 +22,21 @@ impl AppTableSql {
Ok(())
}
pub(crate) fn update_app(&self, changeset: AppTableChangeset, conn: &SqliteConnection) -> Result<(), WorkspaceError> {
pub(crate) fn update_app(
&self,
changeset: AppTableChangeset,
conn: &SqliteConnection,
) -> Result<(), WorkspaceError> {
diesel_update_table!(app_table, changeset, conn);
Ok(())
}
pub(crate) fn read_app(&self, app_id: &str, is_trash: Option<bool>, conn: &SqliteConnection) -> Result<AppTable, WorkspaceError> {
pub(crate) fn read_app(
&self,
app_id: &str,
is_trash: Option<bool>,
conn: &SqliteConnection,
) -> Result<AppTable, WorkspaceError> {
let mut filter = dsl::app_table.filter(app_table::id.eq(app_id)).into_boxed();
if let Some(is_trash) = is_trash {
@ -38,7 +47,12 @@ impl AppTableSql {
Ok(app_table)
}
pub(crate) fn read_apps(&self, workspace_id: &str, is_trash: bool, conn: &SqliteConnection) -> Result<Vec<AppTable>, WorkspaceError> {
pub(crate) fn read_apps(
&self,
workspace_id: &str,
is_trash: bool,
conn: &SqliteConnection,
) -> Result<Vec<AppTable>, WorkspaceError> {
let app_table = dsl::app_table
.filter(app_table::workspace_id.eq(workspace_id))
.filter(app_table::is_trash.eq(is_trash))
@ -48,7 +62,9 @@ impl AppTableSql {
}
pub(crate) fn delete_app(&self, app_id: &str, conn: &SqliteConnection) -> Result<AppTable, WorkspaceError> {
let app_table = dsl::app_table.filter(app_table::id.eq(app_id)).first::<AppTable>(conn)?;
let app_table = dsl::app_table
.filter(app_table::id.eq(app_id))
.first::<AppTable>(conn)?;
diesel_delete_table!(app_table, app_id, conn);
Ok(app_table)
}

View File

@ -67,7 +67,9 @@ impl std::convert::TryInto<Vec<u8>> for &ColorStyleCol {
impl std::convert::TryFrom<&[u8]> for ColorStyleCol {
type Error = String;
fn try_from(value: &[u8]) -> Result<Self, Self::Error> { bincode::deserialize(value).map_err(|e| format!("{:?}", e)) }
fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
bincode::deserialize(value).map_err(|e| format!("{:?}", e))
}
}
impl_sql_binary_expression!(ColorStyleCol);

View File

@ -22,7 +22,12 @@ impl ViewTableSql {
Ok(())
}
pub(crate) fn read_view(&self, view_id: &str, is_trash: Option<bool>, conn: &SqliteConnection) -> Result<ViewTable, WorkspaceError> {
pub(crate) fn read_view(
&self,
view_id: &str,
is_trash: Option<bool>,
conn: &SqliteConnection,
) -> Result<ViewTable, WorkspaceError> {
// https://docs.diesel.rs/diesel/query_builder/struct.UpdateStatement.html
let mut filter = dsl::view_table.filter(view_table::id.eq(view_id)).into_boxed();
if let Some(is_trash) = is_trash {
@ -32,7 +37,11 @@ impl ViewTableSql {
Ok(view_table)
}
pub(crate) fn read_views_belong_to(&self, belong_to_id: &str, conn: &SqliteConnection) -> Result<Vec<ViewTable>, WorkspaceError> {
pub(crate) fn read_views_belong_to(
&self,
belong_to_id: &str,
conn: &SqliteConnection,
) -> Result<Vec<ViewTable>, WorkspaceError> {
let view_tables = dsl::view_table
.filter(view_table::belong_to_id.eq(belong_to_id))
.load::<ViewTable>(conn)?;
@ -40,13 +49,19 @@ impl ViewTableSql {
Ok(view_tables)
}
pub(crate) fn update_view(&self, changeset: ViewTableChangeset, conn: &SqliteConnection) -> Result<(), WorkspaceError> {
pub(crate) fn update_view(
&self,
changeset: ViewTableChangeset,
conn: &SqliteConnection,
) -> Result<(), WorkspaceError> {
diesel_update_table!(view_table, changeset, conn);
Ok(())
}
pub(crate) fn delete_view(&self, view_id: &str, conn: &SqliteConnection) -> Result<ViewTable, WorkspaceError> {
let view_table = dsl::view_table.filter(view_table::id.eq(view_id)).first::<ViewTable>(conn)?;
let view_table = dsl::view_table
.filter(view_table::id.eq(view_id))
.first::<ViewTable>(conn)?;
diesel_delete_table!(view_table, view_id, conn);
Ok(view_table)
}

View File

@ -14,7 +14,11 @@ use flowy_database::{
pub(crate) struct WorkspaceTableSql {}
impl WorkspaceTableSql {
pub(crate) fn create_workspace(&self, table: WorkspaceTable, conn: &SqliteConnection) -> Result<(), WorkspaceError> {
pub(crate) fn create_workspace(
&self,
table: WorkspaceTable,
conn: &SqliteConnection,
) -> Result<(), WorkspaceError> {
match diesel_record_count!(workspace_table, &table.id, conn) {
0 => diesel_insert_table!(workspace_table, &table, conn),
_ => {
@ -45,7 +49,11 @@ impl WorkspaceTableSql {
}
#[allow(dead_code)]
pub(crate) fn update_workspace(&self, changeset: WorkspaceTableChangeset, conn: &SqliteConnection) -> Result<(), WorkspaceError> {
pub(crate) fn update_workspace(
&self,
changeset: WorkspaceTableChangeset,
conn: &SqliteConnection,
) -> Result<(), WorkspaceError> {
diesel_update_table!(workspace_table, changeset, conn);
Ok(())
}

View File

@ -28,7 +28,12 @@ macro_rules! static_user_error {
impl WsError {
#[allow(dead_code)]
pub(crate) fn new(code: ErrorCode) -> WsError { WsError { code, msg: "".to_string() } }
pub(crate) fn new(code: ErrorCode) -> WsError {
WsError {
code,
msg: "".to_string(),
}
}
pub fn context<T: Debug>(mut self, error: T) -> Self {
self.msg = format!("{:?}", error);

View File

@ -1,4 +1,2 @@
mod model;
pub use model::*;

View File

@ -1,7 +1,7 @@
// Auto-generated, do not edit
// Auto-generated, do not edit
mod errors;
pub use errors::*;
mod errors;
pub use errors::*;
mod msg;
pub use msg::*;
mod msg;
pub use msg::*;

View File

@ -6,23 +6,19 @@ use crate::{
};
use bytes::Bytes;
use dashmap::DashMap;
use flowy_net::errors::{internal_error, ServerError};
use flowy_net::errors::ServerError;
use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures_core::{future::BoxFuture, ready, Stream};
use futures_core::{ready, Stream};
use parking_lot::RwLock;
use pin_project::pin_project;
use std::{
collections::HashMap,
convert::TryFrom,
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tokio::{
sync::{broadcast, oneshot},
task::JoinHandle,
};
use tokio::sync::{broadcast, oneshot};
use tokio_tungstenite::tungstenite::{
protocol::{frame::coding::CloseCode, CloseFrame},
Message,
@ -37,22 +33,6 @@ pub trait WsMessageHandler: Sync + Send + 'static {
fn receive_message(&self, msg: WsMessage);
}
type NotifyCallback = Arc<dyn Fn(&WsState) + Send + Sync + 'static>;
struct WsStateNotify {
#[allow(dead_code)]
state: WsState,
callback: Option<NotifyCallback>,
}
impl WsStateNotify {
fn update_state(&mut self, state: WsState) {
if let Some(f) = &self.callback {
f(&state);
}
self.state = state;
}
}
#[derive(Clone)]
pub enum WsState {
Init,
@ -114,13 +94,13 @@ impl WsController {
tokio::spawn(async move {
match connection.await {
Ok(stream) => {
state_notify.send(WsState::Connected(sender));
ret.send(Ok(()));
let _ = state_notify.send(WsState::Connected(sender));
let _ = ret.send(Ok(()));
spawn_steam_and_handlers(stream, handlers, state_notify).await;
},
Err(e) => {
state_notify.send(WsState::Disconnected(e.clone()));
ret.send(Err(ServerError::internal().context(e)));
let _ = state_notify.send(WsState::Disconnected(e.clone()));
let _ = ret.send(Err(ServerError::internal().context(e)));
},
}
});
@ -160,7 +140,7 @@ async fn spawn_steam_and_handlers(
Err(e) => {
// TODO: retry?
log::error!("ws stream error {:?}", e);
state_notify.send(WsState::Disconnected(e));
let _ = state_notify.send(WsState::Disconnected(e));
}
}
},