test with batch set and batch get revision

This commit is contained in:
appflowy 2021-12-24 15:47:14 +08:00
parent 463cab6eee
commit e069bfb057
14 changed files with 129 additions and 70 deletions

View File

@ -110,7 +110,7 @@ fn user_scope() -> Scope {
.route(web::get().to(view::read_handler))
.route(web::patch().to(view::update_handler))
)
.service(web::resource("/document")
.service(web::resource("/doc")
.route(web::post().to(doc::create_document_handler))
.route(web::get().to(doc::read_document_handler))
.route(web::patch().to(doc::reset_document_handler))

View File

@ -1,5 +1,5 @@
use crate::services::{
kv::{KVStore, PostgresKV},
kv::PostgresKV,
web_socket::{WSServer, WebSocketReceivers},
};
use actix::Addr;

View File

@ -1,8 +1,10 @@
use crate::{services::kv::KVStore, util::serde_ext::parse_from_bytes};
use crate::{
services::kv::{KVStore, KeyValue},
util::serde_ext::parse_from_bytes,
};
use backend_service::errors::ServerError;
use bytes::Bytes;
use flowy_collaboration::protobuf::{RepeatedRevision, Revision};
use futures::stream::{self, StreamExt};
use protobuf::Message;
use std::sync::Arc;
@ -25,16 +27,25 @@ impl DocumentKVPersistence {
pub(crate) async fn batch_set_revision(&self, revisions: Vec<Revision>) -> Result<(), ServerError> {
let kv_store = self.inner.clone();
let f = |revision: Revision, kv_store: Arc<dyn KVStore>| async move {
let key = make_revision_key(&revision.doc_id, revision.rev_id);
let bytes = revision.write_to_bytes().unwrap();
let _ = kv_store.set(&key, Bytes::from(bytes)).await;
};
stream::iter(revisions)
.for_each_concurrent(None, |revision| f(revision, kv_store.clone()))
.await;
let items = revisions
.into_iter()
.map(|revision| {
let key = make_revision_key(&revision.doc_id, revision.rev_id);
let value = Bytes::from(revision.write_to_bytes().unwrap());
KeyValue { key, value }
})
.collect::<Vec<KeyValue>>();
let _ = kv_store.batch_set(items).await?;
// use futures::stream::{self, StreamExt};
// let f = |revision: Revision, kv_store: Arc<dyn KVStore>| async move {
// let key = make_revision_key(&revision.doc_id, revision.rev_id);
// let bytes = revision.write_to_bytes().unwrap();
// let _ = kv_store.set(&key, Bytes::from(bytes)).await.unwrap();
// };
//
// stream::iter(revisions)
// .for_each_concurrent(None, |revision| f(revision, kv_store.clone()))
// .await;
Ok(())
}
@ -55,11 +66,14 @@ impl DocumentKVPersistence {
},
};
let revisions = items
let mut revisions = items
.into_iter()
.filter_map(|kv| parse_from_bytes::<Revision>(&kv.value).ok())
.collect::<Vec<Revision>>();
// TODO: optimize sort
revisions.sort_by(|a, b| a.rev_id.cmp(&b.rev_id));
let mut repeated_revision = RepeatedRevision::new();
repeated_revision.set_items(revisions.into());
Ok(repeated_revision)

View File

@ -1,4 +1,5 @@
use crate::{context::FlowyPersistence, services::document::persistence::DocumentKVPersistence};
use anyhow::Context;
use backend_service::errors::{internal_error, ServerError};
use flowy_collaboration::protobuf::{
CreateDocParams,
@ -27,7 +28,7 @@ pub(crate) async fn read_doc(
persistence: &Arc<FlowyPersistence>,
params: DocIdentifier,
) -> Result<DocumentInfo, ServerError> {
let _ = Uuid::parse_str(&params.doc_id)?;
let _ = Uuid::parse_str(&params.doc_id).context("Parse document id to uuid failed")?;
let kv_store = persistence.kv_store();
let revisions = kv_store.batch_get_revisions(&params.doc_id, None).await?;
@ -53,6 +54,10 @@ struct DocTable {
fn make_doc_from_revisions(doc_id: &str, mut revisions: RepeatedRevision) -> Result<DocumentInfo, ServerError> {
let revisions = revisions.take_items();
if revisions.is_empty() {
return Err(ServerError::record_not_found().context(format!("{} not exist", doc_id)));
}
let mut document_delta = RichTextDelta::new();
let mut base_rev_id = 0;
let mut rev_id = 0;

View File

@ -7,9 +7,10 @@ use anyhow::Context;
use backend_service::errors::ServerError;
use bytes::Bytes;
use lib_infra::future::FutureResultSend;
use sql_builder::{quote, SqlBuilder as RawSqlBuilder};
use sql_builder::SqlBuilder as RawSqlBuilder;
use sqlx::{
postgres::{PgArguments, PgRow},
Arguments,
Error,
PgPool,
Postgres,
@ -98,20 +99,23 @@ impl KVStore for PostgresKV {
.await
.context("[KV]:Failed to acquire a Postgres connection")?;
SqlBuilder::create(KV_TABLE).add_field("id").add_field("blob");
let mut builder = RawSqlBuilder::insert_into(KV_TABLE);
let mut m_builder = builder.field("id").field("blob");
let m_builder = builder.field("id").field("blob");
let mut args = PgArguments::default();
kvs.iter().enumerate().for_each(|(index, _)| {
let index = index * 2 + 1;
m_builder.values(&[format!("${}", index), format!("${}", index + 1)]);
});
for kv in kvs {
let s = match std::str::from_utf8(&kv.value) {
Ok(v) => v,
Err(e) => {
log::error!("[KV]: {}", e);
""
},
};
m_builder = m_builder.values(&[quote(kv.key), quote(s)]);
args.add(kv.key);
args.add(kv.value.to_vec());
}
let sql = m_builder.sql()?;
let _ = sqlx::query(&sql)
let _ = sqlx::query_with(&sql, args)
.execute(&mut transaction)
.await
.map_err(map_sqlx_error)?;

View File

@ -1,3 +1,4 @@
#![allow(clippy::module_inception)]
mod kv;
use bytes::Bytes;

View File

@ -1,9 +1,17 @@
#![allow(clippy::all)]
use crate::util::helper::*;
use crate::util::helper::{ViewTest, *};
use flowy_collaboration::{
core::document::{Document, PlainDoc},
entities::{
doc::{CreateDocParams, DocIdentifier},
revision::{md5, RepeatedRevision, RevType, Revision},
},
};
use flowy_core_data_model::entities::{
app::{AppIdentifier, UpdateAppParams},
trash::{TrashIdentifier, TrashIdentifiers, TrashType},
view::{UpdateViewParams, ViewIdentifier},
view::{UpdateViewParams, ViewIdentifier, ViewIdentifiers},
workspace::{CreateWorkspaceParams, UpdateWorkspaceParams, WorkspaceIdentifier},
};
@ -215,3 +223,60 @@ async fn workspace_list_read() {
let workspaces = server.read_workspaces(read_params).await;
assert_eq!(workspaces.len(), 3);
}
#[actix_rt::test]
async fn doc_read() {
let test = ViewTest::new().await;
let params = DocIdentifier {
doc_id: test.view.id.clone(),
};
let doc = test.server.read_doc(params).await;
assert_eq!(doc.is_some(), true);
}
#[actix_rt::test]
async fn doc_create() {
let mut revisions: Vec<Revision> = vec![];
let server = TestUserServer::new().await;
let doc_id = uuid::Uuid::new_v4().to_string();
let user_id = "a".to_owned();
let mut document = Document::new::<PlainDoc>();
let mut offset = 0;
for i in 0..1000 {
let content = i.to_string();
let delta = document.insert(offset, content.clone()).unwrap();
offset += content.len();
let bytes = delta.to_bytes();
let md5 = md5(&bytes);
let revision = if i == 0 {
Revision::new(&doc_id, i, i, bytes, RevType::Remote, &user_id, md5)
} else {
Revision::new(&doc_id, i - 1, i, bytes, RevType::Remote, &user_id, md5)
};
revisions.push(revision);
}
let params = CreateDocParams {
id: doc_id.clone(),
revisions: RepeatedRevision { items: revisions },
};
server.create_doc(params).await;
let doc = server.read_doc(DocIdentifier { doc_id }).await;
assert_eq!(doc.unwrap().text, document.to_json());
}
#[actix_rt::test]
async fn doc_delete() {
let test = ViewTest::new().await;
let delete_params = ViewIdentifiers {
view_ids: vec![test.view.id.clone()],
};
test.server.delete_view(delete_params).await;
let params = DocIdentifier {
doc_id: test.view.id.clone(),
};
let doc = test.server.read_doc(params).await;
assert_eq!(doc.is_none(), true);
}

View File

@ -1,28 +0,0 @@
use crate::util::helper::ViewTest;
use flowy_collaboration::entities::doc::DocIdentifier;
use flowy_core_data_model::entities::view::ViewIdentifiers;
#[actix_rt::test]
async fn doc_read() {
let test = ViewTest::new().await;
let params = DocIdentifier {
doc_id: test.view.id.clone(),
};
let doc = test.server.read_doc(params).await;
assert_eq!(doc.is_some(), true);
}
#[actix_rt::test]
async fn doc_delete() {
let test = ViewTest::new().await;
let delete_params = ViewIdentifiers {
view_ids: vec![test.view.id.clone()],
};
test.server.delete_view(delete_params).await;
let params = DocIdentifier {
doc_id: test.view.id.clone(),
};
let doc = test.server.read_doc(params).await;
assert_eq!(doc.is_none(), true);
}

View File

@ -1,3 +1,2 @@
// mod edit_script;
// mod edit_test;
mod crud_test;

View File

@ -9,9 +9,9 @@ use backend_service::{
user_request::*,
workspace_request::*,
};
use flowy_collaboration::entities::doc::{DocIdentifier, DocumentInfo};
use flowy_collaboration::entities::doc::{CreateDocParams, DocIdentifier, DocumentInfo};
use flowy_core_data_model::entities::prelude::*;
use flowy_document::services::server::read_doc_request;
use flowy_document::services::server::{create_doc_request, read_doc_request};
use flowy_user_data_model::entities::*;
use sqlx::{Connection, Executor, PgConnection, PgPool};
use uuid::Uuid;
@ -155,6 +155,11 @@ impl TestUserServer {
doc
}
pub async fn create_doc(&self, params: CreateDocParams) {
let url = format!("{}/api/doc", self.http_addr());
let _ = create_doc_request(self.user_token(), params, &url).await.unwrap();
}
pub async fn register_user(&self) -> SignUpResponse {
let params = SignUpParams {
email: "annie@appflowy.io".to_string(),

View File

@ -27,7 +27,7 @@ impl DocumentServerAPI for DocServer {
fn update_doc(&self, token: &str, params: ResetDocumentParams) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.doc_url();
FutureResult::new(async move { update_doc_request(&token, params, &url).await })
FutureResult::new(async move { reset_doc_request(&token, params, &url).await })
}
}
@ -60,7 +60,7 @@ pub async fn read_doc_request(
Ok(doc)
}
pub async fn update_doc_request(token: &str, params: ResetDocumentParams, url: &str) -> Result<(), FlowyError> {
pub async fn reset_doc_request(token: &str, params: ResetDocumentParams, url: &str) -> Result<(), FlowyError> {
let _ = request_builder()
.patch(&url.to_owned())
.header(HEADER_TOKEN, token)

View File

@ -106,7 +106,7 @@ impl Document {
self.history.record(undo_delta);
}
tracing::debug!("compose result: {}", composed_delta.to_json());
tracing::trace!("compose result: {}", composed_delta.to_json());
trim(&mut composed_delta);
self.set_delta(composed_delta);

View File

@ -154,6 +154,7 @@ impl RevisionSynchronizer {
}
}
#[allow(dead_code)]
fn rev_id(&self) -> i64 { self.rev_id.load(SeqCst) }
}

View File

@ -98,10 +98,3 @@ impl ErrorBuilder {
pub fn build(mut self) -> OTError { OTError::new(self.code, &self.msg.take().unwrap_or_else(|| "".to_owned())) }
}
pub(crate) fn internal_error<T>(e: T) -> OTError
where
T: std::fmt::Debug,
{
OTError::internal().context(e)
}