[backend]: fix reset document bugs

This commit is contained in:
appflowy 2022-01-02 22:23:33 +08:00
parent 1a869c0003
commit 951584c2ab
13 changed files with 161 additions and 73 deletions

View File

@ -57,6 +57,7 @@ pub fn run(listener: TcpListener, app_ctx: AppContext) -> Result<Server, std::io
.app_data(app_ctx.persistence.clone())
.app_data(Data::new(app_ctx.persistence.pg_pool()))
.app_data(app_ctx.ws_receivers.clone())
.app_data(app_ctx.document_manager.clone())
})
.listen(listener)?
.run();

View File

@ -5,7 +5,11 @@ use crate::services::{
use actix::Addr;
use actix_web::web::Data;
use crate::services::document::{persistence::DocumentKVPersistence, ws_receiver::make_document_ws_receiver};
use crate::services::document::{
persistence::DocumentKVPersistence,
ws_receiver::{make_document_ws_receiver, DocumentPersistenceImpl},
};
use flowy_collaboration::sync::ServerDocumentManager;
use lib_ws::WSModule;
use sqlx::PgPool;
use std::sync::Arc;
@ -15,6 +19,7 @@ pub struct AppContext {
pub ws_server: Data<Addr<WSServer>>,
pub persistence: Data<Arc<FlowyPersistence>>,
pub ws_receivers: Data<WebSocketReceivers>,
pub document_manager: Data<Arc<ServerDocumentManager>>,
}
impl AppContext {
@ -25,12 +30,16 @@ impl AppContext {
let kv_store = make_document_kv_store(pg_pool.clone());
let persistence = Arc::new(FlowyPersistence { pg_pool, kv_store });
let document_ws_receiver = make_document_ws_receiver(persistence.clone());
let document_persistence = Arc::new(DocumentPersistenceImpl(persistence.clone()));
let document_manager = Arc::new(ServerDocumentManager::new(document_persistence));
let document_ws_receiver = make_document_ws_receiver(persistence.clone(), document_manager.clone());
ws_receivers.set(WSModule::Doc, document_ws_receiver);
AppContext {
ws_server,
persistence: Data::new(persistence),
ws_receivers: Data::new(ws_receivers),
document_manager: Data::new(document_manager),
}
}
}

View File

@ -15,7 +15,9 @@ use flowy_collaboration::protobuf::{
};
use lib_ot::{core::OperationTransformable, rich_text::RichTextDelta};
use protobuf::Message;
use std::convert::TryInto;
use flowy_collaboration::sync::ServerDocumentManager;
use std::sync::Arc;
use uuid::Uuid;
@ -39,23 +41,23 @@ pub async fn read_document(
make_doc_from_revisions(&params.doc_id, revisions)
}
#[tracing::instrument(level = "debug", skip(kv_store, params), fields(delta), err)]
#[tracing::instrument(level = "debug", skip(document_manager, params), fields(delta), err)]
pub async fn reset_document(
kv_store: &Arc<DocumentKVPersistence>,
document_manager: &Arc<ServerDocumentManager>,
mut params: ResetDocumentParams,
) -> Result<(), ServerError> {
let revisions = params.take_revisions().take_items();
let doc_id = params.take_doc_id();
kv_store
.transaction(|mut transaction| {
Box::pin(async move {
let _ = transaction.batch_delete_key_start_with(&doc_id).await?;
let items = revisions_to_key_value_items(revisions.into())?;
let _ = transaction.batch_set(items).await?;
Ok(())
})
})
let params: flowy_collaboration::entities::doc::ResetDocumentParams = (&mut params).try_into().unwrap();
let mut revisions = params.revisions.into_inner();
if revisions.is_empty() {
return Err(ServerError::payload_none().context("Revisions should not be empty when reset the document"));
}
let doc_id = params.doc_id.clone();
revisions.sort_by(|a, b| a.rev_id.cmp(&b.rev_id));
let _ = document_manager
.handle_document_reset(&doc_id, revisions)
.await
.map_err(internal_error)?;
Ok(())
}
#[tracing::instrument(level = "debug", skip(kv_store), err)]
@ -152,7 +154,7 @@ impl DocumentKVPersistence {
}
#[inline]
fn revisions_to_key_value_items(revisions: Vec<Revision>) -> Result<Vec<KeyValue>, ServerError> {
pub fn revisions_to_key_value_items(revisions: Vec<Revision>) -> Result<Vec<KeyValue>, ServerError> {
let mut items = vec![];
for revision in revisions {
let key = make_revision_key(&revision.doc_id, revision.rev_id);
@ -193,7 +195,7 @@ fn make_doc_from_revisions(doc_id: &str, mut revisions: RepeatedRevision) -> Res
let mut document_delta = RichTextDelta::new();
let mut base_rev_id = 0;
let mut rev_id = 0;
// TODO: generate delta from revision should be wrapped into function.
// TODO: replace with make_delta_from_revisions
for revision in revisions {
base_rev_id = revision.base_rev_id;
rev_id = revision.rev_id;

View File

@ -10,6 +10,7 @@ use actix_web::{
use backend_service::{errors::ServerError, response::FlowyResponse};
use flowy_collaboration::protobuf::{CreateDocParams, DocumentId, ResetDocumentParams};
use flowy_collaboration::sync::ServerDocumentManager;
use std::sync::Arc;
pub async fn create_document_handler(
@ -36,10 +37,9 @@ pub async fn read_document_handler(
pub async fn reset_document_handler(
payload: Payload,
persistence: Data<Arc<FlowyPersistence>>,
document_manager: Data<Arc<ServerDocumentManager>>,
) -> Result<HttpResponse, ServerError> {
let params: ResetDocumentParams = parse_from_payload(payload).await?;
let kv_store = persistence.kv_store();
let _ = reset_document(&kv_store, params).await?;
let _ = reset_document(document_manager.get_ref(), params).await?;
Ok(FlowyResponse::success().into())
}

View File

@ -6,7 +6,7 @@ use crate::services::{
web_socket::{WSClientData, WebSocketReceiver},
};
use crate::context::FlowyPersistence;
use crate::{context::FlowyPersistence, services::document::persistence::revisions_to_key_value_items};
use backend_service::errors::ServerError;
use flowy_collaboration::{
entities::{
@ -25,10 +25,10 @@ use std::{
};
use tokio::sync::{mpsc, oneshot};
pub fn make_document_ws_receiver(persistence: Arc<FlowyPersistence>) -> Arc<DocumentWebSocketReceiver> {
let document_persistence = Arc::new(DocumentPersistenceImpl(persistence.clone()));
let document_manager = Arc::new(ServerDocumentManager::new(document_persistence));
pub fn make_document_ws_receiver(
persistence: Arc<FlowyPersistence>,
document_manager: Arc<ServerDocumentManager>,
) -> Arc<DocumentWebSocketReceiver> {
let (ws_sender, rx) = tokio::sync::mpsc::channel(100);
let actor = DocumentWebSocketActor::new(rx, document_manager);
tokio::task::spawn(actor.run());
@ -72,7 +72,7 @@ impl WebSocketReceiver for DocumentWebSocketReceiver {
}
}
struct DocumentPersistenceImpl(Arc<FlowyPersistence>);
pub struct DocumentPersistenceImpl(pub Arc<FlowyPersistence>);
impl Debug for DocumentPersistenceImpl {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("DocumentPersistenceImpl") }
}
@ -83,9 +83,9 @@ impl DocumentPersistence for DocumentPersistenceImpl {
doc_id: doc_id.to_string(),
..Default::default()
};
let persistence = self.0.kv_store();
let kv_store = self.0.kv_store();
Box::pin(async move {
let mut pb_doc = read_document(&persistence, params)
let mut pb_doc = read_document(&kv_store, params)
.await
.map_err(server_error_to_collaborate_error)?;
let doc = (&mut pb_doc)
@ -136,6 +136,24 @@ impl DocumentPersistence for DocumentPersistenceImpl {
Box::pin(async move { f().await.map_err(server_error_to_collaborate_error) })
}
fn reset_document(&self, doc_id: &str, revisions: Vec<Revision>) -> BoxResultFuture<(), CollaborateError> {
let kv_store = self.0.kv_store();
let doc_id = doc_id.to_owned();
let f = || async move {
kv_store
.transaction(|mut transaction| {
Box::pin(async move {
let _ = transaction.batch_delete_key_start_with(&doc_id).await?;
// let items = revisions_to_key_value_items(vec![])?;
let _ = transaction.batch_set(vec![]).await?;
Ok(())
})
})
.await
};
Box::pin(async move { f().await.map_err(server_error_to_collaborate_error) })
}
}
fn server_error_to_collaborate_error(error: ServerError) -> CollaborateError {

View File

@ -14,8 +14,9 @@ use crate::util::helper::{spawn_server, TestServer};
use flowy_collaboration::{entities::doc::DocumentId, protobuf::ResetDocumentParams};
use lib_ot::rich_text::{RichTextAttribute, RichTextDelta};
use parking_lot::RwLock;
use backend::services::document::persistence::{DocumentKVPersistence, read_document, reset_document};
use backend::services::document::persistence::{read_document, reset_document};
use flowy_collaboration::entities::revision::{RepeatedRevision, Revision};
use flowy_collaboration::sync::ServerDocumentManager;
use lib_ot::core::Interval;
use flowy_net::services::ws::FlowyWSConnect;
@ -146,8 +147,8 @@ async fn run_scripts(context: Arc<RwLock<ScriptContext>>, scripts: Vec<DocScript
md5,
);
let kv_store = Data::new(context.read().server.app_ctx.persistence.kv_store());
reset_doc(&doc_id, RepeatedRevision::new(vec![revision]), kv_store.get_ref()).await;
let document_manager = context.read().server.app_ctx.document_manager.clone();
reset_doc(&doc_id, RepeatedRevision::new(vec![revision]), document_manager.get_ref()).await;
sleep(Duration::from_millis(2000)).await;
},
// DocScript::Sleep(sec) => {
@ -182,10 +183,10 @@ async fn create_doc(flowy_test: &FlowySDKTest) -> String {
view_test.view.id
}
async fn reset_doc(doc_id: &str, repeated_revision: RepeatedRevision, kv_store: &Arc<DocumentKVPersistence>) {
async fn reset_doc(doc_id: &str, repeated_revision: RepeatedRevision, document_manager: &Arc<ServerDocumentManager>) {
let pb: flowy_collaboration::protobuf::RepeatedRevision = repeated_revision.try_into().unwrap();
let mut params = ResetDocumentParams::new();
params.set_doc_id(doc_id.to_owned());
params.set_revisions(pb);
let _ = reset_document(kv_store, params).await.unwrap();
let _ = reset_document(document_manager, params).await.unwrap();
}

View File

@ -4,6 +4,7 @@ use flowy_collaboration::{
document::{history::UndoResult, Document, NewlineDoc},
entities::revision::Revision,
errors::CollaborateError,
util::make_delta_from_revisions,
};
use flowy_error::FlowyError;
use futures::stream::StreamExt;
@ -77,20 +78,7 @@ impl EditorCommandQueue {
},
EditorCommand::TransformRevision { revisions, ret } => {
let f = || async {
let mut new_delta = RichTextDelta::new();
for revision in revisions {
match RichTextDelta::from_bytes(revision.delta_data) {
Ok(delta) => {
new_delta = new_delta.compose(&delta)?;
},
Err(e) => {
let err_msg = format!("Deserialize remote revision failed: {:?}", e);
log::error!("{}", err_msg);
return Err(CollaborateError::internal().context(err_msg));
},
}
}
let new_delta = make_delta_from_revisions(revisions)?;
let read_guard = self.document.read().await;
let mut server_prime: Option<RichTextDelta> = None;
let client_prime: RichTextDelta;

View File

@ -19,7 +19,7 @@ use flowy_error::{internal_error, FlowyError, FlowyResult};
use lib_infra::future::FutureResult;
use crate::services::doc::web_socket::local_ws_impl::LocalWebSocketManager;
use flowy_collaboration::entities::{revision::pair_rev_id_from_revisions, ws::DocumentServerWSDataType};
use flowy_collaboration::entities::ws::DocumentServerWSDataType;
use lib_ot::rich_text::RichTextDelta;
use lib_ws::WSConnectState;
use std::{collections::VecDeque, convert::TryFrom, sync::Arc};

View File

@ -94,6 +94,10 @@ impl DocumentPersistence for MockDocServerPersistence {
}
fn get_doc_revisions(&self, _doc_id: &str) -> BoxResultFuture<Vec<Revision>, CollaborateError> { unimplemented!() }
fn reset_document(&self, _doc_id: &str, _revisions: Vec<Revision>) -> BoxResultFuture<(), CollaborateError> {
unimplemented!()
}
}
#[derive(Debug)]

View File

@ -107,10 +107,9 @@ impl std::ops::DerefMut for RepeatedRevision {
}
impl RepeatedRevision {
pub fn new(items: Vec<Revision>) -> Self {
let mut sorted_items = items.clone();
sorted_items.sort_by(|a, b| a.rev_id.cmp(&b.rev_id));
Self { items: sorted_items }
pub fn new(mut items: Vec<Revision>) -> Self {
items.sort_by(|a, b| a.rev_id.cmp(&b.rev_id));
Self { items }
}
pub fn empty() -> Self { RepeatedRevision { items: vec![] } }

View File

@ -25,6 +25,7 @@ pub trait DocumentPersistence: Send + Sync + Debug {
fn create_doc(&self, doc_id: &str, revisions: Vec<Revision>) -> BoxResultFuture<DocumentInfo, CollaborateError>;
fn get_revisions(&self, doc_id: &str, rev_ids: Vec<i64>) -> BoxResultFuture<Vec<Revision>, CollaborateError>;
fn get_doc_revisions(&self, doc_id: &str) -> BoxResultFuture<Vec<Revision>, CollaborateError>;
fn reset_document(&self, doc_id: &str, revisions: Vec<Revision>) -> BoxResultFuture<(), CollaborateError>;
}
pub struct ServerDocumentManager {
@ -66,7 +67,7 @@ impl ServerDocumentManager {
Ok(())
},
Some(handler) => {
let _ = handler.apply_revisions(doc_id.clone(), user, revisions).await?;
let _ = handler.apply_revisions(user, revisions).await?;
Ok(())
},
};
@ -86,14 +87,26 @@ impl ServerDocumentManager {
) -> Result<(), CollaborateError> {
let rev_id = rev_id_from_str(&client_data.id)?;
let doc_id = client_data.doc_id.clone();
match self.get_document_handler(&doc_id).await {
None => {
tracing::warn!("Document:{} doesn't exist, ignore pinging", doc_id);
Ok(())
},
Some(handler) => {
let _ = handler.apply_ping(doc_id.clone(), rev_id, user).await?;
let _ = handler.apply_ping(rev_id, user).await?;
Ok(())
},
}
}
pub async fn handle_document_reset(&self, doc_id: &str, revisions: Vec<Revision>) -> Result<(), CollaborateError> {
match self.get_document_handler(doc_id).await {
None => {
tracing::warn!("Document:{} doesn't exist, ignore document reset", doc_id);
Ok(())
},
Some(handler) => {
let _ = handler.apply_document_reset(revisions).await?;
Ok(())
},
}
@ -167,7 +180,6 @@ impl OpenDocHandle {
#[tracing::instrument(level = "debug", skip(self, user, revisions), err)]
async fn apply_revisions(
&self,
doc_id: String,
user: Arc<dyn RevisionUser>,
revisions: Vec<Revision>,
) -> Result<(), CollaborateError> {
@ -175,7 +187,6 @@ impl OpenDocHandle {
let persistence = self.persistence.clone();
self.users.insert(user.user_id(), user.clone());
let msg = DocumentCommand::ApplyRevisions {
doc_id,
user,
revisions,
persistence,
@ -187,17 +198,11 @@ impl OpenDocHandle {
}
#[tracing::instrument(level = "debug", skip(self, user), err)]
async fn apply_ping(
&self,
doc_id: String,
rev_id: i64,
user: Arc<dyn RevisionUser>,
) -> Result<(), CollaborateError> {
async fn apply_ping(&self, rev_id: i64, user: Arc<dyn RevisionUser>) -> Result<(), CollaborateError> {
let (ret, rx) = oneshot::channel();
self.users.insert(user.user_id(), user.clone());
let persistence = self.persistence.clone();
let msg = DocumentCommand::Ping {
doc_id,
user,
persistence,
rev_id,
@ -207,6 +212,19 @@ impl OpenDocHandle {
Ok(())
}
#[tracing::instrument(level = "debug", skip(self, revisions), err)]
async fn apply_document_reset(&self, revisions: Vec<Revision>) -> Result<(), CollaborateError> {
let (ret, rx) = oneshot::channel();
let persistence = self.persistence.clone();
let msg = DocumentCommand::Reset {
persistence,
revisions,
ret,
};
let _ = self.send(msg, rx).await?;
Ok(())
}
async fn send<T>(&self, msg: DocumentCommand, rx: oneshot::Receiver<T>) -> CollaborateResult<T> {
let _ = self
.sender
@ -226,19 +244,22 @@ impl std::ops::Drop for OpenDocHandle {
// #[derive(Debug)]
enum DocumentCommand {
ApplyRevisions {
doc_id: String,
user: Arc<dyn RevisionUser>,
revisions: Vec<Revision>,
persistence: Arc<dyn DocumentPersistence>,
ret: oneshot::Sender<CollaborateResult<()>>,
},
Ping {
doc_id: String,
user: Arc<dyn RevisionUser>,
persistence: Arc<dyn DocumentPersistence>,
rev_id: i64,
ret: oneshot::Sender<CollaborateResult<()>>,
},
Reset {
persistence: Arc<dyn DocumentPersistence>,
revisions: Vec<Revision>,
ret: oneshot::Sender<CollaborateResult<()>>,
},
}
struct DocumentCommandQueue {
@ -283,7 +304,6 @@ impl DocumentCommandQueue {
async fn handle_message(&self, msg: DocumentCommand) {
match msg {
DocumentCommand::ApplyRevisions {
doc_id,
user,
revisions,
persistence,
@ -291,13 +311,12 @@ impl DocumentCommandQueue {
} => {
let result = self
.synchronizer
.sync_revisions(doc_id, user, revisions, persistence)
.sync_revisions(user, revisions, persistence)
.await
.map_err(internal_error);
let _ = ret.send(result);
},
DocumentCommand::Ping {
doc_id,
user,
persistence,
rev_id,
@ -305,7 +324,19 @@ impl DocumentCommandQueue {
} => {
let result = self
.synchronizer
.pong(doc_id, user, persistence, rev_id)
.pong(user, persistence, rev_id)
.await
.map_err(internal_error);
let _ = ret.send(result);
},
DocumentCommand::Reset {
persistence,
revisions,
ret,
} => {
let result = self
.synchronizer
.reset(persistence, revisions)
.await
.map_err(internal_error);
let _ = ret.send(result);

View File

@ -8,6 +8,7 @@ use crate::{
sync::DocumentPersistence,
};
use crate::util::make_delta_from_revisions;
use lib_ot::{core::OperationTransformable, rich_text::RichTextDelta};
use parking_lot::RwLock;
use std::{
@ -51,11 +52,11 @@ impl RevisionSynchronizer {
#[tracing::instrument(level = "debug", skip(self, user, revisions, persistence), err)]
pub async fn sync_revisions(
&self,
doc_id: String,
user: Arc<dyn RevisionUser>,
revisions: Vec<Revision>,
persistence: Arc<dyn DocumentPersistence>,
) -> Result<(), CollaborateError> {
let doc_id = self.doc_id.clone();
if revisions.is_empty() {
// Return all the revisions to client
let revisions = persistence.get_doc_revisions(&doc_id).await?;
@ -112,11 +113,11 @@ impl RevisionSynchronizer {
#[tracing::instrument(level = "debug", skip(self, user, persistence), err)]
pub async fn pong(
&self,
doc_id: String,
user: Arc<dyn RevisionUser>,
persistence: Arc<dyn DocumentPersistence>,
rev_id: i64,
) -> Result<(), CollaborateError> {
let doc_id = self.doc_id.clone();
let server_base_rev_id = self.rev_id.load(SeqCst);
match server_base_rev_id.cmp(&rev_id) {
Ordering::Less => tracing::error!(
@ -138,6 +139,21 @@ impl RevisionSynchronizer {
Ok(())
}
#[tracing::instrument(level = "debug", skip(self, revisions), err)]
pub async fn reset(
&self,
persistence: Arc<dyn DocumentPersistence>,
revisions: Vec<Revision>,
) -> Result<(), CollaborateError> {
let doc_id = self.doc_id.clone();
let _ = persistence.reset_document(&doc_id, revisions.clone()).await?;
let delta = make_delta_from_revisions(revisions)?;
let new_document = Document::from_delta(delta);
*self.document.write() = new_document;
Ok(())
}
pub fn doc_json(&self) -> String { self.document.read().to_json() }
fn compose_revision(&self, revision: &Revision) -> Result<(), CollaborateError> {

View File

@ -1,4 +1,11 @@
use lib_ot::core::{NEW_LINE, WHITESPACE};
use crate::{
entities::revision::Revision,
errors::{CollaborateError, CollaborateResult},
};
use lib_ot::{
core::{OperationTransformable, NEW_LINE, WHITESPACE},
rich_text::RichTextDelta,
};
use std::sync::atomic::{AtomicI64, Ordering::SeqCst};
#[inline]
@ -32,3 +39,15 @@ impl RevIdCounter {
pub fn set(&self, n: i64) { let _ = self.0.fetch_update(SeqCst, SeqCst, |_| Some(n)); }
}
pub fn make_delta_from_revisions(revisions: Vec<Revision>) -> CollaborateResult<RichTextDelta> {
let mut new_delta = RichTextDelta::new();
for revision in revisions {
let delta = RichTextDelta::from_bytes(revision.delta_data).map_err(|e| {
let err_msg = format!("Deserialize remote revision failed: {:?}", e);
CollaborateError::internal().context(err_msg)
})?;
new_delta = new_delta.compose(&delta)?;
}
Ok(new_delta)
}