diff --git a/app_flowy/.vscode/launch.json b/app_flowy/.vscode/launch.json index cfda11ed31..adae73641b 100644 --- a/app_flowy/.vscode/launch.json +++ b/app_flowy/.vscode/launch.json @@ -4,6 +4,7 @@ // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 "version": "0.2.0", "configurations": [ + { "name": "app_flowy", "request": "launch", diff --git a/rust-lib/flowy-document/src/services/doc/doc_controller.rs b/rust-lib/flowy-document/src/services/doc/doc_controller.rs index a550bee8ff..7608e63907 100644 --- a/rust-lib/flowy-document/src/services/doc/doc_controller.rs +++ b/rust-lib/flowy-document/src/services/doc/doc_controller.rs @@ -15,7 +15,6 @@ use crate::{ server::Server, ws::WsDocumentManager, }, - sql_tables::doc::DocTableSql, }; use flowy_database::{ConnectionPool, SqliteConnection}; use flowy_infra::future::{wrap_future, FnFuture, ResultFuture}; @@ -24,7 +23,6 @@ use tokio::time::{interval, Duration}; pub(crate) struct DocController { server: Server, - doc_sql: Arc, ws_manager: Arc, cache: Arc, user: Arc, @@ -32,11 +30,9 @@ pub(crate) struct DocController { impl DocController { pub(crate) fn new(server: Server, user: Arc, ws: Arc) -> Self { - let doc_sql = Arc::new(DocTableSql {}); let cache = Arc::new(DocCache::new()); let controller = Self { server, - doc_sql, user, ws_manager: ws, cache: cache.clone(), @@ -51,11 +47,11 @@ impl DocController { #[tracing::instrument(skip(self, conn), err)] pub(crate) fn create(&self, params: CreateDocParams, conn: &SqliteConnection) -> Result<(), DocError> { - let _doc = Doc { - id: params.id, - data: params.data, - rev_id: 0, - }; + // let _doc = Doc { + // id: params.id, + // data: params.data, + // rev_id: 0, + // }; // let _ = self.doc_sql.create_doc_table(DocTable::new(doc), conn)?; Ok(()) } @@ -84,8 +80,6 @@ impl DocController { #[tracing::instrument(level = "debug", skip(self, conn), err)] pub(crate) fn delete(&self, params: QueryDocParams, conn: &SqliteConnection) -> Result<(), DocError> { let doc_id = ¶ms.doc_id; - let _ = self.doc_sql.delete_doc(doc_id, &*conn)?; - self.cache.remove(doc_id); self.ws_manager.remove_handler(doc_id); let _ = self.delete_doc_on_server(params)?; @@ -135,32 +129,6 @@ impl DocController { self.cache.set(edit_ctx.clone()); Ok(edit_ctx) } - - #[allow(dead_code)] - #[tracing::instrument(level = "debug", skip(self, pool), err)] - async fn read_doc(&self, doc_id: &str, pool: Arc) -> Result { - match self.doc_sql.read_doc_table(doc_id, pool.clone()) { - Ok(doc_table) => Ok(doc_table.into()), - Err(error) => { - if error.is_record_not_found() { - let token = self.user.token()?; - let params = QueryDocParams { - doc_id: doc_id.to_string(), - }; - match self.server.read_doc(&token, params).await? { - None => Err(DocError::not_found()), - Some(doc) => { - let conn = &*pool.get().map_err(internal_error)?; - let _ = self.doc_sql.create_doc_table(doc.clone().into(), conn)?; - Ok(doc) - }, - } - } else { - return Err(error); - } - }, - } - } } struct RevisionServerImpl { @@ -182,6 +150,7 @@ impl RevisionServer for RevisionServerImpl { Some(doc) => { let delta = Delta::from_bytes(doc.data)?; Ok(DocRevision { + base_rev_id: 0.into(), rev_id: doc.rev_id.into(), delta, }) diff --git a/rust-lib/flowy-document/src/services/doc/edit/doc_actor.rs b/rust-lib/flowy-document/src/services/doc/edit/doc_actor.rs index e909c3cb3c..246bdf2f7f 100644 --- a/rust-lib/flowy-document/src/services/doc/edit/doc_actor.rs +++ b/rust-lib/flowy-document/src/services/doc/edit/doc_actor.rs @@ -8,7 +8,6 @@ use crate::{ }, Document, }, - sql_tables::{DocTableChangeset, DocTableSql}, }; use async_stream::stream; use flowy_database::ConnectionPool; @@ -134,20 +133,6 @@ impl DocumentActor { ); result } - - #[tracing::instrument(level = "debug", skip(self, rev_id), err)] - async fn save_to_disk(&self, rev_id: RevId) -> DocResult<()> { - let data = self.document.read().await.to_json(); - let changeset = DocTableChangeset { - id: self.doc_id.clone(), - data, - rev_id: rev_id.into(), - }; - let sql = DocTableSql {}; - let conn = self.pool.get().map_err(internal_error)?; - let _ = sql.update_doc_table(changeset, &*conn)?; - Ok(()) - } } // #[tracing::instrument(level = "debug", skip(self, params), err)] diff --git a/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs b/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs index 98029f81e6..b793ab056f 100644 --- a/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs +++ b/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs @@ -46,7 +46,11 @@ impl ClientEditDoc { user: Arc, ) -> DocResult { let rev_store = spawn_rev_store_actor(doc_id, pool.clone(), server.clone()); - let DocRevision { rev_id, delta } = fetch_document(rev_store.clone()).await?; + let DocRevision { + base_rev_id: _, + rev_id, + delta, + } = load_document(rev_store.clone()).await?; let rev_manager = Arc::new(RevisionManager::new(doc_id, rev_id, rev_store)); let document = spawn_doc_edit_actor(doc_id, delta, pool.clone()); let doc_id = doc_id.to_string(); @@ -323,15 +327,9 @@ fn spawn_doc_edit_actor(doc_id: &str, delta: Delta, pool: Arc) - sender } -async fn fetch_document(sender: mpsc::Sender) -> DocResult { +async fn load_document(sender: mpsc::Sender) -> DocResult { let (ret, rx) = oneshot::channel(); let _ = sender.send(RevisionCmd::DocumentDelta { ret }).await; - - match rx.await { - Ok(result) => Ok(result?), - Err(e) => { - log::error!("fetch_document: {}", e); - Err(DocError::internal().context(format!("fetch_document: {}", e))) - }, - } + let result = rx.await.map_err(internal_error)?; + result } diff --git a/rust-lib/flowy-document/src/services/doc/revision/manager.rs b/rust-lib/flowy-document/src/services/doc/revision/manager.rs index 3c241339a1..89969b19a5 100644 --- a/rust-lib/flowy-document/src/services/doc/revision/manager.rs +++ b/rust-lib/flowy-document/src/services/doc/revision/manager.rs @@ -8,6 +8,7 @@ use flowy_ot::core::{Delta, OperationTransformable}; use tokio::sync::{mpsc, oneshot}; pub struct DocRevision { + pub base_rev_id: RevId, pub rev_id: RevId, pub delta: Delta, } diff --git a/rust-lib/flowy-document/src/services/doc/revision/store_actor.rs b/rust-lib/flowy-document/src/services/doc/revision/store_actor.rs index 63515b798b..e54b14e4f1 100644 --- a/rust-lib/flowy-document/src/services/doc/revision/store_actor.rs +++ b/rust-lib/flowy-document/src/services/doc/revision/store_actor.rs @@ -1,5 +1,5 @@ use crate::{ - entities::doc::{RevId, Revision, RevisionRange}, + entities::doc::{RevId, RevType, Revision, RevisionRange}, errors::{internal_error, DocError, DocResult}, services::doc::revision::{model::RevisionOperation, DocRevision, RevisionServer}, sql_tables::{RevState, RevTableSql}, @@ -89,7 +89,7 @@ impl RevisionStoreActor { let _ = ret.send(result); }, RevisionCmd::DocumentDelta { ret } => { - let delta = fetch_document(&self.doc_id, self.server.clone(), self.persistence.clone()).await; + let delta = self.fetch_document().await; let _ = ret.send(delta); }, } @@ -180,38 +180,29 @@ impl RevisionStoreActor { result } } -} -async fn fetch_document( - doc_id: &str, - server: Arc, - persistence: Arc, -) -> DocResult { - let fetch_from_remote = server.fetch_document_from_remote(doc_id).or_else(|result| { - log::error!( - "Fetch document delta from remote failed: {:?}, try to fetch from local", - result - ); - fetch_from_local(doc_id, persistence.clone()) - }); + async fn fetch_document(&self) -> DocResult { + let result = fetch_from_local(&self.doc_id, self.persistence.clone()).await; + if result.is_ok() { + return result; + } - let fetch_from_local = fetch_from_local(doc_id, persistence.clone()).or_else(|result| async move { - log::error!( - "Fetch document delta from local failed: {:?}, try to fetch from remote", - result - ); - server.fetch_document_from_remote(doc_id).await - }); + match self.server.fetch_document_from_remote(&self.doc_id).await { + Ok(doc_revision) => { + let delta_data = doc_revision.delta.to_bytes(); + let revision = Revision::new( + doc_revision.base_rev_id.clone(), + doc_revision.rev_id.clone(), + delta_data.to_vec(), + &self.doc_id, + RevType::Remote, + ); + self.handle_new_revision(revision); - tokio::select! { - result = fetch_from_remote => { - log::debug!("Finish fetching document from remote"); - result - }, - result = fetch_from_local => { - log::debug!("Finish fetching document from local"); - result - }, + Ok(doc_revision) + }, + Err(e) => Err(e), + } } } @@ -225,6 +216,7 @@ async fn fetch_from_local(doc_id: &str, persistence: Arc) -> DocRes return Err(DocError::not_found()); } + let base_rev_id: RevId = revisions.last().unwrap().base_rev_id.into(); let rev_id: RevId = revisions.last().unwrap().rev_id.into(); let mut delta = Delta::new(); for revision in revisions { @@ -240,7 +232,11 @@ async fn fetch_from_local(doc_id: &str, persistence: Arc) -> DocRes delta.insert("\n", Attributes::default()); - Result::::Ok(DocRevision { rev_id, delta }) + Result::::Ok(DocRevision { + base_rev_id, + rev_id, + delta, + }) }) .await .map_err(internal_error)? diff --git a/rust-lib/flowy-document/src/sql_tables/doc/doc_table.rs b/rust-lib/flowy-document/src/sql_tables/doc/doc_table.rs index ae8ec15794..74d3cb2283 100644 --- a/rust-lib/flowy-document/src/sql_tables/doc/doc_table.rs +++ b/rust-lib/flowy-document/src/sql_tables/doc/doc_table.rs @@ -7,6 +7,7 @@ pub(crate) struct DocTable { pub(crate) id: String, pub(crate) data: String, pub(crate) rev_id: i64, + pub(crate) base_rev_id: i64, } impl DocTable { @@ -15,6 +16,7 @@ impl DocTable { id: doc.id, data: doc.data, rev_id: doc.rev_id.into(), + base_rev_id: doc.base_rev_id.into(), } } } @@ -33,6 +35,7 @@ impl std::convert::Into for DocTable { id: self.id, data: self.data, rev_id: self.rev_id.into(), + base_rev_id: self.base_rev_id.into(), } } } @@ -43,6 +46,7 @@ impl std::convert::From for DocTable { id: doc.id, data: doc.data, rev_id: doc.rev_id.into(), + base_rev_id: doc.base_rev_id.into(), } } } diff --git a/rust-lib/flowy-document/src/sql_tables/doc/mod.rs b/rust-lib/flowy-document/src/sql_tables/doc/mod.rs index 202bf740a3..1d96022349 100644 --- a/rust-lib/flowy-document/src/sql_tables/doc/mod.rs +++ b/rust-lib/flowy-document/src/sql_tables/doc/mod.rs @@ -1,9 +1,5 @@ -mod doc_sql; -mod doc_table; mod rev_sql; mod rev_table; -pub(crate) use doc_sql::*; -pub(crate) use doc_table::*; pub(crate) use rev_sql::*; pub(crate) use rev_table::*;