compare revisions

This commit is contained in:
appflowy 2021-12-25 21:44:45 +08:00
parent e069bfb057
commit 95e0418d97
28 changed files with 603 additions and 585 deletions

View File

@ -19,7 +19,11 @@ use flowy_collaboration::{
};
use lib_infra::future::FutureResultSend;
use std::{convert::TryInto, sync::Arc};
use std::{
convert::TryInto,
fmt::{Debug, Formatter},
sync::Arc,
};
use tokio::sync::{mpsc, oneshot};
pub fn make_document_ws_receiver(persistence: Arc<FlowyPersistence>) -> Arc<DocumentWebSocketReceiver> {
@ -69,23 +73,11 @@ impl WebSocketReceiver for DocumentWebSocketReceiver {
}
struct DocumentPersistenceImpl(Arc<FlowyPersistence>);
impl DocumentPersistence for DocumentPersistenceImpl {
// fn update_doc(&self, doc_id: &str, rev_id: i64, delta: RichTextDelta) ->
// FutureResultSend<(), CollaborateError> { let pg_pool =
// self.0.pg_pool(); let mut params = ResetDocumentParams::new();
// let doc_json = delta.to_json();
// params.set_doc_id(doc_id.to_string());
// params.set_data(doc_json);
// params.set_rev_id(rev_id);
//
// FutureResultSend::new(async move {
// let _ = update_doc(&pg_pool, params)
// .await
// .map_err(server_error_to_collaborate_error)?;
// Ok(())
// })
// }
impl Debug for DocumentPersistenceImpl {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("DocumentPersistenceImpl") }
}
impl DocumentPersistence for DocumentPersistenceImpl {
fn read_doc(&self, doc_id: &str) -> FutureResultSend<DocumentInfo, CollaborateError> {
let params = DocIdentifier {
doc_id: doc_id.to_string(),
@ -118,6 +110,21 @@ impl DocumentPersistence for DocumentPersistenceImpl {
Ok(doc)
})
}
fn get_revisions(&self, doc_id: &str, rev_ids: Vec<i64>) -> FutureResultSend<Vec<Revision>, CollaborateError> {
let kv_store = self.0.kv_store();
let doc_id = doc_id.to_owned();
let f = || async move {
let expected_len = rev_ids.len();
let mut pb = kv_store.batch_get_revisions(&doc_id, rev_ids).await?;
let repeated_revision: RepeatedRevision = (&mut pb).try_into()?;
let revisions = repeated_revision.into_inner();
assert_eq!(expected_len, revisions.len());
Ok(revisions)
};
FutureResultSend::new(async move { f().await.map_err(server_error_to_collaborate_error) })
}
}
fn server_error_to_collaborate_error(error: ServerError) -> CollaborateError {

View File

@ -13,6 +13,7 @@ use flowy_collaboration::{
};
use futures::stream::StreamExt;
use flowy_collaboration::protobuf::RepeatedRevision;
use std::{convert::TryInto, sync::Arc};
use tokio::sync::{mpsc, oneshot};
@ -108,39 +109,30 @@ impl DocumentWebSocketActor {
let mut new_user = spawn_blocking(move || parse_from_bytes::<NewDocumentUser>(&document_data.data))
.await
.map_err(internal_error)??;
let revision_pb = spawn_blocking(move || parse_from_bytes::<Revision>(&new_user.take_revision_data()))
.await
.map_err(internal_error)??;
let _ = self.handle_revision(user, revision_pb).await?;
let repeated_revisions =
spawn_blocking(move || parse_from_bytes::<RepeatedRevision>(&new_user.take_revision_data()))
.await
.map_err(internal_error)??;
let _ = self.handle_revision(user, repeated_revisions).await?;
Ok(())
}
async fn handle_pushed_rev(&self, user: Arc<ServerDocUser>, data: Vec<u8>) -> Result<()> {
let revision_pb = spawn_blocking(move || {
let revision: Revision = parse_from_bytes(&data)?;
// let _ = verify_md5(&revision)?;
Result::Ok(revision)
})
.await
.map_err(internal_error)??;
self.handle_revision(user, revision_pb).await
let repeated_revision = spawn_blocking(move || parse_from_bytes::<RepeatedRevision>(&data))
.await
.map_err(internal_error)??;
self.handle_revision(user, repeated_revision).await
}
async fn handle_revision(&self, user: Arc<ServerDocUser>, mut revision: Revision) -> Result<()> {
let revision: flowy_collaboration::entities::revision::Revision =
(&mut revision).try_into().map_err(internal_error)?;
// Create the document if it doesn't exist
let handler = match self.doc_manager.get(&revision.doc_id).await {
None => self
.doc_manager
.create_doc(revision.clone())
.await
.map_err(internal_error)?,
Some(handler) => handler,
};
handler.apply_revision(user, revision).await.map_err(internal_error)?;
async fn handle_revision(&self, user: Arc<ServerDocUser>, mut revisions: RepeatedRevision) -> Result<()> {
let repeated_revision: flowy_collaboration::entities::revision::RepeatedRevision =
(&mut revisions).try_into().map_err(internal_error)?;
let revisions = repeated_revision.into_inner();
let _ = self
.doc_manager
.apply_revisions(user, revisions)
.await
.map_err(internal_error)?;
Ok(())
}
}
@ -186,11 +178,14 @@ impl RevisionUser for ServerDocUser {
let msg: WSMessageAdaptor = data.into();
self.socket.try_send(msg).map_err(internal_error)
},
SyncResponse::NewRevision(revision) => {
SyncResponse::NewRevision(revisions) => {
let kv_store = self.persistence.kv_store();
tokio::task::spawn(async move {
let revision: flowy_collaboration::protobuf::Revision = revision.try_into().unwrap();
match kv_store.batch_set_revision(vec![revision]).await {
let revisions = revisions
.into_iter()
.map(|revision| revision.try_into().unwrap())
.collect::<Vec<_>>();
match kv_store.batch_set_revision(revisions).await {
Ok(_) => {},
Err(e) => log::error!("{}", e),
}

View File

@ -2,7 +2,7 @@ use std::sync::Arc;
use backend_service::configuration::ClientServerConfiguration;
use flowy_database::DBConnection;
use flowy_document::module::FlowyDocument;
use flowy_document::context::DocumentContext;
use lib_dispatch::prelude::*;
use lib_sqlite::ConnectionPool;
@ -43,7 +43,7 @@ pub trait WorkspaceDatabase: Send + Sync {
pub fn init_core(
user: Arc<dyn WorkspaceUser>,
database: Arc<dyn WorkspaceDatabase>,
flowy_document: Arc<FlowyDocument>,
flowy_document: Arc<DocumentContext>,
server_config: &ClientServerConfiguration,
) -> Arc<CoreContext> {
let server = construct_workspace_server(server_config);

View File

@ -20,7 +20,7 @@ use crate::{
};
use flowy_core_data_model::entities::share::{ExportData, ExportParams};
use flowy_database::kv::KV;
use flowy_document::module::FlowyDocument;
use flowy_document::context::DocumentContext;
const LATEST_VIEW_ID: &str = "latest_view_id";
@ -29,7 +29,7 @@ pub(crate) struct ViewController {
server: Server,
database: Arc<dyn WorkspaceDatabase>,
trash_can: Arc<TrashController>,
document: Arc<FlowyDocument>,
document_ctx: Arc<DocumentContext>,
}
impl ViewController {
@ -38,19 +38,19 @@ impl ViewController {
database: Arc<dyn WorkspaceDatabase>,
server: Server,
trash_can: Arc<TrashController>,
document: Arc<FlowyDocument>,
document_ctx: Arc<DocumentContext>,
) -> Self {
Self {
user,
server,
database,
trash_can,
document,
document_ctx,
}
}
pub(crate) fn init(&self) -> Result<(), FlowyError> {
let _ = self.document.init()?;
let _ = self.document_ctx.init()?;
self.listen_trash_can_event();
Ok(())
}
@ -112,7 +112,7 @@ impl ViewController {
#[tracing::instrument(level = "debug", skip(self, params), fields(doc_id = %params.doc_id), err)]
pub(crate) async fn open_view(&self, params: DocIdentifier) -> Result<DocumentDelta, FlowyError> {
let doc_id = params.doc_id.clone();
let edit_context = self.document.open(params).await?;
let edit_context = self.document_ctx.open(params).await?;
KV::set_str(LATEST_VIEW_ID, doc_id);
Ok(edit_context.delta().await.map_err(internal_error)?)
@ -120,7 +120,7 @@ impl ViewController {
#[tracing::instrument(level = "debug", skip(self,params), fields(doc_id = %params.doc_id), err)]
pub(crate) async fn close_view(&self, params: DocIdentifier) -> Result<(), FlowyError> {
let _ = self.document.close(params).await?;
let _ = self.document_ctx.close(params).await?;
Ok(())
}
@ -131,7 +131,7 @@ impl ViewController {
let _ = KV::remove(LATEST_VIEW_ID);
}
}
let _ = self.document.close(params).await?;
let _ = self.document_ctx.close(params).await?;
Ok(())
}
@ -139,7 +139,7 @@ impl ViewController {
pub(crate) async fn duplicate_view(&self, params: DocIdentifier) -> Result<(), FlowyError> {
let view: View = ViewTableSql::read_view(&params.doc_id, &*self.database.db_connection()?)?.into();
let _delta_data = self
.document
.document_ctx
.read_document_data(params, self.database.db_pool()?)
.await?;
@ -159,7 +159,7 @@ impl ViewController {
pub(crate) async fn export_doc(&self, params: ExportParams) -> Result<ExportData, FlowyError> {
let doc_identifier: DocIdentifier = params.doc_id.into();
let doc = self
.document
.document_ctx
.read_document_data(doc_identifier, self.database.db_pool()?)
.await?;
@ -201,7 +201,7 @@ impl ViewController {
}
pub(crate) async fn apply_doc_delta(&self, params: DocumentDelta) -> Result<DocumentDelta, FlowyError> {
let doc = self.document.apply_doc_delta(params).await?;
let doc = self.document_ctx.apply_doc_delta(params).await?;
Ok(doc)
}
@ -276,7 +276,7 @@ impl ViewController {
fn listen_trash_can_event(&self) {
let mut rx = self.trash_can.subscribe();
let database = self.database.clone();
let document = self.document.clone();
let document = self.document_ctx.clone();
let trash_can = self.trash_can.clone();
let _ = tokio::spawn(async move {
loop {
@ -295,10 +295,10 @@ impl ViewController {
}
}
#[tracing::instrument(level = "trace", skip(database, document, trash_can))]
#[tracing::instrument(level = "trace", skip(database, context, trash_can))]
async fn handle_trash_event(
database: Arc<dyn WorkspaceDatabase>,
document: Arc<FlowyDocument>,
context: Arc<DocumentContext>,
trash_can: Arc<TrashController>,
event: TrashEvent,
) {
@ -337,7 +337,7 @@ async fn handle_trash_event(
for identifier in identifiers.items {
let view_table = ViewTableSql::read_view(&identifier.id, conn)?;
let _ = ViewTableSql::delete_view(&identifier.id, conn)?;
let _ = document.delete(identifier.id.clone().into())?;
let _ = context.delete(identifier.id.clone().into())?;
notify_ids.insert(view_table.belong_to_id);
}

View File

@ -2,7 +2,7 @@ use crate::{
errors::FlowyError,
services::{
controller::DocController,
doc::{edit::ClientDocEditor, DocumentWsHandlers},
doc::{edit::ClientDocEditor, DocumentWSReceivers, DocumentWebSocket},
server::construct_doc_server,
},
};
@ -18,19 +18,20 @@ pub trait DocumentUser: Send + Sync {
fn db_pool(&self) -> Result<Arc<ConnectionPool>, FlowyError>;
}
pub struct FlowyDocument {
pub struct DocumentContext {
doc_ctrl: Arc<DocController>,
user: Arc<dyn DocumentUser>,
}
impl FlowyDocument {
impl DocumentContext {
pub fn new(
user: Arc<dyn DocumentUser>,
ws_manager: Arc<DocumentWsHandlers>,
ws_receivers: Arc<DocumentWSReceivers>,
ws_sender: Arc<dyn DocumentWebSocket>,
server_config: &ClientServerConfiguration,
) -> FlowyDocument {
) -> DocumentContext {
let server = construct_doc_server(server_config);
let doc_ctrl = Arc::new(DocController::new(server, user.clone(), ws_manager));
let doc_ctrl = Arc::new(DocController::new(server, user.clone(), ws_receivers, ws_sender));
Self { doc_ctrl, user }
}

View File

@ -1,4 +1,4 @@
pub mod module;
pub mod context;
mod notify;
pub mod protobuf;
pub mod services;

View File

@ -1,11 +1,13 @@
use crate::{
context::DocumentUser,
errors::FlowyError,
module::DocumentUser,
services::{
doc::{
edit::ClientDocEditor,
revision::{RevisionCache, RevisionManager, RevisionServer},
DocumentWsHandlers,
DocumentWSReceivers,
DocumentWebSocket,
WSStateReceiver,
},
server::Server,
},
@ -20,24 +22,33 @@ use std::sync::Arc;
pub(crate) struct DocController {
server: Server,
ws_handlers: Arc<DocumentWsHandlers>,
ws_receivers: Arc<DocumentWSReceivers>,
ws_sender: Arc<dyn DocumentWebSocket>,
open_cache: Arc<OpenDocCache>,
user: Arc<dyn DocumentUser>,
}
impl DocController {
pub(crate) fn new(server: Server, user: Arc<dyn DocumentUser>, ws_handlers: Arc<DocumentWsHandlers>) -> Self {
pub(crate) fn new(
server: Server,
user: Arc<dyn DocumentUser>,
ws_receivers: Arc<DocumentWSReceivers>,
ws_sender: Arc<dyn DocumentWebSocket>,
) -> Self {
let open_cache = Arc::new(OpenDocCache::new());
Self {
server,
ws_handlers,
ws_receivers,
ws_sender,
open_cache,
user,
}
}
pub(crate) fn init(&self) -> FlowyResult<()> {
self.ws_handlers.init();
let notify = self.ws_sender.subscribe_state_changed();
listen_ws_state_changed(notify, self.ws_receivers.clone());
Ok(())
}
@ -47,7 +58,7 @@ impl DocController {
pool: Arc<ConnectionPool>,
) -> Result<Arc<ClientDocEditor>, FlowyError> {
if !self.open_cache.contains(&params.doc_id) {
let edit_ctx = self.make_edit_context(&params.doc_id, pool.clone()).await?;
let edit_ctx = self.make_editor(&params.doc_id, pool.clone()).await?;
return Ok(edit_ctx);
}
@ -58,7 +69,7 @@ impl DocController {
pub(crate) fn close(&self, doc_id: &str) -> Result<(), FlowyError> {
tracing::debug!("Close document {}", doc_id);
self.open_cache.remove(doc_id);
self.ws_handlers.remove_handler(doc_id);
self.ws_receivers.remove_receiver(doc_id);
Ok(())
}
@ -66,7 +77,7 @@ impl DocController {
pub(crate) fn delete(&self, params: DocIdentifier) -> Result<(), FlowyError> {
let doc_id = &params.doc_id;
self.open_cache.remove(doc_id);
self.ws_handlers.remove_handler(doc_id);
self.ws_receivers.remove_receiver(doc_id);
Ok(())
}
@ -92,11 +103,7 @@ impl DocController {
}
impl DocController {
async fn make_edit_context(
&self,
doc_id: &str,
pool: Arc<ConnectionPool>,
) -> Result<Arc<ClientDocEditor>, FlowyError> {
async fn make_editor(&self, doc_id: &str, pool: Arc<ConnectionPool>) -> Result<Arc<ClientDocEditor>, FlowyError> {
let user = self.user.clone();
let token = self.user.token()?;
let rev_manager = self.make_rev_manager(doc_id, pool.clone())?;
@ -104,17 +111,13 @@ impl DocController {
token,
server: self.server.clone(),
});
let doc_editor = ClientDocEditor::new(doc_id, user, pool, rev_manager, self.ws_handlers.ws(), server).await?;
let ws_handler = doc_editor.ws_handler();
self.ws_handlers.register_handler(doc_id, ws_handler);
let doc_editor = ClientDocEditor::new(doc_id, user, pool, rev_manager, self.ws_sender.clone(), server).await?;
self.ws_receivers.register_receiver(doc_id, doc_editor.ws_handler());
self.open_cache.insert(&doc_id, &doc_editor);
Ok(doc_editor)
}
fn make_rev_manager(&self, doc_id: &str, pool: Arc<ConnectionPool>) -> Result<RevisionManager, FlowyError> {
// Opti: require upgradable_read lock and then upgrade to write lock using
// RwLockUpgradableReadGuard::upgrade(xx) of ws
// let document = self.read_doc(doc_id, pool.clone()).await?;
let user_id = self.user.user_id()?;
let cache = Arc::new(RevisionCache::new(&user_id, doc_id, pool));
Ok(RevisionManager::new(&user_id, doc_id, cache))
@ -181,3 +184,12 @@ impl OpenDocCache {
fn doc_not_found() -> FlowyError {
FlowyError::record_not_found().context("Doc is close or you should call open first")
}
#[tracing::instrument(level = "debug", skip(state_receiver, receivers))]
fn listen_ws_state_changed(mut state_receiver: WSStateReceiver, receivers: Arc<DocumentWSReceivers>) {
tokio::spawn(async move {
while let Ok(state) = state_receiver.recv().await {
receivers.ws_connect_state_changed(&state);
}
});
}

View File

@ -1,8 +1,8 @@
use crate::{
context::DocumentUser,
errors::FlowyError,
module::DocumentUser,
services::doc::{
web_socket::{initialize_document_web_socket, DocumentWebSocketContext, EditorWebSocket},
web_socket::{make_document_ws_manager, DocumentWebSocketManager},
*,
},
};
@ -27,8 +27,8 @@ use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
pub struct ClientDocEditor {
pub doc_id: String,
rev_manager: Arc<RevisionManager>,
editor_ws: Arc<dyn EditorWebSocket>,
editor_cmd_sender: UnboundedSender<EditorCommand>,
ws_manager: Arc<dyn DocumentWebSocketManager>,
edit_queue: UnboundedSender<EditorCommand>,
user: Arc<dyn DocumentUser>,
}
@ -42,25 +42,24 @@ impl ClientDocEditor {
server: Arc<dyn RevisionServer>,
) -> FlowyResult<Arc<Self>> {
let delta = rev_manager.load_document(server).await?;
let editor_cmd_sender = spawn_edit_queue(doc_id, delta, pool.clone());
let edit_queue = spawn_edit_queue(doc_id, delta, pool.clone());
let doc_id = doc_id.to_string();
let user_id = user.user_id()?;
let rev_manager = Arc::new(rev_manager);
let context = DocumentWebSocketContext {
doc_id: doc_id.to_owned(),
user_id: user_id.clone(),
editor_cmd_sender: editor_cmd_sender.clone(),
rev_manager: rev_manager.clone(),
let ws_manager = make_document_ws_manager(
doc_id.clone(),
user_id.clone(),
edit_queue.clone(),
rev_manager.clone(),
ws,
};
let editor_ws = initialize_document_web_socket(context).await;
)
.await;
let editor = Arc::new(Self {
doc_id,
rev_manager,
editor_ws,
editor_cmd_sender,
ws_manager,
edit_queue,
user,
});
Ok(editor)
@ -73,7 +72,7 @@ impl ClientDocEditor {
data: data.to_string(),
ret,
};
let _ = self.editor_cmd_sender.send(msg);
let _ = self.edit_queue.send(msg);
let (delta, md5) = rx.await.map_err(internal_error)??;
let _ = self.save_local_delta(delta, md5).await?;
Ok(())
@ -82,7 +81,7 @@ impl ClientDocEditor {
pub async fn delete(&self, interval: Interval) -> Result<(), FlowyError> {
let (ret, rx) = oneshot::channel::<CollaborateResult<NewDelta>>();
let msg = EditorCommand::Delete { interval, ret };
let _ = self.editor_cmd_sender.send(msg);
let _ = self.edit_queue.send(msg);
let (delta, md5) = rx.await.map_err(internal_error)??;
let _ = self.save_local_delta(delta, md5).await?;
Ok(())
@ -95,7 +94,7 @@ impl ClientDocEditor {
attribute,
ret,
};
let _ = self.editor_cmd_sender.send(msg);
let _ = self.edit_queue.send(msg);
let (delta, md5) = rx.await.map_err(internal_error)??;
let _ = self.save_local_delta(delta, md5).await?;
Ok(())
@ -108,7 +107,7 @@ impl ClientDocEditor {
data: data.to_string(),
ret,
};
let _ = self.editor_cmd_sender.send(msg);
let _ = self.edit_queue.send(msg);
let (delta, md5) = rx.await.map_err(internal_error)??;
let _ = self.save_local_delta(delta, md5).await?;
Ok(())
@ -117,21 +116,21 @@ impl ClientDocEditor {
pub async fn can_undo(&self) -> bool {
let (ret, rx) = oneshot::channel::<bool>();
let msg = EditorCommand::CanUndo { ret };
let _ = self.editor_cmd_sender.send(msg);
let _ = self.edit_queue.send(msg);
rx.await.unwrap_or(false)
}
pub async fn can_redo(&self) -> bool {
let (ret, rx) = oneshot::channel::<bool>();
let msg = EditorCommand::CanRedo { ret };
let _ = self.editor_cmd_sender.send(msg);
let _ = self.edit_queue.send(msg);
rx.await.unwrap_or(false)
}
pub async fn undo(&self) -> Result<UndoResult, FlowyError> {
let (ret, rx) = oneshot::channel::<CollaborateResult<UndoResult>>();
let msg = EditorCommand::Undo { ret };
let _ = self.editor_cmd_sender.send(msg);
let _ = self.edit_queue.send(msg);
let r = rx.await.map_err(internal_error)??;
Ok(r)
}
@ -139,7 +138,7 @@ impl ClientDocEditor {
pub async fn redo(&self) -> Result<UndoResult, FlowyError> {
let (ret, rx) = oneshot::channel::<CollaborateResult<UndoResult>>();
let msg = EditorCommand::Redo { ret };
let _ = self.editor_cmd_sender.send(msg);
let _ = self.edit_queue.send(msg);
let r = rx.await.map_err(internal_error)??;
Ok(r)
}
@ -147,7 +146,7 @@ impl ClientDocEditor {
pub async fn delta(&self) -> FlowyResult<DocumentDelta> {
let (ret, rx) = oneshot::channel::<CollaborateResult<DocumentMD5>>();
let msg = EditorCommand::ReadDoc { ret };
let _ = self.editor_cmd_sender.send(msg);
let _ = self.edit_queue.send(msg);
let data = rx.await.map_err(internal_error)??;
Ok(DocumentDelta {
@ -181,7 +180,7 @@ impl ClientDocEditor {
delta: delta.clone(),
ret,
};
let _ = self.editor_cmd_sender.send(msg);
let _ = self.edit_queue.send(msg);
let md5 = rx.await.map_err(internal_error)??;
let _ = self.save_local_delta(delta, md5).await?;
@ -189,9 +188,9 @@ impl ClientDocEditor {
}
#[tracing::instrument(level = "debug", skip(self))]
pub fn stop(&self) { self.editor_ws.stop_web_socket(); }
pub fn stop(&self) { self.ws_manager.stop(); }
pub(crate) fn ws_handler(&self) -> Arc<dyn DocumentWsHandler> { self.editor_ws.ws_handler() }
pub(crate) fn ws_handler(&self) -> Arc<dyn DocumentWSReceiver> { self.ws_manager.receiver() }
}
fn spawn_edit_queue(doc_id: &str, delta: RichTextDelta, _pool: Arc<ConnectionPool>) -> UnboundedSender<EditorCommand> {
@ -206,7 +205,7 @@ impl ClientDocEditor {
pub async fn doc_json(&self) -> FlowyResult<String> {
let (ret, rx) = oneshot::channel::<CollaborateResult<DocumentMD5>>();
let msg = EditorCommand::ReadDoc { ret };
let _ = self.editor_cmd_sender.send(msg);
let _ = self.edit_queue.send(msg);
let s = rx.await.map_err(internal_error)??;
Ok(s)
}
@ -214,7 +213,7 @@ impl ClientDocEditor {
pub async fn doc_delta(&self) -> FlowyResult<RichTextDelta> {
let (ret, rx) = oneshot::channel::<CollaborateResult<RichTextDelta>>();
let msg = EditorCommand::ReadDocDelta { ret };
let _ = self.editor_cmd_sender.send(msg);
let _ = self.edit_queue.send(msg);
let delta = rx.await.map_err(internal_error)??;
Ok(delta)
}

View File

@ -1,5 +1,5 @@
mod editor;
mod editor_cmd_queue;
mod queue;
pub use editor::*;
pub(crate) use editor_cmd_queue::*;
pub(crate) use queue::*;

View File

@ -1,8 +1,8 @@
use async_stream::stream;
use bytes::Bytes;
use flowy_collaboration::{
core::document::{history::UndoResult, Document},
entities::revision::{RevId, Revision},
entities::revision::Revision,
errors::CollaborateError,
};
use flowy_error::FlowyError;
@ -11,7 +11,7 @@ use lib_ot::{
core::{Interval, OperationTransformable},
rich_text::{RichTextAttribute, RichTextDelta},
};
use std::{convert::TryFrom, sync::Arc};
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot, RwLock};
pub(crate) struct EditorCommandQueue {
@ -56,19 +56,29 @@ impl EditorCommandQueue {
let result = self.composed_delta(delta).await;
let _ = ret.send(result);
},
EditorCommand::ProcessRemoteRevision { bytes, ret } => {
EditorCommand::ProcessRemoteRevision { revisions, ret } => {
let f = || async {
let revision = Revision::try_from(bytes)?;
let delta = RichTextDelta::from_bytes(&revision.delta_data)?;
let server_rev_id: RevId = revision.rev_id.into();
let mut new_delta = RichTextDelta::new();
for revision in revisions {
match RichTextDelta::from_bytes(revision.delta_data) {
Ok(delta) => {
new_delta = new_delta.compose(&delta)?;
},
Err(e) => {
let err_msg = format!("Deserialize remote revision failed: {:?}", e);
log::error!("{}", err_msg);
return Err(CollaborateError::internal().context(err_msg));
},
}
}
let read_guard = self.document.read().await;
let (server_prime, client_prime) = read_guard.delta().transform(&delta)?;
let (server_prime, client_prime) = read_guard.delta().transform(&new_delta)?;
drop(read_guard);
let transform_delta = TransformDeltas {
client_prime,
server_prime,
server_rev_id,
};
Ok::<TransformDeltas, CollaborateError>(transform_delta)
@ -157,7 +167,7 @@ pub(crate) enum EditorCommand {
ret: Ret<DocumentMD5>,
},
ProcessRemoteRevision {
bytes: Bytes,
revisions: Vec<Revision>,
ret: Ret<TransformDeltas>,
},
Insert {
@ -203,5 +213,4 @@ pub(crate) enum EditorCommand {
pub(crate) struct TransformDeltas {
pub client_prime: RichTextDelta,
pub server_prime: RichTextDelta,
pub server_rev_id: RevId,
}

View File

@ -1,7 +1,7 @@
pub mod edit;
pub mod revision;
mod web_socket;
pub use crate::services::ws_handlers::*;
pub use crate::services::ws_receivers::*;
pub use edit::*;
pub use revision::*;

View File

@ -86,15 +86,15 @@ impl RevisionCache {
pub async fn latest_revision(&self) -> Revision {
let rev_id = self.latest_rev_id.load(SeqCst);
self.get_revision(&self.doc_id, rev_id).await.unwrap().revision
self.get_revision(rev_id).await.unwrap().revision
}
pub async fn get_revision(&self, doc_id: &str, rev_id: i64) -> Option<RevisionRecord> {
pub async fn get_revision(&self, rev_id: i64) -> Option<RevisionRecord> {
match self.memory_cache.get_revision(&rev_id).await {
None => match self.disk_cache.read_revision(&self.doc_id, rev_id) {
Ok(Some(revision)) => Some(revision),
Ok(None) => {
tracing::warn!("Can't find revision in {} with rev_id: {}", doc_id, rev_id);
tracing::warn!("Can't find revision in {} with rev_id: {}", &self.doc_id, rev_id);
None
},
Err(e) => {

View File

@ -47,16 +47,19 @@ impl RevisionManager {
.load()
.await?;
let doc = mk_doc_from_revisions(&self.doc_id, revisions)?;
self.update_rev_id_counter_value(doc.rev_id);
self.rev_id_counter.set(doc.rev_id);
Ok(doc.delta()?)
}
pub async fn add_remote_revision(&self, revision: &Revision) -> Result<(), FlowyError> {
assert_eq!(revision.ty, RevType::Remote);
self.rev_id_counter.set(revision.rev_id);
let _ = self.cache.add_remote_revision(revision.clone()).await?;
Ok(())
}
pub async fn add_local_revision(&self, revision: &Revision) -> Result<(), FlowyError> {
assert_eq!(revision.ty, RevType::Local);
let _ = self.cache.add_local_revision(revision.clone()).await?;
Ok(())
}
@ -76,38 +79,19 @@ impl RevisionManager {
pub fn update_rev_id_counter_value(&self, rev_id: i64) { self.rev_id_counter.set(rev_id); }
pub async fn mk_revisions(&self, range: RevisionRange) -> Result<Revision, FlowyError> {
pub async fn get_revisions_in_range(&self, range: RevisionRange) -> Result<Vec<Revision>, FlowyError> {
debug_assert!(range.doc_id == self.doc_id);
let revisions = self.cache.revisions_in_range(range.clone()).await?;
let mut new_delta = RichTextDelta::new();
// TODO: generate delta from revision should be wrapped into function.
for revision in revisions {
match RichTextDelta::from_bytes(revision.delta_data) {
Ok(delta) => {
new_delta = new_delta.compose(&delta)?;
},
Err(e) => log::error!("{}", e),
}
}
let delta_data = new_delta.to_bytes();
let md5 = md5(&delta_data);
let revision = Revision::new(
&self.doc_id,
range.start,
range.end,
delta_data,
RevType::Remote,
&self.user_id,
md5,
);
Ok(revision)
Ok(revisions)
}
pub fn next_sync_revision(&self) -> FutureResult<Option<Revision>, FlowyError> { self.cache.next_sync_revision() }
pub async fn latest_revision(&self) -> Revision { self.cache.latest_revision().await }
pub async fn get_revision(&self, rev_id: i64) -> Option<Revision> {
self.cache.get_revision(rev_id).await.map(|record| record.revision)
}
}
#[cfg(feature = "flowy_unit_test")]

View File

@ -1,6 +1,6 @@
use crate::services::{
doc::{web_socket::web_socket::EditorWebSocket, SYNC_INTERVAL_IN_MILLIS},
ws_handlers::{DocumentWebSocket, DocumentWsHandler},
doc::{web_socket::web_socket::DocumentWebSocketManager, SYNC_INTERVAL_IN_MILLIS},
ws_receivers::{DocumentWSReceiver, DocumentWebSocket},
};
use async_stream::stream;
use bytes::Bytes;
@ -23,7 +23,7 @@ use tokio::{
time::{interval, Duration},
};
pub struct EditorHttpWebSocket {
pub(crate) struct HttpWebSocketManager {
doc_id: String,
data_provider: Arc<dyn DocumentWSSinkDataProvider>,
stream_consumer: Arc<dyn DocumentWSSteamConsumer>,
@ -34,8 +34,8 @@ pub struct EditorHttpWebSocket {
state: broadcast::Sender<WSConnectState>,
}
impl EditorHttpWebSocket {
pub fn new(
impl HttpWebSocketManager {
pub(crate) fn new(
doc_id: &str,
ws: Arc<dyn DocumentWebSocket>,
data_provider: Arc<dyn DocumentWSSinkDataProvider>,
@ -45,7 +45,7 @@ impl EditorHttpWebSocket {
let (stop_sync_tx, _) = tokio::sync::broadcast::channel(2);
let doc_id = doc_id.to_string();
let (state, _) = broadcast::channel(2);
let mut manager = EditorHttpWebSocket {
let mut manager = HttpWebSocketManager {
doc_id,
data_provider,
stream_consumer,
@ -55,19 +55,19 @@ impl EditorHttpWebSocket {
stop_sync_tx,
state,
};
manager.start_web_socket();
manager.run();
manager
}
fn start_web_socket(&mut self) {
fn run(&mut self) {
let ws_msg_rx = self.ws_msg_rx.take().expect("Only take once");
let sink = DocumentWebSocketSink::new(
let sink = DocumentWSSink::new(
&self.doc_id,
self.data_provider.clone(),
self.ws.clone(),
self.stop_sync_tx.subscribe(),
);
let stream = DocumentWebSocketStream::new(
let stream = DocumentWSStream::new(
&self.doc_id,
self.stream_consumer.clone(),
ws_msg_rx,
@ -80,18 +80,18 @@ impl EditorHttpWebSocket {
pub fn scribe_state(&self) -> broadcast::Receiver<WSConnectState> { self.state.subscribe() }
}
impl EditorWebSocket for Arc<EditorHttpWebSocket> {
fn stop_web_socket(&self) {
impl DocumentWebSocketManager for Arc<HttpWebSocketManager> {
fn stop(&self) {
if self.stop_sync_tx.send(()).is_ok() {
tracing::debug!("{} stop sync", self.doc_id)
}
}
fn ws_handler(&self) -> Arc<dyn DocumentWsHandler> { self.clone() }
fn receiver(&self) -> Arc<dyn DocumentWSReceiver> { self.clone() }
}
impl DocumentWsHandler for EditorHttpWebSocket {
fn receive(&self, doc_data: DocumentWSData) {
impl DocumentWSReceiver for HttpWebSocketManager {
fn receive_ws_data(&self, doc_data: DocumentWSData) {
match self.ws_msg_tx.send(doc_data) {
Ok(_) => {},
Err(e) => tracing::error!("❌Propagate ws message failed. {}", e),
@ -110,24 +110,24 @@ pub trait DocumentWSSteamConsumer: Send + Sync {
fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError>;
fn receive_ack(&self, id: String, ty: DocumentWSDataType) -> FutureResult<(), FlowyError>;
fn receive_new_user_connect(&self, new_user: NewDocumentUser) -> FutureResult<(), FlowyError>;
fn send_revision_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError>;
fn pull_revisions_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError>;
}
pub struct DocumentWebSocketStream {
pub struct DocumentWSStream {
doc_id: String,
consumer: Arc<dyn DocumentWSSteamConsumer>,
ws_msg_rx: Option<mpsc::UnboundedReceiver<DocumentWSData>>,
stop_rx: Option<SinkStopRx>,
}
impl DocumentWebSocketStream {
impl DocumentWSStream {
pub fn new(
doc_id: &str,
consumer: Arc<dyn DocumentWSSteamConsumer>,
ws_msg_rx: mpsc::UnboundedReceiver<DocumentWSData>,
stop_rx: SinkStopRx,
) -> Self {
DocumentWebSocketStream {
DocumentWSStream {
doc_id: doc_id.to_owned(),
consumer,
ws_msg_rx: Some(ws_msg_rx),
@ -190,7 +190,7 @@ impl DocumentWebSocketStream {
},
DocumentWSDataType::PullRev => {
let range = RevisionRange::try_from(bytes)?;
let _ = self.consumer.send_revision_in_range(range).await?;
let _ = self.consumer.pull_revisions_in_range(range).await?;
},
DocumentWSDataType::Ack => {
let _ = self.consumer.receive_ack(id, ty).await;
@ -214,14 +214,14 @@ pub trait DocumentWSSinkDataProvider: Send + Sync {
fn next(&self) -> FutureResult<Option<DocumentWSData>, FlowyError>;
}
pub struct DocumentWebSocketSink {
pub struct DocumentWSSink {
provider: Arc<dyn DocumentWSSinkDataProvider>,
ws_sender: Arc<dyn DocumentWebSocket>,
stop_rx: Option<SinkStopRx>,
doc_id: String,
}
impl DocumentWebSocketSink {
impl DocumentWSSink {
pub fn new(
doc_id: &str,
provider: Arc<dyn DocumentWSSinkDataProvider>,

View File

@ -1,18 +1,18 @@
use crate::services::doc::{web_socket::EditorWebSocket, DocumentWsHandler};
use crate::services::doc::{web_socket::DocumentWebSocketManager, DocumentWSReceiver};
use flowy_collaboration::entities::ws::DocumentWSData;
use lib_ws::WSConnectState;
use std::sync::Arc;
pub(crate) struct EditorLocalWebSocket {}
pub(crate) struct LocalWebSocketManager {}
impl EditorWebSocket for Arc<EditorLocalWebSocket> {
fn stop_web_socket(&self) {}
impl DocumentWebSocketManager for Arc<LocalWebSocketManager> {
fn stop(&self) {}
fn ws_handler(&self) -> Arc<dyn DocumentWsHandler> { self.clone() }
fn receiver(&self) -> Arc<dyn DocumentWSReceiver> { self.clone() }
}
impl DocumentWsHandler for EditorLocalWebSocket {
fn receive(&self, _doc_data: DocumentWSData) {}
impl DocumentWSReceiver for LocalWebSocketManager {
fn receive_ws_data(&self, _doc_data: DocumentWSData) {}
fn connect_state_changed(&self, _state: &WSConnectState) {}
}

View File

@ -1,13 +1,13 @@
use crate::services::doc::{
web_socket::{
local_ws_impl::EditorLocalWebSocket,
local_ws_impl::LocalWebSocketManager,
DocumentWSSinkDataProvider,
DocumentWSSteamConsumer,
EditorHttpWebSocket,
HttpWebSocketManager,
},
DocumentMD5,
DocumentWSReceiver,
DocumentWebSocket,
DocumentWsHandler,
EditorCommand,
RevisionManager,
TransformDeltas,
@ -15,14 +15,14 @@ use crate::services::doc::{
use bytes::Bytes;
use flowy_collaboration::{
entities::{
revision::{Revision, RevisionRange},
revision::{RepeatedRevision, RevType, Revision, RevisionRange},
ws::{DocumentWSData, DocumentWSDataBuilder, DocumentWSDataType, NewDocumentUser},
},
errors::CollaborateResult,
};
use flowy_error::{internal_error, FlowyError, FlowyResult};
use lib_infra::future::FutureResult;
use lib_ot::rich_text::RichTextDelta;
use lib_ws::WSConnectState;
use std::{
collections::VecDeque,
@ -31,66 +31,50 @@ use std::{
};
use tokio::sync::{broadcast, mpsc::UnboundedSender, oneshot, RwLock};
pub(crate) trait EditorWebSocket: Send + Sync {
fn stop_web_socket(&self);
fn ws_handler(&self) -> Arc<dyn DocumentWsHandler>;
pub(crate) trait DocumentWebSocketManager: Send + Sync {
fn stop(&self);
fn receiver(&self) -> Arc<dyn DocumentWSReceiver>;
}
pub(crate) struct DocumentWebSocketContext {
pub(crate) doc_id: String,
pub(crate) user_id: String,
pub(crate) editor_cmd_sender: UnboundedSender<EditorCommand>,
pub(crate) rev_manager: Arc<RevisionManager>,
pub(crate) ws: Arc<dyn DocumentWebSocket>,
}
pub(crate) async fn initialize_document_web_socket(ctx: DocumentWebSocketContext) -> Arc<dyn EditorWebSocket> {
pub(crate) async fn make_document_ws_manager(
doc_id: String,
user_id: String,
editor_edit_queue: UnboundedSender<EditorCommand>,
rev_manager: Arc<RevisionManager>,
ws: Arc<dyn DocumentWebSocket>,
) -> Arc<dyn DocumentWebSocketManager> {
if cfg!(feature = "http_server") {
let combined_sink = Arc::new(CombinedSink::new(ctx.rev_manager.clone()));
let shared_sink = Arc::new(SharedWSSinkDataProvider::new(rev_manager.clone()));
let ws_stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter {
doc_id: ctx.doc_id.clone(),
user_id: ctx.user_id.clone(),
editor_cmd_sender: ctx.editor_cmd_sender.clone(),
rev_manager: ctx.rev_manager.clone(),
combined_sink: combined_sink.clone(),
doc_id: doc_id.clone(),
user_id: user_id.clone(),
editor_edit_queue: editor_edit_queue.clone(),
rev_manager: rev_manager.clone(),
shared_sink: shared_sink.clone(),
});
let ws_stream_provider = DocumentWSSinkDataProviderAdapter(combined_sink.clone());
let editor_ws = Arc::new(EditorHttpWebSocket::new(
&ctx.doc_id,
ctx.ws.clone(),
let ws_stream_provider = DocumentWSSinkDataProviderAdapter(shared_sink.clone());
let ws_manager = Arc::new(HttpWebSocketManager::new(
&doc_id,
ws.clone(),
Arc::new(ws_stream_provider),
ws_stream_consumer,
));
notify_user_has_connected(&user_id, &doc_id, rev_manager.clone(), shared_sink).await;
listen_document_ws_state(&user_id, &doc_id, ws_manager.scribe_state(), rev_manager.clone());
notify_user_conn(
&ctx.user_id,
&ctx.doc_id,
ctx.rev_manager.clone(),
combined_sink.clone(),
)
.await;
listen_document_ws_state(
&ctx.user_id,
&ctx.doc_id,
editor_ws.scribe_state(),
ctx.rev_manager.clone(),
combined_sink,
);
Arc::new(editor_ws)
Arc::new(ws_manager)
} else {
Arc::new(Arc::new(EditorLocalWebSocket {}))
Arc::new(Arc::new(LocalWebSocketManager {}))
}
}
async fn notify_user_conn(
async fn notify_user_has_connected(
user_id: &str,
doc_id: &str,
rev_manager: Arc<RevisionManager>,
combined_sink: Arc<CombinedSink>,
shared_sink: Arc<SharedWSSinkDataProvider>,
) {
let need_notify = match combined_sink.front().await {
let need_notify = match shared_sink.front().await {
None => true,
Some(data) => data.ty != DocumentWSDataType::UserConnect,
};
@ -104,29 +88,22 @@ async fn notify_user_conn(
};
let data = DocumentWSDataBuilder::build_new_document_user_message(doc_id, new_connect);
combined_sink.push_front(data).await;
shared_sink.push_front(data).await;
}
}
fn listen_document_ws_state(
user_id: &str,
doc_id: &str,
_user_id: &str,
_doc_id: &str,
mut subscriber: broadcast::Receiver<WSConnectState>,
rev_manager: Arc<RevisionManager>,
sink_data_provider: Arc<CombinedSink>,
_rev_manager: Arc<RevisionManager>,
) {
let user_id = user_id.to_owned();
let doc_id = doc_id.to_owned();
tokio::spawn(async move {
while let Ok(state) = subscriber.recv().await {
match state {
WSConnectState::Init => {},
WSConnectState::Connecting => {},
WSConnectState::Connected => {
// self.notify_user_conn()
notify_user_conn(&user_id, &doc_id, rev_manager.clone(), sink_data_provider.clone()).await;
},
WSConnectState::Connected => {},
WSConnectState::Disconnected => {},
}
}
@ -136,29 +113,31 @@ fn listen_document_ws_state(
pub(crate) struct DocumentWebSocketSteamConsumerAdapter {
pub(crate) doc_id: String,
pub(crate) user_id: String,
pub(crate) editor_cmd_sender: UnboundedSender<EditorCommand>,
pub(crate) editor_edit_queue: UnboundedSender<EditorCommand>,
pub(crate) rev_manager: Arc<RevisionManager>,
pub(crate) combined_sink: Arc<CombinedSink>,
pub(crate) shared_sink: Arc<SharedWSSinkDataProvider>,
}
impl DocumentWSSteamConsumer for DocumentWebSocketSteamConsumerAdapter {
fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError> {
let user_id = self.user_id.clone();
let rev_manager = self.rev_manager.clone();
let edit_cmd_tx = self.editor_cmd_sender.clone();
let combined_sink = self.combined_sink.clone();
let edit_cmd_tx = self.editor_edit_queue.clone();
let shared_sink = self.shared_sink.clone();
let doc_id = self.doc_id.clone();
FutureResult::new(async move {
if let Some(revision) = handle_push_rev(&doc_id, &user_id, edit_cmd_tx, rev_manager, bytes).await? {
combined_sink.push_back(revision.into()).await;
if let Some(server_composed_revision) =
handle_push_rev(&doc_id, &user_id, edit_cmd_tx, rev_manager, bytes).await?
{
shared_sink.push_back(server_composed_revision.into()).await;
}
Ok(())
})
}
fn receive_ack(&self, id: String, ty: DocumentWSDataType) -> FutureResult<(), FlowyError> {
let combined_sink = self.combined_sink.clone();
FutureResult::new(async move { combined_sink.ack(id, ty).await })
let shared_sink = self.shared_sink.clone();
FutureResult::new(async move { shared_sink.ack(id, ty).await })
}
fn receive_new_user_connect(&self, _new_user: NewDocumentUser) -> FutureResult<(), FlowyError> {
@ -166,22 +145,28 @@ impl DocumentWSSteamConsumer for DocumentWebSocketSteamConsumerAdapter {
FutureResult::new(async move { Ok(()) })
}
fn send_revision_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError> {
fn pull_revisions_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError> {
let rev_manager = self.rev_manager.clone();
let combined_sink = self.combined_sink.clone();
let shared_sink = self.shared_sink.clone();
FutureResult::new(async move {
let revision = rev_manager.mk_revisions(range).await?;
combined_sink.push_back(revision.into()).await;
let data = rev_manager
.get_revisions_in_range(range)
.await?
.into_iter()
.map(|revision| revision.into())
.collect::<Vec<DocumentWSData>>();
shared_sink.append(data).await;
Ok(())
})
}
}
pub(crate) struct DocumentWSSinkDataProviderAdapter(pub(crate) Arc<CombinedSink>);
pub(crate) struct DocumentWSSinkDataProviderAdapter(pub(crate) Arc<SharedWSSinkDataProvider>);
impl DocumentWSSinkDataProvider for DocumentWSSinkDataProviderAdapter {
fn next(&self) -> FutureResult<Option<DocumentWSData>, FlowyError> {
let combined_sink = self.0.clone();
FutureResult::new(async move { combined_sink.next().await })
let shared_sink = self.0.clone();
FutureResult::new(async move { shared_sink.next().await })
}
}
@ -194,60 +179,68 @@ pub(crate) async fn handle_push_rev(
bytes: Bytes,
) -> FlowyResult<Option<Revision>> {
// Transform the revision
let (_ret, _rx) = oneshot::channel::<CollaborateResult<TransformDeltas>>();
let revision = Revision::try_from(bytes)?;
let delta = RichTextDelta::from_bytes(&revision.delta_data)?;
let server_rev_id = revision.rev_id;
// let _ = edit_cmd_tx.send(EditorCommand::ProcessRemoteRevision { bytes, ret
// }); let TransformDeltas {
// client_prime,
// server_prime,
// server_rev_id,
// } = rx.await.map_err(internal_error)??;
//
// if rev_manager.rev_id() >= server_rev_id.value {
// // Ignore this push revision if local_rev_id >= server_rev_id
// return Ok(None);
// }
let (ret, rx) = oneshot::channel::<CollaborateResult<TransformDeltas>>();
let mut revisions = RepeatedRevision::try_from(bytes)?.into_inner();
if revisions.is_empty() {
return Ok(None);
}
let first_revision = revisions.first().unwrap();
if let Some(local_revision) = rev_manager.get_revision(first_revision.rev_id).await {
if local_revision.md5 != first_revision.md5 {
// The local revision is equal to the pushed revision. Just ignore it.
return Ok(None);
}
}
let revisions = revisions.split_off(1);
if revisions.is_empty() {
return Ok(None);
}
let _ = edit_cmd_tx.send(EditorCommand::ProcessRemoteRevision {
revisions: revisions.clone(),
ret,
});
let TransformDeltas {
client_prime,
server_prime,
} = rx.await.map_err(internal_error)??;
for revision in &revisions {
let _ = rev_manager.add_remote_revision(revision).await?;
}
// compose delta
let (ret, rx) = oneshot::channel::<CollaborateResult<DocumentMD5>>();
let msg = EditorCommand::ComposeDelta {
delta: delta.clone(),
let _ = edit_cmd_tx.send(EditorCommand::ComposeDelta {
delta: client_prime.clone(),
ret,
};
let _ = edit_cmd_tx.send(msg);
let _md5 = rx.await.map_err(internal_error)??;
// update rev id
rev_manager.update_rev_id_counter_value(server_rev_id);
// let (local_base_rev_id, local_rev_id) = rev_manager.next_rev_id();
// let delta_data = client_prime.to_bytes();
// // save the revision
// let revision = Revision::new(
// &doc_id,
// local_base_rev_id,
// local_rev_id,
// delta_data,
// RevType::Remote,
// &user_id,
// md5.clone(),
// );
});
let md5 = rx.await.map_err(internal_error)??;
let (local_base_rev_id, local_rev_id) = rev_manager.next_rev_id();
// save the revision
let revision = Revision::new(
&doc_id,
local_base_rev_id,
local_rev_id,
client_prime.to_bytes(),
RevType::Remote,
&user_id,
md5.clone(),
);
let _ = rev_manager.add_remote_revision(&revision).await?;
// send the server_prime delta
// let delta_data = server_prime.to_bytes();
// Ok(Some(Revision::new(
// &doc_id,
// local_base_rev_id,
// local_rev_id,
// delta_data,
// RevType::Remote,
// &user_id,
// md5,
// )))
Ok(None)
Ok(Some(Revision::new(
&doc_id,
local_base_rev_id,
local_rev_id,
server_prime.to_bytes(),
RevType::Local,
&user_id,
md5,
)))
}
#[derive(Clone)]
@ -257,15 +250,15 @@ enum SourceType {
}
#[derive(Clone)]
pub(crate) struct CombinedSink {
pub(crate) struct SharedWSSinkDataProvider {
shared: Arc<RwLock<VecDeque<DocumentWSData>>>,
rev_manager: Arc<RevisionManager>,
source_ty: Arc<RwLock<SourceType>>,
}
impl CombinedSink {
impl SharedWSSinkDataProvider {
pub(crate) fn new(rev_manager: Arc<RevisionManager>) -> Self {
CombinedSink {
SharedWSSinkDataProvider {
shared: Arc::new(RwLock::new(VecDeque::new())),
rev_manager,
source_ty: Arc::new(RwLock::new(SourceType::Shared)),
@ -279,6 +272,11 @@ impl CombinedSink {
async fn push_back(&self, data: DocumentWSData) { self.shared.write().await.push_back(data); }
async fn append(&self, data: Vec<DocumentWSData>) {
let mut buf: VecDeque<_> = data.into_iter().collect();
self.shared.write().await.append(&mut buf);
}
async fn next(&self) -> FlowyResult<Option<DocumentWSData>> {
let source_ty = self.source_ty.read().await.clone();
match source_ty {

View File

@ -1,4 +1,4 @@
pub(crate) mod controller;
pub mod doc;
pub mod server;
mod ws_handlers;
mod ws_receivers;

View File

@ -1,67 +0,0 @@
use crate::errors::FlowyError;
use bytes::Bytes;
use dashmap::DashMap;
use flowy_collaboration::entities::ws::DocumentWSData;
use lib_ws::WSConnectState;
use std::{convert::TryInto, sync::Arc};
pub(crate) trait DocumentWsHandler: Send + Sync {
fn receive(&self, data: DocumentWSData);
fn connect_state_changed(&self, state: &WSConnectState);
}
pub type WsStateReceiver = tokio::sync::broadcast::Receiver<WSConnectState>;
pub trait DocumentWebSocket: Send + Sync {
fn send(&self, data: DocumentWSData) -> Result<(), FlowyError>;
fn subscribe_state_changed(&self) -> WsStateReceiver;
}
pub struct DocumentWsHandlers {
ws: Arc<dyn DocumentWebSocket>,
// key: the document id
handlers: Arc<DashMap<String, Arc<dyn DocumentWsHandler>>>,
}
impl DocumentWsHandlers {
pub fn new(ws: Arc<dyn DocumentWebSocket>) -> Self {
let handlers: Arc<DashMap<String, Arc<dyn DocumentWsHandler>>> = Arc::new(DashMap::new());
Self { ws, handlers }
}
pub(crate) fn init(&self) { listen_ws_state_changed(self.ws.clone(), self.handlers.clone()); }
pub(crate) fn register_handler(&self, id: &str, handler: Arc<dyn DocumentWsHandler>) {
if self.handlers.contains_key(id) {
log::error!("Duplicate handler registered for {:?}", id);
}
self.handlers.insert(id.to_string(), handler);
}
pub(crate) fn remove_handler(&self, id: &str) { self.handlers.remove(id); }
pub fn did_receive_data(&self, data: Bytes) {
let data: DocumentWSData = data.try_into().unwrap();
match self.handlers.get(&data.doc_id) {
None => {
log::error!("Can't find any source handler for {:?}", data.doc_id);
},
Some(handler) => {
handler.receive(data);
},
}
}
pub fn ws(&self) -> Arc<dyn DocumentWebSocket> { self.ws.clone() }
}
#[tracing::instrument(level = "debug", skip(ws, handlers))]
fn listen_ws_state_changed(ws: Arc<dyn DocumentWebSocket>, handlers: Arc<DashMap<String, Arc<dyn DocumentWsHandler>>>) {
let mut notify = ws.subscribe_state_changed();
tokio::spawn(async move {
while let Ok(state) = notify.recv().await {
handlers.iter().for_each(|handle| {
handle.value().connect_state_changed(&state);
});
}
});
}

View File

@ -0,0 +1,60 @@
use crate::errors::FlowyError;
use bytes::Bytes;
use dashmap::DashMap;
use flowy_collaboration::entities::ws::DocumentWSData;
use lib_ws::WSConnectState;
use std::{convert::TryInto, sync::Arc};
pub(crate) trait DocumentWSReceiver: Send + Sync {
fn receive_ws_data(&self, data: DocumentWSData);
fn connect_state_changed(&self, state: &WSConnectState);
}
pub type WSStateReceiver = tokio::sync::broadcast::Receiver<WSConnectState>;
pub trait DocumentWebSocket: Send + Sync {
fn send(&self, data: DocumentWSData) -> Result<(), FlowyError>;
fn subscribe_state_changed(&self) -> WSStateReceiver;
}
pub struct DocumentWSReceivers {
// key: the document id
receivers: Arc<DashMap<String, Arc<dyn DocumentWSReceiver>>>,
}
impl std::default::Default for DocumentWSReceivers {
fn default() -> Self {
let receivers: Arc<DashMap<String, Arc<dyn DocumentWSReceiver>>> = Arc::new(DashMap::new());
DocumentWSReceivers { receivers }
}
}
impl DocumentWSReceivers {
pub fn new() -> Self { DocumentWSReceivers::default() }
pub(crate) fn register_receiver(&self, doc_id: &str, receiver: Arc<dyn DocumentWSReceiver>) {
if self.receivers.contains_key(doc_id) {
log::error!("Duplicate handler registered for {:?}", doc_id);
}
self.receivers.insert(doc_id.to_string(), receiver);
}
pub(crate) fn remove_receiver(&self, id: &str) { self.receivers.remove(id); }
pub fn did_receive_data(&self, data: Bytes) {
let data: DocumentWSData = data.try_into().unwrap();
match self.receivers.get(&data.doc_id) {
None => {
log::error!("Can't find any source handler for {:?}", data.doc_id);
},
Some(handler) => {
handler.receive_ws_data(data);
},
}
}
pub fn ws_connect_state_changed(&self, state: &WSConnectState) {
self.receivers.iter().for_each(|receiver| {
receiver.value().connect_state_changed(&state);
});
}
}

View File

@ -5,7 +5,7 @@ use flowy_collaboration::{
core::sync::{DocumentPersistence, RevisionUser, ServerDocumentManager, SyncResponse},
entities::{
doc::DocumentInfo,
revision::Revision,
revision::{RepeatedRevision, Revision},
ws::{DocumentWSData, DocumentWSDataBuilder, DocumentWSDataType, NewDocumentUser},
},
errors::CollaborateError,
@ -16,6 +16,7 @@ use lib_ws::WSModule;
use parking_lot::RwLock;
use std::{
convert::{TryFrom, TryInto},
fmt::{Debug, Formatter},
sync::Arc,
};
use tokio::sync::{broadcast, broadcast::Receiver, mpsc};
@ -114,18 +115,14 @@ impl MockDocServer {
unimplemented!()
},
DocumentWSDataType::PushRev => {
let revision = Revision::try_from(bytes).unwrap();
let handler = match self.manager.get(&revision.doc_id).await {
None => self.manager.create_doc(revision.clone()).await.unwrap(),
Some(handler) => handler,
};
let revisions = RepeatedRevision::try_from(bytes).unwrap().into_inner();
let (tx, rx) = mpsc::channel(1);
let user = MockDocUser {
user_id: revision.user_id.clone(),
tx,
};
handler.apply_revision(Arc::new(user), revision).await.unwrap();
self.manager.apply_revisions(user, revisions).await.unwrap();
rx
},
DocumentWSDataType::PullRev => {
@ -151,6 +148,10 @@ struct MockDocServerPersistence {
inner: Arc<DashMap<String, DocumentInfo>>,
}
impl Debug for MockDocServerPersistence {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("MockDocServerPersistence") }
}
impl std::default::Default for MockDocServerPersistence {
fn default() -> Self {
MockDocServerPersistence {
@ -183,6 +184,10 @@ impl DocumentPersistence for MockDocServerPersistence {
Ok(document_info)
})
}
fn get_revisions(&self, _doc_id: &str, _rev_ids: Vec<i64>) -> FutureResultSend<Vec<Revision>, CollaborateError> {
unimplemented!()
}
}
#[derive(Debug)]

View File

@ -1,4 +1,4 @@
pub mod ws;
#[cfg(feature = "flowy_unit_test")]
mod mock;
// #[cfg(feature = "flowy_unit_test")]
// mod mock;

View File

@ -6,10 +6,13 @@ mod conn;
mod manager;
mod ws_local;
#[cfg(not(feature = "flowy_unit_test"))]
pub(crate) fn local_web_socket() -> Arc<dyn FlowyWebSocket> { Arc::new(Arc::new(ws_local::LocalWebSocket::default())) }
// #[cfg(not(feature = "flowy_unit_test"))]
// pub(crate) fn local_web_socket() -> Arc<dyn FlowyWebSocket> {
// Arc::new(Arc::new(ws_local::LocalWebSocket::default())) }
//
// #[cfg(feature = "flowy_unit_test")]
// pub(crate) fn local_web_socket() -> Arc<dyn FlowyWebSocket> {
// Arc::new(Arc::new(crate::services::mock::MockWebSocket::default()))
// }
#[cfg(feature = "flowy_unit_test")]
pub(crate) fn local_web_socket() -> Arc<dyn FlowyWebSocket> {
Arc::new(Arc::new(crate::services::mock::MockWebSocket::default()))
}
pub(crate) fn local_web_socket() -> Arc<dyn FlowyWebSocket> { Arc::new(Arc::new(ws_local::LocalWebSocket::default())) }

View File

@ -2,9 +2,9 @@ use bytes::Bytes;
use flowy_collaboration::entities::ws::DocumentWSData;
use flowy_database::ConnectionPool;
use flowy_document::{
context::DocumentUser,
errors::{internal_error, FlowyError},
module::DocumentUser,
services::doc::{DocumentWebSocket, DocumentWsHandlers, WsStateReceiver},
services::doc::{DocumentWSReceivers, DocumentWebSocket, WSStateReceiver},
};
use flowy_net::services::ws::WsManager;
use flowy_user::services::user::UserSession;
@ -16,16 +16,20 @@ impl DocumentDepsResolver {
pub fn resolve(
ws_manager: Arc<WsManager>,
user_session: Arc<UserSession>,
) -> (Arc<dyn DocumentUser>, Arc<DocumentWsHandlers>) {
) -> (
Arc<dyn DocumentUser>,
Arc<DocumentWSReceivers>,
Arc<dyn DocumentWebSocket>,
) {
let user = Arc::new(DocumentUserImpl { user: user_session });
let sender = Arc::new(WsSenderImpl {
let ws_sender = Arc::new(DocumentWebSocketAdapter {
ws_manager: ws_manager.clone(),
});
let document_ws_handlers = Arc::new(DocumentWsHandlers::new(sender));
let receiver = Arc::new(WsMessageReceiverAdaptor(document_ws_handlers.clone()));
let ws_receivers = Arc::new(DocumentWSReceivers::new());
let receiver = Arc::new(WSMessageReceiverAdaptor(ws_receivers.clone()));
ws_manager.add_receiver(receiver).unwrap();
(user, document_ws_handlers)
(user, ws_receivers, ws_sender)
}
}
@ -56,11 +60,11 @@ impl DocumentUser for DocumentUserImpl {
fn db_pool(&self) -> Result<Arc<ConnectionPool>, FlowyError> { self.user.db_pool() }
}
struct WsSenderImpl {
struct DocumentWebSocketAdapter {
ws_manager: Arc<WsManager>,
}
impl DocumentWebSocket for WsSenderImpl {
impl DocumentWebSocket for DocumentWebSocketAdapter {
fn send(&self, data: DocumentWSData) -> Result<(), FlowyError> {
let bytes: Bytes = data.try_into().unwrap();
let msg = WSMessage {
@ -73,12 +77,12 @@ impl DocumentWebSocket for WsSenderImpl {
Ok(())
}
fn subscribe_state_changed(&self) -> WsStateReceiver { self.ws_manager.subscribe_websocket_state() }
fn subscribe_state_changed(&self) -> WSStateReceiver { self.ws_manager.subscribe_websocket_state() }
}
struct WsMessageReceiverAdaptor(Arc<DocumentWsHandlers>);
struct WSMessageReceiverAdaptor(Arc<DocumentWSReceivers>);
impl WSMessageReceiver for WsMessageReceiverAdaptor {
impl WSMessageReceiver for WSMessageReceiverAdaptor {
fn source(&self) -> WSModule { WSModule::Doc }
fn receive_message(&self, msg: WSMessage) { self.0.did_receive_data(Bytes::from(msg.data)); }
}

View File

@ -4,7 +4,7 @@ pub mod module;
use crate::deps_resolve::{DocumentDepsResolver, WorkspaceDepsResolver};
use backend_service::configuration::ClientServerConfiguration;
use flowy_core::{errors::FlowyError, module::init_core, prelude::CoreContext};
use flowy_document::module::FlowyDocument;
use flowy_document::context::DocumentContext;
use flowy_net::{
entities::NetworkType,
services::ws::{listen_on_websocket, WsManager},
@ -69,7 +69,7 @@ pub struct FlowySDK {
#[allow(dead_code)]
config: FlowySDKConfig,
pub user_session: Arc<UserSession>,
pub flowy_document: Arc<FlowyDocument>,
pub flowy_document: Arc<DocumentContext>,
pub core: Arc<CoreContext>,
pub dispatcher: Arc<EventDispatcher>,
pub ws_manager: Arc<WsManager>,
@ -192,7 +192,7 @@ fn mk_user_session(config: &FlowySDKConfig) -> Arc<UserSession> {
fn mk_core_context(
user_session: Arc<UserSession>,
flowy_document: Arc<FlowyDocument>,
flowy_document: Arc<DocumentContext>,
server_config: &ClientServerConfiguration,
) -> Arc<CoreContext> {
let workspace_deps = WorkspaceDepsResolver::new(user_session);
@ -204,7 +204,7 @@ pub fn mk_document(
ws_manager: Arc<WsManager>,
user_session: Arc<UserSession>,
server_config: &ClientServerConfiguration,
) -> Arc<FlowyDocument> {
let (user, ws_doc) = DocumentDepsResolver::resolve(ws_manager, user_session);
Arc::new(FlowyDocument::new(user, ws_doc, server_config))
) -> Arc<DocumentContext> {
let (user, ws_receivers, ws_sender) = DocumentDepsResolver::resolve(ws_manager, user_session);
Arc::new(DocumentContext::new(user, ws_receivers, ws_sender, server_config))
}

View File

@ -46,7 +46,6 @@ impl EditorTest {
async fn run_script(&mut self, script: EditorScript) {
let rev_manager = self.editor.rev_manager();
let cache = rev_manager.revision_cache();
let doc_id = self.editor.doc_id.clone();
let _user_id = self.sdk.user_session.user_id().unwrap();
let ws_manager = self.sdk.ws_manager.clone();
let token = self.sdk.user_session.token().unwrap();
@ -69,7 +68,7 @@ impl EditorTest {
self.editor.replace(interval, s).await.unwrap();
},
EditorScript::AssertRevisionState(rev_id, state) => {
let record = cache.get_revision(&doc_id, rev_id).await.unwrap();
let record = cache.get_revision(rev_id).await.unwrap();
assert_eq!(record.state, state);
},
EditorScript::AssertCurrentRevId(rev_id) => {

View File

@ -10,18 +10,17 @@ use async_stream::stream;
use dashmap::DashMap;
use futures::stream::StreamExt;
use lib_infra::future::FutureResultSend;
use lib_ot::{errors::OTError, rich_text::RichTextDelta};
use std::sync::{atomic::Ordering::SeqCst, Arc};
use lib_ot::rich_text::RichTextDelta;
use std::{fmt::Debug, sync::Arc};
use tokio::{
sync::{mpsc, oneshot},
task::spawn_blocking,
};
pub trait DocumentPersistence: Send + Sync {
// fn update_doc(&self, doc_id: &str, rev_id: i64, delta: RichTextDelta) ->
// FutureResultSend<(), CollaborateError>;
pub trait DocumentPersistence: Send + Sync + Debug {
fn read_doc(&self, doc_id: &str) -> FutureResultSend<DocumentInfo, CollaborateError>;
fn create_doc(&self, revision: Revision) -> FutureResultSend<DocumentInfo, CollaborateError>;
fn get_revisions(&self, doc_id: &str, rev_ids: Vec<i64>) -> FutureResultSend<Vec<Revision>, CollaborateError>;
}
pub struct ServerDocumentManager {
@ -37,13 +36,34 @@ impl ServerDocumentManager {
}
}
pub async fn get(&self, doc_id: &str) -> Option<Arc<OpenDocHandle>> {
pub async fn apply_revisions(
&self,
user: Arc<dyn RevisionUser>,
revisions: Vec<Revision>,
) -> Result<(), CollaborateError> {
if revisions.is_empty() {
return Ok(());
}
let revision = revisions.first().unwrap();
let handler = match self.get_document_handler(&revision.doc_id).await {
None => {
// Create the document if it doesn't exist
self.create_document(revision.clone()).await.map_err(internal_error)?
},
Some(handler) => handler,
};
handler.apply_revisions(user, revisions).await.map_err(internal_error)?;
Ok(())
}
async fn get_document_handler(&self, doc_id: &str) -> Option<Arc<OpenDocHandle>> {
match self.open_doc_map.get(doc_id).map(|ctx| ctx.clone()) {
Some(edit_doc) => Some(edit_doc),
None => {
let f = || async {
let doc = self.persistence.read_doc(doc_id).await?;
let handler = self.cache(doc).await.map_err(internal_error)?;
let handler = self.cache_document(doc).await.map_err(internal_error)?;
Ok::<Arc<OpenDocHandle>, CollaborateError>(handler)
};
match f().await {
@ -57,15 +77,16 @@ impl ServerDocumentManager {
}
}
pub async fn create_doc(&self, revision: Revision) -> Result<Arc<OpenDocHandle>, CollaborateError> {
async fn create_document(&self, revision: Revision) -> Result<Arc<OpenDocHandle>, CollaborateError> {
let doc = self.persistence.create_doc(revision).await?;
let handler = self.cache(doc).await?;
let handler = self.cache_document(doc).await?;
Ok(handler)
}
async fn cache(&self, doc: DocumentInfo) -> Result<Arc<OpenDocHandle>, CollaborateError> {
async fn cache_document(&self, doc: DocumentInfo) -> Result<Arc<OpenDocHandle>, CollaborateError> {
let doc_id = doc.id.clone();
let handle = spawn_blocking(|| OpenDocHandle::new(doc))
let persistence = self.persistence.clone();
let handle = spawn_blocking(|| OpenDocHandle::new(doc, persistence))
.await
.map_err(internal_error)?;
let handle = Arc::new(handle?);
@ -74,42 +95,43 @@ impl ServerDocumentManager {
}
}
pub struct OpenDocHandle {
sender: mpsc::Sender<DocCommand>,
struct OpenDocHandle {
sender: mpsc::Sender<EditCommand>,
persistence: Arc<dyn DocumentPersistence>,
}
impl OpenDocHandle {
pub fn new(doc: DocumentInfo) -> Result<Self, CollaborateError> {
fn new(doc: DocumentInfo, persistence: Arc<dyn DocumentPersistence>) -> Result<Self, CollaborateError> {
let (sender, receiver) = mpsc::channel(100);
let queue = DocCommandQueue::new(receiver, doc)?;
let queue = EditCommandQueue::new(receiver, doc)?;
tokio::task::spawn(queue.run());
Ok(Self { sender })
Ok(Self { sender, persistence })
}
pub async fn apply_revision(
async fn apply_revisions(
&self,
user: Arc<dyn RevisionUser>,
revision: Revision,
revisions: Vec<Revision>,
) -> Result<(), CollaborateError> {
let (ret, rx) = oneshot::channel();
let msg = DocCommand::ReceiveRevision { user, revision, ret };
let persistence = self.persistence.clone();
let msg = EditCommand::ApplyRevisions {
user,
revisions,
persistence,
ret,
};
let _ = self.send(msg, rx).await?;
Ok(())
}
pub async fn document_json(&self) -> CollaborateResult<String> {
let (ret, rx) = oneshot::channel();
let msg = DocCommand::GetDocJson { ret };
let msg = EditCommand::GetDocumentJson { ret };
self.send(msg, rx).await?
}
pub async fn rev_id(&self) -> CollaborateResult<i64> {
let (ret, rx) = oneshot::channel();
let msg = DocCommand::GetDocRevId { ret };
self.send(msg, rx).await?
}
async fn send<T>(&self, msg: DocCommand, rx: oneshot::Receiver<T>) -> CollaborateResult<T> {
async fn send<T>(&self, msg: EditCommand, rx: oneshot::Receiver<T>) -> CollaborateResult<T> {
let _ = self.sender.send(msg).await.map_err(internal_error)?;
let result = rx.await.map_err(internal_error)?;
Ok(result)
@ -117,31 +139,40 @@ impl OpenDocHandle {
}
#[derive(Debug)]
enum DocCommand {
ReceiveRevision {
enum EditCommand {
ApplyRevisions {
user: Arc<dyn RevisionUser>,
revision: Revision,
revisions: Vec<Revision>,
persistence: Arc<dyn DocumentPersistence>,
ret: oneshot::Sender<CollaborateResult<()>>,
},
GetDocJson {
GetDocumentJson {
ret: oneshot::Sender<CollaborateResult<String>>,
},
GetDocRevId {
ret: oneshot::Sender<CollaborateResult<i64>>,
},
}
struct DocCommandQueue {
receiver: Option<mpsc::Receiver<DocCommand>>,
edit_doc: Arc<ServerDocEditor>,
struct EditCommandQueue {
pub doc_id: String,
receiver: Option<mpsc::Receiver<EditCommand>>,
synchronizer: Arc<RevisionSynchronizer>,
users: DashMap<String, Arc<dyn RevisionUser>>,
}
impl DocCommandQueue {
fn new(receiver: mpsc::Receiver<DocCommand>, doc: DocumentInfo) -> Result<Self, CollaborateError> {
let edit_doc = Arc::new(ServerDocEditor::new(doc).map_err(internal_error)?);
impl EditCommandQueue {
fn new(receiver: mpsc::Receiver<EditCommand>, doc: DocumentInfo) -> Result<Self, CollaborateError> {
let delta = RichTextDelta::from_bytes(&doc.text)?;
let users = DashMap::new();
let synchronizer = Arc::new(RevisionSynchronizer::new(
&doc.id,
doc.rev_id,
Document::from_delta(delta),
));
Ok(Self {
doc_id: doc.id,
receiver: Some(receiver),
edit_doc,
synchronizer,
users,
})
}
@ -162,72 +193,28 @@ impl DocCommandQueue {
stream.for_each(|msg| self.handle_message(msg)).await;
}
async fn handle_message(&self, msg: DocCommand) {
async fn handle_message(&self, msg: EditCommand) {
match msg {
DocCommand::ReceiveRevision { user, revision, ret } => {
// let revision = (&mut revision).try_into().map_err(internal_error).unwrap();
let _ = ret.send(
self.edit_doc
.apply_revision(user, revision)
.await
.map_err(internal_error),
);
EditCommand::ApplyRevisions {
user,
revisions,
persistence,
ret,
} => {
self.users.insert(user.user_id(), user.clone());
self.synchronizer
.apply_revisions(user, revisions, persistence)
.await
.unwrap();
let _ = ret.send(Ok(()));
},
DocCommand::GetDocJson { ret } => {
let edit_context = self.edit_doc.clone();
let json = spawn_blocking(move || edit_context.document_json())
EditCommand::GetDocumentJson { ret } => {
let synchronizer = self.synchronizer.clone();
let json = spawn_blocking(move || synchronizer.doc_json())
.await
.map_err(internal_error);
let _ = ret.send(json);
},
DocCommand::GetDocRevId { ret } => {
let _ = ret.send(Ok(self.edit_doc.rev_id()));
},
}
}
}
#[rustfmt::skip]
// ┌──────────────────────┐ ┌────────────┐
// ┌───▶│ RevisionSynchronizer │────▶│ Document │
// │ └──────────────────────┘ └────────────┘
// ┌────────────────┐ │
// ───▶│ServerDocEditor │────┤
// └────────────────┘ │
// │
// │ ┌────────┐ ┌────────────┐
// └───▶│ Users │◆──────│RevisionUser│
// └────────┘ └────────────┘
pub struct ServerDocEditor {
pub doc_id: String,
synchronizer: Arc<RevisionSynchronizer>,
users: DashMap<String, Arc<dyn RevisionUser>>,
}
impl ServerDocEditor {
pub fn new(doc: DocumentInfo) -> Result<Self, OTError> {
let delta = RichTextDelta::from_bytes(&doc.text)?;
let users = DashMap::new();
let synchronizer = Arc::new(RevisionSynchronizer::new(
&doc.id,
doc.rev_id,
Document::from_delta(delta),
));
Ok(Self {
doc_id: doc.id,
synchronizer,
users,
})
}
pub async fn apply_revision(&self, user: Arc<dyn RevisionUser>, revision: Revision) -> Result<(), OTError> {
self.users.insert(user.user_id(), user.clone());
self.synchronizer.apply_revision(user, revision).unwrap();
Ok(())
}
pub fn document_json(&self) -> String { self.synchronizer.doc_json() }
pub fn rev_id(&self) -> i64 { self.synchronizer.rev_id.load(SeqCst) }
}

View File

@ -1,7 +1,7 @@
use crate::{
core::document::Document,
core::{document::Document, sync::DocumentPersistence},
entities::{
revision::{RevType, Revision, RevisionRange},
revision::{Revision, RevisionRange},
ws::{DocumentWSData, DocumentWSDataBuilder},
},
};
@ -26,7 +26,7 @@ pub enum SyncResponse {
Pull(DocumentWSData),
Push(DocumentWSData),
Ack(DocumentWSData),
NewRevision(Revision),
NewRevision(Vec<Revision>),
}
pub struct RevisionSynchronizer {
@ -45,68 +45,79 @@ impl RevisionSynchronizer {
}
}
#[tracing::instrument(
level = "debug",
skip(self, user, revision),
fields(
cur_rev_id = %self.rev_id.load(SeqCst),
base_rev_id = %revision.base_rev_id,
rev_id = %revision.rev_id,
),
err
)]
pub fn apply_revision(&self, user: Arc<dyn RevisionUser>, revision: Revision) -> Result<(), OTError> {
#[tracing::instrument(level = "debug", skip(self, user, revisions, persistence), err)]
pub async fn apply_revisions(
&self,
user: Arc<dyn RevisionUser>,
revisions: Vec<Revision>,
persistence: Arc<dyn DocumentPersistence>,
) -> Result<(), OTError> {
if revisions.is_empty() {
tracing::warn!("Receive empty revisions");
return Ok(());
}
let server_base_rev_id = self.rev_id.load(SeqCst);
match server_base_rev_id.cmp(&revision.rev_id) {
let first_revision = revisions.first().unwrap().clone();
if self.is_applied_before(&first_revision, &persistence).await {
// Server has received this revision before, so ignore the following revisions
return Ok(());
}
match server_base_rev_id.cmp(&first_revision.rev_id) {
Ordering::Less => {
let server_rev_id = next(server_base_rev_id);
if server_base_rev_id == revision.base_rev_id || server_rev_id == revision.rev_id {
if server_base_rev_id == first_revision.base_rev_id || server_rev_id == first_revision.rev_id {
// The rev is in the right order, just compose it.
let _ = self.compose_revision(&revision)?;
user.receive(SyncResponse::Ack(DocumentWSDataBuilder::build_ack_message(
&revision.doc_id,
&revision.rev_id.to_string(),
)));
user.receive(SyncResponse::NewRevision(revision));
{
for revision in &revisions {
let _ = self.compose_revision(revision)?;
}
}
user.receive(SyncResponse::NewRevision(revisions));
} else {
// The server document is outdated, pull the missing revision from the client.
let range = RevisionRange {
doc_id: self.doc_id.clone(),
start: server_rev_id,
end: revision.rev_id,
end: first_revision.rev_id,
};
let msg = DocumentWSDataBuilder::build_pull_message(&self.doc_id, range, revision.rev_id);
let msg = DocumentWSDataBuilder::build_pull_message(&self.doc_id, range, first_revision.rev_id);
user.receive(SyncResponse::Pull(msg));
}
},
Ordering::Equal => {
// Do nothing
log::warn!("Applied revision rev_id is the same as cur_rev_id");
let data = DocumentWSDataBuilder::build_ack_message(&revision.doc_id, &revision.rev_id.to_string());
user.receive(SyncResponse::Ack(data));
},
Ordering::Greater => {
// The client document is outdated. Transform the client revision delta and then
// send the prime delta to the client. Client should compose the this prime
// delta.
let id = revision.rev_id.to_string();
let (cli_delta, server_delta) = self.transform_revision(&revision)?;
let _ = self.compose_delta(server_delta)?;
let id = first_revision.rev_id.to_string();
let from_rev_id = first_revision.rev_id;
let to_rev_id = server_base_rev_id;
let rev_ids: Vec<i64> = (from_rev_id..=to_rev_id).collect();
let revisions = match persistence.get_revisions(&self.doc_id, rev_ids).await {
Ok(revisions) => {
assert_eq!(revisions.is_empty(), false);
revisions
},
Err(e) => {
tracing::error!("{}", e);
vec![]
},
};
//
let _doc_id = self.doc_id.clone();
let _doc_json = self.doc_json();
// user.receive(SyncResponse::NewRevision {
// rev_id: self.rev_id(),
// doc_json,
// doc_id,
// });
let cli_revision = self.mk_revision(revision.rev_id, cli_delta);
let data = DocumentWSDataBuilder::build_push_message(&self.doc_id, cli_revision, &id);
let data = DocumentWSDataBuilder::build_push_message(&self.doc_id, revisions, &id);
user.receive(SyncResponse::Push(data));
},
}
user.receive(SyncResponse::Ack(DocumentWSDataBuilder::build_ack_message(
&first_revision.doc_id,
&first_revision.rev_id.to_string(),
)));
Ok(())
}
@ -140,29 +151,40 @@ impl RevisionSynchronizer {
Ok(())
}
fn mk_revision(&self, base_rev_id: i64, delta: RichTextDelta) -> Revision {
let delta_data = delta.to_bytes().to_vec();
let md5 = md5(&delta_data);
Revision {
base_rev_id,
rev_id: self.rev_id.load(SeqCst),
delta_data,
md5,
doc_id: self.doc_id.to_string(),
ty: RevType::Remote,
user_id: "".to_string(),
}
}
// fn mk_revision(&self, base_rev_id: i64, delta: RichTextDelta) -> Revision {
// let delta_data = delta.to_bytes().to_vec();
// let md5 = md5(&delta_data);
// Revision {
// base_rev_id,
// rev_id: self.rev_id.load(SeqCst),
// delta_data,
// md5,
// doc_id: self.doc_id.to_string(),
// ty: RevType::Remote,
// user_id: "".to_string(),
// }
// }
#[allow(dead_code)]
fn rev_id(&self) -> i64 { self.rev_id.load(SeqCst) }
pub(crate) fn rev_id(&self) -> i64 { self.rev_id.load(SeqCst) }
async fn is_applied_before(&self, new_revision: &Revision, persistence: &Arc<dyn DocumentPersistence>) -> bool {
if let Ok(revisions) = persistence.get_revisions(&self.doc_id, vec![new_revision.rev_id]).await {
if let Some(revision) = revisions.first() {
if revision.md5 == new_revision.md5 {
return true;
}
}
};
false
}
}
#[inline]
fn next(rev_id: i64) -> i64 { rev_id + 1 }
#[inline]
fn md5<T: AsRef<[u8]>>(data: T) -> String {
let md5 = format!("{:x}", md5::compute(data));
md5
}
// #[inline]
// fn md5<T: AsRef<[u8]>>(data: T) -> String {
// let md5 = format!("{:x}", md5::compute(data));
// md5
// }

View File

@ -1,5 +1,5 @@
use crate::{
entities::revision::{Revision, RevisionRange},
entities::revision::{RepeatedRevision, Revision, RevisionRange},
errors::CollaborateError,
};
use bytes::Bytes;
@ -62,9 +62,9 @@ impl std::convert::From<Revision> for DocumentWSData {
pub struct DocumentWSDataBuilder();
impl DocumentWSDataBuilder {
// DocumentWSDataType::PushRev -> Revision
pub fn build_push_message(doc_id: &str, revision: Revision, id: &str) -> DocumentWSData {
let _rev_id = revision.rev_id;
let bytes: Bytes = revision.try_into().unwrap();
pub fn build_push_message(doc_id: &str, revisions: Vec<Revision>, id: &str) -> DocumentWSData {
let repeated_revision = RepeatedRevision { items: revisions };
let bytes: Bytes = repeated_revision.try_into().unwrap();
DocumentWSData {
doc_id: doc_id.to_string(),
ty: DocumentWSDataType::PushRev,