add flowy-sync crate
@@ -13,6 +1,7 @@ members = [
"dart-notify",
"flowy-document",
"flowy-error",
"flowy-sync",
]

[profile.dev]
@@ -1,4 +1,5 @@
mod version_1;
mod version_2;

use std::sync::Arc;
pub use version_1::{app_sql::*, trash_sql::*, v1_impl::V1Transaction, view_sql::*, workspace_sql::*};
@@ -63,14 +64,4 @@ impl FlowyCorePersistence {
let conn = self.database.db_connection()?;
conn.immediate_transaction::<_, FlowyError, _>(|| f(Box::new(V1Transaction(&conn))))
}

// pub fn scope_transaction<F, O>(&self, f: F) -> FlowyResult<O>
// where
// F: for<'a> FnOnce(Box<dyn FlowyCorePersistenceTransaction + 'a>) ->
// FlowyResult<O>, {
// match thread::scope(|_s| self.begin_transaction(f)) {
// Ok(result) => result,
// Err(e) => Err(FlowyError::internal().context(e)),
// }
// }
}
@@ -9,7 +9,7 @@ use flowy_core_data_model::entities::{
view::View,
workspace::{CurrentWorkspaceSetting, QueryWorkspaceRequest, RepeatedWorkspace, WorkspaceId, *},
};
use flowy_error::FlowyResult;

use lib_dispatch::prelude::{data_result, Data, DataResult, Unit};
use std::{convert::TryInto, sync::Arc};
@@ -16,6 +16,7 @@ lib-infra = { path = "../../../shared-lib/lib-infra" }
derive_more = {version = "0.99", features = ["display"]}
lib-dispatch = { path = "../lib-dispatch" }
flowy-database = { path = "../flowy-database" }
flowy-sync = { path = "../flowy-sync" }
flowy-error = { path = "../flowy-error", features = ["collaboration", "ot", "backend", "serde", "db"] }
dart-notify = { path = "../dart-notify" }
@@ -53,4 +54,4 @@ rand = "0.7.3"

[features]
http_server = []
flowy_unit_test = ["lib-ot/flowy_unit_test"]
flowy_unit_test = ["lib-ot/flowy_unit_test", "flowy-sync/flowy_unit_test"]
@@ -1,10 +1,11 @@
use crate::{
controller::DocumentController,
core::{DocumentWSReceivers, DocumentWebSocket},
errors::FlowyError,
ws_receivers::DocumentWSReceivers,
DocumentCloudService,
};
use flowy_database::ConnectionPool;
use flowy_sync::RevisionWebSocket;
use std::sync::Arc;

pub trait DocumentUser: Send + Sync {
@@ -23,7 +24,7 @@ impl DocumentContext {
pub fn new(
user: Arc<dyn DocumentUser>,
ws_receivers: Arc<DocumentWSReceivers>,
ws_sender: Arc<dyn DocumentWebSocket>,
ws_sender: Arc<dyn RevisionWebSocket>,
cloud_service: Arc<dyn DocumentCloudService>,
) -> DocumentContext {
let doc_ctrl = Arc::new(DocumentController::new(
@@ -1,30 +1,26 @@
use crate::{
context::DocumentUser,
core::{
edit::ClientDocumentEditor,
revision::{DocumentRevisionCache, DocumentRevisionManager, RevisionServer},
DocumentWSReceivers,
DocumentWebSocket,
WSStateReceiver,
},
core::ClientDocumentEditor,
errors::FlowyError,
ws_receivers::DocumentWSReceivers,
DocumentCloudService,
};
use bytes::Bytes;
use dashmap::DashMap;
use flowy_collaboration::entities::{
doc::{DocumentDelta, DocumentId, DocumentInfo},
revision::RepeatedRevision,
doc::{DocumentDelta, DocumentId},
revision::{md5, RepeatedRevision, Revision},
};
use flowy_database::ConnectionPool;
use flowy_error::FlowyResult;
use flowy_sync::{RevisionCache, RevisionCloudService, RevisionManager, RevisionWebSocket, WSStateReceiver};
use lib_infra::future::FutureResult;
use std::sync::Arc;

pub struct DocumentController {
cloud_service: Arc<dyn DocumentCloudService>,
ws_receivers: Arc<DocumentWSReceivers>,
ws_sender: Arc<dyn DocumentWebSocket>,
ws_sender: Arc<dyn RevisionWebSocket>,
open_cache: Arc<OpenDocCache>,
user: Arc<dyn DocumentUser>,
}
@@ -34,7 +30,7 @@ impl DocumentController {
cloud_service: Arc<dyn DocumentCloudService>,
user: Arc<dyn DocumentUser>,
ws_receivers: Arc<DocumentWSReceivers>,
ws_sender: Arc<dyn DocumentWebSocket>,
ws_sender: Arc<dyn RevisionWebSocket>,
) -> Self {
let open_cache = Arc::new(OpenDocCache::new());
Self {
@@ -93,7 +89,7 @@ impl DocumentController {
let doc_id = doc_id.as_ref().to_owned();
let db_pool = self.user.db_pool()?;
let rev_manager = self.make_rev_manager(&doc_id, db_pool)?;
let _ = rev_manager.reset_document(revisions).await?;
let _ = rev_manager.reset_object(revisions).await?;
Ok(())
}

@@ -127,10 +123,10 @@ impl DocumentController {
Ok(doc_editor)
}

fn make_rev_manager(&self, doc_id: &str, pool: Arc<ConnectionPool>) -> Result<DocumentRevisionManager, FlowyError> {
fn make_rev_manager(&self, doc_id: &str, pool: Arc<ConnectionPool>) -> Result<RevisionManager, FlowyError> {
let user_id = self.user.user_id()?;
let cache = Arc::new(DocumentRevisionCache::new(&user_id, doc_id, pool));
Ok(DocumentRevisionManager::new(&user_id, doc_id, cache))
let cache = Arc::new(RevisionCache::new(&user_id, doc_id, pool));
Ok(RevisionManager::new(&user_id, doc_id, cache))
}
}

@@ -139,19 +135,26 @@ struct RevisionServerImpl {
server: Arc<dyn DocumentCloudService>,
}

impl RevisionServer for RevisionServerImpl {
impl RevisionCloudService for RevisionServerImpl {
#[tracing::instrument(level = "debug", skip(self))]
fn fetch_document(&self, doc_id: &str) -> FutureResult<DocumentInfo, FlowyError> {
fn fetch_object(&self, user_id: &str, doc_id: &str) -> FutureResult<Vec<Revision>, FlowyError> {
let params = DocumentId {
doc_id: doc_id.to_string(),
};
let server = self.server.clone();
let token = self.token.clone();
let user_id = user_id.to_string();

FutureResult::new(async move {
match server.read_document(&token, params).await? {
None => Err(FlowyError::record_not_found().context("Remote doesn't have this document")),
Some(doc) => Ok(doc),
Some(doc) => {
let delta_data = Bytes::from(doc.text.clone());
let doc_md5 = md5(&delta_data);
let revision =
Revision::new(&doc.doc_id, doc.base_rev_id, doc.rev_id, delta_data, &user_id, doc_md5);
Ok(vec![revision])
},
}
})
}
@@ -1,5 +0,0 @@
mod editor;
mod queue;

pub use editor::*;
pub(crate) use queue::*;
@@ -1,21 +1,25 @@
use crate::{
context::DocumentUser,
core::{
web_socket::{make_document_ws_manager, DocumentWebSocketManager, EditorCommandSender},
DocumentRevisionManager,
DocumentWSReceiver,
DocumentWebSocket,
EditorCommand,
EditorCommandQueue,
RevisionServer,
},
core::{make_document_ws_manager, EditorCommand, EditorCommandQueue, EditorCommandSender},
errors::FlowyError,
ws_receivers::DocumentWSReceiver,
};
use bytes::Bytes;
use flowy_collaboration::errors::CollaborateResult;
use flowy_collaboration::{
entities::{doc::DocumentInfo, revision::Revision},
errors::CollaborateResult,
util::make_delta_from_revisions,
};
use flowy_error::{internal_error, FlowyResult};
use flowy_sync::{
RevisionCloudService,
RevisionManager,
RevisionObjectBuilder,
RevisionWebSocket,
RevisionWebSocketManager,
};
use lib_ot::{
core::Interval,
core::{Interval, Operation},
rich_text::{RichTextAttribute, RichTextDelta},
};
use std::sync::Arc;
@@ -24,8 +28,8 @@ use tokio::sync::{mpsc, oneshot};
pub struct ClientDocumentEditor {
pub doc_id: String,
#[allow(dead_code)]
rev_manager: Arc<DocumentRevisionManager>,
ws_manager: Arc<DocumentWebSocketManager>,
rev_manager: Arc<RevisionManager>,
ws_manager: Arc<RevisionWebSocketManager>,
edit_cmd_tx: EditorCommandSender,
}

@@ -33,11 +37,12 @@ impl ClientDocumentEditor {
pub(crate) async fn new(
doc_id: &str,
user: Arc<dyn DocumentUser>,
mut rev_manager: DocumentRevisionManager,
ws: Arc<dyn DocumentWebSocket>,
server: Arc<dyn RevisionServer>,
mut rev_manager: RevisionManager,
ws: Arc<dyn RevisionWebSocket>,
server: Arc<dyn RevisionCloudService>,
) -> FlowyResult<Arc<Self>> {
let delta = rev_manager.load_document(server).await?;
let document_info = rev_manager.load::<DocumentInfoBuilder>(server).await?;
let delta = document_info.delta()?;
let rev_manager = Arc::new(rev_manager);
let doc_id = doc_id.to_string();
let user_id = user.user_id()?;
@@ -167,7 +172,7 @@ impl std::ops::Drop for ClientDocumentEditor {
// The edit queue will exit after the EditorCommandSender was dropped.
fn spawn_edit_queue(
user: Arc<dyn DocumentUser>,
rev_manager: Arc<DocumentRevisionManager>,
rev_manager: Arc<RevisionManager>,
delta: RichTextDelta,
) -> EditorCommandSender {
let (sender, receiver) = mpsc::channel(1000);
@@ -194,5 +199,40 @@ impl ClientDocumentEditor {
Ok(delta)
}

pub fn rev_manager(&self) -> Arc<DocumentRevisionManager> { self.rev_manager.clone() }
pub fn rev_manager(&self) -> Arc<RevisionManager> { self.rev_manager.clone() }
}

struct DocumentInfoBuilder();
impl RevisionObjectBuilder for DocumentInfoBuilder {
type Output = DocumentInfo;

fn build_with_revisions(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
let (base_rev_id, rev_id) = revisions.last().unwrap().pair_rev_id();
let mut delta = make_delta_from_revisions(revisions)?;
correct_delta(&mut delta);

Result::<DocumentInfo, FlowyError>::Ok(DocumentInfo {
doc_id: object_id.to_owned(),
text: delta.to_json(),
rev_id,
base_rev_id,
})
}
}

// quill-editor requires the delta should end with '\n' and only contains the
// insert operation. The function, correct_delta maybe be removed in the future.
fn correct_delta(delta: &mut RichTextDelta) {
if let Some(op) = delta.ops.last() {
let op_data = op.get_data();
if !op_data.ends_with('\n') {
tracing::warn!("The document must end with newline. Correcting it by inserting newline op");
delta.ops.push(Operation::Insert("\n".into()));
}
}

if let Some(op) = delta.ops.iter().find(|op| !op.is_insert()) {
tracing::warn!("The document can only contains insert operations, but found {:?}", op);
delta.ops.retain(|op| op.is_insert());
}
}
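Note: the generic `RevisionObjectBuilder` trait introduced by this commit is what lets `RevisionManager::load` rebuild any synced object from its revisions; `DocumentInfoBuilder` above is the document-specific case. As a rough sketch of how another object type could plug in (the `GridInfo` type and its fields are hypothetical and not part of this commit; only the trait shape and the `Revision` fields used here appear in the diff):

```rust
// Hypothetical sketch: a non-document object reusing RevisionObjectBuilder.
use flowy_collaboration::entities::revision::Revision;
use flowy_error::FlowyResult;
use flowy_sync::RevisionObjectBuilder;

// Invented example type; a real builder would also compose the revision payloads.
struct GridInfo {
    object_id: String,
    rev_id: i64,
}

struct GridInfoBuilder();
impl RevisionObjectBuilder for GridInfoBuilder {
    type Output = GridInfo;

    fn build_with_revisions(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
        // Take the id of the latest revision as the object's current revision.
        let rev_id = revisions.last().map(|rev| rev.rev_id).unwrap_or(0);
        Ok(GridInfo {
            object_id: object_id.to_owned(),
            rev_id,
        })
    }
}
```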
@@ -1,9 +1,9 @@
pub mod edit;
pub mod revision;
mod editor;
mod queue;
mod web_socket;

pub use crate::ws_receivers::*;
pub use edit::*;
pub use revision::*;
pub use editor::*;
pub(crate) use queue::*;
pub(crate) use web_socket::*;

pub const SYNC_INTERVAL_IN_MILLIS: u64 = 1000;
@@ -1,7 +1,4 @@
use crate::{
context::DocumentUser,
core::{web_socket::EditorCommandReceiver, DocumentRevisionManager},
};
use crate::{context::DocumentUser, core::web_socket::EditorCommandReceiver};
use async_stream::stream;
use flowy_collaboration::{
client_document::{history::UndoResult, ClientDocument, NewlineDoc},
@@ -10,6 +7,7 @@ use flowy_collaboration::{
util::make_delta_from_revisions,
};
use flowy_error::FlowyError;
use flowy_sync::RevisionManager;
use futures::stream::StreamExt;
use lib_ot::{
core::{Interval, OperationTransformable},
@@ -23,14 +21,14 @@ use tokio::sync::{oneshot, RwLock};
pub(crate) struct EditorCommandQueue {
document: Arc<RwLock<ClientDocument>>,
user: Arc<dyn DocumentUser>,
rev_manager: Arc<DocumentRevisionManager>,
rev_manager: Arc<RevisionManager>,
receiver: Option<EditorCommandReceiver>,
}

impl EditorCommandQueue {
pub(crate) fn new(
user: Arc<dyn DocumentUser>,
rev_manager: Arc<DocumentRevisionManager>,
rev_manager: Arc<RevisionManager>,
delta: RichTextDelta,
receiver: EditorCommandReceiver,
) -> Self {
@@ -88,7 +86,7 @@ impl EditorCommandQueue {
}

let (base_rev_id, rev_id) = self.rev_manager.next_rev_id_pair();
let doc_id = self.rev_manager.doc_id.clone();
let doc_id = self.rev_manager.object_id.clone();
let user_id = self.user.user_id()?;
let (client_revision, server_revision) = make_client_and_server_revision(
&doc_id,
@@ -110,7 +108,7 @@ impl EditorCommandQueue {

let repeated_revision = RepeatedRevision::new(revisions);
assert_eq!(repeated_revision.last().unwrap().md5, md5);
let _ = self.rev_manager.reset_document(repeated_revision).await?;
let _ = self.rev_manager.reset_object(repeated_revision).await?;
let _ = ret.send(Ok(()));
},
EditorCommand::TransformRevision { revisions, ret } => {
@@ -204,7 +202,14 @@ impl EditorCommandQueue {
let delta_data = delta.to_bytes();
let (base_rev_id, rev_id) = self.rev_manager.next_rev_id_pair();
let user_id = self.user.user_id()?;
let revision = Revision::new(&self.rev_manager.doc_id, base_rev_id, rev_id, delta_data, &user_id, md5);
let revision = Revision::new(
&self.rev_manager.object_id,
base_rev_id,
rev_id,
delta_data,
&user_id,
md5,
);
let _ = self.rev_manager.add_local_revision(&revision).await?;
Ok(rev_id.into())
}
@@ -1,8 +0,0 @@
mod cache;
mod disk;
mod manager;
mod memory;
mod snapshot;

pub use cache::*;
pub use manager::*;
@@ -1,25 +1,27 @@
mod ws_manager;
pub use ws_manager::*;

use crate::core::{
web_socket::{DocumentWSSinkDataProvider, DocumentWSSteamConsumer},
DocumentRevisionManager,
DocumentWebSocket,
EditorCommand,
TransformDeltas,
use crate::{
core::{EditorCommand, TransformDeltas, SYNC_INTERVAL_IN_MILLIS},
ws_receivers::DocumentWSReceiver,
};
use async_trait::async_trait;
use bytes::Bytes;
use flowy_collaboration::{
entities::{
revision::{RepeatedRevision, Revision, RevisionRange},
ws::{DocumentClientWSData, DocumentServerWSDataType, NewDocumentUser},
ws::{ClientRevisionWSData, NewDocumentUser, ServerRevisionWSData, ServerRevisionWSDataType},
},
errors::CollaborateResult,
};
use flowy_error::{internal_error, FlowyError, FlowyResult};
use flowy_sync::{
RevisionManager,
RevisionWSSinkDataProvider,
RevisionWSSteamConsumer,
RevisionWebSocket,
RevisionWebSocketManager,
};
use lib_infra::future::FutureResult;
use lib_ws::WSConnectState;
use std::{collections::VecDeque, convert::TryFrom, sync::Arc};
use std::{collections::VecDeque, convert::TryFrom, sync::Arc, time::Duration};
use tokio::sync::{
broadcast,
mpsc::{Receiver, Sender},
@@ -34,22 +36,24 @@ pub(crate) async fn make_document_ws_manager(
doc_id: String,
user_id: String,
edit_cmd_tx: EditorCommandSender,
rev_manager: Arc<DocumentRevisionManager>,
ws_conn: Arc<dyn DocumentWebSocket>,
) -> Arc<DocumentWebSocketManager> {
rev_manager: Arc<RevisionManager>,
ws_conn: Arc<dyn RevisionWebSocket>,
) -> Arc<RevisionWebSocketManager> {
let shared_sink = Arc::new(SharedWSSinkDataProvider::new(rev_manager.clone()));
let ws_stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter {
doc_id: doc_id.clone(),
object_id: doc_id.clone(),
edit_cmd_tx,
rev_manager: rev_manager.clone(),
shared_sink: shared_sink.clone(),
});
let data_provider = Arc::new(DocumentWSSinkDataProviderAdapter(shared_sink));
let ws_manager = Arc::new(DocumentWebSocketManager::new(
let ping_duration = Duration::from_millis(SYNC_INTERVAL_IN_MILLIS);
let ws_manager = Arc::new(RevisionWebSocketManager::new(
&doc_id,
ws_conn,
data_provider,
ws_stream_consumer,
ping_duration,
));
listen_document_ws_state(&user_id, &doc_id, ws_manager.scribe_state(), rev_manager);
ws_manager
@@ -59,7 +63,7 @@ fn listen_document_ws_state(
_user_id: &str,
_doc_id: &str,
mut subscriber: broadcast::Receiver<WSConnectState>,
_rev_manager: Arc<DocumentRevisionManager>,
_rev_manager: Arc<RevisionManager>,
) {
tokio::spawn(async move {
while let Ok(state) = subscriber.recv().await {
@@ -74,28 +78,28 @@ fn listen_document_ws_state(
}

pub(crate) struct DocumentWebSocketSteamConsumerAdapter {
pub(crate) doc_id: String,
pub(crate) object_id: String,
pub(crate) edit_cmd_tx: EditorCommandSender,
pub(crate) rev_manager: Arc<DocumentRevisionManager>,
pub(crate) rev_manager: Arc<RevisionManager>,
pub(crate) shared_sink: Arc<SharedWSSinkDataProvider>,
}

impl DocumentWSSteamConsumer for DocumentWebSocketSteamConsumerAdapter {
impl RevisionWSSteamConsumer for DocumentWebSocketSteamConsumerAdapter {
fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError> {
let rev_manager = self.rev_manager.clone();
let edit_cmd_tx = self.edit_cmd_tx.clone();
let shared_sink = self.shared_sink.clone();
let doc_id = self.doc_id.clone();
let object_id = self.object_id.clone();
FutureResult::new(async move {
if let Some(server_composed_revision) = handle_remote_revision(edit_cmd_tx, rev_manager, bytes).await? {
let data = DocumentClientWSData::from_revisions(&doc_id, vec![server_composed_revision]);
let data = ClientRevisionWSData::from_revisions(&object_id, vec![server_composed_revision]);
shared_sink.push_back(data).await;
}
Ok(())
})
}

fn receive_ack(&self, id: String, ty: DocumentServerWSDataType) -> FutureResult<(), FlowyError> {
fn receive_ack(&self, id: String, ty: ServerRevisionWSDataType) -> FutureResult<(), FlowyError> {
let shared_sink = self.shared_sink.clone();
FutureResult::new(async move { shared_sink.ack(id, ty).await })
}
@@ -108,10 +112,10 @@ impl DocumentWSSteamConsumer for DocumentWebSocketSteamConsumerAdapter {
fn pull_revisions_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError> {
let rev_manager = self.rev_manager.clone();
let shared_sink = self.shared_sink.clone();
let doc_id = self.doc_id.clone();
let object_id = self.object_id.clone();
FutureResult::new(async move {
let revisions = rev_manager.get_revisions_in_range(range).await?;
let data = DocumentClientWSData::from_revisions(&doc_id, revisions);
let data = ClientRevisionWSData::from_revisions(&object_id, revisions);
shared_sink.push_back(data).await;
Ok(())
})
@@ -119,8 +123,8 @@ impl DocumentWSSteamConsumer for DocumentWebSocketSteamConsumerAdapter {
}

pub(crate) struct DocumentWSSinkDataProviderAdapter(pub(crate) Arc<SharedWSSinkDataProvider>);
impl DocumentWSSinkDataProvider for DocumentWSSinkDataProviderAdapter {
fn next(&self) -> FutureResult<Option<DocumentClientWSData>, FlowyError> {
impl RevisionWSSinkDataProvider for DocumentWSSinkDataProviderAdapter {
fn next(&self) -> FutureResult<Option<ClientRevisionWSData>, FlowyError> {
let shared_sink = self.0.clone();
FutureResult::new(async move { shared_sink.next().await })
}
@@ -138,7 +142,7 @@ async fn transform_pushed_revisions(
#[tracing::instrument(level = "debug", skip(edit_cmd_tx, rev_manager, bytes))]
pub(crate) async fn handle_remote_revision(
edit_cmd_tx: EditorCommandSender,
rev_manager: Arc<DocumentRevisionManager>,
rev_manager: Arc<RevisionManager>,
bytes: Bytes,
) -> FlowyResult<Option<Revision>> {
let mut revisions = RepeatedRevision::try_from(bytes)?.into_inner();
@@ -198,13 +202,13 @@ enum SourceType {

#[derive(Clone)]
pub(crate) struct SharedWSSinkDataProvider {
shared: Arc<RwLock<VecDeque<DocumentClientWSData>>>,
rev_manager: Arc<DocumentRevisionManager>,
shared: Arc<RwLock<VecDeque<ClientRevisionWSData>>>,
rev_manager: Arc<RevisionManager>,
source_ty: Arc<RwLock<SourceType>>,
}

impl SharedWSSinkDataProvider {
pub(crate) fn new(rev_manager: Arc<DocumentRevisionManager>) -> Self {
pub(crate) fn new(rev_manager: Arc<RevisionManager>) -> Self {
SharedWSSinkDataProvider {
shared: Arc::new(RwLock::new(VecDeque::new())),
rev_manager,
@@ -213,11 +217,11 @@ impl SharedWSSinkDataProvider {
}

#[allow(dead_code)]
pub(crate) async fn push_front(&self, data: DocumentClientWSData) { self.shared.write().await.push_front(data); }
pub(crate) async fn push_front(&self, data: ClientRevisionWSData) { self.shared.write().await.push_front(data); }

async fn push_back(&self, data: DocumentClientWSData) { self.shared.write().await.push_back(data); }
async fn push_back(&self, data: ClientRevisionWSData) { self.shared.write().await.push_back(data); }

async fn next(&self) -> FlowyResult<Option<DocumentClientWSData>> {
async fn next(&self) -> FlowyResult<Option<ClientRevisionWSData>> {
let source_ty = self.source_ty.read().await.clone();
match source_ty {
SourceType::Shared => match self.shared.read().await.front() {
@@ -226,7 +230,7 @@ impl SharedWSSinkDataProvider {
Ok(None)
},
Some(data) => {
tracing::debug!("[SharedWSSinkDataProvider]: {}:{:?}", data.doc_id, data.ty);
tracing::debug!("[SharedWSSinkDataProvider]: {}:{:?}", data.object_id, data.ty);
Ok(Some(data.clone()))
},
},
@@ -238,21 +242,21 @@ impl SharedWSSinkDataProvider {

match self.rev_manager.next_sync_revision().await? {
Some(rev) => {
let doc_id = rev.doc_id.clone();
Ok(Some(DocumentClientWSData::from_revisions(&doc_id, vec![rev])))
let doc_id = rev.object_id.clone();
Ok(Some(ClientRevisionWSData::from_revisions(&doc_id, vec![rev])))
},
None => {
//
let doc_id = self.rev_manager.doc_id.clone();
let doc_id = self.rev_manager.object_id.clone();
let latest_rev_id = self.rev_manager.rev_id();
Ok(Some(DocumentClientWSData::ping(&doc_id, latest_rev_id)))
Ok(Some(ClientRevisionWSData::ping(&doc_id, latest_rev_id)))
},
}
},
}
}

async fn ack(&self, id: String, _ty: DocumentServerWSDataType) -> FlowyResult<()> {
async fn ack(&self, id: String, _ty: ServerRevisionWSDataType) -> FlowyResult<()> {
// let _ = self.rev_manager.ack_revision(id).await?;
let source_ty = self.source_ty.read().await.clone();
match source_ty {
@@ -288,3 +292,24 @@ impl SharedWSSinkDataProvider {
Ok(())
}
}

// RevisionWebSocketManager registers itself as a DocumentWSReceiver for each
// opened document.
#[async_trait]
impl DocumentWSReceiver for RevisionWebSocketManager {
#[tracing::instrument(level = "debug", skip(self, data), err)]
async fn receive_ws_data(&self, data: ServerRevisionWSData) -> Result<(), FlowyError> {
let _ = self.ws_passthrough_tx.send(data).await.map_err(|e| {
let err_msg = format!("{} passthrough error: {}", self.object_id, e);
FlowyError::internal().context(err_msg)
})?;
Ok(())
}

fn connect_state_changed(&self, state: WSConnectState) {
match self.state_passthrough_tx.send(state) {
Ok(_) => {},
Err(e) => tracing::error!("{}", e),
}
}
}
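The adapters above are the only document-specific glue left around the generic `RevisionWebSocketManager`: one side consumes pushed server data, the other provides outgoing `ClientRevisionWSData`. A minimal sketch of the sink side, assuming a caller that only wants to drain a fixed queue (the `StaticSinkProvider` type is hypothetical; only the trait signature and types come from this commit):

```rust
// Hypothetical sketch of a RevisionWSSinkDataProvider implementation.
use std::{collections::VecDeque, sync::Arc};
use flowy_collaboration::entities::ws::ClientRevisionWSData;
use flowy_error::FlowyError;
use flowy_sync::RevisionWSSinkDataProvider;
use lib_infra::future::FutureResult;
use tokio::sync::RwLock;

struct StaticSinkProvider {
    queue: Arc<RwLock<VecDeque<ClientRevisionWSData>>>,
}

impl RevisionWSSinkDataProvider for StaticSinkProvider {
    fn next(&self) -> FutureResult<Option<ClientRevisionWSData>, FlowyError> {
        let queue = self.queue.clone();
        // Pop the next pending message, or None when there is nothing to send.
        FutureResult::new(async move { Ok(queue.write().await.pop_front()) })
    }
}
```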
@@ -3,10 +3,7 @@ pub(crate) mod controller;
pub mod core;
// mod notify;
pub mod protobuf;
mod ws_receivers;

#[macro_use]
extern crate flowy_database;
pub mod ws_receivers;

pub mod errors {
pub use flowy_error::{internal_error, ErrorCode, FlowyError};
@@ -2,22 +2,16 @@ use crate::errors::FlowyError;
use async_trait::async_trait;
use bytes::Bytes;
use dashmap::DashMap;
use flowy_collaboration::entities::ws::{DocumentClientWSData, DocumentServerWSData};
use flowy_collaboration::entities::ws::{ServerRevisionWSData};
use lib_ws::WSConnectState;
use std::{convert::TryInto, sync::Arc};

#[async_trait]
pub(crate) trait DocumentWSReceiver: Send + Sync {
async fn receive_ws_data(&self, data: DocumentServerWSData) -> Result<(), FlowyError>;
async fn receive_ws_data(&self, data: ServerRevisionWSData) -> Result<(), FlowyError>;
fn connect_state_changed(&self, state: WSConnectState);
}

pub type WSStateReceiver = tokio::sync::broadcast::Receiver<WSConnectState>;
pub trait DocumentWebSocket: Send + Sync {
fn send(&self, data: DocumentClientWSData) -> Result<(), FlowyError>;
fn subscribe_state_changed(&self) -> WSStateReceiver;
}

pub struct DocumentWSReceivers {
// key: the document id
// value: DocumentWSReceiver
@@ -44,9 +38,9 @@ impl DocumentWSReceivers {
pub(crate) fn remove(&self, id: &str) { self.receivers.remove(id); }

pub async fn did_receive_data(&self, data: Bytes) {
let data: DocumentServerWSData = data.try_into().unwrap();
match self.receivers.get(&data.doc_id) {
None => tracing::error!("Can't find any source handler for {:?}", data.doc_id),
let data: ServerRevisionWSData = data.try_into().unwrap();
match self.receivers.get(&data.object_id) {
None => tracing::error!("Can't find any source handler for {:?}", data.object_id),
Some(handler) => match handler.receive_ws_data(data).await {
Ok(_) => {},
Err(e) => tracing::error!("{}", e),
@@ -1,5 +1,5 @@
use flowy_collaboration::entities::revision::RevisionState;
use flowy_document::core::{edit::ClientDocumentEditor, SYNC_INTERVAL_IN_MILLIS};
use flowy_document::core::{ClientDocumentEditor, SYNC_INTERVAL_IN_MILLIS};
use flowy_test::{helper::ViewTest, FlowySDKTest};
use lib_ot::{core::Interval, rich_text::RichTextDelta};
use std::sync::Arc;
@@ -5,10 +5,10 @@ use flowy_collaboration::{
client_document::default::initial_delta_string,
entities::{
doc::{CreateDocParams, DocumentId, DocumentInfo, ResetDocumentParams},
ws::{DocumentClientWSData, DocumentClientWSDataType},
ws::{ClientRevisionWSData, ClientRevisionWSDataType},
},
errors::CollaborateError,
protobuf::DocumentClientWSData as DocumentClientWSDataPB,
protobuf::ClientRevisionWSData as ClientRevisionWSDataPB,
server_document::*,
};
use flowy_core::module::WorkspaceCloudService;
@@ -105,19 +105,19 @@ impl LocalWebSocketRunner {

async fn handle_message(&self, message: WebSocketRawMessage) -> Result<(), FlowyError> {
let bytes = Bytes::from(message.data);
let client_data = DocumentClientWSData::try_from(bytes).map_err(internal_error)?;
let client_data = ClientRevisionWSData::try_from(bytes).map_err(internal_error)?;
let _ = self.handle_client_data(client_data, "".to_owned()).await?;
Ok(())
}

pub async fn handle_client_data(
&self,
client_data: DocumentClientWSData,
client_data: ClientRevisionWSData,
user_id: String,
) -> Result<(), CollaborateError> {
tracing::trace!(
"[LocalDocumentServer] receive: {}:{}-{:?} ",
client_data.doc_id,
client_data.object_id,
client_data.id(),
client_data.ty,
);
@@ -127,15 +127,15 @@ impl LocalWebSocketRunner {
client_ws_sender,
});
let ty = client_data.ty.clone();
let document_client_data: DocumentClientWSDataPB = client_data.try_into().unwrap();
let document_client_data: ClientRevisionWSDataPB = client_data.try_into().unwrap();
match ty {
DocumentClientWSDataType::ClientPushRev => {
ClientRevisionWSDataType::ClientPushRev => {
let _ = self
.doc_manager
.handle_client_revisions(user, document_client_data)
.await?;
},
DocumentClientWSDataType::ClientPing => {
ClientRevisionWSDataType::ClientPing => {
let _ = self.doc_manager.handle_client_ping(user, document_client_data).await?;
},
}
@@ -13,6 +13,7 @@ flowy-net = { path = "../flowy-net" }
flowy-core = { path = "../flowy-core", default-features = false }
flowy-database = { path = "../flowy-database" }
flowy-document = { path = "../flowy-document" }
flowy-sync = { path = "../flowy-sync" }

tracing = { version = "0.1" }
log = "0.4.14"
@@ -1,11 +1,11 @@
use backend_service::configuration::ClientServerConfiguration;
use bytes::Bytes;
use flowy_collaboration::entities::ws::DocumentClientWSData;
use flowy_collaboration::entities::ws::ClientRevisionWSData;
use flowy_database::ConnectionPool;
use flowy_document::{
context::DocumentUser,
core::{DocumentWSReceivers, DocumentWebSocket, WSStateReceiver},
errors::{internal_error, FlowyError},
ws_receivers::DocumentWSReceivers,
DocumentCloudService,
};
use flowy_net::{
@@ -13,15 +13,15 @@ use flowy_net::{
local_server::LocalServer,
ws::connection::FlowyWebSocketConnect,
};
use flowy_sync::{RevisionWebSocket, WSStateReceiver};
use flowy_user::services::UserSession;

use lib_ws::{WSMessageReceiver, WSModule, WebSocketRawMessage};
use std::{convert::TryInto, path::Path, sync::Arc};

pub struct DocumentDependencies {
pub user: Arc<dyn DocumentUser>,
pub ws_receivers: Arc<DocumentWSReceivers>,
pub ws_sender: Arc<dyn DocumentWebSocket>,
pub ws_sender: Arc<dyn RevisionWebSocket>,
pub cloud_service: Arc<dyn DocumentCloudService>,
}

@@ -73,8 +73,8 @@ impl DocumentUser for DocumentUserImpl {
}

struct DocumentWebSocketImpl(Arc<FlowyWebSocketConnect>);
impl DocumentWebSocket for DocumentWebSocketImpl {
fn send(&self, data: DocumentClientWSData) -> Result<(), FlowyError> {
impl RevisionWebSocket for DocumentWebSocketImpl {
fn send(&self, data: ClientRevisionWSData) -> Result<(), FlowyError> {
let bytes: Bytes = data.try_into().unwrap();
let msg = WebSocketRawMessage {
module: WSModule::Doc,
frontend/rust-lib/flowy-sync/Cargo.toml (new file, +32)
@@ -0,0 +1,32 @@
[package]
name = "flowy-sync"
version = "0.1.0"
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
flowy-collaboration = { path = "../../../shared-lib/flowy-collaboration" }
lib-ot = { path = "../../../shared-lib/lib-ot" }
lib-ws = { path = "../../../shared-lib/lib-ws" }
lib-infra = { path = "../../../shared-lib/lib-infra" }
flowy-database = { path = "../flowy-database" }
flowy-error = { path = "../flowy-error", features = ["collaboration", "ot", "backend", "serde", "db"] }
diesel = {version = "1.4.8", features = ["sqlite"]}
diesel_derives = {version = "1.4.1", features = ["sqlite"]}
protobuf = {version = "2.18.0"}
tracing = { version = "0.1", features = ["log"] }
tokio = {version = "1", features = ["sync"]}
bytes = { version = "1.1" }
strum = "0.21"
strum_macros = "0.21"
dashmap = "4.0"
parking_lot = "0.11"
serde = { version = "1.0", features = ["derive"] }
serde_json = {version = "1.0"}
futures-util = "0.3.15"
async-stream = "0.3.2"


[features]
flowy_unit_test = ["lib-ot/flowy_unit_test"]
@@ -1,5 +1,5 @@
mod sql_impl;
use crate::core::revision::RevisionRecord;
use crate::RevisionRecord;
use diesel::SqliteConnection;
use flowy_collaboration::entities::revision::RevisionRange;
pub use sql_impl::*;
@@ -7,24 +7,24 @@ pub use sql_impl::*;
use flowy_error::FlowyResult;
use std::fmt::Debug;

pub trait DocumentRevisionDiskCache: Sync + Send {
pub trait RevisionDiskCache: Sync + Send {
type Error: Debug;
fn write_revision_records(
&self,
revisions: Vec<RevisionRecord>,
revision_records: Vec<RevisionRecord>,
conn: &SqliteConnection,
) -> Result<(), Self::Error>;

// Read all the records if the rev_ids is None
fn read_revision_records(
&self,
doc_id: &str,
object_id: &str,
rev_ids: Option<Vec<i64>>,
) -> Result<Vec<RevisionRecord>, Self::Error>;

fn read_revision_records_with_range(
&self,
doc_id: &str,
object_id: &str,
range: &RevisionRange,
) -> Result<Vec<RevisionRecord>, Self::Error>;

@@ -33,10 +33,10 @@ pub trait DocumentRevisionDiskCache: Sync + Send {
// Delete all the records if the rev_ids is None
fn delete_revision_records(
&self,
doc_id: &str,
object_id: &str,
rev_ids: Option<Vec<i64>>,
conn: &SqliteConnection,
) -> Result<(), Self::Error>;

fn reset_document(&self, doc_id: &str, revision_records: Vec<RevisionRecord>) -> Result<(), Self::Error>;
fn reset_object(&self, object_id: &str, revision_records: Vec<RevisionRecord>) -> Result<(), Self::Error>;
}
@@ -1,4 +1,4 @@
use crate::core::revision::{disk::DocumentRevisionDiskCache, RevisionRecord};
use crate::{cache::disk::RevisionDiskCache, RevisionRecord};
use bytes::Bytes;
use diesel::{sql_types::Integer, update, SqliteConnection};
use flowy_collaboration::{
@@ -6,6 +6,7 @@ use flowy_collaboration::{
util::md5,
};
use flowy_database::{
impl_sql_integer_expression,
insert_or_ignore_into,
prelude::*,
schema::{rev_table, rev_table::dsl},
@@ -19,35 +20,35 @@ pub struct SQLitePersistence {
pub(crate) pool: Arc<ConnectionPool>,
}

impl DocumentRevisionDiskCache for SQLitePersistence {
impl RevisionDiskCache for SQLitePersistence {
type Error = FlowyError;

fn write_revision_records(
&self,
revisions: Vec<RevisionRecord>,
revision_records: Vec<RevisionRecord>,
conn: &SqliteConnection,
) -> Result<(), Self::Error> {
let _ = RevisionTableSql::create(revisions, conn)?;
let _ = RevisionTableSql::create(revision_records, conn)?;
Ok(())
}

fn read_revision_records(
&self,
doc_id: &str,
object_id: &str,
rev_ids: Option<Vec<i64>>,
) -> Result<Vec<RevisionRecord>, Self::Error> {
let conn = self.pool.get().map_err(internal_error)?;
let records = RevisionTableSql::read(&self.user_id, doc_id, rev_ids, &*conn)?;
let records = RevisionTableSql::read(&self.user_id, object_id, rev_ids, &*conn)?;
Ok(records)
}

fn read_revision_records_with_range(
&self,
doc_id: &str,
object_id: &str,
range: &RevisionRange,
) -> Result<Vec<RevisionRecord>, Self::Error> {
let conn = &*self.pool.get().map_err(internal_error)?;
let revisions = RevisionTableSql::read_with_range(&self.user_id, doc_id, range.clone(), conn)?;
let revisions = RevisionTableSql::read_with_range(&self.user_id, object_id, range.clone(), conn)?;
Ok(revisions)
}

@@ -64,18 +65,18 @@ impl DocumentRevisionDiskCache for SQLitePersistence {

fn delete_revision_records(
&self,
doc_id: &str,
object_id: &str,
rev_ids: Option<Vec<i64>>,
conn: &SqliteConnection,
) -> Result<(), Self::Error> {
let _ = RevisionTableSql::delete(doc_id, rev_ids, conn)?;
let _ = RevisionTableSql::delete(object_id, rev_ids, conn)?;
Ok(())
}

fn reset_document(&self, doc_id: &str, revision_records: Vec<RevisionRecord>) -> Result<(), Self::Error> {
fn reset_object(&self, object_id: &str, revision_records: Vec<RevisionRecord>) -> Result<(), Self::Error> {
let conn = self.pool.get().map_err(internal_error)?;
conn.immediate_transaction::<_, FlowyError, _>(|| {
let _ = self.delete_revision_records(doc_id, None, &*conn)?;
let _ = self.delete_revision_records(object_id, None, &*conn)?;
let _ = self.write_revision_records(revision_records, &*conn)?;
Ok(())
})
@@ -101,7 +102,7 @@ impl RevisionTableSql {
.map(|record| {
let rev_state: RevisionTableState = record.state.into();
(
dsl::doc_id.eq(record.revision.doc_id),
dsl::doc_id.eq(record.revision.object_id),
dsl::base_rev_id.eq(record.revision.base_rev_id),
dsl::rev_id.eq(record.revision.rev_id),
dsl::data.eq(record.revision.delta_data),
@@ -118,7 +119,7 @@ impl RevisionTableSql {
pub(crate) fn update(changeset: RevisionChangeset, conn: &SqliteConnection) -> Result<(), FlowyError> {
let filter = dsl::rev_table
.filter(dsl::rev_id.eq(changeset.rev_id.as_ref()))
.filter(dsl::doc_id.eq(changeset.doc_id));
.filter(dsl::doc_id.eq(changeset.object_id));
let _ = update(filter).set(dsl::state.eq(changeset.state)).execute(conn)?;
tracing::debug!(
"[RevisionTable] update revision:{} state:to {:?}",
@@ -130,11 +131,11 @@ impl RevisionTableSql {

pub(crate) fn read(
user_id: &str,
doc_id: &str,
object_id: &str,
rev_ids: Option<Vec<i64>>,
conn: &SqliteConnection,
) -> Result<Vec<RevisionRecord>, FlowyError> {
let mut sql = dsl::rev_table.filter(dsl::doc_id.eq(doc_id)).into_boxed();
let mut sql = dsl::rev_table.filter(dsl::doc_id.eq(object_id)).into_boxed();
if let Some(rev_ids) = rev_ids {
sql = sql.filter(dsl::rev_id.eq_any(rev_ids));
}
@@ -149,14 +150,14 @@ impl RevisionTableSql {

pub(crate) fn read_with_range(
user_id: &str,
doc_id: &str,
object_id: &str,
range: RevisionRange,
conn: &SqliteConnection,
) -> Result<Vec<RevisionRecord>, FlowyError> {
let rev_tables = dsl::rev_table
.filter(dsl::rev_id.ge(range.start))
.filter(dsl::rev_id.le(range.end))
.filter(dsl::doc_id.eq(doc_id))
.filter(dsl::doc_id.eq(object_id))
.order(dsl::rev_id.asc())
.load::<RevisionTable>(conn)?;

@@ -167,8 +168,12 @@ impl RevisionTableSql {
Ok(revisions)
}

pub(crate) fn delete(doc_id: &str, rev_ids: Option<Vec<i64>>, conn: &SqliteConnection) -> Result<(), FlowyError> {
let mut sql = dsl::rev_table.filter(dsl::doc_id.eq(doc_id)).into_boxed();
pub(crate) fn delete(
object_id: &str,
rev_ids: Option<Vec<i64>>,
conn: &SqliteConnection,
) -> Result<(), FlowyError> {
let mut sql = dsl::rev_table.filter(dsl::doc_id.eq(object_id)).into_boxed();
if let Some(rev_ids) = rev_ids {
sql = sql.filter(dsl::rev_id.eq_any(rev_ids));
}
@@ -195,22 +200,22 @@ pub(crate) struct RevisionTable {
#[repr(i32)]
#[sql_type = "Integer"]
pub enum RevisionTableState {
Local = 0,
Ack = 1,
Sync = 0,
Ack = 1,
}

impl std::default::Default for RevisionTableState {
fn default() -> Self { RevisionTableState::Local }
fn default() -> Self { RevisionTableState::Sync }
}

impl std::convert::From<i32> for RevisionTableState {
fn from(value: i32) -> Self {
match value {
0 => RevisionTableState::Local,
0 => RevisionTableState::Sync,
1 => RevisionTableState::Ack,
o => {
log::error!("Unsupported rev state {}, fallback to RevState::Local", o);
RevisionTableState::Local
tracing::error!("Unsupported rev state {}, fallback to RevState::Local", o);
RevisionTableState::Sync
},
}
}
@@ -224,7 +229,7 @@ impl_sql_integer_expression!(RevisionTableState);
impl std::convert::From<RevisionTableState> for RevisionState {
fn from(s: RevisionTableState) -> Self {
match s {
RevisionTableState::Local => RevisionState::Local,
RevisionTableState::Sync => RevisionState::Sync,
RevisionTableState::Ack => RevisionState::Ack,
}
}
@@ -233,7 +238,7 @@ impl std::convert::From<RevisionTableState> for RevisionState {
impl std::convert::From<RevisionState> for RevisionTableState {
fn from(s: RevisionState) -> Self {
match s {
RevisionState::Local => RevisionTableState::Local,
RevisionState::Sync => RevisionTableState::Sync,
RevisionState::Ack => RevisionTableState::Ack,
}
}
@@ -274,7 +279,7 @@ impl std::convert::From<i32> for RevTableType {
0 => RevTableType::Local,
1 => RevTableType::Remote,
o => {
log::error!("Unsupported rev type {}, fallback to RevTableType::Local", o);
tracing::error!("Unsupported rev type {}, fallback to RevTableType::Local", o);
RevTableType::Local
},
}
@@ -304,7 +309,7 @@ impl std::convert::From<RevTableType> for RevType {
}

pub struct RevisionChangeset {
pub(crate) doc_id: String,
pub(crate) object_id: String,
pub(crate) rev_id: RevId,
pub(crate) state: RevisionTableState,
}
@@ -1,4 +1,4 @@
use crate::core::RevisionRecord;
use crate::RevisionRecord;
use dashmap::DashMap;
use flowy_collaboration::entities::revision::RevisionRange;
use flowy_error::{FlowyError, FlowyResult};
@@ -7,21 +7,21 @@ use tokio::{sync::RwLock, task::JoinHandle};

pub(crate) trait RevisionMemoryCacheDelegate: Send + Sync {
fn checkpoint_tick(&self, records: Vec<RevisionRecord>) -> FlowyResult<()>;
fn receive_ack(&self, doc_id: &str, rev_id: i64);
fn receive_ack(&self, object_id: &str, rev_id: i64);
}

pub(crate) struct DocumentRevisionMemoryCache {
doc_id: String,
pub(crate) struct RevisionMemoryCache {
object_id: String,
revs_map: Arc<DashMap<i64, RevisionRecord>>,
delegate: Arc<dyn RevisionMemoryCacheDelegate>,
pending_write_revs: Arc<RwLock<Vec<i64>>>,
defer_save: RwLock<Option<JoinHandle<()>>>,
}

impl DocumentRevisionMemoryCache {
pub(crate) fn new(doc_id: &str, delegate: Arc<dyn RevisionMemoryCacheDelegate>) -> Self {
DocumentRevisionMemoryCache {
doc_id: doc_id.to_owned(),
impl RevisionMemoryCache {
pub(crate) fn new(object_id: &str, delegate: Arc<dyn RevisionMemoryCacheDelegate>) -> Self {
RevisionMemoryCache {
object_id: object_id.to_owned(),
revs_map: Arc::new(DashMap::new()),
delegate,
pending_write_revs: Arc::new(RwLock::new(vec![])),
@@ -62,7 +62,7 @@ impl DocumentRevisionMemoryCache {
if !self.pending_write_revs.read().await.contains(rev_id) {
// The revision must be saved on disk if the pending_write_revs
// doesn't contains the rev_id.
self.delegate.receive_ack(&self.doc_id, *rev_id);
self.delegate.receive_ack(&self.object_id, *rev_id);
} else {
self.make_checkpoint().await;
}
@@ -1,13 +1,13 @@
use crate::{
core::revision::{
disk::{DocumentRevisionDiskCache, RevisionChangeset, RevisionTableState, SQLitePersistence},
memory::{DocumentRevisionMemoryCache, RevisionMemoryCacheDelegate},
},
errors::FlowyError,
mod disk;
mod memory;

use crate::cache::{
disk::{RevisionChangeset, RevisionDiskCache, RevisionTableState, SQLitePersistence},
memory::{RevisionMemoryCache, RevisionMemoryCacheDelegate},
};
use flowy_collaboration::entities::revision::{Revision, RevisionRange, RevisionState};
use flowy_database::ConnectionPool;
use flowy_error::{internal_error, FlowyResult};
use flowy_error::{internal_error, FlowyError, FlowyResult};
use std::{
borrow::Cow,
sync::{
@@ -17,17 +17,17 @@ use std::{
};
use tokio::task::spawn_blocking;

pub struct DocumentRevisionCache {
pub struct RevisionCache {
doc_id: String,
disk_cache: Arc<dyn DocumentRevisionDiskCache<Error = FlowyError>>,
memory_cache: Arc<DocumentRevisionMemoryCache>,
disk_cache: Arc<dyn RevisionDiskCache<Error = FlowyError>>,
memory_cache: Arc<RevisionMemoryCache>,
latest_rev_id: AtomicI64,
}

impl DocumentRevisionCache {
pub fn new(user_id: &str, doc_id: &str, pool: Arc<ConnectionPool>) -> DocumentRevisionCache {
impl RevisionCache {
pub fn new(user_id: &str, doc_id: &str, pool: Arc<ConnectionPool>) -> RevisionCache {
let disk_cache = Arc::new(SQLitePersistence::new(user_id, pool));
let memory_cache = Arc::new(DocumentRevisionMemoryCache::new(doc_id, Arc::new(disk_cache.clone())));
let memory_cache = Arc::new(RevisionMemoryCache::new(doc_id, Arc::new(disk_cache.clone())));
let doc_id = doc_id.to_owned();
Self {
doc_id,
@@ -99,7 +99,7 @@ impl DocumentRevisionCache {
.map_err(internal_error)??;

if records.len() != range_len {
log::error!("Revisions len is not equal to range required");
tracing::error!("Revisions len is not equal to range required");
}
}
Ok(records
@@ -115,13 +115,13 @@ impl DocumentRevisionCache {
.into_iter()
.map(|revision| RevisionRecord {
revision,
state: RevisionState::Local,
state: RevisionState::Sync,
write_to_disk: false,
})
.collect::<Vec<_>>();

let _ = self.memory_cache.reset_with_revisions(&revision_records).await?;
let _ = self.disk_cache.reset_document(doc_id, revision_records)?;
let _ = self.disk_cache.reset_object(doc_id, revision_records)?;
Ok(())
}

@@ -146,9 +146,9 @@ impl RevisionMemoryCacheDelegate for Arc<SQLitePersistence> {
Ok(())
}

fn receive_ack(&self, doc_id: &str, rev_id: i64) {
fn receive_ack(&self, object_id: &str, rev_id: i64) {
let changeset = RevisionChangeset {
doc_id: doc_id.to_string(),
object_id: object_id.to_string(),
rev_id: rev_id.into(),
state: RevisionTableState::Ack,
};
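For orientation, `RevisionCache` composes the SQLite-backed `RevisionDiskCache` with the in-memory `RevisionMemoryCache`, so callers only deal with one type. A rough usage sketch based on the constructor and `add` call visible in this diff (the `user_id`, `doc_id`, connection pool, and revision value are assumed to come from the caller's context):

```rust
// Sketch only: how flowy-document style code builds and feeds the cache.
use std::sync::Arc;
use flowy_collaboration::entities::revision::{Revision, RevisionState};
use flowy_database::ConnectionPool;
use flowy_error::FlowyResult;
use flowy_sync::RevisionCache;

async fn store_revision(
    user_id: &str,
    doc_id: &str,
    pool: Arc<ConnectionPool>,
    revision: Revision,
) -> FlowyResult<()> {
    // One cache per synced object; it writes through memory to SQLite.
    let cache = Arc::new(RevisionCache::new(user_id, doc_id, pool));
    // New local edits enter in the Sync state until the server acks them.
    let _record = cache.add(revision, RevisionState::Sync, true).await?;
    Ok(())
}
```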
frontend/rust-lib/flowy-sync/src/lib.rs (new file, +10)
@@ -0,0 +1,10 @@
mod cache;
mod rev_manager;
mod ws_manager;

pub use cache::*;
pub use rev_manager::*;
pub use ws_manager::*;

#[macro_use]
extern crate flowy_database;
@ -1,41 +1,40 @@
|
||||
use crate::{
|
||||
core::{revision::DocumentRevisionCache, RevisionRecord},
|
||||
errors::FlowyError,
|
||||
};
|
||||
use bytes::Bytes;
|
||||
use crate::{RevisionCache, RevisionRecord};
|
||||
|
||||
use dashmap::DashMap;
|
||||
use flowy_collaboration::{
|
||||
entities::{
|
||||
doc::DocumentInfo,
|
||||
revision::{RepeatedRevision, Revision, RevisionRange, RevisionState},
|
||||
},
|
||||
util::{make_delta_from_revisions, md5, pair_rev_id_from_revisions, RevIdCounter},
|
||||
entities::revision::{RepeatedRevision, Revision, RevisionRange, RevisionState},
|
||||
util::{pair_rev_id_from_revisions, RevIdCounter},
|
||||
};
|
||||
use flowy_error::FlowyResult;
|
||||
use flowy_error::{FlowyError, FlowyResult};
|
||||
use futures_util::{future, stream, stream::StreamExt};
|
||||
use lib_infra::future::FutureResult;
|
||||
use lib_ot::{core::Operation, rich_text::RichTextDelta};
|
||||
|
||||
use std::{collections::VecDeque, sync::Arc};
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
pub trait RevisionServer: Send + Sync {
|
||||
fn fetch_document(&self, doc_id: &str) -> FutureResult<DocumentInfo, FlowyError>;
|
||||
pub trait RevisionCloudService: Send + Sync {
|
||||
fn fetch_object(&self, user_id: &str, object_id: &str) -> FutureResult<Vec<Revision>, FlowyError>;
|
||||
}
|
||||
|
||||
pub struct DocumentRevisionManager {
|
||||
pub(crate) doc_id: String,
|
||||
pub trait RevisionObjectBuilder: Send + Sync {
|
||||
type Output;
|
||||
fn build_with_revisions(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output>;
|
||||
}
|
||||
|
||||
pub struct RevisionManager {
|
||||
pub object_id: String,
|
||||
user_id: String,
|
||||
rev_id_counter: RevIdCounter,
|
||||
revision_cache: Arc<DocumentRevisionCache>,
|
||||
revision_cache: Arc<RevisionCache>,
|
||||
revision_sync_seq: Arc<RevisionSyncSequence>,
|
||||
}
|
||||
|
||||
impl DocumentRevisionManager {
|
||||
pub fn new(user_id: &str, doc_id: &str, revision_cache: Arc<DocumentRevisionCache>) -> Self {
|
||||
impl RevisionManager {
|
||||
pub fn new(user_id: &str, object_id: &str, revision_cache: Arc<RevisionCache>) -> Self {
|
||||
let rev_id_counter = RevIdCounter::new(0);
|
||||
let revision_sync_seq = Arc::new(RevisionSyncSequence::new());
|
||||
Self {
|
||||
doc_id: doc_id.to_string(),
|
||||
object_id: object_id.to_string(),
|
||||
user_id: user_id.to_owned(),
|
||||
rev_id_counter,
|
||||
revision_cache,
|
||||
@ -43,27 +42,28 @@ impl DocumentRevisionManager {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn load_document(&mut self, server: Arc<dyn RevisionServer>) -> FlowyResult<RichTextDelta> {
|
||||
pub async fn load<Builder>(&mut self, cloud: Arc<dyn RevisionCloudService>) -> FlowyResult<Builder::Output>
|
||||
where
|
||||
Builder: RevisionObjectBuilder,
|
||||
{
|
||||
let revisions = RevisionLoader {
|
||||
doc_id: self.doc_id.clone(),
|
||||
object_id: self.object_id.clone(),
|
||||
user_id: self.user_id.clone(),
|
||||
server,
|
||||
cloud,
|
||||
revision_cache: self.revision_cache.clone(),
|
||||
revision_sync_seq: self.revision_sync_seq.clone(),
|
||||
}
|
||||
.load()
|
||||
.await?;
|
||||
let doc = mk_doc_from_revisions(&self.doc_id, revisions)?;
|
||||
self.rev_id_counter.set(doc.rev_id);
|
||||
Ok(doc.delta()?)
|
||||
Builder::build_with_revisions(&self.object_id, revisions)
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self, revisions), err)]
|
||||
pub async fn reset_document(&self, revisions: RepeatedRevision) -> FlowyResult<()> {
|
||||
pub async fn reset_object(&self, revisions: RepeatedRevision) -> FlowyResult<()> {
|
||||
let rev_id = pair_rev_id_from_revisions(&revisions).1;
|
||||
let _ = self
|
||||
.revision_cache
|
||||
.reset_with_revisions(&self.doc_id, revisions.into_inner())
|
||||
.reset_with_revisions(&self.object_id, revisions.into_inner())
|
||||
.await?;
|
||||
self.rev_id_counter.set(rev_id);
|
||||
Ok(())
|
||||
@ -90,7 +90,7 @@ impl DocumentRevisionManager {
|
||||
|
||||
let record = self
|
||||
.revision_cache
|
||||
.add(revision.clone(), RevisionState::Local, true)
|
||||
.add(revision.clone(), RevisionState::Sync, true)
|
||||
.await?;
|
||||
self.revision_sync_seq.add_revision_record(record).await?;
|
||||
Ok(())
|
||||
@ -115,7 +115,7 @@ impl DocumentRevisionManager {
|
||||
}
|
||||
|
||||
pub async fn get_revisions_in_range(&self, range: RevisionRange) -> Result<Vec<Revision>, FlowyError> {
|
||||
debug_assert!(range.doc_id == self.doc_id);
|
||||
debug_assert!(range.object_id == self.object_id);
|
||||
let revisions = self.revision_cache.revisions_in_range(range.clone()).await?;
|
||||
Ok(revisions)
|
||||
}
|
||||
@ -160,7 +160,7 @@ impl RevisionSyncSequence {
|
||||
fn new() -> Self { RevisionSyncSequence::default() }
|
||||
|
||||
async fn add_revision_record(&self, record: RevisionRecord) -> FlowyResult<()> {
|
||||
if !record.state.is_local() {
|
||||
if !record.state.is_need_sync() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
@ -204,37 +204,29 @@ impl RevisionSyncSequence {
|
||||
}
|
||||
|
||||
struct RevisionLoader {
|
||||
doc_id: String,
|
||||
object_id: String,
|
||||
user_id: String,
|
||||
server: Arc<dyn RevisionServer>,
|
||||
revision_cache: Arc<DocumentRevisionCache>,
|
||||
cloud: Arc<dyn RevisionCloudService>,
|
||||
revision_cache: Arc<RevisionCache>,
|
||||
revision_sync_seq: Arc<RevisionSyncSequence>,
|
||||
}
|
||||
|
||||
impl RevisionLoader {
|
||||
async fn load(&self) -> Result<Vec<Revision>, FlowyError> {
|
||||
let records = self.revision_cache.batch_get(&self.doc_id)?;
|
||||
let records = self.revision_cache.batch_get(&self.object_id)?;
|
||||
let revisions: Vec<Revision>;
|
||||
if records.is_empty() {
|
||||
let doc = self.server.fetch_document(&self.doc_id).await?;
|
||||
let delta_data = Bytes::from(doc.text.clone());
|
||||
let doc_md5 = md5(&delta_data);
|
||||
let revision = Revision::new(
|
||||
&doc.doc_id,
|
||||
doc.base_rev_id,
|
||||
doc.rev_id,
|
||||
delta_data,
|
||||
&self.user_id,
|
||||
doc_md5,
|
||||
);
|
||||
let _ = self
|
||||
.revision_cache
|
||||
.add(revision.clone(), RevisionState::Ack, true)
|
||||
.await?;
|
||||
revisions = vec![revision];
|
||||
let remote_revisions = self.cloud.fetch_object(&self.user_id, &self.object_id).await?;
|
||||
for revision in &remote_revisions {
|
||||
let _ = self
|
||||
.revision_cache
|
||||
.add(revision.clone(), RevisionState::Ack, true)
|
||||
.await?;
|
||||
}
|
||||
revisions = remote_revisions;
|
||||
} else {
|
||||
stream::iter(records.clone())
|
||||
.filter(|record| future::ready(record.state == RevisionState::Local))
|
||||
.filter(|record| future::ready(record.state == RevisionState::Sync))
|
||||
.for_each(|record| async move {
|
||||
let f = || async {
|
||||
// Sync the records if their state is RevisionState::Local.
|
||||
@ -255,36 +247,6 @@ impl RevisionLoader {
|
||||
}
|
||||
}
|
||||
|
||||
fn mk_doc_from_revisions(doc_id: &str, revisions: Vec<Revision>) -> FlowyResult<DocumentInfo> {
|
||||
let (base_rev_id, rev_id) = revisions.last().unwrap().pair_rev_id();
|
||||
let mut delta = make_delta_from_revisions(revisions)?;
|
||||
correct_delta(&mut delta);
|
||||
|
||||
Result::<DocumentInfo, FlowyError>::Ok(DocumentInfo {
|
||||
doc_id: doc_id.to_owned(),
|
||||
text: delta.to_json(),
|
||||
rev_id,
|
||||
base_rev_id,
|
||||
})
|
||||
}

// quill-editor requires that the delta end with '\n' and contain only
// insert operations. The correct_delta function may be removed in the future.
fn correct_delta(delta: &mut RichTextDelta) {
if let Some(op) = delta.ops.last() {
let op_data = op.get_data();
if !op_data.ends_with('\n') {
log::warn!("The document must end with newline. Correcting it by inserting newline op");
delta.ops.push(Operation::Insert("\n".into()));
}
}

if let Some(op) = delta.ops.iter().find(|op| !op.is_insert()) {
log::warn!("The document can only contain insert operations, but found {:?}", op);
delta.ops.retain(|op| op.is_insert());
}
}

#[cfg(feature = "flowy_unit_test")]
impl RevisionSyncSequence {
#[allow(dead_code)]
@ -294,6 +256,6 @@ impl RevisionSyncSequence {
}

#[cfg(feature = "flowy_unit_test")]
impl DocumentRevisionManager {
pub fn revision_cache(&self) -> Arc<DocumentRevisionCache> { self.revision_cache.clone() }
impl RevisionManager {
pub fn revision_cache(&self) -> Arc<RevisionCache> { self.revision_cache.clone() }
}
@ -1,16 +1,11 @@
use crate::{
core::SYNC_INTERVAL_IN_MILLIS,
ws_receivers::{DocumentWSReceiver, DocumentWebSocket},
};
use async_stream::stream;
use async_trait::async_trait;
use bytes::Bytes;
use flowy_collaboration::entities::{
revision::{RevId, RevisionRange},
ws::{DocumentClientWSData, DocumentServerWSData, DocumentServerWSDataType, NewDocumentUser},
ws::{ClientRevisionWSData, NewDocumentUser, ServerRevisionWSData, ServerRevisionWSDataType},
};
use flowy_error::{internal_error, FlowyError, FlowyResult};
use futures::stream::StreamExt;
use futures_util::stream::StreamExt;
use lib_infra::future::FutureResult;
use lib_ws::WSConnectState;
use std::{convert::TryFrom, sync::Arc};
@ -25,43 +20,50 @@ use tokio::{
};

// The consumer consumes the messages pushed by the web socket.
pub trait DocumentWSSteamConsumer: Send + Sync {
pub trait RevisionWSSteamConsumer: Send + Sync {
fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError>;
fn receive_ack(&self, id: String, ty: DocumentServerWSDataType) -> FutureResult<(), FlowyError>;
fn receive_ack(&self, id: String, ty: ServerRevisionWSDataType) -> FutureResult<(), FlowyError>;
fn receive_new_user_connect(&self, new_user: NewDocumentUser) -> FutureResult<(), FlowyError>;
fn pull_revisions_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError>;
}
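
// Editor's sketch, not part of this commit: a no-op RevisionWSSteamConsumer
// that could serve as a starting point or test double. It assumes lib-infra's
// `FutureResult::new(async { .. })` constructor; everything else comes from the
// trait definition above.
struct NoopRevisionWSConsumer;

impl RevisionWSSteamConsumer for NoopRevisionWSConsumer {
    fn receive_push_revision(&self, _bytes: Bytes) -> FutureResult<(), FlowyError> {
        // A real consumer would deserialize the pushed revision and apply it locally.
        FutureResult::new(async { Ok(()) })
    }

    fn receive_ack(&self, _id: String, _ty: ServerRevisionWSDataType) -> FutureResult<(), FlowyError> {
        // A real consumer would mark the acked revision as synced.
        FutureResult::new(async { Ok(()) })
    }

    fn receive_new_user_connect(&self, _new_user: NewDocumentUser) -> FutureResult<(), FlowyError> {
        FutureResult::new(async { Ok(()) })
    }

    fn pull_revisions_in_range(&self, _range: RevisionRange) -> FutureResult<(), FlowyError> {
        // A real consumer would load the requested revisions and resend them.
        FutureResult::new(async { Ok(()) })
    }
}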

// The sink provides the data that will be sent through the web socket to the
// backend.
pub trait DocumentWSSinkDataProvider: Send + Sync {
fn next(&self) -> FutureResult<Option<DocumentClientWSData>, FlowyError>;
pub trait RevisionWSSinkDataProvider: Send + Sync {
fn next(&self) -> FutureResult<Option<ClientRevisionWSData>, FlowyError>;
}
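
// Editor's sketch, not part of this commit: a RevisionWSSinkDataProvider that
// drains a shared queue of pending client data. The queue field and the
// `FutureResult::new` constructor are assumptions used for illustration.
struct QueuedSinkDataProvider {
    queue: Arc<tokio::sync::Mutex<std::collections::VecDeque<ClientRevisionWSData>>>,
}

impl RevisionWSSinkDataProvider for QueuedSinkDataProvider {
    fn next(&self) -> FutureResult<Option<ClientRevisionWSData>, FlowyError> {
        let queue = self.queue.clone();
        // Returning Ok(None) tells the sink there is nothing to send on this tick.
        FutureResult::new(async move { Ok(queue.lock().await.pop_front()) })
    }
}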

pub struct DocumentWebSocketManager {
doc_id: String,
data_provider: Arc<dyn DocumentWSSinkDataProvider>,
stream_consumer: Arc<dyn DocumentWSSteamConsumer>,
ws_conn: Arc<dyn DocumentWebSocket>,
ws_passthrough_tx: Sender<DocumentServerWSData>,
ws_passthrough_rx: Option<Receiver<DocumentServerWSData>>,
state_passthrough_tx: broadcast::Sender<WSConnectState>,
pub type WSStateReceiver = tokio::sync::broadcast::Receiver<WSConnectState>;
pub trait RevisionWebSocket: Send + Sync {
fn send(&self, data: ClientRevisionWSData) -> Result<(), FlowyError>;
fn subscribe_state_changed(&self) -> WSStateReceiver;
}
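
// Editor's sketch, not part of this commit: an in-process RevisionWebSocket
// backed by tokio channels, handy for exercising RevisionWebSocketManager in
// tests. The struct name and its fields are assumptions; only the trait
// methods above come from this diff.
struct LoopbackRevisionWebSocket {
    client_data_tx: tokio::sync::mpsc::UnboundedSender<ClientRevisionWSData>,
    state_tx: broadcast::Sender<WSConnectState>,
}

impl RevisionWebSocket for LoopbackRevisionWebSocket {
    fn send(&self, data: ClientRevisionWSData) -> Result<(), FlowyError> {
        // Forward the outgoing client data to whoever holds the receiving end.
        self.client_data_tx
            .send(data)
            .map_err(|e| FlowyError::internal().context(format!("send ws data failed: {}", e)))
    }

    fn subscribe_state_changed(&self) -> WSStateReceiver {
        // Each caller gets its own broadcast receiver of connection-state changes.
        self.state_tx.subscribe()
    }
}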

pub struct RevisionWebSocketManager {
pub object_id: String,
data_provider: Arc<dyn RevisionWSSinkDataProvider>,
stream_consumer: Arc<dyn RevisionWSSteamConsumer>,
ws_conn: Arc<dyn RevisionWebSocket>,
pub ws_passthrough_tx: Sender<ServerRevisionWSData>,
ws_passthrough_rx: Option<Receiver<ServerRevisionWSData>>,
pub state_passthrough_tx: broadcast::Sender<WSConnectState>,
stop_sync_tx: SinkStopTx,
}

impl DocumentWebSocketManager {
pub(crate) fn new(
doc_id: &str,
ws_conn: Arc<dyn DocumentWebSocket>,
data_provider: Arc<dyn DocumentWSSinkDataProvider>,
stream_consumer: Arc<dyn DocumentWSSteamConsumer>,
impl RevisionWebSocketManager {
pub fn new(
object_id: &str,
ws_conn: Arc<dyn RevisionWebSocket>,
data_provider: Arc<dyn RevisionWSSinkDataProvider>,
stream_consumer: Arc<dyn RevisionWSSteamConsumer>,
ping_duration: Duration,
) -> Self {
let (ws_passthrough_tx, ws_passthrough_rx) = mpsc::channel(1000);
let (stop_sync_tx, _) = tokio::sync::broadcast::channel(2);
let doc_id = doc_id.to_string();
let object_id = object_id.to_string();
let (state_passthrough_tx, _) = broadcast::channel(2);
let mut manager = DocumentWebSocketManager {
doc_id,
let mut manager = RevisionWebSocketManager {
object_id,
data_provider,
stream_consumer,
ws_conn,
@ -70,20 +72,21 @@ impl DocumentWebSocketManager {
state_passthrough_tx,
stop_sync_tx,
};
manager.run();
manager.run(ping_duration);
manager
}

fn run(&mut self) {
fn run(&mut self, ping_duration: Duration) {
let ws_msg_rx = self.ws_passthrough_rx.take().expect("Only take once");
let sink = DocumentWSSink::new(
&self.doc_id,
let sink = RevisionWSSink::new(
&self.object_id,
self.data_provider.clone(),
self.ws_conn.clone(),
self.stop_sync_tx.subscribe(),
ping_duration,
);
let stream = DocumentWSStream::new(
&self.doc_id,
let stream = RevisionWSStream::new(
&self.object_id,
self.stream_consumer.clone(),
ws_msg_rx,
self.stop_sync_tx.subscribe(),
@ -94,59 +97,37 @@ impl DocumentWebSocketManager {

pub fn scribe_state(&self) -> broadcast::Receiver<WSConnectState> { self.state_passthrough_tx.subscribe() }

pub(crate) fn stop(&self) {
pub fn stop(&self) {
if self.stop_sync_tx.send(()).is_ok() {
tracing::trace!("{} stop sync", self.doc_id)
tracing::trace!("{} stop sync", self.object_id)
}
}
}

// DocumentWebSocketManager registers itself as a DocumentWSReceiver for each
// opened document. It will receive the web socket message and parse it into
// DocumentServerWSData.
#[async_trait]
impl DocumentWSReceiver for DocumentWebSocketManager {
#[tracing::instrument(level = "debug", skip(self, doc_data), err)]
async fn receive_ws_data(&self, doc_data: DocumentServerWSData) -> Result<(), FlowyError> {
let _ = self.ws_passthrough_tx.send(doc_data).await.map_err(|e| {
let err_msg = format!("{} passthrough error: {}", self.doc_id, e);
FlowyError::internal().context(err_msg)
})?;
Ok(())
}

fn connect_state_changed(&self, state: WSConnectState) {
match self.state_passthrough_tx.send(state) {
Ok(_) => {},
Err(e) => tracing::error!("{}", e),
}
}
impl std::ops::Drop for RevisionWebSocketManager {
fn drop(&mut self) { tracing::trace!("{} RevisionWebSocketManager was dropped", self.object_id) }
}

impl std::ops::Drop for DocumentWebSocketManager {
fn drop(&mut self) { tracing::trace!("{} DocumentWebSocketManager was dropped", self.doc_id) }
}

pub struct DocumentWSStream {
doc_id: String,
consumer: Arc<dyn DocumentWSSteamConsumer>,
ws_msg_rx: Option<mpsc::Receiver<DocumentServerWSData>>,
pub struct RevisionWSStream {
object_id: String,
consumer: Arc<dyn RevisionWSSteamConsumer>,
ws_msg_rx: Option<mpsc::Receiver<ServerRevisionWSData>>,
stop_rx: Option<SinkStopRx>,
}

impl std::ops::Drop for DocumentWSStream {
fn drop(&mut self) { tracing::trace!("{} DocumentWSStream was dropped", self.doc_id) }
impl std::ops::Drop for RevisionWSStream {
fn drop(&mut self) { tracing::trace!("{} RevisionWSStream was dropped", self.object_id) }
}

impl DocumentWSStream {
impl RevisionWSStream {
pub fn new(
doc_id: &str,
consumer: Arc<dyn DocumentWSSteamConsumer>,
ws_msg_rx: mpsc::Receiver<DocumentServerWSData>,
object_id: &str,
consumer: Arc<dyn RevisionWSSteamConsumer>,
ws_msg_rx: mpsc::Receiver<ServerRevisionWSData>,
stop_rx: SinkStopRx,
) -> Self {
DocumentWSStream {
doc_id: doc_id.to_owned(),
RevisionWSStream {
object_id: object_id.to_owned(),
consumer,
ws_msg_rx: Some(ws_msg_rx),
stop_rx: Some(stop_rx),
@ -156,7 +137,7 @@ impl DocumentWSStream {
pub async fn run(mut self) {
let mut receiver = self.ws_msg_rx.take().expect("Only take once");
let mut stop_rx = self.stop_rx.take().expect("Only take once");
let doc_id = self.doc_id.clone();
let object_id = self.object_id.clone();
let stream = stream! {
loop {
tokio::select! {
@ -166,13 +147,13 @@ impl DocumentWSStream {
yield msg
},
None => {
tracing::debug!("[DocumentStream:{}] loop exit", doc_id);
tracing::debug!("[RevisionWSStream:{}] loop exit", object_id);
break;
},
}
},
_ = stop_rx.recv() => {
tracing::debug!("[DocumentStream:{}] loop exit", doc_id);
tracing::debug!("[RevisionWSStream:{}] loop exit", object_id);
break
},
};
@ -183,32 +164,32 @@ impl DocumentWSStream {
.for_each(|msg| async {
match self.handle_message(msg).await {
Ok(_) => {},
Err(e) => log::error!("[DocumentStream:{}] error: {}", self.doc_id, e),
Err(e) => tracing::error!("[RevisionWSStream:{}] error: {}", self.object_id, e),
}
})
.await;
}

async fn handle_message(&self, msg: DocumentServerWSData) -> FlowyResult<()> {
let DocumentServerWSData { doc_id: _, ty, data } = msg;
async fn handle_message(&self, msg: ServerRevisionWSData) -> FlowyResult<()> {
let ServerRevisionWSData { object_id: _, ty, data } = msg;
let bytes = spawn_blocking(move || Bytes::from(data))
.await
.map_err(internal_error)?;

tracing::trace!("[DocumentStream]: new message: {:?}", ty);
tracing::trace!("[RevisionWSStream]: new message: {:?}", ty);
match ty {
DocumentServerWSDataType::ServerPushRev => {
ServerRevisionWSDataType::ServerPushRev => {
let _ = self.consumer.receive_push_revision(bytes).await?;
},
DocumentServerWSDataType::ServerPullRev => {
ServerRevisionWSDataType::ServerPullRev => {
let range = RevisionRange::try_from(bytes)?;
let _ = self.consumer.pull_revisions_in_range(range).await?;
},
DocumentServerWSDataType::ServerAck => {
ServerRevisionWSDataType::ServerAck => {
let rev_id = RevId::try_from(bytes).unwrap().value;
let _ = self.consumer.receive_ack(rev_id.to_string(), ty).await;
},
DocumentServerWSDataType::UserConnect => {
ServerRevisionWSDataType::UserConnect => {
let new_user = NewDocumentUser::try_from(bytes)?;
let _ = self.consumer.receive_new_user_connect(new_user).await;
},
@ -219,33 +200,36 @@ impl DocumentWSStream {

type SinkStopRx = broadcast::Receiver<()>;
type SinkStopTx = broadcast::Sender<()>;
pub struct DocumentWSSink {
provider: Arc<dyn DocumentWSSinkDataProvider>,
ws_sender: Arc<dyn DocumentWebSocket>,
pub struct RevisionWSSink {
provider: Arc<dyn RevisionWSSinkDataProvider>,
ws_sender: Arc<dyn RevisionWebSocket>,
stop_rx: Option<SinkStopRx>,
doc_id: String,
object_id: String,
ping_duration: Duration,
}

impl DocumentWSSink {
impl RevisionWSSink {
pub fn new(
doc_id: &str,
provider: Arc<dyn DocumentWSSinkDataProvider>,
ws_sender: Arc<dyn DocumentWebSocket>,
object_id: &str,
provider: Arc<dyn RevisionWSSinkDataProvider>,
ws_sender: Arc<dyn RevisionWebSocket>,
stop_rx: SinkStopRx,
ping_duration: Duration,
) -> Self {
Self {
provider,
ws_sender,
stop_rx: Some(stop_rx),
doc_id: doc_id.to_owned(),
object_id: object_id.to_owned(),
ping_duration,
}
}

pub async fn run(mut self) {
let (tx, mut rx) = mpsc::channel(1);
let mut stop_rx = self.stop_rx.take().expect("Only take once");
let doc_id = self.doc_id.clone();
tokio::spawn(tick(tx));
let object_id = self.object_id.clone();
tokio::spawn(tick(tx, self.ping_duration));
let stream = stream! {
loop {
tokio::select! {
@ -256,7 +240,7 @@ impl DocumentWSSink {
}
},
_ = stop_rx.recv() => {
tracing::trace!("[DocumentSink:{}] loop exit", doc_id);
tracing::trace!("[RevisionWSSink:{}] loop exit", object_id);
break
},
};
@ -266,7 +250,7 @@ impl DocumentWSSink {
.for_each(|_| async {
match self.send_next_revision().await {
Ok(_) => {},
Err(e) => log::error!("[DocumentSink] send failed, {:?}", e),
Err(e) => tracing::error!("[RevisionWSSink] send failed, {:?}", e),
}
})
.await;
@ -279,19 +263,19 @@ impl DocumentWSSink {
Ok(())
},
Some(data) => {
tracing::trace!("[DocumentSink] send: {}:{}-{:?}", data.doc_id, data.id(), data.ty);
tracing::trace!("[RevisionWSSink] send: {}:{}-{:?}", data.object_id, data.id(), data.ty);
self.ws_sender.send(data)
},
}
}
}

impl std::ops::Drop for DocumentWSSink {
fn drop(&mut self) { tracing::trace!("{} DocumentWSSink was dropped", self.doc_id) }
impl std::ops::Drop for RevisionWSSink {
fn drop(&mut self) { tracing::trace!("{} RevisionWSSink was dropped", self.object_id) }
}

async fn tick(sender: mpsc::Sender<()>) {
let mut interval = interval(Duration::from_millis(SYNC_INTERVAL_IN_MILLIS));
async fn tick(sender: mpsc::Sender<()>, duration: Duration) {
let mut interval = interval(duration);
while sender.send(()).await.is_ok() {
interval.tick().await;
}
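
// Editor's sketch, not part of this commit: how the pieces above fit together.
// The helper name and the 600 ms ping interval are assumptions; the constructor
// signature matches RevisionWebSocketManager::new in this diff.
fn spawn_revision_ws_manager(
    object_id: &str,
    ws_conn: Arc<dyn RevisionWebSocket>,
    data_provider: Arc<dyn RevisionWSSinkDataProvider>,
    stream_consumer: Arc<dyn RevisionWSSteamConsumer>,
) -> RevisionWebSocketManager {
    // new() spawns the sink and stream loops immediately; the Duration paces the
    // tick() pings that drive RevisionWSSink::send_next_revision.
    RevisionWebSocketManager::new(
        object_id,
        ws_conn,
        data_provider,
        stream_consumer,
        Duration::from_millis(600),
    )
}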
@ -86,19 +86,19 @@ static file_descriptor_proto_data: &'static [u8] = b"\
\n\x17dart_notification.proto*\x81\x01\n\x10UserNotification\x12\x0b\n\
\x07Unknown\x10\0\x12\x13\n\x0fUserAuthChanged\x10\x01\x12\x16\n\x12User\
ProfileUpdated\x10\x02\x12\x14\n\x10UserUnauthorized\x10\x03\x12\x1d\n\
\x19UserWsConnectStateChanged\x10\x04J\xf7\x01\n\x06\x12\x04\0\0\x07\x01\
\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x05\0\x12\x04\x01\0\x07\x01\n\
\n\n\x03\x05\0\x01\x12\x03\x01\x05\x15\n\x0b\n\x04\x05\0\x02\0\x12\x03\
\x02\x04\x10\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x02\x04\x0b\n\x0c\n\x05\
\x05\0\x02\0\x02\x12\x03\x02\x0e\x0f\n\x0b\n\x04\x05\0\x02\x01\x12\x03\
\x03\x04\x18\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\x03\x04\x13\n\x0c\n\
\x05\x05\0\x02\x01\x02\x12\x03\x03\x16\x17\n\x0b\n\x04\x05\0\x02\x02\x12\
\x03\x04\x04\x1b\n\x0c\n\x05\x05\0\x02\x02\x01\x12\x03\x04\x04\x16\n\x0c\
\n\x05\x05\0\x02\x02\x02\x12\x03\x04\x19\x1a\n\x0b\n\x04\x05\0\x02\x03\
\x12\x03\x05\x04\x19\n\x0c\n\x05\x05\0\x02\x03\x01\x12\x03\x05\x04\x14\n\
\x0c\n\x05\x05\0\x02\x03\x02\x12\x03\x05\x17\x18\n\x0b\n\x04\x05\0\x02\
\x04\x12\x03\x06\x04\"\n\x0c\n\x05\x05\0\x02\x04\x01\x12\x03\x06\x04\x1d\
\n\x0c\n\x05\x05\0\x02\x04\x02\x12\x03\x06\x20!b\x06proto3\
\x19UserWsConnectStateChanged\x10\x04J\xf7\x01\n\x06\x12\x04\0\0\x08\x01\
\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x05\0\x12\x04\x02\0\x08\x01\n\
\n\n\x03\x05\0\x01\x12\x03\x02\x05\x15\n\x0b\n\x04\x05\0\x02\0\x12\x03\
\x03\x04\x10\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x03\x04\x0b\n\x0c\n\x05\
\x05\0\x02\0\x02\x12\x03\x03\x0e\x0f\n\x0b\n\x04\x05\0\x02\x01\x12\x03\
\x04\x04\x18\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\x04\x04\x13\n\x0c\n\
\x05\x05\0\x02\x01\x02\x12\x03\x04\x16\x17\n\x0b\n\x04\x05\0\x02\x02\x12\
\x03\x05\x04\x1b\n\x0c\n\x05\x05\0\x02\x02\x01\x12\x03\x05\x04\x16\n\x0c\
\n\x05\x05\0\x02\x02\x02\x12\x03\x05\x19\x1a\n\x0b\n\x04\x05\0\x02\x03\
\x12\x03\x06\x04\x19\n\x0c\n\x05\x05\0\x02\x03\x01\x12\x03\x06\x04\x14\n\
\x0c\n\x05\x05\0\x02\x03\x02\x12\x03\x06\x17\x18\n\x0b\n\x04\x05\0\x02\
\x04\x12\x03\x07\x04\"\n\x0c\n\x05\x05\0\x02\x04\x01\x12\x03\x07\x04\x1d\
\n\x0c\n\x05\x05\0\x02\x04\x02\x12\x03\x07\x20!b\x06proto3\
";

static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;

@ -1,4 +1,5 @@
syntax = "proto3";

enum UserNotification {
Unknown = 0;
UserAuthChanged = 1;