diff --git a/backend/src/service/doc/edit/edit_doc.rs b/backend/src/service/doc/edit/edit_doc.rs index 31ee41965a..04ad2c5aab 100644 --- a/backend/src/service/doc/edit/edit_doc.rs +++ b/backend/src/service/doc/edit/edit_doc.rs @@ -5,6 +5,7 @@ use crate::service::{ }; use actix_web::web::Data; +use crate::service::ws::entities::Socket; use bytes::Bytes; use dashmap::DashMap; use flowy_document::{ @@ -84,7 +85,8 @@ impl ServerEditDoc { cur_rev_id = %self.rev_id.load(SeqCst), base_rev_id = %revision.base_rev_id, rev_id = %revision.rev_id, - ) + ), + err )] pub async fn apply_revision( &self, @@ -100,14 +102,10 @@ impl ServerEditDoc { if cur_rev_id == revision.base_rev_id || next_rev_id == revision.base_rev_id { // The rev is in the right order, just compose it. let _ = self.compose_revision(&revision, pg_pool).await?; - user.socket - .do_send(mk_acked_message(&revision)) - .map_err(internal_error)?; + let _ = send_acked_msg(&user.socket, &revision)?; } else { // The server document is outdated, pull the missing revision from the client. - user.socket - .do_send(mk_pull_message(&self.doc_id, next_rev_id, revision.rev_id)) - .map_err(internal_error)?; + let _ = send_pull_message(&user.socket, &self.doc_id, next_rev_id, revision.rev_id)?; } }, Ordering::Equal => { @@ -119,9 +117,7 @@ impl ServerEditDoc { // send the prime delta to the client. Client should compose the this prime // delta. let cli_revision = self.transform_revision(&revision)?; - user.socket - .do_send(mk_push_message(&self.doc_id, cli_revision)) - .map_err(internal_error)?; + let _ = send_push_message(&user.socket, &self.doc_id, cli_revision)?; }, } Ok(()) @@ -204,6 +200,12 @@ impl ServerEditDoc { } } +#[tracing::instrument(level = "debug", skip(socket, doc_id, revision), err)] +fn send_push_message(socket: &Socket, doc_id: &str, revision: Revision) -> Result<(), ServerError> { + let msg = mk_push_message(doc_id, revision); + socket.try_send(msg).map_err(internal_error) +} + fn mk_push_message(doc_id: &str, revision: Revision) -> WsMessageAdaptor { let bytes = revision.write_to_bytes().unwrap(); let data = WsDocumentData { @@ -214,6 +216,12 @@ fn mk_push_message(doc_id: &str, revision: Revision) -> WsMessageAdaptor { mk_ws_message(data) } +#[tracing::instrument(level = "debug", skip(socket, doc_id), err)] +fn send_pull_message(socket: &Socket, doc_id: &str, from_rev_id: i64, to_rev_id: i64) -> Result<(), ServerError> { + let msg = mk_pull_message(doc_id, from_rev_id, to_rev_id); + socket.try_send(msg).map_err(internal_error) +} + fn mk_pull_message(doc_id: &str, from_rev_id: i64, to_rev_id: i64) -> WsMessageAdaptor { let range = RevisionRange { doc_id: doc_id.to_string(), @@ -231,6 +239,12 @@ fn mk_pull_message(doc_id: &str, from_rev_id: i64, to_rev_id: i64) -> WsMessageA mk_ws_message(data) } +#[tracing::instrument(level = "debug", skip(socket, revision), err)] +fn send_acked_msg(socket: &Socket, revision: &Revision) -> Result<(), ServerError> { + let msg = mk_acked_message(revision); + socket.try_send(msg).map_err(internal_error) +} + fn mk_acked_message(revision: &Revision) -> WsMessageAdaptor { // let mut wtr = vec![]; // let _ = wtr.write_i64::(revision.rev_id); diff --git a/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs b/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs index 1ac83a50d3..b59f1ce3cf 100644 --- a/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs +++ b/rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs @@ -45,7 +45,7 @@ impl ClientEditDoc { server: Arc, user: Arc, ) -> DocResult { - let (sender, receiver) = mpsc::channel(1); + let (sender, receiver) = mpsc::unbounded_channel(); let mut rev_manager = RevisionManager::new(doc_id, pool.clone(), server.clone(), sender); spawn_rev_receiver(receiver, ws.clone()); @@ -273,7 +273,7 @@ impl ClientEditDoc { WsDataType::NewDocUser => {}, WsDataType::Acked => { let rev_id = RevId::try_from(bytes)?; - let _ = self.rev_manager.ack_rev(rev_id); + let _ = self.rev_manager.ack_rev(rev_id).await?; }, WsDataType::Conflict => {}, } @@ -302,7 +302,7 @@ impl WsDocumentHandler for EditDocWsHandler { } } -fn spawn_rev_receiver(mut receiver: mpsc::Receiver, ws: Arc) { +fn spawn_rev_receiver(mut receiver: mpsc::UnboundedReceiver, ws: Arc) { tokio::spawn(async move { loop { while let Some(revision) = receiver.recv().await { diff --git a/rust-lib/flowy-document/src/services/doc/revision/manager.rs b/rust-lib/flowy-document/src/services/doc/revision/manager.rs index d46661b75f..11c1ef3dca 100644 --- a/rust-lib/flowy-document/src/services/doc/revision/manager.rs +++ b/rust-lib/flowy-document/src/services/doc/revision/manager.rs @@ -30,7 +30,7 @@ impl RevisionManager { doc_id: &str, pool: Arc, server: Arc, - pending_rev_sender: mpsc::Sender, + pending_rev_sender: mpsc::UnboundedSender, ) -> Self { let rev_store = RevisionStore::new(doc_id, pool, server, pending_rev_sender); let rev_id_counter = RevIdCounter::new(0); @@ -52,11 +52,8 @@ impl RevisionManager { Ok(()) } - pub fn ack_rev(&self, rev_id: RevId) -> Result<(), DocError> { - let rev_store = self.rev_store.clone(); - tokio::spawn(async move { - rev_store.handle_revision_acked(rev_id).await; - }); + pub async fn ack_rev(&self, rev_id: RevId) -> Result<(), DocError> { + self.rev_store.handle_revision_acked(rev_id).await; Ok(()) } diff --git a/rust-lib/flowy-document/src/services/doc/revision/model.rs b/rust-lib/flowy-document/src/services/doc/revision/model.rs index 3317ce0b6c..d971b74c0e 100644 --- a/rust-lib/flowy-document/src/services/doc/revision/model.rs +++ b/rust-lib/flowy-document/src/services/doc/revision/model.rs @@ -91,14 +91,14 @@ pub(crate) type PendingReceiver = mpsc::UnboundedReceiver; pub(crate) struct PendingRevisionStream { revisions: Arc, receiver: Option, - next_revision: mpsc::Sender, + next_revision: mpsc::UnboundedSender, } impl PendingRevisionStream { pub(crate) fn new( revisions: Arc, pending_rx: PendingReceiver, - next_revision: mpsc::Sender, + next_revision: mpsc::UnboundedSender, ) -> Self { Self { revisions, @@ -137,7 +137,7 @@ impl PendingRevisionStream { match self.revisions.next().await? { None => Ok(()), Some(revision) => { - self.next_revision.send(revision).await.map_err(internal_error); + self.next_revision.send(revision).map_err(internal_error); let _ = tokio::time::timeout(Duration::from_millis(2000), ret.recv()).await; Ok(()) }, diff --git a/rust-lib/flowy-document/src/services/doc/revision/rev_store.rs b/rust-lib/flowy-document/src/services/doc/revision/rev_store.rs index ff03c76809..0b08856173 100644 --- a/rust-lib/flowy-document/src/services/doc/revision/rev_store.rs +++ b/rust-lib/flowy-document/src/services/doc/revision/rev_store.rs @@ -34,7 +34,7 @@ impl RevisionStore { doc_id: &str, pool: Arc, server: Arc, - next_revision: mpsc::Sender, + next_revision: mpsc::UnboundedSender, ) -> Arc { let doc_id = doc_id.to_owned(); let persistence = Arc::new(Persistence::new(pool)); @@ -63,7 +63,7 @@ impl RevisionStore { return Err(DocError::duplicate_rev().context(format!("Duplicate revision id: {}", revision.rev_id))); } - let (sender, receiver) = broadcast::channel(1); + let (sender, receiver) = broadcast::channel(2); let revs_map = self.revs_map.clone(); let mut rx = sender.subscribe(); tokio::spawn(async move { diff --git a/rust-lib/flowy-ws/src/connect.rs b/rust-lib/flowy-ws/src/connect.rs index 2134189b20..918d6fa99a 100644 --- a/rust-lib/flowy-ws/src/connect.rs +++ b/rust-lib/flowy-ws/src/connect.rs @@ -92,28 +92,39 @@ impl WsStream { msg_tx: msg_tx.clone(), inner: Some(( Box::pin(async move { - let (tx, mut rx) = tokio::sync::mpsc::channel(10); - let _ = ws_read - .for_each(|message| async { - match tx.send(post_message(msg_tx.clone(), message)).await { - Ok(_) => {}, - Err(e) => log::error!("WsStream tx closed unexpectedly: {} ", e), - } - }) - .await; - - loop { - match rx.recv().await { - None => { - return Err(WsError::internal().context("WsStream rx closed unexpectedly")); - }, - Some(result) => { - if result.is_err() { - return result; + let (tx, mut rx) = tokio::sync::mpsc::channel(100); + let read = async { + ws_read + .for_each(|message| async { + match tx.send(post_message(msg_tx.clone(), message)).await { + Ok(_) => {}, + Err(e) => log::error!("WsStream tx closed unexpectedly: {} ", e), } - }, + }) + .await; + Ok(()) + }; + + let ret = async { + loop { + match rx.recv().await { + None => { + return Err(WsError::internal().context("WsStream rx closed unexpectedly")); + }, + Some(result) => { + if result.is_err() { + return result; + } + }, + } } - } + }; + futures::pin_mut!(ret); + futures::pin_mut!(read); + tokio::select! { + result = read => {return result}, + result = ret => {return result}, + }; }), Box::pin(async move { let result = ws_rx.map(Ok).forward(ws_write).await.map_err(internal_error);