diff --git a/backend/src/services/document/ws_actor.rs b/backend/src/services/document/ws_actor.rs index 0d5c0e5383..52c175a287 100644 --- a/backend/src/services/document/ws_actor.rs +++ b/backend/src/services/document/ws_actor.rs @@ -77,7 +77,7 @@ impl DocumentWebSocketActor { .map_err(internal_error)??; tracing::debug!( - "[DocumentWebSocketActor]: client data: {}:{}, {:?}", + "[DocumentWebSocketActor]: receive: {}:{}, {:?}", document_client_data.doc_id, document_client_data.id, document_client_data.ty diff --git a/frontend/rust-lib/dart-ffi/src/lib.rs b/frontend/rust-lib/dart-ffi/src/lib.rs index 1dd6d736f9..ea396464d0 100644 --- a/frontend/rust-lib/dart-ffi/src/lib.rs +++ b/frontend/rust-lib/dart-ffi/src/lib.rs @@ -26,7 +26,7 @@ pub extern "C" fn init_sdk(path: *mut c_char) -> i64 { let path: &str = c_str.to_str().unwrap(); let server_config = get_client_server_configuration().unwrap(); - let config = FlowySDKConfig::new(path, server_config, "appflowy", None).log_filter("debug"); + let config = FlowySDKConfig::new(path, server_config, "appflowy").log_filter("debug"); *FLOWY_SDK.write() = Some(Arc::new(FlowySDK::new(config))); 0 diff --git a/frontend/rust-lib/flowy-document/src/core/web_socket/http_ws_impl.rs b/frontend/rust-lib/flowy-document/src/core/web_socket/http_ws_impl.rs index d907622c1d..ea4433c73a 100644 --- a/frontend/rust-lib/flowy-document/src/core/web_socket/http_ws_impl.rs +++ b/frontend/rust-lib/flowy-document/src/core/web_socket/http_ws_impl.rs @@ -272,8 +272,8 @@ impl DocumentWSSink { Ok(()) }, Some(data) => { - tracing::debug!("[DocumentSink]: Try send: {}:{:?}-{}", data.doc_id, data.ty, data.id()); - self.ws_sender.send(data).map_err(internal_error) + tracing::debug!("[DocumentSink]: send: {}:{}-{:?}", data.doc_id, data.id(), data.ty); + self.ws_sender.send(data) // let _ = tokio::time::timeout(Duration::from_millis(2000), }, } diff --git a/frontend/rust-lib/flowy-net/src/services/http_ws/http_ws_impl.rs b/frontend/rust-lib/flowy-net/src/services/http_ws/http_ws_impl.rs index f51ad89fed..4b81047095 100644 --- a/frontend/rust-lib/flowy-net/src/services/http_ws/http_ws_impl.rs +++ b/frontend/rust-lib/flowy-net/src/services/http_ws/http_ws_impl.rs @@ -9,7 +9,9 @@ use std::sync::Arc; use tokio::sync::broadcast::Receiver; impl FlowyRawWebSocket for Arc { - fn start_connect(&self, addr: String) -> FutureResult<(), FlowyError> { + fn initialize(&self) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) } + + fn start_connect(&self, addr: String, _user_id: String) -> FutureResult<(), FlowyError> { let cloned_ws = self.clone(); FutureResult::new(async move { let _ = cloned_ws.start(addr).await.map_err(internal_error)?; diff --git a/frontend/rust-lib/flowy-net/src/services/local_ws/local_server.rs b/frontend/rust-lib/flowy-net/src/services/local_ws/local_server.rs index a01bdee74d..a5c8fdfcb8 100644 --- a/frontend/rust-lib/flowy-net/src/services/local_ws/local_server.rs +++ b/frontend/rust-lib/flowy-net/src/services/local_ws/local_server.rs @@ -15,27 +15,13 @@ use flowy_collaboration::{ util::repeated_revision_from_repeated_revision_pb, }; use lib_infra::future::BoxResultFuture; -use lib_ws::{WSMessageReceiver, WSModule, WebSocketRawMessage}; +use lib_ws::{WSModule, WebSocketRawMessage}; use std::{ convert::TryInto, fmt::{Debug, Formatter}, sync::Arc, }; -use tokio::sync::mpsc; - -pub(crate) fn spawn_server(receivers: Arc>>) -> Arc { - let (server_tx, mut server_rx) = mpsc::unbounded_channel(); - let server = Arc::new(LocalDocumentServer::new(server_tx)); - tokio::spawn(async move { - while let Some(message) = server_rx.recv().await { - match receivers.get(&message.module) { - None => tracing::error!("Can't find any handler for message: {:?}", message), - Some(handler) => handler.receive_message(message.clone()), - } - } - }); - server -} +use tokio::sync::{mpsc, mpsc::UnboundedSender}; pub struct LocalDocumentServer { pub doc_manager: Arc, @@ -49,14 +35,19 @@ impl LocalDocumentServer { LocalDocumentServer { doc_manager, sender } } - pub async fn handle_client_data(&self, client_data: DocumentClientWSData) -> Result<(), CollaborateError> { + pub async fn handle_client_data( + &self, + client_data: DocumentClientWSData, + user_id: String, + ) -> Result<(), CollaborateError> { tracing::debug!( - "[LocalDocumentServer] receive client data: {}:{:?} ", + "[LocalDocumentServer] receive: {}:{}-{:?} ", client_data.doc_id, - client_data.ty + client_data.id(), + client_data.ty, ); let user = Arc::new(LocalDocumentUser { - user_id: "fake_user_id".to_owned(), + user_id, ws_sender: self.sender.clone(), }); let ty = client_data.ty.clone(); @@ -146,6 +137,13 @@ impl RevisionUser for LocalDocumentUser { fn receive(&self, resp: SyncResponse) { let sender = self.ws_sender.clone(); + let send_fn = |sender: UnboundedSender, msg: WebSocketRawMessage| match sender.send(msg) { + Ok(_) => {}, + Err(e) => { + tracing::error!("LocalDocumentUser send message failed: {}", e); + }, + }; + tokio::spawn(async move { match resp { SyncResponse::Pull(data) => { @@ -154,7 +152,7 @@ impl RevisionUser for LocalDocumentUser { module: WSModule::Doc, data: bytes.to_vec(), }; - sender.send(msg).unwrap(); + send_fn(sender, msg); }, SyncResponse::Push(data) => { let bytes: Bytes = data.try_into().unwrap(); @@ -162,7 +160,7 @@ impl RevisionUser for LocalDocumentUser { module: WSModule::Doc, data: bytes.to_vec(), }; - sender.send(msg).unwrap(); + send_fn(sender, msg); }, SyncResponse::Ack(data) => { let bytes: Bytes = data.try_into().unwrap(); @@ -170,9 +168,9 @@ impl RevisionUser for LocalDocumentUser { module: WSModule::Doc, data: bytes.to_vec(), }; - sender.send(msg).unwrap(); + send_fn(sender, msg); }, - SyncResponse::NewRevision(_) => { + SyncResponse::NewRevision(mut _repeated_revision) => { // unimplemented!() }, } diff --git a/frontend/rust-lib/flowy-net/src/services/local_ws/local_ws_impl.rs b/frontend/rust-lib/flowy-net/src/services/local_ws/local_ws_impl.rs index f92d8f3b77..962418e88f 100644 --- a/frontend/rust-lib/flowy-net/src/services/local_ws/local_ws_impl.rs +++ b/frontend/rust-lib/flowy-net/src/services/local_ws/local_ws_impl.rs @@ -6,17 +6,20 @@ use lib_infra::future::FutureResult; use lib_ws::{WSConnectState, WSMessageReceiver, WSModule, WebSocketRawMessage}; use crate::services::{ - local_ws::local_server::{spawn_server, LocalDocumentServer}, + local_ws::local_server::LocalDocumentServer, ws_conn::{FlowyRawWebSocket, FlowyWSSender}, }; +use parking_lot::RwLock; use std::{convert::TryFrom, sync::Arc}; -use tokio::sync::{broadcast, broadcast::Receiver}; +use tokio::sync::{broadcast, broadcast::Receiver, mpsc, mpsc::UnboundedReceiver}; pub struct LocalWebSocket { receivers: Arc>>, state_sender: broadcast::Sender, ws_sender: LocalWSSender, server: Arc, + server_rx: RwLock>>, + user_id: Arc>>, } impl std::default::Default for LocalWebSocket { @@ -24,13 +27,19 @@ impl std::default::Default for LocalWebSocket { let (state_sender, _) = broadcast::channel(16); let ws_sender = LocalWSSender::default(); let receivers = Arc::new(DashMap::new()); - let server = spawn_server(receivers.clone()); + + let (server_tx, server_rx) = mpsc::unbounded_channel(); + let server = Arc::new(LocalDocumentServer::new(server_tx)); + let server_rx = RwLock::new(Some(server_rx)); + let user_token = Arc::new(RwLock::new(None)); LocalWebSocket { receivers, state_sender, ws_sender, server, + server_rx, + user_id: user_token, } } } @@ -38,15 +47,26 @@ impl std::default::Default for LocalWebSocket { impl LocalWebSocket { fn spawn_client(&self, _addr: String) { let mut ws_receiver = self.ws_sender.subscribe(); - let server = self.server.clone(); + let local_server = self.server.clone(); + let user_id = self.user_id.clone(); tokio::spawn(async move { loop { + // Polling the web socket message sent by user match ws_receiver.recv().await { Ok(message) => { - let fut = || async { + let user_id = user_id.read().clone(); + if user_id.is_none() { + continue; + } + let user_id = user_id.unwrap(); + let server = local_server.clone(); + let fut = || async move { let bytes = Bytes::from(message.data); let client_data = DocumentClientWSData::try_from(bytes).map_err(internal_error)?; - let _ = server.handle_client_data(client_data).await?; + let _ = server + .handle_client_data(client_data, user_id) + .await + .map_err(internal_error)?; Ok::<(), FlowyError>(()) }; match fut().await { @@ -62,7 +82,22 @@ impl LocalWebSocket { } impl FlowyRawWebSocket for LocalWebSocket { - fn start_connect(&self, addr: String) -> FutureResult<(), FlowyError> { + fn initialize(&self) -> FutureResult<(), FlowyError> { + let mut server_rx = self.server_rx.write().take().expect("Only take once"); + let receivers = self.receivers.clone(); + tokio::spawn(async move { + while let Some(message) = server_rx.recv().await { + match receivers.get(&message.module) { + None => tracing::error!("Can't find any handler for message: {:?}", message), + Some(handler) => handler.receive_message(message.clone()), + } + } + }); + FutureResult::new(async { Ok(()) }) + } + + fn start_connect(&self, addr: String, user_id: String) -> FutureResult<(), FlowyError> { + *self.user_id.write() = Some(user_id); self.spawn_client(addr); FutureResult::new(async { Ok(()) }) } diff --git a/frontend/rust-lib/flowy-net/src/services/ws_conn.rs b/frontend/rust-lib/flowy-net/src/services/ws_conn.rs index 5ee78765a8..fdec3cf334 100644 --- a/frontend/rust-lib/flowy-net/src/services/ws_conn.rs +++ b/frontend/rust-lib/flowy-net/src/services/ws_conn.rs @@ -9,7 +9,8 @@ use std::sync::Arc; use tokio::sync::broadcast; pub trait FlowyRawWebSocket: Send + Sync { - fn start_connect(&self, addr: String) -> FutureResult<(), FlowyError>; + fn initialize(&self) -> FutureResult<(), FlowyError>; + fn start_connect(&self, addr: String, user_id: String) -> FutureResult<(), FlowyError>; fn stop_connect(&self) -> FutureResult<(), FlowyError>; fn subscribe_connect_state(&self) -> broadcast::Receiver; fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError>; @@ -39,10 +40,17 @@ impl FlowyWebSocketConnect { } } - pub async fn start(&self, token: String) -> Result<(), FlowyError> { - let addr = format!("{}/{}", self.addr, token); + pub async fn init(&self) { + match self.inner.initialize().await { + Ok(_) => {}, + Err(e) => tracing::error!("FlowyWebSocketConnect init error: {:?}", e), + } + } + + pub async fn start(&self, token: String, user_id: String) -> Result<(), FlowyError> { + let addr = format!("{}/{}", self.addr, &token); self.inner.stop_connect().await?; - let _ = self.inner.start_connect(addr).await?; + let _ = self.inner.start_connect(addr, user_id).await?; Ok(()) } diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs index f12cdb2665..c002e58c55 100644 --- a/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs +++ b/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs @@ -6,7 +6,7 @@ use flowy_document::{ core::{DocumentWSReceivers, DocumentWebSocket, WSStateReceiver}, errors::{internal_error, FlowyError}, }; -use flowy_net::services::ws_conn::FlowyWebSocketConnect; +use flowy_net::services::ws_conn::{FlowyWSSender, FlowyWebSocketConnect}; use flowy_user::services::user::UserSession; use lib_ws::{WSMessageReceiver, WSModule, WebSocketRawMessage}; use std::{convert::TryInto, path::Path, sync::Arc}; @@ -71,7 +71,7 @@ impl DocumentWebSocket for DocumentWebSocketAdapter { module: WSModule::Doc, data: bytes.to_vec(), }; - let sender = self.ws_conn.ws_sender().map_err(internal_error)?; + let sender = self.ws_conn.ws_sender()?; sender.send(msg).map_err(internal_error)?; Ok(()) } diff --git a/frontend/rust-lib/flowy-sdk/src/lib.rs b/frontend/rust-lib/flowy-sdk/src/lib.rs index 26a4d7468c..57efe1ec24 100644 --- a/frontend/rust-lib/flowy-sdk/src/lib.rs +++ b/frontend/rust-lib/flowy-sdk/src/lib.rs @@ -36,7 +36,6 @@ pub struct FlowySDKConfig { root: String, log_filter: String, server_config: ClientServerConfiguration, - ws: Arc, } impl fmt::Debug for FlowySDKConfig { @@ -50,19 +49,12 @@ impl fmt::Debug for FlowySDKConfig { } impl FlowySDKConfig { - pub fn new( - root: &str, - server_config: ClientServerConfiguration, - name: &str, - ws: Option>, - ) -> Self { - let ws = ws.unwrap_or_else(default_web_socket); + pub fn new(root: &str, server_config: ClientServerConfiguration, name: &str) -> Self { FlowySDKConfig { name: name.to_owned(), root: root.to_owned(), log_filter: crate_log_filter(None), server_config, - ws, } } @@ -107,7 +99,7 @@ impl FlowySDK { let ws_conn = Arc::new(FlowyWebSocketConnect::new( config.server_config.ws_addr(), - config.ws.clone(), + default_web_socket(), )); let user_session = mk_user_session(&config); let flowy_document = mk_document(&ws_conn, &user_session, &config.server_config); @@ -146,6 +138,7 @@ fn _init( dispatch.spawn(async move { user_session.init(); + ws_conn.init().await; listen_on_websocket(ws_conn.clone()); _listen_user_status(ws_conn.clone(), subscribe_user_status, core.clone()).await; }); @@ -163,9 +156,9 @@ async fn _listen_user_status( while let Ok(status) = subscribe.recv().await { let result = || async { match status { - UserStatus::Login { token } => { + UserStatus::Login { token, user_id } => { let _ = core.user_did_sign_in(&token).await?; - let _ = ws_conn.start(token).await?; + let _ = ws_conn.start(token, user_id).await?; }, UserStatus::Logout { .. } => { core.user_did_logout().await; @@ -177,7 +170,7 @@ async fn _listen_user_status( }, UserStatus::SignUp { profile, ret } => { let _ = core.user_did_sign_up(&profile.token).await?; - let _ = ws_conn.start(profile.token.clone()).await?; + let _ = ws_conn.start(profile.token.clone(), profile.id.clone()).await?; let _ = ret.send(()); }, } diff --git a/frontend/rust-lib/flowy-test/src/lib.rs b/frontend/rust-lib/flowy-test/src/lib.rs index 12ece7b4ff..4605acdf0d 100644 --- a/frontend/rust-lib/flowy-test/src/lib.rs +++ b/frontend/rust-lib/flowy-test/src/lib.rs @@ -38,7 +38,7 @@ impl std::default::Default for FlowySDKTest { impl FlowySDKTest { pub fn new(server_config: ClientServerConfiguration, ws: Option>) -> Self { - let config = FlowySDKConfig::new(&root_dir(), server_config, &uuid_string(), None).log_filter("debug"); + let config = FlowySDKConfig::new(&root_dir(), server_config, &uuid_string()).log_filter("debug"); let sdk = FlowySDK::new(config); std::mem::forget(sdk.dispatcher()); Self { inner: sdk, ws } diff --git a/frontend/rust-lib/flowy-user/src/entities/status.rs b/frontend/rust-lib/flowy-user/src/entities/status.rs index 287797b172..cfdb200a5f 100644 --- a/frontend/rust-lib/flowy-user/src/entities/status.rs +++ b/frontend/rust-lib/flowy-user/src/entities/status.rs @@ -5,6 +5,7 @@ use tokio::sync::mpsc; pub enum UserStatus { Login { token: String, + user_id: String, }, Logout { token: String, diff --git a/frontend/rust-lib/flowy-user/src/services/user/notifier.rs b/frontend/rust-lib/flowy-user/src/services/user/notifier.rs index eadb2fcc5b..67b3ac78a9 100644 --- a/frontend/rust-lib/flowy-user/src/services/user/notifier.rs +++ b/frontend/rust-lib/flowy-user/src/services/user/notifier.rs @@ -16,9 +16,10 @@ impl std::default::Default for UserNotifier { impl UserNotifier { pub(crate) fn new() -> Self { UserNotifier::default() } - pub(crate) fn notify_login(&self, token: &str) { + pub(crate) fn notify_login(&self, token: &str, user_id: &str) { let _ = self.user_status_notifier.send(UserStatus::Login { token: token.to_owned(), + user_id: user_id.to_owned(), }); } diff --git a/frontend/rust-lib/flowy-user/src/services/user/user_session.rs b/frontend/rust-lib/flowy-user/src/services/user/user_session.rs index 906e9ff323..5613d4cf92 100644 --- a/frontend/rust-lib/flowy-user/src/services/user/user_session.rs +++ b/frontend/rust-lib/flowy-user/src/services/user/user_session.rs @@ -67,7 +67,7 @@ impl UserSession { pub fn init(&self) { if let Ok(session) = self.get_session() { - self.notifier.notify_login(&session.token); + self.notifier.notify_login(&session.token, &session.user_id); } } @@ -97,7 +97,7 @@ impl UserSession { let _ = self.set_session(Some(session))?; let user_table = self.save_user(resp.into()).await?; let user_profile: UserProfile = user_table.into(); - self.notifier.notify_login(&user_profile.token); + self.notifier.notify_login(&user_profile.token, &user_profile.id); Ok(user_profile) } } diff --git a/shared-lib/flowy-collaboration/src/sync/server.rs b/shared-lib/flowy-collaboration/src/sync/server.rs index 73148c88ac..bcd6caa3ea 100644 --- a/shared-lib/flowy-collaboration/src/sync/server.rs +++ b/shared-lib/flowy-collaboration/src/sync/server.rs @@ -206,7 +206,6 @@ impl OpenDocHandle { result } - #[tracing::instrument(level = "debug", skip(self, user), err)] async fn apply_ping(&self, rev_id: i64, user: Arc) -> Result<(), CollaborateError> { let (ret, rx) = oneshot::channel(); self.users.insert(user.user_id(), user.clone()); diff --git a/shared-lib/flowy-collaboration/src/sync/synchronizer.rs b/shared-lib/flowy-collaboration/src/sync/synchronizer.rs index 7d1e558cec..48f7e8fb63 100644 --- a/shared-lib/flowy-collaboration/src/sync/synchronizer.rs +++ b/shared-lib/flowy-collaboration/src/sync/synchronizer.rs @@ -122,17 +122,17 @@ impl RevisionSynchronizer { let server_rev_id = self.rev_id(); tracing::Span::current().record("server_rev_id", &server_rev_id); match server_rev_id.cmp(&client_rev_id) { - Ordering::Less => tracing::error!( - "[Pong] Client should not send ping and the server should pull the revisions from the client" - ), - Ordering::Equal => tracing::debug!("[Pong]: The document:{} is up to date.", doc_id), + Ordering::Less => { + tracing::error!("Client should not send ping and the server should pull the revisions from the client") + }, + Ordering::Equal => tracing::debug!("{} is up to date.", doc_id), Ordering::Greater => { // The client document is outdated. Transform the client revision delta and then // send the prime delta to the client. Client should compose the this prime // delta. let from_rev_id = client_rev_id; let to_rev_id = server_rev_id; - tracing::trace!("[Pong]: Push revisions to user"); + tracing::trace!("Push revisions to user"); let _ = self .push_revisions_to_user(user, persistence, from_rev_id, to_rev_id) .await;