diff --git a/frontend/rust-lib/flowy-document/src/module.rs b/frontend/rust-lib/flowy-document/src/module.rs index 865f09111a..c8b8372db7 100644 --- a/frontend/rust-lib/flowy-document/src/module.rs +++ b/frontend/rust-lib/flowy-document/src/module.rs @@ -1,7 +1,8 @@ use crate::{ errors::FlowyError, services::{ - doc::{controller::DocController, edit::ClientDocEditor, DocumentWsHandlers}, + controller::DocController, + doc::{edit::ClientDocEditor, DocumentWsHandlers}, server::construct_doc_server, }, }; diff --git a/frontend/rust-lib/flowy-document/src/services/doc/controller.rs b/frontend/rust-lib/flowy-document/src/services/controller.rs similarity index 100% rename from frontend/rust-lib/flowy-document/src/services/doc/controller.rs rename to frontend/rust-lib/flowy-document/src/services/controller.rs diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs b/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs index 8f5a697c93..5aa7698eb9 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs @@ -2,7 +2,10 @@ use crate::{errors::FlowyError, module::DocumentUser, services::doc::*}; use bytes::Bytes; use flowy_collaboration::{ core::document::history::UndoResult, - entities::{doc::DocDelta, ws::DocumentWSData}, + entities::{ + doc::DocDelta, + ws::{DocumentConnected, DocumentWSData, DocumentWSDataType, WsDocumentDataBuilder}, + }, errors::CollaborateResult, }; use flowy_database::ConnectionPool; @@ -13,15 +16,15 @@ use lib_ot::{ revision::{RevId, RevType, Revision, RevisionRange}, rich_text::{RichTextAttribute, RichTextDelta}, }; +use lib_ws::WSConnectState; use std::{collections::VecDeque, sync::Arc}; -use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot, RwLock}; +use tokio::sync::{broadcast, mpsc, mpsc::UnboundedSender, oneshot, RwLock}; pub struct ClientDocEditor { pub doc_id: String, rev_manager: Arc, - ws_manager: Arc, - edit_cmd_tx: UnboundedSender, - sink_data_provider: SinkDataProvider, + editor_ws: Arc, + editor_cmd_sender: UnboundedSender, user: Arc, } @@ -35,8 +38,9 @@ impl ClientDocEditor { server: Arc, ) -> FlowyResult> { let delta = rev_manager.load_document(server).await?; - let edit_cmd_tx = spawn_edit_queue(doc_id, delta, pool.clone()); + let editor_cmd_sender = spawn_edit_queue(doc_id, delta, pool.clone()); let doc_id = doc_id.to_string(); + let user_id = user.user_id()?; let rev_manager = Arc::new(rev_manager); let sink_data_provider = Arc::new(RwLock::new(VecDeque::new())); let data_provider = Arc::new(DocumentSinkDataProviderAdapter { @@ -45,18 +49,27 @@ impl ClientDocEditor { }); let stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter { doc_id: doc_id.clone(), - edit_cmd_tx: edit_cmd_tx.clone(), + editor_cmd_sender: editor_cmd_sender.clone(), rev_manager: rev_manager.clone(), user: user.clone(), sink_data_provider: sink_data_provider.clone(), }); - let ws_manager = Arc::new(WebSocketManager::new(&doc_id, ws, data_provider, stream_consumer)); + let editor_ws = Arc::new(EditorWebSocket::new(&doc_id, ws, data_provider, stream_consumer)); + notify_user_conn(&user_id, &doc_id, rev_manager.clone(), sink_data_provider.clone()).await; + + listen_document_ws_state( + &user_id, + &doc_id, + editor_ws.scribe_state(), + rev_manager.clone(), + sink_data_provider, + ); + let editor = Arc::new(Self { doc_id, rev_manager, - ws_manager, - edit_cmd_tx, - sink_data_provider, + editor_ws, + editor_cmd_sender, user, }); Ok(editor) @@ -64,12 +77,12 @@ impl ClientDocEditor { pub async fn insert(&self, index: usize, data: T) -> Result<(), FlowyError> { let (ret, rx) = oneshot::channel::>(); - let msg = EditCommand::Insert { + let msg = EditorCommand::Insert { index, data: data.to_string(), ret, }; - let _ = self.edit_cmd_tx.send(msg); + let _ = self.editor_cmd_sender.send(msg); let (delta, md5) = rx.await.map_err(internal_error)??; let _ = self.save_local_delta(delta, md5).await?; Ok(()) @@ -77,8 +90,8 @@ impl ClientDocEditor { pub async fn delete(&self, interval: Interval) -> Result<(), FlowyError> { let (ret, rx) = oneshot::channel::>(); - let msg = EditCommand::Delete { interval, ret }; - let _ = self.edit_cmd_tx.send(msg); + let msg = EditorCommand::Delete { interval, ret }; + let _ = self.editor_cmd_sender.send(msg); let (delta, md5) = rx.await.map_err(internal_error)??; let _ = self.save_local_delta(delta, md5).await?; Ok(()) @@ -86,12 +99,12 @@ impl ClientDocEditor { pub async fn format(&self, interval: Interval, attribute: RichTextAttribute) -> Result<(), FlowyError> { let (ret, rx) = oneshot::channel::>(); - let msg = EditCommand::Format { + let msg = EditorCommand::Format { interval, attribute, ret, }; - let _ = self.edit_cmd_tx.send(msg); + let _ = self.editor_cmd_sender.send(msg); let (delta, md5) = rx.await.map_err(internal_error)??; let _ = self.save_local_delta(delta, md5).await?; Ok(()) @@ -99,12 +112,12 @@ impl ClientDocEditor { pub async fn replace(&self, interval: Interval, data: T) -> Result<(), FlowyError> { let (ret, rx) = oneshot::channel::>(); - let msg = EditCommand::Replace { + let msg = EditorCommand::Replace { interval, data: data.to_string(), ret, }; - let _ = self.edit_cmd_tx.send(msg); + let _ = self.editor_cmd_sender.send(msg); let (delta, md5) = rx.await.map_err(internal_error)??; let _ = self.save_local_delta(delta, md5).await?; Ok(()) @@ -112,38 +125,38 @@ impl ClientDocEditor { pub async fn can_undo(&self) -> bool { let (ret, rx) = oneshot::channel::(); - let msg = EditCommand::CanUndo { ret }; - let _ = self.edit_cmd_tx.send(msg); + let msg = EditorCommand::CanUndo { ret }; + let _ = self.editor_cmd_sender.send(msg); rx.await.unwrap_or(false) } pub async fn can_redo(&self) -> bool { let (ret, rx) = oneshot::channel::(); - let msg = EditCommand::CanRedo { ret }; - let _ = self.edit_cmd_tx.send(msg); + let msg = EditorCommand::CanRedo { ret }; + let _ = self.editor_cmd_sender.send(msg); rx.await.unwrap_or(false) } pub async fn undo(&self) -> Result { let (ret, rx) = oneshot::channel::>(); - let msg = EditCommand::Undo { ret }; - let _ = self.edit_cmd_tx.send(msg); + let msg = EditorCommand::Undo { ret }; + let _ = self.editor_cmd_sender.send(msg); let r = rx.await.map_err(internal_error)??; Ok(r) } pub async fn redo(&self) -> Result { let (ret, rx) = oneshot::channel::>(); - let msg = EditCommand::Redo { ret }; - let _ = self.edit_cmd_tx.send(msg); + let msg = EditorCommand::Redo { ret }; + let _ = self.editor_cmd_sender.send(msg); let r = rx.await.map_err(internal_error)??; Ok(r) } pub async fn delta(&self) -> FlowyResult { let (ret, rx) = oneshot::channel::>(); - let msg = EditCommand::ReadDoc { ret }; - let _ = self.edit_cmd_tx.send(msg); + let msg = EditorCommand::ReadDoc { ret }; + let _ = self.editor_cmd_sender.send(msg); let data = rx.await.map_err(internal_error)??; Ok(DocDelta { @@ -173,11 +186,11 @@ impl ClientDocEditor { pub(crate) async fn composing_local_delta(&self, data: Bytes) -> Result<(), FlowyError> { let delta = RichTextDelta::from_bytes(&data)?; let (ret, rx) = oneshot::channel::>(); - let msg = EditCommand::ComposeDelta { + let msg = EditorCommand::ComposeDelta { delta: delta.clone(), ret, }; - let _ = self.edit_cmd_tx.send(msg); + let _ = self.editor_cmd_sender.send(msg); let md5 = rx.await.map_err(internal_error)??; let _ = self.save_local_delta(delta, md5).await?; @@ -185,21 +198,21 @@ impl ClientDocEditor { } #[tracing::instrument(level = "debug", skip(self))] - pub fn stop_sync(&self) { self.ws_manager.stop(); } + pub fn stop_sync(&self) { self.editor_ws.stop(); } - pub(crate) fn ws_handler(&self) -> Arc { self.ws_manager.clone() } + pub(crate) fn ws_handler(&self) -> Arc { self.editor_ws.clone() } } -fn spawn_edit_queue(doc_id: &str, delta: RichTextDelta, _pool: Arc) -> UnboundedSender { - let (sender, receiver) = mpsc::unbounded_channel::(); - let actor = EditCommandQueue::new(doc_id, delta, receiver); +fn spawn_edit_queue(doc_id: &str, delta: RichTextDelta, _pool: Arc) -> UnboundedSender { + let (sender, receiver) = mpsc::unbounded_channel::(); + let actor = EditorCommandQueue::new(doc_id, delta, receiver); tokio::spawn(actor.run()); sender } struct DocumentWebSocketSteamConsumerAdapter { doc_id: String, - edit_cmd_tx: UnboundedSender, + editor_cmd_sender: UnboundedSender, rev_manager: Arc, user: Arc, sink_data_provider: SinkDataProvider, @@ -209,7 +222,7 @@ impl DocumentWebSocketSteamConsumer for DocumentWebSocketSteamConsumerAdapter { fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError> { let user = self.user.clone(); let rev_manager = self.rev_manager.clone(); - let edit_cmd_tx = self.edit_cmd_tx.clone(); + let edit_cmd_tx = self.editor_cmd_sender.clone(); let sink_data_provider = self.sink_data_provider.clone(); let doc_id = self.doc_id.clone(); FutureResult::new(async move { @@ -240,6 +253,54 @@ impl DocumentWebSocketSteamConsumer for DocumentWebSocketSteamConsumerAdapter { } } +async fn notify_user_conn( + user_id: &str, + doc_id: &str, + rev_manager: Arc, + sink_data_provider: SinkDataProvider, +) { + let need_notify = match sink_data_provider.read().await.front() { + None => true, + Some(data) => data.ty != DocumentWSDataType::UserConnect, + }; + + if need_notify { + let document_conn = DocumentConnected { + user_id: user_id.to_owned(), + doc_id: doc_id.to_owned(), + rev_id: rev_manager.latest_rev_id(), + }; + + let data = WsDocumentDataBuilder::build_document_conn_message(doc_id, document_conn); + sink_data_provider.write().await.push_front(data); + } +} + +fn listen_document_ws_state( + user_id: &str, + doc_id: &str, + mut subscriber: broadcast::Receiver, + rev_manager: Arc, + sink_data_provider: SinkDataProvider, +) { + let user_id = user_id.to_owned(); + let doc_id = doc_id.to_owned(); + + tokio::spawn(async move { + while let Ok(state) = subscriber.recv().await { + match state { + WSConnectState::Init => {}, + WSConnectState::Connecting => {}, + WSConnectState::Connected => { + // self.notify_user_conn() + notify_user_conn(&user_id, &doc_id, rev_manager.clone(), sink_data_provider.clone()).await; + }, + WSConnectState::Disconnected => {}, + } + } + }); +} + type SinkDataProvider = Arc>>; struct DocumentSinkDataProviderAdapter { @@ -256,7 +317,7 @@ impl DocumentSinkDataProvider for DocumentSinkDataProviderAdapter { if data_provider.read().await.is_empty() { match rev_manager.next_sync_revision().await? { Some(rev) => { - tracing::debug!("[DocumentSinkDataProvider]: revision: {}:{:?}", rev.doc_id, rev.rev_id); + tracing::debug!("[DocumentSinkDataProvider]: {}:{:?}", rev.doc_id, rev.rev_id); Ok(Some(rev.into())) }, None => Ok(None), @@ -278,13 +339,13 @@ impl DocumentSinkDataProvider for DocumentSinkDataProviderAdapter { pub(crate) async fn handle_push_rev( doc_id: &str, user_id: &str, - edit_cmd_tx: UnboundedSender, + edit_cmd_tx: UnboundedSender, rev_manager: Arc, bytes: Bytes, ) -> FlowyResult> { // Transform the revision let (ret, rx) = oneshot::channel::>(); - let _ = edit_cmd_tx.send(EditCommand::ProcessRemoteRevision { bytes, ret }); + let _ = edit_cmd_tx.send(EditorCommand::ProcessRemoteRevision { bytes, ret }); let TransformDeltas { client_prime, server_prime, @@ -298,7 +359,7 @@ pub(crate) async fn handle_push_rev( // compose delta let (ret, rx) = oneshot::channel::>(); - let msg = EditCommand::ComposeDelta { + let msg = EditorCommand::ComposeDelta { delta: client_prime.clone(), ret, }; @@ -339,16 +400,16 @@ pub(crate) async fn handle_push_rev( impl ClientDocEditor { pub async fn doc_json(&self) -> FlowyResult { let (ret, rx) = oneshot::channel::>(); - let msg = EditCommand::ReadDoc { ret }; - let _ = self.edit_cmd_tx.send(msg); + let msg = EditorCommand::ReadDoc { ret }; + let _ = self.editor_cmd_sender.send(msg); let s = rx.await.map_err(internal_error)??; Ok(s) } pub async fn doc_delta(&self) -> FlowyResult { let (ret, rx) = oneshot::channel::>(); - let msg = EditCommand::ReadDocDelta { ret }; - let _ = self.edit_cmd_tx.send(msg); + let msg = EditorCommand::ReadDocDelta { ret }; + let _ = self.editor_cmd_sender.send(msg); let delta = rx.await.map_err(internal_error)??; Ok(delta) } diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/edit_queue.rs b/frontend/rust-lib/flowy-document/src/services/doc/edit/editor_edit_cmd_queue.rs similarity index 86% rename from frontend/rust-lib/flowy-document/src/services/doc/edit/edit_queue.rs rename to frontend/rust-lib/flowy-document/src/services/doc/edit/editor_edit_cmd_queue.rs index faa239dd3f..3cf6d25034 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/edit/edit_queue.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/edit/editor_edit_cmd_queue.rs @@ -14,14 +14,14 @@ use lib_ot::{ use std::{convert::TryFrom, sync::Arc}; use tokio::sync::{mpsc, oneshot, RwLock}; -pub(crate) struct EditCommandQueue { +pub(crate) struct EditorCommandQueue { doc_id: String, document: Arc>, - receiver: Option>, + receiver: Option>, } -impl EditCommandQueue { - pub(crate) fn new(doc_id: &str, delta: RichTextDelta, receiver: mpsc::UnboundedReceiver) -> Self { +impl EditorCommandQueue { + pub(crate) fn new(doc_id: &str, delta: RichTextDelta, receiver: mpsc::UnboundedReceiver) -> Self { let document = Arc::new(RwLock::new(Document::from_delta(delta))); Self { doc_id: doc_id.to_owned(), @@ -50,13 +50,13 @@ impl EditCommandQueue { .await; } - async fn handle_message(&self, msg: EditCommand) -> Result<(), FlowyError> { + async fn handle_message(&self, msg: EditorCommand) -> Result<(), FlowyError> { match msg { - EditCommand::ComposeDelta { delta, ret } => { + EditorCommand::ComposeDelta { delta, ret } => { let result = self.composed_delta(delta).await; let _ = ret.send(result); }, - EditCommand::ProcessRemoteRevision { bytes, ret } => { + EditorCommand::ProcessRemoteRevision { bytes, ret } => { let f = || async { let revision = Revision::try_from(bytes)?; let delta = RichTextDelta::from_bytes(&revision.delta_data)?; @@ -75,19 +75,19 @@ impl EditCommandQueue { }; let _ = ret.send(f().await); }, - EditCommand::Insert { index, data, ret } => { + EditorCommand::Insert { index, data, ret } => { let mut write_guard = self.document.write().await; let delta = write_guard.insert(index, data)?; let md5 = write_guard.md5(); let _ = ret.send(Ok((delta, md5))); }, - EditCommand::Delete { interval, ret } => { + EditorCommand::Delete { interval, ret } => { let mut write_guard = self.document.write().await; let delta = write_guard.delete(interval)?; let md5 = write_guard.md5(); let _ = ret.send(Ok((delta, md5))); }, - EditCommand::Format { + EditorCommand::Format { interval, attribute, ret, @@ -97,31 +97,31 @@ impl EditCommandQueue { let md5 = write_guard.md5(); let _ = ret.send(Ok((delta, md5))); }, - EditCommand::Replace { interval, data, ret } => { + EditorCommand::Replace { interval, data, ret } => { let mut write_guard = self.document.write().await; let delta = write_guard.replace(interval, data)?; let md5 = write_guard.md5(); let _ = ret.send(Ok((delta, md5))); }, - EditCommand::CanUndo { ret } => { + EditorCommand::CanUndo { ret } => { let _ = ret.send(self.document.read().await.can_undo()); }, - EditCommand::CanRedo { ret } => { + EditorCommand::CanRedo { ret } => { let _ = ret.send(self.document.read().await.can_redo()); }, - EditCommand::Undo { ret } => { + EditorCommand::Undo { ret } => { let result = self.document.write().await.undo(); let _ = ret.send(result); }, - EditCommand::Redo { ret } => { + EditorCommand::Redo { ret } => { let result = self.document.write().await.redo(); let _ = ret.send(result); }, - EditCommand::ReadDoc { ret } => { + EditorCommand::ReadDoc { ret } => { let data = self.document.read().await.to_json(); let _ = ret.send(Ok(data)); }, - EditCommand::ReadDocDelta { ret } => { + EditorCommand::ReadDocDelta { ret } => { let delta = self.document.read().await.delta().clone(); let _ = ret.send(Ok(delta)); }, @@ -151,7 +151,7 @@ pub(crate) type NewDelta = (RichTextDelta, String); pub(crate) type DocumentMD5 = String; #[allow(dead_code)] -pub(crate) enum EditCommand { +pub(crate) enum EditorCommand { ComposeDelta { delta: RichTextDelta, ret: Ret, diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/web_socket.rs b/frontend/rust-lib/flowy-document/src/services/doc/edit/editor_web_socket.rs similarity index 86% rename from frontend/rust-lib/flowy-document/src/services/doc/edit/web_socket.rs rename to frontend/rust-lib/flowy-document/src/services/doc/edit/editor_web_socket.rs index adcdf39a42..1f8ba327f1 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/edit/web_socket.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/edit/editor_web_socket.rs @@ -18,7 +18,7 @@ use tokio::{ time::{interval, Duration}, }; -pub(crate) struct WebSocketManager { +pub(crate) struct EditorWebSocket { doc_id: String, data_provider: Arc, stream_consumer: Arc, @@ -26,9 +26,10 @@ pub(crate) struct WebSocketManager { ws_msg_tx: UnboundedSender, ws_msg_rx: Option>, stop_sync_tx: SinkStopTx, + state: broadcast::Sender, } -impl WebSocketManager { +impl EditorWebSocket { pub(crate) fn new( doc_id: &str, ws: Arc, @@ -38,7 +39,8 @@ impl WebSocketManager { let (ws_msg_tx, ws_msg_rx) = mpsc::unbounded_channel(); let (stop_sync_tx, _) = tokio::sync::broadcast::channel(2); let doc_id = doc_id.to_string(); - let mut manager = WebSocketManager { + let (state, _) = broadcast::channel(2); + let mut manager = EditorWebSocket { doc_id, data_provider, stream_consumer, @@ -46,6 +48,7 @@ impl WebSocketManager { ws_msg_tx, ws_msg_rx: Some(ws_msg_rx), stop_sync_tx, + state, }; manager.start_sync(); manager @@ -67,7 +70,6 @@ impl WebSocketManager { ); tokio::spawn(sink.run()); tokio::spawn(stream.run()); - self.notify_user_conn(); } pub(crate) fn stop(&self) { @@ -76,24 +78,10 @@ impl WebSocketManager { } } - #[tracing::instrument(level = "debug", skip(self))] - fn notify_user_conn(&self) { - // let rev_id: RevId = self.rev_manager.rev_id().into(); - // if let Ok(user_id) = self.user.user_id() { - // let action = OpenDocAction::new(&user_id, &self.doc_id, &rev_id, - // &self.ws_sender); let strategy = - // ExponentialBackoff::from_millis(50).take(3); let retry = - // Retry::spawn(strategy, action); tokio::spawn(async move { - // match retry.await { - // Ok(_) => log::debug!("Notify open doc success"), - // Err(e) => log::error!("Notify open doc failed: {}", e), - // } - // }); - // } - } + pub(crate) fn scribe_state(&self) -> broadcast::Receiver { self.state.subscribe() } } -impl DocumentWsHandler for WebSocketManager { +impl DocumentWsHandler for EditorWebSocket { fn receive(&self, doc_data: DocumentWSData) { match self.ws_msg_tx.send(doc_data) { Ok(_) => {}, @@ -102,11 +90,9 @@ impl DocumentWsHandler for WebSocketManager { } fn connect_state_changed(&self, state: &WSConnectState) { - match state { - WSConnectState::Init => {}, - WSConnectState::Connecting => {}, - WSConnectState::Connected => self.notify_user_conn(), - WSConnectState::Disconnected => {}, + match self.state.send(state.clone()) { + Ok(_) => {}, + Err(e) => tracing::error!("{}", e), } } } @@ -199,7 +185,9 @@ impl DocumentWebSocketStream { let rev_id = RevId::try_from(bytes)?; let _ = self.consumer.receive_ack_revision(rev_id.into()).await; }, - DocumentWSDataType::UserConnect => {}, + DocumentWSDataType::UserConnect => { + // Notify the user that someone has connected to this document + }, } Ok(()) @@ -270,7 +258,7 @@ impl DocumentWebSocketSink { async fn send_next_revision(&self) -> FlowyResult<()> { match self.provider.next().await? { None => { - tracing::debug!("Finish synchronizing revisions"); + tracing::trace!("Finish synchronizing revisions"); Ok(()) }, Some(data) => { diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/mod.rs b/frontend/rust-lib/flowy-document/src/services/doc/edit/mod.rs index de21adfac6..cfbe4f27f1 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/edit/mod.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/edit/mod.rs @@ -1,8 +1,7 @@ -mod edit_queue; mod editor; -mod model; -mod web_socket; +mod editor_edit_cmd_queue; +mod editor_web_socket; -pub(crate) use edit_queue::*; pub use editor::*; -pub use web_socket::*; +pub(crate) use editor_edit_cmd_queue::*; +pub use editor_web_socket::*; diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/model.rs b/frontend/rust-lib/flowy-document/src/services/doc/edit/model.rs deleted file mode 100644 index bd78e32c15..0000000000 --- a/frontend/rust-lib/flowy-document/src/services/doc/edit/model.rs +++ /dev/null @@ -1,48 +0,0 @@ -#![allow(clippy::all)] -#![cfg_attr(rustfmt, rustfmt::skip)] -use crate::{errors::FlowyError}; -use futures::future::BoxFuture; -use lib_infra::retry::Action; -use lib_ot::revision::RevId; -use std::{future, sync::Arc}; -use crate::services::doc::DocumentWebSocket; - -#[allow(dead_code)] -pub(crate) struct OpenDocAction { - user_id: String, - rev_id: RevId, - doc_id: String, - ws: Arc, -} - -#[allow(dead_code)] -impl OpenDocAction { - pub(crate) fn new(user_id: &str, doc_id: &str, rev_id: &RevId, ws: &Arc) -> Self { - Self { - user_id: user_id.to_owned(), - rev_id: rev_id.clone(), - doc_id: doc_id.to_owned(), - ws: ws.clone(), - } - } -} - -impl Action for OpenDocAction { - type Future = BoxFuture<'static, Result>; - type Item = (); - type Error = FlowyError; - - fn run(&mut self) -> Self::Future { - // let new_doc_user = NewDocUser { - // user_id: self.user_id.clone(), - // rev_id: self.rev_id.clone().into(), - // doc_id: self.doc_id.clone(), - // }; - // - // match self.ws.send(new_doc_user.into()) { - // Ok(_) => Box::pin(future::ready(Ok::<(), DocError>(()))), - // Err(e) => Box::pin(future::ready(Err::<(), DocError>(e))), - // } - Box::pin(future::ready(Ok::<(), FlowyError>(()))) - } -} diff --git a/frontend/rust-lib/flowy-document/src/services/doc/mod.rs b/frontend/rust-lib/flowy-document/src/services/doc/mod.rs index c276f674d3..6b9355e13a 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/mod.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/mod.rs @@ -1,11 +1,8 @@ pub mod edit; pub mod revision; -pub(crate) mod controller; - -mod ws_handlers; +pub use crate::services::ws_handlers::*; pub use edit::*; pub use revision::*; -pub use ws_handlers::*; -pub const SYNC_INTERVAL_IN_MILLIS: u64 = 500; +pub const SYNC_INTERVAL_IN_MILLIS: u64 = 1000; diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/cache.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/cache.rs index ce85c8c4dd..b3d12b56ae 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/cache.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/cache.rs @@ -3,26 +3,22 @@ use crate::{ services::doc::revision::{ cache::{ disk::{Persistence, RevisionDiskCache}, - memory::{RevisionMemoryCache, RevisionMemoryCacheMissing}, + memory::{RevisionMemoryCache, RevisionMemoryCacheDelegate}, sync::RevisionSyncSeq, }, RevisionRecord, }, + sql_tables::{RevChangeset, RevTableState}, }; - use flowy_database::ConnectionPool; use flowy_error::{internal_error, FlowyResult}; use lib_infra::future::FutureResult; -use lib_ot::{ - core::Operation, - revision::{RevState, Revision, RevisionRange}, - rich_text::RichTextDelta, -}; -use std::sync::Arc; -use tokio::{ - sync::RwLock, - task::{spawn_blocking, JoinHandle}, +use lib_ot::revision::{RevState, Revision, RevisionRange}; +use std::sync::{ + atomic::{AtomicI64, Ordering::SeqCst}, + Arc, }; +use tokio::task::spawn_blocking; type DocRevisionDiskCache = dyn RevisionDiskCache; @@ -31,7 +27,7 @@ pub struct RevisionCache { pub disk_cache: Arc, memory_cache: Arc, sync_seq: Arc, - defer_save: RwLock>>, + latest_rev_id: AtomicI64, } impl RevisionCache { @@ -45,7 +41,7 @@ impl RevisionCache { disk_cache, memory_cache, sync_seq, - defer_save: RwLock::new(None), + latest_rev_id: AtomicI64::new(0), } } @@ -54,13 +50,14 @@ impl RevisionCache { if self.memory_cache.contains(&revision.rev_id) { return Err(FlowyError::internal().context(format!("Duplicate revision id: {}", revision.rev_id))); } + let rev_id = revision.rev_id; let record = RevisionRecord { revision, state: RevState::StateLocal, }; let _ = self.memory_cache.add_revision(&record).await; self.sync_seq.add_revision(record).await?; - self.save_revisions().await; + let _ = self.latest_rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(rev_id)); Ok(()) } @@ -69,58 +66,63 @@ impl RevisionCache { if self.memory_cache.contains(&revision.rev_id) { return Err(FlowyError::internal().context(format!("Duplicate revision id: {}", revision.rev_id))); } + let rev_id = revision.rev_id; let record = RevisionRecord { revision, - state: RevState::StateLocal, + state: RevState::Acked, }; self.memory_cache.add_revision(&record).await; - self.save_revisions().await; + let _ = self.latest_rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(rev_id)); Ok(()) } #[tracing::instrument(level = "debug", skip(self, rev_id), fields(rev_id = %rev_id))] pub async fn ack_revision(&self, rev_id: i64) { - self.sync_seq.ack_revision(&rev_id).await; - self.save_revisions().await; - } - - pub async fn get_revision(&self, _doc_id: &str, rev_id: i64) -> Option { - self.memory_cache.get_revision(&rev_id).await - } - - async fn save_revisions(&self) { - // https://github.com/async-graphql/async-graphql/blob/ed8449beec3d9c54b94da39bab33cec809903953/src/dataloader/mod.rs#L362 - if let Some(handler) = self.defer_save.write().await.take() { - handler.abort(); + if self.sync_seq.ack_revision(&rev_id).await.is_ok() { + self.memory_cache.ack_revision(&rev_id).await; } + } - // if self.sync_seq.is_empty() { - // return; - // } + pub fn latest_rev_id(&self) -> i64 { self.latest_rev_id.load(SeqCst) } - // let memory_cache = self.sync_seq.clone(); - // let disk_cache = self.disk_cache.clone(); - // *self.defer_save.write().await = Some(tokio::spawn(async move { - // tokio::time::sleep(Duration::from_millis(300)).await; - // let (ids, records) = memory_cache.revisions(); - // match disk_cache.create_revisions(records) { - // Ok(_) => { - // memory_cache.remove_revisions(ids); - // }, - // Err(e) => log::error!("Save revision failed: {:?}", e), - // } - // })); + pub async fn get_revision(&self, doc_id: &str, rev_id: i64) -> Option { + match self.memory_cache.get_revision(&rev_id).await { + None => match self.disk_cache.read_revision(&self.doc_id, rev_id) { + Ok(Some(revision)) => Some(revision), + Ok(None) => { + tracing::warn!("Can't find revision in {} with rev_id: {}", doc_id, rev_id); + None + }, + Err(e) => { + tracing::error!("{}", e); + None + }, + }, + Some(revision) => Some(revision), + } } pub async fn revisions_in_range(&self, range: RevisionRange) -> FlowyResult> { - let records = self.memory_cache.get_revisions_in_range(&range).await?; + let mut records = self.memory_cache.get_revisions_in_range(&range).await?; + let range_len = range.len() as usize; + if records.len() != range_len { + let disk_cache = self.disk_cache.clone(); + let doc_id = self.doc_id.clone(); + records = spawn_blocking(move || disk_cache.revisions_in_range(&doc_id, &range)) + .await + .map_err(internal_error)??; + + if records.len() != range_len { + log::error!("Revisions len is not equal to range required"); + } + } Ok(records .into_iter() .map(|record| record.revision) .collect::>()) } - pub(crate) fn next_revision(&self) -> FutureResult, FlowyError> { + pub(crate) fn next_sync_revision(&self) -> FutureResult, FlowyError> { let sync_seq = self.sync_seq.clone(); let disk_cache = self.disk_cache.clone(); let doc_id = self.doc_id.clone(); @@ -139,32 +141,20 @@ impl RevisionCache { } } -impl RevisionMemoryCacheMissing for Arc { - fn get_revision_record(&self, doc_id: &str, rev_id: i64) -> Result, FlowyError> { - match self.read_revision(&doc_id, rev_id)? { - None => { - tracing::warn!("Can't find revision in {} with rev_id: {}", doc_id, rev_id); - Ok(None) - }, - Some(record) => Ok(Some(record)), +impl RevisionMemoryCacheDelegate for Arc { + fn receive_checkpoint(&self, records: Vec) -> FlowyResult<()> { self.create_revisions(records) } + + fn receive_ack(&self, doc_id: &str, rev_id: i64) { + let changeset = RevChangeset { + doc_id: doc_id.to_string(), + rev_id: rev_id.into(), + state: RevTableState::Acked, + }; + match self.update_revisions(vec![changeset]) { + Ok(_) => {}, + Err(e) => tracing::error!("{}", e), } } - - fn get_revision_records_with_range( - &self, - doc_id: &str, - range: RevisionRange, - ) -> FutureResult, FlowyError> { - let disk_cache = self.clone(); - let doc_id = doc_id.to_owned(); - FutureResult::new(async move { - let records = spawn_blocking(move || disk_cache.revisions_in_range(&doc_id, &range)) - .await - .map_err(internal_error)??; - - Ok::, FlowyError>(records) - }) - } } #[cfg(feature = "flowy_unit_test")] diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/disk.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/disk.rs index 1cbde7dd91..b65d44b5ee 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/disk.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/disk.rs @@ -1,8 +1,8 @@ use crate::services::doc::revision::RevisionRecord; -use crate::sql_tables::RevTableSql; +use crate::sql_tables::{RevChangeset, RevTableSql}; use flowy_database::ConnectionPool; -use flowy_error::{internal_error, FlowyError}; +use flowy_error::{internal_error, FlowyError, FlowyResult}; use lib_ot::revision::RevisionRange; use std::{fmt::Debug, sync::Arc}; @@ -12,6 +12,7 @@ pub trait RevisionDiskCache: Sync + Send { fn revisions_in_range(&self, doc_id: &str, range: &RevisionRange) -> Result, Self::Error>; fn read_revision(&self, doc_id: &str, rev_id: i64) -> Result, Self::Error>; fn read_revisions(&self, doc_id: &str) -> Result, Self::Error>; + fn update_revisions(&self, changesets: Vec) -> FlowyResult<()>; } pub(crate) struct Persistence { @@ -47,6 +48,17 @@ impl RevisionDiskCache for Persistence { let some = RevTableSql::read_rev_tables(&self.user_id, doc_id, &*conn)?; Ok(some) } + + fn update_revisions(&self, changesets: Vec) -> FlowyResult<()> { + let conn = &*self.pool.get().map_err(internal_error)?; + let _ = conn.immediate_transaction::<_, FlowyError, _>(|| { + for changeset in changesets { + let _ = RevTableSql::update_rev_table(changeset, conn)?; + } + Ok(()) + })?; + Ok(()) + } } impl Persistence { diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/memory.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/memory.rs index a468dffc9e..9c8df20824 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/memory.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/memory.rs @@ -1,87 +1,116 @@ use crate::services::doc::RevisionRecord; use dashmap::DashMap; -use flowy_error::FlowyError; -use lib_infra::future::FutureResult; -use lib_ot::revision::RevisionRange; -use std::sync::Arc; -use tokio::sync::RwLock; +use flowy_error::{FlowyError, FlowyResult}; -pub(crate) trait RevisionMemoryCacheMissing: Send + Sync { - fn get_revision_record(&self, doc_id: &str, rev_id: i64) -> Result, FlowyError>; - fn get_revision_records_with_range( - &self, - doc_id: &str, - range: RevisionRange, - ) -> FutureResult, FlowyError>; +use lib_ot::revision::RevisionRange; +use std::{sync::Arc, time::Duration}; +use tokio::{sync::RwLock, task::JoinHandle}; + +pub(crate) trait RevisionMemoryCacheDelegate: Send + Sync { + fn receive_checkpoint(&self, records: Vec) -> FlowyResult<()>; + fn receive_ack(&self, doc_id: &str, rev_id: i64); } pub(crate) struct RevisionMemoryCache { doc_id: String, revs_map: Arc>, - rev_loader: Arc, - revs_order: Arc>>, + delegate: Arc, + pending_write_revs: Arc>>, + defer_save: RwLock>>, } -// TODO: remove outdated revisions to reduce memory usage impl RevisionMemoryCache { - pub(crate) fn new(doc_id: &str, rev_loader: Arc) -> Self { + pub(crate) fn new(doc_id: &str, delegate: Arc) -> Self { RevisionMemoryCache { doc_id: doc_id.to_owned(), revs_map: Arc::new(DashMap::new()), - rev_loader, - revs_order: Arc::new(RwLock::new(vec![])), + delegate, + pending_write_revs: Arc::new(RwLock::new(vec![])), + defer_save: RwLock::new(None), } } - pub(crate) async fn is_empty(&self) -> bool { self.revs_order.read().await.is_empty() } - pub(crate) fn contains(&self, rev_id: &i64) -> bool { self.revs_map.contains_key(rev_id) } pub(crate) async fn add_revision(&self, record: &RevisionRecord) { - if let Some(rev_id) = self.revs_order.read().await.last() { + if let Some(rev_id) = self.pending_write_revs.read().await.last() { if *rev_id >= record.revision.rev_id { tracing::error!("Duplicated revision added to memory_cache"); return; } } + // FIXME: Remove outdated revisions to reduce memory usage self.revs_map.insert(record.revision.rev_id, record.clone()); - self.revs_order.write().await.push(record.revision.rev_id); + self.pending_write_revs.write().await.push(record.revision.rev_id); + self.make_checkpoint().await; + } + + pub(crate) async fn ack_revision(&self, rev_id: &i64) { + match self.revs_map.get_mut(rev_id) { + None => {}, + Some(mut record) => record.ack(), + } + + if !self.pending_write_revs.read().await.contains(rev_id) { + // The revision must be saved on disk if the pending_write_revs + // doesn't contains the rev_id. + self.delegate.receive_ack(&self.doc_id, *rev_id); + } else { + self.make_checkpoint().await; + } } pub(crate) async fn get_revision(&self, rev_id: &i64) -> Option { - match self.revs_map.get(&rev_id).map(|r| r.value().clone()) { - None => match self.rev_loader.get_revision_record(&self.doc_id, *rev_id) { - Ok(revision) => revision, - Err(e) => { - tracing::error!("{}", e); - None - }, - }, - Some(revision) => Some(revision), - } + self.revs_map.get(&rev_id).map(|r| r.value().clone()) } pub(crate) async fn get_revisions_in_range( &self, range: &RevisionRange, ) -> Result, FlowyError> { - let range_len = range.len() as usize; let revs = range .iter() .flat_map(|rev_id| self.revs_map.get(&rev_id).map(|record| record.clone())) .collect::>(); + Ok(revs) + } - if revs.len() == range_len { - Ok(revs) - } else { - let revs = self - .rev_loader - .get_revision_records_with_range(&self.doc_id, range.clone()) - .await?; - if revs.len() != range_len { - log::error!("Revisions len is not equal to range required"); - } - Ok(revs) + async fn make_checkpoint(&self) { + // https://github.com/async-graphql/async-graphql/blob/ed8449beec3d9c54b94da39bab33cec809903953/src/dataloader/mod.rs#L362 + if let Some(handler) = self.defer_save.write().await.take() { + handler.abort(); } + + if self.pending_write_revs.read().await.is_empty() { + return; + } + + let rev_map = self.revs_map.clone(); + let pending_write_revs = self.pending_write_revs.clone(); + let delegate = self.delegate.clone(); + + *self.defer_save.write().await = Some(tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(300)).await; + let mut revs_write_guard = pending_write_revs.write().await; + // FIXME: + // It may cause performance issues because we hold the write lock of the + // rev_order and the lock will be released after the checkpoint has been written + // to the disk. + // + // Use saturating_sub and split_off ? + // https://stackoverflow.com/questions/28952411/what-is-the-idiomatic-way-to-pop-the-last-n-elements-in-a-mutable-vec + let mut save_records: Vec = vec![]; + revs_write_guard.iter().for_each(|rev_id| match rev_map.get(rev_id) { + None => {}, + Some(value) => { + save_records.push(value.value().clone()); + }, + }); + + if delegate.receive_checkpoint(save_records).is_ok() { + revs_write_guard.clear(); + drop(revs_write_guard); + } + })); } } diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/sync.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/sync.rs index 449d3101e0..b9f454e9f5 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/sync.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache/sync.rs @@ -1,5 +1,6 @@ use crate::services::doc::revision::RevisionRecord; use dashmap::DashMap; +use flowy_error::{FlowyError, FlowyResult}; use lib_ot::errors::OTError; use std::{collections::VecDeque, sync::Arc}; use tokio::sync::RwLock; @@ -30,34 +31,33 @@ impl RevisionSyncSeq { .context(format!("The new revision's id must be greater than {}", rev_id))); } } + self.local_revs.write().await.push_back(record.revision.rev_id); self.revs_map.insert(record.revision.rev_id, record); Ok(()) } - pub async fn ack_revision(&self, rev_id: &i64) { + pub async fn ack_revision(&self, rev_id: &i64) -> FlowyResult<()> { if let Some(pop_rev_id) = self.next_sync_rev_id().await { if &pop_rev_id != rev_id { - tracing::error!("The next ack rev_id must be equal to the next rev_id"); - assert_eq!(&pop_rev_id, rev_id); - return; + let desc = format!( + "The ack rev_id:{} is not equal to the current rev_id:{}", + rev_id, pop_rev_id + ); + // tracing::error!("{}", desc); + return Err(FlowyError::internal().context(desc)); } tracing::debug!("pop revision {}", pop_rev_id); self.revs_map.remove(&pop_rev_id); let _ = self.local_revs.write().await.pop_front(); } + Ok(()) } pub async fn next_sync_revision(&self) -> Option<(i64, RevisionRecord)> { match self.local_revs.read().await.front() { None => None, - Some(rev_id) => match self.revs_map.get(rev_id).map(|r| (*r.key(), r.value().clone())) { - None => None, - Some(val) => { - tracing::debug!("{}:try send revision {}", val.1.revision.doc_id, val.1.revision.rev_id); - Some(val) - }, - }, + Some(rev_id) => self.revs_map.get(rev_id).map(|r| (*r.key(), r.value().clone())), } } diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs index a1c5f99cdb..b6a828ed49 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs @@ -102,7 +102,9 @@ impl RevisionManager { Ok(revision) } - pub fn next_sync_revision(&self) -> FutureResult, FlowyError> { self.cache.next_revision() } + pub fn next_sync_revision(&self) -> FutureResult, FlowyError> { self.cache.next_sync_revision() } + + pub fn latest_rev_id(&self) -> i64 { self.cache.latest_rev_id() } } #[cfg(feature = "flowy_unit_test")] @@ -134,7 +136,7 @@ impl RevisionLoader { &self.user_id, doc_md5, ); - let _ = self.cache.add_remote_revision(revision.clone()).await?; + let _ = self.cache.add_local_revision(revision.clone()).await?; revisions = vec![revision]; } else { for record in &records { diff --git a/frontend/rust-lib/flowy-document/src/services/mod.rs b/frontend/rust-lib/flowy-document/src/services/mod.rs index 82dbce10c4..634e507938 100644 --- a/frontend/rust-lib/flowy-document/src/services/mod.rs +++ b/frontend/rust-lib/flowy-document/src/services/mod.rs @@ -1,2 +1,4 @@ +pub(crate) mod controller; pub mod doc; pub mod server; +mod ws_handlers; diff --git a/frontend/rust-lib/flowy-document/src/services/doc/ws_handlers.rs b/frontend/rust-lib/flowy-document/src/services/ws_handlers.rs similarity index 100% rename from frontend/rust-lib/flowy-document/src/services/doc/ws_handlers.rs rename to frontend/rust-lib/flowy-document/src/services/ws_handlers.rs diff --git a/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs b/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs index 80a9f106bf..84520b57f5 100644 --- a/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs +++ b/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs @@ -32,7 +32,6 @@ impl RevTableSql { Ok(()) } - #[allow(dead_code)] pub(crate) fn update_rev_table(changeset: RevChangeset, conn: &SqliteConnection) -> Result<(), FlowyError> { let filter = dsl::rev_table .filter(dsl::rev_id.eq(changeset.rev_id.as_ref())) diff --git a/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_table.rs b/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_table.rs index d6ef355c5f..732f4ae9a9 100644 --- a/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_table.rs +++ b/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_table.rs @@ -128,8 +128,7 @@ impl RevTableType { } impl_sql_integer_expression!(RevTableType); -#[allow(dead_code)] -pub(crate) struct RevChangeset { +pub struct RevChangeset { pub(crate) doc_id: String, pub(crate) rev_id: RevId, pub(crate) state: RevTableState, diff --git a/shared-lib/flowy-collaboration/src/core/sync/server_editor.rs b/shared-lib/flowy-collaboration/src/core/sync/server_editor.rs index a15f3201cf..3bd1111375 100644 --- a/shared-lib/flowy-collaboration/src/core/sync/server_editor.rs +++ b/shared-lib/flowy-collaboration/src/core/sync/server_editor.rs @@ -81,12 +81,6 @@ impl ServerDocManager { } pub async fn create_doc(&self, revision: Revision) -> Result, CollaborateError> { - if !revision.is_initial() { - return Err( - CollaborateError::revision_conflict().context("Revision's rev_id should be 0 when creating the doc") - ); - } - let doc = self.persistence.create_doc(revision).await?; let handler = self.cache(doc).await?; Ok(handler) diff --git a/shared-lib/flowy-collaboration/src/entities/ws/ws.rs b/shared-lib/flowy-collaboration/src/entities/ws/ws.rs index 53fb551f16..818a07cbf3 100644 --- a/shared-lib/flowy-collaboration/src/entities/ws/ws.rs +++ b/shared-lib/flowy-collaboration/src/entities/ws/ws.rs @@ -95,6 +95,18 @@ impl WsDocumentDataBuilder { id: Some(cloned_rev_id), } } + + // DocumentWSDataType::UserConnect -> DocumentConnected + pub fn build_document_conn_message(doc_id: &str, document_conn: DocumentConnected) -> DocumentWSData { + let rev_id = document_conn.rev_id; + let bytes: Bytes = document_conn.try_into().unwrap(); + DocumentWSData { + doc_id: doc_id.to_string(), + ty: DocumentWSDataType::UserConnect, + data: bytes.to_vec(), + id: Some(rev_id), + } + } } #[derive(ProtoBuf, Default, Debug, Clone)] diff --git a/shared-lib/lib-ot/src/revision/model.rs b/shared-lib/lib-ot/src/revision/model.rs index f9ba9aa837..5ca284cc63 100644 --- a/shared-lib/lib-ot/src/revision/model.rs +++ b/shared-lib/lib-ot/src/revision/model.rs @@ -39,6 +39,7 @@ impl Revision { pub fn pair_rev_id(&self) -> (i64, i64) { (self.base_rev_id, self.rev_id) } + #[allow(dead_code)] pub fn is_initial(&self) -> bool { self.rev_id == 0 } }