diff --git a/backend/Cargo.lock b/backend/Cargo.lock index c5504943a5..a9cc97f8c5 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -455,9 +455,9 @@ dependencies = [ "config", "dashmap", "derive_more", + "flowy-collaboration", "flowy-core-infra", "flowy-document", - "flowy-ot", "flowy-sdk", "flowy-test", "flowy-user", @@ -1195,6 +1195,26 @@ dependencies = [ "syn", ] +[[package]] +name = "flowy-collaboration" +version = "0.1.0" +dependencies = [ + "bytes", + "chrono", + "flowy-derive", + "lib-ot", + "log", + "md5", + "parking_lot", + "protobuf", + "serde", + "strum", + "strum_macros", + "tokio", + "tracing", + "url", +] + [[package]] name = "flowy-core" version = "0.1.0" @@ -1209,11 +1229,11 @@ dependencies = [ "derive_more", "diesel", "diesel_derives", + "flowy-collaboration", "flowy-core-infra", "flowy-database", "flowy-derive", "flowy-document", - "flowy-ot", "futures", "futures-core", "lazy_static", @@ -1239,8 +1259,8 @@ dependencies = [ "bytes", "chrono", "derive_more", + "flowy-collaboration", "flowy-derive", - "flowy-ot", "log", "protobuf", "strum", @@ -1284,9 +1304,9 @@ dependencies = [ "derive_more", "diesel", "diesel_derives", + "flowy-collaboration", "flowy-database", "flowy-derive", - "flowy-ot", "futures", "futures-core", "futures-util", @@ -1309,26 +1329,6 @@ dependencies = [ "url", ] -[[package]] -name = "flowy-ot" -version = "0.1.0" -dependencies = [ - "bytes", - "chrono", - "flowy-derive", - "lib-ot", - "log", - "md5", - "parking_lot", - "protobuf", - "serde", - "strum", - "strum_macros", - "tokio", - "tracing", - "url", -] - [[package]] name = "flowy-sdk" version = "0.1.0" @@ -1336,10 +1336,10 @@ dependencies = [ "backend-service", "bytes", "color-eyre", + "flowy-collaboration", "flowy-core", "flowy-database", "flowy-document", - "flowy-ot", "flowy-user", "futures-core", "lib-dispatch", @@ -1360,9 +1360,9 @@ dependencies = [ "bincode", "bytes", "claim", + "flowy-collaboration", "flowy-core", "flowy-document", - "flowy-ot", "flowy-sdk", "flowy-user", "futures-util", diff --git a/backend/src/services/doc/edit/edit_actor.rs b/backend/src/services/doc/edit/edit_actor.rs deleted file mode 100644 index 0bbca02419..0000000000 --- a/backend/src/services/doc/edit/edit_actor.rs +++ /dev/null @@ -1,123 +0,0 @@ -use crate::{ - services::doc::edit::ServerDocEditor, - web_socket::{entities::Socket, WsUser}, -}; -use actix_web::web::Data; -use async_stream::stream; -use backend_service::errors::{internal_error, Result as DocResult, ServerError}; -use flowy_collaboration::protobuf::Doc; -use futures::stream::StreamExt; -use lib_ot::protobuf::Revision; -use sqlx::PgPool; -use std::sync::{atomic::Ordering::SeqCst, Arc}; -use tokio::{ - sync::{mpsc, oneshot}, - task::spawn_blocking, -}; - -#[derive(Clone)] -pub struct EditUser { - user: Arc, - pub(crate) socket: Socket, -} - -impl EditUser { - pub fn id(&self) -> String { self.user.id().to_string() } -} - -#[derive(Debug)] -pub enum EditMsg { - Revision { - user: Arc, - socket: Socket, - revision: Revision, - ret: oneshot::Sender>, - }, - DocumentJson { - ret: oneshot::Sender>, - }, - DocumentRevId { - ret: oneshot::Sender>, - }, - NewDocUser { - user: Arc, - socket: Socket, - rev_id: i64, - ret: oneshot::Sender>, - }, -} - -pub struct EditDocActor { - receiver: Option>, - edit_doc: Arc, - pg_pool: Data, -} - -impl EditDocActor { - pub fn new(receiver: mpsc::Receiver, doc: Doc, pg_pool: Data) -> Result { - let edit_doc = Arc::new(ServerDocEditor::new(doc)?); - Ok(Self { - receiver: Some(receiver), - edit_doc, - pg_pool, - }) - } - - pub async fn run(mut self) { - let mut receiver = self - .receiver - .take() - .expect("DocActor's receiver should only take one time"); - - let stream = stream! { - loop { - match receiver.recv().await { - Some(msg) => yield msg, - None => break, - } - } - }; - stream.for_each(|msg| self.handle_message(msg)).await; - } - - async fn handle_message(&self, msg: EditMsg) { - match msg { - EditMsg::Revision { - user, - socket, - revision, - ret, - } => { - let user = EditUser { - user: user.clone(), - socket: socket.clone(), - }; - let _ = ret.send(self.edit_doc.apply_revision(user, revision, self.pg_pool.clone()).await); - }, - EditMsg::DocumentJson { ret } => { - let edit_context = self.edit_doc.clone(); - let json = spawn_blocking(move || edit_context.document_json()) - .await - .map_err(internal_error); - let _ = ret.send(json); - }, - EditMsg::DocumentRevId { ret } => { - let edit_context = self.edit_doc.clone(); - let _ = ret.send(Ok(edit_context.rev_id.load(SeqCst))); - }, - EditMsg::NewDocUser { - user, - socket, - rev_id, - ret, - } => { - log::debug!("Receive new doc user: {:?}, rev_id: {}", user, rev_id); - let user = EditUser { - user: user.clone(), - socket: socket.clone(), - }; - let _ = ret.send(self.edit_doc.new_doc_user(user, rev_id).await); - }, - } - } -} diff --git a/backend/src/services/doc/edit/editor.rs b/backend/src/services/doc/edit/editor.rs deleted file mode 100644 index f78d265e53..0000000000 --- a/backend/src/services/doc/edit/editor.rs +++ /dev/null @@ -1,265 +0,0 @@ -use crate::{ - services::{ - doc::{edit::edit_actor::EditUser, update_doc}, - util::md5, - }, - web_socket::{entities::Socket, WsMessageAdaptor}, -}; -use actix_web::web::Data; -use backend_service::errors::{internal_error, ServerError}; -use dashmap::DashMap; -use flowy_collaboration::{ - core::document::Document, - entities::ws::{WsDataType, WsDocumentData}, - protobuf::{Doc, UpdateDocParams}, -}; -use lib_ot::{ - core::OperationTransformable, - protobuf::{RevId, RevType, Revision, RevisionRange}, - rich_text::RichTextDelta, -}; -use parking_lot::RwLock; -use protobuf::Message; -use sqlx::PgPool; -use std::{ - cmp::Ordering, - sync::{ - atomic::{AtomicI64, Ordering::SeqCst}, - Arc, - }, - time::Duration, -}; - -pub struct ServerDocEditor { - pub doc_id: String, - pub rev_id: AtomicI64, - document: Arc>, - users: DashMap, -} - -impl ServerDocEditor { - pub fn new(doc: Doc) -> Result { - let delta = RichTextDelta::from_bytes(&doc.data).map_err(internal_error)?; - let document = Arc::new(RwLock::new(Document::from_delta(delta))); - let users = DashMap::new(); - Ok(Self { - doc_id: doc.id.clone(), - rev_id: AtomicI64::new(doc.rev_id), - document, - users, - }) - } - - #[tracing::instrument( - level = "debug", - skip(self, user), - fields( - user_id = %user.id(), - rev_id = %rev_id, - ) - )] - pub async fn new_doc_user(&self, user: EditUser, rev_id: i64) -> Result<(), ServerError> { - self.users.insert(user.id(), user.clone()); - let cur_rev_id = self.rev_id.load(SeqCst); - match cur_rev_id.cmp(&rev_id) { - Ordering::Less => { - user.socket - .do_send(mk_pull_message(&self.doc_id, next(cur_rev_id), rev_id)) - .map_err(internal_error)?; - }, - Ordering::Equal => {}, - Ordering::Greater => { - let doc_delta = self.document.read().delta().clone(); - let cli_revision = self.mk_revision(rev_id, doc_delta); - let ws_cli_revision = mk_push_message(&self.doc_id, cli_revision); - user.socket.do_send(ws_cli_revision).map_err(internal_error)?; - }, - } - - Ok(()) - } - - #[tracing::instrument( - level = "debug", - skip(self, user, pg_pool, revision), - fields( - cur_rev_id = %self.rev_id.load(SeqCst), - base_rev_id = %revision.base_rev_id, - rev_id = %revision.rev_id, - ), - err - )] - pub async fn apply_revision( - &self, - user: EditUser, - revision: Revision, - pg_pool: Data, - ) -> Result<(), ServerError> { - self.users.insert(user.id(), user.clone()); - let cur_rev_id = self.rev_id.load(SeqCst); - match cur_rev_id.cmp(&revision.rev_id) { - Ordering::Less => { - let next_rev_id = next(cur_rev_id); - if cur_rev_id == revision.base_rev_id || next_rev_id == revision.base_rev_id { - // The rev is in the right order, just compose it. - let _ = self.compose_revision(&revision).await?; - let _ = send_acked_msg(&user.socket, &revision)?; - let _ = self.save_revision(&revision, pg_pool).await?; - } else { - // The server document is outdated, pull the missing revision from the client. - let _ = send_pull_message(&user.socket, &self.doc_id, next_rev_id, revision.rev_id)?; - } - }, - Ordering::Equal => { - // Do nothing - log::warn!("Applied revision rev_id is the same as cur_rev_id"); - }, - Ordering::Greater => { - // The client document is outdated. Transform the client revision delta and then - // send the prime delta to the client. Client should compose the this prime - // delta. - let cli_revision = self.transform_revision(&revision)?; - let _ = send_push_message(&user.socket, &self.doc_id, cli_revision)?; - }, - } - Ok(()) - } - - pub fn document_json(&self) -> String { self.document.read().to_json() } - - async fn compose_revision(&self, revision: &Revision) -> Result<(), ServerError> { - let delta = RichTextDelta::from_bytes(&revision.delta_data).map_err(internal_error)?; - let _ = self.compose_delta(delta)?; - let _ = self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(revision.rev_id)); - Ok(()) - } - - #[tracing::instrument(level = "debug", skip(self, revision))] - fn transform_revision(&self, revision: &Revision) -> Result { - let cli_delta = RichTextDelta::from_bytes(&revision.delta_data).map_err(internal_error)?; - let (cli_prime, server_prime) = self - .document - .read() - .delta() - .transform(&cli_delta) - .map_err(internal_error)?; - - let _ = self.compose_delta(server_prime)?; - let cli_revision = self.mk_revision(revision.rev_id, cli_prime); - Ok(cli_revision) - } - - fn mk_revision(&self, base_rev_id: i64, delta: RichTextDelta) -> Revision { - let delta_data = delta.to_bytes().to_vec(); - let md5 = md5(&delta_data); - Revision { - base_rev_id, - rev_id: self.rev_id.load(SeqCst), - delta_data, - md5, - doc_id: self.doc_id.to_string(), - ty: RevType::Remote, - ..Default::default() - } - } - - #[tracing::instrument( - level = "debug", - skip(self, delta), - fields( - revision_delta = %delta.to_json(), - result, - ) - )] - fn compose_delta(&self, delta: RichTextDelta) -> Result<(), ServerError> { - if delta.is_empty() { - log::warn!("Composed delta is empty"); - } - - match self.document.try_write_for(Duration::from_millis(300)) { - None => { - log::error!("Failed to acquire write lock of document"); - }, - Some(mut write_guard) => { - let _ = write_guard.compose_delta(delta).map_err(internal_error)?; - tracing::Span::current().record("result", &write_guard.to_json().as_str()); - }, - } - Ok(()) - } - - #[tracing::instrument(level = "debug", skip(self, revision, pg_pool), err)] - async fn save_revision(&self, revision: &Revision, pg_pool: Data) -> Result<(), ServerError> { - // Opti: save with multiple revisions - let mut params = UpdateDocParams::new(); - params.set_doc_id(self.doc_id.clone()); - params.set_data(self.document.read().to_json()); - params.set_rev_id(revision.rev_id); - let _ = update_doc(pg_pool.get_ref(), params).await?; - Ok(()) - } -} - -#[tracing::instrument(level = "debug", skip(socket, doc_id, revision), err)] -fn send_push_message(socket: &Socket, doc_id: &str, revision: Revision) -> Result<(), ServerError> { - let msg = mk_push_message(doc_id, revision); - socket.try_send(msg).map_err(internal_error) -} - -fn mk_push_message(doc_id: &str, revision: Revision) -> WsMessageAdaptor { - let bytes = revision.write_to_bytes().unwrap(); - let data = WsDocumentData { - doc_id: doc_id.to_string(), - ty: WsDataType::PushRev, - data: bytes, - }; - data.into() -} - -#[tracing::instrument(level = "debug", skip(socket, doc_id), err)] -fn send_pull_message(socket: &Socket, doc_id: &str, from_rev_id: i64, to_rev_id: i64) -> Result<(), ServerError> { - let msg = mk_pull_message(doc_id, from_rev_id, to_rev_id); - socket.try_send(msg).map_err(internal_error) -} - -fn mk_pull_message(doc_id: &str, from_rev_id: i64, to_rev_id: i64) -> WsMessageAdaptor { - let range = RevisionRange { - doc_id: doc_id.to_string(), - start: from_rev_id, - end: to_rev_id, - ..Default::default() - }; - - let bytes = range.write_to_bytes().unwrap(); - let data = WsDocumentData { - doc_id: doc_id.to_string(), - ty: WsDataType::PullRev, - data: bytes, - }; - data.into() -} - -#[tracing::instrument(level = "debug", skip(socket, revision), err)] -fn send_acked_msg(socket: &Socket, revision: &Revision) -> Result<(), ServerError> { - let msg = mk_acked_message(revision); - socket.try_send(msg).map_err(internal_error) -} - -fn mk_acked_message(revision: &Revision) -> WsMessageAdaptor { - // let mut wtr = vec![]; - // let _ = wtr.write_i64::(revision.rev_id); - let mut rev_id = RevId::new(); - rev_id.set_value(revision.rev_id); - let data = rev_id.write_to_bytes().unwrap(); - - let data = WsDocumentData { - doc_id: revision.doc_id.clone(), - ty: WsDataType::Acked, - data, - }; - - data.into() -} - -#[inline] -fn next(rev_id: i64) -> i64 { rev_id + 1 } diff --git a/backend/src/services/doc/edit/mod.rs b/backend/src/services/doc/edit/mod.rs deleted file mode 100644 index 9012184ffc..0000000000 --- a/backend/src/services/doc/edit/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -pub(crate) mod edit_actor; -mod editor; - -pub use edit_actor::*; -pub use editor::*; diff --git a/backend/src/services/doc/editor.rs b/backend/src/services/doc/editor.rs new file mode 100644 index 0000000000..c700e87dd1 --- /dev/null +++ b/backend/src/services/doc/editor.rs @@ -0,0 +1,149 @@ +use crate::{ + services::doc::update_doc, + web_socket::{entities::Socket, WsMessageAdaptor, WsUser}, +}; +use actix_web::web::Data; +use backend_service::errors::{internal_error, ServerError}; +use dashmap::DashMap; +use flowy_collaboration::{ + core::{ + document::Document, + sync::{RevisionSynchronizer, RevisionUser, SyncResponse}, + }, + protobuf::{Doc, UpdateDocParams}, +}; +use lib_ot::{protobuf::Revision, rich_text::RichTextDelta}; +use sqlx::PgPool; +use std::{ + convert::TryInto, + sync::{ + atomic::{AtomicI64, Ordering::SeqCst}, + Arc, + }, +}; + +#[rustfmt::skip] +// ┌──────────────────────┐ ┌────────────┐ +// ┌───▶│ RevisionSynchronizer │────▶│ Document │ +// │ └──────────────────────┘ └────────────┘ +// ┌────────────────┐ │ +// │ServerDocEditor │────┤ ┌───────────┐ +// └────────────────┘ │ ┌───▶│ WsUser │ +// │ │ └───────────┘ +// │ ┌────────┐ ┌───────────┐ │ ┌───────────┐ +// └───▶│ Users │◆──────│ DocUser ├───┼───▶│ Socket │ +// └────────┘ └───────────┘ │ └───────────┘ +// │ ┌───────────┐ +// └───▶│ PgPool │ +// └───────────┘ +pub struct ServerDocEditor { + pub doc_id: String, + pub rev_id: AtomicI64, + synchronizer: Arc, + users: DashMap, +} + +impl ServerDocEditor { + pub fn new(doc: Doc) -> Result { + let delta = RichTextDelta::from_bytes(&doc.data).map_err(internal_error)?; + let users = DashMap::new(); + let synchronizer = Arc::new(RevisionSynchronizer::new( + &doc.id, + doc.rev_id, + Document::from_delta(delta), + )); + + Ok(Self { + doc_id: doc.id.clone(), + rev_id: AtomicI64::new(doc.rev_id), + synchronizer, + users, + }) + } + + #[tracing::instrument( + level = "debug", + skip(self, user), + fields( + user_id = %user.id(), + rev_id = %rev_id, + ) + )] + pub async fn new_doc_user(&self, user: DocUser, rev_id: i64) -> Result<(), ServerError> { + self.users.insert(user.id(), user.clone()); + self.synchronizer.new_conn(user, rev_id); + Ok(()) + } + + #[tracing::instrument( + level = "debug", + skip(self, user, revision), + fields( + cur_rev_id = %self.rev_id.load(SeqCst), + base_rev_id = %revision.base_rev_id, + rev_id = %revision.rev_id, + ), + err + )] + pub async fn apply_revision(&self, user: DocUser, mut revision: Revision) -> Result<(), ServerError> { + self.users.insert(user.id(), user.clone()); + let revision = (&mut revision).try_into().map_err(internal_error)?; + self.synchronizer.apply_revision(user, revision).unwrap(); + Ok(()) + } + + pub fn document_json(&self) -> String { self.synchronizer.doc_json() } +} + +#[derive(Clone)] +pub struct DocUser { + pub user: Arc, + pub(crate) socket: Socket, + pub pg_pool: Data, +} + +impl DocUser { + pub fn id(&self) -> String { self.user.id().to_string() } +} + +impl RevisionUser for DocUser { + fn recv(&self, resp: SyncResponse) { + let result = match resp { + SyncResponse::Pull(data) => { + let msg: WsMessageAdaptor = data.into(); + self.socket.try_send(msg).map_err(internal_error) + }, + SyncResponse::Push(data) => { + let msg: WsMessageAdaptor = data.into(); + self.socket.try_send(msg).map_err(internal_error) + }, + SyncResponse::Ack(data) => { + let msg: WsMessageAdaptor = data.into(); + self.socket.try_send(msg).map_err(internal_error) + }, + SyncResponse::NewRevision { + rev_id, + doc_id, + doc_json, + } => { + let pg_pool = self.pg_pool.clone(); + tokio::task::spawn(async move { + let mut params = UpdateDocParams::new(); + params.set_doc_id(doc_id); + params.set_data(doc_json); + params.set_rev_id(rev_id); + match update_doc(pg_pool.get_ref(), params).await { + Ok(_) => {}, + Err(e) => log::error!("{}", e), + } + }); + Ok(()) + }, + }; + + match result { + Ok(_) => {}, + Err(e) => log::error!("{}", e), + } + } +} diff --git a/backend/src/services/doc/manager.rs b/backend/src/services/doc/manager.rs index 4ea6255726..25b1d96427 100644 --- a/backend/src/services/doc/manager.rs +++ b/backend/src/services/doc/manager.rs @@ -1,18 +1,20 @@ use crate::{ services::doc::{ - edit::edit_actor::{EditDocActor, EditMsg}, + editor::{DocUser, ServerDocEditor}, read_doc, ws_actor::{DocWsActor, DocWsMsg}, }, web_socket::{entities::Socket, WsBizHandler, WsClientData, WsUser}, }; use actix_web::web::Data; +use async_stream::stream; use backend_service::errors::{internal_error, Result as DocResult, ServerError}; use dashmap::DashMap; use flowy_collaboration::protobuf::{Doc, DocIdentifier}; +use futures::stream::StreamExt; use lib_ot::protobuf::Revision; use sqlx::PgPool; -use std::sync::Arc; +use std::sync::{atomic::Ordering::SeqCst, Arc}; use tokio::{ sync::{mpsc, oneshot}, task::spawn_blocking, @@ -43,13 +45,17 @@ impl DocumentCore { } impl WsBizHandler for DocumentCore { - fn receive_data(&self, client_data: WsClientData) { + fn receive(&self, data: WsClientData) { let (ret, rx) = oneshot::channel(); let sender = self.ws_sender.clone(); let pool = self.pg_pool.clone(); actix_rt::spawn(async move { - let msg = DocWsMsg::ClientData { client_data, ret, pool }; + let msg = DocWsMsg::ClientData { + client_data: data, + ret, + pool, + }; match sender.send(msg).await { Ok(_) => {}, Err(e) => log::error!("{}", e), @@ -63,16 +69,9 @@ impl WsBizHandler for DocumentCore { } #[rustfmt::skip] -// EditDocActor -// ┌────────────────────────────────────┐ -// │ ServerDocEditor │ -// │ ┌──────────────────────────────┐ │ -// ┌────────────┐ 1 n ┌───────────────┐ │ │ ┌──────────┐ ┌──────────┐ │ │ -// │ DocManager │─────▶│ OpenDocHandle │──────▶│ │ │ Document │ │ Users │ │ │ -// └────────────┘ └───────────────┘ │ │ └──────────┘ └──────────┘ │ │ -// │ └──────────────────────────────┘ │ -// │ │ -// └────────────────────────────────────┘ +// ┌────────────┐ 1 n ┌───────────────┐ ┌──────────────────┐ ┌────────────────┐ +// │ DocManager │───────▶│ OpenDocHandle │────▶│ DocMessageQueue │───▶│ServerDocEditor │ +// └────────────┘ └───────────────┘ └──────────────────┘ └────────────────┘ pub struct DocManager { open_doc_map: DashMap>, } @@ -109,20 +108,20 @@ impl DocManager { } pub struct OpenDocHandle { - pub sender: mpsc::Sender, + pub sender: mpsc::Sender, } impl OpenDocHandle { pub fn new(doc: Doc, pg_pool: Data) -> Result { let (sender, receiver) = mpsc::channel(100); - let actor = EditDocActor::new(receiver, doc, pg_pool)?; - tokio::task::spawn(actor.run()); + let queue = DocMessageQueue::new(receiver, doc, pg_pool)?; + tokio::task::spawn(queue.run()); Ok(Self { sender }) } pub async fn add_user(&self, user: Arc, rev_id: i64, socket: Socket) -> Result<(), ServerError> { let (ret, rx) = oneshot::channel(); - let msg = EditMsg::NewDocUser { + let msg = DocMessage::NewConnectedUser { user, socket, rev_id, @@ -139,7 +138,7 @@ impl OpenDocHandle { revision: Revision, ) -> Result<(), ServerError> { let (ret, rx) = oneshot::channel(); - let msg = EditMsg::Revision { + let msg = DocMessage::ReceiveRevision { user, socket, revision, @@ -151,19 +150,118 @@ impl OpenDocHandle { pub async fn document_json(&self) -> DocResult { let (ret, rx) = oneshot::channel(); - let msg = EditMsg::DocumentJson { ret }; + let msg = DocMessage::GetDocJson { ret }; self.send(msg, rx).await? } pub async fn rev_id(&self) -> DocResult { let (ret, rx) = oneshot::channel(); - let msg = EditMsg::DocumentRevId { ret }; + let msg = DocMessage::GetDocRevId { ret }; self.send(msg, rx).await? } - pub(crate) async fn send(&self, msg: EditMsg, rx: oneshot::Receiver) -> DocResult { + pub(crate) async fn send(&self, msg: DocMessage, rx: oneshot::Receiver) -> DocResult { let _ = self.sender.send(msg).await.map_err(internal_error)?; let result = rx.await?; Ok(result) } } + +#[derive(Debug)] +pub enum DocMessage { + NewConnectedUser { + user: Arc, + socket: Socket, + rev_id: i64, + ret: oneshot::Sender>, + }, + ReceiveRevision { + user: Arc, + socket: Socket, + revision: Revision, + ret: oneshot::Sender>, + }, + GetDocJson { + ret: oneshot::Sender>, + }, + GetDocRevId { + ret: oneshot::Sender>, + }, +} + +struct DocMessageQueue { + receiver: Option>, + edit_doc: Arc, + pg_pool: Data, +} + +impl DocMessageQueue { + fn new(receiver: mpsc::Receiver, doc: Doc, pg_pool: Data) -> Result { + let edit_doc = Arc::new(ServerDocEditor::new(doc)?); + Ok(Self { + receiver: Some(receiver), + edit_doc, + pg_pool, + }) + } + + async fn run(mut self) { + let mut receiver = self + .receiver + .take() + .expect("DocActor's receiver should only take one time"); + + let stream = stream! { + loop { + match receiver.recv().await { + Some(msg) => yield msg, + None => break, + } + } + }; + stream.for_each(|msg| self.handle_message(msg)).await; + } + + async fn handle_message(&self, msg: DocMessage) { + match msg { + DocMessage::NewConnectedUser { + user, + socket, + rev_id, + ret, + } => { + log::debug!("Receive new doc user: {:?}, rev_id: {}", user, rev_id); + let user = DocUser { + user: user.clone(), + socket: socket.clone(), + pg_pool: self.pg_pool.clone(), + }; + let _ = ret.send(self.edit_doc.new_doc_user(user, rev_id).await); + }, + DocMessage::ReceiveRevision { + user, + socket, + revision, + ret, + } => { + let user = DocUser { + user: user.clone(), + socket: socket.clone(), + pg_pool: self.pg_pool.clone(), + }; + let _ = ret.send(self.edit_doc.apply_revision(user, revision).await); + }, + DocMessage::GetDocJson { ret } => { + let edit_context = self.edit_doc.clone(); + let json = spawn_blocking(move || edit_context.document_json()) + .await + .map_err(internal_error); + let _ = ret.send(json); + }, + DocMessage::GetDocRevId { ret } => { + let rev_id = self.edit_doc.rev_id.load(SeqCst); + let _ = ret.send(Ok(rev_id)); + }, + } + } +} diff --git a/backend/src/services/doc/mod.rs b/backend/src/services/doc/mod.rs index 6e0a2c9694..f37c30a403 100644 --- a/backend/src/services/doc/mod.rs +++ b/backend/src/services/doc/mod.rs @@ -1,9 +1,10 @@ #![allow(clippy::module_inception)] + pub(crate) use crud::*; pub use router::*; pub mod crud; -mod edit; +mod editor; pub mod manager; pub mod router; mod ws_actor; diff --git a/backend/src/web_socket/biz_handler.rs b/backend/src/web_socket/biz_handler.rs index 2b98b60db3..144ee0ac79 100644 --- a/backend/src/web_socket/biz_handler.rs +++ b/backend/src/web_socket/biz_handler.rs @@ -3,7 +3,7 @@ use lib_ws::WsModule; use std::{collections::HashMap, sync::Arc}; pub trait WsBizHandler: Send + Sync { - fn receive_data(&self, client_data: WsClientData); + fn receive(&self, data: WsClientData); } pub type BizHandler = Arc; diff --git a/backend/src/web_socket/router.rs b/backend/src/web_socket/router.rs index d6c5c2c53b..59250685b7 100644 --- a/backend/src/web_socket/router.rs +++ b/backend/src/web_socket/router.rs @@ -12,6 +12,24 @@ use actix_web::{ }; use actix_web_actors::ws; +#[rustfmt::skip] +// WsClient +// ┌─────────────┐ +// │ ┌────────┐ │ +// wss://xxx ─────▶│ │ WsUser │ │───┐ +// │ └────────┘ │ │ +// └─────────────┘ │ +// │ +// │ ┌───────────────┐ ┌─────────────┐ ┌────────────────┐ +// ├───▶│ WsBizHandlers │──▶│WsBizHandler │───▶│ WsClientData │ +// │ └───────────────┘ └─────────────┘ └────────────────┘ +// WsClient │ △ +// ┌─────────────┐ │ │ +// │ ┌────────┐ │ │ │ +// wss://xxx ─────▶│ │ WsUser │ │───┘ ┌───────────────┐ +// │ └────────┘ │ │ DocumentCore │ +// └─────────────┘ └───────────────┘ + #[get("/{token}")] pub async fn establish_ws_connection( request: HttpRequest, diff --git a/backend/src/web_socket/ws_client.rs b/backend/src/web_socket/ws_client.rs index d356f1c405..f2365958fd 100644 --- a/backend/src/web_socket/ws_client.rs +++ b/backend/src/web_socket/ws_client.rs @@ -75,7 +75,7 @@ impl WsClient { socket, data: Bytes::from(message.data), }; - handler.receive_data(client_data); + handler.receive(client_data); }, } } diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/sync.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/sync.rs index 8859951717..98b2ccace5 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/sync.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/sync.rs @@ -15,7 +15,7 @@ use futures::stream::StreamExt; use lib_ot::revision::{RevId, RevisionRange}; use std::{convert::TryFrom, sync::Arc}; use tokio::{ - sync::{broadcast, mpsc, mpsc::error::SendError}, + sync::{broadcast, mpsc}, task::spawn_blocking, time::{interval, Duration}, }; @@ -175,7 +175,7 @@ impl RevisionUpStream { match self.revisions.next().await? { None => Ok(()), Some(record) => { - tracing::trace!( + tracing::debug!( "[RevisionUpStream]: processes revision: {}:{:?}", record.revision.doc_id, record.revision.rev_id @@ -190,13 +190,7 @@ impl RevisionUpStream { async fn tick(sender: mpsc::UnboundedSender) { let mut i = interval(Duration::from_secs(2)); - loop { - match sender.send(UpStreamMsg::Tick) { - Ok(_) => {}, - Err(_e) => { - break; - }, - } + while sender.send(UpStreamMsg::Tick).is_ok() { i.tick().await; } } diff --git a/frontend/rust-lib/flowy-user/src/services/user/ws_manager.rs b/frontend/rust-lib/flowy-user/src/services/user/ws_manager.rs index 07fef218f7..35245c2390 100644 --- a/frontend/rust-lib/flowy-user/src/services/user/ws_manager.rs +++ b/frontend/rust-lib/flowy-user/src/services/user/ws_manager.rs @@ -218,7 +218,7 @@ mod mock { impl FlowyWsSender for broadcast::Sender { fn send(&self, _msg: WsMessage) -> Result<(), UserError> { - // let _ = self.send(msg).unwrap(); + let _ = self.send(msg).unwrap(); Ok(()) } } diff --git a/shared-lib/Cargo.lock b/shared-lib/Cargo.lock index a02292d8c6..327a427a0e 100644 --- a/shared-lib/Cargo.lock +++ b/shared-lib/Cargo.lock @@ -657,6 +657,26 @@ dependencies = [ "syn", ] +[[package]] +name = "flowy-collaboration" +version = "0.1.0" +dependencies = [ + "bytes", + "chrono", + "flowy-derive", + "lib-ot", + "log", + "md5", + "parking_lot", + "protobuf", + "serde", + "strum", + "strum_macros", + "tokio", + "tracing", + "url", +] + [[package]] name = "flowy-core-infra" version = "0.1.0" @@ -664,8 +684,8 @@ dependencies = [ "bytes", "chrono", "derive_more", + "flowy-collaboration", "flowy-derive", - "flowy-ot", "log", "protobuf", "strum", @@ -687,26 +707,6 @@ dependencies = [ "trybuild", ] -[[package]] -name = "flowy-ot" -version = "0.1.0" -dependencies = [ - "bytes", - "chrono", - "flowy-derive", - "lib-ot", - "log", - "md5", - "parking_lot", - "protobuf", - "serde", - "strum", - "strum_macros", - "tokio", - "tracing", - "url", -] - [[package]] name = "flowy-user-infra" version = "0.1.0" diff --git a/shared-lib/flowy-collaboration/src/core/sync/rev_sync.rs b/shared-lib/flowy-collaboration/src/core/sync/rev_sync.rs index 94e98fe924..0bdd9643a6 100644 --- a/shared-lib/flowy-collaboration/src/core/sync/rev_sync.rs +++ b/shared-lib/flowy-collaboration/src/core/sync/rev_sync.rs @@ -21,57 +21,59 @@ use std::{ }, time::Duration, }; -use tokio::sync::mpsc; -pub enum SynchronizerCommand { +pub trait RevisionUser { + fn recv(&self, resp: SyncResponse); +} + +pub enum SyncResponse { Pull(WsDocumentData), Push(WsDocumentData), Ack(WsDocumentData), - SaveRevision(Revision), + NewRevision { + rev_id: i64, + doc_json: String, + doc_id: String, + }, } -pub type CommandReceiver = Arc; - pub struct RevisionSynchronizer { pub doc_id: String, pub rev_id: AtomicI64, document: Arc>, - command_receiver: CommandReceiver, } impl RevisionSynchronizer { - pub fn new( - doc_id: &str, - rev_id: i64, - document: Arc>, - command_receiver: CommandReceiver, - ) -> RevisionSynchronizer { + pub fn new(doc_id: &str, rev_id: i64, document: Document) -> RevisionSynchronizer { + let document = Arc::new(RwLock::new(document)); RevisionSynchronizer { doc_id: doc_id.to_string(), rev_id: AtomicI64::new(rev_id), document, - command_receiver, } } - pub fn new_conn(&self, rev_id: i64) { + pub fn new_conn(&self, user: T, rev_id: i64) { let cur_rev_id = self.rev_id.load(SeqCst); match cur_rev_id.cmp(&rev_id) { Ordering::Less => { let msg = mk_pull_message(&self.doc_id, next(cur_rev_id), rev_id); - self.send_command(SynchronizerCommand::Pull(msg)); + user.recv(SyncResponse::Pull(msg)); }, Ordering::Equal => {}, Ordering::Greater => { let doc_delta = self.document.read().delta().clone(); let revision = self.mk_revision(rev_id, doc_delta); let data = mk_push_message(&self.doc_id, revision); - self.send_command(SynchronizerCommand::Push(data)); + user.recv(SyncResponse::Push(data)); }, } } - pub fn apply_revision(&self, revision: Revision) -> Result<(), OTError> { + pub fn apply_revision(&self, user: T, revision: Revision) -> Result<(), OTError> + where + T: RevisionUser, + { let cur_rev_id = self.rev_id.load(SeqCst); match cur_rev_id.cmp(&revision.rev_id) { Ordering::Less => { @@ -79,12 +81,19 @@ impl RevisionSynchronizer { if cur_rev_id == revision.base_rev_id || next_rev_id == revision.base_rev_id { // The rev is in the right order, just compose it. let _ = self.compose_revision(&revision)?; - self.send_command(SynchronizerCommand::Ack(mk_acked_message(&revision))); - self.send_command(SynchronizerCommand::SaveRevision(revision)); + user.recv(SyncResponse::Ack(mk_acked_message(&revision))); + let rev_id = revision.rev_id; + let doc_id = self.doc_id.clone(); + let doc_json = self.doc_json(); + user.recv(SyncResponse::NewRevision { + rev_id, + doc_id, + doc_json, + }); } else { // The server document is outdated, pull the missing revision from the client. let msg = mk_pull_message(&self.doc_id, next_rev_id, revision.rev_id); - self.send_command(SynchronizerCommand::Pull(msg)); + user.recv(SyncResponse::Pull(msg)); } }, Ordering::Equal => { @@ -97,12 +106,14 @@ impl RevisionSynchronizer { // delta. let cli_revision = self.transform_revision(&revision)?; let data = mk_push_message(&self.doc_id, cli_revision); - self.send_command(SynchronizerCommand::Push(data)); + user.recv(SyncResponse::Push(data)); }, } Ok(()) } + pub fn doc_json(&self) -> String { self.document.read().to_json() } + fn compose_revision(&self, revision: &Revision) -> Result<(), OTError> { let delta = RichTextDelta::from_bytes(&revision.delta_data)?; let _ = self.compose_delta(delta)?; @@ -120,16 +131,6 @@ impl RevisionSynchronizer { Ok(cli_revision) } - fn send_command(&self, command: SynchronizerCommand) { (self.command_receiver)(command); } - - #[tracing::instrument( - level = "debug", - skip(self, delta), - fields( - revision_delta = %delta.to_json(), - result, - ) - )] fn compose_delta(&self, delta: RichTextDelta) -> Result<(), OTError> { if delta.is_empty() { log::warn!("Composed delta is empty"); @@ -139,7 +140,6 @@ impl RevisionSynchronizer { None => log::error!("Failed to acquire write lock of document"), Some(mut write_guard) => { let _ = write_guard.compose_delta(delta); - tracing::Span::current().record("result", &write_guard.to_json().as_str()); }, } Ok(()) diff --git a/shared-lib/lib-ot/src/errors.rs b/shared-lib/lib-ot/src/errors.rs index fde37f2c42..ea0df1b69e 100644 --- a/shared-lib/lib-ot/src/errors.rs +++ b/shared-lib/lib-ot/src/errors.rs @@ -37,6 +37,7 @@ impl OTError { static_ot_error!(duplicate_revision, OTErrorCode::DuplicatedRevision); static_ot_error!(revision_id_conflict, OTErrorCode::RevisionIDConflict); + static_ot_error!(internal, OTErrorCode::Internal); } impl fmt::Display for OTError { @@ -68,6 +69,7 @@ pub enum OTErrorCode { SerdeError, DuplicatedRevision, RevisionIDConflict, + Internal, } pub struct ErrorBuilder { @@ -96,3 +98,10 @@ impl ErrorBuilder { pub fn build(mut self) -> OTError { OTError::new(self.code, &self.msg.take().unwrap_or_else(|| "".to_owned())) } } + +pub fn internal_error(e: T) -> OTError +where + T: std::fmt::Debug, +{ + OTError::internal().context(e) +} diff --git a/shared-lib/lib-ot/src/revision/model.rs b/shared-lib/lib-ot/src/revision/model.rs index 79c9ed7bff..63a56fc314 100644 --- a/shared-lib/lib-ot/src/revision/model.rs +++ b/shared-lib/lib-ot/src/revision/model.rs @@ -30,6 +30,12 @@ impl Revision { pub fn is_empty(&self) -> bool { self.base_rev_id == self.rev_id } pub fn pair_rev_id(&self) -> (i64, i64) { (self.base_rev_id, self.rev_id) } + + // pub fn from_pb(pb: &mut crate::protobuf::Revision) -> Self { + // pb.try_into().unwrap() } + + // pub fn from_pb(mut pb: crate::protobuf::Revision) -> Self { + // Revision::try_from(&mut pb).unwrap() } } impl std::fmt::Debug for Revision {