From 63d2ca27d3bd8e23c67eca4207c73d3ad945e042 Mon Sep 17 00:00:00 2001 From: appflowy Date: Tue, 2 Nov 2021 22:18:13 +0800 Subject: [PATCH] [flutter]: fix bugs --- .../presentation/widgets/menu/menu.dart | 1 + .../packages/flowy_infra_ui/pubspec.yaml | 2 +- app_flowy/pubspec.yaml | 2 +- .../src/services/doc/edit/doc_actor.rs | 2 +- .../src/services/doc/edit/edit_doc.rs | 2 +- .../src/services/doc/revision/manager.rs | 2 +- .../src/services/doc/revision/mod.rs | 4 +- .../src/services/doc/revision/model.rs | 76 ++----------------- .../revision/{rev_store.rs => persistence.rs} | 76 +++++++++++++++++-- .../src/services/workspace_controller.rs | 5 +- 10 files changed, 84 insertions(+), 88 deletions(-) rename rust-lib/flowy-document/src/services/doc/revision/{rev_store.rs => persistence.rs} (80%) diff --git a/app_flowy/lib/workspace/presentation/widgets/menu/menu.dart b/app_flowy/lib/workspace/presentation/widgets/menu/menu.dart index 0866cddd16..e7c7147ee6 100644 --- a/app_flowy/lib/workspace/presentation/widgets/menu/menu.dart +++ b/app_flowy/lib/workspace/presentation/widgets/menu/menu.dart @@ -164,6 +164,7 @@ class MenuSharedState extends ChangeNotifier { super.addListener(() { if (_forcedOpenView != null) { callback(_forcedOpenView!); + _forcedOpenView = null; } }); } diff --git a/app_flowy/packages/flowy_infra_ui/pubspec.yaml b/app_flowy/packages/flowy_infra_ui/pubspec.yaml index 35c3fc397a..297663b619 100644 --- a/app_flowy/packages/flowy_infra_ui/pubspec.yaml +++ b/app_flowy/packages/flowy_infra_ui/pubspec.yaml @@ -17,7 +17,7 @@ dependencies: dartz: '0.10.0-nullsafety.2' provider: ^6.0.1 styled_widget: '>=0.3.1' - equatable: '>=2.0.2' + equatable: '>=2.0.3' animations: ^2.0.0 loading_indicator: ^3.0.1 diff --git a/app_flowy/pubspec.yaml b/app_flowy/pubspec.yaml index 4579b3bd5d..f9cd2a0d14 100644 --- a/app_flowy/pubspec.yaml +++ b/app_flowy/pubspec.yaml @@ -47,7 +47,7 @@ dependencies: # third party packages time: '>=2.0.0' - equatable: '>=2.0.2' + equatable: '>=2.0.3' freezed_annotation: get_it: '>=7.1.3' flutter_bloc: '>=7.3.1' diff --git a/rust-lib/flowy-document/src/services/doc/edit/doc_actor.rs b/rust-lib/flowy-document/src/services/doc/edit/doc_actor.rs index 725e30971c..202b8dc318 100644 --- a/rust-lib/flowy-document/src/services/doc/edit/doc_actor.rs +++ b/rust-lib/flowy-document/src/services/doc/edit/doc_actor.rs @@ -115,7 +115,7 @@ impl DocumentActor { let mut document = self.document.write().await; let result = document.compose_delta(&delta); log::debug!( - "Client compose push delta: {}. result: {}", + "Compose push delta: {}. result: {}", delta.to_json(), document.to_json() ); diff --git a/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs b/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs index cb4586f667..567bb199a4 100644 --- a/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs +++ b/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs @@ -265,7 +265,7 @@ impl ClientEditDoc { }, WsDataType::PullRev => { let range = RevisionRange::try_from(bytes)?; - let revision = self.rev_manager.construct_revisions(range).await?; + let revision = self.rev_manager.mk_revisions(range).await?; let _ = self.ws.send(revision.into()); }, WsDataType::NewDocUser => {}, diff --git a/rust-lib/flowy-document/src/services/doc/revision/manager.rs b/rust-lib/flowy-document/src/services/doc/revision/manager.rs index 2646bbc1c0..3dc2915416 100644 --- a/rust-lib/flowy-document/src/services/doc/revision/manager.rs +++ b/rust-lib/flowy-document/src/services/doc/revision/manager.rs @@ -61,7 +61,7 @@ impl RevisionManager { pub fn set_rev_id(&self, rev_id: i64) { self.rev_id_counter.set(rev_id); } - pub async fn construct_revisions(&self, range: RevisionRange) -> Result { + pub async fn mk_revisions(&self, range: RevisionRange) -> Result { debug_assert!(&range.doc_id == &self.doc_id); let revisions = self.rev_store.revs_in_range(range.clone()).await?; let mut new_delta = Delta::new(); diff --git a/rust-lib/flowy-document/src/services/doc/revision/mod.rs b/rust-lib/flowy-document/src/services/doc/revision/mod.rs index 0c04f436b2..7288a5301b 100644 --- a/rust-lib/flowy-document/src/services/doc/revision/mod.rs +++ b/rust-lib/flowy-document/src/services/doc/revision/mod.rs @@ -1,6 +1,6 @@ mod manager; mod model; -mod rev_store; +mod persistence; pub use manager::*; -pub use rev_store::*; +pub use persistence::*; diff --git a/rust-lib/flowy-document/src/services/doc/revision/model.rs b/rust-lib/flowy-document/src/services/doc/revision/model.rs index 5a8a1debb3..6efa7f1d48 100644 --- a/rust-lib/flowy-document/src/services/doc/revision/model.rs +++ b/rust-lib/flowy-document/src/services/doc/revision/model.rs @@ -3,12 +3,12 @@ use crate::{ errors::{internal_error, DocError, DocResult}, sql_tables::{RevState, RevTableSql}, }; -use async_stream::stream; + use flowy_database::ConnectionPool; use flowy_infra::future::ResultFuture; -use futures::{stream::StreamExt}; -use std::{sync::Arc, time::Duration}; -use tokio::sync::{broadcast, mpsc}; + +use std::sync::Arc; +use tokio::sync::broadcast; pub type RevIdReceiver = broadcast::Receiver; pub type RevIdSender = broadcast::Sender; @@ -56,10 +56,10 @@ impl Persistence { Self { rev_sql, pool } } - pub(crate) fn create_revs(&self, revisions_state: Vec<(Revision, RevState)>) -> DocResult<()> { + pub(crate) fn create_revs(&self, revisions: Vec<(Revision, RevState)>) -> DocResult<()> { let conn = &*self.pool.get().map_err(internal_error)?; conn.immediate_transaction::<_, DocError, _>(|| { - let _ = self.rev_sql.create_rev_table(revisions_state, conn)?; + let _ = self.rev_sql.create_rev_table(revisions, conn)?; Ok(()) }) } @@ -80,67 +80,3 @@ impl Persistence { pub trait RevisionIterator: Send + Sync { fn next(&self) -> ResultFuture, DocError>; } - -pub(crate) enum PendingMsg { - Revision { ret: RevIdReceiver }, -} - -pub(crate) type PendingSender = mpsc::UnboundedSender; -pub(crate) type PendingReceiver = mpsc::UnboundedReceiver; - -pub(crate) struct PendingRevisionStream { - revisions: Arc, - receiver: Option, - next_revision: mpsc::UnboundedSender, -} - -impl PendingRevisionStream { - pub(crate) fn new( - revisions: Arc, - pending_rx: PendingReceiver, - next_revision: mpsc::UnboundedSender, - ) -> Self { - Self { - revisions, - receiver: Some(pending_rx), - next_revision, - } - } - - pub async fn run(mut self) { - let mut receiver = self.receiver.take().expect("Should only call once"); - let stream = stream! { - loop { - match receiver.recv().await { - Some(msg) => yield msg, - None => break, - } - } - }; - stream - .for_each(|msg| async { - match self.handle_msg(msg).await { - Ok(_) => {}, - Err(e) => log::error!("{:?}", e), - } - }) - .await; - } - - async fn handle_msg(&self, msg: PendingMsg) -> DocResult<()> { - match msg { - PendingMsg::Revision { ret } => self.prepare_next_pending_rev(ret).await, - } - } - - async fn prepare_next_pending_rev(&self, mut ret: RevIdReceiver) -> DocResult<()> { - match self.revisions.next().await? { - None => Ok(()), - Some(revision) => { - let _ = self.next_revision.send(revision).map_err(internal_error); - let _ = tokio::time::timeout(Duration::from_millis(2000), ret.recv()).await; - Ok(()) - }, - } - } -} diff --git a/rust-lib/flowy-document/src/services/doc/revision/rev_store.rs b/rust-lib/flowy-document/src/services/doc/revision/persistence.rs similarity index 80% rename from rust-lib/flowy-document/src/services/doc/revision/rev_store.rs rename to rust-lib/flowy-document/src/services/doc/revision/persistence.rs index a4d67f28dc..07ed273c12 100644 --- a/rust-lib/flowy-document/src/services/doc/revision/rev_store.rs +++ b/rust-lib/flowy-document/src/services/doc/revision/persistence.rs @@ -1,17 +1,15 @@ use crate::{ entities::doc::{revision_from_doc, Doc, RevId, RevType, Revision, RevisionRange}, errors::{internal_error, DocError, DocResult}, - services::doc::revision::{ - model::{RevisionIterator, *}, - RevisionServer, - }, + services::doc::revision::{model::*, RevisionServer}, sql_tables::RevState, }; - +use async_stream::stream; use dashmap::DashMap; use flowy_database::ConnectionPool; use flowy_infra::future::ResultFuture; use flowy_ot::core::{Delta, OperationTransformable}; +use futures::stream::StreamExt; use std::{collections::VecDeque, sync::Arc, time::Duration}; use tokio::{ sync::{broadcast, mpsc, RwLock}, @@ -33,7 +31,7 @@ impl RevisionStore { doc_id: &str, pool: Arc, server: Arc, - next_revision: mpsc::UnboundedSender, + ws_revision_sender: mpsc::UnboundedSender, ) -> Arc { let doc_id = doc_id.to_owned(); let persistence = Arc::new(Persistence::new(pool)); @@ -51,7 +49,7 @@ impl RevisionStore { server, }); - tokio::spawn(PendingRevisionStream::new(store.clone(), pending_rx, next_revision).run()); + tokio::spawn(RevisionStream::new(store.clone(), pending_rx, ws_revision_sender).run()); store } @@ -265,3 +263,67 @@ async fn fetch_from_local(doc_id: &str, persistence: Arc) -> DocRes // } // }); // } + +pub(crate) enum PendingMsg { + Revision { ret: RevIdReceiver }, +} + +pub(crate) type PendingSender = mpsc::UnboundedSender; +pub(crate) type PendingReceiver = mpsc::UnboundedReceiver; + +pub(crate) struct RevisionStream { + revisions: Arc, + receiver: Option, + ws_revision_sender: mpsc::UnboundedSender, +} + +impl RevisionStream { + pub(crate) fn new( + revisions: Arc, + pending_rx: PendingReceiver, + ws_revision_sender: mpsc::UnboundedSender, + ) -> Self { + Self { + revisions, + receiver: Some(pending_rx), + ws_revision_sender, + } + } + + pub async fn run(mut self) { + let mut receiver = self.receiver.take().expect("Should only call once"); + let stream = stream! { + loop { + match receiver.recv().await { + Some(msg) => yield msg, + None => break, + } + } + }; + stream + .for_each(|msg| async { + match self.handle_msg(msg).await { + Ok(_) => {}, + Err(e) => log::error!("{:?}", e), + } + }) + .await; + } + + async fn handle_msg(&self, msg: PendingMsg) -> DocResult<()> { + match msg { + PendingMsg::Revision { ret } => self.prepare_next_pending_rev(ret).await, + } + } + + async fn prepare_next_pending_rev(&self, mut ret: RevIdReceiver) -> DocResult<()> { + match self.revisions.next().await? { + None => Ok(()), + Some(revision) => { + let _ = self.ws_revision_sender.send(revision).map_err(internal_error); + let _ = tokio::time::timeout(Duration::from_millis(2000), ret.recv()).await; + Ok(()) + }, + } + } +} diff --git a/rust-lib/flowy-workspace/src/services/workspace_controller.rs b/rust-lib/flowy-workspace/src/services/workspace_controller.rs index 105c24e0c1..0a9a95f996 100644 --- a/rust-lib/flowy-workspace/src/services/workspace_controller.rs +++ b/rust-lib/flowy-workspace/src/services/workspace_controller.rs @@ -1,8 +1,5 @@ use crate::{ - entities::{ - app::{App, RepeatedApp}, - workspace::*, - }, + entities::{app::RepeatedApp, workspace::*}, errors::*, module::{WorkspaceDatabase, WorkspaceUser}, notify::*,