chore: add sync feature

This commit is contained in:
appflowy 2022-04-14 21:57:00 +08:00
parent ac766c0359
commit b282a14b58
13 changed files with 62 additions and 34 deletions

View File

@ -31,7 +31,7 @@ flowy-derive = {path = "../../../shared-lib/flowy-derive" }
[features]
default = ["flowy-sdk/dart", "dart-notify/dart", "flutter"]
flutter = []
http_server = ["flowy-sdk/http_server", "flowy-sdk/use_bunyan"]
http_sync = ["flowy-sdk/http_sync", "flowy-sdk/use_bunyan"]
#use_serde = ["bincode"]
#use_protobuf= ["protobuf"]

View File

@ -44,6 +44,7 @@ lib-infra = { path = "../../../shared-lib/lib-infra", features = ["protobuf_file
[features]
default = []
http_server = []
sync = []
cloud_sync = ["sync"]
flowy_unit_test = ["lib-ot/flowy_unit_test", "flowy-revision/flowy_unit_test"]
dart = ["lib-infra/dart"]

View File

@ -1,4 +1,3 @@
use crate::services::web_socket::make_folder_ws_manager;
use flowy_sync::{
client_folder::{FolderChange, FolderPad},
entities::{revision::Revision, ws_data::ServerRevisionWSData},
@ -11,7 +10,6 @@ use flowy_sync::util::make_delta_from_revisions;
use flowy_revision::{
RevisionCloudService, RevisionCompactor, RevisionManager, RevisionObjectBuilder, RevisionWebSocket,
RevisionWebSocketManager,
};
use lib_infra::future::FutureResult;
use lib_ot::core::PlainTextAttributes;
@ -21,13 +19,16 @@ use std::sync::Arc;
pub struct ClientFolderEditor {
user_id: String,
#[allow(dead_code)]
pub(crate) folder_id: FolderId,
pub(crate) folder: Arc<RwLock<FolderPad>>,
rev_manager: Arc<RevisionManager>,
// ws_manager: Arc<RevisionWebSocketManager>,
#[cfg(feature = "sync")]
ws_manager: Arc<flowy_revision::RevisionWebSocketManager>,
}
impl ClientFolderEditor {
#[allow(unused_variables)]
pub async fn new(
user_id: &str,
folder_id: &FolderId,
@ -40,14 +41,16 @@ impl ClientFolderEditor {
});
let folder = Arc::new(RwLock::new(rev_manager.load::<FolderPadBuilder>(Some(cloud)).await?));
let rev_manager = Arc::new(rev_manager);
// let ws_manager = make_folder_ws_manager(
// user_id,
// folder_id.as_ref(),
// rev_manager.clone(),
// web_socket,
// folder.clone(),
// )
// .await;
#[cfg(feature = "sync")]
let ws_manager = crate::services::web_socket::make_folder_ws_manager(
user_id,
folder_id.as_ref(),
rev_manager.clone(),
web_socket,
folder.clone(),
)
.await;
let user_id = user_id.to_owned();
let folder_id = folder_id.to_owned();
@ -56,15 +59,23 @@ impl ClientFolderEditor {
folder_id,
folder,
rev_manager,
// ws_manager,
#[cfg(feature = "sync")]
ws_manager,
})
}
#[cfg(feature = "sync")]
pub async fn receive_ws_data(&self, data: ServerRevisionWSData) -> FlowyResult<()> {
// let _ = self.ws_manager.ws_passthrough_tx.send(data).await.map_err(|e| {
// let err_msg = format!("{} passthrough error: {}", self.folder_id, e);
// FlowyError::internal().context(err_msg)
// })?;
let _ = self.ws_manager.ws_passthrough_tx.send(data).await.map_err(|e| {
let err_msg = format!("{} passthrough error: {}", self.folder_id, e);
FlowyError::internal().context(err_msg)
})?;
Ok(())
}
#[cfg(not(feature = "sync"))]
pub async fn receive_ws_data(&self, _data: ServerRevisionWSData) -> FlowyResult<()> {
Ok(())
}

View File

@ -14,6 +14,7 @@ use lib_ot::core::{OperationTransformable, PlainTextAttributes, PlainTextDelta};
use parking_lot::RwLock;
use std::{sync::Arc, time::Duration};
#[allow(dead_code)]
pub(crate) async fn make_folder_ws_manager(
user_id: &str,
folder_id: &str,

View File

@ -47,7 +47,7 @@ impl DateTypeOption {
if self.include_time {
format!("{} {}", self.date_format.format_str(), self.time_format.format_str())
} else {
format!("{}", self.date_format.format_str())
self.date_format.format_str().to_string()
}
}
}

View File

@ -153,7 +153,7 @@ impl ClientGridEditor {
.modify(|grid| Ok(grid.switch_to_field(field_id, field_type.clone(), type_option_json_builder)?))
.await?;
let _ = self.notify_grid_did_update_field(&field_id).await?;
let _ = self.notify_grid_did_update_field(field_id).await?;
Ok(())
}

View File

@ -38,6 +38,7 @@ tokio = { version = "1", features = ["full"]}
futures-util = "0.3.15"
[features]
http_server = ["flowy-user/http_server", "flowy-folder/http_server", "flowy-text-block/http_server"]
http_sync = ["flowy-folder/cloud_sync", "flowy-text-block/cloud_sync"]
native_sync = ["flowy-folder/cloud_sync", "flowy-text-block/cloud_sync"]
use_bunyan = ["lib-log/use_bunyan"]
dart = ["flowy-user/dart", "flowy-net/dart", "flowy-folder/dart", "flowy-sync/dart", "flowy-grid/dart", "flowy-text-block/dart"]

View File

@ -193,7 +193,7 @@ fn mk_local_server(
server_config: &ClientServerConfiguration,
) -> (Option<Arc<LocalServer>>, Arc<FlowyWebSocketConnect>) {
let ws_addr = server_config.ws_addr();
if cfg!(feature = "http_server") {
if cfg!(feature = "http_sync") {
let ws_conn = Arc::new(FlowyWebSocketConnect::new(ws_addr));
(None, ws_conn)
} else {

View File

@ -51,6 +51,7 @@ rand = "0.7.3"
lib-infra = { path = "../../../shared-lib/lib-infra", features = ["protobuf_file_gen", "proto_gen"] }
[features]
http_server = []
sync = []
cloud_sync = ["sync"]
flowy_unit_test = ["lib-ot/flowy_unit_test", "flowy-revision/flowy_unit_test"]
dart = ["lib-infra/dart"]

View File

@ -1,4 +1,4 @@
use crate::web_socket::{make_block_ws_manager, EditorCommandSender};
use crate::web_socket::EditorCommandSender;
use crate::{
errors::FlowyError,
queue::{EditBlockQueue, EditorCommand},
@ -6,9 +6,7 @@ use crate::{
};
use bytes::Bytes;
use flowy_error::{internal_error, FlowyResult};
use flowy_revision::{
RevisionCloudService, RevisionManager, RevisionObjectBuilder, RevisionWebSocket, RevisionWebSocketManager,
};
use flowy_revision::{RevisionCloudService, RevisionManager, RevisionObjectBuilder, RevisionWebSocket};
use flowy_sync::entities::ws_data::ServerRevisionWSData;
use flowy_sync::{
entities::{revision::Revision, text_block_info::TextBlockInfo},
@ -27,6 +25,7 @@ pub struct ClientTextBlockEditor {
pub doc_id: String,
#[allow(dead_code)]
rev_manager: Arc<RevisionManager>,
#[cfg(feature = "sync")]
ws_manager: Arc<RevisionWebSocketManager>,
edit_cmd_tx: EditorCommandSender,
}
@ -36,16 +35,17 @@ impl ClientTextBlockEditor {
doc_id: &str,
user: Arc<dyn TextBlockUser>,
mut rev_manager: RevisionManager,
rev_web_socket: Arc<dyn RevisionWebSocket>,
_rev_web_socket: Arc<dyn RevisionWebSocket>,
cloud_service: Arc<dyn RevisionCloudService>,
) -> FlowyResult<Arc<Self>> {
let document_info = rev_manager.load::<TextBlockInfoBuilder>(Some(cloud_service)).await?;
let delta = document_info.delta()?;
let rev_manager = Arc::new(rev_manager);
let doc_id = doc_id.to_string();
let user_id = user.user_id()?;
let _user_id = user.user_id()?;
let edit_cmd_tx = spawn_edit_queue(user, rev_manager.clone(), delta);
#[cfg(feature = "sync")]
let ws_manager = make_block_ws_manager(
doc_id.clone(),
user_id.clone(),
@ -57,6 +57,7 @@ impl ClientTextBlockEditor {
let editor = Arc::new(Self {
doc_id,
rev_manager,
#[cfg(feature = "sync")]
ws_manager,
edit_cmd_tx,
});
@ -158,17 +159,29 @@ impl ClientTextBlockEditor {
Ok(())
}
#[cfg(feature = "sync")]
pub fn stop(&self) {
self.ws_manager.stop();
}
#[cfg(not(feature = "sync"))]
pub fn stop(&self) {}
#[cfg(feature = "sync")]
pub(crate) async fn receive_ws_data(&self, data: ServerRevisionWSData) -> Result<(), FlowyError> {
self.ws_manager.receive_ws_data(data).await
}
#[cfg(not(feature = "sync"))]
pub(crate) async fn receive_ws_data(&self, _data: ServerRevisionWSData) -> Result<(), FlowyError> {
Ok(())
}
#[cfg(feature = "sync")]
pub(crate) fn receive_ws_state(&self, state: &WSConnectState) {
self.ws_manager.connect_state_changed(state.clone());
}
#[cfg(not(feature = "sync"))]
pub(crate) fn receive_ws_state(&self, _state: &WSConnectState) {}
}
impl std::ops::Drop for ClientTextBlockEditor {

View File

@ -23,6 +23,7 @@ use tokio::sync::{
pub(crate) type EditorCommandSender = Sender<EditorCommand>;
pub(crate) type EditorCommandReceiver = Receiver<EditorCommand>;
#[allow(dead_code)]
pub(crate) async fn make_block_ws_manager(
doc_id: String,
user_id: String,
@ -49,6 +50,7 @@ pub(crate) async fn make_block_ws_manager(
ws_manager
}
#[allow(dead_code)]
fn listen_document_ws_state(_user_id: &str, _doc_id: &str, mut subscriber: broadcast::Receiver<WSConnectState>) {
tokio::spawn(async move {
while let Ok(state) = subscriber.recv().await {
@ -67,6 +69,7 @@ pub(crate) struct TextBlockRevisionWSDataStream {
}
impl TextBlockRevisionWSDataStream {
#[allow(dead_code)]
pub fn new(conflict_controller: RichTextConflictController) -> Self {
Self {
conflict_controller: Arc::new(conflict_controller),

View File

@ -37,7 +37,6 @@ futures = "0.3.15"
nanoid = "0.4.0"
[features]
http_server = []
dart = ["lib-infra/dart"]
[build-dependencies]

View File

@ -3,13 +3,11 @@ use crate::errors::{internal_error, CollaborateError, CollaborateResult};
use crate::util::{cal_diff, make_delta_from_revisions};
use bytes::Bytes;
use flowy_grid_data_model::entities::{
gen_field_id, gen_grid_id, FieldChangesetParams, FieldMeta, FieldOrder, FieldType, GridBlockMeta,
GridBlockMetaChangeset, GridMeta, IndexField,
gen_grid_id, FieldChangesetParams, FieldMeta, FieldOrder, FieldType, GridBlockMeta, GridBlockMetaChangeset,
GridMeta,
};
use lib_ot::core::{OperationTransformable, PlainTextAttributes, PlainTextDelta, PlainTextDeltaBuilder};
use std::collections::HashMap;
use futures::StreamExt;
use std::sync::Arc;
pub type GridMetaDelta = PlainTextDelta;