diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 25d6f99d3c..cdae832ceb 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -1387,6 +1387,8 @@ version = "0.1.0" dependencies = [ "anyhow", "bytes", + "dashmap", + "flowy-collaboration", "flowy-derive", "flowy-error", "lib-dispatch", @@ -1413,7 +1415,6 @@ dependencies = [ "flowy-document", "flowy-net", "flowy-user", - "flowy-virtual-net", "futures-core", "lib-dispatch", "lib-infra", @@ -1504,21 +1505,6 @@ dependencies = [ "validator", ] -[[package]] -name = "flowy-virtual-net" -version = "0.1.0" -dependencies = [ - "bytes", - "dashmap", - "flowy-collaboration", - "flowy-net", - "lib-infra", - "lib-ws", - "parking_lot", - "tokio", - "tracing", -] - [[package]] name = "fnv" version = "1.0.7" diff --git a/backend/tests/document_test/edit_script.rs b/backend/tests/document_test/edit_script.rs index cf900f97f0..f74d362221 100644 --- a/backend/tests/document_test/edit_script.rs +++ b/backend/tests/document_test/edit_script.rs @@ -17,11 +17,9 @@ use backend::services::document::persistence::{read_document, reset_document}; use flowy_collaboration::entities::revision::{RepeatedRevision, Revision}; use flowy_collaboration::protobuf::{RepeatedRevision as RepeatedRevisionPB, DocumentId as DocumentIdPB}; use flowy_collaboration::sync::ServerDocumentManager; +use flowy_net::services::ws_conn::FlowyWebSocketConnect; use lib_ot::core::Interval; -use flowy_net::services::ws::FlowyWSConnect; - - pub struct DocumentTest { server: TestServer, flowy_test: FlowySDKTest, @@ -39,7 +37,7 @@ pub enum DocScript { impl DocumentTest { pub async fn new() -> Self { let server = spawn_server().await; - let flowy_test = FlowySDKTest::setup_with(server.client_server_config.clone()); + let flowy_test = FlowySDKTest::new(server.client_server_config.clone(), None); Self { server, flowy_test } } @@ -57,7 +55,7 @@ struct ScriptContext { client_editor: Option>, client_sdk: FlowySDKTest, client_user_session: Arc, - ws_conn: Arc, + ws_conn: Arc, server: TestServer, doc_id: String, } @@ -65,7 +63,7 @@ struct ScriptContext { impl ScriptContext { async fn new(client_sdk: FlowySDKTest, server: TestServer) -> Self { let user_session = client_sdk.user_session.clone(); - let ws_manager = client_sdk.ws_manager.clone(); + let ws_manager = client_sdk.ws_conn.clone(); let doc_id = create_doc(&client_sdk).await; Self { @@ -80,7 +78,7 @@ impl ScriptContext { async fn open_doc(&mut self) { let doc_id = self.doc_id.clone(); - let edit_context = self.client_sdk.document_ctx.controller.open(doc_id).await.unwrap(); + let edit_context = self.client_sdk.document_ctx.controller.open_document(doc_id).await.unwrap(); self.client_editor = Some(edit_context); } diff --git a/frontend/app_flowy/.vscode/settings.json b/frontend/app_flowy/.vscode/settings.json index cdfa99888f..13845cc225 100644 --- a/frontend/app_flowy/.vscode/settings.json +++ b/frontend/app_flowy/.vscode/settings.json @@ -22,4 +22,5 @@ "files.associations": { "*.log.*": "log" }, + "editor.formatOnSave": true, } \ No newline at end of file diff --git a/frontend/rust-lib/Cargo.toml b/frontend/rust-lib/Cargo.toml index 23d079b073..8565d7ed6f 100644 --- a/frontend/rust-lib/Cargo.toml +++ b/frontend/rust-lib/Cargo.toml @@ -4,7 +4,6 @@ members = [ "lib-log", "lib-sqlite", "flowy-net", - "flowy-virtual-net", "flowy-sdk", "dart-ffi", "flowy-user", diff --git a/frontend/rust-lib/dart-ffi/src/lib.rs b/frontend/rust-lib/dart-ffi/src/lib.rs index ea396464d0..1dd6d736f9 100644 --- a/frontend/rust-lib/dart-ffi/src/lib.rs +++ b/frontend/rust-lib/dart-ffi/src/lib.rs @@ -26,7 +26,7 @@ pub extern "C" fn init_sdk(path: *mut c_char) -> i64 { let path: &str = c_str.to_str().unwrap(); let server_config = get_client_server_configuration().unwrap(); - let config = FlowySDKConfig::new(path, server_config, "appflowy").log_filter("debug"); + let config = FlowySDKConfig::new(path, server_config, "appflowy", None).log_filter("debug"); *FLOWY_SDK.write() = Some(Arc::new(FlowySDK::new(config))); 0 diff --git a/frontend/rust-lib/flowy-core/src/context.rs b/frontend/rust-lib/flowy-core/src/context.rs index 7b283ea349..bd009d1212 100644 --- a/frontend/rust-lib/flowy-core/src/context.rs +++ b/frontend/rust-lib/flowy-core/src/context.rs @@ -119,7 +119,7 @@ impl CoreContext { .payload(repeated_workspace) .send(); - log::debug!("workspace initialize after sign up"); + tracing::debug!("Create default workspace after sign up"); let _ = self.init(&token).await?; Ok(()) } @@ -130,13 +130,13 @@ impl CoreContext { return Ok(()); } } - log::debug!("Start initializing flowy core"); + tracing::debug!("Start initializing flowy core"); INIT_WORKSPACE.write().insert(token.to_owned(), true); let _ = self.workspace_controller.init()?; let _ = self.app_controller.init()?; let _ = self.view_controller.init()?; let _ = self.trash_controller.init()?; - log::debug!("Finish initializing core"); + tracing::debug!("Finish initializing core"); Ok(()) } diff --git a/frontend/rust-lib/flowy-core/src/services/view/controller.rs b/frontend/rust-lib/flowy-core/src/services/view/controller.rs index 47a65ba055..57854cc10f 100644 --- a/frontend/rust-lib/flowy-core/src/services/view/controller.rs +++ b/frontend/rust-lib/flowy-core/src/services/view/controller.rs @@ -129,7 +129,7 @@ impl ViewController { #[tracing::instrument(level = "debug", skip(self, params), fields(doc_id = %params.doc_id), err)] pub(crate) async fn open_view(&self, params: DocumentId) -> Result { let doc_id = params.doc_id.clone(); - let editor = self.document_ctx.controller.open(¶ms.doc_id).await?; + let editor = self.document_ctx.controller.open_document(¶ms.doc_id).await?; KV::set_str(LATEST_VIEW_ID, doc_id.clone()); let document_json = editor.document_json().await?; @@ -141,7 +141,7 @@ impl ViewController { #[tracing::instrument(level = "debug", skip(self,params), fields(doc_id = %params.doc_id), err)] pub(crate) async fn close_view(&self, params: DocumentId) -> Result<(), FlowyError> { - let _ = self.document_ctx.controller.close(¶ms.doc_id)?; + let _ = self.document_ctx.controller.close_document(¶ms.doc_id)?; Ok(()) } @@ -152,14 +152,14 @@ impl ViewController { let _ = KV::remove(LATEST_VIEW_ID); } } - let _ = self.document_ctx.controller.close(¶ms.doc_id)?; + let _ = self.document_ctx.controller.close_document(¶ms.doc_id)?; Ok(()) } #[tracing::instrument(level = "debug", skip(self, params), fields(doc_id = %params.doc_id), err)] pub(crate) async fn duplicate_view(&self, params: DocumentId) -> Result<(), FlowyError> { let view: View = ViewTableSql::read_view(¶ms.doc_id, &*self.database.db_connection()?)?.into(); - let editor = self.document_ctx.controller.open(¶ms.doc_id).await?; + let editor = self.document_ctx.controller.open_document(¶ms.doc_id).await?; let document_json = editor.document_json().await?; let duplicate_params = CreateViewParams { belong_to_id: view.belong_to_id.clone(), @@ -177,7 +177,7 @@ impl ViewController { #[tracing::instrument(level = "debug", skip(self, params), err)] pub(crate) async fn export_doc(&self, params: ExportParams) -> Result { - let editor = self.document_ctx.controller.open(¶ms.doc_id).await?; + let editor = self.document_ctx.controller.open_document(¶ms.doc_id).await?; let delta_json = editor.document_json().await?; Ok(ExportData { data: delta_json, diff --git a/frontend/rust-lib/flowy-core/tests/workspace/view_test.rs b/frontend/rust-lib/flowy-core/tests/workspace/view_test.rs index ef0162aa4a..0bbf507061 100644 --- a/frontend/rust-lib/flowy-core/tests/workspace/view_test.rs +++ b/frontend/rust-lib/flowy-core/tests/workspace/view_test.rs @@ -8,7 +8,7 @@ use flowy_test::{helper::*, FlowySDKTest}; #[tokio::test] #[should_panic] async fn view_delete() { - let test = FlowySDKTest::setup(); + let test = FlowySDKTest::default(); let _ = test.init_user().await; let test = ViewTest::new(&test).await; @@ -21,7 +21,7 @@ async fn view_delete() { #[tokio::test] async fn view_delete_then_putback() { - let test = FlowySDKTest::setup(); + let test = FlowySDKTest::default(); let _ = test.init_user().await; let test = ViewTest::new(&test).await; @@ -44,7 +44,7 @@ async fn view_delete_then_putback() { #[tokio::test] async fn view_delete_all() { - let test = FlowySDKTest::setup(); + let test = FlowySDKTest::default(); let _ = test.init_user().await; let test = ViewTest::new(&test).await; @@ -66,7 +66,7 @@ async fn view_delete_all() { #[tokio::test] async fn view_delete_all_permanent() { - let test = FlowySDKTest::setup(); + let test = FlowySDKTest::default(); let _ = test.init_user().await; let test = ViewTest::new(&test).await; @@ -85,7 +85,7 @@ async fn view_delete_all_permanent() { #[tokio::test] async fn view_open_doc() { - let test = FlowySDKTest::setup(); + let test = FlowySDKTest::default(); let _ = test.init_user().await; let test = ViewTest::new(&test).await; diff --git a/frontend/rust-lib/flowy-core/tests/workspace/workspace_test.rs b/frontend/rust-lib/flowy-core/tests/workspace/workspace_test.rs index 1fde2ff4b7..913a9556d2 100644 --- a/frontend/rust-lib/flowy-core/tests/workspace/workspace_test.rs +++ b/frontend/rust-lib/flowy-core/tests/workspace/workspace_test.rs @@ -42,7 +42,7 @@ async fn workspace_create_with_apps() { #[tokio::test] async fn workspace_create_with_invalid_name() { for (name, code) in invalid_workspace_name_test_case() { - let sdk = FlowySDKTest::setup(); + let sdk = FlowySDKTest::default(); let request = CreateWorkspaceRequest { name, desc: "".to_owned(), @@ -62,7 +62,7 @@ async fn workspace_create_with_invalid_name() { #[tokio::test] async fn workspace_update_with_invalid_name() { - let sdk = FlowySDKTest::setup(); + let sdk = FlowySDKTest::default(); for (name, code) in invalid_workspace_name_test_case() { let request = CreateWorkspaceRequest { name, diff --git a/frontend/rust-lib/flowy-document/src/controller.rs b/frontend/rust-lib/flowy-document/src/controller.rs index 55e90cbda2..1ff83d0d16 100644 --- a/frontend/rust-lib/flowy-document/src/controller.rs +++ b/frontend/rust-lib/flowy-document/src/controller.rs @@ -2,7 +2,7 @@ use crate::{ context::DocumentUser, core::{ edit::ClientDocumentEditor, - revision::{RevisionCache, RevisionManager, RevisionServer}, + revision::{DocumentRevisionCache, DocumentRevisionManager, RevisionServer}, DocumentWSReceivers, DocumentWebSocket, WSStateReceiver, @@ -54,14 +54,14 @@ impl DocumentController { } #[tracing::instrument(level = "debug", skip(self, doc_id), fields(doc_id), err)] - pub async fn open>(&self, doc_id: T) -> Result, FlowyError> { + pub async fn open_document>(&self, doc_id: T) -> Result, FlowyError> { let doc_id = doc_id.as_ref(); tracing::Span::current().record("doc_id", &doc_id); self.get_editor(doc_id).await } #[tracing::instrument(level = "debug", skip(self, doc_id), fields(doc_id), err)] - pub fn close>(&self, doc_id: T) -> Result<(), FlowyError> { + pub fn close_document>(&self, doc_id: T) -> Result<(), FlowyError> { let doc_id = doc_id.as_ref(); tracing::Span::current().record("doc_id", &doc_id); self.open_cache.remove(doc_id); @@ -127,10 +127,10 @@ impl DocumentController { Ok(doc_editor) } - fn make_rev_manager(&self, doc_id: &str, pool: Arc) -> Result { + fn make_rev_manager(&self, doc_id: &str, pool: Arc) -> Result { let user_id = self.user.user_id()?; - let cache = Arc::new(RevisionCache::new(&user_id, doc_id, pool)); - Ok(RevisionManager::new(&user_id, doc_id, cache)) + let cache = Arc::new(DocumentRevisionCache::new(&user_id, doc_id, pool)); + Ok(DocumentRevisionManager::new(&user_id, doc_id, cache)) } } diff --git a/frontend/rust-lib/flowy-document/src/core/edit/editor.rs b/frontend/rust-lib/flowy-document/src/core/edit/editor.rs index 635f4394f4..89672cfef6 100644 --- a/frontend/rust-lib/flowy-document/src/core/edit/editor.rs +++ b/frontend/rust-lib/flowy-document/src/core/edit/editor.rs @@ -18,7 +18,7 @@ use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot}; pub struct ClientDocumentEditor { pub doc_id: String, - rev_manager: Arc, + rev_manager: Arc, ws_manager: Arc, edit_queue: UnboundedSender, } @@ -27,7 +27,7 @@ impl ClientDocumentEditor { pub(crate) async fn new( doc_id: &str, user: Arc, - mut rev_manager: RevisionManager, + mut rev_manager: DocumentRevisionManager, ws: Arc, server: Arc, ) -> FlowyResult> { @@ -157,7 +157,7 @@ impl ClientDocumentEditor { fn spawn_edit_queue( user: Arc, - rev_manager: Arc, + rev_manager: Arc, delta: RichTextDelta, ) -> UnboundedSender { let (sender, receiver) = mpsc::unbounded_channel::(); @@ -184,5 +184,5 @@ impl ClientDocumentEditor { Ok(delta) } - pub fn rev_manager(&self) -> Arc { self.rev_manager.clone() } + pub fn rev_manager(&self) -> Arc { self.rev_manager.clone() } } diff --git a/frontend/rust-lib/flowy-document/src/core/edit/queue.rs b/frontend/rust-lib/flowy-document/src/core/edit/queue.rs index 0e676e23a3..6df45450fb 100644 --- a/frontend/rust-lib/flowy-document/src/core/edit/queue.rs +++ b/frontend/rust-lib/flowy-document/src/core/edit/queue.rs @@ -1,4 +1,4 @@ -use crate::{context::DocumentUser, core::RevisionManager}; +use crate::{context::DocumentUser, core::DocumentRevisionManager}; use async_stream::stream; use flowy_collaboration::{ document::{history::UndoResult, Document, NewlineDoc}, @@ -18,14 +18,14 @@ use tokio::sync::{mpsc, oneshot, RwLock}; pub(crate) struct EditorCommandQueue { document: Arc>, user: Arc, - rev_manager: Arc, + rev_manager: Arc, receiver: Option>, } impl EditorCommandQueue { pub(crate) fn new( user: Arc, - rev_manager: Arc, + rev_manager: Arc, delta: RichTextDelta, receiver: mpsc::UnboundedReceiver, ) -> Self { diff --git a/frontend/rust-lib/flowy-document/src/core/revision/cache.rs b/frontend/rust-lib/flowy-document/src/core/revision/cache.rs index 1e4a3827a1..f2aaea024d 100644 --- a/frontend/rust-lib/flowy-document/src/core/revision/cache.rs +++ b/frontend/rust-lib/flowy-document/src/core/revision/cache.rs @@ -1,7 +1,7 @@ use crate::{ core::revision::{ disk::{DocumentRevisionDiskCache, RevisionChangeset, RevisionTableState, SQLitePersistence}, - memory::{RevisionMemoryCache, RevisionMemoryCacheDelegate}, + memory::{DocumentRevisionMemoryCache, RevisionMemoryCacheDelegate}, }, errors::FlowyError, }; @@ -17,17 +17,17 @@ use std::{ }; use tokio::task::spawn_blocking; -pub struct RevisionCache { +pub struct DocumentRevisionCache { doc_id: String, disk_cache: Arc>, - memory_cache: Arc, + memory_cache: Arc, latest_rev_id: AtomicI64, } -impl RevisionCache { - pub fn new(user_id: &str, doc_id: &str, pool: Arc) -> RevisionCache { +impl DocumentRevisionCache { + pub fn new(user_id: &str, doc_id: &str, pool: Arc) -> DocumentRevisionCache { let disk_cache = Arc::new(SQLitePersistence::new(user_id, pool)); - let memory_cache = Arc::new(RevisionMemoryCache::new(doc_id, Arc::new(disk_cache.clone()))); + let memory_cache = Arc::new(DocumentRevisionMemoryCache::new(doc_id, Arc::new(disk_cache.clone()))); let doc_id = doc_id.to_owned(); Self { doc_id, @@ -44,7 +44,7 @@ impl RevisionCache { write_to_disk: bool, ) -> FlowyResult { if self.memory_cache.contains(&revision.rev_id) { - return Err(FlowyError::internal().context(format!("Duplicate remote revision id: {}", revision.rev_id))); + return Err(FlowyError::internal().context(format!("Duplicate revision: {} {:?}", revision.rev_id, state))); } let state = state.as_ref().clone(); let rev_id = revision.rev_id; @@ -53,6 +53,7 @@ impl RevisionCache { state, write_to_disk, }; + self.memory_cache.add(Cow::Borrowed(&record)).await; self.set_latest_rev_id(rev_id); Ok(record) @@ -131,10 +132,15 @@ impl RevisionCache { } impl RevisionMemoryCacheDelegate for Arc { + #[tracing::instrument(level = "debug", skip(self, records), fields(checkpoint_result), err)] fn checkpoint_tick(&self, mut records: Vec) -> FlowyResult<()> { let conn = &*self.pool.get().map_err(internal_error)?; records.retain(|record| record.write_to_disk); if !records.is_empty() { + tracing::Span::current().record( + "checkpoint_result", + &format!("{} records were saved", records.len()).as_str(), + ); let _ = self.write_revision_records(records, &conn)?; } Ok(()) diff --git a/frontend/rust-lib/flowy-document/src/core/revision/manager.rs b/frontend/rust-lib/flowy-document/src/core/revision/manager.rs index 7bc30ea0d8..0867a4b3dc 100644 --- a/frontend/rust-lib/flowy-document/src/core/revision/manager.rs +++ b/frontend/rust-lib/flowy-document/src/core/revision/manager.rs @@ -1,5 +1,5 @@ use crate::{ - core::{revision::RevisionCache, RevisionRecord}, + core::{revision::DocumentRevisionCache, RevisionRecord}, errors::FlowyError, }; use bytes::Bytes; @@ -22,16 +22,16 @@ pub trait RevisionServer: Send + Sync { fn fetch_document(&self, doc_id: &str) -> FutureResult; } -pub struct RevisionManager { +pub struct DocumentRevisionManager { pub(crate) doc_id: String, user_id: String, rev_id_counter: RevIdCounter, - cache: Arc, + cache: Arc, sync_seq: Arc, } -impl RevisionManager { - pub fn new(user_id: &str, doc_id: &str, cache: Arc) -> Self { +impl DocumentRevisionManager { + pub fn new(user_id: &str, doc_id: &str, cache: Arc) -> Self { let rev_id_counter = RevIdCounter::new(0); let sync_seq = Arc::new(RevisionSyncSequence::new()); Self { @@ -70,8 +70,8 @@ impl RevisionManager { if revision.delta_data.is_empty() { return Err(FlowyError::internal().context("Delta data should be empty")); } - self.rev_id_counter.set(revision.rev_id); let _ = self.cache.add(revision.clone(), RevisionState::Ack, true).await?; + self.rev_id_counter.set(revision.rev_id); Ok(()) } @@ -194,7 +194,7 @@ struct RevisionLoader { doc_id: String, user_id: String, server: Arc, - cache: Arc, + cache: Arc, } impl RevisionLoader { @@ -272,6 +272,6 @@ impl RevisionSyncSequence { } #[cfg(feature = "flowy_unit_test")] -impl RevisionManager { - pub fn revision_cache(&self) -> Arc { self.cache.clone() } +impl DocumentRevisionManager { + pub fn revision_cache(&self) -> Arc { self.cache.clone() } } diff --git a/frontend/rust-lib/flowy-document/src/core/revision/memory.rs b/frontend/rust-lib/flowy-document/src/core/revision/memory.rs index 35fc8f9bbf..4f8dc37f09 100644 --- a/frontend/rust-lib/flowy-document/src/core/revision/memory.rs +++ b/frontend/rust-lib/flowy-document/src/core/revision/memory.rs @@ -2,7 +2,6 @@ use crate::core::RevisionRecord; use dashmap::DashMap; use flowy_collaboration::entities::revision::RevisionRange; use flowy_error::{FlowyError, FlowyResult}; -use futures_util::{stream, stream::StreamExt}; use std::{borrow::Cow, sync::Arc, time::Duration}; use tokio::{sync::RwLock, task::JoinHandle}; @@ -11,7 +10,7 @@ pub(crate) trait RevisionMemoryCacheDelegate: Send + Sync { fn receive_ack(&self, doc_id: &str, rev_id: i64); } -pub(crate) struct RevisionMemoryCache { +pub(crate) struct DocumentRevisionMemoryCache { doc_id: String, revs_map: Arc>, delegate: Arc, @@ -19,9 +18,9 @@ pub(crate) struct RevisionMemoryCache { defer_save: RwLock>>, } -impl RevisionMemoryCache { +impl DocumentRevisionMemoryCache { pub(crate) fn new(doc_id: &str, delegate: Arc) -> Self { - RevisionMemoryCache { + DocumentRevisionMemoryCache { doc_id: doc_id.to_owned(), revs_map: Arc::new(DashMap::new()), delegate, @@ -38,15 +37,19 @@ impl RevisionMemoryCache { Cow::Owned(record) => record, }; + let rev_id = record.revision.rev_id; + if self.revs_map.contains_key(&rev_id) { + return; + } + if let Some(rev_id) = self.pending_write_revs.read().await.last() { if *rev_id >= record.revision.rev_id { tracing::error!("Duplicated revision added to memory_cache"); return; } } - // TODO: Remove outdated revisions to reduce memory usage - self.revs_map.insert(record.revision.rev_id, record.clone()); - self.pending_write_revs.write().await.push(record.revision.rev_id); + self.revs_map.insert(rev_id, record); + self.pending_write_revs.write().await.push(rev_id); self.make_checkpoint().await; } @@ -79,16 +82,19 @@ impl RevisionMemoryCache { pub(crate) async fn reset_with_revisions(&self, revision_records: &[RevisionRecord]) -> FlowyResult<()> { self.revs_map.clear(); - self.pending_write_revs.write().await.clear(); if let Some(handler) = self.defer_save.write().await.take() { handler.abort(); } - stream::iter(revision_records) - .for_each(|record| async move { - self.add(Cow::Borrowed(record)).await; - }) - .await; + let mut write_guard = self.pending_write_revs.write().await; + write_guard.clear(); + for record in revision_records { + self.revs_map.insert(record.revision.rev_id, record.clone()); + write_guard.push(record.revision.rev_id); + } + drop(write_guard); + + self.make_checkpoint().await; Ok(()) } @@ -107,9 +113,8 @@ impl RevisionMemoryCache { let delegate = self.delegate.clone(); *self.defer_save.write().await = Some(tokio::spawn(async move { - tokio::time::sleep(Duration::from_millis(300)).await; + tokio::time::sleep(Duration::from_millis(600)).await; let mut revs_write_guard = pending_write_revs.write().await; - // TODO: // It may cause performance issues because we hold the write lock of the // rev_order and the lock will be released after the checkpoint has been written // to the disk. diff --git a/frontend/rust-lib/flowy-document/src/core/web_socket/http_ws_impl.rs b/frontend/rust-lib/flowy-document/src/core/web_socket/http_ws_impl.rs index cd5febd2cf..d907622c1d 100644 --- a/frontend/rust-lib/flowy-document/src/core/web_socket/http_ws_impl.rs +++ b/frontend/rust-lib/flowy-document/src/core/web_socket/http_ws_impl.rs @@ -1,5 +1,5 @@ use crate::{ - core::{web_socket::web_socket::DocumentWebSocketManager, SYNC_INTERVAL_IN_MILLIS}, + core::{web_socket::ws_manager::DocumentWebSocketManager, SYNC_INTERVAL_IN_MILLIS}, ws_receivers::{DocumentWSReceiver, DocumentWebSocket}, }; use async_stream::stream; @@ -27,7 +27,7 @@ pub(crate) struct HttpWebSocketManager { doc_id: String, data_provider: Arc, stream_consumer: Arc, - ws: Arc, + ws_conn: Arc, ws_msg_tx: UnboundedSender, ws_msg_rx: Option>, stop_sync_tx: SinkStopTx, @@ -37,7 +37,7 @@ pub(crate) struct HttpWebSocketManager { impl HttpWebSocketManager { pub(crate) fn new( doc_id: &str, - ws: Arc, + ws_conn: Arc, data_provider: Arc, stream_consumer: Arc, ) -> Self { @@ -49,7 +49,7 @@ impl HttpWebSocketManager { doc_id, data_provider, stream_consumer, - ws, + ws_conn, ws_msg_tx, ws_msg_rx: Some(ws_msg_rx), stop_sync_tx, @@ -64,7 +64,7 @@ impl HttpWebSocketManager { let sink = DocumentWSSink::new( &self.doc_id, self.data_provider.clone(), - self.ws.clone(), + self.ws_conn.clone(), self.stop_sync_tx.subscribe(), ); let stream = DocumentWSStream::new( @@ -200,7 +200,6 @@ impl DocumentWSStream { // Notify the user that someone has connected to this document }, } - Ok(()) } } @@ -260,7 +259,7 @@ impl DocumentWSSink { .for_each(|_| async { match self.send_next_revision().await { Ok(_) => {}, - Err(e) => log::error!("[DocumentSink]: send msg failed, {:?}", e), + Err(e) => log::error!("[DocumentSink]: Send failed, {:?}", e), } }) .await; @@ -273,6 +272,7 @@ impl DocumentWSSink { Ok(()) }, Some(data) => { + tracing::debug!("[DocumentSink]: Try send: {}:{:?}-{}", data.doc_id, data.ty, data.id()); self.ws_sender.send(data).map_err(internal_error) // let _ = tokio::time::timeout(Duration::from_millis(2000), }, diff --git a/frontend/rust-lib/flowy-document/src/core/web_socket/mod.rs b/frontend/rust-lib/flowy-document/src/core/web_socket/mod.rs index 519cff4c02..5fc1dc2604 100644 --- a/frontend/rust-lib/flowy-document/src/core/web_socket/mod.rs +++ b/frontend/rust-lib/flowy-document/src/core/web_socket/mod.rs @@ -1,7 +1,7 @@ #![allow(clippy::module_inception)] mod http_ws_impl; mod local_ws_impl; -mod web_socket; +mod ws_manager; pub(crate) use http_ws_impl::*; -pub(crate) use web_socket::*; +pub(crate) use ws_manager::*; diff --git a/frontend/rust-lib/flowy-document/src/core/web_socket/web_socket.rs b/frontend/rust-lib/flowy-document/src/core/web_socket/ws_manager.rs similarity index 83% rename from frontend/rust-lib/flowy-document/src/core/web_socket/web_socket.rs rename to frontend/rust-lib/flowy-document/src/core/web_socket/ws_manager.rs index b87c172081..482818dd43 100644 --- a/frontend/rust-lib/flowy-document/src/core/web_socket/web_socket.rs +++ b/frontend/rust-lib/flowy-document/src/core/web_socket/ws_manager.rs @@ -1,9 +1,9 @@ use crate::core::{ web_socket::{DocumentWSSinkDataProvider, DocumentWSSteamConsumer, HttpWebSocketManager}, + DocumentRevisionManager, DocumentWSReceiver, DocumentWebSocket, EditorCommand, - RevisionManager, TransformDeltas, }; use bytes::Bytes; @@ -17,7 +17,6 @@ use flowy_collaboration::{ use flowy_error::{internal_error, FlowyError, FlowyResult}; use lib_infra::future::FutureResult; -use crate::core::web_socket::local_ws_impl::LocalWebSocketManager; use flowy_collaboration::entities::ws::DocumentServerWSDataType; use lib_ws::WSConnectState; @@ -33,36 +32,54 @@ pub(crate) async fn make_document_ws_manager( doc_id: String, user_id: String, edit_cmd_tx: UnboundedSender, - rev_manager: Arc, - ws: Arc, + rev_manager: Arc, + ws_conn: Arc, ) -> Arc { - if cfg!(feature = "http_server") { - let shared_sink = Arc::new(SharedWSSinkDataProvider::new(rev_manager.clone())); - let ws_stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter { - doc_id: doc_id.clone(), - edit_cmd_tx, - rev_manager: rev_manager.clone(), - shared_sink: shared_sink.clone(), - }); - let ws_stream_provider = DocumentWSSinkDataProviderAdapter(shared_sink); - let ws_manager = Arc::new(HttpWebSocketManager::new( - &doc_id, - ws.clone(), - Arc::new(ws_stream_provider), - ws_stream_consumer, - )); - listen_document_ws_state(&user_id, &doc_id, ws_manager.scribe_state(), rev_manager); - Arc::new(ws_manager) - } else { - Arc::new(Arc::new(LocalWebSocketManager {})) - } + // if cfg!(feature = "http_server") { + // let shared_sink = + // Arc::new(SharedWSSinkDataProvider::new(rev_manager.clone())); + // let ws_stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter { + // doc_id: doc_id.clone(), + // edit_cmd_tx, + // rev_manager: rev_manager.clone(), + // shared_sink: shared_sink.clone(), + // }); + // let data_provider = + // Arc::new(DocumentWSSinkDataProviderAdapter(shared_sink)); + // let ws_manager = Arc::new(HttpWebSocketManager::new( + // &doc_id, + // ws_conn, + // data_provider, + // ws_stream_consumer, + // )); + // listen_document_ws_state(&user_id, &doc_id, ws_manager.scribe_state(), + // rev_manager); Arc::new(ws_manager) + // } else { + // Arc::new(Arc::new(LocalWebSocketManager {})) + // } + let shared_sink = Arc::new(SharedWSSinkDataProvider::new(rev_manager.clone())); + let ws_stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter { + doc_id: doc_id.clone(), + edit_cmd_tx, + rev_manager: rev_manager.clone(), + shared_sink: shared_sink.clone(), + }); + let data_provider = Arc::new(DocumentWSSinkDataProviderAdapter(shared_sink)); + let ws_manager = Arc::new(HttpWebSocketManager::new( + &doc_id, + ws_conn, + data_provider, + ws_stream_consumer, + )); + listen_document_ws_state(&user_id, &doc_id, ws_manager.scribe_state(), rev_manager); + Arc::new(ws_manager) } fn listen_document_ws_state( _user_id: &str, _doc_id: &str, mut subscriber: broadcast::Receiver, - _rev_manager: Arc, + _rev_manager: Arc, ) { tokio::spawn(async move { while let Ok(state) = subscriber.recv().await { @@ -79,7 +96,7 @@ fn listen_document_ws_state( pub(crate) struct DocumentWebSocketSteamConsumerAdapter { pub(crate) doc_id: String, pub(crate) edit_cmd_tx: UnboundedSender, - pub(crate) rev_manager: Arc, + pub(crate) rev_manager: Arc, pub(crate) shared_sink: Arc, } @@ -141,7 +158,7 @@ async fn transform_pushed_revisions( #[tracing::instrument(level = "debug", skip(edit_cmd_tx, rev_manager, bytes))] pub(crate) async fn handle_remote_revision( edit_cmd_tx: UnboundedSender, - rev_manager: Arc, + rev_manager: Arc, bytes: Bytes, ) -> FlowyResult> { let mut revisions = RepeatedRevision::try_from(bytes)?.into_inner(); @@ -202,12 +219,12 @@ enum SourceType { #[derive(Clone)] pub(crate) struct SharedWSSinkDataProvider { shared: Arc>>, - rev_manager: Arc, + rev_manager: Arc, source_ty: Arc>, } impl SharedWSSinkDataProvider { - pub(crate) fn new(rev_manager: Arc) -> Self { + pub(crate) fn new(rev_manager: Arc) -> Self { SharedWSSinkDataProvider { shared: Arc::new(RwLock::new(VecDeque::new())), rev_manager, @@ -241,7 +258,6 @@ impl SharedWSSinkDataProvider { match self.rev_manager.next_sync_revision().await? { Some(rev) => { - tracing::debug!("[SharedWSSinkDataProvider]: {}:{:?}", rev.doc_id, rev.rev_id); let doc_id = rev.doc_id.clone(); Ok(Some(DocumentClientWSData::from_revisions(&doc_id, vec![rev]))) }, diff --git a/frontend/rust-lib/flowy-net/Cargo.toml b/frontend/rust-lib/flowy-net/Cargo.toml index 307752c51e..e0567c4bc0 100644 --- a/frontend/rust-lib/flowy-net/Cargo.toml +++ b/frontend/rust-lib/flowy-net/Cargo.toml @@ -9,6 +9,7 @@ edition = "2018" lib-dispatch = { path = "../lib-dispatch" } flowy-error = { path = "../flowy-error" } flowy-derive = { path = "../../../shared-lib/flowy-derive" } +flowy-collaboration = { path = "../../../shared-lib/flowy-collaboration"} lib-infra = { path = "../../../shared-lib/lib-infra" } protobuf = {version = "2.18.0"} lib-ws = { path = "../../../shared-lib/lib-ws" } @@ -19,6 +20,6 @@ parking_lot = "0.11" strum = "0.21" strum_macros = "0.21" tracing = { version = "0.1", features = ["log"] } - +dashmap = {version = "4.0"} [features] http_server = [] \ No newline at end of file diff --git a/frontend/rust-lib/flowy-net/src/handlers/mod.rs b/frontend/rust-lib/flowy-net/src/handlers/mod.rs index 59e7b9c986..302f9b787f 100644 --- a/frontend/rust-lib/flowy-net/src/handlers/mod.rs +++ b/frontend/rust-lib/flowy-net/src/handlers/mod.rs @@ -1,5 +1,4 @@ -use crate::{entities::NetworkState, services::ws::FlowyWSConnect}; - +use crate::{entities::NetworkState, services::ws_conn::FlowyWebSocketConnect}; use flowy_error::FlowyError; use lib_dispatch::prelude::{Data, Unit}; use std::sync::Arc; @@ -7,7 +6,7 @@ use std::sync::Arc; #[tracing::instrument(skip(data, ws_manager))] pub async fn update_network_ty( data: Data, - ws_manager: Unit>, + ws_manager: Unit>, ) -> Result<(), FlowyError> { let network_state = data.into_inner(); ws_manager.update_network_type(&network_state.ty); diff --git a/frontend/rust-lib/flowy-net/src/module.rs b/frontend/rust-lib/flowy-net/src/module.rs index 4ca0973dcd..4a2c62f8d2 100644 --- a/frontend/rust-lib/flowy-net/src/module.rs +++ b/frontend/rust-lib/flowy-net/src/module.rs @@ -1,10 +1,10 @@ -use crate::{event::NetworkEvent, handlers::*, services::ws::FlowyWSConnect}; +use crate::{event::NetworkEvent, handlers::*, services::ws_conn::FlowyWebSocketConnect}; use lib_dispatch::prelude::*; use std::sync::Arc; -pub fn create(ws_manager: Arc) -> Module { +pub fn create(ws_conn: Arc) -> Module { Module::new() .name("Flowy-Network") - .data(ws_manager) + .data(ws_conn) .event(NetworkEvent::UpdateNetworkType, update_network_ty) } diff --git a/frontend/rust-lib/flowy-net/src/services/http_ws/http_ws_impl.rs b/frontend/rust-lib/flowy-net/src/services/http_ws/http_ws_impl.rs new file mode 100644 index 0000000000..f51ad89fed --- /dev/null +++ b/frontend/rust-lib/flowy-net/src/services/http_ws/http_ws_impl.rs @@ -0,0 +1,54 @@ +use crate::services::ws_conn::{FlowyRawWebSocket, FlowyWSSender}; +use flowy_error::internal_error; +pub use flowy_error::FlowyError; +use lib_infra::future::FutureResult; +pub use lib_ws::{WSConnectState, WSMessageReceiver, WebSocketRawMessage}; +use lib_ws::{WSController, WSSender}; + +use std::sync::Arc; +use tokio::sync::broadcast::Receiver; + +impl FlowyRawWebSocket for Arc { + fn start_connect(&self, addr: String) -> FutureResult<(), FlowyError> { + let cloned_ws = self.clone(); + FutureResult::new(async move { + let _ = cloned_ws.start(addr).await.map_err(internal_error)?; + Ok(()) + }) + } + + fn stop_connect(&self) -> FutureResult<(), FlowyError> { + let controller = self.clone(); + FutureResult::new(async move { + controller.stop().await; + Ok(()) + }) + } + + fn subscribe_connect_state(&self) -> Receiver { self.subscribe_state() } + + fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError> { + let cloned_ws = self.clone(); + FutureResult::new(async move { + let _ = cloned_ws.retry(count).await.map_err(internal_error)?; + Ok(()) + }) + } + + fn add_receiver(&self, receiver: Arc) -> Result<(), FlowyError> { + let _ = self.add_ws_message_receiver(receiver).map_err(internal_error)?; + Ok(()) + } + + fn sender(&self) -> Result, FlowyError> { + let sender = self.ws_message_sender().map_err(internal_error)?; + Ok(sender) + } +} + +impl FlowyWSSender for WSSender { + fn send(&self, msg: WebSocketRawMessage) -> Result<(), FlowyError> { + let _ = self.send_msg(msg).map_err(internal_error)?; + Ok(()) + } +} diff --git a/frontend/rust-lib/flowy-net/src/services/http_ws/mod.rs b/frontend/rust-lib/flowy-net/src/services/http_ws/mod.rs new file mode 100644 index 0000000000..7f637698c3 --- /dev/null +++ b/frontend/rust-lib/flowy-net/src/services/http_ws/mod.rs @@ -0,0 +1,3 @@ +pub use http_ws_impl::*; + +mod http_ws_impl; diff --git a/frontend/rust-lib/flowy-virtual-net/src/mock/server.rs b/frontend/rust-lib/flowy-net/src/services/local_ws/local_server.rs similarity index 56% rename from frontend/rust-lib/flowy-virtual-net/src/mock/server.rs rename to frontend/rust-lib/flowy-net/src/services/local_ws/local_server.rs index 99d7c598e6..a01bdee74d 100644 --- a/frontend/rust-lib/flowy-virtual-net/src/mock/server.rs +++ b/frontend/rust-lib/flowy-net/src/services/local_ws/local_server.rs @@ -6,12 +6,16 @@ use flowy_collaboration::{ ws::{DocumentClientWSData, DocumentClientWSDataType}, }, errors::CollaborateError, - protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB}, + protobuf::{ + DocumentClientWSData as DocumentClientWSDataPB, + RepeatedRevision as RepeatedRevisionPB, + Revision as RevisionPB, + }, sync::*, util::repeated_revision_from_repeated_revision_pb, }; use lib_infra::future::BoxResultFuture; -use lib_ws::{WSModule, WebSocketRawMessage}; +use lib_ws::{WSMessageReceiver, WSModule, WebSocketRawMessage}; use std::{ convert::TryInto, fmt::{Debug, Formatter}, @@ -19,62 +23,76 @@ use std::{ }; use tokio::sync::mpsc; -pub struct MockDocServer { - pub manager: Arc, +pub(crate) fn spawn_server(receivers: Arc>>) -> Arc { + let (server_tx, mut server_rx) = mpsc::unbounded_channel(); + let server = Arc::new(LocalDocumentServer::new(server_tx)); + tokio::spawn(async move { + while let Some(message) = server_rx.recv().await { + match receivers.get(&message.module) { + None => tracing::error!("Can't find any handler for message: {:?}", message), + Some(handler) => handler.receive_message(message.clone()), + } + } + }); + server } -impl std::default::Default for MockDocServer { - fn default() -> Self { - let persistence = Arc::new(MockDocServerPersistence::default()); - let manager = Arc::new(ServerDocumentManager::new(persistence)); - MockDocServer { manager } +pub struct LocalDocumentServer { + pub doc_manager: Arc, + sender: mpsc::UnboundedSender, +} + +impl LocalDocumentServer { + pub fn new(sender: mpsc::UnboundedSender) -> Self { + let persistence = Arc::new(LocalDocServerPersistence::default()); + let doc_manager = Arc::new(ServerDocumentManager::new(persistence)); + LocalDocumentServer { doc_manager, sender } } -} -impl MockDocServer { - pub async fn handle_client_data( - &self, - client_data: DocumentClientWSData, - ) -> Option> { - match client_data.ty { + pub async fn handle_client_data(&self, client_data: DocumentClientWSData) -> Result<(), CollaborateError> { + tracing::debug!( + "[LocalDocumentServer] receive client data: {}:{:?} ", + client_data.doc_id, + client_data.ty + ); + let user = Arc::new(LocalDocumentUser { + user_id: "fake_user_id".to_owned(), + ws_sender: self.sender.clone(), + }); + let ty = client_data.ty.clone(); + let document_client_data: DocumentClientWSDataPB = client_data.try_into().unwrap(); + match ty { DocumentClientWSDataType::ClientPushRev => { - let (tx, rx) = mpsc::channel(1); - let user = Arc::new(MockDocUser { - user_id: "fake_user_id".to_owned(), - tx, - }); - let pb_client_data: flowy_collaboration::protobuf::DocumentClientWSData = - client_data.try_into().unwrap(); - self.manager - .handle_client_revisions(user, pb_client_data) - .await - .unwrap(); - Some(rx) + let _ = self + .doc_manager + .handle_client_revisions(user, document_client_data) + .await?; }, DocumentClientWSDataType::ClientPing => { - todo!() + let _ = self.doc_manager.handle_client_ping(user, document_client_data).await?; }, } + Ok(()) } } -struct MockDocServerPersistence { +struct LocalDocServerPersistence { inner: Arc>, } -impl Debug for MockDocServerPersistence { +impl Debug for LocalDocServerPersistence { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("MockDocServerPersistence") } } -impl std::default::Default for MockDocServerPersistence { +impl std::default::Default for LocalDocServerPersistence { fn default() -> Self { - MockDocServerPersistence { + LocalDocServerPersistence { inner: Arc::new(DashMap::new()), } } } -impl DocumentPersistence for MockDocServerPersistence { +impl DocumentPersistence for LocalDocServerPersistence { fn read_doc(&self, doc_id: &str) -> BoxResultFuture { let inner = self.inner.clone(); let doc_id = doc_id.to_owned(); @@ -118,16 +136,16 @@ impl DocumentPersistence for MockDocServerPersistence { } #[derive(Debug)] -struct MockDocUser { +struct LocalDocumentUser { user_id: String, - tx: mpsc::Sender, + ws_sender: mpsc::UnboundedSender, } -impl RevisionUser for MockDocUser { +impl RevisionUser for LocalDocumentUser { fn user_id(&self) -> String { self.user_id.clone() } fn receive(&self, resp: SyncResponse) { - let sender = self.tx.clone(); + let sender = self.ws_sender.clone(); tokio::spawn(async move { match resp { SyncResponse::Pull(data) => { @@ -136,7 +154,7 @@ impl RevisionUser for MockDocUser { module: WSModule::Doc, data: bytes.to_vec(), }; - sender.send(msg).await.unwrap(); + sender.send(msg).unwrap(); }, SyncResponse::Push(data) => { let bytes: Bytes = data.try_into().unwrap(); @@ -144,7 +162,7 @@ impl RevisionUser for MockDocUser { module: WSModule::Doc, data: bytes.to_vec(), }; - sender.send(msg).await.unwrap(); + sender.send(msg).unwrap(); }, SyncResponse::Ack(data) => { let bytes: Bytes = data.try_into().unwrap(); @@ -152,7 +170,7 @@ impl RevisionUser for MockDocUser { module: WSModule::Doc, data: bytes.to_vec(), }; - sender.send(msg).await.unwrap(); + sender.send(msg).unwrap(); }, SyncResponse::NewRevision(_) => { // unimplemented!() diff --git a/frontend/rust-lib/flowy-net/src/services/local_ws/local_ws_impl.rs b/frontend/rust-lib/flowy-net/src/services/local_ws/local_ws_impl.rs new file mode 100644 index 0000000000..f92d8f3b77 --- /dev/null +++ b/frontend/rust-lib/flowy-net/src/services/local_ws/local_ws_impl.rs @@ -0,0 +1,105 @@ +use bytes::Bytes; +use dashmap::DashMap; +use flowy_collaboration::entities::ws::*; +use flowy_error::{internal_error, FlowyError}; +use lib_infra::future::FutureResult; +use lib_ws::{WSConnectState, WSMessageReceiver, WSModule, WebSocketRawMessage}; + +use crate::services::{ + local_ws::local_server::{spawn_server, LocalDocumentServer}, + ws_conn::{FlowyRawWebSocket, FlowyWSSender}, +}; +use std::{convert::TryFrom, sync::Arc}; +use tokio::sync::{broadcast, broadcast::Receiver}; + +pub struct LocalWebSocket { + receivers: Arc>>, + state_sender: broadcast::Sender, + ws_sender: LocalWSSender, + server: Arc, +} + +impl std::default::Default for LocalWebSocket { + fn default() -> Self { + let (state_sender, _) = broadcast::channel(16); + let ws_sender = LocalWSSender::default(); + let receivers = Arc::new(DashMap::new()); + let server = spawn_server(receivers.clone()); + + LocalWebSocket { + receivers, + state_sender, + ws_sender, + server, + } + } +} + +impl LocalWebSocket { + fn spawn_client(&self, _addr: String) { + let mut ws_receiver = self.ws_sender.subscribe(); + let server = self.server.clone(); + tokio::spawn(async move { + loop { + match ws_receiver.recv().await { + Ok(message) => { + let fut = || async { + let bytes = Bytes::from(message.data); + let client_data = DocumentClientWSData::try_from(bytes).map_err(internal_error)?; + let _ = server.handle_client_data(client_data).await?; + Ok::<(), FlowyError>(()) + }; + match fut().await { + Ok(_) => {}, + Err(e) => tracing::error!("[LocalWebSocket] error: {:?}", e), + } + }, + Err(e) => tracing::error!("[LocalWebSocket] error: {}", e), + } + } + }); + } +} + +impl FlowyRawWebSocket for LocalWebSocket { + fn start_connect(&self, addr: String) -> FutureResult<(), FlowyError> { + self.spawn_client(addr); + FutureResult::new(async { Ok(()) }) + } + + fn stop_connect(&self) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) } + + fn subscribe_connect_state(&self) -> Receiver { self.state_sender.subscribe() } + + fn reconnect(&self, _count: usize) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) } + + fn add_receiver(&self, receiver: Arc) -> Result<(), FlowyError> { + self.receivers.insert(receiver.source(), receiver); + Ok(()) + } + + fn sender(&self) -> Result, FlowyError> { Ok(Arc::new(self.ws_sender.clone())) } +} + +#[derive(Clone)] +struct LocalWSSender(broadcast::Sender); + +impl std::default::Default for LocalWSSender { + fn default() -> Self { + let (tx, _) = broadcast::channel(16); + Self(tx) + } +} + +impl FlowyWSSender for LocalWSSender { + fn send(&self, msg: WebSocketRawMessage) -> Result<(), FlowyError> { + let _ = self.0.send(msg); + Ok(()) + } +} + +impl std::ops::Deref for LocalWSSender { + type Target = broadcast::Sender; + + fn deref(&self) -> &Self::Target { &self.0 } +} diff --git a/frontend/rust-lib/flowy-net/src/services/local_ws/mod.rs b/frontend/rust-lib/flowy-net/src/services/local_ws/mod.rs new file mode 100644 index 0000000000..2191e9e0ce --- /dev/null +++ b/frontend/rust-lib/flowy-net/src/services/local_ws/mod.rs @@ -0,0 +1,4 @@ +mod local_server; +mod local_ws_impl; + +pub use local_ws_impl::*; diff --git a/frontend/rust-lib/flowy-net/src/services/mod.rs b/frontend/rust-lib/flowy-net/src/services/mod.rs index 2ffb9de4c7..5dcb797055 100644 --- a/frontend/rust-lib/flowy-net/src/services/mod.rs +++ b/frontend/rust-lib/flowy-net/src/services/mod.rs @@ -1,4 +1,3 @@ -pub mod ws; - -// #[cfg(feature = "flowy_unit_test")] -// mod mock; +pub mod http_ws; +pub mod local_ws; +pub mod ws_conn; diff --git a/frontend/rust-lib/flowy-net/src/services/ws/mod.rs b/frontend/rust-lib/flowy-net/src/services/ws/mod.rs deleted file mode 100644 index 2fe6a324b2..0000000000 --- a/frontend/rust-lib/flowy-net/src/services/ws/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub use conn::*; - -mod conn; diff --git a/frontend/rust-lib/flowy-net/src/services/ws/conn.rs b/frontend/rust-lib/flowy-net/src/services/ws_conn.rs similarity index 59% rename from frontend/rust-lib/flowy-net/src/services/ws/conn.rs rename to frontend/rust-lib/flowy-net/src/services/ws_conn.rs index 3e734cf1fc..5ee78765a8 100644 --- a/frontend/rust-lib/flowy-net/src/services/ws/conn.rs +++ b/frontend/rust-lib/flowy-net/src/services/ws_conn.rs @@ -1,37 +1,37 @@ use crate::entities::NetworkType; -use flowy_error::internal_error; + pub use flowy_error::FlowyError; use lib_infra::future::FutureResult; pub use lib_ws::{WSConnectState, WSMessageReceiver, WebSocketRawMessage}; -use lib_ws::{WSController, WSSender}; + use parking_lot::RwLock; use std::sync::Arc; -use tokio::sync::{broadcast, broadcast::Receiver}; +use tokio::sync::broadcast; -pub trait FlowyWebSocket: Send + Sync { +pub trait FlowyRawWebSocket: Send + Sync { fn start_connect(&self, addr: String) -> FutureResult<(), FlowyError>; fn stop_connect(&self) -> FutureResult<(), FlowyError>; fn subscribe_connect_state(&self) -> broadcast::Receiver; fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError>; - fn add_message_receiver(&self, handler: Arc) -> Result<(), FlowyError>; - fn ws_sender(&self) -> Result, FlowyError>; + fn add_receiver(&self, receiver: Arc) -> Result<(), FlowyError>; + fn sender(&self) -> Result, FlowyError>; } pub trait FlowyWSSender: Send + Sync { fn send(&self, msg: WebSocketRawMessage) -> Result<(), FlowyError>; } -pub struct FlowyWSConnect { - inner: Arc, +pub struct FlowyWebSocketConnect { + inner: Arc, connect_type: RwLock, status_notifier: broadcast::Sender, addr: String, } -impl FlowyWSConnect { - pub fn new(addr: String, ws: Arc) -> Self { +impl FlowyWebSocketConnect { + pub fn new(addr: String, ws: Arc) -> Self { let (status_notifier, _) = broadcast::channel(10); - FlowyWSConnect { + FlowyWebSocketConnect { inner: ws, connect_type: RwLock::new(NetworkType::default()), status_notifier, @@ -76,19 +76,19 @@ impl FlowyWSConnect { pub fn subscribe_network_ty(&self) -> broadcast::Receiver { self.status_notifier.subscribe() } - pub fn add_receiver(&self, handler: Arc) -> Result<(), FlowyError> { - let _ = self.inner.add_message_receiver(handler)?; + pub fn add_ws_message_receiver(&self, receiver: Arc) -> Result<(), FlowyError> { + let _ = self.inner.add_receiver(receiver)?; Ok(()) } - pub fn ws_sender(&self) -> Result, FlowyError> { self.inner.ws_sender() } + pub fn ws_sender(&self) -> Result, FlowyError> { self.inner.sender() } } -#[tracing::instrument(level = "debug", skip(manager))] -pub fn listen_on_websocket(manager: Arc) { +#[tracing::instrument(level = "debug", skip(ws_conn))] +pub fn listen_on_websocket(ws_conn: Arc) { if cfg!(feature = "http_server") { - let ws = manager.inner.clone(); - let mut notify = manager.inner.subscribe_connect_state(); + let ws = ws_conn.inner.clone(); + let mut notify = ws_conn.inner.subscribe_connect_state(); let _ = tokio::spawn(async move { loop { match notify.recv().await { @@ -113,7 +113,7 @@ pub fn listen_on_websocket(manager: Arc) { }; } -async fn retry_connect(ws: Arc, count: usize) { +async fn retry_connect(ws: Arc, count: usize) { match ws.reconnect(count).await { Ok(_) => {}, Err(e) => { @@ -121,48 +121,3 @@ async fn retry_connect(ws: Arc, count: usize) { }, } } - -impl FlowyWebSocket for Arc { - fn start_connect(&self, addr: String) -> FutureResult<(), FlowyError> { - let cloned_ws = self.clone(); - FutureResult::new(async move { - let _ = cloned_ws.start(addr).await.map_err(internal_error)?; - Ok(()) - }) - } - - fn stop_connect(&self) -> FutureResult<(), FlowyError> { - let controller = self.clone(); - FutureResult::new(async move { - controller.stop().await; - Ok(()) - }) - } - - fn subscribe_connect_state(&self) -> Receiver { self.subscribe_state() } - - fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError> { - let cloned_ws = self.clone(); - FutureResult::new(async move { - let _ = cloned_ws.retry(count).await.map_err(internal_error)?; - Ok(()) - }) - } - - fn add_message_receiver(&self, handler: Arc) -> Result<(), FlowyError> { - let _ = self.add_receiver(handler).map_err(internal_error)?; - Ok(()) - } - - fn ws_sender(&self) -> Result, FlowyError> { - let sender = self.sender().map_err(internal_error)?; - Ok(sender) - } -} - -impl FlowyWSSender for WSSender { - fn send(&self, msg: WebSocketRawMessage) -> Result<(), FlowyError> { - let _ = self.send_msg(msg).map_err(internal_error)?; - Ok(()) - } -} diff --git a/frontend/rust-lib/flowy-sdk/Cargo.toml b/frontend/rust-lib/flowy-sdk/Cargo.toml index 8ecf0ae451..d1dcd821f4 100644 --- a/frontend/rust-lib/flowy-sdk/Cargo.toml +++ b/frontend/rust-lib/flowy-sdk/Cargo.toml @@ -10,7 +10,6 @@ lib-dispatch = { path = "../lib-dispatch" } lib-log = { path = "../lib-log" } flowy-user = { path = "../flowy-user" } flowy-net = { path = "../flowy-net" } -flowy-virtual-net = { path = "../flowy-virtual-net" } flowy-core = { path = "../flowy-core", default-features = false } flowy-database = { path = "../flowy-database" } flowy-document = { path = "../flowy-document" } diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs index 74ccb222d4..f12cdb2665 100644 --- a/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs +++ b/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs @@ -6,7 +6,7 @@ use flowy_document::{ core::{DocumentWSReceivers, DocumentWebSocket, WSStateReceiver}, errors::{internal_error, FlowyError}, }; -use flowy_net::services::ws::FlowyWSConnect; +use flowy_net::services::ws_conn::FlowyWebSocketConnect; use flowy_user::services::user::UserSession; use lib_ws::{WSMessageReceiver, WSModule, WebSocketRawMessage}; use std::{convert::TryInto, path::Path, sync::Arc}; @@ -14,7 +14,7 @@ use std::{convert::TryInto, path::Path, sync::Arc}; pub struct DocumentDepsResolver(); impl DocumentDepsResolver { pub fn resolve( - ws_manager: Arc, + ws_conn: Arc, user_session: Arc, ) -> ( Arc, @@ -24,11 +24,11 @@ impl DocumentDepsResolver { let user = Arc::new(DocumentUserImpl { user: user_session }); let ws_sender = Arc::new(DocumentWebSocketAdapter { - ws_manager: ws_manager.clone(), + ws_conn: ws_conn.clone(), }); let ws_receivers = Arc::new(DocumentWSReceivers::new()); let receiver = Arc::new(WSMessageReceiverAdaptor(ws_receivers.clone())); - ws_manager.add_receiver(receiver).unwrap(); + ws_conn.add_ws_message_receiver(receiver).unwrap(); (user, ws_receivers, ws_sender) } } @@ -61,7 +61,7 @@ impl DocumentUser for DocumentUserImpl { } struct DocumentWebSocketAdapter { - ws_manager: Arc, + ws_conn: Arc, } impl DocumentWebSocket for DocumentWebSocketAdapter { @@ -71,13 +71,12 @@ impl DocumentWebSocket for DocumentWebSocketAdapter { module: WSModule::Doc, data: bytes.to_vec(), }; - let sender = self.ws_manager.ws_sender().map_err(internal_error)?; + let sender = self.ws_conn.ws_sender().map_err(internal_error)?; sender.send(msg).map_err(internal_error)?; - Ok(()) } - fn subscribe_state_changed(&self) -> WSStateReceiver { self.ws_manager.subscribe_websocket_state() } + fn subscribe_state_changed(&self) -> WSStateReceiver { self.ws_conn.subscribe_websocket_state() } } struct WSMessageReceiverAdaptor(Arc); diff --git a/frontend/rust-lib/flowy-sdk/src/lib.rs b/frontend/rust-lib/flowy-sdk/src/lib.rs index bca2760b16..26a4d7468c 100644 --- a/frontend/rust-lib/flowy-sdk/src/lib.rs +++ b/frontend/rust-lib/flowy-sdk/src/lib.rs @@ -6,40 +6,63 @@ use flowy_core::{context::CoreContext, errors::FlowyError, module::init_core}; use flowy_document::context::DocumentContext; use flowy_net::{ entities::NetworkType, - services::ws::{listen_on_websocket, FlowyWSConnect, FlowyWebSocket}, + services::{ + local_ws::LocalWebSocket, + ws_conn::{listen_on_websocket, FlowyRawWebSocket, FlowyWebSocketConnect}, + }, }; use flowy_user::{ prelude::UserStatus, services::user::{UserSession, UserSessionConfig}, }; -use flowy_virtual_net::local_web_socket; use lib_dispatch::prelude::*; use lib_ws::WSController; use module::mk_modules; pub use module::*; -use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, +use std::{ + fmt, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, }; use tokio::sync::broadcast; static INIT_LOG: AtomicBool = AtomicBool::new(false); -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct FlowySDKConfig { name: String, root: String, log_filter: String, server_config: ClientServerConfiguration, + ws: Arc, +} + +impl fmt::Debug for FlowySDKConfig { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("FlowySDKConfig") + .field("name", &self.name) + .field("root", &self.root) + .field("server_config", &self.server_config) + .finish() + } } impl FlowySDKConfig { - pub fn new(root: &str, server_config: ClientServerConfiguration, name: &str) -> Self { + pub fn new( + root: &str, + server_config: ClientServerConfiguration, + name: &str, + ws: Option>, + ) -> Self { + let ws = ws.unwrap_or_else(default_web_socket); FlowySDKConfig { name: name.to_owned(), root: root.to_owned(), log_filter: crate_log_filter(None), server_config, + ws, } } @@ -73,7 +96,7 @@ pub struct FlowySDK { pub document_ctx: Arc, pub core: Arc, pub dispatcher: Arc, - pub ws_manager: Arc, + pub ws_conn: Arc, } impl FlowySDK { @@ -82,21 +105,18 @@ impl FlowySDK { init_kv(&config.root); tracing::debug!("🔥 {:?}", config); - let ws: Arc = if cfg!(feature = "http_server") { - Arc::new(Arc::new(WSController::new())) - } else { - local_web_socket() - }; - - let ws_manager = Arc::new(FlowyWSConnect::new(config.server_config.ws_addr(), ws)); + let ws_conn = Arc::new(FlowyWebSocketConnect::new( + config.server_config.ws_addr(), + config.ws.clone(), + )); let user_session = mk_user_session(&config); - let flowy_document = mk_document(ws_manager.clone(), user_session.clone(), &config.server_config); - let core_ctx = mk_core_context(user_session.clone(), flowy_document.clone(), &config.server_config); + let flowy_document = mk_document(&ws_conn, &user_session, &config.server_config); + let core_ctx = mk_core_context(&user_session, &flowy_document, &config.server_config); // - let modules = mk_modules(ws_manager.clone(), core_ctx.clone(), user_session.clone()); + let modules = mk_modules(&ws_conn, &core_ctx, &user_session); let dispatcher = Arc::new(EventDispatcher::construct(|| modules)); - _init(&dispatcher, ws_manager.clone(), user_session.clone(), core_ctx.clone()); + _init(&dispatcher, &ws_conn, &user_session, &core_ctx); Self { config, @@ -104,7 +124,7 @@ impl FlowySDK { document_ctx: flowy_document, core: core_ctx, dispatcher, - ws_manager, + ws_conn, } } @@ -113,18 +133,21 @@ impl FlowySDK { fn _init( dispatch: &EventDispatcher, - ws_manager: Arc, - user_session: Arc, - core: Arc, + ws_conn: &Arc, + user_session: &Arc, + core: &Arc, ) { let subscribe_user_status = user_session.notifier.subscribe_user_status(); - let subscribe_network_type = ws_manager.subscribe_network_ty(); + let subscribe_network_type = ws_conn.subscribe_network_ty(); + let core = core.clone(); let cloned_core = core.clone(); + let user_session = user_session.clone(); + let ws_conn = ws_conn.clone(); dispatch.spawn(async move { user_session.init(); - listen_on_websocket(ws_manager.clone()); - _listen_user_status(ws_manager.clone(), subscribe_user_status, core.clone()).await; + listen_on_websocket(ws_conn.clone()); + _listen_user_status(ws_conn.clone(), subscribe_user_status, core.clone()).await; }); dispatch.spawn(async move { @@ -133,7 +156,7 @@ fn _init( } async fn _listen_user_status( - ws_manager: Arc, + ws_conn: Arc, mut subscribe: broadcast::Receiver, core: Arc, ) { @@ -142,19 +165,19 @@ async fn _listen_user_status( match status { UserStatus::Login { token } => { let _ = core.user_did_sign_in(&token).await?; - let _ = ws_manager.start(token).await?; + let _ = ws_conn.start(token).await?; }, UserStatus::Logout { .. } => { core.user_did_logout().await; - let _ = ws_manager.stop().await; + let _ = ws_conn.stop().await; }, UserStatus::Expired { .. } => { core.user_session_expired().await; - let _ = ws_manager.stop().await; + let _ = ws_conn.stop().await; }, UserStatus::SignUp { profile, ret } => { let _ = core.user_did_sign_up(&profile.token).await?; - let _ = ws_manager.start(profile.token.clone()).await?; + let _ = ws_conn.start(profile.token.clone()).await?; let _ = ret.send(()); }, } @@ -198,20 +221,28 @@ fn mk_user_session(config: &FlowySDKConfig) -> Arc { } fn mk_core_context( - user_session: Arc, - flowy_document: Arc, + user_session: &Arc, + flowy_document: &Arc, server_config: &ClientServerConfiguration, ) -> Arc { - let workspace_deps = WorkspaceDepsResolver::new(user_session); + let workspace_deps = WorkspaceDepsResolver::new(user_session.clone()); let (user, database) = workspace_deps.split_into(); - init_core(user, database, flowy_document, server_config) + init_core(user, database, flowy_document.clone(), server_config) +} + +fn default_web_socket() -> Arc { + if cfg!(feature = "http_server") { + Arc::new(Arc::new(WSController::new())) + } else { + Arc::new(LocalWebSocket::default()) + } } pub fn mk_document( - ws_manager: Arc, - user_session: Arc, + ws_manager: &Arc, + user_session: &Arc, server_config: &ClientServerConfiguration, ) -> Arc { - let (user, ws_receivers, ws_sender) = DocumentDepsResolver::resolve(ws_manager, user_session); + let (user, ws_receivers, ws_sender) = DocumentDepsResolver::resolve(ws_manager.clone(), user_session.clone()); Arc::new(DocumentContext::new(user, ws_receivers, ws_sender, server_config)) } diff --git a/frontend/rust-lib/flowy-sdk/src/module.rs b/frontend/rust-lib/flowy-sdk/src/module.rs index af6a7ce455..7f2c9c5691 100644 --- a/frontend/rust-lib/flowy-sdk/src/module.rs +++ b/frontend/rust-lib/flowy-sdk/src/module.rs @@ -1,17 +1,17 @@ use flowy_core::context::CoreContext; -use flowy_net::services::ws::FlowyWSConnect; +use flowy_net::services::ws_conn::FlowyWebSocketConnect; use flowy_user::services::user::UserSession; use lib_dispatch::prelude::Module; use std::sync::Arc; pub fn mk_modules( - ws_manager: Arc, - core: Arc, - user_session: Arc, + ws_conn: &Arc, + core: &Arc, + user_session: &Arc, ) -> Vec { - let user_module = mk_user_module(user_session); - let core_module = mk_core_module(core); - let network_module = mk_network_module(ws_manager); + let user_module = mk_user_module(user_session.clone()); + let core_module = mk_core_module(core.clone()); + let network_module = mk_network_module(ws_conn.clone()); vec![user_module, core_module, network_module] } @@ -19,4 +19,4 @@ fn mk_user_module(user_session: Arc) -> Module { flowy_user::module fn mk_core_module(core: Arc) -> Module { flowy_core::module::create(core) } -fn mk_network_module(ws_manager: Arc) -> Module { flowy_net::module::create(ws_manager) } +fn mk_network_module(ws_conn: Arc) -> Module { flowy_net::module::create(ws_conn) } diff --git a/frontend/rust-lib/flowy-test/Cargo.toml b/frontend/rust-lib/flowy-test/Cargo.toml index 32c4a804a6..6798a18e5f 100644 --- a/frontend/rust-lib/flowy-test/Cargo.toml +++ b/frontend/rust-lib/flowy-test/Cargo.toml @@ -35,5 +35,4 @@ quickcheck_macros = "0.9.1" fake = "~2.3.0" claim = "0.4.0" futures = "0.3.15" -serial_test = "0.5.1" -flowy-virtual-net = { path = "../flowy-virtual-net", features = ["flowy_unit_test"] } \ No newline at end of file +serial_test = "0.5.1" \ No newline at end of file diff --git a/frontend/rust-lib/flowy-test/src/doc_script.rs b/frontend/rust-lib/flowy-test/src/doc_script.rs index 4111b9eac6..8e1772b244 100644 --- a/frontend/rust-lib/flowy-test/src/doc_script.rs +++ b/frontend/rust-lib/flowy-test/src/doc_script.rs @@ -1,4 +1,5 @@ use crate::{helper::ViewTest, FlowySDKTest}; +use backend_service::configuration::get_client_server_configuration; use flowy_collaboration::entities::revision::RevisionState; use flowy_document::core::{edit::ClientDocumentEditor, SYNC_INTERVAL_IN_MILLIS}; use lib_ot::{core::Interval, rich_text::RichTextDelta}; @@ -6,8 +7,6 @@ use std::sync::Arc; use tokio::time::{sleep, Duration}; pub enum EditorScript { - StartWs, - StopWs, InsertText(&'static str, usize), Delete(Interval), Replace(Interval, &'static str), @@ -16,8 +15,6 @@ pub enum EditorScript { AssertNextRevId(Option), AssertCurrentRevId(i64), AssertJson(&'static str), - - WaitSyncFinished, } pub struct EditorTest { @@ -27,10 +24,11 @@ pub struct EditorTest { impl EditorTest { pub async fn new() -> Self { - let sdk = FlowySDKTest::setup(); + let server_config = get_client_server_configuration().unwrap(); + let sdk = FlowySDKTest::new(server_config, None); let _ = sdk.init_user().await; let test = ViewTest::new(&sdk).await; - let editor = sdk.document_ctx.controller.open(&test.view.id).await.unwrap(); + let editor = sdk.document_ctx.controller.open_document(&test.view.id).await.unwrap(); Self { sdk, editor } } @@ -46,17 +44,11 @@ impl EditorTest { let rev_manager = self.editor.rev_manager(); let cache = rev_manager.revision_cache(); let _user_id = self.sdk.user_session.user_id().unwrap(); - let ws_manager = self.sdk.ws_manager.clone(); - let token = self.sdk.user_session.token().unwrap(); + // let ws_manager = self.sdk.ws_conn.clone(); + // let token = self.sdk.user_session.token().unwrap(); let wait_millis = 2 * SYNC_INTERVAL_IN_MILLIS; match script { - EditorScript::StartWs => { - ws_manager.start(token.clone()).await.unwrap(); - }, - EditorScript::StopWs => { - ws_manager.stop().await; - }, EditorScript::InsertText(s, offset) => { self.editor.insert(offset, s).await.unwrap(); }, @@ -91,10 +83,6 @@ impl EditorTest { } assert_eq!(expected_delta, delta); }, - EditorScript::WaitSyncFinished => { - // Workaround: just wait two seconds - sleep(Duration::from_millis(2000)).await; - }, } sleep(Duration::from_millis(wait_millis)).await; } diff --git a/frontend/rust-lib/flowy-test/src/helper.rs b/frontend/rust-lib/flowy-test/src/helper.rs index 52e6805791..fc51050b92 100644 --- a/frontend/rust-lib/flowy-test/src/helper.rs +++ b/frontend/rust-lib/flowy-test/src/helper.rs @@ -28,7 +28,7 @@ pub struct WorkspaceTest { impl WorkspaceTest { pub async fn new() -> Self { - let sdk = FlowySDKTest::setup(); + let sdk = FlowySDKTest::default(); let _ = sdk.init_user().await; let workspace = create_workspace(&sdk, "Workspace", "").await; open_workspace(&sdk, &workspace.id).await; @@ -45,7 +45,7 @@ pub struct AppTest { impl AppTest { pub async fn new() -> Self { - let sdk = FlowySDKTest::setup(); + let sdk = FlowySDKTest::default(); let _ = sdk.init_user().await; let workspace = create_workspace(&sdk, "Workspace", "").await; open_workspace(&sdk, &workspace.id).await; diff --git a/frontend/rust-lib/flowy-test/src/lib.rs b/frontend/rust-lib/flowy-test/src/lib.rs index 44514e01ab..12ece7b4ff 100644 --- a/frontend/rust-lib/flowy-test/src/lib.rs +++ b/frontend/rust-lib/flowy-test/src/lib.rs @@ -4,9 +4,11 @@ pub mod helper; use crate::helper::*; use backend_service::configuration::{get_client_server_configuration, ClientServerConfiguration}; +use flowy_net::services::ws_conn::FlowyRawWebSocket; use flowy_sdk::{FlowySDK, FlowySDKConfig}; use flowy_user::entities::UserProfile; use lib_infra::uuid_string; +use std::sync::Arc; pub mod prelude { pub use crate::{event_builder::*, helper::*, *}; @@ -14,36 +16,42 @@ pub mod prelude { } #[derive(Clone)] -pub struct FlowySDKTest(pub FlowySDK); +pub struct FlowySDKTest { + pub inner: FlowySDK, + pub ws: Option>, +} impl std::ops::Deref for FlowySDKTest { type Target = FlowySDK; - fn deref(&self) -> &Self::Target { &self.0 } + fn deref(&self) -> &Self::Target { &self.inner } } -impl FlowySDKTest { - pub fn setup() -> Self { +impl std::default::Default for FlowySDKTest { + fn default() -> Self { let server_config = get_client_server_configuration().unwrap(); - let sdk = Self::setup_with(server_config); + let sdk = Self::new(server_config, None); std::mem::forget(sdk.dispatcher()); sdk } +} - pub fn setup_with(server_config: ClientServerConfiguration) -> Self { - let config = FlowySDKConfig::new(&root_dir(), server_config, &uuid_string()).log_filter("debug"); +impl FlowySDKTest { + pub fn new(server_config: ClientServerConfiguration, ws: Option>) -> Self { + let config = FlowySDKConfig::new(&root_dir(), server_config, &uuid_string(), None).log_filter("debug"); let sdk = FlowySDK::new(config); - Self(sdk) + std::mem::forget(sdk.dispatcher()); + Self { inner: sdk, ws } } pub async fn sign_up(&self) -> SignUpContext { - let context = async_sign_up(self.0.dispatcher()).await; + let context = async_sign_up(self.inner.dispatcher()).await; context } pub async fn init_user(&self) -> UserProfile { - let context = async_sign_up(self.0.dispatcher()).await; - init_user_setting(self.0.dispatcher()).await; + let context = async_sign_up(self.inner.dispatcher()).await; + init_user_setting(self.inner.dispatcher()).await; context.user_profile } } diff --git a/frontend/rust-lib/flowy-test/tests/main.rs b/frontend/rust-lib/flowy-test/tests/main.rs index 67cf0d2b80..3eb8b414b2 100644 --- a/frontend/rust-lib/flowy-test/tests/main.rs +++ b/frontend/rust-lib/flowy-test/tests/main.rs @@ -1 +1 @@ -// mod revision_test; +mod revision_test; diff --git a/frontend/rust-lib/flowy-test/tests/revision_test.rs b/frontend/rust-lib/flowy-test/tests/revision_test.rs index 49b2e47236..2751c86b0a 100644 --- a/frontend/rust-lib/flowy-test/tests/revision_test.rs +++ b/frontend/rust-lib/flowy-test/tests/revision_test.rs @@ -17,11 +17,8 @@ async fn doc_sync_test() { async fn doc_sync_retry_ws_conn() { let scripts = vec![ InsertText("1", 0), - StopWs, InsertText("2", 1), InsertText("3", 2), - StartWs, - WaitSyncFinished, AssertRevisionState(2, RevisionState::Ack), AssertRevisionState(3, RevisionState::Ack), AssertNextRevId(None), diff --git a/frontend/rust-lib/flowy-user/tests/event/auth_test.rs b/frontend/rust-lib/flowy-user/tests/event/auth_test.rs index a0490927af..899eb48731 100644 --- a/frontend/rust-lib/flowy-user/tests/event/auth_test.rs +++ b/frontend/rust-lib/flowy-user/tests/event/auth_test.rs @@ -5,7 +5,7 @@ use flowy_user::{errors::ErrorCode, event::UserEvent::*, prelude::*}; #[tokio::test] async fn sign_up_with_invalid_email() { for email in invalid_email_test_case() { - let sdk = FlowySDKTest::setup(); + let sdk = FlowySDKTest::default(); let request = SignUpRequest { email: email.to_string(), name: valid_name(), @@ -27,7 +27,7 @@ async fn sign_up_with_invalid_email() { #[tokio::test] async fn sign_up_with_invalid_password() { for password in invalid_password_test_case() { - let sdk = FlowySDKTest::setup(); + let sdk = FlowySDKTest::default(); let request = SignUpRequest { email: random_email(), name: valid_name(), @@ -45,7 +45,7 @@ async fn sign_up_with_invalid_password() { #[tokio::test] async fn sign_in_success() { - let test = FlowySDKTest::setup(); + let test = FlowySDKTest::default(); let _ = UserModuleEventBuilder::new(test.clone()).event(SignOut).sync_send(); let sign_up_context = test.sign_up().await; @@ -67,7 +67,7 @@ async fn sign_in_success() { #[tokio::test] async fn sign_in_with_invalid_email() { for email in invalid_email_test_case() { - let sdk = FlowySDKTest::setup(); + let sdk = FlowySDKTest::default(); let request = SignInRequest { email: email.to_string(), password: login_password(), @@ -90,7 +90,7 @@ async fn sign_in_with_invalid_email() { #[tokio::test] async fn sign_in_with_invalid_password() { for password in invalid_password_test_case() { - let sdk = FlowySDKTest::setup(); + let sdk = FlowySDKTest::default(); let request = SignInRequest { email: random_email(), diff --git a/frontend/rust-lib/flowy-user/tests/event/user_profile_test.rs b/frontend/rust-lib/flowy-user/tests/event/user_profile_test.rs index 15923b0125..9fef50600f 100644 --- a/frontend/rust-lib/flowy-user/tests/event/user_profile_test.rs +++ b/frontend/rust-lib/flowy-user/tests/event/user_profile_test.rs @@ -6,7 +6,7 @@ use serial_test::*; #[tokio::test] async fn user_profile_get_failed() { - let sdk = FlowySDKTest::setup(); + let sdk = FlowySDKTest::default(); let result = UserModuleEventBuilder::new(sdk) .event(GetUserProfile) .assert_error() @@ -18,7 +18,7 @@ async fn user_profile_get_failed() { #[tokio::test] #[serial] async fn user_profile_get() { - let test = FlowySDKTest::setup(); + let test = FlowySDKTest::default(); let user_profile = test.init_user().await; let user = UserModuleEventBuilder::new(test.clone()) .event(GetUserProfile) @@ -30,7 +30,7 @@ async fn user_profile_get() { #[tokio::test] #[serial] async fn user_update_with_name() { - let sdk = FlowySDKTest::setup(); + let sdk = FlowySDKTest::default(); let user = sdk.init_user().await; let new_name = "hello_world".to_owned(); let request = UpdateUserRequest::new(&user.id).name(&new_name); @@ -51,7 +51,7 @@ async fn user_update_with_name() { #[tokio::test] #[serial] async fn user_update_with_email() { - let sdk = FlowySDKTest::setup(); + let sdk = FlowySDKTest::default(); let user = sdk.init_user().await; let new_email = format!("{}@gmail.com", uuid_string()); let request = UpdateUserRequest::new(&user.id).email(&new_email); @@ -71,7 +71,7 @@ async fn user_update_with_email() { #[tokio::test] #[serial] async fn user_update_with_password() { - let sdk = FlowySDKTest::setup(); + let sdk = FlowySDKTest::default(); let user = sdk.init_user().await; let new_password = "H123world!".to_owned(); let request = UpdateUserRequest::new(&user.id).password(&new_password); @@ -86,7 +86,7 @@ async fn user_update_with_password() { #[tokio::test] #[serial] async fn user_update_with_invalid_email() { - let test = FlowySDKTest::setup(); + let test = FlowySDKTest::default(); let user = test.init_user().await; for email in invalid_email_test_case() { let request = UpdateUserRequest::new(&user.id).email(&email); @@ -105,7 +105,7 @@ async fn user_update_with_invalid_email() { #[tokio::test] #[serial] async fn user_update_with_invalid_password() { - let test = FlowySDKTest::setup(); + let test = FlowySDKTest::default(); let user = test.init_user().await; for password in invalid_password_test_case() { let request = UpdateUserRequest::new(&user.id).password(&password); @@ -121,7 +121,7 @@ async fn user_update_with_invalid_password() { #[tokio::test] #[serial] async fn user_update_with_invalid_name() { - let test = FlowySDKTest::setup(); + let test = FlowySDKTest::default(); let user = test.init_user().await; let request = UpdateUserRequest::new(&user.id).name(""); UserModuleEventBuilder::new(test.clone()) diff --git a/frontend/rust-lib/flowy-virtual-net/Cargo.toml b/frontend/rust-lib/flowy-virtual-net/Cargo.toml deleted file mode 100644 index cbc1b7ef87..0000000000 --- a/frontend/rust-lib/flowy-virtual-net/Cargo.toml +++ /dev/null @@ -1,23 +0,0 @@ -[package] -name = "flowy-virtual-net" -version = "0.1.0" -edition = "2018" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -lib-ws = { path = "../../../shared-lib/lib-ws" } -lib-infra = { path = "../../../shared-lib/lib-infra" } -flowy-net = { path = "../flowy-net" } -bytes = { version = "1.0" } -parking_lot = "0.11" -tokio = {version = "1", features = ["sync"]} -tracing = { version = "0.1", features = ["log"] } - -# flowy-collaboration and dashmap would be optional -flowy-collaboration = { path = "../../../shared-lib/flowy-collaboration"} -dashmap = {version = "4.0"} - -[features] -flowy_unit_test = [] -http_server = [] \ No newline at end of file diff --git a/frontend/rust-lib/flowy-virtual-net/src/lib.rs b/frontend/rust-lib/flowy-virtual-net/src/lib.rs deleted file mode 100644 index 8d1b774812..0000000000 --- a/frontend/rust-lib/flowy-virtual-net/src/lib.rs +++ /dev/null @@ -1,12 +0,0 @@ -use flowy_net::services::ws::FlowyWebSocket; -use std::sync::Arc; -mod ws; - -#[cfg(not(feature = "flowy_unit_test"))] -pub fn local_web_socket() -> Arc { Arc::new(ws::LocalWebSocket::default()) } - -#[cfg(feature = "flowy_unit_test")] -mod mock; - -#[cfg(feature = "flowy_unit_test")] -pub fn local_web_socket() -> Arc { Arc::new(crate::mock::MockWebSocket::default()) } diff --git a/frontend/rust-lib/flowy-virtual-net/src/mock/mod.rs b/frontend/rust-lib/flowy-virtual-net/src/mock/mod.rs deleted file mode 100644 index 2066369bbb..0000000000 --- a/frontend/rust-lib/flowy-virtual-net/src/mock/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -mod server; -mod ws_local; - -pub use ws_local::*; diff --git a/frontend/rust-lib/flowy-virtual-net/src/mock/ws_local.rs b/frontend/rust-lib/flowy-virtual-net/src/mock/ws_local.rs deleted file mode 100644 index 2af11e6241..0000000000 --- a/frontend/rust-lib/flowy-virtual-net/src/mock/ws_local.rs +++ /dev/null @@ -1,95 +0,0 @@ -use crate::mock::server::MockDocServer; -use bytes::Bytes; -use dashmap::DashMap; -use flowy_collaboration::entities::ws::*; -use flowy_net::services::ws::*; -use lib_infra::future::FutureResult; -use lib_ws::{WSModule, WebSocketRawMessage}; -use parking_lot::RwLock; -use std::{convert::TryFrom, sync::Arc}; -use tokio::sync::{broadcast, broadcast::Receiver}; - -pub struct MockWebSocket { - receivers: Arc>>, - state_sender: broadcast::Sender, - ws_sender: MockWSSender, - is_stop: Arc>, - server: Arc, -} - -impl std::default::Default for MockWebSocket { - fn default() -> Self { - let (state_sender, _) = broadcast::channel(16); - let (ws_sender, _) = broadcast::channel(16); - let server = Arc::new(MockDocServer::default()); - MockWebSocket { - receivers: Arc::new(DashMap::new()), - state_sender, - ws_sender: MockWSSender(ws_sender), - is_stop: Arc::new(RwLock::new(false)), - server, - } - } -} - -impl FlowyWebSocket for MockWebSocket { - fn start_connect(&self, _addr: String) -> FutureResult<(), FlowyError> { - *self.is_stop.write() = false; - - let mut ws_receiver = self.ws_sender.subscribe(); - let receivers = self.receivers.clone(); - let is_stop = self.is_stop.clone(); - let server = self.server.clone(); - tokio::spawn(async move { - while let Ok(message) = ws_receiver.recv().await { - if *is_stop.read() { - // do nothing - } else { - let ws_data = DocumentClientWSData::try_from(Bytes::from(message.data.clone())).unwrap(); - - if let Some(mut rx) = server.handle_client_data(ws_data).await { - let new_ws_message = rx.recv().await.unwrap(); - match receivers.get(&new_ws_message.module) { - None => tracing::error!("Can't find any handler for message: {:?}", new_ws_message), - Some(handler) => handler.receive_message(new_ws_message.clone()), - } - } - } - } - }); - - FutureResult::new(async { Ok(()) }) - } - - fn stop_connect(&self) -> FutureResult<(), FlowyError> { - *self.is_stop.write() = true; - FutureResult::new(async { Ok(()) }) - } - - fn subscribe_connect_state(&self) -> Receiver { self.state_sender.subscribe() } - - fn reconnect(&self, _count: usize) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) } - - fn add_message_receiver(&self, handler: Arc) -> Result<(), FlowyError> { - self.receivers.insert(handler.source(), handler); - Ok(()) - } - - fn ws_sender(&self) -> Result, FlowyError> { Ok(Arc::new(self.ws_sender.clone())) } -} - -#[derive(Clone)] -pub struct MockWSSender(broadcast::Sender); - -impl FlowyWSSender for MockWSSender { - fn send(&self, msg: WebSocketRawMessage) -> Result<(), FlowyError> { - let _ = self.0.send(msg); - Ok(()) - } -} - -impl std::ops::Deref for MockWSSender { - type Target = broadcast::Sender; - - fn deref(&self) -> &Self::Target { &self.0 } -} diff --git a/frontend/rust-lib/flowy-virtual-net/src/ws/mod.rs b/frontend/rust-lib/flowy-virtual-net/src/ws/mod.rs deleted file mode 100644 index 637a2655af..0000000000 --- a/frontend/rust-lib/flowy-virtual-net/src/ws/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod ws_local; - -pub use ws_local::*; diff --git a/frontend/rust-lib/flowy-virtual-net/src/ws/ws_local.rs b/frontend/rust-lib/flowy-virtual-net/src/ws/ws_local.rs deleted file mode 100644 index c50c415fc5..0000000000 --- a/frontend/rust-lib/flowy-virtual-net/src/ws/ws_local.rs +++ /dev/null @@ -1,55 +0,0 @@ -use flowy_net::services::ws::{ - FlowyError, - FlowyWSSender, - FlowyWebSocket, - WSConnectState, - WSMessageReceiver, - WebSocketRawMessage, -}; -use lib_infra::future::FutureResult; -use std::sync::Arc; -use tokio::sync::{broadcast, broadcast::Receiver}; - -pub(crate) struct LocalWebSocket { - state_sender: broadcast::Sender, - ws_sender: LocalWSSender, -} - -impl std::default::Default for LocalWebSocket { - fn default() -> Self { - let (state_sender, _) = broadcast::channel(16); - let (ws_sender, _) = broadcast::channel(16); - LocalWebSocket { - state_sender, - ws_sender: LocalWSSender(ws_sender), - } - } -} - -impl FlowyWebSocket for LocalWebSocket { - fn start_connect(&self, _addr: String) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) } - - fn stop_connect(&self) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) } - - fn subscribe_connect_state(&self) -> Receiver { self.state_sender.subscribe() } - - fn reconnect(&self, _count: usize) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) } - - fn add_message_receiver(&self, _handler: Arc) -> Result<(), FlowyError> { Ok(()) } - - fn ws_sender(&self) -> Result, FlowyError> { Ok(Arc::new(self.ws_sender.clone())) } -} - -#[derive(Clone)] -pub struct LocalWSSender(broadcast::Sender); -impl FlowyWSSender for LocalWSSender { - fn send(&self, msg: WebSocketRawMessage) -> Result<(), FlowyError> { - let _ = self.0.send(msg); - Ok(()) - } -} - -impl std::ops::Deref for LocalWSSender { - type Target = broadcast::Sender; - fn deref(&self) -> &Self::Target { &self.0 } -} diff --git a/shared-lib/flowy-collaboration/src/sync/server.rs b/shared-lib/flowy-collaboration/src/sync/server.rs index b1cecc1ecb..73148c88ac 100644 --- a/shared-lib/flowy-collaboration/src/sync/server.rs +++ b/shared-lib/flowy-collaboration/src/sync/server.rs @@ -123,11 +123,15 @@ impl ServerDocumentManager { } let mut write_guard = self.open_doc_map.write().await; - let doc = self.persistence.read_doc(doc_id).await.unwrap(); - let handler = self.create_document_handler(doc).await.map_err(internal_error).unwrap(); - write_guard.insert(doc_id.to_owned(), handler.clone()); - drop(write_guard); - Some(handler) + match self.persistence.read_doc(doc_id).await { + Ok(doc) => { + let handler = self.create_document_handler(doc).await.map_err(internal_error).unwrap(); + write_guard.insert(doc_id.to_owned(), handler.clone()); + drop(write_guard); + Some(handler) + }, + Err(_) => None, + } } #[tracing::instrument(level = "debug", skip(self, repeated_revision), err)] diff --git a/shared-lib/lib-ws/src/ws.rs b/shared-lib/lib-ws/src/ws.rs index 9824d42af5..386ec9b372 100644 --- a/shared-lib/lib-ws/src/ws.rs +++ b/shared-lib/lib-ws/src/ws.rs @@ -59,7 +59,7 @@ impl std::default::Default for WSController { impl WSController { pub fn new() -> Self { WSController::default() } - pub fn add_receiver(&self, handler: Arc) -> Result<(), WSError> { + pub fn add_ws_message_receiver(&self, handler: Arc) -> Result<(), WSError> { let source = handler.source(); if self.handlers.contains_key(&source) { log::error!("WsSource's {:?} is already registered", source); @@ -133,7 +133,7 @@ impl WSController { pub fn subscribe_state(&self) -> broadcast::Receiver { self.state_notify.subscribe() } - pub fn sender(&self) -> Result, WSError> { + pub fn ws_message_sender(&self) -> Result, WSError> { match self.sender_ctrl.read().sender() { None => Err(WSError::internal().context("WsSender is not initialized, should call connect first")), Some(sender) => Ok(sender),