fix: channel buffer is full without pull the receiver

This commit is contained in:
appflowy 2021-10-08 15:08:56 +08:00
parent 3cbce2c505
commit 7730539278
6 changed files with 66 additions and 44 deletions

View File

@ -5,6 +5,7 @@ use crate::service::{
};
use actix_web::web::Data;
use crate::service::ws::entities::Socket;
use bytes::Bytes;
use dashmap::DashMap;
use flowy_document::{
@ -84,7 +85,8 @@ impl ServerEditDoc {
cur_rev_id = %self.rev_id.load(SeqCst),
base_rev_id = %revision.base_rev_id,
rev_id = %revision.rev_id,
)
),
err
)]
pub async fn apply_revision(
&self,
@ -100,14 +102,10 @@ impl ServerEditDoc {
if cur_rev_id == revision.base_rev_id || next_rev_id == revision.base_rev_id {
// The rev is in the right order, just compose it.
let _ = self.compose_revision(&revision, pg_pool).await?;
user.socket
.do_send(mk_acked_message(&revision))
.map_err(internal_error)?;
let _ = send_acked_msg(&user.socket, &revision)?;
} else {
// The server document is outdated, pull the missing revision from the client.
user.socket
.do_send(mk_pull_message(&self.doc_id, next_rev_id, revision.rev_id))
.map_err(internal_error)?;
let _ = send_pull_message(&user.socket, &self.doc_id, next_rev_id, revision.rev_id)?;
}
},
Ordering::Equal => {
@ -119,9 +117,7 @@ impl ServerEditDoc {
// send the prime delta to the client. Client should compose the this prime
// delta.
let cli_revision = self.transform_revision(&revision)?;
user.socket
.do_send(mk_push_message(&self.doc_id, cli_revision))
.map_err(internal_error)?;
let _ = send_push_message(&user.socket, &self.doc_id, cli_revision)?;
},
}
Ok(())
@ -204,6 +200,12 @@ impl ServerEditDoc {
}
}
#[tracing::instrument(level = "debug", skip(socket, doc_id, revision), err)]
fn send_push_message(socket: &Socket, doc_id: &str, revision: Revision) -> Result<(), ServerError> {
let msg = mk_push_message(doc_id, revision);
socket.try_send(msg).map_err(internal_error)
}
fn mk_push_message(doc_id: &str, revision: Revision) -> WsMessageAdaptor {
let bytes = revision.write_to_bytes().unwrap();
let data = WsDocumentData {
@ -214,6 +216,12 @@ fn mk_push_message(doc_id: &str, revision: Revision) -> WsMessageAdaptor {
mk_ws_message(data)
}
#[tracing::instrument(level = "debug", skip(socket, doc_id), err)]
fn send_pull_message(socket: &Socket, doc_id: &str, from_rev_id: i64, to_rev_id: i64) -> Result<(), ServerError> {
let msg = mk_pull_message(doc_id, from_rev_id, to_rev_id);
socket.try_send(msg).map_err(internal_error)
}
fn mk_pull_message(doc_id: &str, from_rev_id: i64, to_rev_id: i64) -> WsMessageAdaptor {
let range = RevisionRange {
doc_id: doc_id.to_string(),
@ -231,6 +239,12 @@ fn mk_pull_message(doc_id: &str, from_rev_id: i64, to_rev_id: i64) -> WsMessageA
mk_ws_message(data)
}
#[tracing::instrument(level = "debug", skip(socket, revision), err)]
fn send_acked_msg(socket: &Socket, revision: &Revision) -> Result<(), ServerError> {
let msg = mk_acked_message(revision);
socket.try_send(msg).map_err(internal_error)
}
fn mk_acked_message(revision: &Revision) -> WsMessageAdaptor {
// let mut wtr = vec![];
// let _ = wtr.write_i64::<BigEndian>(revision.rev_id);

View File

@ -45,7 +45,7 @@ impl ClientEditDoc {
server: Arc<dyn RevisionServer>,
user: Arc<dyn DocumentUser>,
) -> DocResult<Self> {
let (sender, receiver) = mpsc::channel(1);
let (sender, receiver) = mpsc::unbounded_channel();
let mut rev_manager = RevisionManager::new(doc_id, pool.clone(), server.clone(), sender);
spawn_rev_receiver(receiver, ws.clone());
@ -273,7 +273,7 @@ impl ClientEditDoc {
WsDataType::NewDocUser => {},
WsDataType::Acked => {
let rev_id = RevId::try_from(bytes)?;
let _ = self.rev_manager.ack_rev(rev_id);
let _ = self.rev_manager.ack_rev(rev_id).await?;
},
WsDataType::Conflict => {},
}
@ -302,7 +302,7 @@ impl WsDocumentHandler for EditDocWsHandler {
}
}
fn spawn_rev_receiver(mut receiver: mpsc::Receiver<Revision>, ws: Arc<dyn DocumentWebSocket>) {
fn spawn_rev_receiver(mut receiver: mpsc::UnboundedReceiver<Revision>, ws: Arc<dyn DocumentWebSocket>) {
tokio::spawn(async move {
loop {
while let Some(revision) = receiver.recv().await {

View File

@ -30,7 +30,7 @@ impl RevisionManager {
doc_id: &str,
pool: Arc<ConnectionPool>,
server: Arc<dyn RevisionServer>,
pending_rev_sender: mpsc::Sender<Revision>,
pending_rev_sender: mpsc::UnboundedSender<Revision>,
) -> Self {
let rev_store = RevisionStore::new(doc_id, pool, server, pending_rev_sender);
let rev_id_counter = RevIdCounter::new(0);
@ -52,11 +52,8 @@ impl RevisionManager {
Ok(())
}
pub fn ack_rev(&self, rev_id: RevId) -> Result<(), DocError> {
let rev_store = self.rev_store.clone();
tokio::spawn(async move {
rev_store.handle_revision_acked(rev_id).await;
});
pub async fn ack_rev(&self, rev_id: RevId) -> Result<(), DocError> {
self.rev_store.handle_revision_acked(rev_id).await;
Ok(())
}

View File

@ -91,14 +91,14 @@ pub(crate) type PendingReceiver = mpsc::UnboundedReceiver<PendingMsg>;
pub(crate) struct PendingRevisionStream {
revisions: Arc<dyn RevisionIterator>,
receiver: Option<PendingReceiver>,
next_revision: mpsc::Sender<Revision>,
next_revision: mpsc::UnboundedSender<Revision>,
}
impl PendingRevisionStream {
pub(crate) fn new(
revisions: Arc<dyn RevisionIterator>,
pending_rx: PendingReceiver,
next_revision: mpsc::Sender<Revision>,
next_revision: mpsc::UnboundedSender<Revision>,
) -> Self {
Self {
revisions,
@ -137,7 +137,7 @@ impl PendingRevisionStream {
match self.revisions.next().await? {
None => Ok(()),
Some(revision) => {
self.next_revision.send(revision).await.map_err(internal_error);
self.next_revision.send(revision).map_err(internal_error);
let _ = tokio::time::timeout(Duration::from_millis(2000), ret.recv()).await;
Ok(())
},

View File

@ -34,7 +34,7 @@ impl RevisionStore {
doc_id: &str,
pool: Arc<ConnectionPool>,
server: Arc<dyn RevisionServer>,
next_revision: mpsc::Sender<Revision>,
next_revision: mpsc::UnboundedSender<Revision>,
) -> Arc<RevisionStore> {
let doc_id = doc_id.to_owned();
let persistence = Arc::new(Persistence::new(pool));
@ -63,7 +63,7 @@ impl RevisionStore {
return Err(DocError::duplicate_rev().context(format!("Duplicate revision id: {}", revision.rev_id)));
}
let (sender, receiver) = broadcast::channel(1);
let (sender, receiver) = broadcast::channel(2);
let revs_map = self.revs_map.clone();
let mut rx = sender.subscribe();
tokio::spawn(async move {

View File

@ -92,28 +92,39 @@ impl WsStream {
msg_tx: msg_tx.clone(),
inner: Some((
Box::pin(async move {
let (tx, mut rx) = tokio::sync::mpsc::channel(10);
let _ = ws_read
.for_each(|message| async {
match tx.send(post_message(msg_tx.clone(), message)).await {
Ok(_) => {},
Err(e) => log::error!("WsStream tx closed unexpectedly: {} ", e),
}
})
.await;
loop {
match rx.recv().await {
None => {
return Err(WsError::internal().context("WsStream rx closed unexpectedly"));
},
Some(result) => {
if result.is_err() {
return result;
let (tx, mut rx) = tokio::sync::mpsc::channel(100);
let read = async {
ws_read
.for_each(|message| async {
match tx.send(post_message(msg_tx.clone(), message)).await {
Ok(_) => {},
Err(e) => log::error!("WsStream tx closed unexpectedly: {} ", e),
}
},
})
.await;
Ok(())
};
let ret = async {
loop {
match rx.recv().await {
None => {
return Err(WsError::internal().context("WsStream rx closed unexpectedly"));
},
Some(result) => {
if result.is_err() {
return result;
}
},
}
}
}
};
futures::pin_mut!(ret);
futures::pin_mut!(read);
tokio::select! {
result = read => {return result},
result = ret => {return result},
};
}),
Box::pin(async move {
let result = ws_rx.map(Ok).forward(ws_write).await.map_err(internal_error);