save create default workspace data

appflowy 2022-01-01 14:23:58 +08:00
parent 45821f00a2
commit 09ef0927f7
13 changed files with 232 additions and 186 deletions

View File

@@ -86,7 +86,7 @@ class DocBloc extends Bloc<DocEvent, DocState> {
     final result = await docManager.readDoc();
     yield result.fold(
       (doc) {
-        document = _decodeJsonToDocument(doc.text);
+        document = _decodeJsonToDocument(doc.deltaJson);
         _subscription = document.changes.listen((event) {
           final delta = event.item2;
           final documentDelta = document.toDelta();
@@ -113,7 +113,7 @@ class DocBloc extends Bloc<DocEvent, DocState> {
     result.fold((rustDoc) {
       // final json = utf8.decode(doc.data);
-      final rustDelta = Delta.fromJson(jsonDecode(rustDoc.text));
+      final rustDelta = Delta.fromJson(jsonDecode(rustDoc.deltaJson));
       if (documentDelta != rustDelta) {
         Log.error("Receive : $rustDelta");
         Log.error("Expected : $documentDelta");

View File

@@ -1,4 +1,9 @@
-use flowy_collaboration::entities::doc::{DocumentDelta, DocumentId};
+use bytes::Bytes;
+use flowy_collaboration::entities::{
+    doc::{DocumentDelta, DocumentId},
+    prelude::Revision,
+    revision::RepeatedRevision,
+};
 use flowy_database::SqliteConnection;
 use futures::{FutureExt, StreamExt};
 use std::{collections::HashSet, sync::Arc};
@@ -57,25 +62,35 @@ impl ViewController {
     }

     #[tracing::instrument(level = "debug", skip(self, params), fields(name = %params.name), err)]
-    pub(crate) async fn create_view_from_params(&self, params: CreateViewParams) -> Result<View, FlowyError> {
+    pub(crate) async fn create_view_from_params(&self, mut params: CreateViewParams) -> Result<View, FlowyError> {
+        let delta_data = Bytes::from(params.take_view_data());
+        let user_id = self.user.user_id()?;
+        let repeated_revision: RepeatedRevision =
+            Revision::initial_revision(&user_id, &params.view_id, delta_data).into();
+        let _ = self
+            .document_ctx
+            .controller
+            .save_document(&params.view_id, repeated_revision)
+            .await?;
         let view = self.create_view_on_server(params).await?;
-        let view = self.create_view_on_local(view).await?;
+        let _ = self.create_view_on_local(view.clone()).await?;
         Ok(view)
     }

-    pub(crate) async fn create_view_on_local(&self, view: View) -> Result<View, FlowyError> {
+    pub(crate) async fn create_view_on_local(&self, view: View) -> Result<(), FlowyError> {
         let conn = &*self.database.db_connection()?;
         let trash_can = self.trash_controller.clone();

         conn.immediate_transaction::<_, FlowyError, _>(|| {
-            let _ = self.save_view(view.clone(), conn)?;
-            let _ = notify_views_changed(&view.belong_to_id, trash_can, &conn)?;
+            let belong_to_id = view.belong_to_id.clone();
+            let _ = self.save_view(view, conn)?;
+            let _ = notify_views_changed(&belong_to_id, trash_can, &conn)?;
             Ok(())
         })?;

-        Ok(view)
+        Ok(())
     }

     pub(crate) fn save_view(&self, view: View, conn: &SqliteConnection) -> Result<(), FlowyError> {
@@ -115,8 +130,7 @@ impl ViewController {
     #[tracing::instrument(level = "debug", skip(self, params), fields(doc_id = %params.doc_id), err)]
     pub(crate) async fn open_view(&self, params: DocumentId) -> Result<DocumentDelta, FlowyError> {
         let doc_id = params.doc_id.clone();
-        let db_pool = self.database.db_pool()?;
-        let editor = self.document_ctx.controller.open(&params.doc_id, db_pool).await?;
+        let editor = self.document_ctx.controller.open(&params.doc_id).await?;
         KV::set_str(LATEST_VIEW_ID, doc_id.clone());
         let document_json = editor.document_json().await?;
@@ -146,11 +160,7 @@ impl ViewController {
     #[tracing::instrument(level = "debug", skip(self, params), fields(doc_id = %params.doc_id), err)]
     pub(crate) async fn duplicate_view(&self, params: DocumentId) -> Result<(), FlowyError> {
         let view: View = ViewTableSql::read_view(&params.doc_id, &*self.database.db_connection()?)?.into();
-        let editor = self
-            .document_ctx
-            .controller
-            .open(&params.doc_id, self.database.db_pool()?)
-            .await?;
+        let editor = self.document_ctx.controller.open(&params.doc_id).await?;
         let document_json = editor.document_json().await?;
         let duplicate_params = CreateViewParams {
             belong_to_id: view.belong_to_id.clone(),
@@ -168,11 +178,7 @@ impl ViewController {
     #[tracing::instrument(level = "debug", skip(self, params), err)]
     pub(crate) async fn export_doc(&self, params: ExportParams) -> Result<ExportData, FlowyError> {
-        let editor = self
-            .document_ctx
-            .controller
-            .open(&params.doc_id, self.database.db_pool()?)
-            .await?;
+        let editor = self.document_ctx.controller.open(&params.doc_id).await?;
         let delta_json = editor.document_json().await?;
         Ok(ExportData {
             data: delta_json,
@@ -211,12 +217,7 @@ impl ViewController {
     }

     pub(crate) async fn receive_document_delta(&self, params: DocumentDelta) -> Result<DocumentDelta, FlowyError> {
-        let db_pool = self.document_ctx.user.db_pool()?;
-        let doc = self
-            .document_ctx
-            .controller
-            .apply_document_delta(params, db_pool)
-            .await?;
+        let doc = self.document_ctx.controller.apply_document_delta(params).await?;
         Ok(doc)
     }
@@ -263,7 +264,7 @@ impl ViewController {
         let token = self.user.token()?;
         let server = self.server.clone();
         let pool = self.database.db_pool()?;
-        // Opti: retry?
+        // TODO: Retry with RetryAction?
         tokio::spawn(async move {
             match server.read_view(&token, params).await {
                 Ok(Some(view)) => match pool.get() {
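
For reference, a minimal sketch of how default view data becomes a document's first revision under the API this commit introduces. The user id, view id, and delta JSON below are illustrative placeholders, not values from the codebase:

    use bytes::Bytes;
    use flowy_collaboration::entities::revision::{RepeatedRevision, Revision};

    // Illustrative inputs; an empty Quill-style delta is assumed as the default view data.
    let user_id = "user_1";
    let view_id = "view_1";
    let delta_data = Bytes::from(r#"[{"insert":"\n"}]"#);

    // The initial revision uses base_rev_id == 0 and rev_id == 0; the From<Revision> impl
    // added later in this commit wraps it into a RepeatedRevision for save_document().
    let repeated_revision: RepeatedRevision =
        Revision::initial_revision(user_id, view_id, delta_data).into();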

View File

@@ -76,21 +76,3 @@ async fn app_create_with_view() {
     assert_eq!(view_from_db.belongings[0], view_a);
     assert_eq!(view_from_db.belongings[1], view_b);
 }
-
-// #[tokio::test]
-// async fn app_set_trash_flag() {
-//     let test = AppTest::new().await;
-//     test.delete().await;
-//
-//     let query = QueryAppRequest::new(&test.app.id).trash();
-//     let _ = read_app(&test.sdk, query);
-// }
-//
-// #[tokio::test]
-// #[should_panic]
-// async fn app_set_trash_flag_2() {
-//     let test = AppTest::new().await;
-//     test.move_app_to_trash().await;
-//     let query = QueryAppRequest::new(&test.app.id);
-//     let _ = read_app(&test.sdk, query);
-// }

View File

@@ -14,7 +14,10 @@ use crate::{
 };
 use bytes::Bytes;
 use dashmap::DashMap;
-use flowy_collaboration::entities::doc::{DocumentDelta, DocumentId, DocumentInfo};
+use flowy_collaboration::entities::{
+    doc::{DocumentDelta, DocumentId, DocumentInfo},
+    revision::RepeatedRevision,
+};
 use flowy_database::ConnectionPool;
 use flowy_error::FlowyResult;
 use lib_infra::future::FutureResult;
@@ -52,19 +55,11 @@ impl DocumentController {
         Ok(())
     }

-    #[tracing::instrument(level = "debug", skip(self, doc_id, pool), fields(doc_id), err)]
-    pub async fn open<T: AsRef<str>>(
-        &self,
-        doc_id: T,
-        pool: Arc<ConnectionPool>,
-    ) -> Result<Arc<ClientDocumentEditor>, FlowyError> {
+    #[tracing::instrument(level = "debug", skip(self, doc_id), fields(doc_id), err)]
+    pub async fn open<T: AsRef<str>>(&self, doc_id: T) -> Result<Arc<ClientDocumentEditor>, FlowyError> {
         let doc_id = doc_id.as_ref();
         tracing::Span::current().record("doc_id", &doc_id);
-        if !self.open_cache.contains(doc_id) {
-            let editor = self.make_editor(doc_id, pool.clone()).await?;
-            return Ok(editor);
-        }
-        self.open_cache.get(doc_id)
+        self.get_editor(doc_id).await
     }

     #[tracing::instrument(level = "debug", skip(self, doc_id), fields(doc_id), err)]
@@ -85,17 +80,9 @@ impl DocumentController {
         Ok(())
     }

-    #[tracing::instrument(level = "debug", skip(self, delta, db_pool), fields(doc_id = %delta.doc_id), err)]
-    pub async fn apply_document_delta(
-        &self,
-        delta: DocumentDelta,
-        db_pool: Arc<ConnectionPool>,
-    ) -> Result<DocumentDelta, FlowyError> {
-        if !self.open_cache.contains(&delta.doc_id) {
-            let _ = self.open(&delta.doc_id, db_pool).await?;
-        }
-        let editor = self.open_cache.get(&delta.doc_id)?;
+    #[tracing::instrument(level = "debug", skip(self, delta), fields(doc_id = %delta.doc_id), err)]
+    pub async fn apply_document_delta(&self, delta: DocumentDelta) -> Result<DocumentDelta, FlowyError> {
+        let editor = self.get_editor(&delta.doc_id).await?;
         let _ = editor.compose_local_delta(Bytes::from(delta.delta_json)).await?;
         let document_json = editor.document_json().await?;
         Ok(DocumentDelta {
@@ -104,7 +91,23 @@ impl DocumentController {
         })
     }

-    pub async fn save_document_delta(&self, delta: DocumentDelta) {}
+    pub async fn save_document<T: AsRef<str>>(&self, doc_id: T, revisions: RepeatedRevision) -> FlowyResult<()> {
+        let doc_id = doc_id.as_ref().to_owned();
+        let db_pool = self.user.db_pool()?;
+        let rev_manager = self.make_rev_manager(&doc_id, db_pool)?;
+        let _ = rev_manager.reset_document(revisions).await?;
+        Ok(())
+    }
+
+    async fn get_editor(&self, doc_id: &str) -> FlowyResult<Arc<ClientDocumentEditor>> {
+        match self.open_cache.get(doc_id) {
+            None => {
+                let db_pool = self.user.db_pool()?;
+                self.make_editor(&doc_id, db_pool).await
+            },
+            Some(editor) => Ok(editor),
+        }
+    }
 }

 impl DocumentController {
@@ -173,28 +176,23 @@ impl OpenDocCache {
     pub(crate) fn contains(&self, doc_id: &str) -> bool { self.inner.get(doc_id).is_some() }

-    pub(crate) fn get(&self, doc_id: &str) -> Result<Arc<ClientDocumentEditor>, FlowyError> {
+    pub(crate) fn get(&self, doc_id: &str) -> Option<Arc<ClientDocumentEditor>> {
         if !self.contains(&doc_id) {
-            return Err(doc_not_found());
+            return None;
         }
         let opened_doc = self.inner.get(doc_id).unwrap();
-        Ok(opened_doc.clone())
+        Some(opened_doc.clone())
     }

     pub(crate) fn remove(&self, id: &str) {
         let doc_id = id.to_string();
-        match self.get(id) {
-            Ok(editor) => editor.stop(),
-            Err(e) => log::error!("{}", e),
+        if let Some(editor) = self.get(id) {
+            editor.stop()
         }
         self.inner.remove(&doc_id);
     }
 }

-fn doc_not_found() -> FlowyError {
-    FlowyError::record_not_found().context("Doc is close or you should call open first")
-}
-
 #[tracing::instrument(level = "debug", skip(state_receiver, receivers))]
 fn listen_ws_state_changed(mut state_receiver: WSStateReceiver, receivers: Arc<DocumentWSReceivers>) {
     tokio::spawn(async move {
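
A hedged caller-side sketch of the slimmed-down controller API (variable names are illustrative; the controller now resolves the connection pool and the editor internally via get_editor):

    // `controller` is a DocumentController and `delta` a DocumentDelta, obtained elsewhere.
    let editor = controller.open("doc_1").await?;                 // no Arc<ConnectionPool> argument anymore
    let current_json = editor.document_json().await?;
    let updated = controller.apply_document_delta(delta).await?;  // opens the editor on demand if needed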

View File

@@ -4,10 +4,10 @@ use crate::{
         disk::{Persistence, RevisionDiskCache},
         memory::{RevisionMemoryCache, RevisionMemoryCacheDelegate},
     },
-    sql_tables::{RevChangeset, RevTableState},
+    sql_tables::{RevTableState, RevisionChangeset},
 };
 use dashmap::DashMap;
-use flowy_collaboration::entities::revision::{RevState, Revision, RevisionRange};
+use flowy_collaboration::entities::revision::{Revision, RevisionRange, RevisionState};
 use flowy_database::ConnectionPool;
 use flowy_error::{internal_error, FlowyResult};
 use lib_infra::future::FutureResult;
@@ -21,11 +21,9 @@ use std::{
 };
 use tokio::{sync::RwLock, task::spawn_blocking};

-type DocRevisionDiskCache = dyn RevisionDiskCache<Error = FlowyError>;
-
 pub struct RevisionCache {
     doc_id: String,
-    pub disk_cache: Arc<DocRevisionDiskCache>,
+    disk_cache: Arc<dyn RevisionDiskCache<Error = FlowyError>>,
     memory_cache: Arc<RevisionMemoryCache>,
     sync_seq: Arc<RevisionSyncSeq>,
     latest_rev_id: AtomicI64,
@@ -46,6 +44,29 @@ impl RevisionCache {
         }
     }

+    pub fn read_revisions(&self, doc_id: &str) -> FlowyResult<Vec<RevisionRecord>> {
+        self.disk_cache.read_revisions(doc_id, None)
+    }
+
+    #[tracing::instrument(level = "debug", skip(self, doc_id, revisions))]
+    pub fn reset_document(&self, doc_id: &str, revisions: Vec<Revision>) -> FlowyResult<()> {
+        let disk_cache = self.disk_cache.clone();
+        let conn = disk_cache.db_pool().get().map_err(internal_error)?;
+        let records = revisions
+            .into_iter()
+            .map(|revision| RevisionRecord {
+                revision,
+                state: RevisionState::StateLocal,
+            })
+            .collect::<Vec<_>>();
+
+        conn.immediate_transaction::<_, FlowyError, _>(|| {
+            let _ = disk_cache.delete_revisions(doc_id, None, &*conn)?;
+            let _ = disk_cache.write_revisions(records, &*conn)?;
+            Ok(())
+        })
+    }
+
     #[tracing::instrument(level = "debug", skip(self, revision))]
     pub async fn add_local_revision(&self, revision: Revision) -> FlowyResult<()> {
         if self.memory_cache.contains(&revision.rev_id) {
@@ -54,7 +75,7 @@ impl RevisionCache {
         let rev_id = revision.rev_id;
         let record = RevisionRecord {
             revision,
-            state: RevState::StateLocal,
+            state: RevisionState::StateLocal,
         };
         let _ = self.memory_cache.add_revision(&record).await;
         self.sync_seq.add_revision(record).await?;
@@ -70,7 +91,7 @@ impl RevisionCache {
         let rev_id = revision.rev_id;
         let record = RevisionRecord {
             revision,
-            state: RevState::Ack,
+            state: RevisionState::Ack,
         };
         self.memory_cache.add_revision(&record).await;
         let _ = self.latest_rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(rev_id));
@@ -91,11 +112,13 @@ impl RevisionCache {
     pub async fn get_revision(&self, rev_id: i64) -> Option<RevisionRecord> {
         match self.memory_cache.get_revision(&rev_id).await {
-            None => match self.disk_cache.read_revision(&self.doc_id, rev_id) {
-                Ok(Some(revision)) => Some(revision),
-                Ok(None) => {
-                    tracing::warn!("Can't find revision in {} with rev_id: {}", &self.doc_id, rev_id);
-                    None
+            None => match self.disk_cache.read_revisions(&self.doc_id, Some(vec![rev_id])) {
+                Ok(mut records) => {
+                    if records.is_empty() {
+                        tracing::warn!("Can't find revision in {} with rev_id: {}", &self.doc_id, rev_id);
+                    }
+                    assert_eq!(records.len(), 1);
+                    records.pop()
                 },
                 Err(e) => {
                     tracing::error!("{}", e);
@@ -112,7 +135,7 @@ impl RevisionCache {
         if records.len() != range_len {
             let disk_cache = self.disk_cache.clone();
             let doc_id = self.doc_id.clone();
-            records = spawn_blocking(move || disk_cache.revisions_in_range(&doc_id, &range))
+            records = spawn_blocking(move || disk_cache.read_revisions_with_range(&doc_id, &range))
                 .await
                 .map_err(internal_error)??;
@@ -134,9 +157,13 @@ impl RevisionCache {
         match sync_seq.next_sync_revision().await {
             None => match sync_seq.next_sync_rev_id().await {
                 None => Ok(None),
-                Some(rev_id) => match disk_cache.read_revision(&doc_id, rev_id)? {
-                    None => Ok(None),
-                    Some(record) => Ok(Some(record.revision)),
+                Some(rev_id) => {
+                    let records = disk_cache.read_revisions(&doc_id, Some(vec![rev_id]))?;
+                    let mut revisions = records
+                        .into_iter()
+                        .map(|record| record.revision)
+                        .collect::<Vec<Revision>>();
+                    Ok(revisions.pop())
                 },
             },
             Some((_, record)) => Ok(Some(record.revision)),
@@ -146,10 +173,13 @@ impl RevisionCache {
 }

 impl RevisionMemoryCacheDelegate for Arc<Persistence> {
-    fn receive_checkpoint(&self, records: Vec<RevisionRecord>) -> FlowyResult<()> { self.create_revisions(records) }
+    fn receive_checkpoint(&self, records: Vec<RevisionRecord>) -> FlowyResult<()> {
+        let conn = &*self.pool.get().map_err(internal_error)?;
+        self.write_revisions(records, &conn)
+    }

     fn receive_ack(&self, doc_id: &str, rev_id: i64) {
-        let changeset = RevChangeset {
+        let changeset = RevisionChangeset {
             doc_id: doc_id.to_string(),
             rev_id: rev_id.into(),
             state: RevTableState::Acked,
@@ -164,11 +194,11 @@ impl RevisionMemoryCacheDelegate for Arc<Persistence> {
 #[derive(Clone)]
 pub struct RevisionRecord {
     pub revision: Revision,
-    pub state: RevState,
+    pub state: RevisionState,
 }

 impl RevisionRecord {
-    pub fn ack(&mut self) { self.state = RevState::Ack; }
+    pub fn ack(&mut self) { self.state = RevisionState::Ack; }
 }

 struct RevisionSyncSeq {

View File

@@ -1,6 +1,7 @@
 use crate::services::doc::revision::RevisionRecord;
-use crate::sql_tables::{RevChangeset, RevTableSql};
+use crate::sql_tables::{RevTableSql, RevisionChangeset};
+use diesel::SqliteConnection;
 use flowy_collaboration::entities::revision::RevisionRange;
 use flowy_database::ConnectionPool;
 use flowy_error::{internal_error, FlowyError, FlowyResult};
@@ -8,11 +9,22 @@ use std::{fmt::Debug, sync::Arc};

 pub trait RevisionDiskCache: Sync + Send {
     type Error: Debug;
-    fn create_revisions(&self, revisions: Vec<RevisionRecord>) -> Result<(), Self::Error>;
-    fn revisions_in_range(&self, doc_id: &str, range: &RevisionRange) -> Result<Vec<RevisionRecord>, Self::Error>;
-    fn read_revision(&self, doc_id: &str, rev_id: i64) -> Result<Option<RevisionRecord>, Self::Error>;
-    fn read_revisions(&self, doc_id: &str) -> Result<Vec<RevisionRecord>, Self::Error>;
-    fn update_revisions(&self, changesets: Vec<RevChangeset>) -> FlowyResult<()>;
+    fn write_revisions(&self, revisions: Vec<RevisionRecord>, conn: &SqliteConnection) -> Result<(), Self::Error>;
+    fn read_revisions(&self, doc_id: &str, rev_ids: Option<Vec<i64>>) -> Result<Vec<RevisionRecord>, Self::Error>;
+    fn read_revisions_with_range(
+        &self,
+        doc_id: &str,
+        range: &RevisionRange,
+    ) -> Result<Vec<RevisionRecord>, Self::Error>;
+    fn update_revisions(&self, changesets: Vec<RevisionChangeset>) -> FlowyResult<()>;
+    fn delete_revisions(
+        &self,
+        doc_id: &str,
+        rev_ids: Option<Vec<i64>>,
+        conn: &SqliteConnection,
+    ) -> Result<(), Self::Error>;
+    fn db_pool(&self) -> Arc<ConnectionPool>;
 }

 pub(crate) struct Persistence {
@@ -23,33 +35,28 @@ pub(crate) struct Persistence {
 impl RevisionDiskCache for Persistence {
     type Error = FlowyError;

-    fn create_revisions(&self, revisions: Vec<RevisionRecord>) -> Result<(), Self::Error> {
-        let conn = &*self.pool.get().map_err(internal_error)?;
-        conn.immediate_transaction::<_, FlowyError, _>(|| {
-            let _ = RevTableSql::create_rev_table(revisions, conn)?;
-            Ok(())
-        })
+    fn write_revisions(&self, revisions: Vec<RevisionRecord>, conn: &SqliteConnection) -> Result<(), Self::Error> {
+        let _ = RevTableSql::create_rev_table(revisions, conn)?;
+        Ok(())
     }

-    fn revisions_in_range(&self, doc_id: &str, range: &RevisionRange) -> Result<Vec<RevisionRecord>, Self::Error> {
-        let conn = &*self.pool.get().map_err(internal_error).unwrap();
+    fn read_revisions(&self, doc_id: &str, rev_ids: Option<Vec<i64>>) -> Result<Vec<RevisionRecord>, Self::Error> {
+        let conn = self.pool.get().map_err(internal_error)?;
+        let records = RevTableSql::read_rev_tables(&self.user_id, doc_id, rev_ids, &*conn)?;
+        Ok(records)
+    }
+
+    fn read_revisions_with_range(
+        &self,
+        doc_id: &str,
+        range: &RevisionRange,
+    ) -> Result<Vec<RevisionRecord>, Self::Error> {
+        let conn = &*self.pool.get().map_err(internal_error)?;
         let revisions = RevTableSql::read_rev_tables_with_range(&self.user_id, doc_id, range.clone(), conn)?;
         Ok(revisions)
     }

-    fn read_revision(&self, doc_id: &str, rev_id: i64) -> Result<Option<RevisionRecord>, Self::Error> {
-        let conn = self.pool.get().map_err(internal_error)?;
-        let some = RevTableSql::read_rev_table(&self.user_id, doc_id, &rev_id, &*conn)?;
-        Ok(some)
-    }
-
-    fn read_revisions(&self, doc_id: &str) -> Result<Vec<RevisionRecord>, Self::Error> {
-        let conn = self.pool.get().map_err(internal_error)?;
-        let some = RevTableSql::read_rev_tables(&self.user_id, doc_id, &*conn)?;
-        Ok(some)
-    }
-
-    fn update_revisions(&self, changesets: Vec<RevChangeset>) -> FlowyResult<()> {
+    fn update_revisions(&self, changesets: Vec<RevisionChangeset>) -> FlowyResult<()> {
         let conn = &*self.pool.get().map_err(internal_error)?;
         let _ = conn.immediate_transaction::<_, FlowyError, _>(|| {
             for changeset in changesets {
@@ -59,6 +66,18 @@ impl RevisionDiskCache for Persistence {
         })?;
         Ok(())
     }
+
+    fn delete_revisions(
+        &self,
+        doc_id: &str,
+        rev_ids: Option<Vec<i64>>,
+        conn: &SqliteConnection,
+    ) -> Result<(), Self::Error> {
+        let _ = RevTableSql::delete_rev_tables(doc_id, rev_ids, conn)?;
+        Ok(())
+    }
+
+    fn db_pool(&self) -> Arc<ConnectionPool> { self.pool.clone() }
 }

 impl Persistence {
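
An illustrative sketch of the two read paths the new rev_ids: Option<Vec<i64>> parameter enables, assuming a disk_cache of type Arc<dyn RevisionDiskCache<Error = FlowyError>> as stored in RevisionCache:

    // All persisted revisions of a document, in ascending rev_id order.
    let all_records = disk_cache.read_revisions("doc_1", None)?;

    // Only the requested rev_ids; an empty vector comes back if none match.
    let one_record = disk_cache.read_revisions("doc_1", Some(vec![42]))?.pop();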

View File

@@ -3,7 +3,7 @@ use bytes::Bytes;
 use flowy_collaboration::{
     entities::{
         doc::DocumentInfo,
-        revision::{RevState, RevType, Revision, RevisionRange},
+        revision::{RepeatedRevision, RevType, Revision, RevisionRange, RevisionState},
     },
     util::{md5, RevIdCounter},
 };
@@ -51,6 +51,11 @@ impl RevisionManager {
         Ok(doc.delta()?)
     }

+    #[tracing::instrument(level = "debug", skip(self, revisions), err)]
+    pub async fn reset_document(&self, revisions: RepeatedRevision) -> FlowyResult<()> {
+        self.cache.reset_document(&self.doc_id, revisions.into_inner())
+    }
+
     pub async fn add_remote_revision(&self, revision: &Revision) -> Result<(), FlowyError> {
         assert_eq!(revision.ty, RevType::Remote);
         self.rev_id_counter.set(revision.rev_id);
@@ -108,7 +113,7 @@ struct RevisionLoader {
 impl RevisionLoader {
     async fn load(&self) -> Result<Vec<Revision>, FlowyError> {
-        let records = self.cache.disk_cache.read_revisions(&self.doc_id)?;
+        let records = self.cache.read_revisions(&self.doc_id)?;
         let revisions: Vec<Revision>;
         if records.is_empty() {
             let doc = self.server.fetch_document(&self.doc_id).await?;
@@ -128,11 +133,11 @@ impl RevisionLoader {
         } else {
             for record in &records {
                 match record.state {
-                    RevState::StateLocal => match self.cache.add_local_revision(record.revision.clone()).await {
+                    RevisionState::StateLocal => match self.cache.add_local_revision(record.revision.clone()).await {
                         Ok(_) => {},
                         Err(e) => tracing::error!("{}", e),
                     },
-                    RevState::Ack => {},
+                    RevisionState::Ack => {},
                 }
             }
             revisions = records.into_iter().map(|record| record.revision).collect::<_>();

View File

@@ -1,7 +1,7 @@
 use crate::{
     errors::FlowyError,
     services::doc::revision::RevisionRecord,
-    sql_tables::{doc::RevTable, mk_revision_record_from_table, RevChangeset, RevTableState, RevTableType},
+    sql_tables::{doc::RevTable, mk_revision_record_from_table, RevTableState, RevTableType, RevisionChangeset},
 };
 use diesel::update;
 use flowy_collaboration::entities::revision::RevisionRange;
@@ -32,7 +32,7 @@ impl RevTableSql {
         Ok(())
     }

-    pub(crate) fn update_rev_table(changeset: RevChangeset, conn: &SqliteConnection) -> Result<(), FlowyError> {
+    pub(crate) fn update_rev_table(changeset: RevisionChangeset, conn: &SqliteConnection) -> Result<(), FlowyError> {
         let filter = dsl::rev_table
             .filter(dsl::rev_id.eq(changeset.rev_id.as_ref()))
             .filter(dsl::doc_id.eq(changeset.doc_id));
@@ -44,36 +44,20 @@ impl RevTableSql {
     pub(crate) fn read_rev_tables(
         user_id: &str,
         doc_id: &str,
+        rev_ids: Option<Vec<i64>>,
         conn: &SqliteConnection,
     ) -> Result<Vec<RevisionRecord>, FlowyError> {
-        let filter = dsl::rev_table
-            .filter(dsl::doc_id.eq(doc_id))
-            .order(dsl::rev_id.asc())
-            .into_boxed();
-        let rev_tables = filter.load::<RevTable>(conn)?;
-        let revisions = rev_tables
-            .into_iter()
-            .map(|table| mk_revision_record_from_table(user_id, table))
-            .collect::<Vec<_>>();
-        Ok(revisions)
-    }
-
-    pub(crate) fn read_rev_table(
-        user_id: &str,
-        doc_id: &str,
-        revision_id: &i64,
-        conn: &SqliteConnection,
-    ) -> Result<Option<RevisionRecord>, FlowyError> {
-        let filter = dsl::rev_table
-            .filter(dsl::doc_id.eq(doc_id))
-            .filter(dsl::rev_id.eq(revision_id));
-        let result = filter.first::<RevTable>(conn);
-        if Err(diesel::NotFound) == result {
-            Ok(None)
-        } else {
-            Ok(Some(mk_revision_record_from_table(user_id, result?)))
-        }
+        let mut sql = dsl::rev_table.filter(dsl::doc_id.eq(doc_id)).into_boxed();
+        if let Some(rev_ids) = rev_ids {
+            sql = sql.filter(dsl::rev_id.eq_any(rev_ids));
+        }
+        let rows = sql.order(dsl::rev_id.asc()).load::<RevTable>(conn)?;
+        let records = rows
+            .into_iter()
+            .map(|row| mk_revision_record_from_table(user_id, row))
+            .collect::<Vec<_>>();
+        Ok(records)
     }

     pub(crate) fn read_rev_tables_with_range(
@@ -96,13 +80,18 @@ impl RevTableSql {
         Ok(revisions)
     }

-    #[allow(dead_code)]
-    pub(crate) fn delete_rev_table(doc_id_s: &str, rev_id_s: i64, conn: &SqliteConnection) -> Result<(), FlowyError> {
-        let filter = dsl::rev_table
-            .filter(dsl::rev_id.eq(rev_id_s))
-            .filter(dsl::doc_id.eq(doc_id_s));
-        let affected_row = diesel::delete(filter).execute(conn)?;
-        debug_assert_eq!(affected_row, 1);
+    pub(crate) fn delete_rev_tables(
+        doc_id: &str,
+        rev_ids: Option<Vec<i64>>,
+        conn: &SqliteConnection,
+    ) -> Result<(), FlowyError> {
+        let mut sql = dsl::rev_table.filter(dsl::doc_id.eq(doc_id)).into_boxed();
+        if let Some(rev_ids) = rev_ids {
+            sql = sql.filter(dsl::rev_id.eq_any(rev_ids));
+        }
+        let affected_row = sql.execute(conn)?;
+        tracing::debug!("Delete {} revision rows", affected_row);
         Ok(())
     }
 }

View File

@@ -1,7 +1,7 @@
 use crate::services::doc::revision::RevisionRecord;
 use diesel::sql_types::Integer;
 use flowy_collaboration::{
-    entities::revision::{RevId, RevState, RevType, Revision},
+    entities::revision::{RevId, RevType, Revision, RevisionState},
     util::md5,
 };
 use flowy_database::schema::rev_table;
@@ -48,20 +48,20 @@ impl RevTableState {
 }
 impl_sql_integer_expression!(RevTableState);

-impl std::convert::From<RevTableState> for RevState {
+impl std::convert::From<RevTableState> for RevisionState {
     fn from(s: RevTableState) -> Self {
         match s {
-            RevTableState::Local => RevState::StateLocal,
-            RevTableState::Acked => RevState::Ack,
+            RevTableState::Local => RevisionState::StateLocal,
+            RevTableState::Acked => RevisionState::Ack,
         }
     }
 }

-impl std::convert::From<RevState> for RevTableState {
-    fn from(s: RevState) -> Self {
+impl std::convert::From<RevisionState> for RevTableState {
+    fn from(s: RevisionState) -> Self {
         match s {
-            RevState::StateLocal => RevTableState::Local,
-            RevState::Ack => RevTableState::Acked,
+            RevisionState::StateLocal => RevTableState::Local,
+            RevisionState::Ack => RevTableState::Acked,
         }
     }
 }
@@ -130,7 +130,7 @@ impl RevTableType {
 }
 impl_sql_integer_expression!(RevTableType);

-pub struct RevChangeset {
+pub struct RevisionChangeset {
     pub(crate) doc_id: String,
     pub(crate) rev_id: RevId,
     pub(crate) state: RevTableState,

View File

@@ -1,5 +1,5 @@
 use crate::{helper::ViewTest, FlowySDKTest};
-use flowy_collaboration::entities::revision::RevState;
+use flowy_collaboration::entities::revision::RevisionState;
 use flowy_document::services::doc::{edit::ClientDocumentEditor, SYNC_INTERVAL_IN_MILLIS};
 use lib_ot::{core::Interval, rich_text::RichTextDelta};
 use std::sync::Arc;
@@ -12,7 +12,7 @@ pub enum EditorScript {
     Delete(Interval),
     Replace(Interval, &'static str),
-    AssertRevisionState(i64, RevState),
+    AssertRevisionState(i64, RevisionState),
     AssertNextRevId(Option<i64>),
     AssertCurrentRevId(i64),
     AssertJson(&'static str),
@@ -30,8 +30,7 @@ impl EditorTest {
         let sdk = FlowySDKTest::setup();
         let _ = sdk.init_user().await;
         let test = ViewTest::new(&sdk).await;
-        let db_pool = sdk.user_session.db_pool().unwrap();
-        let editor = sdk.document_ctx.controller.open(&test.view.id, db_pool).await.unwrap();
+        let editor = sdk.document_ctx.controller.open(&test.view.id).await.unwrap();
         Self { sdk, editor }
     }

View File

@@ -1,4 +1,4 @@
-use flowy_collaboration::entities::revision::RevState;
+use flowy_collaboration::entities::revision::RevisionState;
 use flowy_test::doc_script::{EditorScript::*, *};

 #[tokio::test]
@@ -22,8 +22,8 @@ async fn doc_sync_retry_ws_conn() {
         InsertText("3", 2),
         StartWs,
         WaitSyncFinished,
-        AssertRevisionState(2, RevState::Ack),
-        AssertRevisionState(3, RevState::Ack),
+        AssertRevisionState(2, RevisionState::Ack),
+        AssertRevisionState(3, RevisionState::Ack),
         AssertNextRevId(None),
         AssertJson(r#"[{"insert":"123\n"}]"#),
     ];

View File

@@ -39,9 +39,23 @@ impl Revision {
     pub fn pair_rev_id(&self) -> (i64, i64) { (self.base_rev_id, self.rev_id) }

+    #[allow(dead_code)]
     pub fn is_initial(&self) -> bool { self.rev_id == 0 }

+    pub fn initial_revision(user_id: &str, doc_id: &str, delta_data: Bytes) -> Self {
+        let user_id = user_id.to_owned();
+        let doc_id = doc_id.to_owned();
+        let md5 = md5(&delta_data);
+        Self {
+            base_rev_id: 0,
+            rev_id: 0,
+            delta_data: delta_data.to_vec(),
+            md5,
+            doc_id,
+            ty: RevType::Local,
+            user_id,
+        }
+    }
+
     pub fn new(
         doc_id: &str,
         base_rev_id: i64,
@@ -51,11 +65,11 @@ impl Revision {
         user_id: &str,
         md5: String,
     ) -> Revision {
+        let user_id = user_id.to_owned();
         let doc_id = doc_id.to_owned();
         let delta_data = delta_data.to_vec();
         let base_rev_id = base_rev_id;
         let rev_id = rev_id;
-        let user_id = user_id.to_owned();

         if base_rev_id != 0 {
             debug_assert!(base_rev_id != rev_id);
@@ -73,6 +87,10 @@ impl Revision {
     }
 }

+impl std::convert::From<Revision> for RepeatedRevision {
+    fn from(revision: Revision) -> Self { RepeatedRevision { items: vec![revision] } }
+}
+
 impl std::fmt::Debug for Revision {
     fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
         let _ = f.write_fmt(format_args!("doc_id {}, ", self.doc_id))?;
@@ -142,6 +160,8 @@ impl std::fmt::Display for RevId {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_fmt(format_args!("{}", self.value)) }
 }

+// Deprecated
+// TODO: remove RevType
 #[derive(Debug, ProtoBuf_Enum, Clone, Eq, PartialEq)]
 pub enum RevType {
     Local = 0,
@@ -193,7 +213,7 @@ pub fn md5<T: AsRef<[u8]>>(data: T) -> String {
 }

 #[derive(Debug, Clone, Eq, PartialEq)]
-pub enum RevState {
+pub enum RevisionState {
     StateLocal = 0,
     Ack = 1,
 }

View File

@@ -69,6 +69,7 @@ pub struct CreateViewParams {
     #[pb(index = 5)]
     pub view_type: ViewType,

+    // ViewType::Doc -> Delta string
     #[pb(index = 6)]
     pub view_data: String,
@@ -96,6 +97,8 @@ impl CreateViewParams {
             view_id,
         }
     }
+
+    pub fn take_view_data(&mut self) -> String { ::std::mem::replace(&mut self.view_data, String::new()) }
 }

 impl TryInto<CreateViewParams> for CreateViewRequest {
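
A small illustrative note on the new take_view_data helper: it moves the delta string out of the params (leaving an empty string behind) so create_view_from_params can turn it into the initial revision without cloning:

    // `params` is a mutable CreateViewParams whose view_data holds the initial delta JSON.
    let delta_data = Bytes::from(params.take_view_data());
    assert!(params.view_data.is_empty()); // the string was moved out, not copied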