diff --git a/frontend/rust-lib/flowy-document/src/services/controller.rs b/frontend/rust-lib/flowy-document/src/services/controller.rs index 7ea9a59fe1..7d14d166a0 100644 --- a/frontend/rust-lib/flowy-document/src/services/controller.rs +++ b/frontend/rust-lib/flowy-document/src/services/controller.rs @@ -171,7 +171,7 @@ impl OpenDocCache { pub(crate) fn remove(&self, id: &str) { let doc_id = id.to_string(); match self.get(id) { - Ok(editor) => editor.stop_sync(), + Ok(editor) => editor.stop(), Err(e) => log::error!("{}", e), } self.inner.remove(&doc_id); diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs b/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs index f179507f58..f64f05e77d 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs @@ -1,29 +1,27 @@ -use crate::{errors::FlowyError, module::DocumentUser, services::doc::*}; -use bytes::Bytes; -use flowy_collaboration::{ - core::document::history::UndoResult, - entities::{ - doc::DocDelta, - ws::{DocumentWSData, DocumentWSDataBuilder, DocumentWSDataType, NewDocumentUser}, +use crate::{ + errors::FlowyError, + module::DocumentUser, + services::doc::{ + web_socket::{initialize_document_web_socket, DocumentWebSocketContext, EditorWebSocket}, + *, }, - errors::CollaborateResult, }; +use bytes::Bytes; +use flowy_collaboration::{core::document::history::UndoResult, entities::doc::DocDelta, errors::CollaborateResult}; use flowy_database::ConnectionPool; use flowy_error::{internal_error, FlowyResult}; -use lib_infra::future::FutureResult; use lib_ot::{ core::Interval, - revision::{RevId, RevType, Revision, RevisionRange}, + revision::{RevId, RevType, Revision}, rich_text::{RichTextAttribute, RichTextDelta}, }; -use lib_ws::WSConnectState; -use std::{collections::VecDeque, sync::Arc}; -use tokio::sync::{broadcast, mpsc, mpsc::UnboundedSender, oneshot, RwLock}; +use std::sync::Arc; +use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot}; pub struct ClientDocEditor { pub doc_id: String, rev_manager: Arc, - editor_ws: Arc, + editor_ws: Arc, editor_cmd_sender: UnboundedSender, user: Arc, } @@ -42,34 +40,16 @@ impl ClientDocEditor { let doc_id = doc_id.to_string(); let user_id = user.user_id()?; let rev_manager = Arc::new(rev_manager); - let combined_sink = Arc::new(CombinedSink::new(rev_manager.clone())); - let ws_stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter { - doc_id: doc_id.clone(), + + let context = DocumentWebSocketContext { + doc_id: doc_id.to_owned(), + user_id: user_id.clone(), editor_cmd_sender: editor_cmd_sender.clone(), rev_manager: rev_manager.clone(), - user: user.clone(), - combined_sink: combined_sink.clone(), - }); - let ws_stream_provider = Arc::new(DocumentWSSinkDataProviderAdapter(combined_sink.clone())); - let editor_ws = Arc::new(EditorWebSocket::new( - &doc_id, ws, - ws_stream_provider, - ws_stream_consumer, - )); - - // - notify_user_conn(&user_id, &doc_id, rev_manager.clone(), combined_sink.clone()).await; - - // - listen_document_ws_state( - &user_id, - &doc_id, - editor_ws.scribe_state(), - rev_manager.clone(), - combined_sink, - ); + }; + let editor_ws = initialize_document_web_socket(context).await; let editor = Arc::new(Self { doc_id, rev_manager, @@ -203,9 +183,9 @@ impl ClientDocEditor { } #[tracing::instrument(level = "debug", skip(self))] - pub fn stop_sync(&self) { self.editor_ws.stop(); } + pub fn stop(&self) { self.editor_ws.stop_web_socket(); } - pub(crate) fn ws_handler(&self) -> Arc { self.editor_ws.clone() } + pub(crate) fn ws_handler(&self) -> Arc { self.editor_ws.ws_handler() } } fn spawn_edit_queue(doc_id: &str, delta: RichTextDelta, _pool: Arc) -> UnboundedSender { @@ -215,262 +195,6 @@ fn spawn_edit_queue(doc_id: &str, delta: RichTextDelta, _pool: Arc, - rev_manager: Arc, - sink_data_provider: Arc, -) { - let user_id = user_id.to_owned(); - let doc_id = doc_id.to_owned(); - - tokio::spawn(async move { - while let Ok(state) = subscriber.recv().await { - match state { - WSConnectState::Init => {}, - WSConnectState::Connecting => {}, - WSConnectState::Connected => { - // self.notify_user_conn() - notify_user_conn(&user_id, &doc_id, rev_manager.clone(), sink_data_provider.clone()).await; - }, - WSConnectState::Disconnected => {}, - } - } - }); -} - -async fn notify_user_conn( - user_id: &str, - doc_id: &str, - rev_manager: Arc, - combined_sink: Arc, -) { - let need_notify = match combined_sink.front().await { - None => true, - Some(data) => data.ty != DocumentWSDataType::UserConnect, - }; - - if need_notify { - let new_connect = NewDocumentUser { - user_id: user_id.to_owned(), - doc_id: doc_id.to_owned(), - rev_id: rev_manager.latest_rev_id(), - }; - - let data = DocumentWSDataBuilder::build_new_document_user_message(doc_id, new_connect); - combined_sink.push_front(data).await; - } -} - -struct DocumentWebSocketSteamConsumerAdapter { - doc_id: String, - editor_cmd_sender: UnboundedSender, - rev_manager: Arc, - user: Arc, - combined_sink: Arc, -} - -impl DocumentWSSteamConsumer for DocumentWebSocketSteamConsumerAdapter { - fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError> { - let user = self.user.clone(); - let rev_manager = self.rev_manager.clone(); - let edit_cmd_tx = self.editor_cmd_sender.clone(); - let combined_sink = self.combined_sink.clone(); - let doc_id = self.doc_id.clone(); - FutureResult::new(async move { - let user_id = user.user_id()?; - if let Some(revision) = handle_push_rev(&doc_id, &user_id, edit_cmd_tx, rev_manager, bytes).await? { - combined_sink.push_back(revision.into()).await; - } - Ok(()) - }) - } - - fn receive_ack(&self, id: String, ty: DocumentWSDataType) -> FutureResult<(), FlowyError> { - let combined_sink = self.combined_sink.clone(); - FutureResult::new(async move { combined_sink.ack(id, ty).await }) - } - - fn receive_new_user_connect(&self, _new_user: NewDocumentUser) -> FutureResult<(), FlowyError> { - FutureResult::new(async move { Ok(()) }) - } - - fn send_revision_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError> { - let rev_manager = self.rev_manager.clone(); - let combined_sink = self.combined_sink.clone(); - FutureResult::new(async move { - let revision = rev_manager.mk_revisions(range).await?; - combined_sink.push_back(revision.into()).await; - Ok(()) - }) - } -} - -struct DocumentWSSinkDataProviderAdapter(Arc); -impl DocumentWSSinkDataProvider for DocumentWSSinkDataProviderAdapter { - fn next(&self) -> FutureResult, FlowyError> { - let combined_sink = self.0.clone(); - FutureResult::new(async move { combined_sink.next().await }) - } -} - -#[tracing::instrument(level = "debug", skip(edit_cmd_tx, rev_manager, bytes))] -pub(crate) async fn handle_push_rev( - doc_id: &str, - user_id: &str, - edit_cmd_tx: UnboundedSender, - rev_manager: Arc, - bytes: Bytes, -) -> FlowyResult> { - // Transform the revision - let (ret, rx) = oneshot::channel::>(); - let _ = edit_cmd_tx.send(EditorCommand::ProcessRemoteRevision { bytes, ret }); - let TransformDeltas { - client_prime, - server_prime, - server_rev_id, - } = rx.await.map_err(internal_error)??; - - if rev_manager.rev_id() >= server_rev_id.value { - // Ignore this push revision if local_rev_id >= server_rev_id - return Ok(None); - } - - // compose delta - let (ret, rx) = oneshot::channel::>(); - let msg = EditorCommand::ComposeDelta { - delta: client_prime.clone(), - ret, - }; - let _ = edit_cmd_tx.send(msg); - let md5 = rx.await.map_err(internal_error)??; - - // update rev id - rev_manager.update_rev_id_counter_value(server_rev_id.clone().into()); - let (local_base_rev_id, local_rev_id) = rev_manager.next_rev_id(); - let delta_data = client_prime.to_bytes(); - // save the revision - let revision = Revision::new( - &doc_id, - local_base_rev_id, - local_rev_id, - delta_data, - RevType::Remote, - &user_id, - md5.clone(), - ); - - let _ = rev_manager.add_remote_revision(&revision).await?; - - // send the server_prime delta - let delta_data = server_prime.to_bytes(); - Ok(Some(Revision::new( - &doc_id, - local_base_rev_id, - local_rev_id, - delta_data, - RevType::Remote, - &user_id, - md5, - ))) -} - -#[derive(Clone)] -enum SourceType { - Shared, - Revision, -} - -#[derive(Clone)] -struct CombinedSink { - shared: Arc>>, - rev_manager: Arc, - source_ty: Arc>, -} - -impl CombinedSink { - fn new(rev_manager: Arc) -> Self { - CombinedSink { - shared: Arc::new(RwLock::new(VecDeque::new())), - rev_manager, - source_ty: Arc::new(RwLock::new(SourceType::Shared)), - } - } - - // FIXME: return Option<&DocumentWSData> would be better - async fn front(&self) -> Option { self.shared.read().await.front().cloned() } - - async fn push_front(&self, data: DocumentWSData) { self.shared.write().await.push_front(data); } - - async fn push_back(&self, data: DocumentWSData) { self.shared.write().await.push_back(data); } - - async fn next(&self) -> FlowyResult> { - let source_ty = self.source_ty.read().await.clone(); - match source_ty { - SourceType::Shared => match self.shared.read().await.front() { - None => { - *self.source_ty.write().await = SourceType::Revision; - Ok(None) - }, - Some(data) => { - tracing::debug!("[DocumentSinkDataProvider]: {}:{:?}", data.doc_id, data.ty); - Ok(Some(data.clone())) - }, - }, - SourceType::Revision => { - if !self.shared.read().await.is_empty() { - *self.source_ty.write().await = SourceType::Shared; - return Ok(None); - } - - match self.rev_manager.next_sync_revision().await? { - Some(rev) => { - tracing::debug!("[DocumentSinkDataProvider]: {}:{:?}", rev.doc_id, rev.rev_id); - Ok(Some(rev.into())) - }, - None => Ok(None), - } - }, - } - } - - async fn ack(&self, id: String, _ty: DocumentWSDataType) -> FlowyResult<()> { - // let _ = self.rev_manager.ack_revision(id).await?; - let source_ty = self.source_ty.read().await.clone(); - match source_ty { - SourceType::Shared => { - let should_pop = match self.shared.read().await.front() { - None => false, - Some(val) => { - if val.id == id { - true - } else { - tracing::error!("The front element's {} is not equal to the {}", val.id, id); - false - } - }, - }; - if should_pop { - let _ = self.shared.write().await.pop_front(); - } - }, - SourceType::Revision => { - match id.parse::() { - Ok(rev_id) => { - let _ = self.rev_manager.ack_revision(rev_id).await?; - }, - Err(e) => { - tracing::error!("Parse rev_id from {} failed. {}", id, e); - }, - }; - }, - } - - Ok(()) - } -} - #[cfg(feature = "flowy_unit_test")] impl ClientDocEditor { pub async fn doc_json(&self) -> FlowyResult { diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/editor_edit_cmd_queue.rs b/frontend/rust-lib/flowy-document/src/services/doc/edit/editor_cmd_queue.rs similarity index 100% rename from frontend/rust-lib/flowy-document/src/services/doc/edit/editor_edit_cmd_queue.rs rename to frontend/rust-lib/flowy-document/src/services/doc/edit/editor_cmd_queue.rs diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/mod.rs b/frontend/rust-lib/flowy-document/src/services/doc/edit/mod.rs index cfbe4f27f1..d186ee6957 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/edit/mod.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/edit/mod.rs @@ -1,7 +1,5 @@ mod editor; -mod editor_edit_cmd_queue; -mod editor_web_socket; +mod editor_cmd_queue; pub use editor::*; -pub(crate) use editor_edit_cmd_queue::*; -pub use editor_web_socket::*; +pub(crate) use editor_cmd_queue::*; diff --git a/frontend/rust-lib/flowy-document/src/services/doc/mod.rs b/frontend/rust-lib/flowy-document/src/services/doc/mod.rs index 6b9355e13a..68353587eb 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/mod.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/mod.rs @@ -1,6 +1,6 @@ pub mod edit; pub mod revision; - +mod web_socket; pub use crate::services::ws_handlers::*; pub use edit::*; pub use revision::*; diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/editor_web_socket.rs b/frontend/rust-lib/flowy-document/src/services/doc/web_socket/http_ws_impl.rs similarity index 90% rename from frontend/rust-lib/flowy-document/src/services/doc/edit/editor_web_socket.rs rename to frontend/rust-lib/flowy-document/src/services/doc/web_socket/http_ws_impl.rs index d4315b230f..e4ab97b5aa 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/edit/editor_web_socket.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/web_socket/http_ws_impl.rs @@ -1,4 +1,7 @@ -use crate::services::doc::{DocumentWebSocket, DocumentWsHandler, SYNC_INTERVAL_IN_MILLIS}; +use crate::services::{ + doc::{web_socket::web_socket::EditorWebSocket, SYNC_INTERVAL_IN_MILLIS}, + ws_handlers::{DocumentWebSocket, DocumentWsHandler}, +}; use async_stream::stream; use bytes::Bytes; use flowy_collaboration::entities::ws::{DocumentWSData, DocumentWSDataType, NewDocumentUser}; @@ -18,7 +21,7 @@ use tokio::{ time::{interval, Duration}, }; -pub(crate) struct EditorWebSocket { +pub struct EditorHttpWebSocket { doc_id: String, data_provider: Arc, stream_consumer: Arc, @@ -29,8 +32,8 @@ pub(crate) struct EditorWebSocket { state: broadcast::Sender, } -impl EditorWebSocket { - pub(crate) fn new( +impl EditorHttpWebSocket { + pub fn new( doc_id: &str, ws: Arc, data_provider: Arc, @@ -40,7 +43,7 @@ impl EditorWebSocket { let (stop_sync_tx, _) = tokio::sync::broadcast::channel(2); let doc_id = doc_id.to_string(); let (state, _) = broadcast::channel(2); - let mut manager = EditorWebSocket { + let mut manager = EditorHttpWebSocket { doc_id, data_provider, stream_consumer, @@ -50,11 +53,11 @@ impl EditorWebSocket { stop_sync_tx, state, }; - manager.start_sync(); + manager.start_web_socket(); manager } - fn start_sync(&mut self) { + fn start_web_socket(&mut self) { let ws_msg_rx = self.ws_msg_rx.take().expect("Only take once"); let sink = DocumentWebSocketSink::new( &self.doc_id, @@ -72,16 +75,20 @@ impl EditorWebSocket { tokio::spawn(stream.run()); } - pub(crate) fn stop(&self) { + pub fn scribe_state(&self) -> broadcast::Receiver { self.state.subscribe() } +} + +impl EditorWebSocket for Arc { + fn stop_web_socket(&self) { if self.stop_sync_tx.send(()).is_ok() { tracing::debug!("{} stop sync", self.doc_id) } } - pub(crate) fn scribe_state(&self) -> broadcast::Receiver { self.state.subscribe() } + fn ws_handler(&self) -> Arc { self.clone() } } -impl DocumentWsHandler for EditorWebSocket { +impl DocumentWsHandler for EditorHttpWebSocket { fn receive(&self, doc_data: DocumentWSData) { match self.ws_msg_tx.send(doc_data) { Ok(_) => {}, @@ -104,7 +111,7 @@ pub trait DocumentWSSteamConsumer: Send + Sync { fn send_revision_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError>; } -pub(crate) struct DocumentWebSocketStream { +pub struct DocumentWebSocketStream { doc_id: String, consumer: Arc, ws_msg_rx: Option>, @@ -112,7 +119,7 @@ pub(crate) struct DocumentWebSocketStream { } impl DocumentWebSocketStream { - pub(crate) fn new( + pub fn new( doc_id: &str, consumer: Arc, ws_msg_rx: mpsc::UnboundedReceiver, @@ -197,15 +204,15 @@ impl DocumentWebSocketStream { } } -pub(crate) type Tick = (); -pub(crate) type SinkStopRx = broadcast::Receiver<()>; -pub(crate) type SinkStopTx = broadcast::Sender<()>; +pub type Tick = (); +pub type SinkStopRx = broadcast::Receiver<()>; +pub type SinkStopTx = broadcast::Sender<()>; pub trait DocumentWSSinkDataProvider: Send + Sync { fn next(&self) -> FutureResult, FlowyError>; } -pub(crate) struct DocumentWebSocketSink { +pub struct DocumentWebSocketSink { provider: Arc, ws_sender: Arc, stop_rx: Option, @@ -213,7 +220,7 @@ pub(crate) struct DocumentWebSocketSink { } impl DocumentWebSocketSink { - pub(crate) fn new( + pub fn new( doc_id: &str, provider: Arc, ws_sender: Arc, diff --git a/frontend/rust-lib/flowy-document/src/services/doc/web_socket/local_ws_impl.rs b/frontend/rust-lib/flowy-document/src/services/doc/web_socket/local_ws_impl.rs new file mode 100644 index 0000000000..6f2d1082d4 --- /dev/null +++ b/frontend/rust-lib/flowy-document/src/services/doc/web_socket/local_ws_impl.rs @@ -0,0 +1,18 @@ +use crate::services::doc::{web_socket::EditorWebSocket, DocumentWsHandler}; +use flowy_collaboration::entities::ws::DocumentWSData; +use lib_ws::WSConnectState; +use std::sync::Arc; + +pub(crate) struct EditorLocalWebSocket {} + +impl EditorWebSocket for Arc { + fn stop_web_socket(&self) {} + + fn ws_handler(&self) -> Arc { self.clone() } +} + +impl DocumentWsHandler for EditorLocalWebSocket { + fn receive(&self, _doc_data: DocumentWSData) {} + + fn connect_state_changed(&self, _state: &WSConnectState) {} +} diff --git a/frontend/rust-lib/flowy-document/src/services/doc/web_socket/mod.rs b/frontend/rust-lib/flowy-document/src/services/doc/web_socket/mod.rs new file mode 100644 index 0000000000..519cff4c02 --- /dev/null +++ b/frontend/rust-lib/flowy-document/src/services/doc/web_socket/mod.rs @@ -0,0 +1,7 @@ +#![allow(clippy::module_inception)] +mod http_ws_impl; +mod local_ws_impl; +mod web_socket; + +pub(crate) use http_ws_impl::*; +pub(crate) use web_socket::*; diff --git a/frontend/rust-lib/flowy-document/src/services/doc/web_socket/web_socket.rs b/frontend/rust-lib/flowy-document/src/services/doc/web_socket/web_socket.rs new file mode 100644 index 0000000000..4d41d06259 --- /dev/null +++ b/frontend/rust-lib/flowy-document/src/services/doc/web_socket/web_socket.rs @@ -0,0 +1,333 @@ +use crate::services::doc::{ + web_socket::{ + local_ws_impl::EditorLocalWebSocket, + DocumentWSSinkDataProvider, + DocumentWSSteamConsumer, + EditorHttpWebSocket, + }, + DocumentMD5, + DocumentWebSocket, + DocumentWsHandler, + EditorCommand, + RevisionManager, + TransformDeltas, +}; +use bytes::Bytes; +use flowy_collaboration::{ + entities::ws::{DocumentWSData, DocumentWSDataBuilder, DocumentWSDataType, NewDocumentUser}, + errors::CollaborateResult, +}; +use flowy_error::{internal_error, FlowyError, FlowyResult}; +use lib_infra::future::FutureResult; +use lib_ot::revision::{RevType, Revision, RevisionRange}; +use lib_ws::WSConnectState; +use std::{collections::VecDeque, sync::Arc}; +use tokio::sync::{broadcast, mpsc::UnboundedSender, oneshot, RwLock}; + +pub(crate) trait EditorWebSocket: Send + Sync { + fn stop_web_socket(&self); + fn ws_handler(&self) -> Arc; +} + +pub(crate) struct DocumentWebSocketContext { + pub(crate) doc_id: String, + pub(crate) user_id: String, + pub(crate) editor_cmd_sender: UnboundedSender, + pub(crate) rev_manager: Arc, + pub(crate) ws: Arc, +} + +pub(crate) async fn initialize_document_web_socket(ctx: DocumentWebSocketContext) -> Arc { + if cfg!(feature = "http_server") { + let combined_sink = Arc::new(CombinedSink::new(ctx.rev_manager.clone())); + let ws_stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter { + doc_id: ctx.doc_id.clone(), + user_id: ctx.user_id.clone(), + editor_cmd_sender: ctx.editor_cmd_sender.clone(), + rev_manager: ctx.rev_manager.clone(), + combined_sink: combined_sink.clone(), + }); + let ws_stream_provider = DocumentWSSinkDataProviderAdapter(combined_sink.clone()); + let editor_ws = Arc::new(EditorHttpWebSocket::new( + &ctx.doc_id, + ctx.ws.clone(), + Arc::new(ws_stream_provider), + ws_stream_consumer, + )); + + notify_user_conn( + &ctx.user_id, + &ctx.doc_id, + ctx.rev_manager.clone(), + combined_sink.clone(), + ) + .await; + + listen_document_ws_state( + &ctx.user_id, + &ctx.doc_id, + editor_ws.scribe_state(), + ctx.rev_manager.clone(), + combined_sink, + ); + + Arc::new(editor_ws) + } else { + Arc::new(Arc::new(EditorLocalWebSocket {})) + } +} + +async fn notify_user_conn( + user_id: &str, + doc_id: &str, + rev_manager: Arc, + combined_sink: Arc, +) { + let need_notify = match combined_sink.front().await { + None => true, + Some(data) => data.ty != DocumentWSDataType::UserConnect, + }; + + if need_notify { + let new_connect = NewDocumentUser { + user_id: user_id.to_owned(), + doc_id: doc_id.to_owned(), + rev_id: rev_manager.latest_rev_id(), + }; + + let data = DocumentWSDataBuilder::build_new_document_user_message(doc_id, new_connect); + combined_sink.push_front(data).await; + } +} + +fn listen_document_ws_state( + user_id: &str, + doc_id: &str, + mut subscriber: broadcast::Receiver, + rev_manager: Arc, + sink_data_provider: Arc, +) { + let user_id = user_id.to_owned(); + let doc_id = doc_id.to_owned(); + + tokio::spawn(async move { + while let Ok(state) = subscriber.recv().await { + match state { + WSConnectState::Init => {}, + WSConnectState::Connecting => {}, + WSConnectState::Connected => { + // self.notify_user_conn() + notify_user_conn(&user_id, &doc_id, rev_manager.clone(), sink_data_provider.clone()).await; + }, + WSConnectState::Disconnected => {}, + } + } + }); +} + +pub(crate) struct DocumentWebSocketSteamConsumerAdapter { + pub(crate) doc_id: String, + pub(crate) user_id: String, + pub(crate) editor_cmd_sender: UnboundedSender, + pub(crate) rev_manager: Arc, + pub(crate) combined_sink: Arc, +} + +impl DocumentWSSteamConsumer for DocumentWebSocketSteamConsumerAdapter { + fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError> { + let user_id = self.user_id.clone(); + let rev_manager = self.rev_manager.clone(); + let edit_cmd_tx = self.editor_cmd_sender.clone(); + let combined_sink = self.combined_sink.clone(); + let doc_id = self.doc_id.clone(); + FutureResult::new(async move { + if let Some(revision) = handle_push_rev(&doc_id, &user_id, edit_cmd_tx, rev_manager, bytes).await? { + combined_sink.push_back(revision.into()).await; + } + Ok(()) + }) + } + + fn receive_ack(&self, id: String, ty: DocumentWSDataType) -> FutureResult<(), FlowyError> { + let combined_sink = self.combined_sink.clone(); + FutureResult::new(async move { combined_sink.ack(id, ty).await }) + } + + fn receive_new_user_connect(&self, _new_user: NewDocumentUser) -> FutureResult<(), FlowyError> { + FutureResult::new(async move { Ok(()) }) + } + + fn send_revision_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError> { + let rev_manager = self.rev_manager.clone(); + let combined_sink = self.combined_sink.clone(); + FutureResult::new(async move { + let revision = rev_manager.mk_revisions(range).await?; + combined_sink.push_back(revision.into()).await; + Ok(()) + }) + } +} + +pub(crate) struct DocumentWSSinkDataProviderAdapter(pub(crate) Arc); +impl DocumentWSSinkDataProvider for DocumentWSSinkDataProviderAdapter { + fn next(&self) -> FutureResult, FlowyError> { + let combined_sink = self.0.clone(); + FutureResult::new(async move { combined_sink.next().await }) + } +} + +#[tracing::instrument(level = "debug", skip(edit_cmd_tx, rev_manager, bytes))] +pub(crate) async fn handle_push_rev( + doc_id: &str, + user_id: &str, + edit_cmd_tx: UnboundedSender, + rev_manager: Arc, + bytes: Bytes, +) -> FlowyResult> { + // Transform the revision + let (ret, rx) = oneshot::channel::>(); + let _ = edit_cmd_tx.send(EditorCommand::ProcessRemoteRevision { bytes, ret }); + let TransformDeltas { + client_prime, + server_prime, + server_rev_id, + } = rx.await.map_err(internal_error)??; + + if rev_manager.rev_id() >= server_rev_id.value { + // Ignore this push revision if local_rev_id >= server_rev_id + return Ok(None); + } + + // compose delta + let (ret, rx) = oneshot::channel::>(); + let msg = EditorCommand::ComposeDelta { + delta: client_prime.clone(), + ret, + }; + let _ = edit_cmd_tx.send(msg); + let md5 = rx.await.map_err(internal_error)??; + + // update rev id + rev_manager.update_rev_id_counter_value(server_rev_id.clone().into()); + let (local_base_rev_id, local_rev_id) = rev_manager.next_rev_id(); + let delta_data = client_prime.to_bytes(); + // save the revision + let revision = Revision::new( + &doc_id, + local_base_rev_id, + local_rev_id, + delta_data, + RevType::Remote, + &user_id, + md5.clone(), + ); + + let _ = rev_manager.add_remote_revision(&revision).await?; + + // send the server_prime delta + let delta_data = server_prime.to_bytes(); + Ok(Some(Revision::new( + &doc_id, + local_base_rev_id, + local_rev_id, + delta_data, + RevType::Remote, + &user_id, + md5, + ))) +} + +#[derive(Clone)] +enum SourceType { + Shared, + Revision, +} + +#[derive(Clone)] +pub(crate) struct CombinedSink { + shared: Arc>>, + rev_manager: Arc, + source_ty: Arc>, +} + +impl CombinedSink { + pub(crate) fn new(rev_manager: Arc) -> Self { + CombinedSink { + shared: Arc::new(RwLock::new(VecDeque::new())), + rev_manager, + source_ty: Arc::new(RwLock::new(SourceType::Shared)), + } + } + + // FIXME: return Option<&DocumentWSData> would be better + pub(crate) async fn front(&self) -> Option { self.shared.read().await.front().cloned() } + + pub(crate) async fn push_front(&self, data: DocumentWSData) { self.shared.write().await.push_front(data); } + + async fn push_back(&self, data: DocumentWSData) { self.shared.write().await.push_back(data); } + + async fn next(&self) -> FlowyResult> { + let source_ty = self.source_ty.read().await.clone(); + match source_ty { + SourceType::Shared => match self.shared.read().await.front() { + None => { + *self.source_ty.write().await = SourceType::Revision; + Ok(None) + }, + Some(data) => { + tracing::debug!("[DocumentSinkDataProvider]: {}:{:?}", data.doc_id, data.ty); + Ok(Some(data.clone())) + }, + }, + SourceType::Revision => { + if !self.shared.read().await.is_empty() { + *self.source_ty.write().await = SourceType::Shared; + return Ok(None); + } + + match self.rev_manager.next_sync_revision().await? { + Some(rev) => { + tracing::debug!("[DocumentSinkDataProvider]: {}:{:?}", rev.doc_id, rev.rev_id); + Ok(Some(rev.into())) + }, + None => Ok(None), + } + }, + } + } + + async fn ack(&self, id: String, _ty: DocumentWSDataType) -> FlowyResult<()> { + // let _ = self.rev_manager.ack_revision(id).await?; + let source_ty = self.source_ty.read().await.clone(); + match source_ty { + SourceType::Shared => { + let should_pop = match self.shared.read().await.front() { + None => false, + Some(val) => { + if val.id == id { + true + } else { + tracing::error!("The front element's {} is not equal to the {}", val.id, id); + false + } + }, + }; + if should_pop { + let _ = self.shared.write().await.pop_front(); + } + }, + SourceType::Revision => { + match id.parse::() { + Ok(rev_id) => { + let _ = self.rev_manager.ack_revision(rev_id).await?; + }, + Err(e) => { + tracing::error!("Parse rev_id from {} failed. {}", id, e); + }, + }; + }, + } + + Ok(()) + } +}