From 4e3ebf8876f040770e8a1616dc30d61d42525424 Mon Sep 17 00:00:00 2001 From: appflowy Date: Fri, 1 Oct 2021 20:58:13 +0800 Subject: [PATCH] fix local spawn issue --- .../services/doc/edit/{cache.rs => actor.rs} | 103 ++++++----------- .../src/services/doc/edit/context.rs | 8 +- .../src/services/doc/edit/message.rs | 54 +++++++++ .../src/services/doc/edit/mod.rs | 3 +- .../services/doc/rev_manager/rev_manager.rs | 17 +-- .../src/services/doc/rev_manager/store.rs | 107 +++++++++--------- .../src/sql_tables/doc/doc_op_sql.rs | 3 +- .../src/sql_tables/doc/doc_op_table.rs | 1 + 8 files changed, 155 insertions(+), 141 deletions(-) rename rust-lib/flowy-document/src/services/doc/edit/{cache.rs => actor.rs} (60%) create mode 100644 rust-lib/flowy-document/src/services/doc/edit/message.rs diff --git a/rust-lib/flowy-document/src/services/doc/edit/cache.rs b/rust-lib/flowy-document/src/services/doc/edit/actor.rs similarity index 60% rename from rust-lib/flowy-document/src/services/doc/edit/cache.rs rename to rust-lib/flowy-document/src/services/doc/edit/actor.rs index f5726840f1..5051edf6e5 100644 --- a/rust-lib/flowy-document/src/services/doc/edit/cache.rs +++ b/rust-lib/flowy-document/src/services/doc/edit/actor.rs @@ -1,6 +1,10 @@ use crate::{ - errors::{internal_error, DocResult}, - services::doc::{edit::DocId, Document, UndoResult}, + errors::{internal_error, DocError, DocResult}, + services::doc::{ + edit::{message::EditMsg, DocId}, + Document, + UndoResult, + }, sql_tables::{DocTableChangeset, DocTableSql}, }; use async_stream::stream; @@ -8,58 +12,15 @@ use flowy_database::ConnectionPool; use flowy_ot::core::{Attribute, Delta, Interval}; use futures::stream::StreamExt; use std::{cell::RefCell, sync::Arc}; -use tokio::sync::{mpsc, oneshot}; - -pub type Ret = oneshot::Sender>; -pub enum EditMsg { - Delta { - delta: Delta, - ret: Ret<()>, - }, - Insert { - index: usize, - data: String, - ret: Ret, - }, - Delete { - interval: Interval, - ret: Ret, - }, - Format { - interval: Interval, - attribute: Attribute, - ret: Ret, - }, - - Replace { - interval: Interval, - data: String, - ret: Ret, - }, - CanUndo { - ret: oneshot::Sender, - }, - CanRedo { - ret: oneshot::Sender, - }, - Undo { - ret: Ret, - }, - Redo { - ret: Ret, - }, - Doc { - ret: Ret, - }, - SaveRevision { - rev_id: i64, - ret: Ret<()>, - }, -} +use tokio::{ + macros::support::Future, + sync::{mpsc, oneshot, RwLock}, + task::spawn_blocking, +}; pub struct DocumentEditActor { doc_id: DocId, - document: RefCell, + document: Arc>, pool: Arc, receiver: Option>, } @@ -72,7 +33,7 @@ impl DocumentEditActor { receiver: mpsc::UnboundedReceiver, ) -> Self { let doc_id = doc_id.to_string(); - let document = RefCell::new(Document::from_delta(delta)); + let document = Arc::new(RwLock::new(Document::from_delta(delta))); Self { doc_id, document, @@ -91,21 +52,28 @@ impl DocumentEditActor { } } }; - stream.for_each(|msg| self.handle_message(msg)).await; + stream + .for_each(|msg| async { + match self.handle_message(msg).await { + Ok(_) => {}, + Err(e) => log::error!("{:?}", e), + } + }) + .await; } - async fn handle_message(&self, msg: EditMsg) { + async fn handle_message(&self, msg: EditMsg) -> DocResult<()> { match msg { EditMsg::Delta { delta, ret } => { - let result = self.document.borrow_mut().compose_delta(&delta); + let result = self.document.write().await.compose_delta(&delta); let _ = ret.send(result); }, EditMsg::Insert { index, data, ret } => { - let delta = self.document.borrow_mut().insert(index, data); + let delta = self.document.write().await.insert(index, data); let _ = ret.send(delta); }, EditMsg::Delete { interval, ret } => { - let result = self.document.borrow_mut().delete(interval); + let result = self.document.write().await.delete(interval); let _ = ret.send(result); }, EditMsg::Format { @@ -113,41 +81,42 @@ impl DocumentEditActor { attribute, ret, } => { - let result = self.document.borrow_mut().format(interval, attribute); + let result = self.document.write().await.format(interval, attribute); let _ = ret.send(result); }, EditMsg::Replace { interval, data, ret } => { - let result = self.document.borrow_mut().replace(interval, data); + let result = self.document.write().await.replace(interval, data); let _ = ret.send(result); }, EditMsg::CanUndo { ret } => { - let _ = ret.send(self.document.borrow().can_undo()); + let _ = ret.send(self.document.read().await.can_undo()); }, EditMsg::CanRedo { ret } => { - let _ = ret.send(self.document.borrow().can_redo()); + let _ = ret.send(self.document.read().await.can_redo()); }, EditMsg::Undo { ret } => { - let result = self.document.borrow_mut().undo(); + let result = self.document.write().await.undo(); let _ = ret.send(result); }, EditMsg::Redo { ret } => { - let result = self.document.borrow_mut().redo(); + let result = self.document.write().await.redo(); let _ = ret.send(result); }, EditMsg::Doc { ret } => { - let data = self.document.borrow().to_json(); + let data = self.document.read().await.to_json(); let _ = ret.send(Ok(data)); }, EditMsg::SaveRevision { rev_id, ret } => { - let result = self.save_to_disk(rev_id); + let result = self.save_to_disk(rev_id).await; let _ = ret.send(result); }, } + Ok(()) } #[tracing::instrument(level = "debug", skip(self, rev_id), err)] - fn save_to_disk(&self, rev_id: i64) -> DocResult<()> { - let data = self.document.borrow().to_json(); + async fn save_to_disk(&self, rev_id: i64) -> DocResult<()> { + let data = self.document.read().await.to_json(); let changeset = DocTableChangeset { id: self.doc_id.clone(), data, diff --git a/rust-lib/flowy-document/src/services/doc/edit/context.rs b/rust-lib/flowy-document/src/services/doc/edit/context.rs index c51a0ede59..6f5f686d84 100644 --- a/rust-lib/flowy-document/src/services/doc/edit/context.rs +++ b/rust-lib/flowy-document/src/services/doc/edit/context.rs @@ -6,7 +6,7 @@ use crate::{ errors::{internal_error, DocError, DocResult}, services::{ doc::{ - edit::cache::{DocumentEditActor, EditMsg}, + edit::{actor::DocumentEditActor, message::EditMsg}, rev_manager::RevisionManager, UndoResult, }, @@ -38,7 +38,7 @@ impl EditDocContext { let delta = Delta::from_bytes(doc.data)?; let (sender, receiver) = mpsc::unbounded_channel::(); let edit_actor = DocumentEditActor::new(&doc.id, delta, pool.clone(), receiver); - tokio::task::spawn_local(edit_actor.run()); + tokio::spawn(edit_actor.run()); let rev_manager = Arc::new(RevisionManager::new(&doc.id, doc.rev_id, pool.clone(), ws_sender)); let edit_context = Self { @@ -170,7 +170,7 @@ impl WsDocumentHandler for EditDocContext { fn receive(&self, doc_data: WsDocumentData) { let document = self.document.clone(); let rev_manager = self.rev_manager.clone(); - let f = |doc_data: WsDocumentData| async move { + let handle_ws_message = |doc_data: WsDocumentData| async move { let bytes = Bytes::from(doc_data.data); match doc_data.ty { WsDataType::PushRev => { @@ -190,7 +190,7 @@ impl WsDocumentHandler for EditDocContext { }; tokio::spawn(async move { - if let Err(e) = f(doc_data).await { + if let Err(e) = handle_ws_message(doc_data).await { log::error!("{:?}", e); } }); diff --git a/rust-lib/flowy-document/src/services/doc/edit/message.rs b/rust-lib/flowy-document/src/services/doc/edit/message.rs new file mode 100644 index 0000000000..08cdc1d1ce --- /dev/null +++ b/rust-lib/flowy-document/src/services/doc/edit/message.rs @@ -0,0 +1,54 @@ +use crate::{ + errors::DocResult, + services::doc::{edit::DocId, Document, UndoResult}, + sql_tables::{DocTableChangeset, DocTableSql}, +}; +use flowy_ot::core::{Attribute, Delta, Interval}; + +use tokio::sync::oneshot; +pub type Ret = oneshot::Sender>; +pub enum EditMsg { + Delta { + delta: Delta, + ret: Ret<()>, + }, + Insert { + index: usize, + data: String, + ret: Ret, + }, + Delete { + interval: Interval, + ret: Ret, + }, + Format { + interval: Interval, + attribute: Attribute, + ret: Ret, + }, + + Replace { + interval: Interval, + data: String, + ret: Ret, + }, + CanUndo { + ret: oneshot::Sender, + }, + CanRedo { + ret: oneshot::Sender, + }, + Undo { + ret: Ret, + }, + Redo { + ret: Ret, + }, + Doc { + ret: Ret, + }, + SaveRevision { + rev_id: i64, + ret: Ret<()>, + }, +} diff --git a/rust-lib/flowy-document/src/services/doc/edit/mod.rs b/rust-lib/flowy-document/src/services/doc/edit/mod.rs index 74580d23da..ab421fea8e 100644 --- a/rust-lib/flowy-document/src/services/doc/edit/mod.rs +++ b/rust-lib/flowy-document/src/services/doc/edit/mod.rs @@ -1,4 +1,5 @@ -mod cache; +mod actor; mod context; +mod message; pub use context::*; diff --git a/rust-lib/flowy-document/src/services/doc/rev_manager/rev_manager.rs b/rust-lib/flowy-document/src/services/doc/rev_manager/rev_manager.rs index e719374b34..64d4407117 100644 --- a/rust-lib/flowy-document/src/services/doc/rev_manager/rev_manager.rs +++ b/rust-lib/flowy-document/src/services/doc/rev_manager/rev_manager.rs @@ -25,7 +25,7 @@ impl RevisionManager { pub fn new(doc_id: &str, rev_id: i64, pool: Arc, ws_sender: Arc) -> Self { let (sender, receiver) = mpsc::channel::(50); let store = Store::new(doc_id, pool, receiver); - tokio::task::spawn_local(store.run()); + tokio::spawn(store.run()); let doc_id = doc_id.to_string(); let rev_id_counter = RevIdCounter::new(rev_id); @@ -39,21 +39,6 @@ impl RevisionManager { } } - // pub fn next_compose_revision(&self, mut f: F) - // where - // F: FnMut(&Revision) -> Result<(), DocError>, - // { - // if let Some(rev) = self.pending_revs.write().pop_front() { - // match f(&rev) { - // Ok(_) => {}, - // Err(e) => { - // log::error!("{}", e); - // self.pending_revs.write().push_front(rev); - // }, - // } - // } - // } - pub fn push_compose_revision(&self, revision: Revision) { self.pending_revs.write().push_front(revision); } pub fn next_compose_revision(&self) -> Option { self.pending_revs.write().pop_front() } diff --git a/rust-lib/flowy-document/src/services/doc/rev_manager/store.rs b/rust-lib/flowy-document/src/services/doc/rev_manager/store.rs index 043e979411..eeadd81e64 100644 --- a/rust-lib/flowy-document/src/services/doc/rev_manager/store.rs +++ b/rust-lib/flowy-document/src/services/doc/rev_manager/store.rs @@ -11,7 +11,7 @@ use futures::stream::StreamExt; use std::{cell::RefCell, sync::Arc, time::Duration}; use tokio::{ - sync::{mpsc, oneshot}, + sync::{mpsc, oneshot, RwLock}, task::JoinHandle, }; @@ -33,7 +33,7 @@ pub struct Store { op_sql: Arc, pool: Arc, revs: Arc>, - save_operation: RefCell>>, + delay_save: RwLock>>, receiver: Option>, } @@ -41,7 +41,6 @@ impl Store { pub fn new(doc_id: &str, pool: Arc, receiver: mpsc::Receiver) -> Store { let op_sql = Arc::new(OpTableSql {}); let revs = Arc::new(DashMap::new()); - let save_operation = RefCell::new(None); let doc_id = doc_id.to_owned(); Self { @@ -49,7 +48,7 @@ impl Store { op_sql, pool, revs, - save_operation, + delay_save: RwLock::new(None), receiver: Some(receiver), } } @@ -70,10 +69,10 @@ impl Store { async fn handle_message(&self, msg: StoreMsg) { match msg { StoreMsg::Revision { revision } => { - self.handle_new_revision(revision); + self.handle_new_revision(revision).await; }, StoreMsg::AckRevision { rev_id } => { - self.handle_revision_acked(rev_id); + self.handle_revision_acked(rev_id).await; }, StoreMsg::SendRevisions { range: _, ret: _ } => { unimplemented!() @@ -81,32 +80,37 @@ impl Store { } } - pub fn handle_new_revision(&self, revision: Revision) { + async fn handle_new_revision(&self, revision: Revision) { let mut operation = RevisionOperation::new(&revision); let _receiver = operation.receiver(); self.revs.insert(revision.rev_id, operation); - self.save_revisions(); + self.save_revisions().await; } - pub fn handle_revision_acked(&self, rev_id: i64) { + async fn handle_revision_acked(&self, rev_id: i64) { match self.revs.get_mut(&rev_id) { None => {}, Some(mut rev) => rev.value_mut().finish(), } + self.save_revisions().await; } pub fn revs_in_range(&self, _range: RevisionRange) -> DocResult> { unimplemented!() } - fn save_revisions(&self) { - if let Some(handler) = self.save_operation.borrow_mut().take() { + async fn save_revisions(&self) { + if let Some(handler) = self.delay_save.write().await.take() { handler.abort(); } + if self.revs.is_empty() { + return; + } + let revs = self.revs.clone(); let pool = self.pool.clone(); let op_sql = self.op_sql.clone(); - *self.save_operation.borrow_mut() = Some(tokio::spawn(async move { + *self.delay_save.write().await = Some(tokio::spawn(async move { tokio::time::sleep(Duration::from_millis(300)).await; let ids = revs.iter().map(|kv| kv.key().clone()).collect::>(); @@ -127,46 +131,45 @@ impl Store { } })); } - - fn update_revisions(&self) { - let rev_ids = self - .revs - .iter() - .flat_map(|kv| match kv.state == RevState::Acked { - true => None, - false => Some(kv.key().clone()), - }) - .collect::>(); - - if rev_ids.is_empty() { - return; - } - - log::debug!("Try to update {:?} state", rev_ids); - match self.update(&rev_ids) { - Ok(_) => { - self.revs.retain(|k, _| !rev_ids.contains(k)); - }, - Err(e) => log::error!("Save revision failed: {:?}", e), - } - } - - fn update(&self, rev_ids: &Vec) -> Result<(), DocError> { - let conn = &*self.pool.get().map_err(internal_error).unwrap(); - let result = conn.immediate_transaction::<_, DocError, _>(|| { - for rev_id in rev_ids { - let changeset = RevChangeset { - doc_id: self.doc_id.clone(), - rev_id: rev_id.clone(), - state: RevState::Acked, - }; - let _ = self.op_sql.update_rev_table(changeset, conn)?; - } - Ok(()) - }); - - result - } + // fn update_revisions(&self) { + // let rev_ids = self + // .revs + // .iter() + // .flat_map(|kv| match kv.state == RevState::Acked { + // true => None, + // false => Some(kv.key().clone()), + // }) + // .collect::>(); + // + // if rev_ids.is_empty() { + // return; + // } + // + // log::debug!("Try to update {:?} state", rev_ids); + // match self.update(&rev_ids) { + // Ok(_) => { + // self.revs.retain(|k, _| !rev_ids.contains(k)); + // }, + // Err(e) => log::error!("Save revision failed: {:?}", e), + // } + // } + // + // fn update(&self, rev_ids: &Vec) -> Result<(), DocError> { + // let conn = &*self.pool.get().map_err(internal_error).unwrap(); + // let result = conn.immediate_transaction::<_, DocError, _>(|| { + // for rev_id in rev_ids { + // let changeset = RevChangeset { + // doc_id: self.doc_id.clone(), + // rev_id: rev_id.clone(), + // state: RevState::Acked, + // }; + // let _ = self.op_sql.update_rev_table(changeset, conn)?; + // } + // Ok(()) + // }); + // + // result + // } // fn delete_revision(&self, rev_id: i64) { // let op_sql = self.op_sql.clone(); diff --git a/rust-lib/flowy-document/src/sql_tables/doc/doc_op_sql.rs b/rust-lib/flowy-document/src/sql_tables/doc/doc_op_sql.rs index 13248ef570..ce1e44cf9d 100644 --- a/rust-lib/flowy-document/src/sql_tables/doc/doc_op_sql.rs +++ b/rust-lib/flowy-document/src/sql_tables/doc/doc_op_sql.rs @@ -5,6 +5,7 @@ use crate::{ }; use diesel::{insert_into, update}; use flowy_database::{ + insert_or_ignore_into, prelude::*, schema::rev_table::{columns::*, dsl, dsl::doc_id}, SqliteConnection, @@ -35,7 +36,7 @@ impl OpTableSql { }) .collect::>(); - let _ = insert_into(dsl::rev_table).values(&records).execute(conn)?; + let _ = insert_or_ignore_into(dsl::rev_table).values(&records).execute(conn)?; Ok(()) } diff --git a/rust-lib/flowy-document/src/sql_tables/doc/doc_op_table.rs b/rust-lib/flowy-document/src/sql_tables/doc/doc_op_table.rs index 7dfb952f59..3d348bc076 100644 --- a/rust-lib/flowy-document/src/sql_tables/doc/doc_op_table.rs +++ b/rust-lib/flowy-document/src/sql_tables/doc/doc_op_table.rs @@ -107,6 +107,7 @@ impl RevTableType { } impl_sql_integer_expression!(RevTableType); +#[allow(dead_code)] pub(crate) struct RevChangeset { pub(crate) doc_id: String, pub(crate) rev_id: i64,