diff --git a/.gitignore b/.gitignore index 1ebe369c09..411e6555ef 100644 --- a/.gitignore +++ b/.gitignore @@ -10,4 +10,5 @@ Cargo.lock **/*.rs.bk **/target/ **/*.db -.idea/ \ No newline at end of file +.idea/ +/flowy-test/ \ No newline at end of file diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pb.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pb.dart index e05aa9a2bb..d7b8b05d91 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pb.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pb.dart @@ -18,7 +18,7 @@ class Revision extends $pb.GeneratedMessage { static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'Revision', createEmptyInstance: create) ..aInt64(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'baseRevId') ..aInt64(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'revId') - ..a<$core.List<$core.int>>(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'delta', $pb.PbFieldType.OY) + ..a<$core.List<$core.int>>(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'deltaData', $pb.PbFieldType.OY) ..aOS(4, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'md5') ..aOS(5, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'docId') ..e(6, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'ty', $pb.PbFieldType.OE, defaultOrMaker: RevType.Local, valueOf: RevType.valueOf, enumValues: RevType.values) @@ -29,7 +29,7 @@ class Revision extends $pb.GeneratedMessage { factory Revision({ $fixnum.Int64? baseRevId, $fixnum.Int64? revId, - $core.List<$core.int>? delta, + $core.List<$core.int>? deltaData, $core.String? md5, $core.String? docId, RevType? ty, @@ -41,8 +41,8 @@ class Revision extends $pb.GeneratedMessage { if (revId != null) { _result.revId = revId; } - if (delta != null) { - _result.delta = delta; + if (deltaData != null) { + _result.deltaData = deltaData; } if (md5 != null) { _result.md5 = md5; @@ -95,13 +95,13 @@ class Revision extends $pb.GeneratedMessage { void clearRevId() => clearField(2); @$pb.TagNumber(3) - $core.List<$core.int> get delta => $_getN(2); + $core.List<$core.int> get deltaData => $_getN(2); @$pb.TagNumber(3) - set delta($core.List<$core.int> v) { $_setBytes(2, v); } + set deltaData($core.List<$core.int> v) { $_setBytes(2, v); } @$pb.TagNumber(3) - $core.bool hasDelta() => $_has(2); + $core.bool hasDeltaData() => $_has(2); @$pb.TagNumber(3) - void clearDelta() => clearField(3); + void clearDeltaData() => clearField(3); @$pb.TagNumber(4) $core.String get md5 => $_getSZ(3); diff --git a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pbjson.dart b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pbjson.dart index f46de3e8ac..9aef530c26 100644 --- a/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pbjson.dart +++ b/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pbjson.dart @@ -25,7 +25,7 @@ const Revision$json = const { '2': const [ const {'1': 'base_rev_id', '3': 1, '4': 1, '5': 3, '10': 'baseRevId'}, const {'1': 'rev_id', '3': 2, '4': 1, '5': 3, '10': 'revId'}, - const {'1': 'delta', '3': 3, '4': 1, '5': 12, '10': 'delta'}, + const {'1': 'delta_data', '3': 3, '4': 1, '5': 12, '10': 'deltaData'}, const {'1': 'md5', '3': 4, '4': 1, '5': 9, '10': 'md5'}, const {'1': 'doc_id', '3': 5, '4': 1, '5': 9, '10': 'docId'}, const {'1': 'ty', '3': 6, '4': 1, '5': 14, '6': '.RevType', '10': 'ty'}, @@ -33,7 +33,7 @@ const Revision$json = const { }; /// Descriptor for `Revision`. Decode as a `google.protobuf.DescriptorProto`. -final $typed_data.Uint8List revisionDescriptor = $convert.base64Decode('CghSZXZpc2lvbhIeCgtiYXNlX3Jldl9pZBgBIAEoA1IJYmFzZVJldklkEhUKBnJldl9pZBgCIAEoA1IFcmV2SWQSFAoFZGVsdGEYAyABKAxSBWRlbHRhEhAKA21kNRgEIAEoCVIDbWQ1EhUKBmRvY19pZBgFIAEoCVIFZG9jSWQSGAoCdHkYBiABKA4yCC5SZXZUeXBlUgJ0eQ=='); +final $typed_data.Uint8List revisionDescriptor = $convert.base64Decode('CghSZXZpc2lvbhIeCgtiYXNlX3Jldl9pZBgBIAEoA1IJYmFzZVJldklkEhUKBnJldl9pZBgCIAEoA1IFcmV2SWQSHQoKZGVsdGFfZGF0YRgDIAEoDFIJZGVsdGFEYXRhEhAKA21kNRgEIAEoCVIDbWQ1EhUKBmRvY19pZBgFIAEoCVIFZG9jSWQSGAoCdHkYBiABKA4yCC5SZXZUeXBlUgJ0eQ=='); @$core.Deprecated('Use revisionRangeDescriptor instead') const RevisionRange$json = const { '1': 'RevisionRange', diff --git a/backend/Cargo.toml b/backend/Cargo.toml index cef6772c94..df4745fe74 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -58,6 +58,7 @@ md5 = "0.7.0" futures-core = { version = "0.3", default-features = false } pin-project = "1.0.0" byteorder = {version = "1.3.4"} +async-stream = "0.3.2" flowy-user = { path = "../rust-lib/flowy-user" } flowy-workspace = { path = "../rust-lib/flowy-workspace" } @@ -93,10 +94,12 @@ path = "src/main.rs" parking_lot = "0.11" once_cell = "1.7.2" linkify = "0.5.0" -flowy-user = { path = "../rust-lib/flowy-user" } -flowy-workspace = { path = "../rust-lib/flowy-workspace" } +flowy-user = { path = "../rust-lib/flowy-user", features = ["http_server"] } +flowy-workspace = { path = "../rust-lib/flowy-workspace", features = ["http_server"] } flowy-ws = { path = "../rust-lib/flowy-ws" } flowy-sdk = { path = "../rust-lib/flowy-sdk" } flowy-test = { path = "../rust-lib/flowy-test" } flowy-infra = { path = "../rust-lib/flowy-infra" } -flowy-ot = { path = "../rust-lib/flowy-ot" } \ No newline at end of file +flowy-ot = { path = "../rust-lib/flowy-ot" } +flowy-document = { path = "../rust-lib/flowy-document", features = ["flowy_test", "http_server"] } +flowy-sqlite = { path = "../rust-lib/flowy-sqlite" } \ No newline at end of file diff --git a/backend/src/application.rs b/backend/src/application.rs index 991040ad01..512fa0c885 100644 --- a/backend/src/application.rs +++ b/backend/src/application.rs @@ -16,7 +16,6 @@ use crate::{ service::{ app::router as app, doc::router as doc, - make_ws_biz_handlers, user::router as user, view::router as view, workspace::router as workspace, @@ -31,11 +30,10 @@ pub struct Application { } impl Application { - pub async fn build(configuration: Settings) -> Result { + pub async fn build(configuration: Settings, app_ctx: AppContext) -> Result { let address = format!("{}:{}", configuration.application.host, configuration.application.port); let listener = TcpListener::bind(&address)?; let port = listener.local_addr().unwrap().port(); - let app_ctx = init_app_context(&configuration).await; let server = run(listener, app_ctx)?; Ok(Self { port, server }) } @@ -46,13 +44,9 @@ impl Application { } pub fn run(listener: TcpListener, app_ctx: AppContext) -> Result { - let AppContext { ws_server, pg_pool } = app_ctx; - let ws_server = Data::new(ws_server); - let pg_pool = Data::new(pg_pool); let domain = domain(); let secret: String = secret(); - let ws_biz_handlers = Data::new(make_ws_biz_handlers(pg_pool.clone())); - actix_rt::spawn(period_check(pg_pool.clone())); + actix_rt::spawn(period_check(app_ctx.pg_pool.clone())); let server = HttpServer::new(move || { App::new() @@ -63,9 +57,11 @@ pub fn run(listener: TcpListener, app_ctx: AppContext) -> Result Scope { ) } -async fn init_app_context(configuration: &Settings) -> AppContext { - let _ = crate::service::log::Builder::new("flowy").env_filter("Trace").build(); +pub async fn init_app_context(configuration: &Settings) -> AppContext { + let _ = crate::service::log::Builder::new("flowy-server") + .env_filter("Trace") + .build(); let pg_pool = get_connection_pool(&configuration.database).await.expect(&format!( "Failed to connect to Postgres at {:?}.", configuration.database diff --git a/backend/src/context.rs b/backend/src/context.rs index 34ed0351a3..43c199053e 100644 --- a/backend/src/context.rs +++ b/backend/src/context.rs @@ -1,18 +1,48 @@ -use crate::service::ws::WsServer; +use crate::service::{ + doc::doc::DocBiz, + ws::{WsBizHandlers, WsServer}, +}; use actix::Addr; - +use actix_web::web::Data; +use flowy_ws::WsModule; use sqlx::PgPool; +use std::{io, sync::Arc}; +pub type FlowyRuntime = tokio::runtime::Runtime; + +#[derive(Clone)] pub struct AppContext { - pub ws_server: Addr, - pub pg_pool: PgPool, + pub ws_server: Data>, + pub pg_pool: Data, + pub ws_bizs: Data, + pub doc_biz: Data>, + pub runtime: Data, } impl AppContext { pub fn new(ws_server: Addr, db_pool: PgPool) -> Self { + let ws_server = Data::new(ws_server); + let pg_pool = Data::new(db_pool); + let runtime = Data::new(runtime().unwrap()); + + let mut ws_bizs = WsBizHandlers::new(); + let doc_biz = Arc::new(DocBiz::new(pg_pool.clone())); + ws_bizs.register(WsModule::Doc, doc_biz.clone()); + AppContext { ws_server, - pg_pool: db_pool, + pg_pool, + ws_bizs: Data::new(ws_bizs), + doc_biz: Data::new(doc_biz), + runtime, } } } + +fn runtime() -> io::Result { + tokio::runtime::Builder::new_multi_thread() + .thread_name("flowy-server-rt") + .enable_io() + .enable_time() + .build() +} diff --git a/backend/src/lib.rs b/backend/src/lib.rs index 866d87f0eb..f49c851ce3 100644 --- a/backend/src/lib.rs +++ b/backend/src/lib.rs @@ -1,7 +1,7 @@ pub mod application; pub mod config; -mod context; +pub mod context; mod entities; mod middleware; -mod service; +pub mod service; mod sqlx_ext; diff --git a/backend/src/main.rs b/backend/src/main.rs index 2a2e4e2396..06cd255713 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -1,9 +1,13 @@ -use backend::{application::Application, config::get_configuration}; +use backend::{ + application::{init_app_context, Application}, + config::get_configuration, +}; #[actix_web::main] async fn main() -> std::io::Result<()> { let configuration = get_configuration().expect("Failed to read configuration."); - let application = Application::build(configuration).await?; + let app_ctx = init_app_context(&configuration).await; + let application = Application::build(configuration, app_ctx).await?; application.run_until_stopped().await?; Ok(()) diff --git a/backend/src/service/doc/crud.rs b/backend/src/service/doc/crud.rs new file mode 100644 index 0000000000..56e92a9d8d --- /dev/null +++ b/backend/src/service/doc/crud.rs @@ -0,0 +1,121 @@ +use crate::{ + entities::doc::{DocTable, DOC_TABLE}, + sqlx_ext::{map_sqlx_error, DBTransaction, SqlBuilder}, +}; +use anyhow::Context; +use flowy_document::protobuf::{CreateDocParams, Doc, QueryDocParams, UpdateDocParams}; +use flowy_net::errors::ServerError; +use sqlx::{postgres::PgArguments, PgPool, Postgres}; +use uuid::Uuid; + +#[tracing::instrument(level = "debug", skip(transaction), err)] +pub(crate) async fn create_doc( + transaction: &mut DBTransaction<'_>, + params: CreateDocParams, +) -> Result<(), ServerError> { + let uuid = Uuid::parse_str(¶ms.id)?; + let (sql, args) = NewDocSqlBuilder::new(uuid).data(params.data).build()?; + let _ = sqlx::query_with(&sql, args) + .execute(transaction) + .await + .map_err(map_sqlx_error)?; + + Ok(()) +} + +#[tracing::instrument(level = "debug", skip(pool), err)] +pub(crate) async fn read_doc(pool: &PgPool, params: QueryDocParams) -> Result { + let doc_id = Uuid::parse_str(¶ms.doc_id)?; + let mut transaction = pool + .begin() + .await + .context("Failed to acquire a Postgres connection to read doc")?; + + let builder = SqlBuilder::select(DOC_TABLE).add_field("*").and_where_eq("id", &doc_id); + + let (sql, args) = builder.build()?; + // TODO: benchmark the speed of different documents with different size + let doc: Doc = sqlx::query_as_with::(&sql, args) + .fetch_one(&mut transaction) + .await + .map_err(map_sqlx_error)? + .into(); + + transaction + .commit() + .await + .context("Failed to commit SQL transaction to read doc.")?; + + Ok(doc) +} + +#[tracing::instrument(level = "debug", skip(pool, params), err)] +pub(crate) async fn update_doc(pool: &PgPool, mut params: UpdateDocParams) -> Result<(), ServerError> { + let doc_id = Uuid::parse_str(¶ms.doc_id)?; + let mut transaction = pool + .begin() + .await + .context("Failed to acquire a Postgres connection to update doc")?; + + let data = Some(params.take_data()); + + let (sql, args) = SqlBuilder::update(DOC_TABLE) + .add_some_arg("data", data) + .add_arg("rev_id", params.rev_id) + .and_where_eq("id", doc_id) + .build()?; + + sqlx::query_with(&sql, args) + .execute(&mut transaction) + .await + .map_err(map_sqlx_error)?; + + transaction + .commit() + .await + .context("Failed to commit SQL transaction to update doc.")?; + + Ok(()) +} + +#[tracing::instrument(level = "debug", skip(transaction), err)] +pub(crate) async fn delete_doc(transaction: &mut DBTransaction<'_>, doc_id: Uuid) -> Result<(), ServerError> { + let (sql, args) = SqlBuilder::delete(DOC_TABLE).and_where_eq("id", doc_id).build()?; + + let _ = sqlx::query_with(&sql, args) + .execute(transaction) + .await + .map_err(map_sqlx_error)?; + + Ok(()) +} + +pub struct NewDocSqlBuilder { + table: DocTable, +} + +impl NewDocSqlBuilder { + pub fn new(id: Uuid) -> Self { + let table = DocTable { + id, + data: "".to_owned(), + rev_id: 0, + }; + Self { table } + } + + pub fn data(mut self, data: String) -> Self { + self.table.data = data; + self + } + + pub fn build(self) -> Result<(String, PgArguments), ServerError> { + let (sql, args) = SqlBuilder::create(DOC_TABLE) + .add_arg("id", self.table.id) + .add_arg("data", self.table.data) + .add_arg("rev_id", self.table.rev_id) + .build()?; + + Ok((sql, args)) + } +} diff --git a/backend/src/service/doc/doc.rs b/backend/src/service/doc/doc.rs index 7e3a2fc4db..b59e2fc617 100644 --- a/backend/src/service/doc/doc.rs +++ b/backend/src/service/doc/doc.rs @@ -1,92 +1,89 @@ -use super::sql_builder::*; -use crate::{ - entities::doc::{DocTable, DOC_TABLE}, - sqlx_ext::{map_sqlx_error, DBTransaction, SqlBuilder}, +use super::edit_doc::EditDocContext; +use crate::service::{ + doc::{ + read_doc, + ws_actor::{DocWsMsg, DocWsMsgActor}, + }, + ws::{WsBizHandler, WsClientData}, }; -use anyhow::Context; -use flowy_document::protobuf::{CreateDocParams, Doc, QueryDocParams, UpdateDocParams}; +use actix_web::web::Data; +use dashmap::DashMap; +use flowy_document::protobuf::QueryDocParams; use flowy_net::errors::ServerError; -use sqlx::{postgres::PgArguments, PgPool, Postgres}; -use uuid::Uuid; -#[tracing::instrument(level = "debug", skip(transaction), err)] -pub(crate) async fn create_doc( - transaction: &mut DBTransaction<'_>, - params: CreateDocParams, -) -> Result<(), ServerError> { - let uuid = Uuid::parse_str(¶ms.id)?; - let (sql, args) = NewDocSqlBuilder::new(uuid).data(params.data).build()?; - let _ = sqlx::query_with(&sql, args) - .execute(transaction) - .await - .map_err(map_sqlx_error)?; +use protobuf::Message; +use sqlx::PgPool; +use std::{sync::Arc, time::Duration}; +use tokio::sync::{mpsc, mpsc::error::SendError, oneshot}; - Ok(()) +pub struct DocBiz { + pub manager: Arc, + sender: mpsc::Sender, + pg_pool: Data, } -#[tracing::instrument(level = "debug", skip(pool), err)] -pub(crate) async fn read_doc(pool: &PgPool, params: QueryDocParams) -> Result { - let doc_id = Uuid::parse_str(¶ms.doc_id)?; - let mut transaction = pool - .begin() - .await - .context("Failed to acquire a Postgres connection to read doc")?; - - let builder = SqlBuilder::select(DOC_TABLE).add_field("*").and_where_eq("id", &doc_id); - - let (sql, args) = builder.build()?; - // TODO: benchmark the speed of different documents with different size - let doc: Doc = sqlx::query_as_with::(&sql, args) - .fetch_one(&mut transaction) - .await - .map_err(map_sqlx_error)? - .into(); - - transaction - .commit() - .await - .context("Failed to commit SQL transaction to read doc.")?; - - Ok(doc) +impl DocBiz { + pub fn new(pg_pool: Data) -> Self { + let manager = Arc::new(DocManager::new()); + let (tx, rx) = mpsc::channel(100); + let actor = DocWsMsgActor::new(rx, manager.clone()); + tokio::task::spawn(actor.run()); + Self { + manager, + sender: tx, + pg_pool, + } + } } -#[tracing::instrument(level = "debug", skip(pool, params), err)] -pub(crate) async fn update_doc(pool: &PgPool, mut params: UpdateDocParams) -> Result<(), ServerError> { - let doc_id = Uuid::parse_str(¶ms.doc_id)?; - let mut transaction = pool - .begin() - .await - .context("Failed to acquire a Postgres connection to update doc")?; +impl WsBizHandler for DocBiz { + fn receive_data(&self, client_data: WsClientData) { + let (ret, rx) = oneshot::channel(); + let sender = self.sender.clone(); + let pool = self.pg_pool.clone(); - let data = Some(params.take_data()); - - let (sql, args) = SqlBuilder::update(DOC_TABLE) - .add_some_arg("data", data) - .add_arg("rev_id", params.rev_id) - .and_where_eq("id", doc_id) - .build()?; - - sqlx::query_with(&sql, args) - .execute(&mut transaction) - .await - .map_err(map_sqlx_error)?; - - transaction - .commit() - .await - .context("Failed to commit SQL transaction to update doc.")?; - - Ok(()) + actix_rt::spawn(async move { + let msg = DocWsMsg::ClientData { + data: client_data, + ret, + pool, + }; + match sender.send(msg).await { + Ok(_) => {}, + Err(e) => log::error!("{}", e), + } + match rx.await { + Ok(_) => {}, + Err(e) => log::error!("{:?}", e), + }; + }); + } } -#[tracing::instrument(level = "debug", skip(transaction), err)] -pub(crate) async fn delete_doc(transaction: &mut DBTransaction<'_>, doc_id: Uuid) -> Result<(), ServerError> { - let (sql, args) = SqlBuilder::delete(DOC_TABLE).and_where_eq("id", doc_id).build()?; - - let _ = sqlx::query_with(&sql, args) - .execute(transaction) - .await - .map_err(map_sqlx_error)?; - - Ok(()) +pub struct DocManager { + docs_map: DashMap>, +} + +impl DocManager { + pub fn new() -> Self { + Self { + docs_map: DashMap::new(), + } + } + + pub async fn get(&self, doc_id: &str, pg_pool: Data) -> Result>, ServerError> { + match self.docs_map.get(doc_id) { + None => { + let params = QueryDocParams { + doc_id: doc_id.to_string(), + ..Default::default() + }; + let doc = read_doc(pg_pool.get_ref(), params).await?; + let edit_doc = Arc::new(EditDocContext::new(doc)?); + self.docs_map.insert(doc_id.to_string(), edit_doc.clone()); + Ok(Some(edit_doc)) + }, + Some(ctx) => Ok(Some(ctx.clone())), + } + } } diff --git a/backend/src/service/doc/edit_actor.rs b/backend/src/service/doc/edit_actor.rs new file mode 100644 index 0000000000..e69de29bb2 diff --git a/backend/src/service/doc/edit_doc_context.rs b/backend/src/service/doc/edit_doc.rs similarity index 90% rename from backend/src/service/doc/edit_doc_context.rs rename to backend/src/service/doc/edit_doc.rs index e97faaf25c..b3678a6bec 100644 --- a/backend/src/service/doc/edit_doc_context.rs +++ b/backend/src/service/doc/edit_doc.rs @@ -1,9 +1,8 @@ use crate::service::{ - doc::update_doc, util::md5, ws::{entities::Socket, WsClientData, WsMessageAdaptor, WsUser}, }; -use actix_web::web::Data; + use byteorder::{BigEndian, WriteBytesExt}; use bytes::Bytes; use dashmap::DashMap; @@ -20,7 +19,7 @@ use flowy_ot::{ use flowy_ws::WsMessage; use parking_lot::RwLock; use protobuf::Message; -use sqlx::PgPool; + use std::{ convert::TryInto, sync::{ @@ -35,16 +34,15 @@ struct EditUser { socket: Socket, } -pub(crate) struct EditDocContext { +pub struct EditDocContext { doc_id: String, rev_id: AtomicI64, document: Arc>, - pg_pool: Data, users: DashMap, } impl EditDocContext { - pub(crate) fn new(doc: Doc, pg_pool: Data) -> Result { + pub fn new(doc: Doc) -> Result { let delta = Delta::from_bytes(&doc.data).map_err(internal_error)?; let document = Arc::new(RwLock::new(Document::from_delta(delta))); let users = DashMap::new(); @@ -52,17 +50,14 @@ impl EditDocContext { doc_id: doc.id.clone(), rev_id: AtomicI64::new(doc.rev_id), document, - pg_pool, users, }) } + pub fn doc_json(&self) -> String { self.document.read().to_json() } + #[tracing::instrument(level = "debug", skip(self, client_data, revision))] - pub(crate) async fn apply_revision( - &self, - client_data: WsClientData, - revision: Revision, - ) -> Result<(), ServerError> { + pub async fn apply_revision(&self, client_data: WsClientData, revision: Revision) -> Result<(), ServerError> { let _ = self.verify_md5(&revision)?; // Opti: find out another way to keep the user socket available. let user = EditUser { @@ -85,7 +80,7 @@ impl EditDocContext { // send the prime delta to the client. Client should compose the this prime // delta. - let (cli_prime, server_prime) = self.transform(&revision.delta).map_err(internal_error)?; + let (cli_prime, server_prime) = self.transform(&revision.delta_data).map_err(internal_error)?; let _ = self.update_document_delta(server_prime)?; log::debug!("{} client delta: {}", self.doc_id, cli_prime.to_json()); @@ -101,7 +96,7 @@ impl EditDocContext { .do_send(mk_pull_rev_ws_message(&self.doc_id, cur_rev_id, revision.rev_id)) .map_err(internal_error)?; } else { - let delta = Delta::from_bytes(&revision.delta).map_err(internal_error)?; + let delta = Delta::from_bytes(&revision.delta_data).map_err(internal_error)?; let _ = self.update_document_delta(delta)?; cli_socket .do_send(mk_acked_ws_message(&revision)) @@ -118,12 +113,12 @@ impl EditDocContext { } fn mk_revision(&self, base_rev_id: i64, delta: Delta) -> Revision { - let delta_data = delta.into_bytes(); + let delta_data = delta.to_bytes().to_vec(); let md5 = md5(&delta_data); let revision = Revision { base_rev_id, rev_id: self.rev_id.load(SeqCst), - delta: delta_data, + delta_data, md5, doc_id: self.doc_id.to_string(), ty: RevType::Remote, @@ -160,7 +155,7 @@ impl EditDocContext { } fn verify_md5(&self, revision: &Revision) -> Result<(), ServerError> { - if md5(&revision.delta) != revision.md5 { + if md5(&revision.delta_data) != revision.md5 { return Err(ServerError::internal().context("Delta md5 not match")); } Ok(()) @@ -173,7 +168,7 @@ impl EditDocContext { params.set_doc_id(self.doc_id.clone()); params.set_data(self.document.read().to_json()); params.set_rev_id(revision.rev_id); - let _ = update_doc(self.pg_pool.get_ref(), params).await?; + // let _ = update_doc(self.pg_pool.get_ref(), params).await?; Ok(()) } } diff --git a/backend/src/service/doc/mod.rs b/backend/src/service/doc/mod.rs index 675271725d..669c84cdea 100644 --- a/backend/src/service/doc/mod.rs +++ b/backend/src/service/doc/mod.rs @@ -1,8 +1,8 @@ -mod doc; -mod edit_doc_context; +pub mod crud; +pub mod doc; +pub mod edit_doc; pub mod router; -mod sql_builder; -pub mod ws_handler; +mod ws_actor; -pub(crate) use doc::*; +pub(crate) use crud::*; pub use router::*; diff --git a/backend/src/service/doc/sql_builder.rs b/backend/src/service/doc/sql_builder.rs deleted file mode 100644 index 4864f81efe..0000000000 --- a/backend/src/service/doc/sql_builder.rs +++ /dev/null @@ -1,39 +0,0 @@ -use crate::{ - entities::doc::{DocTable, DOC_TABLE}, - sqlx_ext::SqlBuilder, -}; - -use flowy_net::errors::ServerError; - -use sqlx::postgres::PgArguments; -use uuid::Uuid; - -pub struct NewDocSqlBuilder { - table: DocTable, -} - -impl NewDocSqlBuilder { - pub fn new(id: Uuid) -> Self { - let table = DocTable { - id, - data: "".to_owned(), - rev_id: 0, - }; - Self { table } - } - - pub fn data(mut self, data: String) -> Self { - self.table.data = data; - self - } - - pub fn build(self) -> Result<(String, PgArguments), ServerError> { - let (sql, args) = SqlBuilder::create(DOC_TABLE) - .add_arg("id", self.table.id) - .add_arg("data", self.table.data) - .add_arg("rev_id", self.table.rev_id) - .build()?; - - Ok((sql, args)) - } -} diff --git a/backend/src/service/doc/ws_actor.rs b/backend/src/service/doc/ws_actor.rs new file mode 100644 index 0000000000..ade952c383 --- /dev/null +++ b/backend/src/service/doc/ws_actor.rs @@ -0,0 +1,101 @@ +use crate::service::{doc::doc::DocManager, util::parse_from_bytes, ws::WsClientData}; +use actix_rt::task::spawn_blocking; +use actix_web::web::Data; +use async_stream::stream; +use flowy_document::protobuf::{Revision, WsDataType, WsDocumentData}; +use flowy_net::errors::{internal_error, Result as DocResult}; +use futures::stream::StreamExt; +use sqlx::PgPool; +use std::sync::Arc; +use tokio::sync::{mpsc, oneshot}; + +pub enum DocWsMsg { + ClientData { + data: WsClientData, + pool: Data, + ret: oneshot::Sender>, + }, +} + +pub struct DocWsMsgActor { + receiver: Option>, + doc_manager: Arc, +} + +impl DocWsMsgActor { + pub fn new(receiver: mpsc::Receiver, manager: Arc) -> Self { + Self { + receiver: Some(receiver), + doc_manager: manager, + } + } + + pub async fn run(mut self) { + let mut receiver = self + .receiver + .take() + .expect("DocActor's receiver should only take one time"); + + let stream = stream! { + loop { + match receiver.recv().await { + Some(msg) => yield msg, + None => break, + } + } + }; + + stream.for_each(|msg| self.handle_message(msg)).await; + } + + async fn handle_message(&self, msg: DocWsMsg) { + match msg { + DocWsMsg::ClientData { data, pool, ret } => { + ret.send(self.handle_client_data(data, pool).await); + }, + } + } + + async fn handle_client_data(&self, data: WsClientData, pool: Data) -> DocResult<()> { + let bytes = data.data.clone(); + let document_data = spawn_blocking(move || { + let document_data: WsDocumentData = parse_from_bytes(&bytes)?; + DocResult::Ok(document_data) + }) + .await + .map_err(internal_error)??; + + match document_data.ty { + WsDataType::Acked => {}, + WsDataType::PushRev => self.handle_push_rev(data, document_data.data, pool).await?, + WsDataType::PullRev => {}, + WsDataType::Conflict => {}, + } + Ok(()) + } + + async fn handle_push_rev( + &self, + client_data: WsClientData, + revision_data: Vec, + pool: Data, + ) -> DocResult<()> { + let revision = spawn_blocking(move || { + let revision: Revision = parse_from_bytes(&revision_data)?; + DocResult::Ok(revision) + }) + .await + .map_err(internal_error)??; + + match self.doc_manager.get(&revision.doc_id, pool).await? { + Some(ctx) => { + ctx.apply_revision(client_data, revision).await; + Ok(()) + }, + None => { + // + Ok(()) + }, + } + } +} diff --git a/backend/src/service/doc/ws_handler.rs b/backend/src/service/doc/ws_handler.rs deleted file mode 100644 index d987790713..0000000000 --- a/backend/src/service/doc/ws_handler.rs +++ /dev/null @@ -1,100 +0,0 @@ -use super::edit_doc_context::EditDocContext; -use crate::service::{ - doc::read_doc, - util::parse_from_bytes, - ws::{WsBizHandler, WsClientData}, -}; -use actix_web::web::Data; - -use flowy_document::protobuf::{QueryDocParams, Revision, WsDataType, WsDocumentData}; -use flowy_net::errors::ServerError; -use parking_lot::{RwLock, RwLockUpgradableReadGuard}; -use protobuf::Message; -use sqlx::PgPool; -use std::{collections::HashMap, sync::Arc}; - -pub struct DocWsBizHandler { - doc_manager: Arc, -} - -impl DocWsBizHandler { - pub fn new(pg_pool: Data) -> Self { - Self { - doc_manager: Arc::new(EditDocManager::new(pg_pool)), - } - } -} - -impl WsBizHandler for DocWsBizHandler { - fn receive_data(&self, client_data: WsClientData) { - let doc_manager = self.doc_manager.clone(); - actix_rt::spawn(async move { - let result = doc_manager.handle(client_data).await; - match result { - Ok(_) => {}, - Err(e) => log::error!("WsBizHandler handle data error: {:?}", e), - } - }); - } -} - -struct EditDocManager { - pg_pool: Data, - edit_docs: RwLock>>, -} - -impl EditDocManager { - fn new(pg_pool: Data) -> Self { - Self { - pg_pool, - edit_docs: RwLock::new(HashMap::new()), - } - } - - async fn handle(&self, client_data: WsClientData) -> Result<(), ServerError> { - let document_data: WsDocumentData = parse_from_bytes(&client_data.data)?; - match document_data.ty { - WsDataType::Acked => { - // Do nothing, - }, - WsDataType::PushRev => { - let revision: Revision = parse_from_bytes(&document_data.data)?; - let edited_doc = self.get_edit_doc(&revision.doc_id).await?; - tokio::spawn(async move { - match edited_doc.apply_revision(client_data, revision).await { - Ok(_) => {}, - Err(e) => log::error!("Doc apply revision failed: {:?}", e), - } - }); - }, - WsDataType::PullRev => { - // Do nothing - }, - WsDataType::Conflict => { - unimplemented!() - }, - } - - Ok(()) - } - - async fn get_edit_doc(&self, doc_id: &str) -> Result, ServerError> { - // Opti: using lock free map instead? - let edit_docs = self.edit_docs.upgradable_read(); - if let Some(doc) = edit_docs.get(doc_id) { - return Ok(doc.clone()); - } else { - let mut edit_docs = RwLockUpgradableReadGuard::upgrade(edit_docs); - let pg_pool = self.pg_pool.clone(); - let params = QueryDocParams { - doc_id: doc_id.to_string(), - ..Default::default() - }; - - let doc = read_doc(pg_pool.get_ref(), params).await?; - let edit_doc = Arc::new(EditDocContext::new(doc, self.pg_pool.clone())?); - edit_docs.insert(doc_id.to_string(), edit_doc.clone()); - Ok(edit_doc) - } - } -} diff --git a/backend/src/service/log/mod.rs b/backend/src/service/log/mod.rs index 7e821832da..8e1f410175 100644 --- a/backend/src/service/log/mod.rs +++ b/backend/src/service/log/mod.rs @@ -30,7 +30,7 @@ impl Builder { .with_target(true) .with_max_level(tracing::Level::DEBUG) .with_writer(std::io::stderr) - .with_thread_ids(false) + .with_thread_ids(true) .compact() .finish() .with(env_filter); diff --git a/backend/src/service/mod.rs b/backend/src/service/mod.rs index 8dcc7d066b..3a52636333 100644 --- a/backend/src/service/mod.rs +++ b/backend/src/service/mod.rs @@ -1,9 +1,3 @@ -use crate::service::{doc::ws_handler::DocWsBizHandler, ws::WsBizHandlers}; -use actix_web::web::Data; -use flowy_ws::WsModule; -use sqlx::PgPool; -use std::sync::Arc; - pub mod app; pub mod doc; pub(crate) mod log; @@ -12,16 +6,3 @@ pub(crate) mod util; pub mod view; pub mod workspace; pub mod ws; - -pub fn make_ws_biz_handlers(pg_pool: Data) -> WsBizHandlers { - let mut ws_biz_handlers = WsBizHandlers::new(); - - // doc - let doc_biz_handler = DocWsBizHandler::new(pg_pool); - ws_biz_handlers.register(WsModule::Doc, wrap(doc_biz_handler)); - - // - ws_biz_handlers -} - -fn wrap(val: T) -> Arc { Arc::new(val) } diff --git a/backend/src/service/user/auth.rs b/backend/src/service/user/auth.rs index ccc6707a80..b728001d0e 100644 --- a/backend/src/service/user/auth.rs +++ b/backend/src/service/user/auth.rs @@ -8,14 +8,7 @@ use flowy_net::{ }; use flowy_user::{ entities::parser::{UserEmail, UserName, UserPassword}, - protobuf::{ - SignInParams, - SignInResponse, - SignUpParams, - SignUpResponse, - UpdateUserParams, - UserProfile, - }, + protobuf::{SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserParams, UserProfile}, }; use crate::{ @@ -28,10 +21,8 @@ use super::AUTHORIZED_USERS; use crate::service::user::user_default::create_default_workspace; pub async fn sign_in(pool: &PgPool, params: SignInParams) -> Result { - let email = - UserEmail::parse(params.email).map_err(|e| ServerError::params_invalid().context(e))?; - let password = UserPassword::parse(params.password) - .map_err(|e| ServerError::params_invalid().context(e))?; + let email = UserEmail::parse(params.email).map_err(|e| ServerError::params_invalid().context(e))?; + let password = UserPassword::parse(params.password).map_err(|e| ServerError::params_invalid().context(e))?; let mut transaction = pool .begin() @@ -62,16 +53,10 @@ pub async fn sign_out(logged_user: LoggedUser) -> Result Result { - let name = - UserName::parse(params.name).map_err(|e| ServerError::params_invalid().context(e))?; - let email = - UserEmail::parse(params.email).map_err(|e| ServerError::params_invalid().context(e))?; - let password = UserPassword::parse(params.password) - .map_err(|e| ServerError::params_invalid().context(e))?; +pub async fn register_user(pool: &PgPool, params: SignUpParams) -> Result { + let name = UserName::parse(params.name).map_err(|e| ServerError::params_invalid().context(e))?; + let email = UserEmail::parse(params.email).map_err(|e| ServerError::params_invalid().context(e))?; + let password = UserPassword::parse(params.password).map_err(|e| ServerError::params_invalid().context(e))?; let mut transaction = pool .begin() @@ -79,14 +64,9 @@ pub async fn register_user( .context("Failed to acquire a Postgres connection to register user")?; let _ = is_email_exist(&mut transaction, email.as_ref()).await?; - let response_data = insert_new_user( - &mut transaction, - name.as_ref(), - email.as_ref(), - password.as_ref(), - ) - .await - .context("Failed to insert user")?; + let response_data = insert_new_user(&mut transaction, name.as_ref(), email.as_ref(), password.as_ref()) + .await + .context("Failed to insert user")?; let logged_user = LoggedUser::new(&response_data.user_id); AUTHORIZED_USERS.store_auth(logged_user, true); @@ -111,12 +91,11 @@ pub(crate) async fn get_user_profile( .context("Failed to acquire a Postgres connection to get user detail")?; let id = logged_user.as_uuid()?; - let user_table = - sqlx::query_as::("SELECT * FROM user_table WHERE id = $1") - .bind(id) - .fetch_one(&mut transaction) - .await - .map_err(|err| ServerError::internal().context(err))?; + let user_table = sqlx::query_as::("SELECT * FROM user_table WHERE id = $1") + .bind(id) + .fetch_one(&mut transaction) + .await + .map_err(|err| ServerError::internal().context(err))?; transaction .commit() @@ -146,11 +125,7 @@ pub(crate) async fn set_user_profile( let name = match params.has_name() { false => None, - true => Some( - UserName::parse(params.get_name().to_owned()) - .map_err(invalid_params)? - .0, - ), + true => Some(UserName::parse(params.get_name().to_owned()).map_err(invalid_params)?.0), }; let email = match params.has_email() { @@ -165,8 +140,7 @@ pub(crate) async fn set_user_profile( let password = match params.has_password() { false => None, true => { - let password = - UserPassword::parse(params.get_password().to_owned()).map_err(invalid_params)?; + let password = UserPassword::parse(params.get_password().to_owned()).map_err(invalid_params)?; let password = hash_password(password.as_ref())?; Some(password) }, @@ -192,10 +166,7 @@ pub(crate) async fn set_user_profile( Ok(FlowyResponse::success()) } -async fn is_email_exist( - transaction: &mut DBTransaction<'_>, - email: &str, -) -> Result<(), ServerError> { +async fn is_email_exist(transaction: &mut DBTransaction<'_>, email: &str) -> Result<(), ServerError> { let result = sqlx::query(r#"SELECT email FROM user_table WHERE email = $1"#) .bind(email) .fetch_optional(transaction) diff --git a/backend/src/service/user/user_default.rs b/backend/src/service/user/user_default.rs index 66a27c3d53..267d8994cd 100644 --- a/backend/src/service/user/user_default.rs +++ b/backend/src/service/user/user_default.rs @@ -6,6 +6,7 @@ use crate::{ }, sqlx_ext::{map_sqlx_error, DBTransaction}, }; + use flowy_net::errors::ServerError; use flowy_workspace::{ entities::view::DOC_DEFAULT_DATA, diff --git a/backend/src/service/view/router.rs b/backend/src/service/view/router.rs index 91b231cbb7..8612e9b35d 100644 --- a/backend/src/service/view/router.rs +++ b/backend/src/service/view/router.rs @@ -5,21 +5,19 @@ use actix_web::{ use sqlx::PgPool; use flowy_net::errors::ServerError; -use flowy_workspace::protobuf::{ - CreateViewParams, - DeleteViewParams, - QueryViewParams, - UpdateViewParams, -}; +use flowy_workspace::protobuf::{CreateViewParams, DeleteViewParams, QueryViewParams, UpdateViewParams}; use crate::service::{ + doc::doc::DocBiz, util::parse_from_payload, view::{create_view, delete_view, read_view, update_view}, }; +use std::sync::Arc; pub async fn create_handler( payload: Payload, pool: Data, + _doc_biz: Data>, ) -> Result { let params: CreateViewParams = parse_from_payload(payload).await?; let resp = create_view(pool.get_ref(), params).await?; @@ -29,25 +27,20 @@ pub async fn create_handler( pub async fn read_handler( payload: Payload, pool: Data, + doc_biz: Data>, ) -> Result { let params: QueryViewParams = parse_from_payload(payload).await?; - let resp = read_view(pool.get_ref(), params).await?; + let resp = read_view(pool.get_ref(), params, doc_biz).await?; Ok(resp.into()) } -pub async fn update_handler( - payload: Payload, - pool: Data, -) -> Result { +pub async fn update_handler(payload: Payload, pool: Data) -> Result { let params: UpdateViewParams = parse_from_payload(payload).await?; let resp = update_view(pool.get_ref(), params).await?; Ok(resp.into()) } -pub async fn delete_handler( - payload: Payload, - pool: Data, -) -> Result { +pub async fn delete_handler(payload: Payload, pool: Data) -> Result { let params: DeleteViewParams = parse_from_payload(payload).await?; let resp = delete_view(pool.get_ref(), ¶ms.view_id).await?; Ok(resp.into()) diff --git a/backend/src/service/view/view.rs b/backend/src/service/view/view.rs index 9beeffe7ba..535a997645 100644 --- a/backend/src/service/view/view.rs +++ b/backend/src/service/view/view.rs @@ -18,11 +18,13 @@ use flowy_workspace::{ use crate::{ entities::workspace::{ViewTable, VIEW_TABLE}, service::{ - doc::{create_doc, delete_doc}, + doc::{create_doc, delete_doc, doc::DocBiz}, view::sql_builder::*, }, sqlx_ext::{map_sqlx_error, DBTransaction, SqlBuilder}, }; +use actix_web::web::Data; +use std::sync::Arc; pub(crate) async fn create_view(pool: &PgPool, params: CreateViewParams) -> Result { let mut transaction = pool @@ -67,7 +69,11 @@ pub(crate) async fn create_view_with_transaction( Ok(view) } -pub(crate) async fn read_view(pool: &PgPool, params: QueryViewParams) -> Result { +pub(crate) async fn read_view( + pool: &PgPool, + params: QueryViewParams, + _doc_biz: Data>, +) -> Result { let view_id = check_view_id(params.view_id)?; let mut transaction = pool .begin() diff --git a/backend/src/service/ws/router.rs b/backend/src/service/ws/router.rs index 170b52a375..6b071b0014 100644 --- a/backend/src/service/ws/router.rs +++ b/backend/src/service/ws/router.rs @@ -1,7 +1,7 @@ use crate::service::ws::{WsBizHandlers, WsClient, WsServer, WsUser}; use actix::Addr; -use crate::service::user::LoggedUser; +use crate::{context::FlowyRuntime, service::user::LoggedUser}; use actix_web::{ get, web::{Data, Path, Payload}, @@ -17,12 +17,14 @@ pub async fn establish_ws_connection( payload: Payload, token: Path, server: Data>, + runtime: Data, biz_handlers: Data, ) -> Result { + log::info!("establish_ws_connection"); match LoggedUser::from_token(token.clone()) { Ok(user) => { let ws_user = WsUser::new(user.clone()); - let client = WsClient::new(ws_user, server.get_ref().clone(), biz_handlers); + let client = WsClient::new(ws_user, server.get_ref().clone(), biz_handlers, runtime); let result = ws::start(client, &request, payload); match result { Ok(response) => Ok(response.into()), diff --git a/backend/src/service/ws/ws_client.rs b/backend/src/service/ws/ws_client.rs index a034a278e9..279cba653d 100644 --- a/backend/src/service/ws/ws_client.rs +++ b/backend/src/service/ws/ws_client.rs @@ -1,5 +1,6 @@ use crate::{ config::{HEARTBEAT_INTERVAL, PING_TIMEOUT}, + context::FlowyRuntime, service::{ user::LoggedUser, ws::{ @@ -38,16 +39,23 @@ pub struct WsClient { user: Arc, server: Addr, biz_handlers: Data, + runtime: Data, hb: Instant, } impl WsClient { - pub fn new(user: WsUser, server: Addr, biz_handlers: Data) -> Self { + pub fn new( + user: WsUser, + server: Addr, + biz_handlers: Data, + runtime: Data, + ) -> Self { Self { user: Arc::new(user), server, biz_handlers, hb: Instant::now(), + runtime, } } @@ -77,7 +85,7 @@ impl WsClient { socket, data: Bytes::from(message.data), }; - handler.receive_data(client_data) + handler.receive_data(client_data); }, } } diff --git a/backend/src/service/ws/ws_server.rs b/backend/src/service/ws/ws_server.rs index 1cfe587760..fca58def2f 100644 --- a/backend/src/service/ws/ws_server.rs +++ b/backend/src/service/ws/ws_server.rs @@ -46,9 +46,7 @@ impl Handler for WsServer { impl Handler for WsServer { type Result = (); - fn handle(&mut self, _msg: WsMessageAdaptor, _ctx: &mut Context) -> Self::Result { - unimplemented!() - } + fn handle(&mut self, _msg: WsMessageAdaptor, _ctx: &mut Context) -> Self::Result { unimplemented!() } } impl actix::Supervised for WsServer { diff --git a/backend/tests/document/edit.rs b/backend/tests/document/edit.rs index f42e65e1bd..d8d8bac0ae 100644 --- a/backend/tests/document/edit.rs +++ b/backend/tests/document/edit.rs @@ -3,5 +3,12 @@ use crate::document::helper::{DocScript, DocumentTest}; #[actix_rt::test] async fn edit_doc_insert_text() { let test = DocumentTest::new().await; - test.run_scripts(vec![DocScript::SendText("abc")]).await; + test.run_scripts(vec![ + DocScript::SendText(0, "abc"), + DocScript::SendText(3, "123"), + DocScript::SendText(6, "efg"), + DocScript::AssertClient(r#"[{"insert":"abc123efg\n"}]"#), + DocScript::AssertServer(r#"[{"insert":"abc123efg\n"}]"#), + ]) + .await; } diff --git a/backend/tests/document/helper.rs b/backend/tests/document/helper.rs index 04c5fa5968..c3ba9faeb7 100644 --- a/backend/tests/document/helper.rs +++ b/backend/tests/document/helper.rs @@ -1,32 +1,25 @@ // use crate::helper::*; use crate::helper::{spawn_server, TestServer}; + +use actix_web::web::Data; use flowy_document::{ - entities::doc::{DocDelta, QueryDocParams}, - module::FlowyDocument, - services::doc::edit_doc_context::EditDocContext, + entities::doc::QueryDocParams, + services::doc::edit_doc_context::EditDocContext as ClientEditDocContext, }; - use flowy_net::config::ServerConfig; -use flowy_ot::core::Delta; - -use flowy_test::{workspace::ViewTest, FlowyTest, FlowyTestSDK}; -use flowy_user::services::user::UserSession; - -use std::{str::FromStr, sync::Arc}; -use tokio::time::{interval, Duration}; +use flowy_test::{workspace::ViewTest, FlowyTest}; +use std::sync::Arc; +use tokio::time::{sleep, Duration}; pub struct DocumentTest { server: TestServer, flowy_test: FlowyTest, - flowy_document: Arc, - user_session: Arc, - edit_context: Arc, } - #[derive(Clone)] pub enum DocScript { - SendText(&'static str), - SendBinary(Vec), + SendText(usize, &'static str), + AssertClient(&'static str), + AssertServer(&'static str), } impl DocumentTest { @@ -34,46 +27,57 @@ impl DocumentTest { let server = spawn_server().await; let server_config = ServerConfig::new(&server.host, "http", "ws"); let flowy_test = FlowyTest::setup_with(server_config); - - init_user(&flowy_test).await; - - let edit_context = create_doc(&flowy_test).await; - let user_session = flowy_test.sdk.user_session.clone(); - let flowy_document = flowy_test.sdk.flowy_document.clone(); - Self { - server, - flowy_test, - flowy_document, - user_session, - edit_context, - } + Self { server, flowy_test } } pub async fn run_scripts(self, scripts: Vec) { - for script in scripts { - match script { - DocScript::SendText(s) => { - let delta = Delta::from_str(s).unwrap(); - let data = delta.to_json(); - let doc_delta = DocDelta { - doc_id: self.edit_context.doc_id.clone(), - data, - }; - - self.flowy_document.apply_doc_delta(doc_delta).await; - }, - DocScript::SendBinary(_bytes) => {}, - } - } - std::mem::forget(self); - - let mut interval = interval(Duration::from_secs(5)); - interval.tick().await; - interval.tick().await; + init_user(&self.flowy_test).await; + let DocumentTest { server, flowy_test } = self; + run_scripts(server, flowy_test, scripts).await; + sleep(Duration::from_secs(5)).await; } } -async fn create_doc(flowy_test: &FlowyTest) -> Arc { +pub async fn run_scripts(server: TestServer, flowy_test: FlowyTest, scripts: Vec) { + let client_edit_context = create_doc(&flowy_test).await; + let doc_id = client_edit_context.doc_id.clone(); + for script in scripts { + match script { + DocScript::SendText(index, s) => { + client_edit_context.insert(index, s); + }, + DocScript::AssertClient(s) => { + let json = client_edit_context.doc_json(); + assert_eq(s, &json); + }, + DocScript::AssertServer(s) => { + sleep(Duration::from_millis(100)).await; + let pool = server.pg_pool.clone(); + let edit_context = server + .app_ctx + .doc_biz + .manager + .get(&doc_id, Data::new(pool)) + .await + .unwrap() + .unwrap(); + let json = edit_context.doc_json(); + assert_eq(s, &json); + }, + } + } + std::mem::forget(flowy_test); +} + +fn assert_eq(expect: &str, receive: &str) { + if expect != receive { + log::error!("expect: {}", expect); + log::error!("but receive: {}", receive); + } + assert_eq!(expect, receive); +} + +async fn create_doc(flowy_test: &FlowyTest) -> Arc { let view_test = ViewTest::new(flowy_test).await; let doc_id = view_test.view.id.clone(); let user_session = flowy_test.sdk.user_session.clone(); diff --git a/backend/tests/helper.rs b/backend/tests/helper.rs index 88ab80eb80..7da1df7c1e 100644 --- a/backend/tests/helper.rs +++ b/backend/tests/helper.rs @@ -1,8 +1,10 @@ use backend::{ application::{get_connection_pool, Application}, config::{get_configuration, DatabaseSettings}, + context::AppContext, }; +use backend::application::init_app_context; use flowy_document::{ entities::doc::{Doc, QueryDocParams}, prelude::*, @@ -168,6 +170,7 @@ pub struct TestServer { pub host: String, pub port: u16, pub pg_pool: PgPool, + pub app_ctx: AppContext, } pub async fn spawn_server() -> TestServer { @@ -181,7 +184,8 @@ pub async fn spawn_server() -> TestServer { }; let _ = configure_database(&configuration.database).await; - let application = Application::build(configuration.clone()) + let app_ctx = init_app_context(&configuration).await; + let application = Application::build(configuration.clone(), app_ctx.clone()) .await .expect("Failed to build application."); let application_port = application.port(); @@ -197,6 +201,7 @@ pub async fn spawn_server() -> TestServer { pg_pool: get_connection_pool(&configuration.database) .await .expect("Failed to connect to the database"), + app_ctx, } } diff --git a/rust-lib/flowy-document/Cargo.toml b/rust-lib/flowy-document/Cargo.toml index dbb327d015..75ceef90df 100644 --- a/rust-lib/flowy-document/Cargo.toml +++ b/rust-lib/flowy-document/Cargo.toml @@ -49,4 +49,5 @@ env_logger = "0.8.2" [features] -http_server = [] \ No newline at end of file +http_server = [] +flowy_test = [] \ No newline at end of file diff --git a/rust-lib/flowy-document/src/entities/doc/revision.rs b/rust-lib/flowy-document/src/entities/doc/revision.rs index 65d7256269..be4a353426 100644 --- a/rust-lib/flowy-document/src/entities/doc/revision.rs +++ b/rust-lib/flowy-document/src/entities/doc/revision.rs @@ -1,3 +1,4 @@ +use crate::services::util::md5; use flowy_derive::{ProtoBuf, ProtoBuf_Enum}; #[derive(Debug, ProtoBuf_Enum, Clone, Eq, PartialEq)] @@ -19,7 +20,7 @@ pub struct Revision { pub rev_id: i64, #[pb(index = 3)] - pub delta: Vec, + pub delta_data: Vec, #[pb(index = 4)] pub md5: String, @@ -32,11 +33,13 @@ pub struct Revision { } impl Revision { - pub fn new(base_rev_id: i64, rev_id: i64, delta: Vec, md5: String, doc_id: String, ty: RevType) -> Revision { + pub fn new(base_rev_id: i64, rev_id: i64, delta_data: Vec, doc_id: &str, ty: RevType) -> Revision { + let md5 = md5(&delta_data); + let doc_id = doc_id.to_owned(); Self { base_rev_id, rev_id, - delta, + delta_data, md5, doc_id, ty, diff --git a/rust-lib/flowy-document/src/protobuf/model/revision.rs b/rust-lib/flowy-document/src/protobuf/model/revision.rs index 55036e31cb..cb5e31e5f4 100644 --- a/rust-lib/flowy-document/src/protobuf/model/revision.rs +++ b/rust-lib/flowy-document/src/protobuf/model/revision.rs @@ -28,7 +28,7 @@ pub struct Revision { // message fields pub base_rev_id: i64, pub rev_id: i64, - pub delta: ::std::vec::Vec, + pub delta_data: ::std::vec::Vec, pub md5: ::std::string::String, pub doc_id: ::std::string::String, pub ty: RevType, @@ -78,30 +78,30 @@ impl Revision { self.rev_id = v; } - // bytes delta = 3; + // bytes delta_data = 3; - pub fn get_delta(&self) -> &[u8] { - &self.delta + pub fn get_delta_data(&self) -> &[u8] { + &self.delta_data } - pub fn clear_delta(&mut self) { - self.delta.clear(); + pub fn clear_delta_data(&mut self) { + self.delta_data.clear(); } // Param is passed by value, moved - pub fn set_delta(&mut self, v: ::std::vec::Vec) { - self.delta = v; + pub fn set_delta_data(&mut self, v: ::std::vec::Vec) { + self.delta_data = v; } // Mutable pointer to the field. // If field is not initialized, it is initialized with default value first. - pub fn mut_delta(&mut self) -> &mut ::std::vec::Vec { - &mut self.delta + pub fn mut_delta_data(&mut self) -> &mut ::std::vec::Vec { + &mut self.delta_data } // Take field - pub fn take_delta(&mut self) -> ::std::vec::Vec { - ::std::mem::replace(&mut self.delta, ::std::vec::Vec::new()) + pub fn take_delta_data(&mut self) -> ::std::vec::Vec { + ::std::mem::replace(&mut self.delta_data, ::std::vec::Vec::new()) } // string md5 = 4; @@ -196,7 +196,7 @@ impl ::protobuf::Message for Revision { self.rev_id = tmp; }, 3 => { - ::protobuf::rt::read_singular_proto3_bytes_into(wire_type, is, &mut self.delta)?; + ::protobuf::rt::read_singular_proto3_bytes_into(wire_type, is, &mut self.delta_data)?; }, 4 => { ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.md5)?; @@ -225,8 +225,8 @@ impl ::protobuf::Message for Revision { if self.rev_id != 0 { my_size += ::protobuf::rt::value_size(2, self.rev_id, ::protobuf::wire_format::WireTypeVarint); } - if !self.delta.is_empty() { - my_size += ::protobuf::rt::bytes_size(3, &self.delta); + if !self.delta_data.is_empty() { + my_size += ::protobuf::rt::bytes_size(3, &self.delta_data); } if !self.md5.is_empty() { my_size += ::protobuf::rt::string_size(4, &self.md5); @@ -249,8 +249,8 @@ impl ::protobuf::Message for Revision { if self.rev_id != 0 { os.write_int64(2, self.rev_id)?; } - if !self.delta.is_empty() { - os.write_bytes(3, &self.delta)?; + if !self.delta_data.is_empty() { + os.write_bytes(3, &self.delta_data)?; } if !self.md5.is_empty() { os.write_string(4, &self.md5)?; @@ -310,9 +310,9 @@ impl ::protobuf::Message for Revision { |m: &mut Revision| { &mut m.rev_id }, )); fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>( - "delta", - |m: &Revision| { &m.delta }, - |m: &mut Revision| { &mut m.delta }, + "delta_data", + |m: &Revision| { &m.delta_data }, + |m: &mut Revision| { &mut m.delta_data }, )); fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( "md5", @@ -347,7 +347,7 @@ impl ::protobuf::Clear for Revision { fn clear(&mut self) { self.base_rev_id = 0; self.rev_id = 0; - self.delta.clear(); + self.delta_data.clear(); self.md5.clear(); self.doc_id.clear(); self.ty = RevType::Local; @@ -647,39 +647,39 @@ impl ::protobuf::reflect::ProtobufValue for RevType { } static file_descriptor_proto_data: &'static [u8] = b"\ - \n\x0erevision.proto\"\x9a\x01\n\x08Revision\x12\x1e\n\x0bbase_rev_id\ + \n\x0erevision.proto\"\xa3\x01\n\x08Revision\x12\x1e\n\x0bbase_rev_id\ \x18\x01\x20\x01(\x03R\tbaseRevId\x12\x15\n\x06rev_id\x18\x02\x20\x01(\ - \x03R\x05revId\x12\x14\n\x05delta\x18\x03\x20\x01(\x0cR\x05delta\x12\x10\ - \n\x03md5\x18\x04\x20\x01(\tR\x03md5\x12\x15\n\x06doc_id\x18\x05\x20\x01\ - (\tR\x05docId\x12\x18\n\x02ty\x18\x06\x20\x01(\x0e2\x08.RevTypeR\x02ty\"\ - b\n\rRevisionRange\x12\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\x12\ - \x1e\n\x0bfrom_rev_id\x18\x02\x20\x01(\x03R\tfromRevId\x12\x1a\n\tto_rev\ - _id\x18\x03\x20\x01(\x03R\x07toRevId*\x20\n\x07RevType\x12\t\n\x05Local\ - \x10\0\x12\n\n\x06Remote\x10\x01J\x9b\x05\n\x06\x12\x04\0\0\x12\x01\n\ - \x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\t\x01\n\n\n\ - \x03\x04\0\x01\x12\x03\x02\x08\x10\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\ - \x04\x1a\n\x0c\n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\t\n\x0c\n\x05\x04\0\ - \x02\0\x01\x12\x03\x03\n\x15\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x18\ - \x19\n\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04\x15\n\x0c\n\x05\x04\0\x02\ - \x01\x05\x12\x03\x04\x04\t\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\n\ - \x10\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04\x13\x14\n\x0b\n\x04\x04\0\ - \x02\x02\x12\x03\x05\x04\x14\n\x0c\n\x05\x04\0\x02\x02\x05\x12\x03\x05\ - \x04\t\n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03\x05\n\x0f\n\x0c\n\x05\x04\0\ - \x02\x02\x03\x12\x03\x05\x12\x13\n\x0b\n\x04\x04\0\x02\x03\x12\x03\x06\ - \x04\x13\n\x0c\n\x05\x04\0\x02\x03\x05\x12\x03\x06\x04\n\n\x0c\n\x05\x04\ - \0\x02\x03\x01\x12\x03\x06\x0b\x0e\n\x0c\n\x05\x04\0\x02\x03\x03\x12\x03\ - \x06\x11\x12\n\x0b\n\x04\x04\0\x02\x04\x12\x03\x07\x04\x16\n\x0c\n\x05\ - \x04\0\x02\x04\x05\x12\x03\x07\x04\n\n\x0c\n\x05\x04\0\x02\x04\x01\x12\ - \x03\x07\x0b\x11\n\x0c\n\x05\x04\0\x02\x04\x03\x12\x03\x07\x14\x15\n\x0b\ - \n\x04\x04\0\x02\x05\x12\x03\x08\x04\x13\n\x0c\n\x05\x04\0\x02\x05\x06\ - \x12\x03\x08\x04\x0b\n\x0c\n\x05\x04\0\x02\x05\x01\x12\x03\x08\x0c\x0e\n\ - \x0c\n\x05\x04\0\x02\x05\x03\x12\x03\x08\x11\x12\n\n\n\x02\x04\x01\x12\ - \x04\n\0\x0e\x01\n\n\n\x03\x04\x01\x01\x12\x03\n\x08\x15\n\x0b\n\x04\x04\ - \x01\x02\0\x12\x03\x0b\x04\x16\n\x0c\n\x05\x04\x01\x02\0\x05\x12\x03\x0b\ - \x04\n\n\x0c\n\x05\x04\x01\x02\0\x01\x12\x03\x0b\x0b\x11\n\x0c\n\x05\x04\ - \x01\x02\0\x03\x12\x03\x0b\x14\x15\n\x0b\n\x04\x04\x01\x02\x01\x12\x03\ - \x0c\x04\x1a\n\x0c\n\x05\x04\x01\x02\x01\x05\x12\x03\x0c\x04\t\n\x0c\n\ - \x05\x04\x01\x02\x01\x01\x12\x03\x0c\n\x15\n\x0c\n\x05\x04\x01\x02\x01\ + \x03R\x05revId\x12\x1d\n\ndelta_data\x18\x03\x20\x01(\x0cR\tdeltaData\ + \x12\x10\n\x03md5\x18\x04\x20\x01(\tR\x03md5\x12\x15\n\x06doc_id\x18\x05\ + \x20\x01(\tR\x05docId\x12\x18\n\x02ty\x18\x06\x20\x01(\x0e2\x08.RevTypeR\ + \x02ty\"b\n\rRevisionRange\x12\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05do\ + cId\x12\x1e\n\x0bfrom_rev_id\x18\x02\x20\x01(\x03R\tfromRevId\x12\x1a\n\ + \tto_rev_id\x18\x03\x20\x01(\x03R\x07toRevId*\x20\n\x07RevType\x12\t\n\ + \x05Local\x10\0\x12\n\n\x06Remote\x10\x01J\x9b\x05\n\x06\x12\x04\0\0\x12\ + \x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\t\x01\ + \n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x10\n\x0b\n\x04\x04\0\x02\0\x12\x03\ + \x03\x04\x1a\n\x0c\n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\t\n\x0c\n\x05\ + \x04\0\x02\0\x01\x12\x03\x03\n\x15\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\ + \x03\x18\x19\n\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04\x15\n\x0c\n\x05\ + \x04\0\x02\x01\x05\x12\x03\x04\x04\t\n\x0c\n\x05\x04\0\x02\x01\x01\x12\ + \x03\x04\n\x10\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04\x13\x14\n\x0b\n\ + \x04\x04\0\x02\x02\x12\x03\x05\x04\x19\n\x0c\n\x05\x04\0\x02\x02\x05\x12\ + \x03\x05\x04\t\n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03\x05\n\x14\n\x0c\n\ + \x05\x04\0\x02\x02\x03\x12\x03\x05\x17\x18\n\x0b\n\x04\x04\0\x02\x03\x12\ + \x03\x06\x04\x13\n\x0c\n\x05\x04\0\x02\x03\x05\x12\x03\x06\x04\n\n\x0c\n\ + \x05\x04\0\x02\x03\x01\x12\x03\x06\x0b\x0e\n\x0c\n\x05\x04\0\x02\x03\x03\ + \x12\x03\x06\x11\x12\n\x0b\n\x04\x04\0\x02\x04\x12\x03\x07\x04\x16\n\x0c\ + \n\x05\x04\0\x02\x04\x05\x12\x03\x07\x04\n\n\x0c\n\x05\x04\0\x02\x04\x01\ + \x12\x03\x07\x0b\x11\n\x0c\n\x05\x04\0\x02\x04\x03\x12\x03\x07\x14\x15\n\ + \x0b\n\x04\x04\0\x02\x05\x12\x03\x08\x04\x13\n\x0c\n\x05\x04\0\x02\x05\ + \x06\x12\x03\x08\x04\x0b\n\x0c\n\x05\x04\0\x02\x05\x01\x12\x03\x08\x0c\ + \x0e\n\x0c\n\x05\x04\0\x02\x05\x03\x12\x03\x08\x11\x12\n\n\n\x02\x04\x01\ + \x12\x04\n\0\x0e\x01\n\n\n\x03\x04\x01\x01\x12\x03\n\x08\x15\n\x0b\n\x04\ + \x04\x01\x02\0\x12\x03\x0b\x04\x16\n\x0c\n\x05\x04\x01\x02\0\x05\x12\x03\ + \x0b\x04\n\n\x0c\n\x05\x04\x01\x02\0\x01\x12\x03\x0b\x0b\x11\n\x0c\n\x05\ + \x04\x01\x02\0\x03\x12\x03\x0b\x14\x15\n\x0b\n\x04\x04\x01\x02\x01\x12\ + \x03\x0c\x04\x1a\n\x0c\n\x05\x04\x01\x02\x01\x05\x12\x03\x0c\x04\t\n\x0c\ + \n\x05\x04\x01\x02\x01\x01\x12\x03\x0c\n\x15\n\x0c\n\x05\x04\x01\x02\x01\ \x03\x12\x03\x0c\x18\x19\n\x0b\n\x04\x04\x01\x02\x02\x12\x03\r\x04\x18\n\ \x0c\n\x05\x04\x01\x02\x02\x05\x12\x03\r\x04\t\n\x0c\n\x05\x04\x01\x02\ \x02\x01\x12\x03\r\n\x13\n\x0c\n\x05\x04\x01\x02\x02\x03\x12\x03\r\x16\ diff --git a/rust-lib/flowy-document/src/protobuf/proto/revision.proto b/rust-lib/flowy-document/src/protobuf/proto/revision.proto index 85d1de9259..139a22fa46 100644 --- a/rust-lib/flowy-document/src/protobuf/proto/revision.proto +++ b/rust-lib/flowy-document/src/protobuf/proto/revision.proto @@ -3,7 +3,7 @@ syntax = "proto3"; message Revision { int64 base_rev_id = 1; int64 rev_id = 2; - bytes delta = 3; + bytes delta_data = 3; string md5 = 4; string doc_id = 5; RevType ty = 6; diff --git a/rust-lib/flowy-document/src/services/doc/document/data.rs b/rust-lib/flowy-document/src/services/doc/document/data.rs index e3796534f3..f2244e6344 100644 --- a/rust-lib/flowy-document/src/services/doc/document/data.rs +++ b/rust-lib/flowy-document/src/services/doc/document/data.rs @@ -1,18 +1,10 @@ -use crate::{errors::DocError, services::doc::DocumentData}; use serde::{Deserialize, Serialize}; -impl> DocumentData for T { - fn into_string(self) -> Result { Ok(self.as_ref().to_string()) } -} - #[derive(Serialize, Deserialize, Debug)] pub struct ImageData { image: String, } -impl DocumentData for ImageData { - fn into_string(self) -> Result { - let s = serde_json::to_string(&self)?; - Ok(s) - } +impl ToString for ImageData { + fn to_string(&self) -> String { self.image.clone() } } diff --git a/rust-lib/flowy-document/src/services/doc/document/document.rs b/rust-lib/flowy-document/src/services/doc/document/document.rs index d4cd8e88bd..670fd1a777 100644 --- a/rust-lib/flowy-document/src/services/doc/document/document.rs +++ b/rust-lib/flowy-document/src/services/doc/document/document.rs @@ -5,10 +5,6 @@ use crate::{ use flowy_ot::core::*; -pub trait DocumentData { - fn into_string(self) -> Result; -} - pub trait CustomDocument { fn init_delta() -> Delta; } @@ -49,7 +45,7 @@ impl Document { pub fn to_json(&self) -> String { self.delta.to_json() } - pub fn to_bytes(&self) -> Vec { self.delta.clone().into_bytes() } + pub fn to_bytes(&self) -> Vec { self.delta.clone().to_bytes().to_vec() } pub fn to_plain_string(&self) -> String { self.delta.apply("").unwrap() } @@ -83,11 +79,11 @@ impl Document { Ok(()) } - pub fn insert(&mut self, index: usize, data: T) -> Result { + pub fn insert(&mut self, index: usize, data: T) -> Result { let interval = Interval::new(index, index); let _ = validate_interval(&self.delta, &interval)?; - let text = data.into_string()?; + let text = data.to_string(); let delta = self.view.insert(&self.delta, &text, interval)?; log::trace!("๐Ÿ‘‰ receive change: {}", delta); self.compose_delta(&delta)?; @@ -115,10 +111,10 @@ impl Document { Ok(format_delta) } - pub fn replace(&mut self, interval: Interval, data: T) -> Result { + pub fn replace(&mut self, interval: Interval, data: T) -> Result { let _ = validate_interval(&self.delta, &interval)?; let mut delta = Delta::default(); - let text = data.into_string()?; + let text = data.to_string(); if !text.is_empty() { delta = self.view.insert(&self.delta, &text, interval)?; log::trace!("๐Ÿ‘‰ receive change: {}", delta); diff --git a/rust-lib/flowy-document/src/services/doc/edit_doc_context.rs b/rust-lib/flowy-document/src/services/doc/edit_doc_context.rs index 465935512d..c58a3986ee 100644 --- a/rust-lib/flowy-document/src/services/doc/edit_doc_context.rs +++ b/rust-lib/flowy-document/src/services/doc/edit_doc_context.rs @@ -1,24 +1,19 @@ use crate::{ entities::{ - doc::{Doc, Revision}, + doc::{Doc, RevType, Revision, RevisionRange}, ws::{WsDataType, WsDocumentData}, }, errors::*, services::{ - doc::{rev_manager::RevisionManager, Document}, - util::{bytes_to_rev_id, md5}, - ws::WsDocumentHandler, + doc::{rev_manager::RevisionManager, Document, UndoResult}, + util::bytes_to_rev_id, + ws::{WsDocumentHandler, WsDocumentSender}, }, -}; -use bytes::Bytes; - -use crate::{ - entities::doc::{RevType, RevisionRange}, - services::ws::WsDocumentSender, sql_tables::{doc::DocTableSql, DocTableChangeset}, }; +use bytes::Bytes; use flowy_database::ConnectionPool; -use flowy_ot::core::Delta; +use flowy_ot::core::{Attribute, Delta, Interval}; use parking_lot::RwLock; use std::{convert::TryFrom, sync::Arc}; @@ -49,6 +44,38 @@ impl EditDocContext { Ok(edit_context) } + pub fn insert(&self, index: usize, data: T) -> Result<(), DocError> { + let delta_data = self.document.write().insert(index, data)?.to_bytes(); + let _ = self.mk_revision(&delta_data)?; + Ok(()) + } + + pub fn delete(&self, interval: Interval) -> Result<(), DocError> { + let delta_data = self.document.write().delete(interval)?.to_bytes(); + let _ = self.mk_revision(&delta_data)?; + Ok(()) + } + + pub fn format(&self, interval: Interval, attribute: Attribute) -> Result<(), DocError> { + let delta_data = self.document.write().format(interval, attribute)?.to_bytes(); + let _ = self.mk_revision(&delta_data)?; + Ok(()) + } + + pub fn replace(&mut self, interval: Interval, data: T) -> Result<(), DocError> { + let delta_data = self.document.write().replace(interval, data)?.to_bytes(); + let _ = self.mk_revision(&delta_data)?; + Ok(()) + } + + pub fn can_undo(&self) -> bool { self.document.read().can_undo() } + + pub fn can_redo(&self) -> bool { self.document.read().can_redo() } + + pub fn undo(&self) -> Result { self.document.write().undo() } + + pub fn redo(&self) -> Result { self.document.write().redo() } + pub fn doc(&self) -> Doc { Doc { id: self.doc_id.clone(), @@ -57,50 +84,54 @@ impl EditDocContext { } } - #[tracing::instrument(level = "debug", skip(self, data), err)] - pub(crate) fn compose_local_delta(&self, data: Bytes) -> Result<(), DocError> { + fn mk_revision(&self, delta_data: &Bytes) -> Result<(), DocError> { let (base_rev_id, rev_id) = self.rev_manager.next_rev_id(); - let revision = Revision::new( - base_rev_id, - rev_id, - data.to_vec(), - md5(&data), - self.doc_id.clone(), - RevType::Local, - ); - - let _ = self.update_document(&revision)?; + let delta_data = delta_data.to_vec(); + let revision = Revision::new(base_rev_id, rev_id, delta_data, &self.doc_id, RevType::Local); + let _ = self.save_to_disk(revision.rev_id)?; let _ = self.rev_manager.add_revision(revision)?; Ok(()) } - #[tracing::instrument(level = "debug", skip(self, revision), err)] - pub fn update_document(&self, revision: &Revision) -> Result<(), DocError> { - let delta = Delta::from_bytes(&revision.delta)?; + #[tracing::instrument(level = "debug", skip(self, data), err)] + pub(crate) fn compose_local_delta(&self, data: Bytes) -> Result<(), DocError> { + let delta = Delta::from_bytes(&data)?; self.document.write().compose_delta(&delta)?; - let data = self.document.read().to_json(); - let changeset = DocTableChangeset { - id: self.doc_id.clone(), - data, - rev_id: revision.rev_id, - }; - let sql = DocTableSql {}; - let conn = self.pool.get().map_err(internal_error)?; - let _ = sql.update_doc_table(changeset, &*conn)?; + let _ = self.mk_revision(&data)?; Ok(()) } #[tracing::instrument(level = "debug", skip(self), err)] fn compose_remote_delta(&self) -> Result<(), DocError> { self.rev_manager.next_compose_revision(|revision| { - let _ = self.update_document(revision)?; + let delta = Delta::from_bytes(&revision.delta_data)?; + self.document.write().compose_delta(&delta)?; + let _ = self.save_to_disk(revision.rev_id)?; + log::debug!("๐Ÿ˜Document: {:?}", self.document.read().to_plain_string()); Ok(()) }); Ok(()) } + #[cfg(feature = "flowy_test")] + pub fn doc_json(&self) -> String { self.document.read().to_json() } + + #[tracing::instrument(level = "debug", skip(self, rev_id), err)] + fn save_to_disk(&self, rev_id: i64) -> Result<(), DocError> { + let data = self.document.read().to_json(); + let changeset = DocTableChangeset { + id: self.doc_id.clone(), + data, + rev_id, + }; + let sql = DocTableSql {}; + let conn = self.pool.get().map_err(internal_error)?; + let _ = sql.update_doc_table(changeset, &*conn)?; + Ok(()) + } + // #[tracing::instrument(level = "debug", skip(self, params), err)] // fn update_doc_on_server(&self, params: UpdateDocParams) -> Result<(), // DocError> { let token = self.user.token()?; diff --git a/rust-lib/flowy-document/src/services/doc/rev_manager.rs b/rust-lib/flowy-document/src/services/doc/rev_manager.rs index 1239fc2ea4..32009324d1 100644 --- a/rust-lib/flowy-document/src/services/doc/rev_manager.rs +++ b/rust-lib/flowy-document/src/services/doc/rev_manager.rs @@ -80,7 +80,7 @@ impl RevisionManager { } pub fn ack(&self, rev_id: i64) -> Result<(), DocError> { - log::debug!("Receive {} acked", rev_id); + log::debug!("Receive rev_id: {} acked", rev_id); self.ack_rev_cache.insert(rev_id); self.update_revisions(); Ok(()) diff --git a/rust-lib/flowy-document/src/sql_tables/doc/doc_op_sql.rs b/rust-lib/flowy-document/src/sql_tables/doc/doc_op_sql.rs index 519ac4dcc0..13248ef570 100644 --- a/rust-lib/flowy-document/src/sql_tables/doc/doc_op_sql.rs +++ b/rust-lib/flowy-document/src/sql_tables/doc/doc_op_sql.rs @@ -28,7 +28,7 @@ impl OpTableSql { doc_id.eq(revision.doc_id), base_rev_id.eq(revision.base_rev_id), rev_id.eq(revision.rev_id), - data.eq(revision.delta), + data.eq(revision.delta_data), state.eq(new_state), ty.eq(rev_ty), ) diff --git a/rust-lib/flowy-document/src/sql_tables/doc/doc_op_table.rs b/rust-lib/flowy-document/src/sql_tables/doc/doc_op_table.rs index 9b1713d0d1..7dfb952f59 100644 --- a/rust-lib/flowy-document/src/sql_tables/doc/doc_op_table.rs +++ b/rust-lib/flowy-document/src/sql_tables/doc/doc_op_table.rs @@ -52,7 +52,7 @@ impl std::convert::Into for RevTable { Revision { base_rev_id: self.base_rev_id, rev_id: self.rev_id, - delta: self.data, + delta_data: self.data, md5, doc_id: self.doc_id, ty: self.ty.into(), diff --git a/rust-lib/flowy-document/src/sql_tables/mod.rs b/rust-lib/flowy-document/src/sql_tables/mod.rs index dcb40dd84d..0bcdb06e6e 100644 --- a/rust-lib/flowy-document/src/sql_tables/mod.rs +++ b/rust-lib/flowy-document/src/sql_tables/mod.rs @@ -1,3 +1,3 @@ pub(crate) mod doc; -pub use doc::*; +pub(crate) use doc::*; diff --git a/rust-lib/flowy-log/src/lib.rs b/rust-lib/flowy-log/src/lib.rs index b7e5fed208..d985259f04 100644 --- a/rust-lib/flowy-log/src/lib.rs +++ b/rust-lib/flowy-log/src/lib.rs @@ -44,7 +44,7 @@ impl Builder { .with_target(false) .with_max_level(tracing::Level::TRACE) .with_writer(std::io::stderr) - .with_thread_ids(false) + .with_thread_ids(true) // .with_writer(non_blocking) // .json() .compact() @@ -60,7 +60,8 @@ impl Builder { // } let formatting_layer = FlowyFormattingLayer::new(std::io::stdout); - let _ = set_global_default(subscriber.with(JsonStorageLayer).with(formatting_layer)).map_err(|e| format!("{:?}", e))?; + let _ = set_global_default(subscriber.with(JsonStorageLayer).with(formatting_layer)) + .map_err(|e| format!("{:?}", e))?; let _ = LogTracer::builder() .with_max_level(LevelFilter::Trace) diff --git a/rust-lib/flowy-net/src/errors.rs b/rust-lib/flowy-net/src/errors.rs index c9cac3fb58..649d8b3e48 100644 --- a/rust-lib/flowy-net/src/errors.rs +++ b/rust-lib/flowy-net/src/errors.rs @@ -5,6 +5,8 @@ use std::{fmt, fmt::Debug}; use crate::response::FlowyResponse; +pub type Result = std::result::Result; + #[derive(thiserror::Error, Debug, Serialize, Deserialize, Clone)] pub struct ServerError { pub code: ErrorCode, diff --git a/rust-lib/flowy-ot/src/core/delta/delta.rs b/rust-lib/flowy-ot/src/core/delta/delta.rs index 61604103c7..4f9b15d2c1 100644 --- a/rust-lib/flowy-ot/src/core/delta/delta.rs +++ b/rust-lib/flowy-ot/src/core/delta/delta.rs @@ -51,10 +51,6 @@ impl std::convert::TryFrom for Delta { fn try_from(bytes: Bytes) -> Result { Delta::from_bytes(&bytes) } } -// impl>> std::convert::From for Delta { -// fn from(bytes: T) -> Self { -// Delta::from_bytes(bytes.as_ref().to_vec()).unwrap() } } - impl fmt::Display for Delta { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { // f.write_str(&serde_json::to_string(self).unwrap_or("".to_owned()))?; @@ -96,9 +92,9 @@ impl Delta { Self::from_json(json) } - pub fn into_bytes(self) -> Vec { + pub fn to_bytes(&self) -> Bytes { let json = self.to_json(); - json.into_bytes() + Bytes::from(json.into_bytes()) } #[inline] diff --git a/rust-lib/flowy-sdk/src/lib.rs b/rust-lib/flowy-sdk/src/lib.rs index 139d53458f..c6a02cee28 100644 --- a/rust-lib/flowy-sdk/src/lib.rs +++ b/rust-lib/flowy-sdk/src/lib.rs @@ -96,7 +96,7 @@ fn init_log(config: &FlowySDKConfig) { if !INIT_LOG.load(Ordering::SeqCst) { INIT_LOG.store(true, Ordering::SeqCst); - let _ = flowy_log::Builder::new("flowy") + let _ = flowy_log::Builder::new("flowy-client") .local(&config.root) .env_filter(&config.log_filter) .build(); diff --git a/rust-lib/flowy-user/src/services/user/user_session.rs b/rust-lib/flowy-user/src/services/user/user_session.rs index d1a252b6c8..f3832793c9 100644 --- a/rust-lib/flowy-user/src/services/user/user_session.rs +++ b/rust-lib/flowy-user/src/services/user/user_session.rs @@ -295,6 +295,7 @@ impl UserSession { } fn start_ws_connection(&self, token: &str) -> Result<(), UserError> { + log::debug!("start_ws_connection"); let addr = format!("{}/{}", self.server.ws_addr(), token); let ws_controller = self.ws_controller.clone(); let retry = Retry::new(&addr, move |addr| {