diff --git a/backend/src/application.rs b/backend/src/application.rs index 5a5a3e466d..00c5b52deb 100644 --- a/backend/src/application.rs +++ b/backend/src/application.rs @@ -110,7 +110,7 @@ fn user_scope() -> Scope { .route(web::get().to(view::read_handler)) .route(web::patch().to(view::update_handler)) ) - .service(web::resource("/document") + .service(web::resource("/doc") .route(web::post().to(doc::create_document_handler)) .route(web::get().to(doc::read_document_handler)) .route(web::patch().to(doc::reset_document_handler)) diff --git a/backend/src/context.rs b/backend/src/context.rs index 6f044e3903..b6f5f5a3ec 100644 --- a/backend/src/context.rs +++ b/backend/src/context.rs @@ -1,5 +1,5 @@ use crate::services::{ - kv::{KVStore, PostgresKV}, + kv::PostgresKV, web_socket::{WSServer, WebSocketReceivers}, }; use actix::Addr; diff --git a/backend/src/services/document/persistence/kv_store.rs b/backend/src/services/document/persistence/kv_store.rs index 4c91ef23a6..24b6722f4f 100644 --- a/backend/src/services/document/persistence/kv_store.rs +++ b/backend/src/services/document/persistence/kv_store.rs @@ -1,8 +1,10 @@ -use crate::{services::kv::KVStore, util::serde_ext::parse_from_bytes}; +use crate::{ + services::kv::{KVStore, KeyValue}, + util::serde_ext::parse_from_bytes, +}; use backend_service::errors::ServerError; use bytes::Bytes; use flowy_collaboration::protobuf::{RepeatedRevision, Revision}; -use futures::stream::{self, StreamExt}; use protobuf::Message; use std::sync::Arc; @@ -25,16 +27,25 @@ impl DocumentKVPersistence { pub(crate) async fn batch_set_revision(&self, revisions: Vec) -> Result<(), ServerError> { let kv_store = self.inner.clone(); - - let f = |revision: Revision, kv_store: Arc| async move { - let key = make_revision_key(&revision.doc_id, revision.rev_id); - let bytes = revision.write_to_bytes().unwrap(); - let _ = kv_store.set(&key, Bytes::from(bytes)).await; - }; - - stream::iter(revisions) - .for_each_concurrent(None, |revision| f(revision, kv_store.clone())) - .await; + let items = revisions + .into_iter() + .map(|revision| { + let key = make_revision_key(&revision.doc_id, revision.rev_id); + let value = Bytes::from(revision.write_to_bytes().unwrap()); + KeyValue { key, value } + }) + .collect::>(); + let _ = kv_store.batch_set(items).await?; + // use futures::stream::{self, StreamExt}; + // let f = |revision: Revision, kv_store: Arc| async move { + // let key = make_revision_key(&revision.doc_id, revision.rev_id); + // let bytes = revision.write_to_bytes().unwrap(); + // let _ = kv_store.set(&key, Bytes::from(bytes)).await.unwrap(); + // }; + // + // stream::iter(revisions) + // .for_each_concurrent(None, |revision| f(revision, kv_store.clone())) + // .await; Ok(()) } @@ -55,11 +66,14 @@ impl DocumentKVPersistence { }, }; - let revisions = items + let mut revisions = items .into_iter() .filter_map(|kv| parse_from_bytes::(&kv.value).ok()) .collect::>(); + // TODO: optimize sort + revisions.sort_by(|a, b| a.rev_id.cmp(&b.rev_id)); + let mut repeated_revision = RepeatedRevision::new(); repeated_revision.set_items(revisions.into()); Ok(repeated_revision) diff --git a/backend/src/services/document/persistence/postgres.rs b/backend/src/services/document/persistence/postgres.rs index 2f0d6d1a68..57f941fb00 100644 --- a/backend/src/services/document/persistence/postgres.rs +++ b/backend/src/services/document/persistence/postgres.rs @@ -1,4 +1,5 @@ use crate::{context::FlowyPersistence, services::document::persistence::DocumentKVPersistence}; +use anyhow::Context; use backend_service::errors::{internal_error, ServerError}; use flowy_collaboration::protobuf::{ CreateDocParams, @@ -27,7 +28,7 @@ pub(crate) async fn read_doc( persistence: &Arc, params: DocIdentifier, ) -> Result { - let _ = Uuid::parse_str(¶ms.doc_id)?; + let _ = Uuid::parse_str(¶ms.doc_id).context("Parse document id to uuid failed")?; let kv_store = persistence.kv_store(); let revisions = kv_store.batch_get_revisions(¶ms.doc_id, None).await?; @@ -53,6 +54,10 @@ struct DocTable { fn make_doc_from_revisions(doc_id: &str, mut revisions: RepeatedRevision) -> Result { let revisions = revisions.take_items(); + if revisions.is_empty() { + return Err(ServerError::record_not_found().context(format!("{} not exist", doc_id))); + } + let mut document_delta = RichTextDelta::new(); let mut base_rev_id = 0; let mut rev_id = 0; diff --git a/backend/src/services/kv/kv.rs b/backend/src/services/kv/kv.rs index a1995dffcf..7fca6d9fbd 100644 --- a/backend/src/services/kv/kv.rs +++ b/backend/src/services/kv/kv.rs @@ -7,9 +7,10 @@ use anyhow::Context; use backend_service::errors::ServerError; use bytes::Bytes; use lib_infra::future::FutureResultSend; -use sql_builder::{quote, SqlBuilder as RawSqlBuilder}; +use sql_builder::SqlBuilder as RawSqlBuilder; use sqlx::{ postgres::{PgArguments, PgRow}, + Arguments, Error, PgPool, Postgres, @@ -98,20 +99,23 @@ impl KVStore for PostgresKV { .await .context("[KV]:Failed to acquire a Postgres connection")?; + SqlBuilder::create(KV_TABLE).add_field("id").add_field("blob"); let mut builder = RawSqlBuilder::insert_into(KV_TABLE); - let mut m_builder = builder.field("id").field("blob"); + let m_builder = builder.field("id").field("blob"); + + let mut args = PgArguments::default(); + kvs.iter().enumerate().for_each(|(index, _)| { + let index = index * 2 + 1; + m_builder.values(&[format!("${}", index), format!("${}", index + 1)]); + }); + for kv in kvs { - let s = match std::str::from_utf8(&kv.value) { - Ok(v) => v, - Err(e) => { - log::error!("[KV]: {}", e); - "" - }, - }; - m_builder = m_builder.values(&[quote(kv.key), quote(s)]); + args.add(kv.key); + args.add(kv.value.to_vec()); } + let sql = m_builder.sql()?; - let _ = sqlx::query(&sql) + let _ = sqlx::query_with(&sql, args) .execute(&mut transaction) .await .map_err(map_sqlx_error)?; diff --git a/backend/src/services/kv/mod.rs b/backend/src/services/kv/mod.rs index d3a010e4ad..ee2220a080 100644 --- a/backend/src/services/kv/mod.rs +++ b/backend/src/services/kv/mod.rs @@ -1,3 +1,4 @@ +#![allow(clippy::module_inception)] mod kv; use bytes::Bytes; diff --git a/backend/tests/api_test/workspace_test.rs b/backend/tests/api_test/workspace_test.rs index 7851b9fc52..270db2d401 100644 --- a/backend/tests/api_test/workspace_test.rs +++ b/backend/tests/api_test/workspace_test.rs @@ -1,9 +1,17 @@ #![allow(clippy::all)] -use crate::util::helper::*; + +use crate::util::helper::{ViewTest, *}; +use flowy_collaboration::{ + core::document::{Document, PlainDoc}, + entities::{ + doc::{CreateDocParams, DocIdentifier}, + revision::{md5, RepeatedRevision, RevType, Revision}, + }, +}; use flowy_core_data_model::entities::{ app::{AppIdentifier, UpdateAppParams}, trash::{TrashIdentifier, TrashIdentifiers, TrashType}, - view::{UpdateViewParams, ViewIdentifier}, + view::{UpdateViewParams, ViewIdentifier, ViewIdentifiers}, workspace::{CreateWorkspaceParams, UpdateWorkspaceParams, WorkspaceIdentifier}, }; @@ -215,3 +223,60 @@ async fn workspace_list_read() { let workspaces = server.read_workspaces(read_params).await; assert_eq!(workspaces.len(), 3); } + +#[actix_rt::test] +async fn doc_read() { + let test = ViewTest::new().await; + let params = DocIdentifier { + doc_id: test.view.id.clone(), + }; + let doc = test.server.read_doc(params).await; + assert_eq!(doc.is_some(), true); +} + +#[actix_rt::test] +async fn doc_create() { + let mut revisions: Vec = vec![]; + let server = TestUserServer::new().await; + let doc_id = uuid::Uuid::new_v4().to_string(); + let user_id = "a".to_owned(); + let mut document = Document::new::(); + let mut offset = 0; + for i in 0..1000 { + let content = i.to_string(); + let delta = document.insert(offset, content.clone()).unwrap(); + offset += content.len(); + let bytes = delta.to_bytes(); + let md5 = md5(&bytes); + let revision = if i == 0 { + Revision::new(&doc_id, i, i, bytes, RevType::Remote, &user_id, md5) + } else { + Revision::new(&doc_id, i - 1, i, bytes, RevType::Remote, &user_id, md5) + }; + revisions.push(revision); + } + + let params = CreateDocParams { + id: doc_id.clone(), + revisions: RepeatedRevision { items: revisions }, + }; + server.create_doc(params).await; + + let doc = server.read_doc(DocIdentifier { doc_id }).await; + assert_eq!(doc.unwrap().text, document.to_json()); +} + +#[actix_rt::test] +async fn doc_delete() { + let test = ViewTest::new().await; + let delete_params = ViewIdentifiers { + view_ids: vec![test.view.id.clone()], + }; + test.server.delete_view(delete_params).await; + + let params = DocIdentifier { + doc_id: test.view.id.clone(), + }; + let doc = test.server.read_doc(params).await; + assert_eq!(doc.is_none(), true); +} diff --git a/backend/tests/document_test/crud_test.rs b/backend/tests/document_test/crud_test.rs deleted file mode 100644 index d4e1bb37df..0000000000 --- a/backend/tests/document_test/crud_test.rs +++ /dev/null @@ -1,28 +0,0 @@ -use crate::util::helper::ViewTest; -use flowy_collaboration::entities::doc::DocIdentifier; -use flowy_core_data_model::entities::view::ViewIdentifiers; - -#[actix_rt::test] -async fn doc_read() { - let test = ViewTest::new().await; - let params = DocIdentifier { - doc_id: test.view.id.clone(), - }; - let doc = test.server.read_doc(params).await; - assert_eq!(doc.is_some(), true); -} - -#[actix_rt::test] -async fn doc_delete() { - let test = ViewTest::new().await; - let delete_params = ViewIdentifiers { - view_ids: vec![test.view.id.clone()], - }; - test.server.delete_view(delete_params).await; - - let params = DocIdentifier { - doc_id: test.view.id.clone(), - }; - let doc = test.server.read_doc(params).await; - assert_eq!(doc.is_none(), true); -} diff --git a/backend/tests/document_test/mod.rs b/backend/tests/document_test/mod.rs index 3a785e530b..14cb6254f7 100644 --- a/backend/tests/document_test/mod.rs +++ b/backend/tests/document_test/mod.rs @@ -1,3 +1,2 @@ // mod edit_script; // mod edit_test; -mod crud_test; diff --git a/backend/tests/util/helper.rs b/backend/tests/util/helper.rs index 8930867a02..fb29f007c4 100644 --- a/backend/tests/util/helper.rs +++ b/backend/tests/util/helper.rs @@ -9,9 +9,9 @@ use backend_service::{ user_request::*, workspace_request::*, }; -use flowy_collaboration::entities::doc::{DocIdentifier, DocumentInfo}; +use flowy_collaboration::entities::doc::{CreateDocParams, DocIdentifier, DocumentInfo}; use flowy_core_data_model::entities::prelude::*; -use flowy_document::services::server::read_doc_request; +use flowy_document::services::server::{create_doc_request, read_doc_request}; use flowy_user_data_model::entities::*; use sqlx::{Connection, Executor, PgConnection, PgPool}; use uuid::Uuid; @@ -155,6 +155,11 @@ impl TestUserServer { doc } + pub async fn create_doc(&self, params: CreateDocParams) { + let url = format!("{}/api/doc", self.http_addr()); + let _ = create_doc_request(self.user_token(), params, &url).await.unwrap(); + } + pub async fn register_user(&self) -> SignUpResponse { let params = SignUpParams { email: "annie@appflowy.io".to_string(), diff --git a/frontend/rust-lib/flowy-document/src/services/server/server_api.rs b/frontend/rust-lib/flowy-document/src/services/server/server_api.rs index 45a6003a49..7536c72b1e 100644 --- a/frontend/rust-lib/flowy-document/src/services/server/server_api.rs +++ b/frontend/rust-lib/flowy-document/src/services/server/server_api.rs @@ -27,7 +27,7 @@ impl DocumentServerAPI for DocServer { fn update_doc(&self, token: &str, params: ResetDocumentParams) -> FutureResult<(), FlowyError> { let token = token.to_owned(); let url = self.config.doc_url(); - FutureResult::new(async move { update_doc_request(&token, params, &url).await }) + FutureResult::new(async move { reset_doc_request(&token, params, &url).await }) } } @@ -60,7 +60,7 @@ pub async fn read_doc_request( Ok(doc) } -pub async fn update_doc_request(token: &str, params: ResetDocumentParams, url: &str) -> Result<(), FlowyError> { +pub async fn reset_doc_request(token: &str, params: ResetDocumentParams, url: &str) -> Result<(), FlowyError> { let _ = request_builder() .patch(&url.to_owned()) .header(HEADER_TOKEN, token) diff --git a/shared-lib/flowy-collaboration/src/core/document/document.rs b/shared-lib/flowy-collaboration/src/core/document/document.rs index 4178488e78..d15f920edf 100644 --- a/shared-lib/flowy-collaboration/src/core/document/document.rs +++ b/shared-lib/flowy-collaboration/src/core/document/document.rs @@ -106,7 +106,7 @@ impl Document { self.history.record(undo_delta); } - tracing::debug!("compose result: {}", composed_delta.to_json()); + tracing::trace!("compose result: {}", composed_delta.to_json()); trim(&mut composed_delta); self.set_delta(composed_delta); diff --git a/shared-lib/flowy-collaboration/src/core/sync/synchronizer.rs b/shared-lib/flowy-collaboration/src/core/sync/synchronizer.rs index f20e265375..6abaae15c0 100644 --- a/shared-lib/flowy-collaboration/src/core/sync/synchronizer.rs +++ b/shared-lib/flowy-collaboration/src/core/sync/synchronizer.rs @@ -154,6 +154,7 @@ impl RevisionSynchronizer { } } + #[allow(dead_code)] fn rev_id(&self) -> i64 { self.rev_id.load(SeqCst) } } diff --git a/shared-lib/lib-ot/src/errors.rs b/shared-lib/lib-ot/src/errors.rs index 2346908169..52597c4e76 100644 --- a/shared-lib/lib-ot/src/errors.rs +++ b/shared-lib/lib-ot/src/errors.rs @@ -98,10 +98,3 @@ impl ErrorBuilder { pub fn build(mut self) -> OTError { OTError::new(self.code, &self.msg.take().unwrap_or_else(|| "".to_owned())) } } - -pub(crate) fn internal_error(e: T) -> OTError -where - T: std::fmt::Debug, -{ - OTError::internal().context(e) -}