fix local spawn issue

This commit is contained in:
appflowy 2021-10-01 20:58:13 +08:00
parent 957b10fe5e
commit 4e3ebf8876
8 changed files with 155 additions and 141 deletions

View File

@ -1,6 +1,10 @@
use crate::{ use crate::{
errors::{internal_error, DocResult}, errors::{internal_error, DocError, DocResult},
services::doc::{edit::DocId, Document, UndoResult}, services::doc::{
edit::{message::EditMsg, DocId},
Document,
UndoResult,
},
sql_tables::{DocTableChangeset, DocTableSql}, sql_tables::{DocTableChangeset, DocTableSql},
}; };
use async_stream::stream; use async_stream::stream;
@ -8,58 +12,15 @@ use flowy_database::ConnectionPool;
use flowy_ot::core::{Attribute, Delta, Interval}; use flowy_ot::core::{Attribute, Delta, Interval};
use futures::stream::StreamExt; use futures::stream::StreamExt;
use std::{cell::RefCell, sync::Arc}; use std::{cell::RefCell, sync::Arc};
use tokio::sync::{mpsc, oneshot}; use tokio::{
macros::support::Future,
pub type Ret<T> = oneshot::Sender<DocResult<T>>; sync::{mpsc, oneshot, RwLock},
pub enum EditMsg { task::spawn_blocking,
Delta { };
delta: Delta,
ret: Ret<()>,
},
Insert {
index: usize,
data: String,
ret: Ret<Delta>,
},
Delete {
interval: Interval,
ret: Ret<Delta>,
},
Format {
interval: Interval,
attribute: Attribute,
ret: Ret<Delta>,
},
Replace {
interval: Interval,
data: String,
ret: Ret<Delta>,
},
CanUndo {
ret: oneshot::Sender<bool>,
},
CanRedo {
ret: oneshot::Sender<bool>,
},
Undo {
ret: Ret<UndoResult>,
},
Redo {
ret: Ret<UndoResult>,
},
Doc {
ret: Ret<String>,
},
SaveRevision {
rev_id: i64,
ret: Ret<()>,
},
}
pub struct DocumentEditActor { pub struct DocumentEditActor {
doc_id: DocId, doc_id: DocId,
document: RefCell<Document>, document: Arc<RwLock<Document>>,
pool: Arc<ConnectionPool>, pool: Arc<ConnectionPool>,
receiver: Option<mpsc::UnboundedReceiver<EditMsg>>, receiver: Option<mpsc::UnboundedReceiver<EditMsg>>,
} }
@ -72,7 +33,7 @@ impl DocumentEditActor {
receiver: mpsc::UnboundedReceiver<EditMsg>, receiver: mpsc::UnboundedReceiver<EditMsg>,
) -> Self { ) -> Self {
let doc_id = doc_id.to_string(); let doc_id = doc_id.to_string();
let document = RefCell::new(Document::from_delta(delta)); let document = Arc::new(RwLock::new(Document::from_delta(delta)));
Self { Self {
doc_id, doc_id,
document, document,
@ -91,21 +52,28 @@ impl DocumentEditActor {
} }
} }
}; };
stream.for_each(|msg| self.handle_message(msg)).await; stream
.for_each(|msg| async {
match self.handle_message(msg).await {
Ok(_) => {},
Err(e) => log::error!("{:?}", e),
}
})
.await;
} }
async fn handle_message(&self, msg: EditMsg) { async fn handle_message(&self, msg: EditMsg) -> DocResult<()> {
match msg { match msg {
EditMsg::Delta { delta, ret } => { EditMsg::Delta { delta, ret } => {
let result = self.document.borrow_mut().compose_delta(&delta); let result = self.document.write().await.compose_delta(&delta);
let _ = ret.send(result); let _ = ret.send(result);
}, },
EditMsg::Insert { index, data, ret } => { EditMsg::Insert { index, data, ret } => {
let delta = self.document.borrow_mut().insert(index, data); let delta = self.document.write().await.insert(index, data);
let _ = ret.send(delta); let _ = ret.send(delta);
}, },
EditMsg::Delete { interval, ret } => { EditMsg::Delete { interval, ret } => {
let result = self.document.borrow_mut().delete(interval); let result = self.document.write().await.delete(interval);
let _ = ret.send(result); let _ = ret.send(result);
}, },
EditMsg::Format { EditMsg::Format {
@ -113,41 +81,42 @@ impl DocumentEditActor {
attribute, attribute,
ret, ret,
} => { } => {
let result = self.document.borrow_mut().format(interval, attribute); let result = self.document.write().await.format(interval, attribute);
let _ = ret.send(result); let _ = ret.send(result);
}, },
EditMsg::Replace { interval, data, ret } => { EditMsg::Replace { interval, data, ret } => {
let result = self.document.borrow_mut().replace(interval, data); let result = self.document.write().await.replace(interval, data);
let _ = ret.send(result); let _ = ret.send(result);
}, },
EditMsg::CanUndo { ret } => { EditMsg::CanUndo { ret } => {
let _ = ret.send(self.document.borrow().can_undo()); let _ = ret.send(self.document.read().await.can_undo());
}, },
EditMsg::CanRedo { ret } => { EditMsg::CanRedo { ret } => {
let _ = ret.send(self.document.borrow().can_redo()); let _ = ret.send(self.document.read().await.can_redo());
}, },
EditMsg::Undo { ret } => { EditMsg::Undo { ret } => {
let result = self.document.borrow_mut().undo(); let result = self.document.write().await.undo();
let _ = ret.send(result); let _ = ret.send(result);
}, },
EditMsg::Redo { ret } => { EditMsg::Redo { ret } => {
let result = self.document.borrow_mut().redo(); let result = self.document.write().await.redo();
let _ = ret.send(result); let _ = ret.send(result);
}, },
EditMsg::Doc { ret } => { EditMsg::Doc { ret } => {
let data = self.document.borrow().to_json(); let data = self.document.read().await.to_json();
let _ = ret.send(Ok(data)); let _ = ret.send(Ok(data));
}, },
EditMsg::SaveRevision { rev_id, ret } => { EditMsg::SaveRevision { rev_id, ret } => {
let result = self.save_to_disk(rev_id); let result = self.save_to_disk(rev_id).await;
let _ = ret.send(result); let _ = ret.send(result);
}, },
} }
Ok(())
} }
#[tracing::instrument(level = "debug", skip(self, rev_id), err)] #[tracing::instrument(level = "debug", skip(self, rev_id), err)]
fn save_to_disk(&self, rev_id: i64) -> DocResult<()> { async fn save_to_disk(&self, rev_id: i64) -> DocResult<()> {
let data = self.document.borrow().to_json(); let data = self.document.read().await.to_json();
let changeset = DocTableChangeset { let changeset = DocTableChangeset {
id: self.doc_id.clone(), id: self.doc_id.clone(),
data, data,

View File

@ -6,7 +6,7 @@ use crate::{
errors::{internal_error, DocError, DocResult}, errors::{internal_error, DocError, DocResult},
services::{ services::{
doc::{ doc::{
edit::cache::{DocumentEditActor, EditMsg}, edit::{actor::DocumentEditActor, message::EditMsg},
rev_manager::RevisionManager, rev_manager::RevisionManager,
UndoResult, UndoResult,
}, },
@ -38,7 +38,7 @@ impl EditDocContext {
let delta = Delta::from_bytes(doc.data)?; let delta = Delta::from_bytes(doc.data)?;
let (sender, receiver) = mpsc::unbounded_channel::<EditMsg>(); let (sender, receiver) = mpsc::unbounded_channel::<EditMsg>();
let edit_actor = DocumentEditActor::new(&doc.id, delta, pool.clone(), receiver); let edit_actor = DocumentEditActor::new(&doc.id, delta, pool.clone(), receiver);
tokio::task::spawn_local(edit_actor.run()); tokio::spawn(edit_actor.run());
let rev_manager = Arc::new(RevisionManager::new(&doc.id, doc.rev_id, pool.clone(), ws_sender)); let rev_manager = Arc::new(RevisionManager::new(&doc.id, doc.rev_id, pool.clone(), ws_sender));
let edit_context = Self { let edit_context = Self {
@ -170,7 +170,7 @@ impl WsDocumentHandler for EditDocContext {
fn receive(&self, doc_data: WsDocumentData) { fn receive(&self, doc_data: WsDocumentData) {
let document = self.document.clone(); let document = self.document.clone();
let rev_manager = self.rev_manager.clone(); let rev_manager = self.rev_manager.clone();
let f = |doc_data: WsDocumentData| async move { let handle_ws_message = |doc_data: WsDocumentData| async move {
let bytes = Bytes::from(doc_data.data); let bytes = Bytes::from(doc_data.data);
match doc_data.ty { match doc_data.ty {
WsDataType::PushRev => { WsDataType::PushRev => {
@ -190,7 +190,7 @@ impl WsDocumentHandler for EditDocContext {
}; };
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = f(doc_data).await { if let Err(e) = handle_ws_message(doc_data).await {
log::error!("{:?}", e); log::error!("{:?}", e);
} }
}); });

View File

@ -0,0 +1,54 @@
use crate::{
errors::DocResult,
services::doc::{edit::DocId, Document, UndoResult},
sql_tables::{DocTableChangeset, DocTableSql},
};
use flowy_ot::core::{Attribute, Delta, Interval};
use tokio::sync::oneshot;
pub type Ret<T> = oneshot::Sender<DocResult<T>>;
pub enum EditMsg {
Delta {
delta: Delta,
ret: Ret<()>,
},
Insert {
index: usize,
data: String,
ret: Ret<Delta>,
},
Delete {
interval: Interval,
ret: Ret<Delta>,
},
Format {
interval: Interval,
attribute: Attribute,
ret: Ret<Delta>,
},
Replace {
interval: Interval,
data: String,
ret: Ret<Delta>,
},
CanUndo {
ret: oneshot::Sender<bool>,
},
CanRedo {
ret: oneshot::Sender<bool>,
},
Undo {
ret: Ret<UndoResult>,
},
Redo {
ret: Ret<UndoResult>,
},
Doc {
ret: Ret<String>,
},
SaveRevision {
rev_id: i64,
ret: Ret<()>,
},
}

View File

@ -1,4 +1,5 @@
mod cache; mod actor;
mod context; mod context;
mod message;
pub use context::*; pub use context::*;

View File

@ -25,7 +25,7 @@ impl RevisionManager {
pub fn new(doc_id: &str, rev_id: i64, pool: Arc<ConnectionPool>, ws_sender: Arc<dyn WsDocumentSender>) -> Self { pub fn new(doc_id: &str, rev_id: i64, pool: Arc<ConnectionPool>, ws_sender: Arc<dyn WsDocumentSender>) -> Self {
let (sender, receiver) = mpsc::channel::<StoreMsg>(50); let (sender, receiver) = mpsc::channel::<StoreMsg>(50);
let store = Store::new(doc_id, pool, receiver); let store = Store::new(doc_id, pool, receiver);
tokio::task::spawn_local(store.run()); tokio::spawn(store.run());
let doc_id = doc_id.to_string(); let doc_id = doc_id.to_string();
let rev_id_counter = RevIdCounter::new(rev_id); let rev_id_counter = RevIdCounter::new(rev_id);
@ -39,21 +39,6 @@ impl RevisionManager {
} }
} }
// pub fn next_compose_revision<F>(&self, mut f: F)
// where
// F: FnMut(&Revision) -> Result<(), DocError>,
// {
// if let Some(rev) = self.pending_revs.write().pop_front() {
// match f(&rev) {
// Ok(_) => {},
// Err(e) => {
// log::error!("{}", e);
// self.pending_revs.write().push_front(rev);
// },
// }
// }
// }
pub fn push_compose_revision(&self, revision: Revision) { self.pending_revs.write().push_front(revision); } pub fn push_compose_revision(&self, revision: Revision) { self.pending_revs.write().push_front(revision); }
pub fn next_compose_revision(&self) -> Option<Revision> { self.pending_revs.write().pop_front() } pub fn next_compose_revision(&self) -> Option<Revision> { self.pending_revs.write().pop_front() }

View File

@ -11,7 +11,7 @@ use futures::stream::StreamExt;
use std::{cell::RefCell, sync::Arc, time::Duration}; use std::{cell::RefCell, sync::Arc, time::Duration};
use tokio::{ use tokio::{
sync::{mpsc, oneshot}, sync::{mpsc, oneshot, RwLock},
task::JoinHandle, task::JoinHandle,
}; };
@ -33,7 +33,7 @@ pub struct Store {
op_sql: Arc<OpTableSql>, op_sql: Arc<OpTableSql>,
pool: Arc<ConnectionPool>, pool: Arc<ConnectionPool>,
revs: Arc<DashMap<i64, RevisionOperation>>, revs: Arc<DashMap<i64, RevisionOperation>>,
save_operation: RefCell<Option<JoinHandle<()>>>, delay_save: RwLock<Option<JoinHandle<()>>>,
receiver: Option<mpsc::Receiver<StoreMsg>>, receiver: Option<mpsc::Receiver<StoreMsg>>,
} }
@ -41,7 +41,6 @@ impl Store {
pub fn new(doc_id: &str, pool: Arc<ConnectionPool>, receiver: mpsc::Receiver<StoreMsg>) -> Store { pub fn new(doc_id: &str, pool: Arc<ConnectionPool>, receiver: mpsc::Receiver<StoreMsg>) -> Store {
let op_sql = Arc::new(OpTableSql {}); let op_sql = Arc::new(OpTableSql {});
let revs = Arc::new(DashMap::new()); let revs = Arc::new(DashMap::new());
let save_operation = RefCell::new(None);
let doc_id = doc_id.to_owned(); let doc_id = doc_id.to_owned();
Self { Self {
@ -49,7 +48,7 @@ impl Store {
op_sql, op_sql,
pool, pool,
revs, revs,
save_operation, delay_save: RwLock::new(None),
receiver: Some(receiver), receiver: Some(receiver),
} }
} }
@ -70,10 +69,10 @@ impl Store {
async fn handle_message(&self, msg: StoreMsg) { async fn handle_message(&self, msg: StoreMsg) {
match msg { match msg {
StoreMsg::Revision { revision } => { StoreMsg::Revision { revision } => {
self.handle_new_revision(revision); self.handle_new_revision(revision).await;
}, },
StoreMsg::AckRevision { rev_id } => { StoreMsg::AckRevision { rev_id } => {
self.handle_revision_acked(rev_id); self.handle_revision_acked(rev_id).await;
}, },
StoreMsg::SendRevisions { range: _, ret: _ } => { StoreMsg::SendRevisions { range: _, ret: _ } => {
unimplemented!() unimplemented!()
@ -81,32 +80,37 @@ impl Store {
} }
} }
pub fn handle_new_revision(&self, revision: Revision) { async fn handle_new_revision(&self, revision: Revision) {
let mut operation = RevisionOperation::new(&revision); let mut operation = RevisionOperation::new(&revision);
let _receiver = operation.receiver(); let _receiver = operation.receiver();
self.revs.insert(revision.rev_id, operation); self.revs.insert(revision.rev_id, operation);
self.save_revisions(); self.save_revisions().await;
} }
pub fn handle_revision_acked(&self, rev_id: i64) { async fn handle_revision_acked(&self, rev_id: i64) {
match self.revs.get_mut(&rev_id) { match self.revs.get_mut(&rev_id) {
None => {}, None => {},
Some(mut rev) => rev.value_mut().finish(), Some(mut rev) => rev.value_mut().finish(),
} }
self.save_revisions().await;
} }
pub fn revs_in_range(&self, _range: RevisionRange) -> DocResult<Vec<Revision>> { unimplemented!() } pub fn revs_in_range(&self, _range: RevisionRange) -> DocResult<Vec<Revision>> { unimplemented!() }
fn save_revisions(&self) { async fn save_revisions(&self) {
if let Some(handler) = self.save_operation.borrow_mut().take() { if let Some(handler) = self.delay_save.write().await.take() {
handler.abort(); handler.abort();
} }
if self.revs.is_empty() {
return;
}
let revs = self.revs.clone(); let revs = self.revs.clone();
let pool = self.pool.clone(); let pool = self.pool.clone();
let op_sql = self.op_sql.clone(); let op_sql = self.op_sql.clone();
*self.save_operation.borrow_mut() = Some(tokio::spawn(async move { *self.delay_save.write().await = Some(tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(300)).await; tokio::time::sleep(Duration::from_millis(300)).await;
let ids = revs.iter().map(|kv| kv.key().clone()).collect::<Vec<i64>>(); let ids = revs.iter().map(|kv| kv.key().clone()).collect::<Vec<i64>>();
@ -127,46 +131,45 @@ impl Store {
} }
})); }));
} }
// fn update_revisions(&self) {
fn update_revisions(&self) { // let rev_ids = self
let rev_ids = self // .revs
.revs // .iter()
.iter() // .flat_map(|kv| match kv.state == RevState::Acked {
.flat_map(|kv| match kv.state == RevState::Acked { // true => None,
true => None, // false => Some(kv.key().clone()),
false => Some(kv.key().clone()), // })
}) // .collect::<Vec<i64>>();
.collect::<Vec<i64>>(); //
// if rev_ids.is_empty() {
if rev_ids.is_empty() { // return;
return; // }
} //
// log::debug!("Try to update {:?} state", rev_ids);
log::debug!("Try to update {:?} state", rev_ids); // match self.update(&rev_ids) {
match self.update(&rev_ids) { // Ok(_) => {
Ok(_) => { // self.revs.retain(|k, _| !rev_ids.contains(k));
self.revs.retain(|k, _| !rev_ids.contains(k)); // },
}, // Err(e) => log::error!("Save revision failed: {:?}", e),
Err(e) => log::error!("Save revision failed: {:?}", e), // }
} // }
} //
// fn update(&self, rev_ids: &Vec<i64>) -> Result<(), DocError> {
fn update(&self, rev_ids: &Vec<i64>) -> Result<(), DocError> { // let conn = &*self.pool.get().map_err(internal_error).unwrap();
let conn = &*self.pool.get().map_err(internal_error).unwrap(); // let result = conn.immediate_transaction::<_, DocError, _>(|| {
let result = conn.immediate_transaction::<_, DocError, _>(|| { // for rev_id in rev_ids {
for rev_id in rev_ids { // let changeset = RevChangeset {
let changeset = RevChangeset { // doc_id: self.doc_id.clone(),
doc_id: self.doc_id.clone(), // rev_id: rev_id.clone(),
rev_id: rev_id.clone(), // state: RevState::Acked,
state: RevState::Acked, // };
}; // let _ = self.op_sql.update_rev_table(changeset, conn)?;
let _ = self.op_sql.update_rev_table(changeset, conn)?; // }
} // Ok(())
Ok(()) // });
}); //
// result
result // }
}
// fn delete_revision(&self, rev_id: i64) { // fn delete_revision(&self, rev_id: i64) {
// let op_sql = self.op_sql.clone(); // let op_sql = self.op_sql.clone();

View File

@ -5,6 +5,7 @@ use crate::{
}; };
use diesel::{insert_into, update}; use diesel::{insert_into, update};
use flowy_database::{ use flowy_database::{
insert_or_ignore_into,
prelude::*, prelude::*,
schema::rev_table::{columns::*, dsl, dsl::doc_id}, schema::rev_table::{columns::*, dsl, dsl::doc_id},
SqliteConnection, SqliteConnection,
@ -35,7 +36,7 @@ impl OpTableSql {
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let _ = insert_into(dsl::rev_table).values(&records).execute(conn)?; let _ = insert_or_ignore_into(dsl::rev_table).values(&records).execute(conn)?;
Ok(()) Ok(())
} }

View File

@ -107,6 +107,7 @@ impl RevTableType {
} }
impl_sql_integer_expression!(RevTableType); impl_sql_integer_expression!(RevTableType);
#[allow(dead_code)]
pub(crate) struct RevChangeset { pub(crate) struct RevChangeset {
pub(crate) doc_id: String, pub(crate) doc_id: String,
pub(crate) rev_id: i64, pub(crate) rev_id: i64,