From 60e9071685777296908c479a82c59adab2d855a4 Mon Sep 17 00:00:00 2001 From: appflowy Date: Wed, 12 Jan 2022 12:40:41 +0800 Subject: [PATCH] add some code documentation --- backend/Cargo.lock | 1 + frontend/rust-lib/flowy-document/Cargo.toml | 1 + .../rust-lib/flowy-document/src/controller.rs | 2 +- .../flowy-document/src/core/edit/editor.rs | 50 +++++------ .../flowy-document/src/core/edit/queue.rs | 27 +++--- .../flowy-document/src/core/web_socket/mod.rs | 21 +++-- .../src/core/web_socket/ws_manager.rs | 89 ++++++++++--------- .../flowy-document/src/ws_receivers.rs | 26 +++--- .../flowy-net/src/ws/local/local_server.rs | 9 +- .../flowy-net/src/ws/local/persistence.rs | 2 +- .../src/deps_resolve/document_deps.rs | 7 +- .../flowy-collaboration/src/sync/server.rs | 6 +- shared-lib/lib-ws/src/ws.rs | 1 - 13 files changed, 132 insertions(+), 110 deletions(-) diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 324673e384..3de7da43e3 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -1325,6 +1325,7 @@ name = "flowy-document" version = "0.1.0" dependencies = [ "async-stream", + "async-trait", "bytecount", "byteorder", "bytes", diff --git a/frontend/rust-lib/flowy-document/Cargo.toml b/frontend/rust-lib/flowy-document/Cargo.toml index a9023b409b..bb20153ab9 100644 --- a/frontend/rust-lib/flowy-document/Cargo.toml +++ b/frontend/rust-lib/flowy-document/Cargo.toml @@ -39,6 +39,7 @@ chrono = "0.4.19" futures-util = "0.3.15" byteorder = {version = "1.3.4"} async-stream = "0.3.2" +async-trait = "0.1.52" futures = "0.3.15" pin-project = "1.0.0" diff --git a/frontend/rust-lib/flowy-document/src/controller.rs b/frontend/rust-lib/flowy-document/src/controller.rs index d3b9cb740c..a59f9e3992 100644 --- a/frontend/rust-lib/flowy-document/src/controller.rs +++ b/frontend/rust-lib/flowy-document/src/controller.rs @@ -194,7 +194,7 @@ impl OpenDocCache { fn listen_ws_state_changed(mut state_receiver: WSStateReceiver, receivers: Arc) { tokio::spawn(async move { while let Ok(state) = state_receiver.recv().await { - receivers.ws_connect_state_changed(&state); + receivers.ws_connect_state_changed(&state).await; } }); } diff --git a/frontend/rust-lib/flowy-document/src/core/edit/editor.rs b/frontend/rust-lib/flowy-document/src/core/edit/editor.rs index 66b0988901..d75c9fc5fb 100644 --- a/frontend/rust-lib/flowy-document/src/core/edit/editor.rs +++ b/frontend/rust-lib/flowy-document/src/core/edit/editor.rs @@ -1,8 +1,7 @@ use crate::{ context::DocumentUser, core::{ - web_socket::{make_document_ws_manager, DocumentWebSocketManager}, - DocumentMD5, + web_socket::{make_document_ws_manager, DocumentWebSocketManager, EditorCommandSender}, DocumentRevisionManager, DocumentWSReceiver, DocumentWebSocket, @@ -20,14 +19,14 @@ use lib_ot::{ rich_text::{RichTextAttribute, RichTextDelta}, }; use std::sync::Arc; -use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot}; +use tokio::sync::{mpsc, oneshot}; pub struct ClientDocumentEditor { pub doc_id: String, #[allow(dead_code)] rev_manager: Arc, ws_manager: Arc, - edit_queue: UnboundedSender, + edit_cmd_tx: EditorCommandSender, } impl ClientDocumentEditor { @@ -43,11 +42,11 @@ impl ClientDocumentEditor { let doc_id = doc_id.to_string(); let user_id = user.user_id()?; - let edit_queue = spawn_edit_queue(user, rev_manager.clone(), delta); + let edit_cmd_tx = spawn_edit_queue(user, rev_manager.clone(), delta); let ws_manager = make_document_ws_manager( doc_id.clone(), user_id.clone(), - edit_queue.clone(), + edit_cmd_tx.clone(), rev_manager.clone(), ws, ) @@ -56,7 +55,7 @@ impl ClientDocumentEditor { doc_id, rev_manager, ws_manager, - edit_queue, + edit_cmd_tx, }); Ok(editor) } @@ -68,7 +67,7 @@ impl ClientDocumentEditor { data: data.to_string(), ret, }; - let _ = self.edit_queue.send(msg); + let _ = self.edit_cmd_tx.send(msg).await; let _ = rx.await.map_err(internal_error)??; Ok(()) } @@ -76,7 +75,7 @@ impl ClientDocumentEditor { pub async fn delete(&self, interval: Interval) -> Result<(), FlowyError> { let (ret, rx) = oneshot::channel::>(); let msg = EditorCommand::Delete { interval, ret }; - let _ = self.edit_queue.send(msg); + let _ = self.edit_cmd_tx.send(msg).await; let _ = rx.await.map_err(internal_error)??; Ok(()) } @@ -88,7 +87,7 @@ impl ClientDocumentEditor { attribute, ret, }; - let _ = self.edit_queue.send(msg); + let _ = self.edit_cmd_tx.send(msg).await; let _ = rx.await.map_err(internal_error)??; Ok(()) } @@ -100,7 +99,7 @@ impl ClientDocumentEditor { data: data.to_string(), ret, }; - let _ = self.edit_queue.send(msg); + let _ = self.edit_cmd_tx.send(msg).await; let _ = rx.await.map_err(internal_error)??; Ok(()) } @@ -108,21 +107,21 @@ impl ClientDocumentEditor { pub async fn can_undo(&self) -> bool { let (ret, rx) = oneshot::channel::(); let msg = EditorCommand::CanUndo { ret }; - let _ = self.edit_queue.send(msg); + let _ = self.edit_cmd_tx.send(msg).await; rx.await.unwrap_or(false) } pub async fn can_redo(&self) -> bool { let (ret, rx) = oneshot::channel::(); let msg = EditorCommand::CanRedo { ret }; - let _ = self.edit_queue.send(msg); + let _ = self.edit_cmd_tx.send(msg).await; rx.await.unwrap_or(false) } pub async fn undo(&self) -> Result<(), FlowyError> { let (ret, rx) = oneshot::channel(); let msg = EditorCommand::Undo { ret }; - let _ = self.edit_queue.send(msg); + let _ = self.edit_cmd_tx.send(msg).await; let _ = rx.await.map_err(internal_error)??; Ok(()) } @@ -130,15 +129,15 @@ impl ClientDocumentEditor { pub async fn redo(&self) -> Result<(), FlowyError> { let (ret, rx) = oneshot::channel(); let msg = EditorCommand::Redo { ret }; - let _ = self.edit_queue.send(msg); + let _ = self.edit_cmd_tx.send(msg).await; let _ = rx.await.map_err(internal_error)??; Ok(()) } pub async fn document_json(&self) -> FlowyResult { let (ret, rx) = oneshot::channel::>(); - let msg = EditorCommand::ReadDoc { ret }; - let _ = self.edit_queue.send(msg); + let msg = EditorCommand::ReadDocumentAsJson { ret }; + let _ = self.edit_cmd_tx.send(msg).await; let json = rx.await.map_err(internal_error)??; Ok(json) } @@ -151,7 +150,7 @@ impl ClientDocumentEditor { delta: delta.clone(), ret, }; - let _ = self.edit_queue.send(msg); + let _ = self.edit_cmd_tx.send(msg).await; let _ = rx.await.map_err(internal_error)??; Ok(()) } @@ -162,12 +161,13 @@ impl ClientDocumentEditor { pub(crate) fn ws_handler(&self) -> Arc { self.ws_manager.clone() } } +// The edit queue will exit after the EditorCommandSender was dropped. fn spawn_edit_queue( user: Arc, rev_manager: Arc, delta: RichTextDelta, -) -> UnboundedSender { - let (sender, receiver) = mpsc::unbounded_channel::(); +) -> EditorCommandSender { + let (sender, receiver) = mpsc::channel(1000); let actor = EditorCommandQueue::new(user, rev_manager, delta, receiver); tokio::spawn(actor.run()); sender @@ -176,17 +176,17 @@ fn spawn_edit_queue( #[cfg(feature = "flowy_unit_test")] impl ClientDocumentEditor { pub async fn doc_json(&self) -> FlowyResult { - let (ret, rx) = oneshot::channel::>(); - let msg = EditorCommand::ReadDoc { ret }; - let _ = self.edit_queue.send(msg); + let (ret, rx) = oneshot::channel::>(); + let msg = EditorCommand::ReadDocumentAsJson { ret }; + let _ = self.edit_cmd_tx.send(msg).await; let s = rx.await.map_err(internal_error)??; Ok(s) } pub async fn doc_delta(&self) -> FlowyResult { let (ret, rx) = oneshot::channel::>(); - let msg = EditorCommand::ReadDocDelta { ret }; - let _ = self.edit_queue.send(msg); + let msg = EditorCommand::ReadDocumentAsDelta { ret }; + let _ = self.edit_cmd_tx.send(msg).await; let delta = rx.await.map_err(internal_error)??; Ok(delta) } diff --git a/frontend/rust-lib/flowy-document/src/core/edit/queue.rs b/frontend/rust-lib/flowy-document/src/core/edit/queue.rs index 6873ff0766..c22d6ed29f 100644 --- a/frontend/rust-lib/flowy-document/src/core/edit/queue.rs +++ b/frontend/rust-lib/flowy-document/src/core/edit/queue.rs @@ -1,4 +1,7 @@ -use crate::{context::DocumentUser, core::DocumentRevisionManager}; +use crate::{ + context::DocumentUser, + core::{web_socket::EditorCommandReceiver, DocumentRevisionManager}, +}; use async_stream::stream; use flowy_collaboration::{ document::{history::UndoResult, Document, NewlineDoc}, @@ -12,14 +15,16 @@ use lib_ot::{ core::{Interval, OperationTransformable}, rich_text::{RichTextAttribute, RichTextDelta}, }; -use std::sync::Arc; -use tokio::sync::{mpsc, oneshot, RwLock}; +use std::{cell::Cell, sync::Arc}; +use tokio::sync::{oneshot, RwLock}; +// The EditorCommandQueue executes each command that will alter the document in +// serial. pub(crate) struct EditorCommandQueue { document: Arc>, user: Arc, rev_manager: Arc, - receiver: Option>, + receiver: Option, } impl EditorCommandQueue { @@ -27,7 +32,7 @@ impl EditorCommandQueue { user: Arc, rev_manager: Arc, delta: RichTextDelta, - receiver: mpsc::UnboundedReceiver, + receiver: EditorCommandReceiver, ) -> Self { let document = Arc::new(RwLock::new(Document::from_delta(delta))); Self { @@ -183,11 +188,11 @@ impl EditorCommandQueue { let _ = self.save_local_delta(delta, md5).await?; let _ = ret.send(Ok(())); }, - EditorCommand::ReadDoc { ret } => { + EditorCommand::ReadDocumentAsJson { ret } => { let data = self.document.read().await.to_json(); let _ = ret.send(Ok(data)); }, - EditorCommand::ReadDocDelta { ret } => { + EditorCommand::ReadDocumentAsDelta { ret } => { let delta = self.document.read().await.delta().clone(); let _ = ret.send(Ok(delta)); }, @@ -286,11 +291,11 @@ pub(crate) enum EditorCommand { Redo { ret: Ret<()>, }, - ReadDoc { + ReadDocumentAsJson { ret: Ret, }, #[allow(dead_code)] - ReadDocDelta { + ReadDocumentAsDelta { ret: Ret, }, } @@ -310,8 +315,8 @@ impl std::fmt::Debug for EditorCommand { EditorCommand::CanRedo { .. } => "CanRedo", EditorCommand::Undo { .. } => "Undo", EditorCommand::Redo { .. } => "Redo", - EditorCommand::ReadDoc { .. } => "ReadDoc", - EditorCommand::ReadDocDelta { .. } => "ReadDocDelta", + EditorCommand::ReadDocumentAsJson { .. } => "ReadDocumentAsJson", + EditorCommand::ReadDocumentAsDelta { .. } => "ReadDocumentAsDelta", }; f.write_str(s) } diff --git a/frontend/rust-lib/flowy-document/src/core/web_socket/mod.rs b/frontend/rust-lib/flowy-document/src/core/web_socket/mod.rs index 19d342b2e8..4c11fd331a 100644 --- a/frontend/rust-lib/flowy-document/src/core/web_socket/mod.rs +++ b/frontend/rust-lib/flowy-document/src/core/web_socket/mod.rs @@ -4,7 +4,6 @@ pub use ws_manager::*; use crate::core::{ web_socket::{DocumentWSSinkDataProvider, DocumentWSSteamConsumer}, DocumentRevisionManager, - DocumentWSReceiver, DocumentWebSocket, EditorCommand, TransformDeltas, @@ -21,12 +20,20 @@ use flowy_error::{internal_error, FlowyError, FlowyResult}; use lib_infra::future::FutureResult; use lib_ws::WSConnectState; use std::{collections::VecDeque, convert::TryFrom, sync::Arc}; -use tokio::sync::{broadcast, mpsc::UnboundedSender, oneshot, RwLock}; +use tokio::sync::{ + broadcast, + mpsc::{Receiver, Sender}, + oneshot, + RwLock, +}; + +pub(crate) type EditorCommandSender = Sender; +pub(crate) type EditorCommandReceiver = Receiver; pub(crate) async fn make_document_ws_manager( doc_id: String, user_id: String, - edit_cmd_tx: UnboundedSender, + edit_cmd_tx: EditorCommandSender, rev_manager: Arc, ws_conn: Arc, ) -> Arc { @@ -68,7 +75,7 @@ fn listen_document_ws_state( pub(crate) struct DocumentWebSocketSteamConsumerAdapter { pub(crate) doc_id: String, - pub(crate) edit_cmd_tx: UnboundedSender, + pub(crate) edit_cmd_tx: EditorCommandSender, pub(crate) rev_manager: Arc, pub(crate) shared_sink: Arc, } @@ -94,7 +101,7 @@ impl DocumentWSSteamConsumer for DocumentWebSocketSteamConsumerAdapter { } fn receive_new_user_connect(&self, _new_user: NewDocumentUser) -> FutureResult<(), FlowyError> { - // the _new_user will be used later + // Do nothing by now, just a placeholder for future extension. FutureResult::new(async move { Ok(()) }) } @@ -121,7 +128,7 @@ impl DocumentWSSinkDataProvider for DocumentWSSinkDataProviderAdapter { async fn transform_pushed_revisions( revisions: Vec, - edit_cmd: &UnboundedSender, + edit_cmd: &EditorCommandSender, ) -> FlowyResult { let (ret, rx) = oneshot::channel::>(); let _ = edit_cmd.send(EditorCommand::TransformRevision { revisions, ret }); @@ -130,7 +137,7 @@ async fn transform_pushed_revisions( #[tracing::instrument(level = "debug", skip(edit_cmd_tx, rev_manager, bytes))] pub(crate) async fn handle_remote_revision( - edit_cmd_tx: UnboundedSender, + edit_cmd_tx: EditorCommandSender, rev_manager: Arc, bytes: Bytes, ) -> FlowyResult> { diff --git a/frontend/rust-lib/flowy-document/src/core/web_socket/ws_manager.rs b/frontend/rust-lib/flowy-document/src/core/web_socket/ws_manager.rs index ddbcdd96a0..724b867a05 100644 --- a/frontend/rust-lib/flowy-document/src/core/web_socket/ws_manager.rs +++ b/frontend/rust-lib/flowy-document/src/core/web_socket/ws_manager.rs @@ -3,6 +3,7 @@ use crate::{ ws_receivers::{DocumentWSReceiver, DocumentWebSocket}, }; use async_stream::stream; +use async_trait::async_trait; use bytes::Bytes; use flowy_collaboration::entities::{ revision::{RevId, RevisionRange}, @@ -17,21 +18,35 @@ use tokio::{ sync::{ broadcast, mpsc, - mpsc::{UnboundedReceiver, UnboundedSender}, + mpsc::{Receiver, Sender}, }, task::spawn_blocking, time::{interval, Duration}, }; +// The consumer consumes the messages pushed by the web socket. +pub trait DocumentWSSteamConsumer: Send + Sync { + fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError>; + fn receive_ack(&self, id: String, ty: DocumentServerWSDataType) -> FutureResult<(), FlowyError>; + fn receive_new_user_connect(&self, new_user: NewDocumentUser) -> FutureResult<(), FlowyError>; + fn pull_revisions_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError>; +} + +// The sink provides the data that will be sent through the web socket to the +// backend. +pub trait DocumentWSSinkDataProvider: Send + Sync { + fn next(&self) -> FutureResult, FlowyError>; +} + pub struct DocumentWebSocketManager { doc_id: String, data_provider: Arc, stream_consumer: Arc, ws_conn: Arc, - ws_msg_tx: UnboundedSender, - ws_msg_rx: Option>, + ws_passthrough_tx: Sender, + ws_passthrough_rx: Option>, + state_passthrough_tx: broadcast::Sender, stop_sync_tx: SinkStopTx, - state: broadcast::Sender, } impl DocumentWebSocketManager { @@ -41,26 +56,26 @@ impl DocumentWebSocketManager { data_provider: Arc, stream_consumer: Arc, ) -> Self { - let (ws_msg_tx, ws_msg_rx) = mpsc::unbounded_channel(); + let (ws_passthrough_tx, ws_passthrough_rx) = mpsc::channel(1000); let (stop_sync_tx, _) = tokio::sync::broadcast::channel(2); let doc_id = doc_id.to_string(); - let (state, _) = broadcast::channel(2); + let (state_passthrough_tx, _) = broadcast::channel(2); let mut manager = DocumentWebSocketManager { doc_id, data_provider, stream_consumer, ws_conn, - ws_msg_tx, - ws_msg_rx: Some(ws_msg_rx), + ws_passthrough_tx, + ws_passthrough_rx: Some(ws_passthrough_rx), + state_passthrough_tx, stop_sync_tx, - state, }; manager.run(); manager } fn run(&mut self) { - let ws_msg_rx = self.ws_msg_rx.take().expect("Only take once"); + let ws_msg_rx = self.ws_passthrough_rx.take().expect("Only take once"); let sink = DocumentWSSink::new( &self.doc_id, self.data_provider.clone(), @@ -77,7 +92,7 @@ impl DocumentWebSocketManager { tokio::spawn(stream.run()); } - pub fn scribe_state(&self) -> broadcast::Receiver { self.state.subscribe() } + pub fn scribe_state(&self) -> broadcast::Receiver { self.state_passthrough_tx.subscribe() } pub(crate) fn stop(&self) { if self.stop_sync_tx.send(()).is_ok() { @@ -86,16 +101,22 @@ impl DocumentWebSocketManager { } } +// DocumentWebSocketManager registers itself as a DocumentWSReceiver for each +// opened document. It will receive the web socket message and parser it into +// DocumentServerWSData. +#[async_trait] impl DocumentWSReceiver for DocumentWebSocketManager { - fn receive_ws_data(&self, doc_data: DocumentServerWSData) { - match self.ws_msg_tx.send(doc_data) { - Ok(_) => {}, - Err(e) => tracing::error!("❌ Propagate ws message failed. {}", e), - } + #[tracing::instrument(level = "debug", skip(self, doc_data), err)] + async fn receive_ws_data(&self, doc_data: DocumentServerWSData) -> Result<(), FlowyError> { + let _ = self.ws_passthrough_tx.send(doc_data).await.map_err(|e| { + let err_msg = format!("{} passthrough error: {}", self.doc_id, e); + FlowyError::internal().context(err_msg) + })?; + Ok(()) } - fn connect_state_changed(&self, state: &WSConnectState) { - match self.state.send(state.clone()) { + fn connect_state_changed(&self, state: WSConnectState) { + match self.state_passthrough_tx.send(state) { Ok(_) => {}, Err(e) => tracing::error!("{}", e), } @@ -103,20 +124,13 @@ impl DocumentWSReceiver for DocumentWebSocketManager { } impl std::ops::Drop for DocumentWebSocketManager { - fn drop(&mut self) { tracing::debug!("{} HttpWebSocketManager was drop", self.doc_id) } -} - -pub trait DocumentWSSteamConsumer: Send + Sync { - fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError>; - fn receive_ack(&self, id: String, ty: DocumentServerWSDataType) -> FutureResult<(), FlowyError>; - fn receive_new_user_connect(&self, new_user: NewDocumentUser) -> FutureResult<(), FlowyError>; - fn pull_revisions_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError>; + fn drop(&mut self) { tracing::trace!("{} DocumentWebSocketManager was dropped", self.doc_id) } } pub struct DocumentWSStream { doc_id: String, consumer: Arc, - ws_msg_rx: Option>, + ws_msg_rx: Option>, stop_rx: Option, } @@ -124,7 +138,7 @@ impl DocumentWSStream { pub fn new( doc_id: &str, consumer: Arc, - ws_msg_rx: mpsc::UnboundedReceiver, + ws_msg_rx: mpsc::Receiver, stop_rx: SinkStopRx, ) -> Self { DocumentWSStream { @@ -193,21 +207,14 @@ impl DocumentWSStream { DocumentServerWSDataType::UserConnect => { let new_user = NewDocumentUser::try_from(bytes)?; let _ = self.consumer.receive_new_user_connect(new_user).await; - // Notify the user that someone has connected to this document }, } Ok(()) } } -pub type Tick = (); -pub type SinkStopRx = broadcast::Receiver<()>; -pub type SinkStopTx = broadcast::Sender<()>; - -pub trait DocumentWSSinkDataProvider: Send + Sync { - fn next(&self) -> FutureResult, FlowyError>; -} - +type SinkStopRx = broadcast::Receiver<()>; +type SinkStopTx = broadcast::Sender<()>; pub struct DocumentWSSink { provider: Arc, ws_sender: Arc, @@ -231,7 +238,7 @@ impl DocumentWSSink { } pub async fn run(mut self) { - let (tx, mut rx) = mpsc::unbounded_channel(); + let (tx, mut rx) = mpsc::channel(1); let mut stop_rx = self.stop_rx.take().expect("Only take once"); let doc_id = self.doc_id.clone(); tokio::spawn(tick(tx)); @@ -245,7 +252,7 @@ impl DocumentWSSink { } }, _ = stop_rx.recv() => { - tracing::debug!("[DocumentSink:{}] loop exit", doc_id); + tracing::trace!("[DocumentSink:{}] loop exit", doc_id); break }, }; @@ -275,9 +282,9 @@ impl DocumentWSSink { } } -async fn tick(sender: mpsc::UnboundedSender) { +async fn tick(sender: mpsc::Sender<()>) { let mut interval = interval(Duration::from_millis(SYNC_INTERVAL_IN_MILLIS)); - while sender.send(()).is_ok() { + while sender.send(()).await.is_ok() { interval.tick().await; } } diff --git a/frontend/rust-lib/flowy-document/src/ws_receivers.rs b/frontend/rust-lib/flowy-document/src/ws_receivers.rs index 6816b116e0..f34145083b 100644 --- a/frontend/rust-lib/flowy-document/src/ws_receivers.rs +++ b/frontend/rust-lib/flowy-document/src/ws_receivers.rs @@ -1,13 +1,15 @@ use crate::errors::FlowyError; +use async_trait::async_trait; use bytes::Bytes; use dashmap::DashMap; use flowy_collaboration::entities::ws::{DocumentClientWSData, DocumentServerWSData}; use lib_ws::WSConnectState; use std::{convert::TryInto, sync::Arc}; +#[async_trait] pub(crate) trait DocumentWSReceiver: Send + Sync { - fn receive_ws_data(&self, data: DocumentServerWSData); - fn connect_state_changed(&self, state: &WSConnectState); + async fn receive_ws_data(&self, data: DocumentServerWSData) -> Result<(), FlowyError>; + fn connect_state_changed(&self, state: WSConnectState); } pub type WSStateReceiver = tokio::sync::broadcast::Receiver; @@ -18,6 +20,7 @@ pub trait DocumentWebSocket: Send + Sync { pub struct DocumentWSReceivers { // key: the document id + // value: DocumentWSReceiver receivers: Arc>>, } @@ -40,21 +43,20 @@ impl DocumentWSReceivers { pub(crate) fn remove(&self, id: &str) { self.receivers.remove(id); } - pub fn did_receive_data(&self, data: Bytes) { + pub async fn did_receive_data(&self, data: Bytes) { let data: DocumentServerWSData = data.try_into().unwrap(); match self.receivers.get(&data.doc_id) { - None => { - log::error!("Can't find any source handler for {:?}", data.doc_id); - }, - Some(handler) => { - handler.receive_ws_data(data); + None => tracing::error!("Can't find any source handler for {:?}", data.doc_id), + Some(handler) => match handler.receive_ws_data(data).await { + Ok(_) => {}, + Err(e) => tracing::error!("{}", e), }, } } - pub fn ws_connect_state_changed(&self, state: &WSConnectState) { - self.receivers.iter().for_each(|receiver| { - receiver.value().connect_state_changed(&state); - }); + pub async fn ws_connect_state_changed(&self, state: &WSConnectState) { + for receiver in self.receivers.iter() { + receiver.value().connect_state_changed(state.clone()); + } } } diff --git a/frontend/rust-lib/flowy-net/src/ws/local/local_server.rs b/frontend/rust-lib/flowy-net/src/ws/local/local_server.rs index d1e38ae3aa..572d68117e 100644 --- a/frontend/rust-lib/flowy-net/src/ws/local/local_server.rs +++ b/frontend/rust-lib/flowy-net/src/ws/local/local_server.rs @@ -13,18 +13,13 @@ use tokio::sync::{mpsc, mpsc::UnboundedSender}; pub struct LocalDocumentServer { pub doc_manager: Arc, sender: mpsc::UnboundedSender, - persistence: Arc, } impl LocalDocumentServer { pub fn new(sender: mpsc::UnboundedSender) -> Self { let persistence = Arc::new(LocalDocumentCloudPersistence::default()); - let doc_manager = Arc::new(ServerDocumentManager::new(persistence.clone())); - LocalDocumentServer { - doc_manager, - sender, - persistence, - } + let doc_manager = Arc::new(ServerDocumentManager::new(persistence)); + LocalDocumentServer { doc_manager, sender } } pub async fn handle_client_data( diff --git a/frontend/rust-lib/flowy-net/src/ws/local/persistence.rs b/frontend/rust-lib/flowy-net/src/ws/local/persistence.rs index b9e3a23e9c..8eca19d2dc 100644 --- a/frontend/rust-lib/flowy-net/src/ws/local/persistence.rs +++ b/frontend/rust-lib/flowy-net/src/ws/local/persistence.rs @@ -1,5 +1,5 @@ use crate::ws::local::DocumentCloudStorage; -use dashmap::DashMap; + use flowy_collaboration::{ entities::doc::DocumentInfo, errors::CollaborateError, diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs index 14958cf8fa..7a77c9632b 100644 --- a/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs +++ b/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs @@ -87,7 +87,12 @@ impl DocumentWebSocket for DocumentWebSocketImpl { struct WSMessageReceiverImpl(Arc); impl WSMessageReceiver for WSMessageReceiverImpl { fn source(&self) -> WSModule { WSModule::Doc } - fn receive_message(&self, msg: WebSocketRawMessage) { self.0.did_receive_data(Bytes::from(msg.data)); } + fn receive_message(&self, msg: WebSocketRawMessage) { + let receivers = self.0.clone(); + tokio::spawn(async move { + receivers.did_receive_data(Bytes::from(msg.data)).await; + }); + } } fn make_document_cloud_service(server_config: &ClientServerConfiguration) -> Arc { diff --git a/shared-lib/flowy-collaboration/src/sync/server.rs b/shared-lib/flowy-collaboration/src/sync/server.rs index ba5ba87344..5f144c31af 100644 --- a/shared-lib/flowy-collaboration/src/sync/server.rs +++ b/shared-lib/flowy-collaboration/src/sync/server.rs @@ -166,7 +166,7 @@ impl ServerDocumentManager { impl std::ops::Drop for ServerDocumentManager { fn drop(&mut self) { - log::debug!("ServerDocumentManager was drop"); + log::trace!("ServerDocumentManager was dropped"); } } @@ -241,7 +241,7 @@ impl OpenDocHandle { impl std::ops::Drop for OpenDocHandle { fn drop(&mut self) { - tracing::debug!("{} OpenDocHandle was drop", self.doc_id); + tracing::trace!("{} OpenDocHandle was dropped", self.doc_id); } } @@ -327,7 +327,7 @@ impl DocumentCommandQueue { impl std::ops::Drop for DocumentCommandQueue { fn drop(&mut self) { - tracing::debug!("{} DocumentCommandQueue was drop", self.doc_id); + tracing::trace!("{} DocumentCommandQueue was dropped", self.doc_id); } } diff --git a/shared-lib/lib-ws/src/ws.rs b/shared-lib/lib-ws/src/ws.rs index 386ec9b372..130064e6b9 100644 --- a/shared-lib/lib-ws/src/ws.rs +++ b/shared-lib/lib-ws/src/ws.rs @@ -160,7 +160,6 @@ async fn spawn_stream_and_handlers( pub struct WSHandlerFuture { #[pin] msg_rx: MsgReceiver, - // Opti: Hashmap would be better handlers: Handlers, }