diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 3cdb4b4ab0..b71b82288e 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -1262,6 +1262,7 @@ dependencies = [ "flowy-derive", "flowy-document", "flowy-error", + "flowy-sync", "futures", "futures-core", "lazy_static", diff --git a/backend/src/application.rs b/backend/src/application.rs index 81518fd9dc..2f9b5070a1 100644 --- a/backend/src/application.rs +++ b/backend/src/application.rs @@ -13,8 +13,8 @@ use crate::{ }, context::AppContext, services::{ - core::{app::router as app, trash::router as trash, view::router as view, workspace::router as workspace}, document::router as doc, + folder::{app::router as app, trash::router as trash, view::router as view, workspace::router as workspace}, user::router as user, web_socket::WSServer, }, @@ -131,7 +131,7 @@ fn user_scope() -> Scope { } pub async fn init_app_context(configuration: &Settings) -> AppContext { - let _ = crate::services::core::log::Builder::new("flowy-server") + let _ = crate::services::log::Builder::new("flowy-server") .env_filter("Trace") .build(); let pg_pool = get_connection_pool(&configuration.database) diff --git a/backend/src/context.rs b/backend/src/context.rs index c7aa48d0f0..2867a4afe7 100644 --- a/backend/src/context.rs +++ b/backend/src/context.rs @@ -5,9 +5,12 @@ use crate::services::{ use actix::Addr; use actix_web::web::Data; -use crate::services::document::{ - persistence::DocumentKVPersistence, - ws_receiver::{make_document_ws_receiver, HttpDocumentCloudPersistence}, +use crate::services::{ + document::{ + persistence::DocumentKVPersistence, + ws_receiver::{make_document_ws_receiver, HttpDocumentCloudPersistence}, + }, + folder::ws_receiver::make_folder_ws_receiver, }; use flowy_collaboration::server_document::ServerDocumentManager; use lib_ws::WSModule; @@ -35,6 +38,10 @@ impl AppContext { let document_ws_receiver = make_document_ws_receiver(flowy_persistence.clone(), document_manager.clone()); ws_receivers.set(WSModule::Doc, document_ws_receiver); + + let folder_ws_receiver = make_folder_ws_receiver(flowy_persistence.clone()); + ws_receivers.set(WSModule::Folder, folder_ws_receiver); + AppContext { ws_server, persistence: Data::new(flowy_persistence), diff --git a/backend/src/services/document/ws_actor.rs b/backend/src/services/document/ws_actor.rs index 107c8130f3..cb97162ed3 100644 --- a/backend/src/services/document/ws_actor.rs +++ b/backend/src/services/document/ws_actor.rs @@ -13,13 +13,13 @@ use flowy_collaboration::{ ClientRevisionWSDataType as ClientRevisionWSDataTypePB, Revision as RevisionPB, }, - server_document::{RevisionUser, ServerDocumentManager, SyncResponse}, + server_document::{RevisionSyncResponse, RevisionUser, ServerDocumentManager}, }; use futures::stream::StreamExt; use std::sync::Arc; use tokio::sync::{mpsc, oneshot}; -pub enum WSActorMessage { +pub enum DocumentWSActorMessage { ClientData { client_data: WSClientData, persistence: Arc, @@ -28,12 +28,12 @@ pub enum WSActorMessage { } pub struct DocumentWebSocketActor { - receiver: Option>, + receiver: Option>, doc_manager: Arc, } impl DocumentWebSocketActor { - pub fn new(receiver: mpsc::Receiver, manager: Arc) -> Self { + pub fn new(receiver: mpsc::Receiver, manager: Arc) -> Self { Self { receiver: Some(receiver), doc_manager: manager, @@ -44,7 +44,7 @@ impl DocumentWebSocketActor { let mut receiver = self .receiver .take() - .expect("DocActor's receiver should only take one time"); + .expect("DocumentWebSocketActor's receiver should only take one time"); let stream = stream! { loop { @@ -58,19 +58,19 @@ impl DocumentWebSocketActor { stream.for_each(|msg| self.handle_message(msg)).await; } - async fn handle_message(&self, msg: WSActorMessage) { + async fn handle_message(&self, msg: DocumentWSActorMessage) { match msg { - WSActorMessage::ClientData { + DocumentWSActorMessage::ClientData { client_data, persistence: _, ret, } => { - let _ = ret.send(self.handle_client_data(client_data).await); + let _ = ret.send(self.handle_document_data(client_data).await); }, } } - async fn handle_client_data(&self, client_data: WSClientData) -> Result<()> { + async fn handle_document_data(&self, client_data: WSClientData) -> Result<()> { let WSClientData { user, socket, data } = client_data; let document_client_data = spawn_blocking(move || parse_from_bytes::(&data)) .await @@ -83,8 +83,7 @@ impl DocumentWebSocketActor { document_client_data.ty ); - let user = Arc::new(HttpDocumentUser { user, socket }); - + let user = Arc::new(DocumentRevisionUser { user, socket }); match &document_client_data.ty { ClientRevisionWSDataTypePB::ClientPushRev => { let _ = self @@ -115,34 +114,34 @@ fn verify_md5(revision: &RevisionPB) -> Result<()> { } #[derive(Clone)] -pub struct HttpDocumentUser { +pub struct DocumentRevisionUser { pub user: Arc, pub(crate) socket: Socket, } -impl std::fmt::Debug for HttpDocumentUser { +impl std::fmt::Debug for DocumentRevisionUser { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - f.debug_struct("ServerDocUser") + f.debug_struct("DocumentRevisionUser") .field("user", &self.user) .field("socket", &self.socket) .finish() } } -impl RevisionUser for HttpDocumentUser { +impl RevisionUser for DocumentRevisionUser { fn user_id(&self) -> String { self.user.id().to_string() } - fn receive(&self, resp: SyncResponse) { + fn receive(&self, resp: RevisionSyncResponse) { let result = match resp { - SyncResponse::Pull(data) => { + RevisionSyncResponse::Pull(data) => { let msg: WebSocketMessage = data.into(); self.socket.try_send(msg).map_err(internal_error) }, - SyncResponse::Push(data) => { + RevisionSyncResponse::Push(data) => { let msg: WebSocketMessage = data.into(); self.socket.try_send(msg).map_err(internal_error) }, - SyncResponse::Ack(data) => { + RevisionSyncResponse::Ack(data) => { let msg: WebSocketMessage = data.into(); self.socket.try_send(msg).map_err(internal_error) }, @@ -150,7 +149,7 @@ impl RevisionUser for HttpDocumentUser { match result { Ok(_) => {}, - Err(e) => log::error!("[ServerDocUser]: {}", e), + Err(e) => log::error!("[DocumentRevisionUser]: {}", e), } } } diff --git a/backend/src/services/document/ws_receiver.rs b/backend/src/services/document/ws_receiver.rs index 7ff91e637f..0d04d7db5e 100644 --- a/backend/src/services/document/ws_receiver.rs +++ b/backend/src/services/document/ws_receiver.rs @@ -3,7 +3,7 @@ use crate::{ services::{ document::{ persistence::{create_document, read_document, revisions_to_key_value_items, DocumentKVPersistence}, - ws_actor::{DocumentWebSocketActor, WSActorMessage}, + ws_actor::{DocumentWSActorMessage, DocumentWebSocketActor}, }, web_socket::{WSClientData, WebSocketReceiver}, }, @@ -33,7 +33,7 @@ pub fn make_document_ws_receiver( persistence: Arc, document_manager: Arc, ) -> Arc { - let (ws_sender, rx) = tokio::sync::mpsc::channel(100); + let (ws_sender, rx) = tokio::sync::mpsc::channel(1000); let actor = DocumentWebSocketActor::new(rx, document_manager); tokio::task::spawn(actor.run()); @@ -41,12 +41,12 @@ pub fn make_document_ws_receiver( } pub struct DocumentWebSocketReceiver { - ws_sender: mpsc::Sender, + ws_sender: mpsc::Sender, persistence: Arc, } impl DocumentWebSocketReceiver { - pub fn new(persistence: Arc, ws_sender: mpsc::Sender) -> Self { + pub fn new(persistence: Arc, ws_sender: mpsc::Sender) -> Self { Self { ws_sender, persistence } } } @@ -58,7 +58,7 @@ impl WebSocketReceiver for DocumentWebSocketReceiver { let persistence = self.persistence.clone(); actix_rt::spawn(async move { - let msg = WSActorMessage::ClientData { + let msg = DocumentWSActorMessage::ClientData { client_data: data, persistence, ret, @@ -82,7 +82,6 @@ impl Debug for HttpDocumentCloudPersistence { } impl DocumentCloudPersistence for HttpDocumentCloudPersistence { - fn enable_sync(&self) -> bool { true } fn read_document(&self, doc_id: &str) -> BoxResultFuture { let params = DocumentId { doc_id: doc_id.to_string(), diff --git a/backend/src/services/core/app/controller.rs b/backend/src/services/folder/app/controller.rs similarity index 96% rename from backend/src/services/core/app/controller.rs rename to backend/src/services/folder/app/controller.rs index 9732fe6c0b..8f6498d10a 100644 --- a/backend/src/services/core/app/controller.rs +++ b/backend/src/services/folder/app/controller.rs @@ -1,8 +1,8 @@ -use crate::services::core::view::read_view_belong_to_id; +use crate::services::folder::view::read_view_belong_to_id; use crate::{ entities::logged_user::LoggedUser, - services::core::{app::persistence::*, trash::read_trash_ids}, + services::folder::{app::persistence::*, trash::read_trash_ids}, util::sqlx_ext::{map_sqlx_error, DBTransaction, SqlBuilder}, }; use backend_service::errors::{invalid_params, ServerError}; diff --git a/backend/src/services/core/app/mod.rs b/backend/src/services/folder/app/mod.rs similarity index 100% rename from backend/src/services/core/app/mod.rs rename to backend/src/services/folder/app/mod.rs diff --git a/backend/src/services/core/app/persistence.rs b/backend/src/services/folder/app/persistence.rs similarity index 100% rename from backend/src/services/core/app/persistence.rs rename to backend/src/services/folder/app/persistence.rs diff --git a/backend/src/services/core/app/router.rs b/backend/src/services/folder/app/router.rs similarity index 99% rename from backend/src/services/core/app/router.rs rename to backend/src/services/folder/app/router.rs index 714d7603df..4e8fb8234f 100644 --- a/backend/src/services/core/app/router.rs +++ b/backend/src/services/folder/app/router.rs @@ -1,6 +1,6 @@ use crate::{ entities::logged_user::LoggedUser, - services::core::app::{ + services::folder::app::{ controller::{create_app, delete_app, read_app, update_app}, persistence::check_app_id, }, diff --git a/backend/src/services/core/mod.rs b/backend/src/services/folder/mod.rs similarity index 53% rename from backend/src/services/core/mod.rs rename to backend/src/services/folder/mod.rs index d5d418881f..6168695a8f 100644 --- a/backend/src/services/core/mod.rs +++ b/backend/src/services/folder/mod.rs @@ -1,5 +1,6 @@ pub mod app; -pub(crate) mod log; pub mod trash; pub mod view; pub mod workspace; +pub(crate) mod ws_actor; +pub(crate) mod ws_receiver; diff --git a/backend/src/services/core/trash/mod.rs b/backend/src/services/folder/trash/mod.rs similarity index 100% rename from backend/src/services/core/trash/mod.rs rename to backend/src/services/folder/trash/mod.rs diff --git a/backend/src/services/core/trash/persistence.rs b/backend/src/services/folder/trash/persistence.rs similarity index 92% rename from backend/src/services/core/trash/persistence.rs rename to backend/src/services/folder/trash/persistence.rs index 7fec90abbb..d42fcd078f 100644 --- a/backend/src/services/core/trash/persistence.rs +++ b/backend/src/services/folder/trash/persistence.rs @@ -1,4 +1,4 @@ -use crate::services::core::{app::persistence::AppTable, view::persistence::ViewTable}; +use crate::services::folder::{app::persistence::AppTable, view::persistence::ViewTable}; use flowy_core_data_model::protobuf::{Trash, TrashType}; pub(crate) const TRASH_TABLE: &str = "trash_table"; diff --git a/backend/src/services/core/trash/router.rs b/backend/src/services/folder/trash/router.rs similarity index 97% rename from backend/src/services/core/trash/router.rs rename to backend/src/services/folder/trash/router.rs index 2a9d47d319..155bb3f6a3 100644 --- a/backend/src/services/core/trash/router.rs +++ b/backend/src/services/folder/trash/router.rs @@ -1,7 +1,7 @@ use crate::{ context::FlowyPersistence, entities::logged_user::LoggedUser, - services::core::trash::{create_trash, delete_all_trash, delete_trash, read_trash}, + services::folder::trash::{create_trash, delete_all_trash, delete_trash, read_trash}, util::serde_ext::parse_from_payload, }; use ::protobuf::ProtobufEnum; diff --git a/backend/src/services/core/trash/trash.rs b/backend/src/services/folder/trash/trash.rs similarity index 99% rename from backend/src/services/core/trash/trash.rs rename to backend/src/services/folder/trash/trash.rs index 0cd2c54471..713b7be79e 100644 --- a/backend/src/services/core/trash/trash.rs +++ b/backend/src/services/folder/trash/trash.rs @@ -1,12 +1,12 @@ use crate::{ entities::logged_user::LoggedUser, services::{ - core::{ + document::persistence::DocumentKVPersistence, + folder::{ app::controller::{delete_app, read_app_table}, trash::persistence::{TrashTable, TRASH_TABLE}, view::{delete_view, read_view_table}, }, - document::persistence::DocumentKVPersistence, }, util::sqlx_ext::{map_sqlx_error, DBTransaction, SqlBuilder}, }; diff --git a/backend/src/services/core/view/controller.rs b/backend/src/services/folder/view/controller.rs similarity index 98% rename from backend/src/services/core/view/controller.rs rename to backend/src/services/folder/view/controller.rs index 435be53e00..01cd6bb1eb 100644 --- a/backend/src/services/core/view/controller.rs +++ b/backend/src/services/folder/view/controller.rs @@ -1,8 +1,8 @@ use crate::{ entities::logged_user::LoggedUser, services::{ - core::{trash::read_trash_ids, view::persistence::*}, document::persistence::{create_document, delete_document, DocumentKVPersistence}, + folder::{trash::read_trash_ids, view::persistence::*}, }, util::sqlx_ext::{map_sqlx_error, DBTransaction, SqlBuilder}, }; diff --git a/backend/src/services/core/view/mod.rs b/backend/src/services/folder/view/mod.rs similarity index 100% rename from backend/src/services/core/view/mod.rs rename to backend/src/services/folder/view/mod.rs diff --git a/backend/src/services/core/view/persistence.rs b/backend/src/services/folder/view/persistence.rs similarity index 100% rename from backend/src/services/core/view/persistence.rs rename to backend/src/services/folder/view/persistence.rs diff --git a/backend/src/services/core/view/router.rs b/backend/src/services/folder/view/router.rs similarity index 99% rename from backend/src/services/core/view/router.rs rename to backend/src/services/folder/view/router.rs index cf89f2629c..a62d789924 100644 --- a/backend/src/services/core/view/router.rs +++ b/backend/src/services/folder/view/router.rs @@ -1,7 +1,7 @@ use crate::{ context::FlowyPersistence, entities::logged_user::LoggedUser, - services::core::view::{ + services::folder::view::{ create_view, delete_view, persistence::{check_view_id, check_view_ids}, diff --git a/backend/src/services/core/workspace/controller.rs b/backend/src/services/folder/workspace/controller.rs similarity index 99% rename from backend/src/services/core/workspace/controller.rs rename to backend/src/services/folder/workspace/controller.rs index 9f13d9c9ca..d9d3a4b88f 100644 --- a/backend/src/services/core/workspace/controller.rs +++ b/backend/src/services/folder/workspace/controller.rs @@ -1,7 +1,7 @@ use super::persistence::NewWorkspaceBuilder; use crate::{ entities::logged_user::LoggedUser, - services::core::{ + services::folder::{ app::{controller::read_app, persistence::AppTable}, workspace::persistence::*, }, diff --git a/backend/src/services/core/workspace/mod.rs b/backend/src/services/folder/workspace/mod.rs similarity index 100% rename from backend/src/services/core/workspace/mod.rs rename to backend/src/services/folder/workspace/mod.rs diff --git a/backend/src/services/core/workspace/persistence.rs b/backend/src/services/folder/workspace/persistence.rs similarity index 100% rename from backend/src/services/core/workspace/persistence.rs rename to backend/src/services/folder/workspace/persistence.rs diff --git a/backend/src/services/core/workspace/router.rs b/backend/src/services/folder/workspace/router.rs similarity index 99% rename from backend/src/services/core/workspace/router.rs rename to backend/src/services/folder/workspace/router.rs index 45178e7885..d87a017786 100644 --- a/backend/src/services/core/workspace/router.rs +++ b/backend/src/services/folder/workspace/router.rs @@ -1,6 +1,6 @@ use crate::{ entities::logged_user::LoggedUser, - services::core::workspace::{ + services::folder::workspace::{ create_workspace, delete_workspace, persistence::check_workspace_id, diff --git a/backend/src/services/folder/ws_actor.rs b/backend/src/services/folder/ws_actor.rs new file mode 100644 index 0000000000..1fa1beaf8b --- /dev/null +++ b/backend/src/services/folder/ws_actor.rs @@ -0,0 +1,132 @@ +use crate::{ + context::FlowyPersistence, + services::web_socket::{entities::Socket, WSClientData, WSUser, WebSocketMessage}, + util::serde_ext::parse_from_bytes, +}; +use actix_rt::task::spawn_blocking; +use async_stream::stream; +use backend_service::errors::{internal_error, Result}; +use flowy_collaboration::{ + protobuf::{ + ClientRevisionWSData as ClientRevisionWSDataPB, + ClientRevisionWSDataType as ClientRevisionWSDataTypePB, + }, + server_document::{RevisionSyncResponse, RevisionUser, ServerDocumentManager}, +}; +use futures::stream::StreamExt; +use std::sync::Arc; +use tokio::sync::{mpsc, oneshot}; + +pub enum FolderWSActorMessage { + ClientData { + client_data: WSClientData, + persistence: Arc, + ret: oneshot::Sender>, + }, +} + +pub struct FolderWebSocketActor { + receiver: Option>, +} + +impl FolderWebSocketActor { + pub fn new(receiver: mpsc::Receiver) -> Self { + Self { + receiver: Some(receiver), + } + } + + pub async fn run(mut self) { + let mut receiver = self + .receiver + .take() + .expect("FolderWebSocketActor's receiver should only take one time"); + let stream = stream! { + loop { + match receiver.recv().await { + Some(msg) => yield msg, + None => break, + } + } + }; + stream.for_each(|msg| self.handle_message(msg)).await; + } + + async fn handle_message(&self, msg: FolderWSActorMessage) { + match msg { + FolderWSActorMessage::ClientData { + client_data, + persistence: _, + ret, + } => { + let _ = ret.send(self.handle_folder_data(client_data).await); + }, + } + } + + async fn handle_folder_data(&self, client_data: WSClientData) -> Result<()> { + let WSClientData { user, socket, data } = client_data; + let folder_client_data = spawn_blocking(move || parse_from_bytes::(&data)) + .await + .map_err(internal_error)??; + + tracing::debug!( + "[DocumentWebSocketActor]: receive: {}:{}, {:?}", + folder_client_data.object_id, + folder_client_data.data_id, + folder_client_data.ty + ); + + let _user = Arc::new(FolderRevisionUser { user, socket }); + match &folder_client_data.ty { + ClientRevisionWSDataTypePB::ClientPushRev => { + todo!() + }, + ClientRevisionWSDataTypePB::ClientPing => { + todo!() + }, + } + Ok(()) + } +} + +#[derive(Clone)] +pub struct FolderRevisionUser { + pub user: Arc, + pub(crate) socket: Socket, +} + +impl std::fmt::Debug for FolderRevisionUser { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.debug_struct("FolderRevisionUser") + .field("user", &self.user) + .field("socket", &self.socket) + .finish() + } +} + +impl RevisionUser for FolderRevisionUser { + fn user_id(&self) -> String { self.user.id().to_string() } + + fn receive(&self, resp: RevisionSyncResponse) { + let result = match resp { + RevisionSyncResponse::Pull(data) => { + let msg: WebSocketMessage = data.into(); + self.socket.try_send(msg).map_err(internal_error) + }, + RevisionSyncResponse::Push(data) => { + let msg: WebSocketMessage = data.into(); + self.socket.try_send(msg).map_err(internal_error) + }, + RevisionSyncResponse::Ack(data) => { + let msg: WebSocketMessage = data.into(); + self.socket.try_send(msg).map_err(internal_error) + }, + }; + + match result { + Ok(_) => {}, + Err(e) => log::error!("[FolderRevisionUser]: {}", e), + } + } +} diff --git a/backend/src/services/folder/ws_receiver.rs b/backend/src/services/folder/ws_receiver.rs new file mode 100644 index 0000000000..282e1420dd --- /dev/null +++ b/backend/src/services/folder/ws_receiver.rs @@ -0,0 +1,53 @@ +use crate::{ + context::FlowyPersistence, + services::{ + folder::ws_actor::{FolderWSActorMessage, FolderWebSocketActor}, + web_socket::{WSClientData, WebSocketReceiver}, + }, +}; + +use std::sync::Arc; +use tokio::sync::{mpsc, oneshot}; + +pub fn make_folder_ws_receiver(persistence: Arc) -> Arc { + let (ws_sender, rx) = tokio::sync::mpsc::channel(1000); + let actor = FolderWebSocketActor::new(rx); + tokio::task::spawn(actor.run()); + Arc::new(FolderWebSocketReceiver::new(persistence, ws_sender)) +} + +pub struct FolderWebSocketReceiver { + ws_sender: mpsc::Sender, + persistence: Arc, +} + +impl FolderWebSocketReceiver { + pub fn new(persistence: Arc, ws_sender: mpsc::Sender) -> Self { + Self { ws_sender, persistence } + } +} + +impl WebSocketReceiver for FolderWebSocketReceiver { + fn receive(&self, data: WSClientData) { + let (ret, rx) = oneshot::channel(); + let sender = self.ws_sender.clone(); + let persistence = self.persistence.clone(); + + actix_rt::spawn(async move { + let msg = FolderWSActorMessage::ClientData { + client_data: data, + persistence, + ret, + }; + + match sender.send(msg).await { + Ok(_) => {}, + Err(e) => log::error!("{}", e), + } + match rx.await { + Ok(_) => {}, + Err(e) => log::error!("{:?}", e), + }; + }); + } +} diff --git a/backend/src/services/core/log.rs b/backend/src/services/log.rs similarity index 100% rename from backend/src/services/core/log.rs rename to backend/src/services/log.rs diff --git a/backend/src/services/mod.rs b/backend/src/services/mod.rs index 094f796a65..28905e9684 100644 --- a/backend/src/services/mod.rs +++ b/backend/src/services/mod.rs @@ -1,5 +1,6 @@ -pub mod core; pub mod document; +pub mod folder; pub mod kv; +pub(crate) mod log; pub mod user; pub mod web_socket; diff --git a/backend/tests/document_test/edit_script.rs b/backend/tests/document_test/edit_script.rs index c375a7727f..8527f77b48 100644 --- a/backend/tests/document_test/edit_script.rs +++ b/backend/tests/document_test/edit_script.rs @@ -63,14 +63,14 @@ struct ScriptContext { impl ScriptContext { async fn new(client_sdk: FlowySDKTest, server: TestServer) -> Self { let user_session = client_sdk.user_session.clone(); - let ws_manager = client_sdk.ws_conn.clone(); + let ws_conn = client_sdk.ws_conn.clone(); let doc_id = create_doc(&client_sdk).await; Self { client_editor: None, client_sdk, client_user_session: user_session, - ws_conn: ws_manager, + ws_conn, server, doc_id, } diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pbenum.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pbenum.dart index 9b4fbd51b0..3e8eafa214 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pbenum.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pbenum.dart @@ -11,9 +11,11 @@ import 'package:protobuf/protobuf.dart' as $pb; class WSModule extends $pb.ProtobufEnum { static const WSModule Doc = WSModule._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Doc'); + static const WSModule Folder = WSModule._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Folder'); static const $core.List values = [ Doc, + Folder, ]; static final $core.Map<$core.int, WSModule> _byValue = $pb.ProtobufEnum.initByValue(values); diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pbjson.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pbjson.dart index 4c43e7174a..cfdfa1a768 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pbjson.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ws/msg.pbjson.dart @@ -13,11 +13,12 @@ const WSModule$json = const { '1': 'WSModule', '2': const [ const {'1': 'Doc', '2': 0}, + const {'1': 'Folder', '2': 1}, ], }; /// Descriptor for `WSModule`. Decode as a `google.protobuf.EnumDescriptorProto`. -final $typed_data.Uint8List wSModuleDescriptor = $convert.base64Decode('CghXU01vZHVsZRIHCgNEb2MQAA=='); +final $typed_data.Uint8List wSModuleDescriptor = $convert.base64Decode('CghXU01vZHVsZRIHCgNEb2MQABIKCgZGb2xkZXIQAQ=='); @$core.Deprecated('Use webSocketRawMessageDescriptor instead') const WebSocketRawMessage$json = const { '1': 'WebSocketRawMessage', diff --git a/frontend/rust-lib/flowy-core/Cargo.toml b/frontend/rust-lib/flowy-core/Cargo.toml index 9d61d2e119..3859b6f7b1 100644 --- a/frontend/rust-lib/flowy-core/Cargo.toml +++ b/frontend/rust-lib/flowy-core/Cargo.toml @@ -18,6 +18,7 @@ flowy-error = { path = "../flowy-error", features = ["db", "backend"]} dart-notify = { path = "../dart-notify" } lib-dispatch = { path = "../lib-dispatch" } lib-sqlite = { path = "../lib-sqlite" } +flowy-sync = { path = "../flowy-sync" } parking_lot = "0.11" protobuf = {version = "2.18.0"} diff --git a/frontend/rust-lib/flowy-core/src/context.rs b/frontend/rust-lib/flowy-core/src/controller.rs similarity index 79% rename from frontend/rust-lib/flowy-core/src/context.rs rename to frontend/rust-lib/flowy-core/src/controller.rs index 6760d23380..afa287c0dc 100644 --- a/frontend/rust-lib/flowy-core/src/context.rs +++ b/frontend/rust-lib/flowy-core/src/controller.rs @@ -1,6 +1,9 @@ +use bytes::Bytes; use chrono::Utc; use flowy_collaboration::client_document::default::{initial_delta, initial_read_me}; use flowy_core_data_model::{entities::view::CreateViewParams, user_default}; +use flowy_document::context::DocumentContext; +use flowy_sync::RevisionWebSocket; use lazy_static::lazy_static; use parking_lot::RwLock; use std::{collections::HashMap, sync::Arc}; @@ -23,7 +26,7 @@ lazy_static! { static ref INIT_WORKSPACE: RwLock> = RwLock::new(HashMap::new()); } -pub struct CoreContext { +pub struct FolderManager { pub user: Arc, pub(crate) cloud_service: Arc, pub(crate) persistence: Arc, @@ -31,22 +34,49 @@ pub struct CoreContext { pub(crate) app_controller: Arc, pub(crate) view_controller: Arc, pub(crate) trash_controller: Arc, + ws_sender: Arc, } -impl CoreContext { +impl FolderManager { pub(crate) fn new( user: Arc, cloud_service: Arc, persistence: Arc, - workspace_controller: Arc, - app_controller: Arc, - view_controller: Arc, - trash_controller: Arc, + flowy_document: Arc, + ws_sender: Arc, ) -> Self { if let Ok(token) = user.token() { INIT_WORKSPACE.write().insert(token, false); } + let trash_controller = Arc::new(TrashController::new( + persistence.clone(), + cloud_service.clone(), + user.clone(), + )); + + let view_controller = Arc::new(ViewController::new( + user.clone(), + persistence.clone(), + cloud_service.clone(), + trash_controller.clone(), + flowy_document, + )); + + let app_controller = Arc::new(AppController::new( + user.clone(), + persistence.clone(), + trash_controller.clone(), + cloud_service.clone(), + )); + + let workspace_controller = Arc::new(WorkspaceController::new( + user.clone(), + persistence.clone(), + trash_controller.clone(), + cloud_service.clone(), + )); + Self { user, cloud_service, @@ -55,6 +85,7 @@ impl CoreContext { app_controller, view_controller, trash_controller, + ws_sender, } } @@ -67,6 +98,8 @@ impl CoreContext { // } // } + pub async fn did_receive_ws_data(&self, _data: Bytes) {} + pub async fn user_did_sign_in(&self, token: &str) -> FlowyResult<()> { log::debug!("workspace initialize after sign in"); let _ = self.init(token).await?; diff --git a/frontend/rust-lib/flowy-core/src/lib.rs b/frontend/rust-lib/flowy-core/src/lib.rs index 63debb1552..956bd5c113 100644 --- a/frontend/rust-lib/flowy-core/src/lib.rs +++ b/frontend/rust-lib/flowy-core/src/lib.rs @@ -10,7 +10,7 @@ mod macros; #[macro_use] extern crate flowy_database; -pub mod context; +pub mod controller; mod dart_notification; pub mod protobuf; mod util; diff --git a/frontend/rust-lib/flowy-core/src/module.rs b/frontend/rust-lib/flowy-core/src/module.rs index 29943df707..2f11edc94d 100644 --- a/frontend/rust-lib/flowy-core/src/module.rs +++ b/frontend/rust-lib/flowy-core/src/module.rs @@ -1,5 +1,5 @@ use crate::{ - context::CoreContext, + controller::FolderManager, entities::{ app::{App, AppId, CreateAppParams, UpdateAppParams}, trash::{RepeatedTrash, RepeatedTrashId}, @@ -14,14 +14,11 @@ use crate::{ trash::event_handler::*, view::event_handler::*, workspace::event_handler::*, - AppController, - TrashController, - ViewController, - WorkspaceController, }, }; use flowy_database::DBConnection; use flowy_document::context::DocumentContext; +use flowy_sync::RevisionWebSocket; use lib_dispatch::prelude::*; use lib_infra::future::FutureResult; use lib_sqlite::ConnectionPool; @@ -44,61 +41,32 @@ pub trait WorkspaceDatabase: Send + Sync { } } -pub fn init_core( +pub fn init_folder( user: Arc, database: Arc, flowy_document: Arc, cloud_service: Arc, -) -> Arc { + ws_sender: Arc, +) -> Arc { let persistence = Arc::new(FlowyCorePersistence::new(database.clone())); - let trash_controller = Arc::new(TrashController::new( - persistence.clone(), - cloud_service.clone(), - user.clone(), - )); - - let view_controller = Arc::new(ViewController::new( - user.clone(), - persistence.clone(), - cloud_service.clone(), - trash_controller.clone(), - flowy_document, - )); - - let app_controller = Arc::new(AppController::new( - user.clone(), - persistence.clone(), - trash_controller.clone(), - cloud_service.clone(), - )); - - let workspace_controller = Arc::new(WorkspaceController::new( - user.clone(), - persistence.clone(), - trash_controller.clone(), - cloud_service.clone(), - )); - - Arc::new(CoreContext::new( + Arc::new(FolderManager::new( user, cloud_service, persistence, - workspace_controller, - app_controller, - view_controller, - trash_controller, + flowy_document, + ws_sender, )) } -pub fn create(core: Arc) -> Module { +pub fn create(folder: Arc) -> Module { let mut module = Module::new() .name("Flowy-Workspace") - .data(core.workspace_controller.clone()) - .data(core.app_controller.clone()) - .data(core.view_controller.clone()) - .data(core.trash_controller.clone()) - .data(core.clone()); + .data(folder.workspace_controller.clone()) + .data(folder.app_controller.clone()) + .data(folder.view_controller.clone()) + .data(folder.trash_controller.clone()) + .data(folder.clone()); module = module .event(WorkspaceEvent::CreateWorkspace, create_workspace_handler) diff --git a/frontend/rust-lib/flowy-core/src/services/workspace/event_handler.rs b/frontend/rust-lib/flowy-core/src/services/workspace/event_handler.rs index 8d1d08ce2d..5a03256fe0 100644 --- a/frontend/rust-lib/flowy-core/src/services/workspace/event_handler.rs +++ b/frontend/rust-lib/flowy-core/src/services/workspace/event_handler.rs @@ -1,5 +1,5 @@ use crate::{ - context::CoreContext, + controller::FolderManager, dart_notification::{send_dart_notification, WorkspaceNotification}, errors::FlowyError, services::{get_current_workspace, read_local_workspace_apps, WorkspaceController}, @@ -42,17 +42,17 @@ pub(crate) async fn open_workspace_handler( data_result(workspaces) } -#[tracing::instrument(skip(data, core), err)] +#[tracing::instrument(skip(data, folder), err)] pub(crate) async fn read_workspaces_handler( data: Data, - core: Unit>, + folder: Unit>, ) -> DataResult { let params: WorkspaceId = data.into_inner().try_into()?; - let user_id = core.user.user_id()?; - let workspace_controller = core.workspace_controller.clone(); + let user_id = folder.user.user_id()?; + let workspace_controller = folder.workspace_controller.clone(); - let trash_controller = core.trash_controller.clone(); - let workspaces = core.persistence.begin_transaction(|transaction| { + let trash_controller = folder.trash_controller.clone(); + let workspaces = folder.persistence.begin_transaction(|transaction| { let mut workspaces = workspace_controller.read_local_workspaces(params.workspace_id.clone(), &user_id, &transaction)?; for workspace in workspaces.iter_mut() { @@ -61,41 +61,40 @@ pub(crate) async fn read_workspaces_handler( } Ok(workspaces) })?; - let _ = read_workspaces_on_server(core, user_id, params); + let _ = read_workspaces_on_server(folder, user_id, params); data_result(workspaces) } -#[tracing::instrument(skip(core), err)] +#[tracing::instrument(skip(folder), err)] pub async fn read_cur_workspace_handler( - core: Unit>, + folder: Unit>, ) -> DataResult { let workspace_id = get_current_workspace()?; - let user_id = core.user.user_id()?; + let user_id = folder.user.user_id()?; let params = WorkspaceId { workspace_id: Some(workspace_id.clone()), }; - let workspace = core.persistence.begin_transaction(|transaction| { - core.workspace_controller + let workspace = folder.persistence.begin_transaction(|transaction| { + folder + .workspace_controller .read_local_workspace(workspace_id, &user_id, &transaction) })?; - let latest_view: Option = core.view_controller.latest_visit_view().unwrap_or(None); + let latest_view: Option = folder.view_controller.latest_visit_view().unwrap_or(None); let setting = CurrentWorkspaceSetting { workspace, latest_view }; - let _ = read_workspaces_on_server(core, user_id, params); + let _ = read_workspaces_on_server(folder, user_id, params); data_result(setting) } -#[tracing::instrument(level = "debug", skip(core), err)] +#[tracing::instrument(level = "debug", skip(folder_manager), err)] fn read_workspaces_on_server( - core: Unit>, + folder_manager: Unit>, user_id: String, params: WorkspaceId, ) -> Result<(), FlowyError> { - let (token, server) = (core.user.token()?, core.cloud_service.clone()); - let _app_ctrl = core.app_controller.clone(); - let _view_ctrl = core.view_controller.clone(); - let persistence = core.persistence.clone(); + let (token, server) = (folder_manager.user.token()?, folder_manager.cloud_service.clone()); + let persistence = folder_manager.persistence.clone(); tokio::spawn(async move { let workspaces = server.read_workspace(&token, params).await?; diff --git a/frontend/rust-lib/flowy-document/src/context.rs b/frontend/rust-lib/flowy-document/src/context.rs index 0367e3ad6f..13ac012597 100644 --- a/frontend/rust-lib/flowy-document/src/context.rs +++ b/frontend/rust-lib/flowy-document/src/context.rs @@ -1,11 +1,6 @@ -use crate::{ - controller::DocumentController, - errors::FlowyError, - ws_receivers::DocumentWSReceivers, - DocumentCloudService, -}; +use crate::{controller::DocumentController, errors::FlowyError}; use flowy_database::ConnectionPool; -use flowy_sync::RevisionWebSocket; + use std::sync::Arc; pub trait DocumentUser: Send + Sync { @@ -21,24 +16,6 @@ pub struct DocumentContext { } impl DocumentContext { - pub fn new( - user: Arc, - ws_receivers: Arc, - ws_sender: Arc, - cloud_service: Arc, - ) -> DocumentContext { - let doc_ctrl = Arc::new(DocumentController::new( - cloud_service, - user.clone(), - ws_receivers, - ws_sender, - )); - Self { - controller: doc_ctrl, - user, - } - } - pub fn init(&self) -> Result<(), FlowyError> { let _ = self.controller.init()?; Ok(()) diff --git a/frontend/rust-lib/flowy-document/src/controller.rs b/frontend/rust-lib/flowy-document/src/controller.rs index d25fdadf19..72301a58e0 100644 --- a/frontend/rust-lib/flowy-document/src/controller.rs +++ b/frontend/rust-lib/flowy-document/src/controller.rs @@ -1,49 +1,52 @@ -use crate::{ - context::DocumentUser, - core::ClientDocumentEditor, - errors::FlowyError, - ws_receivers::DocumentWSReceivers, - DocumentCloudService, -}; +use crate::{context::DocumentUser, core::ClientDocumentEditor, errors::FlowyError, DocumentCloudService}; +use async_trait::async_trait; use bytes::Bytes; use dashmap::DashMap; use flowy_collaboration::entities::{ doc::{DocumentDelta, DocumentId}, revision::{md5, RepeatedRevision, Revision}, + ws::ServerRevisionWSData, }; use flowy_database::ConnectionPool; use flowy_error::FlowyResult; use flowy_sync::{RevisionCache, RevisionCloudService, RevisionManager, RevisionWebSocket, WSStateReceiver}; use lib_infra::future::FutureResult; -use std::sync::Arc; +use lib_ws::WSConnectState; +use std::{convert::TryInto, sync::Arc}; +#[async_trait] +pub(crate) trait DocumentWSReceiver: Send + Sync { + async fn receive_ws_data(&self, data: ServerRevisionWSData) -> Result<(), FlowyError>; + fn connect_state_changed(&self, state: WSConnectState); +} +type WebSocketDataReceivers = Arc>>; pub struct DocumentController { cloud_service: Arc, - ws_receivers: Arc, - ws_sender: Arc, + ws_receivers: WebSocketDataReceivers, + web_socket: Arc, open_cache: Arc, user: Arc, } impl DocumentController { - pub(crate) fn new( + pub fn new( cloud_service: Arc, user: Arc, - ws_receivers: Arc, - ws_sender: Arc, + web_socket: Arc, ) -> Self { + let ws_receivers = Arc::new(DashMap::new()); let open_cache = Arc::new(OpenDocCache::new()); Self { cloud_service, ws_receivers, - ws_sender, + web_socket, open_cache, user, } } pub(crate) fn init(&self) -> FlowyResult<()> { - let notify = self.ws_sender.subscribe_state_changed(); + let notify = self.web_socket.subscribe_state_changed(); listen_ws_state_changed(notify, self.ws_receivers.clone()); Ok(()) @@ -61,7 +64,7 @@ impl DocumentController { let doc_id = doc_id.as_ref(); tracing::Span::current().record("doc_id", &doc_id); self.open_cache.remove(doc_id); - self.ws_receivers.remove(doc_id); + self.remove_ws_receiver(doc_id); Ok(()) } @@ -70,7 +73,7 @@ impl DocumentController { let doc_id = doc_id.as_ref(); tracing::Span::current().record("doc_id", &doc_id); self.open_cache.remove(doc_id); - self.ws_receivers.remove(doc_id); + self.remove_ws_receiver(doc_id); Ok(()) } @@ -93,6 +96,25 @@ impl DocumentController { Ok(()) } + pub async fn did_receive_ws_data(&self, data: Bytes) { + let data: ServerRevisionWSData = data.try_into().unwrap(); + match self.ws_receivers.get(&data.object_id) { + None => tracing::error!("Can't find any source handler for {:?}", data.object_id), + Some(handler) => match handler.receive_ws_data(data).await { + Ok(_) => {}, + Err(e) => tracing::error!("{}", e), + }, + } + } + + pub async fn ws_connect_state_changed(&self, state: &WSConnectState) { + for receiver in self.ws_receivers.iter() { + receiver.value().connect_state_changed(state.clone()); + } + } +} + +impl DocumentController { async fn get_editor(&self, doc_id: &str) -> FlowyResult> { match self.open_cache.get(doc_id) { None => { @@ -102,9 +124,7 @@ impl DocumentController { Some(editor) => Ok(editor), } } -} -impl DocumentController { async fn make_editor( &self, doc_id: &str, @@ -117,8 +137,8 @@ impl DocumentController { token, server: self.cloud_service.clone(), }); - let doc_editor = ClientDocumentEditor::new(doc_id, user, rev_manager, self.ws_sender.clone(), server).await?; - self.ws_receivers.add(doc_id, doc_editor.ws_handler()); + let doc_editor = ClientDocumentEditor::new(doc_id, user, rev_manager, self.web_socket.clone(), server).await?; + self.add_ws_receiver(doc_id, doc_editor.ws_handler()); self.open_cache.insert(&doc_id, &doc_editor); Ok(doc_editor) } @@ -128,6 +148,15 @@ impl DocumentController { let cache = Arc::new(RevisionCache::new(&user_id, doc_id, pool)); Ok(RevisionManager::new(&user_id, doc_id, cache)) } + + fn add_ws_receiver(&self, object_id: &str, receiver: Arc) { + if self.ws_receivers.contains_key(object_id) { + log::error!("Duplicate handler registered for {:?}", object_id); + } + self.ws_receivers.insert(object_id.to_string(), receiver); + } + + fn remove_ws_receiver(&self, id: &str) { self.ws_receivers.remove(id); } } struct RevisionServerImpl { @@ -194,10 +223,12 @@ impl OpenDocCache { } #[tracing::instrument(level = "debug", skip(state_receiver, receivers))] -fn listen_ws_state_changed(mut state_receiver: WSStateReceiver, receivers: Arc) { +fn listen_ws_state_changed(mut state_receiver: WSStateReceiver, receivers: WebSocketDataReceivers) { tokio::spawn(async move { while let Ok(state) = state_receiver.recv().await { - receivers.ws_connect_state_changed(&state).await; + for receiver in receivers.iter() { + receiver.value().connect_state_changed(state.clone()); + } } }); } diff --git a/frontend/rust-lib/flowy-document/src/core/editor.rs b/frontend/rust-lib/flowy-document/src/core/editor.rs index 42004aae0c..a390052b72 100644 --- a/frontend/rust-lib/flowy-document/src/core/editor.rs +++ b/frontend/rust-lib/flowy-document/src/core/editor.rs @@ -2,7 +2,7 @@ use crate::{ context::DocumentUser, core::{make_document_ws_manager, EditorCommand, EditorCommandQueue, EditorCommandSender}, errors::FlowyError, - ws_receivers::DocumentWSReceiver, + DocumentWSReceiver, }; use bytes::Bytes; use flowy_collaboration::{ @@ -38,7 +38,7 @@ impl ClientDocumentEditor { doc_id: &str, user: Arc, mut rev_manager: RevisionManager, - ws: Arc, + web_socket: Arc, server: Arc, ) -> FlowyResult> { let document_info = rev_manager.load::(server).await?; @@ -53,7 +53,7 @@ impl ClientDocumentEditor { user_id.clone(), edit_cmd_tx.clone(), rev_manager.clone(), - ws, + web_socket, ) .await; let editor = Arc::new(Self { diff --git a/frontend/rust-lib/flowy-document/src/core/web_socket.rs b/frontend/rust-lib/flowy-document/src/core/web_socket.rs index 5228aa8960..b68b0e4146 100644 --- a/frontend/rust-lib/flowy-document/src/core/web_socket.rs +++ b/frontend/rust-lib/flowy-document/src/core/web_socket.rs @@ -1,6 +1,6 @@ use crate::{ core::{EditorCommand, TransformDeltas, SYNC_INTERVAL_IN_MILLIS}, - ws_receivers::DocumentWSReceiver, + DocumentWSReceiver, }; use async_trait::async_trait; use bytes::Bytes; @@ -37,7 +37,7 @@ pub(crate) async fn make_document_ws_manager( user_id: String, edit_cmd_tx: EditorCommandSender, rev_manager: Arc, - ws_conn: Arc, + web_socket: Arc, ) -> Arc { let shared_sink = Arc::new(SharedWSSinkDataProvider::new(rev_manager.clone())); let ws_stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter { @@ -50,7 +50,7 @@ pub(crate) async fn make_document_ws_manager( let ping_duration = Duration::from_millis(SYNC_INTERVAL_IN_MILLIS); let ws_manager = Arc::new(RevisionWebSocketManager::new( &doc_id, - ws_conn, + web_socket, data_provider, ws_stream_consumer, ping_duration, diff --git a/frontend/rust-lib/flowy-document/src/lib.rs b/frontend/rust-lib/flowy-document/src/lib.rs index 4fd51b2975..cdf29fa678 100644 --- a/frontend/rust-lib/flowy-document/src/lib.rs +++ b/frontend/rust-lib/flowy-document/src/lib.rs @@ -1,10 +1,9 @@ pub mod context; -pub(crate) mod controller; +mod controller; pub mod core; // mod notify; pub mod protobuf; -pub mod ws_receivers; - +pub use controller::*; pub mod errors { pub use flowy_error::{internal_error, ErrorCode, FlowyError}; } diff --git a/frontend/rust-lib/flowy-document/src/ws_receivers.rs b/frontend/rust-lib/flowy-document/src/ws_receivers.rs deleted file mode 100644 index 2877131581..0000000000 --- a/frontend/rust-lib/flowy-document/src/ws_receivers.rs +++ /dev/null @@ -1,56 +0,0 @@ -use crate::errors::FlowyError; -use async_trait::async_trait; -use bytes::Bytes; -use dashmap::DashMap; -use flowy_collaboration::entities::ws::{ServerRevisionWSData}; -use lib_ws::WSConnectState; -use std::{convert::TryInto, sync::Arc}; - -#[async_trait] -pub(crate) trait DocumentWSReceiver: Send + Sync { - async fn receive_ws_data(&self, data: ServerRevisionWSData) -> Result<(), FlowyError>; - fn connect_state_changed(&self, state: WSConnectState); -} - -pub struct DocumentWSReceivers { - // key: the document id - // value: DocumentWSReceiver - receivers: Arc>>, -} - -impl std::default::Default for DocumentWSReceivers { - fn default() -> Self { - let receivers: Arc>> = Arc::new(DashMap::new()); - DocumentWSReceivers { receivers } - } -} - -impl DocumentWSReceivers { - pub fn new() -> Self { DocumentWSReceivers::default() } - - pub(crate) fn add(&self, doc_id: &str, receiver: Arc) { - if self.receivers.contains_key(doc_id) { - log::error!("Duplicate handler registered for {:?}", doc_id); - } - self.receivers.insert(doc_id.to_string(), receiver); - } - - pub(crate) fn remove(&self, id: &str) { self.receivers.remove(id); } - - pub async fn did_receive_data(&self, data: Bytes) { - let data: ServerRevisionWSData = data.try_into().unwrap(); - match self.receivers.get(&data.object_id) { - None => tracing::error!("Can't find any source handler for {:?}", data.object_id), - Some(handler) => match handler.receive_ws_data(data).await { - Ok(_) => {}, - Err(e) => tracing::error!("{}", e), - }, - } - } - - pub async fn ws_connect_state_changed(&self, state: &WSConnectState) { - for receiver in self.receivers.iter() { - receiver.value().connect_state_changed(state.clone()); - } - } -} diff --git a/frontend/rust-lib/flowy-net/src/local_server/persistence.rs b/frontend/rust-lib/flowy-net/src/local_server/persistence.rs index 7c41e622b3..7b89537a0c 100644 --- a/frontend/rust-lib/flowy-net/src/local_server/persistence.rs +++ b/frontend/rust-lib/flowy-net/src/local_server/persistence.rs @@ -12,42 +12,40 @@ use std::{ sync::Arc, }; -pub trait DocumentCloudStorage: Send + Sync { +pub trait RevisionCloudStorage: Send + Sync { fn set_revisions(&self, repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError>; fn get_revisions( &self, - doc_id: &str, + object_id: &str, rev_ids: Option>, ) -> BoxResultFuture; - fn reset_document( + fn reset_object( &self, - doc_id: &str, + object_id: &str, repeated_revision: RepeatedRevisionPB, ) -> BoxResultFuture<(), CollaborateError>; } -pub(crate) struct LocalDocumentCloudPersistence { +pub(crate) struct LocalRevisionCloudPersistence { // For the moment, we use memory to cache the data, it will be implemented with other storage. // Like the Firestore,Dropbox.etc. - storage: Arc, + storage: Arc, } -impl Debug for LocalDocumentCloudPersistence { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("LocalDocServerPersistence") } +impl Debug for LocalRevisionCloudPersistence { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("LocalRevisionCloudPersistence") } } -impl std::default::Default for LocalDocumentCloudPersistence { +impl std::default::Default for LocalRevisionCloudPersistence { fn default() -> Self { - LocalDocumentCloudPersistence { + LocalRevisionCloudPersistence { storage: Arc::new(MemoryDocumentCloudStorage::default()), } } } -impl DocumentCloudPersistence for LocalDocumentCloudPersistence { - fn enable_sync(&self) -> bool { false } - +impl DocumentCloudPersistence for LocalRevisionCloudPersistence { fn read_document(&self, doc_id: &str) -> BoxResultFuture { let storage = self.storage.clone(); let doc_id = doc_id.to_owned(); @@ -107,7 +105,7 @@ impl DocumentCloudPersistence for LocalDocumentCloudPersistence { let storage = self.storage.clone(); let doc_id = doc_id.to_owned(); Box::pin(async move { - let _ = storage.reset_document(&doc_id, revisions).await?; + let _ = storage.reset_object(&doc_id, revisions).await?; Ok(()) }) } @@ -117,7 +115,7 @@ struct MemoryDocumentCloudStorage {} impl std::default::Default for MemoryDocumentCloudStorage { fn default() -> Self { Self {} } } -impl DocumentCloudStorage for MemoryDocumentCloudStorage { +impl RevisionCloudStorage for MemoryDocumentCloudStorage { fn set_revisions(&self, _repeated_revision: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> { Box::pin(async move { Ok(()) }) } @@ -133,7 +131,7 @@ impl DocumentCloudStorage for MemoryDocumentCloudStorage { }) } - fn reset_document( + fn reset_object( &self, _doc_id: &str, _repeated_revision: RepeatedRevisionPB, diff --git a/frontend/rust-lib/flowy-net/src/local_server/server.rs b/frontend/rust-lib/flowy-net/src/local_server/server.rs index 85842f1208..5d3ed60eac 100644 --- a/frontend/rust-lib/flowy-net/src/local_server/server.rs +++ b/frontend/rust-lib/flowy-net/src/local_server/server.rs @@ -1,4 +1,4 @@ -use crate::local_server::persistence::LocalDocumentCloudPersistence; +use crate::local_server::persistence::LocalRevisionCloudPersistence; use async_stream::stream; use bytes::Bytes; use flowy_collaboration::{ @@ -35,7 +35,7 @@ impl LocalServer { client_ws_sender: mpsc::UnboundedSender, client_ws_receiver: broadcast::Sender, ) -> Self { - let persistence = Arc::new(LocalDocumentCloudPersistence::default()); + let persistence = Arc::new(LocalRevisionCloudPersistence::default()); let doc_manager = Arc::new(ServerDocumentManager::new(persistence)); let stop_tx = RwLock::new(None); @@ -122,7 +122,7 @@ impl LocalWebSocketRunner { client_data.ty, ); let client_ws_sender = self.client_ws_sender.clone(); - let user = Arc::new(LocalDocumentUser { + let user = Arc::new(LocalRevisionUser { user_id, client_ws_sender, }); @@ -144,15 +144,15 @@ impl LocalWebSocketRunner { } #[derive(Debug)] -struct LocalDocumentUser { +struct LocalRevisionUser { user_id: String, client_ws_sender: mpsc::UnboundedSender, } -impl RevisionUser for LocalDocumentUser { +impl RevisionUser for LocalRevisionUser { fn user_id(&self) -> String { self.user_id.clone() } - fn receive(&self, resp: SyncResponse) { + fn receive(&self, resp: RevisionSyncResponse) { let sender = self.client_ws_sender.clone(); let send_fn = |sender: UnboundedSender, msg: WebSocketRawMessage| match sender.send(msg) { Ok(_) => {}, @@ -163,7 +163,7 @@ impl RevisionUser for LocalDocumentUser { tokio::spawn(async move { match resp { - SyncResponse::Pull(data) => { + RevisionSyncResponse::Pull(data) => { let bytes: Bytes = data.try_into().unwrap(); let msg = WebSocketRawMessage { module: WSModule::Doc, @@ -171,7 +171,7 @@ impl RevisionUser for LocalDocumentUser { }; send_fn(sender, msg); }, - SyncResponse::Push(data) => { + RevisionSyncResponse::Push(data) => { let bytes: Bytes = data.try_into().unwrap(); let msg = WebSocketRawMessage { module: WSModule::Doc, @@ -179,7 +179,7 @@ impl RevisionUser for LocalDocumentUser { }; send_fn(sender, msg); }, - SyncResponse::Ack(data) => { + RevisionSyncResponse::Ack(data) => { let bytes: Bytes = data.try_into().unwrap(); let msg = WebSocketRawMessage { module: WSModule::Doc, diff --git a/frontend/rust-lib/flowy-net/src/local_server/ws.rs b/frontend/rust-lib/flowy-net/src/local_server/ws.rs index da1ebac84e..a1502c346a 100644 --- a/frontend/rust-lib/flowy-net/src/local_server/ws.rs +++ b/frontend/rust-lib/flowy-net/src/local_server/ws.rs @@ -1,4 +1,4 @@ -use crate::ws::connection::{FlowyRawWebSocket, FlowyWSSender}; +use crate::ws::connection::{FlowyRawWebSocket, FlowyWebSocket}; use dashmap::DashMap; use flowy_error::FlowyError; use lib_infra::future::FutureResult; @@ -65,7 +65,7 @@ impl FlowyRawWebSocket for LocalWebSocket { Ok(()) } - fn sender(&self) -> Result, FlowyError> { + fn sender(&self) -> Result, FlowyError> { let ws = LocalWebSocketAdaptor(self.server_ws_sender.clone()); Ok(Arc::new(ws)) } @@ -74,7 +74,7 @@ impl FlowyRawWebSocket for LocalWebSocket { #[derive(Clone)] struct LocalWebSocketAdaptor(broadcast::Sender); -impl FlowyWSSender for LocalWebSocketAdaptor { +impl FlowyWebSocket for LocalWebSocketAdaptor { fn send(&self, msg: WebSocketRawMessage) -> Result<(), FlowyError> { let _ = self.0.send(msg); Ok(()) diff --git a/frontend/rust-lib/flowy-net/src/ws/connection.rs b/frontend/rust-lib/flowy-net/src/ws/connection.rs index 9e9b89627a..72689f90b5 100644 --- a/frontend/rust-lib/flowy-net/src/ws/connection.rs +++ b/frontend/rust-lib/flowy-net/src/ws/connection.rs @@ -16,10 +16,10 @@ pub trait FlowyRawWebSocket: Send + Sync { fn subscribe_connect_state(&self) -> broadcast::Receiver; fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError>; fn add_receiver(&self, receiver: Arc) -> Result<(), FlowyError>; - fn sender(&self) -> Result, FlowyError>; + fn sender(&self) -> Result, FlowyError>; } -pub trait FlowyWSSender: Send + Sync { +pub trait FlowyWebSocket: Send + Sync { fn send(&self, msg: WebSocketRawMessage) -> Result<(), FlowyError>; } @@ -101,12 +101,12 @@ impl FlowyWebSocketConnect { Ok(()) } - pub fn ws_sender(&self) -> Result, FlowyError> { self.inner.sender() } + pub fn web_socket(&self) -> Result, FlowyError> { self.inner.sender() } } #[tracing::instrument(level = "debug", skip(ws_conn))] pub fn listen_on_websocket(ws_conn: Arc) { - let ws = ws_conn.inner.clone(); + let raw_web_socket = ws_conn.inner.clone(); let mut notify = ws_conn.inner.subscribe_connect_state(); let _ = tokio::spawn(async move { loop { @@ -117,7 +117,7 @@ pub fn listen_on_websocket(ws_conn: Arc) { WSConnectState::Init => {}, WSConnectState::Connected => {}, WSConnectState::Connecting => {}, - WSConnectState::Disconnected => retry_connect(ws.clone(), 100).await, + WSConnectState::Disconnected => retry_connect(raw_web_socket.clone(), 100).await, } }, Err(e) => { diff --git a/frontend/rust-lib/flowy-net/src/ws/http_ws.rs b/frontend/rust-lib/flowy-net/src/ws/http_ws.rs index 1d019b4c5d..fe0ff009c0 100644 --- a/frontend/rust-lib/flowy-net/src/ws/http_ws.rs +++ b/frontend/rust-lib/flowy-net/src/ws/http_ws.rs @@ -1,4 +1,4 @@ -use crate::ws::connection::{FlowyRawWebSocket, FlowyWSSender}; +use crate::ws::connection::{FlowyRawWebSocket, FlowyWebSocket}; use flowy_error::internal_error; pub use flowy_error::FlowyError; use lib_infra::future::FutureResult; @@ -42,13 +42,13 @@ impl FlowyRawWebSocket for Arc { Ok(()) } - fn sender(&self) -> Result, FlowyError> { + fn sender(&self) -> Result, FlowyError> { let sender = self.ws_message_sender().map_err(internal_error)?; Ok(sender) } } -impl FlowyWSSender for WSSender { +impl FlowyWebSocket for WSSender { fn send(&self, msg: WebSocketRawMessage) -> Result<(), FlowyError> { let _ = self.send_msg(msg).map_err(internal_error)?; Ok(()) diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/core_deps.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/core_deps.rs index 2f180ed747..cabe4de893 100644 --- a/frontend/rust-lib/flowy-sdk/src/deps_resolve/core_deps.rs +++ b/frontend/rust-lib/flowy-sdk/src/deps_resolve/core_deps.rs @@ -1,13 +1,22 @@ use backend_service::configuration::ClientServerConfiguration; +use bytes::Bytes; +use flowy_collaboration::entities::ws::ClientRevisionWSData; use flowy_core::{ - errors::FlowyError, - module::{WorkspaceCloudService, WorkspaceDatabase, WorkspaceUser}, + controller::FolderManager, + errors::{internal_error, FlowyError}, + module::{init_folder, WorkspaceCloudService, WorkspaceDatabase, WorkspaceUser}, }; use flowy_database::ConnectionPool; -use flowy_net::{http_server::core::CoreHttpCloudService, local_server::LocalServer}; +use flowy_document::context::DocumentContext; +use flowy_net::{ + http_server::core::CoreHttpCloudService, + local_server::LocalServer, + ws::connection::FlowyWebSocketConnect, +}; +use flowy_sync::{RevisionWebSocket, WSStateReceiver}; use flowy_user::services::UserSession; - -use std::sync::Arc; +use lib_ws::{WSMessageReceiver, WSModule, WebSocketRawMessage}; +use std::{convert::TryInto, sync::Arc}; pub struct CoreDepsResolver(); impl CoreDepsResolver { @@ -15,18 +24,22 @@ impl CoreDepsResolver { local_server: Option>, user_session: Arc, server_config: &ClientServerConfiguration, - ) -> ( - Arc, - Arc, - Arc, - ) { + flowy_document: &Arc, + ws_conn: Arc, + ) -> Arc { let user: Arc = Arc::new(WorkspaceUserImpl(user_session.clone())); let database: Arc = Arc::new(WorkspaceDatabaseImpl(user_session)); + let ws_sender = Arc::new(FolderWebSocketImpl(ws_conn.clone())); let cloud_service: Arc = match local_server { None => Arc::new(CoreHttpCloudService::new(server_config.clone())), Some(local_server) => local_server, }; - (user, database, cloud_service) + + let folder_manager = init_folder(user, database, flowy_document.clone(), cloud_service, ws_sender); + let receiver = Arc::new(FolderWSMessageReceiverImpl(folder_manager.clone())); + ws_conn.add_ws_message_receiver(receiver).unwrap(); + + folder_manager } } @@ -43,3 +56,30 @@ impl WorkspaceUser for WorkspaceUserImpl { fn token(&self) -> Result { self.0.token().map_err(|e| FlowyError::internal().context(e)) } } + +struct FolderWebSocketImpl(Arc); +impl RevisionWebSocket for FolderWebSocketImpl { + fn send(&self, data: ClientRevisionWSData) -> Result<(), FlowyError> { + let bytes: Bytes = data.try_into().unwrap(); + let msg = WebSocketRawMessage { + module: WSModule::Folder, + data: bytes.to_vec(), + }; + let sender = self.0.web_socket()?; + sender.send(msg).map_err(internal_error)?; + Ok(()) + } + + fn subscribe_state_changed(&self) -> WSStateReceiver { self.0.subscribe_websocket_state() } +} + +struct FolderWSMessageReceiverImpl(Arc); +impl WSMessageReceiver for FolderWSMessageReceiverImpl { + fn source(&self) -> WSModule { WSModule::Folder } + fn receive_message(&self, msg: WebSocketRawMessage) { + let handler = self.0.clone(); + tokio::spawn(async move { + handler.did_receive_ws_data(Bytes::from(msg.data)).await; + }); + } +} diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs index bedab1d5df..12b094635f 100644 --- a/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs +++ b/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs @@ -3,10 +3,10 @@ use bytes::Bytes; use flowy_collaboration::entities::ws::ClientRevisionWSData; use flowy_database::ConnectionPool; use flowy_document::{ - context::DocumentUser, + context::{DocumentContext, DocumentUser}, errors::{internal_error, FlowyError}, - ws_receivers::DocumentWSReceivers, DocumentCloudService, + DocumentController, }; use flowy_net::{ http_server::document::DocumentHttpCloudService, @@ -18,13 +18,6 @@ use flowy_user::services::UserSession; use lib_ws::{WSMessageReceiver, WSModule, WebSocketRawMessage}; use std::{convert::TryInto, path::Path, sync::Arc}; -pub struct DocumentDependencies { - pub user: Arc, - pub ws_receivers: Arc, - pub ws_sender: Arc, - pub cloud_service: Arc, -} - pub struct DocumentDepsResolver(); impl DocumentDepsResolver { pub fn resolve( @@ -32,23 +25,21 @@ impl DocumentDepsResolver { ws_conn: Arc, user_session: Arc, server_config: &ClientServerConfiguration, - ) -> DocumentDependencies { + ) -> DocumentContext { let user = Arc::new(DocumentUserImpl(user_session)); let ws_sender = Arc::new(DocumentWebSocketImpl(ws_conn.clone())); - let ws_receivers = Arc::new(DocumentWSReceivers::new()); - let receiver = Arc::new(WSMessageReceiverImpl(ws_receivers.clone())); - ws_conn.add_ws_message_receiver(receiver).unwrap(); - let cloud_service: Arc = match local_server { None => Arc::new(DocumentHttpCloudService::new(server_config.clone())), Some(local_server) => local_server, }; - DocumentDependencies { + let document_controller = Arc::new(DocumentController::new(cloud_service, user.clone(), ws_sender)); + let receiver = Arc::new(DocumentWSMessageReceiverImpl(document_controller.clone())); + ws_conn.add_ws_message_receiver(receiver).unwrap(); + + DocumentContext { + controller: document_controller, user, - ws_receivers, - ws_sender, - cloud_service, } } } @@ -80,7 +71,7 @@ impl RevisionWebSocket for DocumentWebSocketImpl { module: WSModule::Doc, data: bytes.to_vec(), }; - let sender = self.0.ws_sender()?; + let sender = self.0.web_socket()?; sender.send(msg).map_err(internal_error)?; Ok(()) } @@ -88,13 +79,13 @@ impl RevisionWebSocket for DocumentWebSocketImpl { fn subscribe_state_changed(&self) -> WSStateReceiver { self.0.subscribe_websocket_state() } } -struct WSMessageReceiverImpl(Arc); -impl WSMessageReceiver for WSMessageReceiverImpl { +struct DocumentWSMessageReceiverImpl(Arc); +impl WSMessageReceiver for DocumentWSMessageReceiverImpl { fn source(&self) -> WSModule { WSModule::Doc } fn receive_message(&self, msg: WebSocketRawMessage) { - let receivers = self.0.clone(); + let handler = self.0.clone(); tokio::spawn(async move { - receivers.did_receive_data(Bytes::from(msg.data)).await; + handler.did_receive_ws_data(Bytes::from(msg.data)).await; }); } } diff --git a/frontend/rust-lib/flowy-sdk/src/lib.rs b/frontend/rust-lib/flowy-sdk/src/lib.rs index 197a24ca6b..4310b37b38 100644 --- a/frontend/rust-lib/flowy-sdk/src/lib.rs +++ b/frontend/rust-lib/flowy-sdk/src/lib.rs @@ -2,7 +2,7 @@ mod deps_resolve; pub mod module; use crate::deps_resolve::*; use backend_service::configuration::ClientServerConfiguration; -use flowy_core::{context::CoreContext, errors::FlowyError, module::init_core}; +use flowy_core::{controller::FolderManager, errors::FlowyError}; use flowy_document::context::DocumentContext; use flowy_net::{ entities::NetworkType, @@ -83,7 +83,7 @@ pub struct FlowySDK { config: FlowySDKConfig, pub user_session: Arc, pub document_ctx: Arc, - pub core: Arc, + pub core: Arc, pub dispatcher: Arc, pub ws_conn: Arc, pub local_server: Option>, @@ -108,7 +108,13 @@ impl FlowySDK { let user_session = mk_user_session(&config, &local_server, &config.server_config); let flowy_document = mk_document(&local_server, &ws_conn, &user_session, &config.server_config); - let core_ctx = mk_core_context(&local_server, &user_session, &flowy_document, &config.server_config); + let core_ctx = mk_core_context( + &local_server, + &user_session, + &flowy_document, + &config.server_config, + &ws_conn, + ); // let modules = mk_modules(&ws_conn, &core_ctx, &user_session); @@ -134,12 +140,12 @@ fn _init( dispatch: &EventDispatcher, ws_conn: &Arc, user_session: &Arc, - core: &Arc, + folder_manager: &Arc, ) { let subscribe_user_status = user_session.notifier.subscribe_user_status(); let subscribe_network_type = ws_conn.subscribe_network_ty(); - let core = core.clone(); - let cloned_core = core.clone(); + let folder_manager = folder_manager.clone(); + let cloned_folder_manager = folder_manager.clone(); let user_session = user_session.clone(); let ws_conn = ws_conn.clone(); let local_server = local_server.clone(); @@ -152,36 +158,36 @@ fn _init( user_session.init(); ws_conn.init().await; listen_on_websocket(ws_conn.clone()); - _listen_user_status(ws_conn.clone(), subscribe_user_status, core.clone()).await; + _listen_user_status(ws_conn.clone(), subscribe_user_status, folder_manager.clone()).await; }); dispatch.spawn(async move { - _listen_network_status(subscribe_network_type, cloned_core).await; + _listen_network_status(subscribe_network_type, cloned_folder_manager).await; }); } async fn _listen_user_status( ws_conn: Arc, mut subscribe: broadcast::Receiver, - core: Arc, + folder_manager: Arc, ) { while let Ok(status) = subscribe.recv().await { let result = || async { match status { UserStatus::Login { token, user_id } => { - let _ = core.user_did_sign_in(&token).await?; + let _ = folder_manager.user_did_sign_in(&token).await?; let _ = ws_conn.start(token, user_id).await?; }, UserStatus::Logout { .. } => { - core.user_did_logout().await; + folder_manager.user_did_logout().await; let _ = ws_conn.stop().await; }, UserStatus::Expired { .. } => { - core.user_session_expired().await; + folder_manager.user_session_expired().await; let _ = ws_conn.stop().await; }, UserStatus::SignUp { profile, ret } => { - let _ = core.user_did_sign_up(&profile.token).await?; + let _ = folder_manager.user_did_sign_up(&profile.token).await?; let _ = ws_conn.start(profile.token.clone(), profile.id.clone()).await?; let _ = ret.send(()); }, @@ -196,7 +202,7 @@ async fn _listen_user_status( } } -async fn _listen_network_status(mut subscribe: broadcast::Receiver, _core: Arc) { +async fn _listen_network_status(mut subscribe: broadcast::Receiver, _core: Arc) { while let Ok(_new_type) = subscribe.recv().await { // core.network_state_changed(new_type); } @@ -235,10 +241,15 @@ fn mk_core_context( user_session: &Arc, flowy_document: &Arc, server_config: &ClientServerConfiguration, -) -> Arc { - let (user, database, cloud_service) = - CoreDepsResolver::resolve(local_server.clone(), user_session.clone(), server_config); - init_core(user, database, flowy_document.clone(), cloud_service) + ws_conn: &Arc, +) -> Arc { + CoreDepsResolver::resolve( + local_server.clone(), + user_session.clone(), + server_config, + flowy_document, + ws_conn.clone(), + ) } pub fn mk_document( @@ -247,16 +258,10 @@ pub fn mk_document( user_session: &Arc, server_config: &ClientServerConfiguration, ) -> Arc { - let dependencies = DocumentDepsResolver::resolve( + Arc::new(DocumentDepsResolver::resolve( local_server.clone(), ws_conn.clone(), user_session.clone(), server_config, - ); - Arc::new(DocumentContext::new( - dependencies.user, - dependencies.ws_receivers, - dependencies.ws_sender, - dependencies.cloud_service, )) } diff --git a/frontend/rust-lib/flowy-sdk/src/module.rs b/frontend/rust-lib/flowy-sdk/src/module.rs index 49867864b3..4394b76e39 100644 --- a/frontend/rust-lib/flowy-sdk/src/module.rs +++ b/frontend/rust-lib/flowy-sdk/src/module.rs @@ -1,4 +1,4 @@ -use flowy_core::context::CoreContext; +use flowy_core::controller::FolderManager; use flowy_net::ws::connection::FlowyWebSocketConnect; use flowy_user::services::UserSession; use lib_dispatch::prelude::Module; @@ -6,17 +6,17 @@ use std::sync::Arc; pub fn mk_modules( ws_conn: &Arc, - core: &Arc, + folder_manager: &Arc, user_session: &Arc, ) -> Vec { let user_module = mk_user_module(user_session.clone()); - let core_module = mk_core_module(core.clone()); + let core_module = mk_core_module(folder_manager.clone()); let network_module = mk_network_module(ws_conn.clone()); vec![user_module, core_module, network_module] } fn mk_user_module(user_session: Arc) -> Module { flowy_user::module::create(user_session) } -fn mk_core_module(core: Arc) -> Module { flowy_core::module::create(core) } +fn mk_core_module(core: Arc) -> Module { flowy_core::module::create(core) } fn mk_network_module(ws_conn: Arc) -> Module { flowy_net::module::create(ws_conn) } diff --git a/frontend/rust-lib/flowy-sync/src/rev_manager.rs b/frontend/rust-lib/flowy-sync/src/rev_manager.rs index 61382608d0..c69f291543 100644 --- a/frontend/rust-lib/flowy-sync/src/rev_manager.rs +++ b/frontend/rust-lib/flowy-sync/src/rev_manager.rs @@ -1,5 +1,4 @@ use crate::{RevisionCache, RevisionRecord}; - use dashmap::DashMap; use flowy_collaboration::{ entities::revision::{RepeatedRevision, Revision, RevisionRange, RevisionState}, @@ -8,7 +7,6 @@ use flowy_collaboration::{ use flowy_error::{FlowyError, FlowyResult}; use futures_util::{future, stream, stream::StreamExt}; use lib_infra::future::FutureResult; - use std::{collections::VecDeque, sync::Arc}; use tokio::sync::RwLock; diff --git a/frontend/rust-lib/flowy-sync/src/ws_manager.rs b/frontend/rust-lib/flowy-sync/src/ws_manager.rs index a008e0c3c5..577a079664 100644 --- a/frontend/rust-lib/flowy-sync/src/ws_manager.rs +++ b/frontend/rust-lib/flowy-sync/src/ws_manager.rs @@ -43,7 +43,7 @@ pub struct RevisionWebSocketManager { pub object_id: String, data_provider: Arc, stream_consumer: Arc, - ws_conn: Arc, + web_socket: Arc, pub ws_passthrough_tx: Sender, ws_passthrough_rx: Option>, pub state_passthrough_tx: broadcast::Sender, @@ -53,7 +53,7 @@ pub struct RevisionWebSocketManager { impl RevisionWebSocketManager { pub fn new( object_id: &str, - ws_conn: Arc, + web_socket: Arc, data_provider: Arc, stream_consumer: Arc, ping_duration: Duration, @@ -66,7 +66,7 @@ impl RevisionWebSocketManager { object_id, data_provider, stream_consumer, - ws_conn, + web_socket, ws_passthrough_tx, ws_passthrough_rx: Some(ws_passthrough_rx), state_passthrough_tx, @@ -81,7 +81,7 @@ impl RevisionWebSocketManager { let sink = RevisionWSSink::new( &self.object_id, self.data_provider.clone(), - self.ws_conn.clone(), + self.web_socket.clone(), self.stop_sync_tx.subscribe(), ping_duration, ); diff --git a/shared-lib/flowy-collaboration/src/server_document/document_manager.rs b/shared-lib/flowy-collaboration/src/server_document/document_manager.rs index 7db7f059ea..b9168342b5 100644 --- a/shared-lib/flowy-collaboration/src/server_document/document_manager.rs +++ b/shared-lib/flowy-collaboration/src/server_document/document_manager.rs @@ -2,7 +2,7 @@ use crate::{ entities::{doc::DocumentInfo, ws::ServerRevisionWSDataBuilder}, errors::{internal_error, CollaborateError, CollaborateResult}, protobuf::{ClientRevisionWSData, RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB}, - server_document::{document_pad::ServerDocument, RevisionSynchronizer, RevisionUser, SyncResponse}, + server_document::{document_pad::ServerDocument, RevisionSyncResponse, RevisionSynchronizer, RevisionUser}, }; use async_stream::stream; use dashmap::DashMap; @@ -16,8 +16,6 @@ use tokio::{ }; pub trait DocumentCloudPersistence: Send + Sync + Debug { - fn enable_sync(&self) -> bool; - fn read_document(&self, doc_id: &str) -> BoxResultFuture; fn create_document( @@ -78,9 +76,9 @@ impl ServerDocumentManager { }; if result.is_ok() { - cloned_user.receive(SyncResponse::Ack(ServerRevisionWSDataBuilder::build_ack_message( - &object_id, ack_id, - ))); + cloned_user.receive(RevisionSyncResponse::Ack( + ServerRevisionWSDataBuilder::build_ack_message(&object_id, ack_id), + )); } result } diff --git a/shared-lib/flowy-collaboration/src/server_document/revision_sync.rs b/shared-lib/flowy-collaboration/src/server_document/revision_sync.rs index 2ca6f5cbc5..d85c3584e1 100644 --- a/shared-lib/flowy-collaboration/src/server_document/revision_sync.rs +++ b/shared-lib/flowy-collaboration/src/server_document/revision_sync.rs @@ -22,10 +22,10 @@ use std::{ pub trait RevisionUser: Send + Sync + Debug { fn user_id(&self) -> String; - fn receive(&self, resp: SyncResponse); + fn receive(&self, resp: RevisionSyncResponse); } -pub enum SyncResponse { +pub enum RevisionSyncResponse { Pull(ServerRevisionWSData), Push(ServerRevisionWSData), Ack(ServerRevisionWSData), @@ -66,7 +66,7 @@ impl RevisionSynchronizer { let revisions = self.persistence.read_revisions(&doc_id, None).await?; let repeated_revision = repeated_revision_from_revision_pbs(revisions)?; let data = ServerRevisionWSDataBuilder::build_push_message(&doc_id, repeated_revision); - user.receive(SyncResponse::Push(data)); + user.receive(RevisionSyncResponse::Push(data)); return Ok(()); } @@ -94,7 +94,7 @@ impl RevisionSynchronizer { end: first_revision.rev_id, }; let msg = ServerRevisionWSDataBuilder::build_pull_message(&self.doc_id, range); - user.receive(SyncResponse::Pull(msg)); + user.receive(RevisionSyncResponse::Pull(msg)); } }, Ordering::Equal => { @@ -221,7 +221,7 @@ impl RevisionSynchronizer { match repeated_revision_from_revision_pbs(revisions) { Ok(repeated_revision) => { let data = ServerRevisionWSDataBuilder::build_push_message(&self.doc_id, repeated_revision); - user.receive(SyncResponse::Push(data)); + user.receive(RevisionSyncResponse::Push(data)); }, Err(e) => tracing::error!("{}", e), } diff --git a/shared-lib/lib-ws/src/msg.rs b/shared-lib/lib-ws/src/msg.rs index 98baba8d43..54324f3cab 100644 --- a/shared-lib/lib-ws/src/msg.rs +++ b/shared-lib/lib-ws/src/msg.rs @@ -14,7 +14,8 @@ pub struct WebSocketRawMessage { #[derive(ProtoBuf_Enum, Debug, Clone, Eq, PartialEq, Hash)] pub enum WSModule { - Doc = 0, + Doc = 0, + Folder = 1, } impl std::default::Default for WSModule { @@ -25,6 +26,7 @@ impl ToString for WSModule { fn to_string(&self) -> String { match self { WSModule::Doc => "0".to_string(), + WSModule::Folder => "1".to_string(), } } } diff --git a/shared-lib/lib-ws/src/protobuf/model/msg.rs b/shared-lib/lib-ws/src/protobuf/model/msg.rs index 835aeb0a3d..db8621afe8 100644 --- a/shared-lib/lib-ws/src/protobuf/model/msg.rs +++ b/shared-lib/lib-ws/src/protobuf/model/msg.rs @@ -216,6 +216,7 @@ impl ::protobuf::reflect::ProtobufValue for WebSocketRawMessage { #[derive(Clone,PartialEq,Eq,Debug,Hash)] pub enum WSModule { Doc = 0, + Folder = 1, } impl ::protobuf::ProtobufEnum for WSModule { @@ -226,6 +227,7 @@ impl ::protobuf::ProtobufEnum for WSModule { fn from_i32(value: i32) -> ::std::option::Option { match value { 0 => ::std::option::Option::Some(WSModule::Doc), + 1 => ::std::option::Option::Some(WSModule::Folder), _ => ::std::option::Option::None } } @@ -233,6 +235,7 @@ impl ::protobuf::ProtobufEnum for WSModule { fn values() -> &'static [Self] { static values: &'static [WSModule] = &[ WSModule::Doc, + WSModule::Folder, ]; values } @@ -263,18 +266,20 @@ impl ::protobuf::reflect::ProtobufValue for WSModule { static file_descriptor_proto_data: &'static [u8] = b"\ \n\tmsg.proto\"L\n\x13WebSocketRawMessage\x12!\n\x06module\x18\x01\x20\ \x01(\x0e2\t.WSModuleR\x06module\x12\x12\n\x04data\x18\x02\x20\x01(\x0cR\ - \x04data*\x13\n\x08WSModule\x12\x07\n\x03Doc\x10\0J\xd9\x01\n\x06\x12\ - \x04\0\0\x08\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\ - \x02\0\x05\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x1b\n\x0b\n\x04\x04\0\ - \x02\0\x12\x03\x03\x04\x18\n\x0c\n\x05\x04\0\x02\0\x06\x12\x03\x03\x04\ - \x0c\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\r\x13\n\x0c\n\x05\x04\0\x02\ - \0\x03\x12\x03\x03\x16\x17\n\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04\x13\ - \n\x0c\n\x05\x04\0\x02\x01\x05\x12\x03\x04\x04\t\n\x0c\n\x05\x04\0\x02\ - \x01\x01\x12\x03\x04\n\x0e\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04\x11\ - \x12\n\n\n\x02\x05\0\x12\x04\x06\0\x08\x01\n\n\n\x03\x05\0\x01\x12\x03\ - \x06\x05\r\n\x0b\n\x04\x05\0\x02\0\x12\x03\x07\x04\x0c\n\x0c\n\x05\x05\0\ - \x02\0\x01\x12\x03\x07\x04\x07\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x07\n\ - \x0bb\x06proto3\ + \x04data*\x1f\n\x08WSModule\x12\x07\n\x03Doc\x10\0\x12\n\n\x06Folder\x10\ + \x01J\x82\x02\n\x06\x12\x04\0\0\t\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\ + \n\n\x02\x04\0\x12\x04\x02\0\x05\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\ + \x1b\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x18\n\x0c\n\x05\x04\0\x02\0\ + \x06\x12\x03\x03\x04\x0c\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\r\x13\n\ + \x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x16\x17\n\x0b\n\x04\x04\0\x02\x01\ + \x12\x03\x04\x04\x13\n\x0c\n\x05\x04\0\x02\x01\x05\x12\x03\x04\x04\t\n\ + \x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\n\x0e\n\x0c\n\x05\x04\0\x02\x01\ + \x03\x12\x03\x04\x11\x12\n\n\n\x02\x05\0\x12\x04\x06\0\t\x01\n\n\n\x03\ + \x05\0\x01\x12\x03\x06\x05\r\n\x0b\n\x04\x05\0\x02\0\x12\x03\x07\x04\x0c\ + \n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x07\x04\x07\n\x0c\n\x05\x05\0\x02\0\ + \x02\x12\x03\x07\n\x0b\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x08\x04\x0f\n\ + \x0c\n\x05\x05\0\x02\x01\x01\x12\x03\x08\x04\n\n\x0c\n\x05\x05\0\x02\x01\ + \x02\x12\x03\x08\r\x0eb\x06proto3\ "; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; diff --git a/shared-lib/lib-ws/src/protobuf/proto/msg.proto b/shared-lib/lib-ws/src/protobuf/proto/msg.proto index 169cbe7ec8..b478f72b7a 100644 --- a/shared-lib/lib-ws/src/protobuf/proto/msg.proto +++ b/shared-lib/lib-ws/src/protobuf/proto/msg.proto @@ -6,4 +6,5 @@ message WebSocketRawMessage { } enum WSModule { Doc = 0; + Folder = 1; }