diff --git a/backend/src/service/doc/edit/edit_doc.rs b/backend/src/service/doc/edit/edit_doc.rs index 7b5849d541..31ee41965a 100644 --- a/backend/src/service/doc/edit/edit_doc.rs +++ b/backend/src/service/doc/edit/edit_doc.rs @@ -112,6 +112,7 @@ impl ServerEditDoc { }, Ordering::Equal => { // Do nothing + log::warn!("Applied revision rev_id is the same as cur_rev_id"); }, Ordering::Greater => { // The client document is outdated. Transform the client revision delta and then @@ -175,6 +176,10 @@ impl ServerEditDoc { ) )] fn compose_delta(&self, delta: Delta) -> Result<(), ServerError> { + if delta.is_empty() { + log::warn!("Composed delta is empty"); + } + match self.document.try_write_for(Duration::from_millis(300)) { None => { log::error!("Failed to acquire write lock of document"); diff --git a/rust-lib/flowy-document/src/entities/doc/revision.rs b/rust-lib/flowy-document/src/entities/doc/revision.rs index f1f19a826e..090f78113a 100644 --- a/rust-lib/flowy-document/src/entities/doc/revision.rs +++ b/rust-lib/flowy-document/src/entities/doc/revision.rs @@ -156,7 +156,7 @@ impl RevisionRange { pub fn len(&self) -> i64 { debug_assert!(self.end >= self.start); if self.end >= self.start { - self.end - self.start + self.end - self.start + 1 } else { 0 } diff --git a/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs b/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs index 6fdc3446c7..1ac83a50d3 100644 --- a/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs +++ b/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs @@ -12,7 +12,7 @@ use crate::{ message::{DocumentMsg, TransformDeltas}, model::OpenDocAction, }, - revision::{DocRevision, RevisionManager, RevisionServer, RevisionStoreActor}, + revision::{RevisionManager, RevisionServer}, UndoResult, }, ws::{DocumentWebSocket, WsDocumentHandler}, @@ -45,7 +45,7 @@ impl ClientEditDoc { server: Arc, user: Arc, ) -> DocResult { - let (sender, mut receiver) = mpsc::channel(1); + let (sender, receiver) = mpsc::channel(1); let mut rev_manager = RevisionManager::new(doc_id, pool.clone(), server.clone(), sender); spawn_rev_receiver(receiver, ws.clone()); @@ -255,7 +255,7 @@ impl ClientEditDoc { ); let _ = self.ws.send(revision.into()); - save_document(self.document.clone(), local_rev_id.into()).await; + let _ = save_document(self.document.clone(), local_rev_id.into()).await?; Ok(()) } @@ -306,6 +306,7 @@ fn spawn_rev_receiver(mut receiver: mpsc::Receiver, ws: Arc {}, Err(e) => log::error!("Send revision failed: {:?}", e), diff --git a/rust-lib/flowy-document/src/services/doc/revision/manager.rs b/rust-lib/flowy-document/src/services/doc/revision/manager.rs index fd77902849..d46661b75f 100644 --- a/rust-lib/flowy-document/src/services/doc/revision/manager.rs +++ b/rust-lib/flowy-document/src/services/doc/revision/manager.rs @@ -1,13 +1,13 @@ use crate::{ entities::doc::{Doc, RevId, RevType, Revision, RevisionRange}, - errors::{internal_error, DocError, DocResult}, - services::{doc::revision::RevisionStoreActor, util::RevIdCounter, ws::DocumentWebSocket}, + errors::{DocError, DocResult}, + services::{doc::revision::RevisionStore, util::RevIdCounter}, }; use flowy_database::ConnectionPool; use flowy_infra::future::ResultFuture; use flowy_ot::core::{Delta, OperationTransformable}; use std::sync::Arc; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; pub struct DocRevision { pub base_rev_id: RevId, @@ -22,7 +22,7 @@ pub trait RevisionServer: Send + Sync { pub struct RevisionManager { doc_id: String, rev_id_counter: RevIdCounter, - rev_store: Arc, + rev_store: Arc, } impl RevisionManager { @@ -32,7 +32,7 @@ impl RevisionManager { server: Arc, pending_rev_sender: mpsc::Sender, ) -> Self { - let rev_store = Arc::new(RevisionStoreActor::new(doc_id, pool, server, pending_rev_sender)); + let rev_store = RevisionStore::new(doc_id, pool, server, pending_rev_sender); let rev_id_counter = RevIdCounter::new(0); Self { doc_id: doc_id.to_string(), diff --git a/rust-lib/flowy-document/src/services/doc/revision/model.rs b/rust-lib/flowy-document/src/services/doc/revision/model.rs index cfe2dbff8d..3317ce0b6c 100644 --- a/rust-lib/flowy-document/src/services/doc/revision/model.rs +++ b/rust-lib/flowy-document/src/services/doc/revision/model.rs @@ -1,9 +1,17 @@ -use crate::{entities::doc::Revision, errors::DocResult, services::ws::DocumentWebSocket, sql_tables::RevState}; +use crate::{ + entities::doc::{Revision, RevisionRange}, + errors::{internal_error, DocError, DocResult}, + sql_tables::{RevState, RevTableSql}, +}; +use async_stream::stream; +use flowy_database::ConnectionPool; +use flowy_infra::future::ResultFuture; +use futures::{stream::StreamExt, TryFutureExt}; +use std::{sync::Arc, time::Duration}; +use tokio::sync::{broadcast, mpsc}; -use tokio::sync::oneshot; - -pub type PendingRevSender = oneshot::Sender>; -pub type PendingRevReceiver = oneshot::Receiver>; +pub type RevIdReceiver = broadcast::Receiver; +pub type RevIdSender = broadcast::Sender; pub struct RevisionContext { pub revision: Revision, @@ -21,9 +29,118 @@ impl RevisionContext { pub(crate) struct PendingRevId { pub rev_id: i64, - pub sender: PendingRevSender, + pub sender: RevIdSender, } impl PendingRevId { - pub(crate) fn new(rev_id: i64, sender: PendingRevSender) -> Self { Self { rev_id, sender } } + pub(crate) fn new(rev_id: i64, sender: RevIdSender) -> Self { Self { rev_id, sender } } + + pub(crate) fn finish(&self, rev_id: i64) -> bool { + if self.rev_id > rev_id { + false + } else { + self.sender.send(self.rev_id); + true + } + } +} + +pub(crate) struct Persistence { + pub(crate) rev_sql: Arc, + pub(crate) pool: Arc, +} + +impl Persistence { + pub(crate) fn new(pool: Arc) -> Self { + let rev_sql = Arc::new(RevTableSql {}); + Self { rev_sql, pool } + } + + pub(crate) fn create_revs(&self, revisions_state: Vec<(Revision, RevState)>) -> DocResult<()> { + let conn = &*self.pool.get().map_err(internal_error)?; + conn.immediate_transaction::<_, DocError, _>(|| { + let _ = self.rev_sql.create_rev_table(revisions_state, conn)?; + Ok(()) + }) + } + + pub(crate) fn read_rev_with_range(&self, doc_id: &str, range: RevisionRange) -> DocResult> { + let conn = &*self.pool.get().map_err(internal_error).unwrap(); + let revisions = self.rev_sql.read_rev_tables_with_range(doc_id, range, conn)?; + Ok(revisions) + } + + pub(crate) fn read_rev(&self, doc_id: &str, rev_id: &i64) -> DocResult> { + let conn = self.pool.get().map_err(internal_error)?; + let some = self.rev_sql.read_rev_table(&doc_id, rev_id, &*conn)?; + Ok(some) + } +} + +pub trait RevisionIterator: Send + Sync { + fn next(&self) -> ResultFuture, DocError>; +} + +pub(crate) enum PendingMsg { + Revision { ret: RevIdReceiver }, +} + +pub(crate) type PendingSender = mpsc::UnboundedSender; +pub(crate) type PendingReceiver = mpsc::UnboundedReceiver; + +pub(crate) struct PendingRevisionStream { + revisions: Arc, + receiver: Option, + next_revision: mpsc::Sender, +} + +impl PendingRevisionStream { + pub(crate) fn new( + revisions: Arc, + pending_rx: PendingReceiver, + next_revision: mpsc::Sender, + ) -> Self { + Self { + revisions, + receiver: Some(pending_rx), + next_revision, + } + } + + pub async fn run(mut self) { + let mut receiver = self.receiver.take().expect("Should only call once"); + let stream = stream! { + loop { + match receiver.recv().await { + Some(msg) => yield msg, + None => break, + } + } + }; + stream + .for_each(|msg| async { + match self.handle_msg(msg).await { + Ok(_) => {}, + Err(e) => log::error!("{:?}", e), + } + }) + .await; + } + + async fn handle_msg(&self, msg: PendingMsg) -> DocResult<()> { + match msg { + PendingMsg::Revision { ret } => self.prepare_next_pending_rev(ret).await, + } + } + + async fn prepare_next_pending_rev(&self, mut ret: RevIdReceiver) -> DocResult<()> { + match self.revisions.next().await? { + None => Ok(()), + Some(revision) => { + self.next_revision.send(revision).await.map_err(internal_error); + let _ = tokio::time::timeout(Duration::from_millis(2000), ret.recv()).await; + Ok(()) + }, + } + } } diff --git a/rust-lib/flowy-document/src/services/doc/revision/rev_store.rs b/rust-lib/flowy-document/src/services/doc/revision/rev_store.rs index 315e2ed0cb..ff03c76809 100644 --- a/rust-lib/flowy-document/src/services/doc/revision/rev_store.rs +++ b/rust-lib/flowy-document/src/services/doc/revision/rev_store.rs @@ -2,67 +2,59 @@ use crate::{ entities::doc::{revision_from_doc, Doc, RevId, RevType, Revision, RevisionRange}, errors::{internal_error, DocError, DocResult}, services::doc::revision::{ - model::{PendingRevId, PendingRevReceiver, RevisionContext}, + model::{RevisionIterator, *}, RevisionServer, }, - sql_tables::{RevState, RevTableSql}, + sql_tables::RevState, }; -use async_stream::stream; -use dashmap::{mapref::one::Ref, DashMap, DashSet}; + +use dashmap::DashMap; use flowy_database::ConnectionPool; +use flowy_infra::future::ResultFuture; use flowy_ot::core::{Delta, OperationTransformable}; use futures::{stream::StreamExt, TryFutureExt}; -use std::{ - collections::{HashMap, VecDeque}, - sync::Arc, - time::Duration, -}; +use std::{collections::VecDeque, sync::Arc, time::Duration}; use tokio::{ - sync::{mpsc, oneshot, RwLock, RwLockWriteGuard}, + sync::{broadcast, mpsc, oneshot, RwLock}, task::{spawn_blocking, JoinHandle}, }; -pub struct RevisionStoreActor { +pub struct RevisionStore { doc_id: String, persistence: Arc, revs_map: Arc>, - pending_revs_sender: RevSender, + pending_tx: PendingSender, pending_revs: Arc>>, delay_save: RwLock>>, server: Arc, } -impl RevisionStoreActor { +impl RevisionStore { pub fn new( doc_id: &str, pool: Arc, server: Arc, - pending_rev_sender: mpsc::Sender, - ) -> RevisionStoreActor { + next_revision: mpsc::Sender, + ) -> Arc { let doc_id = doc_id.to_owned(); let persistence = Arc::new(Persistence::new(pool)); let revs_map = Arc::new(DashMap::new()); - let (pending_revs_sender, receiver) = mpsc::unbounded_channel(); + let (pending_tx, pending_rx) = mpsc::unbounded_channel(); let pending_revs = Arc::new(RwLock::new(VecDeque::new())); - let pending = PendingRevision::new( - &doc_id, - receiver, - persistence.clone(), - revs_map.clone(), - pending_rev_sender, - pending_revs.clone(), - ); - tokio::spawn(pending.run()); - Self { + let store = Arc::new(Self { doc_id, persistence, revs_map, - pending_revs_sender, pending_revs, + pending_tx, delay_save: RwLock::new(None), server, - } + }); + + tokio::spawn(PendingRevisionStream::new(store.clone(), pending_rx, next_revision).run()); + + store } #[tracing::instrument(level = "debug", skip(self, revision))] @@ -71,36 +63,36 @@ impl RevisionStoreActor { return Err(DocError::duplicate_rev().context(format!("Duplicate revision id: {}", revision.rev_id))); } - self.pending_revs_sender.send(PendingRevisionMsg::Revision { - revision: revision.clone(), + let (sender, receiver) = broadcast::channel(1); + let revs_map = self.revs_map.clone(); + let mut rx = sender.subscribe(); + tokio::spawn(async move { + match rx.recv().await { + Ok(rev_id) => match revs_map.get_mut(&rev_id) { + None => {}, + Some(mut rev) => rev.value_mut().state = RevState::Acked, + }, + Err(_) => {}, + } }); + + let pending_rev = PendingRevId::new(revision.rev_id, sender); + self.pending_revs.write().await.push_back(pending_rev); self.revs_map.insert(revision.rev_id, RevisionContext::new(revision)); + + let _ = self.pending_tx.send(PendingMsg::Revision { ret: receiver }); self.save_revisions().await; Ok(()) } - #[tracing::instrument(level = "debug", skip(self, rev_id))] + #[tracing::instrument(level = "debug", skip(self))] pub async fn handle_revision_acked(&self, rev_id: RevId) { let rev_id = rev_id.value; - log::debug!("Receive revision acked: {}", rev_id); - match self.pending_revs.write().await.pop_front() { - None => {}, - Some(pending) => { - debug_assert!(pending.rev_id == rev_id); - if pending.rev_id != rev_id { - log::error!( - "Acked: expected rev_id: {:?}, but receive: {:?}", - pending.rev_id, - rev_id - ); - } - pending.sender.send(Ok(())); - }, - } - match self.revs_map.get_mut(&rev_id) { - None => {}, - Some(mut rev) => rev.value_mut().state = RevState::Acked, - } + self.pending_revs + .write() + .await + .retain(|pending| !pending.finish(rev_id)); + self.save_revisions().await; } @@ -124,14 +116,7 @@ impl RevisionStoreActor { .map(|kv| (kv.revision.clone(), kv.state)) .collect::>(); - // TODO: Ok to unwrap? - let conn = &*persistence.pool.get().map_err(internal_error).unwrap(); - let result = conn.immediate_transaction::<_, DocError, _>(|| { - let _ = persistence.rev_sql.create_rev_table(revisions_state, conn).unwrap(); - Ok(()) - }); - - match result { + match persistence.create_revs(revisions_state) { Ok(_) => revs_map.retain(|k, _| !ids.contains(k)), Err(e) => log::error!("Save revision failed: {:?}", e), } @@ -152,14 +137,9 @@ impl RevisionStoreActor { } else { let doc_id = self.doc_id.clone(); let persistence = self.persistence.clone(); - let result = spawn_blocking(move || { - let conn = &*persistence.pool.get().map_err(internal_error).unwrap(); - let revisions = persistence.rev_sql.read_rev_tables_with_range(&doc_id, range, conn)?; - Ok(revisions) - }) - .await - .map_err(internal_error)?; - + let result = spawn_blocking(move || persistence.read_rev_with_range(&doc_id, range)) + .await + .map_err(internal_error)?; result } } @@ -172,20 +152,29 @@ impl RevisionStoreActor { let doc = self.server.fetch_document_from_remote(&self.doc_id).await?; let revision = revision_from_doc(doc.clone(), RevType::Remote); - let conn = &*self.persistence.pool.get().map_err(internal_error).unwrap(); - let _ = conn.immediate_transaction::<_, DocError, _>(|| { - let _ = self - .persistence - .rev_sql - .create_rev_table(vec![(revision, RevState::Acked)], conn) - .unwrap(); - Ok(()) - })?; - + let _ = self.persistence.create_revs(vec![(revision, RevState::Acked)])?; Ok(doc) } } +impl RevisionIterator for RevisionStore { + fn next(&self) -> ResultFuture, DocError> { + let pending_revs = self.pending_revs.clone(); + let revs_map = self.revs_map.clone(); + let persistence = self.persistence.clone(); + let doc_id = self.doc_id.clone(); + ResultFuture::new(async move { + match pending_revs.read().await.front() { + None => Ok(None), + Some(pending) => match revs_map.get(&pending.rev_id) { + None => persistence.read_rev(&doc_id, &pending.rev_id), + Some(context) => Ok(Some(context.revision.clone())), + }, + } + }) + } +} + async fn fetch_from_local(doc_id: &str, persistence: Arc) -> DocResult { let doc_id = doc_id.to_owned(); spawn_blocking(move || { @@ -220,118 +209,6 @@ async fn fetch_from_local(doc_id: &str, persistence: Arc) -> DocRes .map_err(internal_error)? } -struct Persistence { - rev_sql: Arc, - pool: Arc, -} - -impl Persistence { - fn new(pool: Arc) -> Self { - let rev_sql = Arc::new(RevTableSql {}); - Self { rev_sql, pool } - } -} - -enum PendingRevisionMsg { - Revision { revision: Revision }, -} - -type RevSender = mpsc::UnboundedSender; -type RevReceiver = mpsc::UnboundedReceiver; - -struct PendingRevision { - doc_id: String, - pending_revs: Arc>>, - persistence: Arc, - revs_map: Arc>, - msg_receiver: Option, - next_rev: mpsc::Sender, -} - -impl PendingRevision { - pub fn new( - doc_id: &str, - msg_receiver: RevReceiver, - persistence: Arc, - revs_map: Arc>, - next_rev: mpsc::Sender, - pending_revs: Arc>>, - ) -> Self { - Self { - doc_id: doc_id.to_owned(), - pending_revs, - msg_receiver: Some(msg_receiver), - persistence, - revs_map, - next_rev, - } - } - - pub async fn run(mut self) { - let mut receiver = self.msg_receiver.take().expect("Should only call once"); - let stream = stream! { - loop { - match receiver.recv().await { - Some(msg) => yield msg, - None => break, - } - } - }; - stream - .for_each(|msg| async { - match self.handle_msg(msg).await { - Ok(_) => {}, - Err(e) => log::error!("{:?}", e), - } - }) - .await; - } - - async fn handle_msg(&self, msg: PendingRevisionMsg) -> DocResult<()> { - match msg { - PendingRevisionMsg::Revision { revision } => self.handle_revision(revision).await, - } - } - - async fn handle_revision(&self, revision: Revision) -> DocResult<()> { - let (sender, receiver) = oneshot::channel(); - let pending_rev = PendingRevId { - rev_id: revision.rev_id, - sender, - }; - self.pending_revs.write().await.push_back(pending_rev); - let _ = self.prepare_next_pending_rev(receiver).await?; - Ok(()) - } - - async fn prepare_next_pending_rev(&self, done: PendingRevReceiver) -> DocResult<()> { - let next_rev_notify = self.next_rev.clone(); - let doc_id = self.doc_id.clone(); - let _ = match self.pending_revs.read().await.front() { - None => Ok(()), - Some(pending) => match self.revs_map.get(&pending.rev_id) { - None => { - let conn = self.persistence.pool.get().map_err(internal_error)?; - let some = self - .persistence - .rev_sql - .read_rev_table(&doc_id, &pending.rev_id, &*conn)?; - match some { - Some(revision) => next_rev_notify.send(revision).await.map_err(internal_error), - None => Ok(()), - } - }, - Some(context) => next_rev_notify - .send(context.revision.clone()) - .await - .map_err(internal_error), - }, - }?; - let _ = tokio::time::timeout(Duration::from_millis(2000), done).await; - Ok(()) - } -} - // fn update_revisions(&self) { // let rev_ids = self // .revs