send revision periodically

This commit is contained in:
appflowy 2021-12-08 14:17:40 +08:00
parent 23f4684d3f
commit 4450d4410b
14 changed files with 449 additions and 496 deletions

View File

@ -2,7 +2,7 @@
#![cfg_attr(rustfmt, rustfmt::skip)]
use actix_web::web::Data;
use backend::services::doc::{crud::update_doc, manager::DocManager};
use flowy_document::services::doc::ClientDocEditor as ClientEditDocContext;
use flowy_document::services::doc::edit::ClientDocEditor as ClientEditDocContext;
use flowy_test::{workspace::ViewTest, FlowyTest};
use flowy_user::services::user::UserSession;
use futures_util::{stream, stream::StreamExt};

View File

@ -1,7 +1,7 @@
use crate::{
errors::DocError,
services::{
doc::{doc_controller::DocController, ClientDocEditor},
doc::{doc_controller::DocController, edit::ClientDocEditor},
server::construct_doc_server,
ws::WsDocumentManager,
},

View File

@ -4,7 +4,7 @@ use dashmap::DashMap;
use crate::{
errors::DocError,
services::doc::{ClientDocEditor, DocId},
services::doc::edit::{ClientDocEditor, DocId},
};
pub(crate) struct DocCache {

View File

@ -2,7 +2,10 @@ use crate::{
errors::{internal_error, DocError, DocResult},
module::DocumentUser,
services::{
doc::{EditCommand, EditCommandQueue, OpenDocAction, RevisionManager, RevisionServer, TransformDeltas},
doc::{
edit::{EditCommand, EditCommandQueue, OpenDocAction, TransformDeltas},
revision::{RevisionManager, RevisionServer},
},
ws::{DocumentWebSocket, WsDocumentHandler},
},
};

View File

@ -1,6 +1,4 @@
mod edit;
mod revision;
pub mod edit;
pub mod revision;
pub(crate) mod doc_controller;
pub use edit::*;
pub(crate) use revision::*;

View File

@ -0,0 +1,263 @@
use crate::{
errors::{internal_error, DocError, DocResult},
services::doc::revision::RevisionServer,
sql_tables::RevTableSql,
};
use flowy_database::ConnectionPool;
use flowy_document_infra::entities::doc::Doc;
use lib_infra::future::ResultFuture;
use lib_ot::{
core::{Operation, OperationTransformable},
revision::{
RevId,
RevState,
RevType,
Revision,
RevisionDiskCache,
RevisionMemoryCache,
RevisionRange,
RevisionRecord,
},
rich_text::RichTextDelta,
};
use std::{sync::Arc, time::Duration};
use tokio::{
sync::RwLock,
task::{spawn_blocking, JoinHandle},
};
pub trait RevisionIterator: Send + Sync {
fn next(&self) -> ResultFuture<Option<RevisionRecord>, DocError>;
}
type DocRevisionDeskCache = dyn RevisionDiskCache<Error = DocError>;
pub struct RevisionCache {
doc_id: String,
dish_cache: Arc<DocRevisionDeskCache>,
memory_cache: Arc<RevisionMemoryCache>,
defer_save: RwLock<Option<JoinHandle<()>>>,
server: Arc<dyn RevisionServer>,
}
impl RevisionCache {
pub fn new(doc_id: &str, pool: Arc<ConnectionPool>, server: Arc<dyn RevisionServer>) -> RevisionCache {
let doc_id = doc_id.to_owned();
let dish_cache = Arc::new(Persistence::new(pool));
let memory_cache = Arc::new(RevisionMemoryCache::new());
Self {
doc_id,
dish_cache,
memory_cache,
defer_save: RwLock::new(None),
server,
}
}
#[tracing::instrument(level = "debug", skip(self, revision))]
pub async fn add_revision(&self, revision: Revision) -> DocResult<()> {
if self.memory_cache.contains(&revision.rev_id) {
return Err(DocError::duplicate_rev().context(format!("Duplicate revision id: {}", revision.rev_id)));
}
self.memory_cache.add_revision(revision.clone()).await?;
self.save_revisions().await;
Ok(())
}
#[tracing::instrument(level = "debug", skip(self, rev_id), fields(rev_id = %rev_id.as_ref()))]
pub async fn ack_revision(&self, rev_id: RevId) {
let rev_id = rev_id.value;
self.memory_cache.mut_revision(&rev_id, |mut rev| rev.value_mut().ack());
self.save_revisions().await;
}
async fn save_revisions(&self) {
if let Some(handler) = self.defer_save.write().await.take() {
handler.abort();
}
if self.memory_cache.is_empty() {
return;
}
let memory_cache = self.memory_cache.clone();
let disk_cache = self.dish_cache.clone();
*self.defer_save.write().await = Some(tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(300)).await;
let (ids, records) = memory_cache.revisions();
match disk_cache.create_revisions(records) {
Ok(_) => {
memory_cache.remove_revisions(ids);
},
Err(e) => log::error!("Save revision failed: {:?}", e),
}
}));
}
pub async fn revisions_in_range(&self, range: RevisionRange) -> DocResult<Vec<Revision>> {
let revs = self.memory_cache.revisions_in_range(&range).await?;
if revs.len() == range.len() as usize {
Ok(revs)
} else {
let doc_id = self.doc_id.clone();
let disk_cache = self.dish_cache.clone();
spawn_blocking(move || disk_cache.revisions_in_range(&doc_id, &range))
.await
.map_err(internal_error)?
}
}
pub async fn fetch_document(&self) -> DocResult<Doc> {
let result = fetch_from_local(&self.doc_id, self.dish_cache.clone()).await;
if result.is_ok() {
return result;
}
let doc = self.server.fetch_document_from_remote(&self.doc_id).await?;
let delta_data = doc.data.as_bytes();
let revision = Revision::new(
doc.base_rev_id,
doc.rev_id,
delta_data.to_owned(),
&doc.id,
RevType::Remote,
);
let record = RevisionRecord {
revision,
state: RevState::Acked,
};
let _ = self.dish_cache.create_revisions(vec![record])?;
Ok(doc)
}
}
impl RevisionIterator for RevisionCache {
fn next(&self) -> ResultFuture<Option<RevisionRecord>, DocError> {
let memory_cache = self.memory_cache.clone();
let disk_cache = self.dish_cache.clone();
let doc_id = self.doc_id.clone();
ResultFuture::new(async move {
match memory_cache.front_revision().await {
None => {
//
match memory_cache.front_rev_id().await {
None => Ok(None),
Some(rev_id) => match disk_cache.read_revision(&doc_id, rev_id)? {
None => Ok(None),
Some(revision) => Ok(Some(RevisionRecord::new(revision))),
},
}
},
Some((_, record)) => Ok(Some(record)),
}
})
}
}
async fn fetch_from_local(doc_id: &str, disk_cache: Arc<DocRevisionDeskCache>) -> DocResult<Doc> {
let doc_id = doc_id.to_owned();
spawn_blocking(move || {
let revisions = disk_cache.read_revisions(&doc_id)?;
if revisions.is_empty() {
return Err(DocError::record_not_found().context("Local doesn't have this document"));
}
let base_rev_id: RevId = revisions.last().unwrap().base_rev_id.into();
let rev_id: RevId = revisions.last().unwrap().rev_id.into();
let mut delta = RichTextDelta::new();
for (_, revision) in revisions.into_iter().enumerate() {
match RichTextDelta::from_bytes(revision.delta_data) {
Ok(local_delta) => {
delta = delta.compose(&local_delta)?;
},
Err(e) => {
log::error!("Deserialize delta from revision failed: {}", e);
},
}
}
#[cfg(debug_assertions)]
validate_delta(&doc_id, disk_cache, &delta);
match delta.ops.last() {
None => {},
Some(op) => {
let data = op.get_data();
if !data.ends_with('\n') {
delta.ops.push(Operation::Insert("\n".into()))
}
},
}
Result::<Doc, DocError>::Ok(Doc {
id: doc_id,
data: delta.to_json(),
rev_id: rev_id.into(),
base_rev_id: base_rev_id.into(),
})
})
.await
.map_err(internal_error)?
}
#[cfg(debug_assertions)]
fn validate_delta(doc_id: &str, disk_cache: Arc<DocRevisionDeskCache>, delta: &RichTextDelta) {
if delta.ops.last().is_none() {
return;
}
let data = delta.ops.last().as_ref().unwrap().get_data();
if !data.ends_with('\n') {
log::error!("The op must end with newline");
let result = || {
let revisions = disk_cache.read_revisions(&doc_id)?;
for revision in revisions {
let delta = RichTextDelta::from_bytes(revision.delta_data)?;
log::error!("Invalid revision: {}:{}", revision.rev_id, delta.to_json());
}
Ok::<(), DocError>(())
};
match result() {
Ok(_) => {},
Err(e) => log::error!("{}", e),
}
}
}
pub(crate) struct Persistence {
pub(crate) pool: Arc<ConnectionPool>,
}
impl RevisionDiskCache for Persistence {
type Error = DocError;
fn create_revisions(&self, revisions: Vec<RevisionRecord>) -> Result<(), Self::Error> {
let conn = &*self.pool.get().map_err(internal_error)?;
conn.immediate_transaction::<_, DocError, _>(|| {
let _ = RevTableSql::create_rev_table(revisions, conn)?;
Ok(())
})
}
fn revisions_in_range(&self, doc_id: &str, range: &RevisionRange) -> Result<Vec<Revision>, Self::Error> {
let conn = &*self.pool.get().map_err(internal_error).unwrap();
let revisions = RevTableSql::read_rev_tables_with_range(doc_id, range.clone(), conn)?;
Ok(revisions)
}
fn read_revision(&self, doc_id: &str, rev_id: i64) -> Result<Option<Revision>, Self::Error> {
let conn = self.pool.get().map_err(internal_error)?;
let some = RevTableSql::read_rev_table(doc_id, &rev_id, &*conn)?;
Ok(some)
}
fn read_revisions(&self, doc_id: &str) -> Result<Vec<Revision>, Self::Error> {
let conn = self.pool.get().map_err(internal_error)?;
let some = RevTableSql::read_rev_tables(doc_id, &*conn)?;
Ok(some)
}
}
impl Persistence {
pub(crate) fn new(pool: Arc<ConnectionPool>) -> Self { Self { pool } }
}

View File

@ -1,6 +1,6 @@
use crate::{
errors::{DocError, DocResult},
services::doc::revision::RevisionStore,
services::doc::revision::{RevisionCache, RevisionUploadStream},
};
use flowy_database::ConnectionPool;
use flowy_document_infra::{entities::doc::Doc, util::RevIdCounter};
@ -20,7 +20,7 @@ pub trait RevisionServer: Send + Sync {
pub struct RevisionManager {
doc_id: String,
rev_id_counter: RevIdCounter,
rev_store: Arc<RevisionStore>,
cache: Arc<RevisionCache>,
}
impl RevisionManager {
@ -28,30 +28,32 @@ impl RevisionManager {
doc_id: &str,
pool: Arc<ConnectionPool>,
server: Arc<dyn RevisionServer>,
pending_rev_sender: mpsc::UnboundedSender<Revision>,
ws_sender: mpsc::UnboundedSender<Revision>,
) -> Self {
let rev_store = RevisionStore::new(doc_id, pool, server, pending_rev_sender);
let cache = Arc::new(RevisionCache::new(doc_id, pool, server));
spawn_upload_stream(cache.clone(), ws_sender);
let rev_id_counter = RevIdCounter::new(0);
Self {
doc_id: doc_id.to_string(),
rev_id_counter,
rev_store,
cache,
}
}
pub async fn load_document(&mut self) -> DocResult<RichTextDelta> {
let doc = self.rev_store.fetch_document().await?;
let doc = self.cache.fetch_document().await?;
self.update_rev_id_counter_value(doc.rev_id);
Ok(doc.delta()?)
}
pub async fn add_revision(&self, revision: &Revision) -> Result<(), DocError> {
let _ = self.rev_store.add_revision(revision.clone()).await?;
let _ = self.cache.add_revision(revision.clone()).await?;
Ok(())
}
pub async fn ack_revision(&self, rev_id: RevId) -> Result<(), DocError> {
self.rev_store.ack_revision(rev_id).await;
self.cache.ack_revision(rev_id).await;
Ok(())
}
@ -67,7 +69,7 @@ impl RevisionManager {
pub async fn mk_revisions(&self, range: RevisionRange) -> Result<Revision, DocError> {
debug_assert!(range.doc_id == self.doc_id);
let revisions = self.rev_store.revs_in_range(range.clone()).await?;
let revisions = self.cache.revisions_in_range(range.clone()).await?;
let mut new_delta = RichTextDelta::new();
for revision in revisions {
match RichTextDelta::from_bytes(revision.delta_data) {
@ -90,3 +92,7 @@ impl RevisionManager {
Ok(revision)
}
}
fn spawn_upload_stream(cache: Arc<RevisionCache>, ws_sender: mpsc::UnboundedSender<Revision>) {
tokio::spawn(RevisionUploadStream::new(cache, ws_sender).run());
}

View File

@ -1,6 +1,7 @@
mod cache;
mod manager;
mod model;
mod persistence;
mod sync;
pub use cache::*;
pub use manager::*;
pub use persistence::*;
pub(crate) use sync::*;

View File

@ -1,45 +0,0 @@
use crate::{
errors::{internal_error, DocError, DocResult},
sql_tables::{RevTableSql, SqlRevState},
};
use flowy_database::ConnectionPool;
use lib_infra::future::ResultFuture;
use lib_ot::revision::{Revision, RevisionRange};
use std::sync::Arc;
use tokio::sync::broadcast;
pub(crate) struct Persistence {
pub(crate) rev_sql: Arc<RevTableSql>,
pub(crate) pool: Arc<ConnectionPool>,
}
impl Persistence {
pub(crate) fn new(pool: Arc<ConnectionPool>) -> Self {
let rev_sql = Arc::new(RevTableSql {});
Self { rev_sql, pool }
}
pub(crate) fn create_revs(&self, revisions: Vec<(Revision, SqlRevState)>) -> DocResult<()> {
let conn = &*self.pool.get().map_err(internal_error)?;
conn.immediate_transaction::<_, DocError, _>(|| {
let _ = self.rev_sql.create_rev_table(revisions, conn)?;
Ok(())
})
}
pub(crate) fn read_rev_with_range(&self, doc_id: &str, range: RevisionRange) -> DocResult<Vec<Revision>> {
let conn = &*self.pool.get().map_err(internal_error).unwrap();
let revisions = self.rev_sql.read_rev_tables_with_range(doc_id, range, conn)?;
Ok(revisions)
}
pub(crate) fn read_rev(&self, doc_id: &str, rev_id: &i64) -> DocResult<Option<Revision>> {
let conn = self.pool.get().map_err(internal_error)?;
let some = self.rev_sql.read_rev_table(&doc_id, rev_id, &*conn)?;
Ok(some)
}
}
pub trait RevisionIterator: Send + Sync {
fn next(&self) -> ResultFuture<Option<Revision>, DocError>;
}

View File

@ -1,380 +0,0 @@
use crate::{
errors::{internal_error, DocError, DocResult},
services::doc::revision::{model::*, RevisionServer},
sql_tables::SqlRevState,
};
use async_stream::stream;
use dashmap::DashMap;
use flowy_database::{ConnectionPool, SqliteConnection};
use flowy_document_infra::entities::doc::Doc;
use futures::stream::StreamExt;
use lib_infra::future::ResultFuture;
use lib_ot::{
core::{Operation, OperationTransformable},
revision::{PendingRevId, RevId, RevIdReceiver, RevType, Revision, RevisionRange, RevisionRecord},
rich_text::RichTextDelta,
};
use std::{collections::VecDeque, sync::Arc, time::Duration};
use tokio::{
sync::{broadcast, mpsc, RwLock},
task::{spawn_blocking, JoinHandle},
};
pub struct RevisionStore {
doc_id: String,
persistence: Arc<Persistence>,
revs_map: Arc<DashMap<i64, RevisionRecord>>,
pending_tx: PendingSender,
pending_revs: Arc<RwLock<VecDeque<PendingRevId>>>,
defer_save: RwLock<Option<JoinHandle<()>>>,
server: Arc<dyn RevisionServer>,
}
impl RevisionStore {
pub fn new(
doc_id: &str,
pool: Arc<ConnectionPool>,
server: Arc<dyn RevisionServer>,
ws_revision_sender: mpsc::UnboundedSender<Revision>,
) -> Arc<RevisionStore> {
let doc_id = doc_id.to_owned();
let persistence = Arc::new(Persistence::new(pool));
let revs_map = Arc::new(DashMap::new());
let (pending_tx, pending_rx) = mpsc::unbounded_channel();
let pending_revs = Arc::new(RwLock::new(VecDeque::new()));
let store = Arc::new(Self {
doc_id,
persistence,
revs_map,
pending_revs,
pending_tx,
defer_save: RwLock::new(None),
server,
});
tokio::spawn(RevisionUploadStream::new(store.clone(), pending_rx, ws_revision_sender).run());
store
}
#[tracing::instrument(level = "debug", skip(self, revision))]
pub async fn add_revision(&self, revision: Revision) -> DocResult<()> {
if self.revs_map.contains_key(&revision.rev_id) {
return Err(DocError::duplicate_rev().context(format!("Duplicate revision id: {}", revision.rev_id)));
}
let (sender, receiver) = broadcast::channel(2);
let revs_map = self.revs_map.clone();
let mut rx = sender.subscribe();
tokio::spawn(async move {
if let Ok(rev_id) = rx.recv().await {
match revs_map.get_mut(&rev_id) {
None => {},
Some(mut rev) => rev.value_mut().state = SqlRevState::Acked.into(),
}
}
});
let pending_rev = PendingRevId::new(revision.rev_id, sender);
self.pending_revs.write().await.push_back(pending_rev);
self.revs_map.insert(revision.rev_id, RevisionRecord::new(revision));
let _ = self.pending_tx.send(PendingMsg::Revision { ret: receiver });
self.save_revisions().await;
Ok(())
}
#[tracing::instrument(level = "debug", skip(self, rev_id), fields(rev_id = %rev_id.as_ref()))]
pub async fn ack_revision(&self, rev_id: RevId) {
let rev_id = rev_id.value;
self.pending_revs
.write()
.await
.retain(|pending| !pending.finish(rev_id));
self.save_revisions().await;
}
async fn save_revisions(&self) {
if let Some(handler) = self.defer_save.write().await.take() {
handler.abort();
}
if self.revs_map.is_empty() {
return;
}
let revs_map = self.revs_map.clone();
let persistence = self.persistence.clone();
*self.defer_save.write().await = Some(tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(300)).await;
let ids = revs_map.iter().map(|kv| *kv.key()).collect::<Vec<i64>>();
let revisions_state = revs_map
.iter()
.map(|kv| (kv.revision.clone(), kv.state))
.collect::<Vec<(Revision, SqlRevState)>>();
match persistence.create_revs(revisions_state.clone()) {
Ok(_) => {
tracing::debug!(
"Revision State Changed: {:?}",
revisions_state.iter().map(|s| (s.0.rev_id, s.1)).collect::<Vec<_>>()
);
revs_map.retain(|k, _| !ids.contains(k));
},
Err(e) => log::error!("Save revision failed: {:?}", e),
}
}));
}
pub async fn revs_in_range(&self, range: RevisionRange) -> DocResult<Vec<Revision>> {
let revs = range
.iter()
.flat_map(|rev_id| match self.revs_map.get(&rev_id) {
None => None,
Some(rev) => Some(rev.revision.clone()),
})
.collect::<Vec<Revision>>();
if revs.len() == range.len() as usize {
Ok(revs)
} else {
let doc_id = self.doc_id.clone();
let persistence = self.persistence.clone();
let result = spawn_blocking(move || persistence.read_rev_with_range(&doc_id, range))
.await
.map_err(internal_error)?;
result
}
}
pub async fn fetch_document(&self) -> DocResult<Doc> {
let result = fetch_from_local(&self.doc_id, self.persistence.clone()).await;
if result.is_ok() {
return result;
}
let doc = self.server.fetch_document_from_remote(&self.doc_id).await?;
let revision = revision_from_doc(doc.clone(), RevType::Remote);
let _ = self.persistence.create_revs(vec![(revision, SqlRevState::Acked)])?;
Ok(doc)
}
}
pub fn revision_from_doc(doc: Doc, ty: RevType) -> Revision {
let delta_data = doc.data.as_bytes();
Revision::new(doc.base_rev_id, doc.rev_id, delta_data.to_owned(), &doc.id, ty)
}
impl RevisionIterator for RevisionStore {
fn next(&self) -> ResultFuture<Option<Revision>, DocError> {
let pending_revs = self.pending_revs.clone();
let revs_map = self.revs_map.clone();
let persistence = self.persistence.clone();
let doc_id = self.doc_id.clone();
ResultFuture::new(async move {
match pending_revs.read().await.front() {
None => Ok(None),
Some(pending) => match revs_map.get(&pending.rev_id) {
None => persistence.read_rev(&doc_id, &pending.rev_id),
Some(context) => Ok(Some(context.revision.clone())),
},
}
})
}
}
async fn fetch_from_local(doc_id: &str, persistence: Arc<Persistence>) -> DocResult<Doc> {
let doc_id = doc_id.to_owned();
spawn_blocking(move || {
let conn = &*persistence.pool.get().map_err(internal_error)?;
let revisions = persistence.rev_sql.read_rev_tables(&doc_id, conn)?;
if revisions.is_empty() {
return Err(DocError::record_not_found().context("Local doesn't have this document"));
}
let base_rev_id: RevId = revisions.last().unwrap().base_rev_id.into();
let rev_id: RevId = revisions.last().unwrap().rev_id.into();
let mut delta = RichTextDelta::new();
for (_, revision) in revisions.into_iter().enumerate() {
match RichTextDelta::from_bytes(revision.delta_data) {
Ok(local_delta) => {
delta = delta.compose(&local_delta)?;
},
Err(e) => {
log::error!("Deserialize delta from revision failed: {}", e);
},
}
}
#[cfg(debug_assertions)]
validate_delta(&doc_id, persistence, conn, &delta);
match delta.ops.last() {
None => {},
Some(op) => {
let data = op.get_data();
if !data.ends_with('\n') {
delta.ops.push(Operation::Insert("\n".into()))
}
},
}
Result::<Doc, DocError>::Ok(Doc {
id: doc_id,
data: delta.to_json(),
rev_id: rev_id.into(),
base_rev_id: base_rev_id.into(),
})
})
.await
.map_err(internal_error)?
}
#[cfg(debug_assertions)]
fn validate_delta(doc_id: &str, persistence: Arc<Persistence>, conn: &SqliteConnection, delta: &RichTextDelta) {
if delta.ops.last().is_none() {
return;
}
let data = delta.ops.last().as_ref().unwrap().get_data();
if !data.ends_with('\n') {
log::error!("The op must end with newline");
let result = || {
let revisions = persistence.rev_sql.read_rev_tables(&doc_id, conn)?;
for revision in revisions {
let delta = RichTextDelta::from_bytes(revision.delta_data)?;
log::error!("Invalid revision: {}:{}", revision.rev_id, delta.to_json());
}
Ok::<(), DocError>(())
};
match result() {
Ok(_) => {},
Err(e) => log::error!("{}", e),
}
}
}
// fn update_revisions(&self) {
// let rev_ids = self
// .revs
// .iter()
// .flat_map(|kv| match kv.state == RevState::Acked {
// true => None,
// false => Some(kv.key().clone()),
// })
// .collect::<Vec<i64>>();
//
// if rev_ids.is_empty() {
// return;
// }
//
// tracing::debug!("Try to update {:?} state", rev_ids);
// match self.update(&rev_ids) {
// Ok(_) => {
// self.revs.retain(|k, _| !rev_ids.contains(k));
// },
// Err(e) => log::error!("Save revision failed: {:?}", e),
// }
// }
//
// fn update(&self, rev_ids: &Vec<i64>) -> Result<(), DocError> {
// let conn = &*self.pool.get().map_err(internal_error).unwrap();
// let result = conn.immediate_transaction::<_, DocError, _>(|| {
// for rev_id in rev_ids {
// let changeset = RevChangeset {
// doc_id: self.doc_id.clone(),
// rev_id: rev_id.clone(),
// state: RevState::Acked,
// };
// let _ = self.op_sql.update_rev_table(changeset, conn)?;
// }
// Ok(())
// });
//
// result
// }
// fn delete_revision(&self, rev_id: RevId) {
// let op_sql = self.op_sql.clone();
// let pool = self.pool.clone();
// let doc_id = self.doc_id.clone();
// tokio::spawn(async move {
// let conn = &*pool.get().map_err(internal_error).unwrap();
// let result = conn.immediate_transaction::<_, DocError, _>(|| {
// let _ = op_sql.delete_rev_table(&doc_id, rev_id, conn)?;
// Ok(())
// });
//
// match result {
// Ok(_) => {},
// Err(e) => log::error!("Delete revision failed: {:?}", e),
// }
// });
// }
pub(crate) enum PendingMsg {
Revision { ret: RevIdReceiver },
}
pub(crate) type PendingSender = mpsc::UnboundedSender<PendingMsg>;
pub(crate) type PendingReceiver = mpsc::UnboundedReceiver<PendingMsg>;
pub(crate) struct RevisionUploadStream {
revisions: Arc<dyn RevisionIterator>,
receiver: Option<PendingReceiver>,
ws_revision_sender: mpsc::UnboundedSender<Revision>,
}
impl RevisionUploadStream {
pub(crate) fn new(
revisions: Arc<dyn RevisionIterator>,
pending_rx: PendingReceiver,
ws_revision_sender: mpsc::UnboundedSender<Revision>,
) -> Self {
Self {
revisions,
receiver: Some(pending_rx),
ws_revision_sender,
}
}
pub async fn run(mut self) {
let mut receiver = self.receiver.take().expect("Should only call once");
let stream = stream! {
loop {
match receiver.recv().await {
Some(msg) => yield msg,
None => break,
}
}
};
stream
.for_each(|msg| async {
match self.handle_msg(msg).await {
Ok(_) => {},
Err(e) => log::error!("{:?}", e),
}
})
.await;
}
async fn handle_msg(&self, msg: PendingMsg) -> DocResult<()> {
match msg {
PendingMsg::Revision { ret } => self.prepare_next_pending_rev(ret).await,
}
}
async fn prepare_next_pending_rev(&self, mut ret: RevIdReceiver) -> DocResult<()> {
match self.revisions.next().await? {
None => Ok(()),
Some(revision) => {
let _ = self.ws_revision_sender.send(revision).map_err(internal_error);
let _ = tokio::time::timeout(Duration::from_millis(2000), ret.recv()).await;
Ok(())
},
}
}
}

View File

@ -0,0 +1,76 @@
use crate::{
errors::{internal_error, DocResult},
services::doc::revision::RevisionIterator,
};
use async_stream::stream;
use futures::stream::StreamExt;
use lib_ot::revision::Revision;
use std::sync::Arc;
use tokio::{
sync::mpsc,
time::{interval, Duration},
};
pub(crate) enum RevisionMsg {
Tick,
}
pub(crate) struct RevisionUploadStream {
revisions: Arc<dyn RevisionIterator>,
ws_sender: mpsc::UnboundedSender<Revision>,
}
impl RevisionUploadStream {
pub(crate) fn new(revisions: Arc<dyn RevisionIterator>, ws_sender: mpsc::UnboundedSender<Revision>) -> Self {
Self { revisions, ws_sender }
}
pub async fn run(self) {
let (tx, mut rx) = mpsc::unbounded_channel();
tokio::spawn(tick(tx));
let stream = stream! {
loop {
match rx.recv().await {
Some(msg) => yield msg,
None => break,
}
}
};
stream
.for_each(|msg| async {
match self.handle_msg(msg).await {
Ok(_) => {},
Err(e) => log::error!("{:?}", e),
}
})
.await;
}
async fn handle_msg(&self, msg: RevisionMsg) -> DocResult<()> {
match msg {
RevisionMsg::Tick => self.send_next_revision().await,
}
}
async fn send_next_revision(&self) -> DocResult<()> {
match self.revisions.next().await? {
None => Ok(()),
Some(record) => {
let _ = self.ws_sender.send(record.revision).map_err(internal_error);
// let _ = tokio::time::timeout(Duration::from_millis(2000), ret.recv()).await;
Ok(())
},
}
}
}
async fn tick(sender: mpsc::UnboundedSender<RevisionMsg>) {
let mut i = interval(Duration::from_secs(2));
loop {
match sender.send(RevisionMsg::Tick) {
Ok(_) => {},
Err(e) => log::error!("RevisionUploadStream tick error: {}", e),
}
i.tick().await;
}
}

View File

@ -1,30 +1,27 @@
use crate::{
errors::DocError,
sql_tables::{doc::RevTable, RevChangeset, RevTableType, SqlRevState},
sql_tables::{doc::RevTable, RevChangeset, RevTableState, RevTableType},
};
use diesel::update;
use flowy_database::{insert_or_ignore_into, prelude::*, schema::rev_table::dsl, SqliteConnection};
use lib_ot::revision::{Revision, RevisionRange};
use lib_ot::revision::{Revision, RevisionRange, RevisionRecord};
pub struct RevTableSql {}
impl RevTableSql {
pub(crate) fn create_rev_table(
&self,
revisions: Vec<(Revision, SqlRevState)>,
conn: &SqliteConnection,
) -> Result<(), DocError> {
pub(crate) fn create_rev_table(revisions: Vec<RevisionRecord>, conn: &SqliteConnection) -> Result<(), DocError> {
// Batch insert: https://diesel.rs/guides/all-about-inserts.html
let records = revisions
.into_iter()
.map(|(revision, new_state)| {
let rev_ty: RevTableType = revision.ty.into();
.map(|record| {
let rev_ty: RevTableType = record.revision.ty.into();
let rev_state: RevTableState = record.state.into();
(
dsl::doc_id.eq(revision.doc_id),
dsl::base_rev_id.eq(revision.base_rev_id),
dsl::rev_id.eq(revision.rev_id),
dsl::data.eq(revision.delta_data),
dsl::state.eq(new_state),
dsl::doc_id.eq(record.revision.doc_id),
dsl::base_rev_id.eq(record.revision.base_rev_id),
dsl::rev_id.eq(record.revision.rev_id),
dsl::data.eq(record.revision.delta_data),
dsl::state.eq(rev_state),
dsl::ty.eq(rev_ty),
)
})
@ -35,7 +32,7 @@ impl RevTableSql {
}
#[allow(dead_code)]
pub(crate) fn update_rev_table(&self, changeset: RevChangeset, conn: &SqliteConnection) -> Result<(), DocError> {
pub(crate) fn update_rev_table(changeset: RevChangeset, conn: &SqliteConnection) -> Result<(), DocError> {
let filter = dsl::rev_table
.filter(dsl::rev_id.eq(changeset.rev_id.as_ref()))
.filter(dsl::doc_id.eq(changeset.doc_id));
@ -44,7 +41,7 @@ impl RevTableSql {
Ok(())
}
pub(crate) fn read_rev_tables(&self, doc_id: &str, conn: &SqliteConnection) -> Result<Vec<Revision>, DocError> {
pub(crate) fn read_rev_tables(doc_id: &str, conn: &SqliteConnection) -> Result<Vec<Revision>, DocError> {
let filter = dsl::rev_table
.filter(dsl::doc_id.eq(doc_id))
.order(dsl::rev_id.asc())
@ -58,7 +55,6 @@ impl RevTableSql {
}
pub(crate) fn read_rev_table(
&self,
doc_id: &str,
revision_id: &i64,
conn: &SqliteConnection,
@ -76,7 +72,6 @@ impl RevTableSql {
}
pub(crate) fn read_rev_tables_with_range(
&self,
doc_id_s: &str,
range: RevisionRange,
conn: &SqliteConnection,
@ -96,12 +91,7 @@ impl RevTableSql {
}
#[allow(dead_code)]
pub(crate) fn delete_rev_table(
&self,
doc_id_s: &str,
rev_id_s: i64,
conn: &SqliteConnection,
) -> Result<(), DocError> {
pub(crate) fn delete_rev_table(doc_id_s: &str, rev_id_s: i64, conn: &SqliteConnection) -> Result<(), DocError> {
let filter = dsl::rev_table
.filter(dsl::rev_id.eq(rev_id_s))
.filter(dsl::doc_id.eq(doc_id_s));

View File

@ -11,45 +11,54 @@ pub(crate) struct RevTable {
pub(crate) base_rev_id: i64,
pub(crate) rev_id: i64,
pub(crate) data: Vec<u8>,
pub(crate) state: SqlRevState,
pub(crate) state: RevTableState,
pub(crate) ty: RevTableType,
}
#[derive(Clone, Copy, PartialEq, Eq, Debug, Hash, FromSqlRow, AsExpression)]
#[repr(i32)]
#[sql_type = "Integer"]
pub enum SqlRevState {
pub enum RevTableState {
Local = 0,
Acked = 1,
}
impl std::default::Default for SqlRevState {
fn default() -> Self { SqlRevState::Local }
impl std::default::Default for RevTableState {
fn default() -> Self { RevTableState::Local }
}
impl std::convert::From<i32> for SqlRevState {
impl std::convert::From<i32> for RevTableState {
fn from(value: i32) -> Self {
match value {
0 => SqlRevState::Local,
1 => SqlRevState::Acked,
0 => RevTableState::Local,
1 => RevTableState::Acked,
o => {
log::error!("Unsupported rev state {}, fallback to RevState::Local", o);
SqlRevState::Local
RevTableState::Local
},
}
}
}
impl SqlRevState {
impl RevTableState {
pub fn value(&self) -> i32 { *self as i32 }
}
impl_sql_integer_expression!(RevState);
impl_sql_integer_expression!(RevTableState);
impl std::convert::From<SqlRevState> for RevState {
fn from(s: SqlRevState) -> Self {
impl std::convert::From<RevTableState> for RevState {
fn from(s: RevTableState) -> Self {
match s {
SqlRevState::Local => RevState.Local,
SqlRevState::Acked => RevState.Acked,
RevTableState::Local => RevState::Local,
RevTableState::Acked => RevState::Acked,
}
}
}
impl std::convert::From<RevState> for RevTableState {
fn from(s: RevState) -> Self {
match s {
RevState::Local => RevTableState::Local,
RevState::Acked => RevTableState::Acked,
}
}
}
@ -119,5 +128,5 @@ impl_sql_integer_expression!(RevTableType);
pub(crate) struct RevChangeset {
pub(crate) doc_id: String,
pub(crate) rev_id: RevId,
pub(crate) state: SqlRevState,
pub(crate) state: RevTableState,
}

View File

@ -1,15 +1,17 @@
use crate::{
errors::OTError,
revision::{RevId, Revision, RevisionRange},
revision::{Revision, RevisionRange},
};
use dashmap::{mapref::one::RefMut, DashMap};
use std::{collections::VecDeque, sync::Arc};
use std::{collections::VecDeque, fmt::Debug, sync::Arc};
use tokio::sync::{broadcast, RwLock};
pub trait RevisionDiskCache {
fn create_revision(&self, revision: &Revision) -> Result<(), OTError>;
fn revisions_in_range(&self, range: RevisionRange) -> Result<Option<Vec<Revision>>, OTError>;
fn read_revision(&self, rev_id: i64) -> Result<Option<Revision>, OTError>;
pub trait RevisionDiskCache: Sync + Send {
type Error: Debug;
fn create_revisions(&self, revisions: Vec<RevisionRecord>) -> Result<(), Self::Error>;
fn revisions_in_range(&self, doc_id: &str, range: &RevisionRange) -> Result<Vec<Revision>, Self::Error>;
fn read_revision(&self, doc_id: &str, rev_id: i64) -> Result<Option<Revision>, Self::Error>;
fn read_revisions(&self, doc_id: &str) -> Result<Vec<Revision>, Self::Error>;
}
pub struct RevisionMemoryCache {
@ -32,7 +34,7 @@ impl RevisionMemoryCache {
pub async fn add_revision(&self, revision: Revision) -> Result<(), OTError> {
if self.revs_map.contains_key(&revision.rev_id) {
return Err(OTError::duplicate_revision().context(format!("Duplicate revision id: {}", revision.rev_id)));
return Ok(());
}
self.pending_revs.write().await.push_back(revision.rev_id);
@ -40,18 +42,20 @@ impl RevisionMemoryCache {
Ok(())
}
pub async fn mut_revision<F>(&self, rev_id: i64, f: F)
pub fn remove_revisions(&self, ids: Vec<i64>) { self.revs_map.retain(|k, _| !ids.contains(k)); }
pub fn mut_revision<F>(&self, rev_id: &i64, f: F)
where
F: Fn(RefMut<i64, RevisionRecord>),
{
if let Some(m_revision) = self.revs_map.get_mut(&rev_id) {
if let Some(m_revision) = self.revs_map.get_mut(rev_id) {
f(m_revision)
} else {
log::error!("Can't find revision with id {}", rev_id);
}
}
pub async fn revisions_in_range(&self, range: RevisionRange) -> Result<Option<Vec<Revision>>, OTError> {
pub async fn revisions_in_range(&self, range: &RevisionRange) -> Result<Vec<Revision>, OTError> {
let revs = range
.iter()
.flat_map(|rev_id| match self.revs_map.get(&rev_id) {
@ -61,21 +65,47 @@ impl RevisionMemoryCache {
.collect::<Vec<Revision>>();
if revs.len() == range.len() as usize {
Ok(Some(revs))
Ok(revs)
} else {
Ok(None)
Ok(vec![])
}
}
pub fn contains(&self, rev_id: &i64) -> bool { self.revs_map.contains_key(rev_id) }
pub fn is_empty(&self) -> bool { self.revs_map.is_empty() }
pub fn revisions(&self) -> (Vec<i64>, Vec<RevisionRecord>) {
let mut records: Vec<RevisionRecord> = vec![];
let mut ids: Vec<i64> = vec![];
self.revs_map.iter().for_each(|kv| {
records.push(kv.value().clone());
ids.push(*kv.key());
});
(ids, records)
}
pub async fn front_revision(&self) -> Option<(i64, RevisionRecord)> {
match self.pending_revs.read().await.front() {
None => None,
Some(rev_id) => self.revs_map.get(rev_id).map(|r| (*r.key(), r.value().clone())),
}
}
pub async fn front_rev_id(&self) -> Option<i64> { self.pending_revs.read().await.front().copied() }
}
pub type RevIdReceiver = broadcast::Receiver<i64>;
pub type RevIdSender = broadcast::Sender<i64>;
#[derive(Clone, Eq, PartialEq)]
pub enum RevState {
Local = 0,
Acked = 1,
}
#[derive(Clone)]
pub struct RevisionRecord {
pub revision: Revision,
pub state: RevState,
@ -88,6 +118,8 @@ impl RevisionRecord {
state: RevState::Local,
}
}
pub fn ack(&mut self) { self.state = RevState::Acked; }
}
pub struct PendingRevId {