generic cloud storage

This commit is contained in:
appflowy 2022-01-11 22:23:19 +08:00
parent e7aad4045b
commit 4bdd9df54c
30 changed files with 879 additions and 918 deletions

View File

@ -7,7 +7,7 @@ use actix_web::web::Data;
use crate::services::document::{
persistence::DocumentKVPersistence,
ws_receiver::{make_document_ws_receiver, HttpServerDocumentPersistence},
ws_receiver::{make_document_ws_receiver, HttpDocumentCloudPersistence},
};
use flowy_collaboration::sync::ServerDocumentManager;
use lib_ws::WSModule;
@ -28,16 +28,16 @@ impl AppContext {
let mut ws_receivers = WebSocketReceivers::new();
let kv_store = make_document_kv_store(pg_pool.clone());
let persistence = Arc::new(FlowyPersistence { pg_pool, kv_store });
let flowy_persistence = Arc::new(FlowyPersistence { pg_pool, kv_store });
let document_persistence = Arc::new(HttpServerDocumentPersistence(persistence.clone()));
let document_persistence = Arc::new(HttpDocumentCloudPersistence(flowy_persistence.kv_store()));
let document_manager = Arc::new(ServerDocumentManager::new(document_persistence));
let document_ws_receiver = make_document_ws_receiver(persistence.clone(), document_manager.clone());
let document_ws_receiver = make_document_ws_receiver(flowy_persistence.clone(), document_manager.clone());
ws_receivers.set(WSModule::Doc, document_ws_receiver);
AppContext {
ws_server,
persistence: Data::new(persistence),
persistence: Data::new(flowy_persistence),
ws_receivers: Data::new(ws_receivers),
document_manager: Data::new(document_manager),
}

View File

@ -15,8 +15,9 @@ use flowy_collaboration::{
Revision as RevisionPB,
},
sync::ServerDocumentManager,
util::make_doc_from_revisions,
};
use lib_ot::{core::OperationTransformable, rich_text::RichTextDelta};
use protobuf::Message;
use std::sync::Arc;
use uuid::Uuid;
@ -27,7 +28,7 @@ pub(crate) async fn create_document(
mut params: CreateDocParams,
) -> Result<(), ServerError> {
let revisions = params.take_revisions().take_items();
let _ = kv_store.batch_set_revision(revisions.into()).await?;
let _ = kv_store.set_revision(revisions.into()).await?;
Ok(())
}
@ -37,8 +38,12 @@ pub async fn read_document(
params: DocumentId,
) -> Result<DocumentInfo, ServerError> {
let _ = Uuid::parse_str(&params.doc_id).context("Parse document id to uuid failed")?;
let revisions = kv_store.batch_get_revisions(&params.doc_id, None).await?;
make_doc_from_revisions(&params.doc_id, revisions)
let revisions = kv_store.get_revisions(&params.doc_id, None).await?;
match make_doc_from_revisions(&params.doc_id, revisions) {
Ok(Some(document_info)) => Ok(document_info),
Ok(None) => Err(ServerError::record_not_found().context(format!("{} not exist", params.doc_id))),
Err(e) => Err(ServerError::internal().context(e)),
}
}
#[tracing::instrument(level = "debug", skip(document_manager, params), fields(delta), err)]
@ -60,7 +65,8 @@ pub async fn reset_document(
#[tracing::instrument(level = "debug", skip(kv_store), err)]
pub(crate) async fn delete_document(kv_store: &Arc<DocumentKVPersistence>, doc_id: Uuid) -> Result<(), ServerError> {
let _ = kv_store.batch_delete_revisions(&doc_id.to_string(), None).await?;
// TODO: delete revisions may cause time issue. Maybe delete asynchronously?
let _ = kv_store.delete_revisions(&doc_id.to_string(), None).await?;
Ok(())
}
@ -81,23 +87,14 @@ impl std::ops::DerefMut for DocumentKVPersistence {
impl DocumentKVPersistence {
pub(crate) fn new(kv_store: Arc<KVStore>) -> Self { DocumentKVPersistence { inner: kv_store } }
pub(crate) async fn batch_set_revision(&self, revisions: Vec<RevisionPB>) -> Result<(), ServerError> {
pub(crate) async fn set_revision(&self, revisions: Vec<RevisionPB>) -> Result<(), ServerError> {
let items = revisions_to_key_value_items(revisions)?;
self.inner
.transaction(|mut t| Box::pin(async move { t.batch_set(items).await }))
.await
}
pub(crate) async fn get_doc_revisions(&self, doc_id: &str) -> Result<RepeatedRevisionPB, ServerError> {
let doc_id = doc_id.to_owned();
let items = self
.inner
.transaction(|mut t| Box::pin(async move { t.batch_get_start_with(&doc_id).await }))
.await?;
Ok(key_value_items_to_revisions(items))
}
pub(crate) async fn batch_get_revisions<T: Into<Option<Vec<i64>>>>(
pub(crate) async fn get_revisions<T: Into<Option<Vec<i64>>>>(
&self,
doc_id: &str,
rev_ids: T,
@ -125,7 +122,7 @@ impl DocumentKVPersistence {
Ok(key_value_items_to_revisions(items))
}
pub(crate) async fn batch_delete_revisions<T: Into<Option<Vec<i64>>>>(
pub(crate) async fn delete_revisions<T: Into<Option<Vec<i64>>>>(
&self,
doc_id: &str,
rev_ids: T,
@ -182,35 +179,3 @@ fn key_value_items_to_revisions(items: Vec<KeyValue>) -> RepeatedRevisionPB {
#[inline]
fn make_revision_key(doc_id: &str, rev_id: i64) -> String { format!("{}:{}", doc_id, rev_id) }
#[inline]
fn make_doc_from_revisions(doc_id: &str, mut revisions: RepeatedRevisionPB) -> Result<DocumentInfo, ServerError> {
let revisions = revisions.take_items();
if revisions.is_empty() {
return Err(ServerError::record_not_found().context(format!("{} not exist", doc_id)));
}
let mut document_delta = RichTextDelta::new();
let mut base_rev_id = 0;
let mut rev_id = 0;
// TODO: replace with make_delta_from_revisions
for revision in revisions {
base_rev_id = revision.base_rev_id;
rev_id = revision.rev_id;
if revision.delta_data.is_empty() {
tracing::warn!("revision delta_data is empty");
}
let delta = RichTextDelta::from_bytes(revision.delta_data).map_err(internal_error)?;
document_delta = document_delta.compose(&delta).map_err(internal_error)?;
}
let text = document_delta.to_json();
let mut document_info = DocumentInfo::new();
document_info.set_doc_id(doc_id.to_owned());
document_info.set_text(text);
document_info.set_base_rev_id(base_rev_id);
document_info.set_rev_id(rev_id);
Ok(document_info)
}

View File

@ -62,15 +62,15 @@ impl DocumentWebSocketActor {
match msg {
WSActorMessage::ClientData {
client_data,
persistence,
persistence: _,
ret,
} => {
let _ = ret.send(self.handle_client_data(client_data, persistence).await);
let _ = ret.send(self.handle_client_data(client_data).await);
},
}
}
async fn handle_client_data(&self, client_data: WSClientData, persistence: Arc<FlowyPersistence>) -> Result<()> {
async fn handle_client_data(&self, client_data: WSClientData) -> Result<()> {
let WSClientData { user, socket, data } = client_data;
let document_client_data = spawn_blocking(move || parse_from_bytes::<DocumentClientWSDataPB>(&data))
.await
@ -83,11 +83,7 @@ impl DocumentWebSocketActor {
document_client_data.ty
);
let user = Arc::new(HttpDocumentUser {
user,
socket,
persistence,
});
let user = Arc::new(HttpDocumentUser { user, socket });
match &document_client_data.ty {
DocumentClientWSDataTypePB::ClientPushRev => {
@ -122,7 +118,6 @@ fn verify_md5(revision: &RevisionPB) -> Result<()> {
pub struct HttpDocumentUser {
pub user: Arc<WSUser>,
pub(crate) socket: Socket,
pub persistence: Arc<FlowyPersistence>,
}
impl std::fmt::Debug for HttpDocumentUser {
@ -151,17 +146,6 @@ impl RevisionUser for HttpDocumentUser {
let msg: WebSocketMessage = data.into();
self.socket.try_send(msg).map_err(internal_error)
},
SyncResponse::NewRevision(mut repeated_revision) => {
let kv_store = self.persistence.kv_store();
tokio::task::spawn(async move {
let revisions = repeated_revision.take_items().into();
match kv_store.batch_set_revision(revisions).await {
Ok(_) => {},
Err(e) => log::error!("{}", e),
}
});
Ok(())
},
};
match result {

View File

@ -2,7 +2,7 @@ use crate::{
context::FlowyPersistence,
services::{
document::{
persistence::{create_document, read_document, revisions_to_key_value_items},
persistence::{create_document, read_document, revisions_to_key_value_items, DocumentKVPersistence},
ws_actor::{DocumentWebSocketActor, WSActorMessage},
},
web_socket::{WSClientData, WebSocketReceiver},
@ -18,7 +18,7 @@ use flowy_collaboration::{
RepeatedRevision as RepeatedRevisionPB,
Revision as RevisionPB,
},
sync::{ServerDocumentManager, ServerDocumentPersistence},
sync::{DocumentCloudPersistence, ServerDocumentManager},
util::repeated_revision_from_repeated_revision_pb,
};
use lib_infra::future::BoxResultFuture;
@ -76,18 +76,19 @@ impl WebSocketReceiver for DocumentWebSocketReceiver {
}
}
pub struct HttpServerDocumentPersistence(pub Arc<FlowyPersistence>);
impl Debug for HttpServerDocumentPersistence {
pub struct HttpDocumentCloudPersistence(pub Arc<DocumentKVPersistence>);
impl Debug for HttpDocumentCloudPersistence {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("DocumentPersistenceImpl") }
}
impl ServerDocumentPersistence for HttpServerDocumentPersistence {
impl DocumentCloudPersistence for HttpDocumentCloudPersistence {
fn enable_sync(&self) -> bool { true }
fn read_document(&self, doc_id: &str) -> BoxResultFuture<DocumentInfo, CollaborateError> {
let params = DocumentId {
doc_id: doc_id.to_string(),
..Default::default()
};
let kv_store = self.0.kv_store();
let kv_store = self.0.clone();
Box::pin(async move {
let mut pb_doc = read_document(&kv_store, params)
.await
@ -104,7 +105,7 @@ impl ServerDocumentPersistence for HttpServerDocumentPersistence {
doc_id: &str,
repeated_revision: RepeatedRevisionPB,
) -> BoxResultFuture<DocumentInfo, CollaborateError> {
let kv_store = self.0.kv_store();
let kv_store = self.0.clone();
let doc_id = doc_id.to_owned();
Box::pin(async move {
let revisions = repeated_revision_from_repeated_revision_pb(repeated_revision.clone())?.into_inner();
@ -125,19 +126,22 @@ impl ServerDocumentPersistence for HttpServerDocumentPersistence {
doc_id: &str,
rev_ids: Option<Vec<i64>>,
) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError> {
let kv_store = self.0.kv_store();
let kv_store = self.0.clone();
let doc_id = doc_id.to_owned();
let f = || async move {
match rev_ids {
None => {
let mut repeated_revision = kv_store.get_doc_revisions(&doc_id).await?;
Ok(repeated_revision.take_items().into())
},
Some(rev_ids) => {
let mut repeated_revision = kv_store.batch_get_revisions(&doc_id, rev_ids).await?;
Ok(repeated_revision.take_items().into())
},
}
let mut repeated_revision = kv_store.get_revisions(&doc_id, rev_ids).await?;
Ok(repeated_revision.take_items().into())
};
Box::pin(async move { f().await.map_err(server_error_to_collaborate_error) })
}
fn save_revisions(&self, mut repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> {
let kv_store = self.0.clone();
let f = || async move {
let revisions = repeated_revision.take_items().into();
let _ = kv_store.set_revision(revisions).await?;
Ok(())
};
Box::pin(async move { f().await.map_err(server_error_to_collaborate_error) })
@ -148,7 +152,7 @@ impl ServerDocumentPersistence for HttpServerDocumentPersistence {
doc_id: &str,
mut repeated_revision: RepeatedRevisionPB,
) -> BoxResultFuture<(), CollaborateError> {
let kv_store = self.0.kv_store();
let kv_store = self.0.clone();
let doc_id = doc_id.to_owned();
let f = || async move {
kv_store

View File

@ -17,7 +17,7 @@ use backend::services::document::persistence::{read_document, reset_document};
use flowy_collaboration::entities::revision::{RepeatedRevision, Revision};
use flowy_collaboration::protobuf::{RepeatedRevision as RepeatedRevisionPB, DocumentId as DocumentIdPB};
use flowy_collaboration::sync::ServerDocumentManager;
use flowy_net::services::ws_conn::FlowyWebSocketConnect;
use flowy_net::ws::connection::FlowyWebSocketConnect;
use lib_ot::core::Interval;
pub struct DocumentTest {

View File

@ -2,7 +2,13 @@ use crate::{
context::DocumentUser,
core::{
web_socket::{make_document_ws_manager, DocumentWebSocketManager},
*,
DocumentMD5,
DocumentRevisionManager,
DocumentWSReceiver,
DocumentWebSocket,
EditorCommand,
EditorCommandQueue,
RevisionServer,
},
errors::FlowyError,
};
@ -20,7 +26,7 @@ pub struct ClientDocumentEditor {
pub doc_id: String,
#[allow(dead_code)]
rev_manager: Arc<DocumentRevisionManager>,
ws_manager: Arc<dyn DocumentWebSocketManager>,
ws_manager: Arc<DocumentWebSocketManager>,
edit_queue: UnboundedSender<EditorCommand>,
}
@ -153,7 +159,7 @@ impl ClientDocumentEditor {
#[tracing::instrument(level = "debug", skip(self))]
pub fn stop(&self) { self.ws_manager.stop(); }
pub(crate) fn ws_handler(&self) -> Arc<dyn DocumentWSReceiver> { self.ws_manager.receiver() }
pub(crate) fn ws_handler(&self) -> Arc<dyn DocumentWSReceiver> { self.ws_manager.clone() }
}
fn spawn_edit_queue(

View File

@ -1,288 +0,0 @@
use crate::{
core::{web_socket::ws_manager::DocumentWebSocketManager, SYNC_INTERVAL_IN_MILLIS},
ws_receivers::{DocumentWSReceiver, DocumentWebSocket},
};
use async_stream::stream;
use bytes::Bytes;
use flowy_collaboration::entities::{
revision::{RevId, RevisionRange},
ws::{DocumentClientWSData, DocumentServerWSData, DocumentServerWSDataType, NewDocumentUser},
};
use flowy_error::{internal_error, FlowyError, FlowyResult};
use futures::stream::StreamExt;
use lib_infra::future::FutureResult;
use lib_ws::WSConnectState;
use std::{convert::TryFrom, sync::Arc};
use tokio::{
sync::{
broadcast,
mpsc,
mpsc::{UnboundedReceiver, UnboundedSender},
},
task::spawn_blocking,
time::{interval, Duration},
};
pub(crate) struct HttpWebSocketManager {
doc_id: String,
data_provider: Arc<dyn DocumentWSSinkDataProvider>,
stream_consumer: Arc<dyn DocumentWSSteamConsumer>,
ws_conn: Arc<dyn DocumentWebSocket>,
ws_msg_tx: UnboundedSender<DocumentServerWSData>,
ws_msg_rx: Option<UnboundedReceiver<DocumentServerWSData>>,
stop_sync_tx: SinkStopTx,
state: broadcast::Sender<WSConnectState>,
}
impl HttpWebSocketManager {
pub(crate) fn new(
doc_id: &str,
ws_conn: Arc<dyn DocumentWebSocket>,
data_provider: Arc<dyn DocumentWSSinkDataProvider>,
stream_consumer: Arc<dyn DocumentWSSteamConsumer>,
) -> Self {
let (ws_msg_tx, ws_msg_rx) = mpsc::unbounded_channel();
let (stop_sync_tx, _) = tokio::sync::broadcast::channel(2);
let doc_id = doc_id.to_string();
let (state, _) = broadcast::channel(2);
let mut manager = HttpWebSocketManager {
doc_id,
data_provider,
stream_consumer,
ws_conn,
ws_msg_tx,
ws_msg_rx: Some(ws_msg_rx),
stop_sync_tx,
state,
};
manager.run();
manager
}
fn run(&mut self) {
let ws_msg_rx = self.ws_msg_rx.take().expect("Only take once");
let sink = DocumentWSSink::new(
&self.doc_id,
self.data_provider.clone(),
self.ws_conn.clone(),
self.stop_sync_tx.subscribe(),
);
let stream = DocumentWSStream::new(
&self.doc_id,
self.stream_consumer.clone(),
ws_msg_rx,
self.stop_sync_tx.subscribe(),
);
tokio::spawn(sink.run());
tokio::spawn(stream.run());
}
pub fn scribe_state(&self) -> broadcast::Receiver<WSConnectState> { self.state.subscribe() }
}
impl DocumentWebSocketManager for Arc<HttpWebSocketManager> {
fn stop(&self) {
if self.stop_sync_tx.send(()).is_ok() {
tracing::debug!("{} stop sync", self.doc_id)
}
}
fn receiver(&self) -> Arc<dyn DocumentWSReceiver> { self.clone() }
}
impl DocumentWSReceiver for HttpWebSocketManager {
fn receive_ws_data(&self, doc_data: DocumentServerWSData) {
match self.ws_msg_tx.send(doc_data) {
Ok(_) => {},
Err(e) => tracing::error!("❌ Propagate ws message failed. {}", e),
}
}
fn connect_state_changed(&self, state: &WSConnectState) {
match self.state.send(state.clone()) {
Ok(_) => {},
Err(e) => tracing::error!("{}", e),
}
}
}
impl std::ops::Drop for HttpWebSocketManager {
fn drop(&mut self) { tracing::debug!("{} HttpWebSocketManager was drop", self.doc_id) }
}
pub trait DocumentWSSteamConsumer: Send + Sync {
fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError>;
fn receive_ack(&self, id: String, ty: DocumentServerWSDataType) -> FutureResult<(), FlowyError>;
fn receive_new_user_connect(&self, new_user: NewDocumentUser) -> FutureResult<(), FlowyError>;
fn pull_revisions_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError>;
}
pub struct DocumentWSStream {
doc_id: String,
consumer: Arc<dyn DocumentWSSteamConsumer>,
ws_msg_rx: Option<mpsc::UnboundedReceiver<DocumentServerWSData>>,
stop_rx: Option<SinkStopRx>,
}
impl DocumentWSStream {
pub fn new(
doc_id: &str,
consumer: Arc<dyn DocumentWSSteamConsumer>,
ws_msg_rx: mpsc::UnboundedReceiver<DocumentServerWSData>,
stop_rx: SinkStopRx,
) -> Self {
DocumentWSStream {
doc_id: doc_id.to_owned(),
consumer,
ws_msg_rx: Some(ws_msg_rx),
stop_rx: Some(stop_rx),
}
}
pub async fn run(mut self) {
let mut receiver = self.ws_msg_rx.take().expect("Only take once");
let mut stop_rx = self.stop_rx.take().expect("Only take once");
let doc_id = self.doc_id.clone();
let stream = stream! {
loop {
tokio::select! {
result = receiver.recv() => {
match result {
Some(msg) => {
yield msg
},
None => {
tracing::debug!("[DocumentStream:{}] loop exit", doc_id);
break;
},
}
},
_ = stop_rx.recv() => {
tracing::debug!("[DocumentStream:{}] loop exit", doc_id);
break
},
};
}
};
stream
.for_each(|msg| async {
match self.handle_message(msg).await {
Ok(_) => {},
Err(e) => log::error!("[DocumentStream:{}] error: {}", self.doc_id, e),
}
})
.await;
}
async fn handle_message(&self, msg: DocumentServerWSData) -> FlowyResult<()> {
let DocumentServerWSData { doc_id: _, ty, data } = msg;
let bytes = spawn_blocking(move || Bytes::from(data))
.await
.map_err(internal_error)?;
tracing::trace!("[DocumentStream]: new message: {:?}", ty);
match ty {
DocumentServerWSDataType::ServerPushRev => {
let _ = self.consumer.receive_push_revision(bytes).await?;
},
DocumentServerWSDataType::ServerPullRev => {
let range = RevisionRange::try_from(bytes)?;
let _ = self.consumer.pull_revisions_in_range(range).await?;
},
DocumentServerWSDataType::ServerAck => {
let rev_id = RevId::try_from(bytes).unwrap().value;
let _ = self.consumer.receive_ack(rev_id.to_string(), ty).await;
},
DocumentServerWSDataType::UserConnect => {
let new_user = NewDocumentUser::try_from(bytes)?;
let _ = self.consumer.receive_new_user_connect(new_user).await;
// Notify the user that someone has connected to this document
},
}
Ok(())
}
}
pub type Tick = ();
pub type SinkStopRx = broadcast::Receiver<()>;
pub type SinkStopTx = broadcast::Sender<()>;
pub trait DocumentWSSinkDataProvider: Send + Sync {
fn next(&self) -> FutureResult<Option<DocumentClientWSData>, FlowyError>;
}
pub struct DocumentWSSink {
provider: Arc<dyn DocumentWSSinkDataProvider>,
ws_sender: Arc<dyn DocumentWebSocket>,
stop_rx: Option<SinkStopRx>,
doc_id: String,
}
impl DocumentWSSink {
pub fn new(
doc_id: &str,
provider: Arc<dyn DocumentWSSinkDataProvider>,
ws_sender: Arc<dyn DocumentWebSocket>,
stop_rx: SinkStopRx,
) -> Self {
Self {
provider,
ws_sender,
stop_rx: Some(stop_rx),
doc_id: doc_id.to_owned(),
}
}
pub async fn run(mut self) {
let (tx, mut rx) = mpsc::unbounded_channel();
let mut stop_rx = self.stop_rx.take().expect("Only take once");
let doc_id = self.doc_id.clone();
tokio::spawn(tick(tx));
let stream = stream! {
loop {
tokio::select! {
result = rx.recv() => {
match result {
Some(msg) => yield msg,
None => break,
}
},
_ = stop_rx.recv() => {
tracing::debug!("[DocumentSink:{}] loop exit", doc_id);
break
},
};
}
};
stream
.for_each(|_| async {
match self.send_next_revision().await {
Ok(_) => {},
Err(e) => log::error!("[DocumentSink]: Send failed, {:?}", e),
}
})
.await;
}
async fn send_next_revision(&self) -> FlowyResult<()> {
match self.provider.next().await? {
None => {
tracing::trace!("Finish synchronizing revisions");
Ok(())
},
Some(data) => {
tracing::trace!("[DocumentSink]: send: {}:{}-{:?}", data.doc_id, data.id(), data.ty);
self.ws_sender.send(data)
// let _ = tokio::time::timeout(Duration::from_millis(2000),
},
}
}
}
async fn tick(sender: mpsc::UnboundedSender<Tick>) {
let mut interval = interval(Duration::from_millis(SYNC_INTERVAL_IN_MILLIS));
while sender.send(()).is_ok() {
interval.tick().await;
}
}

View File

@ -1,18 +0,0 @@
use crate::core::{web_socket::DocumentWebSocketManager, DocumentWSReceiver};
use flowy_collaboration::entities::ws::DocumentServerWSData;
use lib_ws::WSConnectState;
use std::sync::Arc;
pub(crate) struct LocalWebSocketManager {}
impl DocumentWebSocketManager for Arc<LocalWebSocketManager> {
fn stop(&self) {}
fn receiver(&self) -> Arc<dyn DocumentWSReceiver> { self.clone() }
}
impl DocumentWSReceiver for LocalWebSocketManager {
fn receive_ws_data(&self, _doc_data: DocumentServerWSData) {}
fn connect_state_changed(&self, _state: &WSConnectState) {}
}

View File

@ -1,7 +1,283 @@
#![allow(clippy::module_inception)]
mod http_ws_impl;
mod local_ws_impl;
mod ws_manager;
pub use ws_manager::*;
pub(crate) use http_ws_impl::*;
pub(crate) use ws_manager::*;
use crate::core::{
web_socket::{DocumentWSSinkDataProvider, DocumentWSSteamConsumer},
DocumentRevisionManager,
DocumentWSReceiver,
DocumentWebSocket,
EditorCommand,
TransformDeltas,
};
use bytes::Bytes;
use flowy_collaboration::{
entities::{
revision::{RepeatedRevision, Revision, RevisionRange},
ws::{DocumentClientWSData, DocumentServerWSDataType, NewDocumentUser},
},
errors::CollaborateResult,
};
use flowy_error::{internal_error, FlowyError, FlowyResult};
use lib_infra::future::FutureResult;
use lib_ws::WSConnectState;
use std::{collections::VecDeque, convert::TryFrom, sync::Arc};
use tokio::sync::{broadcast, mpsc::UnboundedSender, oneshot, RwLock};
pub(crate) async fn make_document_ws_manager(
doc_id: String,
user_id: String,
edit_cmd_tx: UnboundedSender<EditorCommand>,
rev_manager: Arc<DocumentRevisionManager>,
ws_conn: Arc<dyn DocumentWebSocket>,
) -> Arc<DocumentWebSocketManager> {
let shared_sink = Arc::new(SharedWSSinkDataProvider::new(rev_manager.clone()));
let ws_stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter {
doc_id: doc_id.clone(),
edit_cmd_tx,
rev_manager: rev_manager.clone(),
shared_sink: shared_sink.clone(),
});
let data_provider = Arc::new(DocumentWSSinkDataProviderAdapter(shared_sink));
let ws_manager = Arc::new(DocumentWebSocketManager::new(
&doc_id,
ws_conn,
data_provider,
ws_stream_consumer,
));
listen_document_ws_state(&user_id, &doc_id, ws_manager.scribe_state(), rev_manager);
ws_manager
}
fn listen_document_ws_state(
_user_id: &str,
_doc_id: &str,
mut subscriber: broadcast::Receiver<WSConnectState>,
_rev_manager: Arc<DocumentRevisionManager>,
) {
tokio::spawn(async move {
while let Ok(state) = subscriber.recv().await {
match state {
WSConnectState::Init => {},
WSConnectState::Connecting => {},
WSConnectState::Connected => {},
WSConnectState::Disconnected => {},
}
}
});
}
pub(crate) struct DocumentWebSocketSteamConsumerAdapter {
pub(crate) doc_id: String,
pub(crate) edit_cmd_tx: UnboundedSender<EditorCommand>,
pub(crate) rev_manager: Arc<DocumentRevisionManager>,
pub(crate) shared_sink: Arc<SharedWSSinkDataProvider>,
}
impl DocumentWSSteamConsumer for DocumentWebSocketSteamConsumerAdapter {
fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError> {
let rev_manager = self.rev_manager.clone();
let edit_cmd_tx = self.edit_cmd_tx.clone();
let shared_sink = self.shared_sink.clone();
let doc_id = self.doc_id.clone();
FutureResult::new(async move {
if let Some(server_composed_revision) = handle_remote_revision(edit_cmd_tx, rev_manager, bytes).await? {
let data = DocumentClientWSData::from_revisions(&doc_id, vec![server_composed_revision]);
shared_sink.push_back(data).await;
}
Ok(())
})
}
fn receive_ack(&self, id: String, ty: DocumentServerWSDataType) -> FutureResult<(), FlowyError> {
let shared_sink = self.shared_sink.clone();
FutureResult::new(async move { shared_sink.ack(id, ty).await })
}
fn receive_new_user_connect(&self, _new_user: NewDocumentUser) -> FutureResult<(), FlowyError> {
// the _new_user will be used later
FutureResult::new(async move { Ok(()) })
}
fn pull_revisions_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError> {
let rev_manager = self.rev_manager.clone();
let shared_sink = self.shared_sink.clone();
let doc_id = self.doc_id.clone();
FutureResult::new(async move {
let revisions = rev_manager.get_revisions_in_range(range).await?;
let data = DocumentClientWSData::from_revisions(&doc_id, revisions);
shared_sink.push_back(data).await;
Ok(())
})
}
}
pub(crate) struct DocumentWSSinkDataProviderAdapter(pub(crate) Arc<SharedWSSinkDataProvider>);
impl DocumentWSSinkDataProvider for DocumentWSSinkDataProviderAdapter {
fn next(&self) -> FutureResult<Option<DocumentClientWSData>, FlowyError> {
let shared_sink = self.0.clone();
FutureResult::new(async move { shared_sink.next().await })
}
}
async fn transform_pushed_revisions(
revisions: Vec<Revision>,
edit_cmd: &UnboundedSender<EditorCommand>,
) -> FlowyResult<TransformDeltas> {
let (ret, rx) = oneshot::channel::<CollaborateResult<TransformDeltas>>();
let _ = edit_cmd.send(EditorCommand::TransformRevision { revisions, ret });
Ok(rx.await.map_err(internal_error)??)
}
#[tracing::instrument(level = "debug", skip(edit_cmd_tx, rev_manager, bytes))]
pub(crate) async fn handle_remote_revision(
edit_cmd_tx: UnboundedSender<EditorCommand>,
rev_manager: Arc<DocumentRevisionManager>,
bytes: Bytes,
) -> FlowyResult<Option<Revision>> {
let mut revisions = RepeatedRevision::try_from(bytes)?.into_inner();
if revisions.is_empty() {
return Ok(None);
}
let first_revision = revisions.first().unwrap();
if let Some(local_revision) = rev_manager.get_revision(first_revision.rev_id).await {
if local_revision.md5 == first_revision.md5 {
// The local revision is equal to the pushed revision. Just ignore it.
revisions = revisions.split_off(1);
if revisions.is_empty() {
return Ok(None);
}
} else {
return Ok(None);
}
}
let TransformDeltas {
client_prime,
server_prime,
} = transform_pushed_revisions(revisions.clone(), &edit_cmd_tx).await?;
match server_prime {
None => {
// The server_prime is None means the client local revisions conflict with the
// server, and it needs to override the client delta.
let (ret, rx) = oneshot::channel();
let _ = edit_cmd_tx.send(EditorCommand::OverrideDelta {
revisions,
delta: client_prime,
ret,
});
let _ = rx.await.map_err(internal_error)??;
Ok(None)
},
Some(server_prime) => {
let (ret, rx) = oneshot::channel();
let _ = edit_cmd_tx.send(EditorCommand::ComposeRemoteDelta {
revisions,
client_delta: client_prime,
server_delta: server_prime,
ret,
});
Ok(rx.await.map_err(internal_error)??)
},
}
}
#[derive(Clone)]
enum SourceType {
Shared,
Revision,
}
#[derive(Clone)]
pub(crate) struct SharedWSSinkDataProvider {
shared: Arc<RwLock<VecDeque<DocumentClientWSData>>>,
rev_manager: Arc<DocumentRevisionManager>,
source_ty: Arc<RwLock<SourceType>>,
}
impl SharedWSSinkDataProvider {
pub(crate) fn new(rev_manager: Arc<DocumentRevisionManager>) -> Self {
SharedWSSinkDataProvider {
shared: Arc::new(RwLock::new(VecDeque::new())),
rev_manager,
source_ty: Arc::new(RwLock::new(SourceType::Shared)),
}
}
#[allow(dead_code)]
pub(crate) async fn push_front(&self, data: DocumentClientWSData) { self.shared.write().await.push_front(data); }
async fn push_back(&self, data: DocumentClientWSData) { self.shared.write().await.push_back(data); }
async fn next(&self) -> FlowyResult<Option<DocumentClientWSData>> {
let source_ty = self.source_ty.read().await.clone();
match source_ty {
SourceType::Shared => match self.shared.read().await.front() {
None => {
*self.source_ty.write().await = SourceType::Revision;
Ok(None)
},
Some(data) => {
tracing::debug!("[SharedWSSinkDataProvider]: {}:{:?}", data.doc_id, data.ty);
Ok(Some(data.clone()))
},
},
SourceType::Revision => {
if !self.shared.read().await.is_empty() {
*self.source_ty.write().await = SourceType::Shared;
return Ok(None);
}
match self.rev_manager.next_sync_revision().await? {
Some(rev) => {
let doc_id = rev.doc_id.clone();
Ok(Some(DocumentClientWSData::from_revisions(&doc_id, vec![rev])))
},
None => {
//
let doc_id = self.rev_manager.doc_id.clone();
let latest_rev_id = self.rev_manager.rev_id();
Ok(Some(DocumentClientWSData::ping(&doc_id, latest_rev_id)))
},
}
},
}
}
async fn ack(&self, id: String, _ty: DocumentServerWSDataType) -> FlowyResult<()> {
// let _ = self.rev_manager.ack_revision(id).await?;
let source_ty = self.source_ty.read().await.clone();
match source_ty {
SourceType::Shared => {
let should_pop = match self.shared.read().await.front() {
None => false,
Some(val) => {
let expected_id = val.id();
if expected_id == id {
true
} else {
tracing::error!("The front element's {} is not equal to the {}", expected_id, id);
false
}
},
};
if should_pop {
let _ = self.shared.write().await.pop_front();
}
},
SourceType::Revision => {
match id.parse::<i64>() {
Ok(rev_id) => {
let _ = self.rev_manager.ack_revision(rev_id).await?;
},
Err(e) => {
tracing::error!("Parse rev_id from {} failed. {}", id, e);
},
};
},
}
Ok(())
}
}

View File

@ -1,310 +1,283 @@
use crate::core::{
web_socket::{DocumentWSSinkDataProvider, DocumentWSSteamConsumer, HttpWebSocketManager},
DocumentRevisionManager,
DocumentWSReceiver,
DocumentWebSocket,
EditorCommand,
TransformDeltas,
use crate::{
core::SYNC_INTERVAL_IN_MILLIS,
ws_receivers::{DocumentWSReceiver, DocumentWebSocket},
};
use async_stream::stream;
use bytes::Bytes;
use flowy_collaboration::{
entities::{
revision::{RepeatedRevision, Revision, RevisionRange},
ws::{DocumentClientWSData, NewDocumentUser},
},
errors::CollaborateResult,
use flowy_collaboration::entities::{
revision::{RevId, RevisionRange},
ws::{DocumentClientWSData, DocumentServerWSData, DocumentServerWSDataType, NewDocumentUser},
};
use flowy_error::{internal_error, FlowyError, FlowyResult};
use futures::stream::StreamExt;
use lib_infra::future::FutureResult;
use flowy_collaboration::entities::ws::DocumentServerWSDataType;
use lib_ws::WSConnectState;
use std::{collections::VecDeque, convert::TryFrom, sync::Arc};
use tokio::sync::{broadcast, mpsc::UnboundedSender, oneshot, RwLock};
use std::{convert::TryFrom, sync::Arc};
use tokio::{
sync::{
broadcast,
mpsc,
mpsc::{UnboundedReceiver, UnboundedSender},
},
task::spawn_blocking,
time::{interval, Duration},
};
pub(crate) trait DocumentWebSocketManager: Send + Sync {
fn stop(&self);
fn receiver(&self) -> Arc<dyn DocumentWSReceiver>;
}
pub(crate) async fn make_document_ws_manager(
pub struct DocumentWebSocketManager {
doc_id: String,
user_id: String,
edit_cmd_tx: UnboundedSender<EditorCommand>,
rev_manager: Arc<DocumentRevisionManager>,
data_provider: Arc<dyn DocumentWSSinkDataProvider>,
stream_consumer: Arc<dyn DocumentWSSteamConsumer>,
ws_conn: Arc<dyn DocumentWebSocket>,
) -> Arc<dyn DocumentWebSocketManager> {
// if cfg!(feature = "http_server") {
// let shared_sink =
// Arc::new(SharedWSSinkDataProvider::new(rev_manager.clone()));
// let ws_stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter {
// doc_id: doc_id.clone(),
// edit_cmd_tx,
// rev_manager: rev_manager.clone(),
// shared_sink: shared_sink.clone(),
// });
// let data_provider =
// Arc::new(DocumentWSSinkDataProviderAdapter(shared_sink));
// let ws_manager = Arc::new(HttpWebSocketManager::new(
// &doc_id,
// ws_conn,
// data_provider,
// ws_stream_consumer,
// ));
// listen_document_ws_state(&user_id, &doc_id, ws_manager.scribe_state(),
// rev_manager); Arc::new(ws_manager)
// } else {
// Arc::new(Arc::new(LocalWebSocketManager {}))
// }
let shared_sink = Arc::new(SharedWSSinkDataProvider::new(rev_manager.clone()));
let ws_stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter {
doc_id: doc_id.clone(),
edit_cmd_tx,
rev_manager: rev_manager.clone(),
shared_sink: shared_sink.clone(),
});
let data_provider = Arc::new(DocumentWSSinkDataProviderAdapter(shared_sink));
let ws_manager = Arc::new(HttpWebSocketManager::new(
&doc_id,
ws_conn,
data_provider,
ws_stream_consumer,
));
listen_document_ws_state(&user_id, &doc_id, ws_manager.scribe_state(), rev_manager);
Arc::new(ws_manager)
ws_msg_tx: UnboundedSender<DocumentServerWSData>,
ws_msg_rx: Option<UnboundedReceiver<DocumentServerWSData>>,
stop_sync_tx: SinkStopTx,
state: broadcast::Sender<WSConnectState>,
}
fn listen_document_ws_state(
_user_id: &str,
_doc_id: &str,
mut subscriber: broadcast::Receiver<WSConnectState>,
_rev_manager: Arc<DocumentRevisionManager>,
) {
tokio::spawn(async move {
while let Ok(state) = subscriber.recv().await {
match state {
WSConnectState::Init => {},
WSConnectState::Connecting => {},
WSConnectState::Connected => {},
WSConnectState::Disconnected => {},
}
impl DocumentWebSocketManager {
/// Builds the manager, wires up its internal channels, and immediately spawns
/// the sink/stream background tasks before returning.
pub(crate) fn new(
    doc_id: &str,
    ws_conn: Arc<dyn DocumentWebSocket>,
    data_provider: Arc<dyn DocumentWSSinkDataProvider>,
    stream_consumer: Arc<dyn DocumentWSSteamConsumer>,
) -> Self {
    // Channel carrying raw server messages into the stream task.
    let (ws_msg_tx, ws_msg_rx) = mpsc::unbounded_channel();
    // Broadcast used to tell both background tasks to stop.
    let (stop_sync_tx, _) = tokio::sync::broadcast::channel(2);
    // Broadcast publishing connect-state changes to subscribers.
    let (state, _) = broadcast::channel(2);
    let mut this = DocumentWebSocketManager {
        doc_id: doc_id.to_string(),
        data_provider,
        stream_consumer,
        ws_conn,
        ws_msg_tx,
        ws_msg_rx: Some(ws_msg_rx),
        stop_sync_tx,
        state,
    };
    this.run();
    this
}
/// Spawns the sink (outgoing) and stream (incoming) tasks for this document.
/// `ws_msg_rx` is consumed here, so this may only run once per manager.
fn run(&mut self) {
    let msg_receiver = self.ws_msg_rx.take().expect("Only take once");
    let ws_stream = DocumentWSStream::new(
        &self.doc_id,
        self.stream_consumer.clone(),
        msg_receiver,
        self.stop_sync_tx.subscribe(),
    );
    let ws_sink = DocumentWSSink::new(
        &self.doc_id,
        self.data_provider.clone(),
        self.ws_conn.clone(),
        self.stop_sync_tx.subscribe(),
    );
    tokio::spawn(ws_sink.run());
    tokio::spawn(ws_stream.run());
}
/// Returns a new receiver subscribed to websocket connect-state changes.
pub fn scribe_state(&self) -> broadcast::Receiver<WSConnectState> { self.state.subscribe() }
pub(crate) fn stop(&self) {
if self.stop_sync_tx.send(()).is_ok() {
tracing::debug!("{} stop sync", self.doc_id)
}
});
}
}
pub(crate) struct DocumentWebSocketSteamConsumerAdapter {
pub(crate) doc_id: String,
pub(crate) edit_cmd_tx: UnboundedSender<EditorCommand>,
pub(crate) rev_manager: Arc<DocumentRevisionManager>,
pub(crate) shared_sink: Arc<SharedWSSinkDataProvider>,
impl DocumentWSReceiver for DocumentWebSocketManager {
fn receive_ws_data(&self, doc_data: DocumentServerWSData) {
match self.ws_msg_tx.send(doc_data) {
Ok(_) => {},
Err(e) => tracing::error!("❌ Propagate ws message failed. {}", e),
}
}
fn connect_state_changed(&self, state: &WSConnectState) {
match self.state.send(state.clone()) {
Ok(_) => {},
Err(e) => tracing::error!("{}", e),
}
}
}
impl DocumentWSSteamConsumer for DocumentWebSocketSteamConsumerAdapter {
fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError> {
let rev_manager = self.rev_manager.clone();
let edit_cmd_tx = self.edit_cmd_tx.clone();
let shared_sink = self.shared_sink.clone();
// Log on drop so lingering manager instances are easy to spot in traces.
impl std::ops::Drop for DocumentWebSocketManager {
fn drop(&mut self) { tracing::debug!("{} HttpWebSocketManager was drop", self.doc_id) }
}
/// Consumer side of the document websocket stream: each callback handles one
/// kind of message pushed by the server.
/// NOTE(review): "Steam" looks like a typo for "Stream" — renaming would touch
/// callers, so it is left as-is.
pub trait DocumentWSSteamConsumer: Send + Sync {
/// Handles a revision pushed by the server; `bytes` is the serialized payload.
fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError>;
/// Handles the server's acknowledgement of the message with the given `id`.
fn receive_ack(&self, id: String, ty: DocumentServerWSDataType) -> FutureResult<(), FlowyError>;
/// Handles a notification that a new user connected to this document.
fn receive_new_user_connect(&self, new_user: NewDocumentUser) -> FutureResult<(), FlowyError>;
/// Sends the local revisions within `range` back to the server.
fn pull_revisions_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError>;
}
/// Task that drains incoming server websocket messages for one document and
/// dispatches them to a `DocumentWSSteamConsumer` until stopped.
pub struct DocumentWSStream {
doc_id: String,
consumer: Arc<dyn DocumentWSSteamConsumer>,
// `Option` so `run` can move the receiver out exactly once.
ws_msg_rx: Option<mpsc::UnboundedReceiver<DocumentServerWSData>>,
// `Option` so `run` can move the stop receiver out exactly once.
stop_rx: Option<SinkStopRx>,
}
impl DocumentWSStream {
pub fn new(
doc_id: &str,
consumer: Arc<dyn DocumentWSSteamConsumer>,
ws_msg_rx: mpsc::UnboundedReceiver<DocumentServerWSData>,
stop_rx: SinkStopRx,
) -> Self {
DocumentWSStream {
doc_id: doc_id.to_owned(),
consumer,
ws_msg_rx: Some(ws_msg_rx),
stop_rx: Some(stop_rx),
}
}
pub async fn run(mut self) {
let mut receiver = self.ws_msg_rx.take().expect("Only take once");
let mut stop_rx = self.stop_rx.take().expect("Only take once");
let doc_id = self.doc_id.clone();
FutureResult::new(async move {
if let Some(server_composed_revision) = handle_remote_revision(edit_cmd_tx, rev_manager, bytes).await? {
let data = DocumentClientWSData::from_revisions(&doc_id, vec![server_composed_revision]);
shared_sink.push_back(data).await;
}
Ok(())
})
}
/// Forwards the server ack to the shared sink queue so the acked item can be
/// popped and the next message sent.
fn receive_ack(&self, id: String, ty: DocumentServerWSDataType) -> FutureResult<(), FlowyError> {
let shared_sink = self.shared_sink.clone();
FutureResult::new(async move { shared_sink.ack(id, ty).await })
}
/// Called when another user connects to this document. Currently a no-op.
fn receive_new_user_connect(&self, _new_user: NewDocumentUser) -> FutureResult<(), FlowyError> {
// the _new_user will be used later
FutureResult::new(async move { Ok(()) })
}
/// Loads the local revisions inside `range` and queues them on the shared
/// sink so they are re-sent to the server.
fn pull_revisions_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError> {
    let doc_id = self.doc_id.clone();
    let rev_manager = self.rev_manager.clone();
    let shared_sink = self.shared_sink.clone();
    FutureResult::new(async move {
        let revisions = rev_manager.get_revisions_in_range(range).await?;
        shared_sink
            .push_back(DocumentClientWSData::from_revisions(&doc_id, revisions))
            .await;
        Ok(())
    })
}
}
/// Adapts `SharedWSSinkDataProvider` to the `DocumentWSSinkDataProvider` trait.
pub(crate) struct DocumentWSSinkDataProviderAdapter(pub(crate) Arc<SharedWSSinkDataProvider>);
impl DocumentWSSinkDataProvider for DocumentWSSinkDataProviderAdapter {
    /// Delegates to the shared queue: yields the next client message to send.
    fn next(&self) -> FutureResult<Option<DocumentClientWSData>, FlowyError> {
        let provider = self.0.clone();
        FutureResult::new(async move { provider.next().await })
    }
}
/// Asks the editor task to transform the pushed `revisions` against the local
/// document state, returning the resulting delta pair.
async fn transform_pushed_revisions(
    revisions: Vec<Revision>,
    edit_cmd: &UnboundedSender<EditorCommand>,
) -> FlowyResult<TransformDeltas> {
    let (ret, rx) = oneshot::channel::<CollaborateResult<TransformDeltas>>();
    let _ = edit_cmd.send(EditorCommand::TransformRevision { revisions, ret });
    // First `?` covers a dropped channel, the second the editor's own error.
    let transformed = rx.await.map_err(internal_error)?;
    Ok(transformed?)
}
/// Applies revisions pushed by the server to the local document.
///
/// Returns `Ok(Some(revision))` when composing the remote delta produced a new
/// server-composed revision that should be sent back, `Ok(None)` otherwise.
#[tracing::instrument(level = "debug", skip(edit_cmd_tx, rev_manager, bytes))]
pub(crate) async fn handle_remote_revision(
edit_cmd_tx: UnboundedSender<EditorCommand>,
rev_manager: Arc<DocumentRevisionManager>,
bytes: Bytes,
) -> FlowyResult<Option<Revision>> {
let mut revisions = RepeatedRevision::try_from(bytes)?.into_inner();
if revisions.is_empty() {
return Ok(None);
}
// Deduplicate against what we already have locally.
let first_revision = revisions.first().unwrap();
if let Some(local_revision) = rev_manager.get_revision(first_revision.rev_id).await {
if local_revision.md5 == first_revision.md5 {
// The local revision is equal to the pushed revision. Just ignore it.
revisions = revisions.split_off(1);
if revisions.is_empty() {
return Ok(None);
}
} else {
// NOTE(review): same rev_id but different md5 also bails out with None,
// silently dropping the pushed revisions — confirm this is intended.
return Ok(None);
}
}
// Transform the remaining pushed revisions against the local editor state.
let TransformDeltas {
client_prime,
server_prime,
} = transform_pushed_revisions(revisions.clone(), &edit_cmd_tx).await?;
match server_prime {
None => {
// The server_prime is None means the client local revisions conflict with the
// server, and it needs to override the client delta.
let (ret, rx) = oneshot::channel();
let _ = edit_cmd_tx.send(EditorCommand::OverrideDelta {
revisions,
delta: client_prime,
ret,
});
let _ = rx.await.map_err(internal_error)??;
Ok(None)
},
Some(server_prime) => {
// Compose the transformed remote delta into the local document; the
// editor replies with the server-composed revision (if any) via `ret`.
let (ret, rx) = oneshot::channel();
let _ = edit_cmd_tx.send(EditorCommand::ComposeRemoteDelta {
revisions,
client_delta: client_prime,
server_delta: server_prime,
ret,
});
Ok(rx.await.map_err(internal_error)??)
},
}
}
/// Which source `SharedWSSinkDataProvider::next` reads from: the shared
/// in-memory queue, or the revision manager's pending-sync revisions.
#[derive(Clone)]
enum SourceType {
Shared,
Revision,
}
/// Queue of outgoing client websocket messages, cloneable and shared between
/// tasks. Alternates between the ad-hoc `shared` queue and revisions pending
/// sync in `rev_manager`, tracked by `source_ty`.
#[derive(Clone)]
pub(crate) struct SharedWSSinkDataProvider {
shared: Arc<RwLock<VecDeque<DocumentClientWSData>>>,
rev_manager: Arc<DocumentRevisionManager>,
source_ty: Arc<RwLock<SourceType>>,
}
impl SharedWSSinkDataProvider {
/// Creates an empty provider that starts by draining the shared queue.
pub(crate) fn new(rev_manager: Arc<DocumentRevisionManager>) -> Self {
SharedWSSinkDataProvider {
shared: Arc::new(RwLock::new(VecDeque::new())),
rev_manager,
source_ty: Arc::new(RwLock::new(SourceType::Shared)),
}
}
#[allow(dead_code)]
/// Puts `data` at the head of the queue so it is sent next.
pub(crate) async fn push_front(&self, data: DocumentClientWSData) { self.shared.write().await.push_front(data); }
/// Appends `data` to the tail of the queue.
async fn push_back(&self, data: DocumentClientWSData) { self.shared.write().await.push_back(data); }
/// Yields the next client message to send, alternating between the shared
/// queue and the revision manager.
///
/// Returning `Ok(None)` does not mean "nothing to send": it is also used to
/// flip `source_ty` so the caller's next poll reads from the other source.
async fn next(&self) -> FlowyResult<Option<DocumentClientWSData>> {
let source_ty = self.source_ty.read().await.clone();
match source_ty {
SourceType::Shared => match self.shared.read().await.front() {
None => {
// Shared queue drained; switch to revision-sourced messages.
*self.source_ty.write().await = SourceType::Revision;
Ok(None)
},
Some(data) => {
tracing::debug!("[SharedWSSinkDataProvider]: {}:{:?}", data.doc_id, data.ty);
// The front element stays queued until `ack` pops it.
Ok(Some(data.clone()))
},
},
SourceType::Revision => {
if !self.shared.read().await.is_empty() {
// New shared items arrived; service them first on the next poll.
*self.source_ty.write().await = SourceType::Shared;
return Ok(None);
}
match self.rev_manager.next_sync_revision().await? {
Some(rev) => {
let doc_id = rev.doc_id.clone();
Ok(Some(DocumentClientWSData::from_revisions(&doc_id, vec![rev])))
},
None => {
// No pending revisions: send a ping carrying the latest rev id.
let doc_id = self.rev_manager.doc_id.clone();
let latest_rev_id = self.rev_manager.rev_id();
Ok(Some(DocumentClientWSData::ping(&doc_id, latest_rev_id)))
},
}
},
}
}
async fn ack(&self, id: String, _ty: DocumentServerWSDataType) -> FlowyResult<()> {
// let _ = self.rev_manager.ack_revision(id).await?;
let source_ty = self.source_ty.read().await.clone();
match source_ty {
SourceType::Shared => {
let should_pop = match self.shared.read().await.front() {
None => false,
Some(val) => {
let expected_id = val.id();
if expected_id == id {
true
} else {
tracing::error!("The front element's {} is not equal to the {}", expected_id, id);
false
let stream = stream! {
loop {
tokio::select! {
result = receiver.recv() => {
match result {
Some(msg) => {
yield msg
},
None => {
tracing::debug!("[DocumentStream:{}] loop exit", doc_id);
break;
},
}
},
_ = stop_rx.recv() => {
tracing::debug!("[DocumentStream:{}] loop exit", doc_id);
break
},
};
if should_pop {
let _ = self.shared.write().await.pop_front();
}
};
stream
.for_each(|msg| async {
match self.handle_message(msg).await {
Ok(_) => {},
Err(e) => log::error!("[DocumentStream:{}] error: {}", self.doc_id, e),
}
})
.await;
}
async fn handle_message(&self, msg: DocumentServerWSData) -> FlowyResult<()> {
let DocumentServerWSData { doc_id: _, ty, data } = msg;
let bytes = spawn_blocking(move || Bytes::from(data))
.await
.map_err(internal_error)?;
tracing::trace!("[DocumentStream]: new message: {:?}", ty);
match ty {
DocumentServerWSDataType::ServerPushRev => {
let _ = self.consumer.receive_push_revision(bytes).await?;
},
SourceType::Revision => {
match id.parse::<i64>() {
Ok(rev_id) => {
let _ = self.rev_manager.ack_revision(rev_id).await?;
},
Err(e) => {
tracing::error!("Parse rev_id from {} failed. {}", id, e);
},
};
DocumentServerWSDataType::ServerPullRev => {
let range = RevisionRange::try_from(bytes)?;
let _ = self.consumer.pull_revisions_in_range(range).await?;
},
DocumentServerWSDataType::ServerAck => {
let rev_id = RevId::try_from(bytes).unwrap().value;
let _ = self.consumer.receive_ack(rev_id.to_string(), ty).await;
},
DocumentServerWSDataType::UserConnect => {
let new_user = NewDocumentUser::try_from(bytes)?;
let _ = self.consumer.receive_new_user_connect(new_user).await;
// Notify the user that someone has connected to this document
},
}
Ok(())
}
}
/// Unit message used only to wake the sink on a timer tick.
pub type Tick = ();
/// Receiver half of the stop-broadcast for sink/stream tasks.
pub type SinkStopRx = broadcast::Receiver<()>;
/// Sender half of the stop-broadcast for sink/stream tasks.
pub type SinkStopTx = broadcast::Sender<()>;
/// Supplies the next outgoing client message; `None` means nothing to send
/// on this poll.
pub trait DocumentWSSinkDataProvider: Send + Sync {
fn next(&self) -> FutureResult<Option<DocumentClientWSData>, FlowyError>;
}
/// Task that periodically pulls outgoing messages from a provider and writes
/// them to the websocket until stopped.
pub struct DocumentWSSink {
provider: Arc<dyn DocumentWSSinkDataProvider>,
ws_sender: Arc<dyn DocumentWebSocket>,
// `Option` so `run` can move the stop receiver out exactly once.
stop_rx: Option<SinkStopRx>,
doc_id: String,
}
impl DocumentWSSink {
/// Creates a sink worker for `doc_id`; call `run` to start sending.
pub fn new(
    doc_id: &str,
    provider: Arc<dyn DocumentWSSinkDataProvider>,
    ws_sender: Arc<dyn DocumentWebSocket>,
    stop_rx: SinkStopRx,
) -> Self {
    Self {
        doc_id: doc_id.to_owned(),
        provider,
        ws_sender,
        stop_rx: Some(stop_rx),
    }
}
/// Drives the sink: a background `tick` task wakes this loop periodically,
/// and each wake-up sends at most one message. Exits when the tick task or
/// the stop broadcast ends the stream.
pub async fn run(mut self) {
let (tx, mut rx) = mpsc::unbounded_channel();
let mut stop_rx = self.stop_rx.take().expect("Only take once");
let doc_id = self.doc_id.clone();
// Timer task producing the wake-up ticks consumed below.
tokio::spawn(tick(tx));
let stream = stream! {
loop {
tokio::select! {
result = rx.recv() => {
match result {
Some(msg) => yield msg,
None => break,
}
},
_ = stop_rx.recv() => {
tracing::debug!("[DocumentSink:{}] loop exit", doc_id);
break
},
};
}
};
stream
.for_each(|_| async {
match self.send_next_revision().await {
Ok(_) => {},
Err(e) => log::error!("[DocumentSink] send failed, {:?}", e),
}
})
.await;
}
/// Pulls one message from the provider and sends it over the websocket.
/// A `None` from the provider means there is currently nothing to send.
async fn send_next_revision(&self) -> FlowyResult<()> {
    let maybe_data = self.provider.next().await?;
    match maybe_data {
        Some(data) => {
            tracing::trace!("[DocumentSink] send: {}:{}-{:?}", data.doc_id, data.id(), data.ty);
            self.ws_sender.send(data)
        },
        None => {
            tracing::trace!("Finish synchronizing revisions");
            Ok(())
        },
    }
}
}
/// Emits a unit tick on `sender` every `SYNC_INTERVAL_IN_MILLIS` ms, exiting
/// once the receiving side has been dropped.
async fn tick(sender: mpsc::UnboundedSender<Tick>) {
    let mut interval = interval(Duration::from_millis(SYNC_INTERVAL_IN_MILLIS));
    loop {
        if sender.send(()).is_err() {
            // Receiver gone: the sink loop has ended, so stop ticking.
            break;
        }
        interval.tick().await;
    }
}

View File

@ -7,7 +7,7 @@ edition = "2018"
[dependencies]
lib-dispatch = { path = "../lib-dispatch" }
flowy-error = { path = "../flowy-error" }
flowy-error = { path = "../flowy-error", features = ["collaboration"] }
flowy-derive = { path = "../../../shared-lib/flowy-derive" }
flowy-collaboration = { path = "../../../shared-lib/flowy-collaboration"}
backend-service = { path = "../../../shared-lib/backend-service" }

View File

@ -1,4 +1,4 @@
use crate::{entities::NetworkState, services::ws_conn::FlowyWebSocketConnect};
use crate::{entities::NetworkState, ws::connection::FlowyWebSocketConnect};
use flowy_error::FlowyError;
use lib_dispatch::prelude::{Data, Unit};
use std::sync::Arc;

View File

@ -4,4 +4,4 @@ mod event;
mod handlers;
pub mod module;
pub mod protobuf;
pub mod services;
pub mod ws;

View File

@ -1,4 +1,4 @@
use crate::{event::NetworkEvent, handlers::*, services::ws_conn::FlowyWebSocketConnect};
use crate::{event::NetworkEvent, handlers::*, ws::connection::FlowyWebSocketConnect};
use lib_dispatch::prelude::*;
use std::sync::Arc;

View File

@ -1,5 +0,0 @@
mod local_server;
mod local_ws;
mod persistence;
pub use local_ws::*;

View File

@ -1,74 +0,0 @@
use dashmap::DashMap;
use flowy_collaboration::{
entities::doc::DocumentInfo,
errors::CollaborateError,
protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB},
sync::*,
util::repeated_revision_from_repeated_revision_pb,
};
use lib_infra::future::BoxResultFuture;
use std::{
fmt::{Debug, Formatter},
sync::Arc,
};
pub(crate) struct LocalServerDocumentPersistence {
// For the moment, we use memory to cache the data, it will be implemented with other storage.
// Like the Firestore,Dropbox.etc.
inner: Arc<DashMap<String, DocumentInfo>>,
}
impl Debug for LocalServerDocumentPersistence {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("LocalDocServerPersistence") }
}
impl std::default::Default for LocalServerDocumentPersistence {
fn default() -> Self {
LocalServerDocumentPersistence {
inner: Arc::new(DashMap::new()),
}
}
}
impl ServerDocumentPersistence for LocalServerDocumentPersistence {
fn read_document(&self, doc_id: &str) -> BoxResultFuture<DocumentInfo, CollaborateError> {
let inner = self.inner.clone();
let doc_id = doc_id.to_owned();
Box::pin(async move {
match inner.get(&doc_id) {
None => Err(CollaborateError::record_not_found()),
Some(val) => {
//
Ok(val.value().clone())
},
}
})
}
fn create_document(
&self,
doc_id: &str,
repeated_revision: RepeatedRevisionPB,
) -> BoxResultFuture<DocumentInfo, CollaborateError> {
let doc_id = doc_id.to_owned();
let inner = self.inner.clone();
Box::pin(async move {
let repeated_revision = repeated_revision_from_repeated_revision_pb(repeated_revision)?;
let document_info = DocumentInfo::from_revisions(&doc_id, repeated_revision.into_inner())?;
inner.insert(doc_id, document_info.clone());
Ok(document_info)
})
}
fn read_revisions(
&self,
_doc_id: &str,
_rev_ids: Option<Vec<i64>>,
) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError> {
Box::pin(async move { Ok(vec![]) })
}
fn reset_document(&self, _doc_id: &str, _revisions: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> {
unimplemented!()
}
}

View File

@ -94,31 +94,27 @@ impl FlowyWebSocketConnect {
#[tracing::instrument(level = "debug", skip(ws_conn))]
pub fn listen_on_websocket(ws_conn: Arc<FlowyWebSocketConnect>) {
if cfg!(feature = "http_server") {
let ws = ws_conn.inner.clone();
let mut notify = ws_conn.inner.subscribe_connect_state();
let _ = tokio::spawn(async move {
loop {
match notify.recv().await {
Ok(state) => {
tracing::info!("Websocket state changed: {}", state);
match state {
WSConnectState::Init => {},
WSConnectState::Connected => {},
WSConnectState::Connecting => {},
WSConnectState::Disconnected => retry_connect(ws.clone(), 100).await,
}
},
Err(e) => {
tracing::error!("Websocket state notify error: {:?}", e);
break;
},
}
let ws = ws_conn.inner.clone();
let mut notify = ws_conn.inner.subscribe_connect_state();
let _ = tokio::spawn(async move {
loop {
match notify.recv().await {
Ok(state) => {
tracing::info!("Websocket state changed: {}", state);
match state {
WSConnectState::Init => {},
WSConnectState::Connected => {},
WSConnectState::Connecting => {},
WSConnectState::Disconnected => retry_connect(ws.clone(), 100).await,
}
},
Err(e) => {
tracing::error!("Websocket state notify error: {:?}", e);
break;
},
}
});
} else {
// do nothing
};
}
});
}
async fn retry_connect(ws: Arc<dyn FlowyRawWebSocket>, count: usize) {

View File

@ -1,4 +1,4 @@
use crate::services::ws_conn::{FlowyRawWebSocket, FlowyWSSender};
use crate::ws::connection::{FlowyRawWebSocket, FlowyWSSender};
use flowy_error::internal_error;
pub use flowy_error::FlowyError;
use lib_infra::future::FutureResult;

View File

@ -1,4 +1,4 @@
use crate::services::local::persistence::LocalServerDocumentPersistence;
use crate::ws::local::persistence::LocalDocumentCloudPersistence;
use bytes::Bytes;
use flowy_collaboration::{
entities::ws::{DocumentClientWSData, DocumentClientWSDataType},
@ -13,12 +13,12 @@ use tokio::sync::{mpsc, mpsc::UnboundedSender};
pub struct LocalDocumentServer {
pub doc_manager: Arc<ServerDocumentManager>,
sender: mpsc::UnboundedSender<WebSocketRawMessage>,
persistence: Arc<dyn ServerDocumentPersistence>,
persistence: Arc<LocalDocumentCloudPersistence>,
}
impl LocalDocumentServer {
pub fn new(sender: mpsc::UnboundedSender<WebSocketRawMessage>) -> Self {
let persistence = Arc::new(LocalServerDocumentPersistence::default());
let persistence = Arc::new(LocalDocumentCloudPersistence::default());
let doc_manager = Arc::new(ServerDocumentManager::new(persistence.clone()));
LocalDocumentServer {
doc_manager,
@ -41,7 +41,6 @@ impl LocalDocumentServer {
let user = Arc::new(LocalDocumentUser {
user_id,
ws_sender: self.sender.clone(),
persistence: self.persistence.clone(),
});
let ty = client_data.ty.clone();
let document_client_data: DocumentClientWSDataPB = client_data.try_into().unwrap();
@ -64,7 +63,6 @@ impl LocalDocumentServer {
struct LocalDocumentUser {
user_id: String,
ws_sender: mpsc::UnboundedSender<WebSocketRawMessage>,
persistence: Arc<dyn ServerDocumentPersistence>,
}
impl RevisionUser for LocalDocumentUser {
@ -105,9 +103,6 @@ impl RevisionUser for LocalDocumentUser {
};
send_fn(sender, msg);
},
SyncResponse::NewRevision(mut _repeated_revision) => {
// unimplemented!()
},
}
});
}

View File

@ -1,6 +1,6 @@
use crate::services::{
use crate::ws::{
connection::{FlowyRawWebSocket, FlowyWSSender},
local::local_server::LocalDocumentServer,
ws_conn::{FlowyRawWebSocket, FlowyWSSender},
};
use bytes::Bytes;
use dashmap::DashMap;

View File

@ -0,0 +1,24 @@
mod local_server;
mod local_ws;
mod persistence;
use flowy_collaboration::errors::CollaborateError;
pub use local_ws::*;
use flowy_collaboration::protobuf::RepeatedRevision as RepeatedRevisionPB;
use lib_infra::future::BoxResultFuture;
/// Storage backend for document revisions used by the local-server setup.
/// Only a no-op in-memory implementation exists in this file so far.
pub trait DocumentCloudStorage: Send + Sync {
/// Persists the given revisions.
fn set_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError>;
/// Reads revisions of `doc_id`; `rev_ids = None` is treated as "all".
fn get_revisions(
&self,
doc_id: &str,
rev_ids: Option<Vec<i64>>,
) -> BoxResultFuture<RepeatedRevisionPB, CollaborateError>;
/// Replaces the document's revision history with `repeated_revision`.
fn reset_document(
&self,
doc_id: &str,
repeated_revision: RepeatedRevisionPB,
) -> BoxResultFuture<(), CollaborateError>;
}

View File

@ -0,0 +1,130 @@
use crate::ws::local::DocumentCloudStorage;
use dashmap::DashMap;
use flowy_collaboration::{
entities::doc::DocumentInfo,
errors::CollaborateError,
protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB},
sync::*,
util::{make_doc_from_revisions, repeated_revision_from_repeated_revision_pb},
};
use lib_infra::future::BoxResultFuture;
use std::{
convert::TryInto,
fmt::{Debug, Formatter},
sync::Arc,
};
pub(crate) struct LocalDocumentCloudPersistence {
// For the moment, we use memory to cache the data, it will be implemented with other storage.
// Like the Firestore,Dropbox.etc.
storage: Arc<dyn DocumentCloudStorage>,
}
// Debug label now matches the type name; "LocalDocServerPersistence" was a
// leftover from the pre-rename implementation and made traces misleading.
impl Debug for LocalDocumentCloudPersistence {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("LocalDocumentCloudPersistence") }
}
impl std::default::Default for LocalDocumentCloudPersistence {
fn default() -> Self {
LocalDocumentCloudPersistence {
storage: Arc::new(MemoryDocumentCloudStorage::default()),
}
}
}
impl DocumentCloudPersistence for LocalDocumentCloudPersistence {
// Sync is disabled for the local setup; revisions are only kept locally.
fn enable_sync(&self) -> bool { false }
/// Rebuilds a `DocumentInfo` from all stored revisions of `doc_id`.
/// Returns `record_not_found` when no document can be built.
fn read_document(&self, doc_id: &str) -> BoxResultFuture<DocumentInfo, CollaborateError> {
let storage = self.storage.clone();
let doc_id = doc_id.to_owned();
Box::pin(async move {
let repeated_revision = storage.get_revisions(&doc_id, None).await?;
match make_doc_from_revisions(&doc_id, repeated_revision) {
Ok(Some(mut document_info_pb)) => {
// Convert the protobuf document into the domain type.
let document_info: DocumentInfo = (&mut document_info_pb)
.try_into()
.map_err(|e| CollaborateError::internal().context(e))?;
Ok(document_info)
},
Ok(None) => Err(CollaborateError::record_not_found()),
Err(e) => Err(CollaborateError::internal().context(e)),
}
})
}
/// Stores the initial revisions and returns the document built from them.
fn create_document(
&self,
doc_id: &str,
repeated_revision: RepeatedRevisionPB,
) -> BoxResultFuture<DocumentInfo, CollaborateError> {
let doc_id = doc_id.to_owned();
let storage = self.storage.clone();
Box::pin(async move {
let _ = storage.set_revisions(repeated_revision.clone()).await?;
let repeated_revision = repeated_revision_from_repeated_revision_pb(repeated_revision)?;
let document_info = DocumentInfo::from_revisions(&doc_id, repeated_revision.into_inner())?;
Ok(document_info)
})
}
/// Reads the requested revisions (all of them when `rev_ids` is `None`).
fn read_revisions(
&self,
doc_id: &str,
rev_ids: Option<Vec<i64>>,
) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError> {
let doc_id = doc_id.to_owned();
let storage = self.storage.clone();
Box::pin(async move {
let mut repeated_revision = storage.get_revisions(&doc_id, rev_ids).await?;
let revisions: Vec<RevisionPB> = repeated_revision.take_items().into();
Ok(revisions)
})
}
/// Appends the given revisions to storage.
fn save_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> {
let storage = self.storage.clone();
Box::pin(async move {
let _ = storage.set_revisions(repeated_revision).await?;
Ok(())
})
}
/// Replaces the document's revision history with `revisions`.
fn reset_document(&self, doc_id: &str, revisions: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> {
let storage = self.storage.clone();
let doc_id = doc_id.to_owned();
Box::pin(async move {
let _ = storage.reset_document(&doc_id, revisions).await?;
Ok(())
})
}
}
/// Placeholder storage backend: despite the name it stores nothing — writes
/// are discarded and reads return an empty revision list. Serves as the
/// default backend of `LocalDocumentCloudPersistence`.
struct MemoryDocumentCloudStorage {}
impl std::default::Default for MemoryDocumentCloudStorage {
fn default() -> Self { Self {} }
}
impl DocumentCloudStorage for MemoryDocumentCloudStorage {
// Accepts and discards the revisions.
fn set_revisions(&self, _repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> {
Box::pin(async move { Ok(()) })
}
// Always returns an empty revision list.
fn get_revisions(
&self,
_doc_id: &str,
_rev_ids: Option<Vec<i64>>,
) -> BoxResultFuture<RepeatedRevisionPB, CollaborateError> {
Box::pin(async move {
let repeated_revisions = RepeatedRevisionPB::new();
Ok(repeated_revisions)
})
}
// No stored state to reset; succeeds unconditionally.
fn reset_document(
&self,
_doc_id: &str,
_repeated_revision: RepeatedRevisionPB,
) -> BoxResultFuture<(), CollaborateError> {
Box::pin(async move { Ok(()) })
}
}

View File

@ -1,3 +1,3 @@
pub mod connection;
pub mod http;
pub mod local;
pub mod ws_conn;

View File

@ -13,7 +13,7 @@ use flowy_document::{
};
use flowy_net::{
cloud::document::{DocumentHttpCloudService, DocumentLocalCloudService},
services::ws_conn::FlowyWebSocketConnect,
ws::connection::FlowyWebSocketConnect,
};
use flowy_user::services::UserSession;
use lib_infra::future::FutureResult;

View File

@ -6,9 +6,9 @@ use flowy_core::{context::CoreContext, errors::FlowyError, module::init_core};
use flowy_document::context::DocumentContext;
use flowy_net::{
entities::NetworkType,
services::{
ws::{
connection::{listen_on_websocket, FlowyRawWebSocket, FlowyWebSocketConnect},
local::LocalWebSocket,
ws_conn::{listen_on_websocket, FlowyRawWebSocket, FlowyWebSocketConnect},
},
};
use flowy_user::services::{notifier::UserStatus, UserSession, UserSessionConfig};

View File

@ -1,5 +1,5 @@
use flowy_core::context::CoreContext;
use flowy_net::services::ws_conn::FlowyWebSocketConnect;
use flowy_net::ws::connection::FlowyWebSocketConnect;
use flowy_user::services::UserSession;
use lib_dispatch::prelude::Module;
use std::sync::Arc;

View File

@ -16,7 +16,9 @@ use tokio::{
task::spawn_blocking,
};
pub trait ServerDocumentPersistence: Send + Sync + Debug {
pub trait DocumentCloudPersistence: Send + Sync + Debug {
fn enable_sync(&self) -> bool;
fn read_document(&self, doc_id: &str) -> BoxResultFuture<DocumentInfo, CollaborateError>;
fn create_document(
@ -31,6 +33,8 @@ pub trait ServerDocumentPersistence: Send + Sync + Debug {
rev_ids: Option<Vec<i64>>,
) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError>;
fn save_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError>;
fn reset_document(
&self,
doc_id: &str,
@ -40,11 +44,11 @@ pub trait ServerDocumentPersistence: Send + Sync + Debug {
pub struct ServerDocumentManager {
open_doc_map: Arc<RwLock<HashMap<String, Arc<OpenDocHandle>>>>,
persistence: Arc<dyn ServerDocumentPersistence>,
persistence: Arc<dyn DocumentCloudPersistence>,
}
impl ServerDocumentManager {
pub fn new(persistence: Arc<dyn ServerDocumentPersistence>) -> Self {
pub fn new(persistence: Arc<dyn DocumentCloudPersistence>) -> Self {
Self {
open_doc_map: Arc::new(RwLock::new(HashMap::new())),
persistence,
@ -91,7 +95,7 @@ impl ServerDocumentManager {
let doc_id = client_data.doc_id.clone();
match self.get_document_handler(&doc_id).await {
None => {
tracing::trace!("Document:{} doesn't exist, ignore pinging", doc_id);
tracing::trace!("Document:{} doesn't exist, ignore client ping", doc_id);
Ok(())
},
Some(handler) => {
@ -169,23 +173,26 @@ impl std::ops::Drop for ServerDocumentManager {
struct OpenDocHandle {
doc_id: String,
sender: mpsc::Sender<DocumentCommand>,
persistence: Arc<dyn ServerDocumentPersistence>,
users: DashMap<String, Arc<dyn RevisionUser>>,
}
impl OpenDocHandle {
fn new(doc: DocumentInfo, persistence: Arc<dyn ServerDocumentPersistence>) -> Result<Self, CollaborateError> {
fn new(doc: DocumentInfo, persistence: Arc<dyn DocumentCloudPersistence>) -> Result<Self, CollaborateError> {
let doc_id = doc.doc_id.clone();
let (sender, receiver) = mpsc::channel(100);
let users = DashMap::new();
let queue = DocumentCommandQueue::new(receiver, doc)?;
tokio::task::spawn(queue.run());
Ok(Self {
doc_id,
sender,
let delta = RichTextDelta::from_bytes(&doc.text)?;
let synchronizer = Arc::new(RevisionSynchronizer::new(
&doc.doc_id,
doc.rev_id,
Document::from_delta(delta),
persistence,
users,
})
));
let queue = DocumentCommandQueue::new(&doc.doc_id, receiver, synchronizer)?;
tokio::task::spawn(queue.run());
Ok(Self { doc_id, sender, users })
}
#[tracing::instrument(level = "debug", skip(self, user, repeated_revision), err)]
@ -195,12 +202,10 @@ impl OpenDocHandle {
repeated_revision: RepeatedRevisionPB,
) -> Result<(), CollaborateError> {
let (ret, rx) = oneshot::channel();
let persistence = self.persistence.clone();
self.users.insert(user.user_id(), user.clone());
let msg = DocumentCommand::ApplyRevisions {
user,
repeated_revision,
persistence,
ret,
};
@ -211,13 +216,7 @@ impl OpenDocHandle {
async fn apply_ping(&self, rev_id: i64, user: Arc<dyn RevisionUser>) -> Result<(), CollaborateError> {
let (ret, rx) = oneshot::channel();
self.users.insert(user.user_id(), user.clone());
let persistence = self.persistence.clone();
let msg = DocumentCommand::Ping {
user,
persistence,
rev_id,
ret,
};
let msg = DocumentCommand::Ping { user, rev_id, ret };
let result = self.send(msg, rx).await?;
result
}
@ -225,12 +224,7 @@ impl OpenDocHandle {
#[tracing::instrument(level = "debug", skip(self, repeated_revision), err)]
async fn apply_document_reset(&self, repeated_revision: RepeatedRevisionPB) -> Result<(), CollaborateError> {
let (ret, rx) = oneshot::channel();
let persistence = self.persistence.clone();
let msg = DocumentCommand::Reset {
persistence,
repeated_revision,
ret,
};
let msg = DocumentCommand::Reset { repeated_revision, ret };
let result = self.send(msg, rx).await?;
result
}
@ -247,8 +241,7 @@ impl OpenDocHandle {
impl std::ops::Drop for OpenDocHandle {
fn drop(&mut self) {
//
log::debug!("{} OpenDocHandle was drop", self.doc_id);
tracing::debug!("{} OpenDocHandle was drop", self.doc_id);
}
}
@ -257,17 +250,14 @@ enum DocumentCommand {
ApplyRevisions {
user: Arc<dyn RevisionUser>,
repeated_revision: RepeatedRevisionPB,
persistence: Arc<dyn ServerDocumentPersistence>,
ret: oneshot::Sender<CollaborateResult<()>>,
},
Ping {
user: Arc<dyn RevisionUser>,
persistence: Arc<dyn ServerDocumentPersistence>,
rev_id: i64,
ret: oneshot::Sender<CollaborateResult<()>>,
},
Reset {
persistence: Arc<dyn ServerDocumentPersistence>,
repeated_revision: RepeatedRevisionPB,
ret: oneshot::Sender<CollaborateResult<()>>,
},
@ -280,16 +270,13 @@ struct DocumentCommandQueue {
}
impl DocumentCommandQueue {
fn new(receiver: mpsc::Receiver<DocumentCommand>, doc: DocumentInfo) -> Result<Self, CollaborateError> {
let delta = RichTextDelta::from_bytes(&doc.text)?;
let synchronizer = Arc::new(RevisionSynchronizer::new(
&doc.doc_id,
doc.rev_id,
Document::from_delta(delta),
));
fn new(
doc_id: &str,
receiver: mpsc::Receiver<DocumentCommand>,
synchronizer: Arc<RevisionSynchronizer>,
) -> Result<Self, CollaborateError> {
Ok(Self {
doc_id: doc.doc_id,
doc_id: doc_id.to_owned(),
receiver: Some(receiver),
synchronizer,
})
@ -317,39 +304,21 @@ impl DocumentCommandQueue {
DocumentCommand::ApplyRevisions {
user,
repeated_revision,
persistence,
ret,
} => {
let result = self
.synchronizer
.sync_revisions(user, repeated_revision, persistence)
.sync_revisions(user, repeated_revision)
.await
.map_err(internal_error);
let _ = ret.send(result);
},
DocumentCommand::Ping {
user,
persistence,
rev_id,
ret,
} => {
let result = self
.synchronizer
.pong(user, persistence, rev_id)
.await
.map_err(internal_error);
DocumentCommand::Ping { user, rev_id, ret } => {
let result = self.synchronizer.pong(user, rev_id).await.map_err(internal_error);
let _ = ret.send(result);
},
DocumentCommand::Reset {
persistence,
repeated_revision,
ret,
} => {
let result = self
.synchronizer
.reset(persistence, repeated_revision)
.await
.map_err(internal_error);
DocumentCommand::Reset { repeated_revision, ret } => {
let result = self.synchronizer.reset(repeated_revision).await.map_err(internal_error);
let _ = ret.send(result);
},
}
@ -358,7 +327,7 @@ impl DocumentCommandQueue {
impl std::ops::Drop for DocumentCommandQueue {
fn drop(&mut self) {
log::debug!("{} DocumentCommandQueue was drop", self.doc_id);
tracing::debug!("{} DocumentCommandQueue was drop", self.doc_id);
}
}

View File

@ -6,7 +6,7 @@ use crate::{
},
errors::CollaborateError,
protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB},
sync::ServerDocumentPersistence,
sync::DocumentCloudPersistence,
util::*,
};
use lib_ot::{core::OperationTransformable, rich_text::RichTextDelta};
@ -30,36 +30,41 @@ pub enum SyncResponse {
Pull(DocumentServerWSData),
Push(DocumentServerWSData),
Ack(DocumentServerWSData),
NewRevision(RepeatedRevisionPB),
}
pub struct RevisionSynchronizer {
pub doc_id: String,
pub rev_id: AtomicI64,
document: Arc<RwLock<Document>>,
persistence: Arc<dyn DocumentCloudPersistence>,
}
impl RevisionSynchronizer {
pub fn new(doc_id: &str, rev_id: i64, document: Document) -> RevisionSynchronizer {
pub fn new(
doc_id: &str,
rev_id: i64,
document: Document,
persistence: Arc<dyn DocumentCloudPersistence>,
) -> RevisionSynchronizer {
let document = Arc::new(RwLock::new(document));
RevisionSynchronizer {
doc_id: doc_id.to_string(),
rev_id: AtomicI64::new(rev_id),
document,
persistence,
}
}
#[tracing::instrument(level = "debug", skip(self, user, repeated_revision, persistence), err)]
#[tracing::instrument(level = "debug", skip(self, user, repeated_revision), err)]
pub async fn sync_revisions(
&self,
user: Arc<dyn RevisionUser>,
repeated_revision: RepeatedRevisionPB,
persistence: Arc<dyn ServerDocumentPersistence>,
) -> Result<(), CollaborateError> {
let doc_id = self.doc_id.clone();
if repeated_revision.get_items().is_empty() {
// Return all the revisions to client
let revisions = persistence.read_revisions(&doc_id, None).await?;
let revisions = self.persistence.read_revisions(&doc_id, None).await?;
let repeated_revision = repeated_revision_from_revision_pbs(revisions)?;
let data = DocumentServerWSDataBuilder::build_push_message(&doc_id, repeated_revision);
user.receive(SyncResponse::Push(data));
@ -68,7 +73,7 @@ impl RevisionSynchronizer {
let server_base_rev_id = self.rev_id.load(SeqCst);
let first_revision = repeated_revision.get_items().first().unwrap().clone();
if self.is_applied_before(&first_revision, &persistence).await {
if self.is_applied_before(&first_revision, &self.persistence).await {
// Server has received this revision before, so ignore the following revisions
return Ok(());
}
@ -81,7 +86,7 @@ impl RevisionSynchronizer {
for revision in repeated_revision.get_items() {
let _ = self.compose_revision(revision)?;
}
user.receive(SyncResponse::NewRevision(repeated_revision));
let _ = self.persistence.save_revisions(repeated_revision).await?;
} else {
// The server document is outdated, pull the missing revision from the client.
let range = RevisionRange {
@ -103,24 +108,18 @@ impl RevisionSynchronizer {
// delta.
let from_rev_id = first_revision.rev_id;
let to_rev_id = server_base_rev_id;
let _ = self
.push_revisions_to_user(user, persistence, from_rev_id, to_rev_id)
.await;
let _ = self.push_revisions_to_user(user, from_rev_id, to_rev_id).await;
},
}
Ok(())
}
#[tracing::instrument(level = "trace", skip(self, user, persistence), fields(server_rev_id), err)]
pub async fn pong(
&self,
user: Arc<dyn RevisionUser>,
persistence: Arc<dyn ServerDocumentPersistence>,
client_rev_id: i64,
) -> Result<(), CollaborateError> {
#[tracing::instrument(level = "trace", skip(self, user), fields(server_rev_id), err)]
pub async fn pong(&self, user: Arc<dyn RevisionUser>, client_rev_id: i64) -> Result<(), CollaborateError> {
let doc_id = self.doc_id.clone();
let server_rev_id = self.rev_id();
tracing::Span::current().record("server_rev_id", &server_rev_id);
match server_rev_id.cmp(&client_rev_id) {
Ordering::Less => {
tracing::error!("Client should not send ping and the server should pull the revisions from the client")
@ -133,27 +132,21 @@ impl RevisionSynchronizer {
let from_rev_id = client_rev_id;
let to_rev_id = server_rev_id;
tracing::trace!("Push revisions to user");
let _ = self
.push_revisions_to_user(user, persistence, from_rev_id, to_rev_id)
.await;
let _ = self.push_revisions_to_user(user, from_rev_id, to_rev_id).await;
},
}
Ok(())
}
#[tracing::instrument(level = "debug", skip(self, repeated_revision, persistence), fields(doc_id), err)]
pub async fn reset(
&self,
persistence: Arc<dyn ServerDocumentPersistence>,
repeated_revision: RepeatedRevisionPB,
) -> Result<(), CollaborateError> {
// Replaces the document's entire content with the given revisions.
//
// Order matters here: the persistence layer is reset FIRST; only after that
// succeeds are the in-memory document and the server rev_id updated, so a
// storage failure leaves the in-memory state untouched.
#[tracing::instrument(level = "debug", skip(self, repeated_revision), fields(doc_id), err)]
pub async fn reset(&self, repeated_revision: RepeatedRevisionPB) -> Result<(), CollaborateError> {
let doc_id = self.doc_id.clone();
tracing::Span::current().record("doc_id", &doc_id.as_str());
let revisions: Vec<RevisionPB> = repeated_revision.get_items().to_vec();
// Only the last rev_id is needed; the base id of the pair is discarded.
let (_, rev_id) = pair_rev_id_from_revision_pbs(&revisions);
let delta = make_delta_from_revision_pb(revisions)?;
let _ = self.persistence.reset_document(&doc_id, repeated_revision).await?;
// Swap in the freshly composed document under the write lock.
*self.document.write() = Document::from_delta(delta);
// fetch_update never fails here: the closure always returns Some(rev_id).
let _ = self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(rev_id));
Ok(())
@ -194,7 +187,7 @@ impl RevisionSynchronizer {
async fn is_applied_before(
&self,
new_revision: &RevisionPB,
persistence: &Arc<dyn ServerDocumentPersistence>,
persistence: &Arc<dyn DocumentCloudPersistence>,
) -> bool {
let rev_ids = Some(vec![new_revision.rev_id]);
if let Ok(revisions) = persistence.read_revisions(&self.doc_id, rev_ids).await {
@ -208,15 +201,9 @@ impl RevisionSynchronizer {
false
}
async fn push_revisions_to_user(
&self,
user: Arc<dyn RevisionUser>,
persistence: Arc<dyn ServerDocumentPersistence>,
from: i64,
to: i64,
) {
async fn push_revisions_to_user(&self, user: Arc<dyn RevisionUser>, from: i64, to: i64) {
let rev_ids: Vec<i64> = (from..=to).collect();
let revisions = match persistence.read_revisions(&self.doc_id, Some(rev_ids)).await {
let revisions = match self.persistence.read_revisions(&self.doc_id, Some(rev_ids)).await {
Ok(revisions) => {
assert_eq!(
revisions.is_empty(),

View File

@ -1,10 +1,11 @@
use crate::{
entities::revision::{RepeatedRevision, Revision},
errors::{CollaborateError, CollaborateResult},
protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB},
protobuf::{DocumentInfo as DocumentInfoPB, RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB},
};
use lib_ot::{
core::{OperationTransformable, NEW_LINE, WHITESPACE},
errors::OTError,
rich_text::RichTextDelta,
};
use std::{
@ -116,3 +117,39 @@ pub fn pair_rev_id_from_revisions(revisions: &[Revision]) -> (i64, i64) {
(0, rev_id)
}
}
/// Rebuilds a document snapshot by composing the deltas of `revisions` in order.
///
/// Returns `Ok(None)` when `revisions` is empty (the document simply does not
/// exist yet — callers decide whether that is an error). Otherwise returns a
/// `DocumentInfoPB` carrying the composed JSON text plus the `base_rev_id` /
/// `rev_id` of the LAST revision, which describe the resulting document state.
///
/// # Errors
/// Returns `OTError` if a revision's delta bytes cannot be decoded or the
/// deltas cannot be composed.
#[inline]
pub fn make_doc_from_revisions(
    doc_id: &str,
    mut revisions: RepeatedRevisionPB,
) -> Result<Option<DocumentInfoPB>, OTError> {
    let revisions = revisions.take_items();
    if revisions.is_empty() {
        return Ok(None);
    }

    let mut document_delta = RichTextDelta::new();
    let mut base_rev_id = 0;
    let mut rev_id = 0;
    for revision in revisions {
        // Keep overwriting: after the loop these hold the ids of the last revision.
        base_rev_id = revision.base_rev_id;
        rev_id = revision.rev_id;

        // An empty delta is suspicious but not fatal; log and compose it anyway.
        if revision.delta_data.is_empty() {
            tracing::warn!("revision delta_data is empty");
        }
        let delta = RichTextDelta::from_bytes(revision.delta_data)?;
        document_delta = document_delta.compose(&delta)?;
    }

    let text = document_delta.to_json();
    let mut document_info = DocumentInfoPB::new();
    document_info.set_doc_id(doc_id.to_owned());
    document_info.set_text(text);
    document_info.set_base_rev_id(base_rev_id);
    document_info.set_rev_id(rev_id);
    Ok(Some(document_info))
}