save document revision if local not exist

This commit is contained in:
appflowy 2021-10-05 23:18:19 +08:00
parent ef4ee320f7
commit 1c8d2c5ac0
8 changed files with 48 additions and 98 deletions

View File

@ -4,6 +4,7 @@
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "app_flowy",
"request": "launch",

View File

@ -15,7 +15,6 @@ use crate::{
server::Server,
ws::WsDocumentManager,
},
sql_tables::doc::DocTableSql,
};
use flowy_database::{ConnectionPool, SqliteConnection};
use flowy_infra::future::{wrap_future, FnFuture, ResultFuture};
@ -24,7 +23,6 @@ use tokio::time::{interval, Duration};
pub(crate) struct DocController {
server: Server,
doc_sql: Arc<DocTableSql>,
ws_manager: Arc<WsDocumentManager>,
cache: Arc<DocCache>,
user: Arc<dyn DocumentUser>,
@ -32,11 +30,9 @@ pub(crate) struct DocController {
impl DocController {
pub(crate) fn new(server: Server, user: Arc<dyn DocumentUser>, ws: Arc<WsDocumentManager>) -> Self {
let doc_sql = Arc::new(DocTableSql {});
let cache = Arc::new(DocCache::new());
let controller = Self {
server,
doc_sql,
user,
ws_manager: ws,
cache: cache.clone(),
@ -51,11 +47,11 @@ impl DocController {
#[tracing::instrument(skip(self, conn), err)]
pub(crate) fn create(&self, params: CreateDocParams, conn: &SqliteConnection) -> Result<(), DocError> {
let _doc = Doc {
id: params.id,
data: params.data,
rev_id: 0,
};
// let _doc = Doc {
// id: params.id,
// data: params.data,
// rev_id: 0,
// };
// let _ = self.doc_sql.create_doc_table(DocTable::new(doc), conn)?;
Ok(())
}
@ -84,8 +80,6 @@ impl DocController {
#[tracing::instrument(level = "debug", skip(self, conn), err)]
pub(crate) fn delete(&self, params: QueryDocParams, conn: &SqliteConnection) -> Result<(), DocError> {
let doc_id = &params.doc_id;
let _ = self.doc_sql.delete_doc(doc_id, &*conn)?;
self.cache.remove(doc_id);
self.ws_manager.remove_handler(doc_id);
let _ = self.delete_doc_on_server(params)?;
@ -135,32 +129,6 @@ impl DocController {
self.cache.set(edit_ctx.clone());
Ok(edit_ctx)
}
#[allow(dead_code)]
#[tracing::instrument(level = "debug", skip(self, pool), err)]
async fn read_doc(&self, doc_id: &str, pool: Arc<ConnectionPool>) -> Result<Doc, DocError> {
match self.doc_sql.read_doc_table(doc_id, pool.clone()) {
Ok(doc_table) => Ok(doc_table.into()),
Err(error) => {
if error.is_record_not_found() {
let token = self.user.token()?;
let params = QueryDocParams {
doc_id: doc_id.to_string(),
};
match self.server.read_doc(&token, params).await? {
None => Err(DocError::not_found()),
Some(doc) => {
let conn = &*pool.get().map_err(internal_error)?;
let _ = self.doc_sql.create_doc_table(doc.clone().into(), conn)?;
Ok(doc)
},
}
} else {
return Err(error);
}
},
}
}
}
struct RevisionServerImpl {
@ -182,6 +150,7 @@ impl RevisionServer for RevisionServerImpl {
Some(doc) => {
let delta = Delta::from_bytes(doc.data)?;
Ok(DocRevision {
base_rev_id: 0.into(),
rev_id: doc.rev_id.into(),
delta,
})

View File

@ -8,7 +8,6 @@ use crate::{
},
Document,
},
sql_tables::{DocTableChangeset, DocTableSql},
};
use async_stream::stream;
use flowy_database::ConnectionPool;
@ -134,20 +133,6 @@ impl DocumentActor {
);
result
}
#[tracing::instrument(level = "debug", skip(self, rev_id), err)]
async fn save_to_disk(&self, rev_id: RevId) -> DocResult<()> {
let data = self.document.read().await.to_json();
let changeset = DocTableChangeset {
id: self.doc_id.clone(),
data,
rev_id: rev_id.into(),
};
let sql = DocTableSql {};
let conn = self.pool.get().map_err(internal_error)?;
let _ = sql.update_doc_table(changeset, &*conn)?;
Ok(())
}
}
// #[tracing::instrument(level = "debug", skip(self, params), err)]

View File

@ -46,7 +46,11 @@ impl ClientEditDoc {
user: Arc<dyn DocumentUser>,
) -> DocResult<Self> {
let rev_store = spawn_rev_store_actor(doc_id, pool.clone(), server.clone());
let DocRevision { rev_id, delta } = fetch_document(rev_store.clone()).await?;
let DocRevision {
base_rev_id: _,
rev_id,
delta,
} = load_document(rev_store.clone()).await?;
let rev_manager = Arc::new(RevisionManager::new(doc_id, rev_id, rev_store));
let document = spawn_doc_edit_actor(doc_id, delta, pool.clone());
let doc_id = doc_id.to_string();
@ -323,15 +327,9 @@ fn spawn_doc_edit_actor(doc_id: &str, delta: Delta, pool: Arc<ConnectionPool>) -
sender
}
async fn fetch_document(sender: mpsc::Sender<RevisionCmd>) -> DocResult<DocRevision> {
async fn load_document(sender: mpsc::Sender<RevisionCmd>) -> DocResult<DocRevision> {
let (ret, rx) = oneshot::channel();
let _ = sender.send(RevisionCmd::DocumentDelta { ret }).await;
match rx.await {
Ok(result) => Ok(result?),
Err(e) => {
log::error!("fetch_document: {}", e);
Err(DocError::internal().context(format!("fetch_document: {}", e)))
},
}
let result = rx.await.map_err(internal_error)?;
result
}

View File

@ -8,6 +8,7 @@ use flowy_ot::core::{Delta, OperationTransformable};
use tokio::sync::{mpsc, oneshot};
pub struct DocRevision {
pub base_rev_id: RevId,
pub rev_id: RevId,
pub delta: Delta,
}

View File

@ -1,5 +1,5 @@
use crate::{
entities::doc::{RevId, Revision, RevisionRange},
entities::doc::{RevId, RevType, Revision, RevisionRange},
errors::{internal_error, DocError, DocResult},
services::doc::revision::{model::RevisionOperation, DocRevision, RevisionServer},
sql_tables::{RevState, RevTableSql},
@ -89,7 +89,7 @@ impl RevisionStoreActor {
let _ = ret.send(result);
},
RevisionCmd::DocumentDelta { ret } => {
let delta = fetch_document(&self.doc_id, self.server.clone(), self.persistence.clone()).await;
let delta = self.fetch_document().await;
let _ = ret.send(delta);
},
}
@ -180,38 +180,29 @@ impl RevisionStoreActor {
result
}
}
}
async fn fetch_document(
doc_id: &str,
server: Arc<dyn RevisionServer>,
persistence: Arc<Persistence>,
) -> DocResult<DocRevision> {
let fetch_from_remote = server.fetch_document_from_remote(doc_id).or_else(|result| {
log::error!(
"Fetch document delta from remote failed: {:?}, try to fetch from local",
result
);
fetch_from_local(doc_id, persistence.clone())
});
async fn fetch_document(&self) -> DocResult<DocRevision> {
let result = fetch_from_local(&self.doc_id, self.persistence.clone()).await;
if result.is_ok() {
return result;
}
let fetch_from_local = fetch_from_local(doc_id, persistence.clone()).or_else(|result| async move {
log::error!(
"Fetch document delta from local failed: {:?}, try to fetch from remote",
result
);
server.fetch_document_from_remote(doc_id).await
});
match self.server.fetch_document_from_remote(&self.doc_id).await {
Ok(doc_revision) => {
let delta_data = doc_revision.delta.to_bytes();
let revision = Revision::new(
doc_revision.base_rev_id.clone(),
doc_revision.rev_id.clone(),
delta_data.to_vec(),
&self.doc_id,
RevType::Remote,
);
self.handle_new_revision(revision);
tokio::select! {
result = fetch_from_remote => {
log::debug!("Finish fetching document from remote");
result
},
result = fetch_from_local => {
log::debug!("Finish fetching document from local");
result
},
Ok(doc_revision)
},
Err(e) => Err(e),
}
}
}
@ -225,6 +216,7 @@ async fn fetch_from_local(doc_id: &str, persistence: Arc<Persistence>) -> DocRes
return Err(DocError::not_found());
}
let base_rev_id: RevId = revisions.last().unwrap().base_rev_id.into();
let rev_id: RevId = revisions.last().unwrap().rev_id.into();
let mut delta = Delta::new();
for revision in revisions {
@ -240,7 +232,11 @@ async fn fetch_from_local(doc_id: &str, persistence: Arc<Persistence>) -> DocRes
delta.insert("\n", Attributes::default());
Result::<DocRevision, DocError>::Ok(DocRevision { rev_id, delta })
Result::<DocRevision, DocError>::Ok(DocRevision {
base_rev_id,
rev_id,
delta,
})
})
.await
.map_err(internal_error)?

View File

@ -7,6 +7,7 @@ pub(crate) struct DocTable {
pub(crate) id: String,
pub(crate) data: String,
pub(crate) rev_id: i64,
pub(crate) base_rev_id: i64,
}
impl DocTable {
@ -15,6 +16,7 @@ impl DocTable {
id: doc.id,
data: doc.data,
rev_id: doc.rev_id.into(),
base_rev_id: doc.base_rev_id.into(),
}
}
}
@ -33,6 +35,7 @@ impl std::convert::Into<Doc> for DocTable {
id: self.id,
data: self.data,
rev_id: self.rev_id.into(),
base_rev_id: self.base_rev_id.into(),
}
}
}
@ -43,6 +46,7 @@ impl std::convert::From<Doc> for DocTable {
id: doc.id,
data: doc.data,
rev_id: doc.rev_id.into(),
base_rev_id: doc.base_rev_id.into(),
}
}
}

View File

@ -1,9 +1,5 @@
mod doc_sql;
mod doc_table;
mod rev_sql;
mod rev_table;
pub(crate) use doc_sql::*;
pub(crate) use doc_table::*;
pub(crate) use rev_sql::*;
pub(crate) use rev_table::*;