config web socket

This commit is contained in:
appflowy 2022-01-24 16:27:40 +08:00
parent 10d99bdd8b
commit 24c1817c8d
13 changed files with 244 additions and 186 deletions

View File

@ -15,7 +15,7 @@ CARGO_MAKE_EXTEND_WORKSPACE_MAKEFILE = true
CARGO_MAKE_CRATE_FS_NAME = "dart_ffi"
CARGO_MAKE_CRATE_NAME = "dart-ffi"
VERSION = "0.0.2"
FEATURES = "flutter, http_server"
FEATURES = "flutter"
PRODUCT_NAME = "AppFlowy"
#CRATE_TYPE: https://doc.rust-lang.org/reference/linkage.html
CRATE_TYPE = "staticlib"

View File

@ -140,7 +140,7 @@ impl FolderManager {
}
}
pub async fn initialize(&self, user_id: &str) -> FlowyResult<()> {
pub async fn initialize(&self, user_id: &str, token: &str) -> FlowyResult<()> {
let mut write_guard = INIT_FOLDER_FLAG.write().await;
if let Some(is_init) = write_guard.get(user_id) {
if *is_init {
@ -150,9 +150,8 @@ impl FolderManager {
let folder_id = FolderId::new(user_id);
let _ = self.persistence.initialize(user_id, &folder_id).await?;
let token = self.user.token()?;
let pool = self.persistence.db_pool()?;
let folder_editor = FolderEditor::new(user_id, &folder_id, &token, pool, self.web_socket.clone()).await?;
let folder_editor = FolderEditor::new(user_id, &folder_id, token, pool, self.web_socket.clone()).await?;
*self.folder_editor.write().await = Some(Arc::new(folder_editor));
let _ = self.app_controller.initialize()?;
@ -163,7 +162,7 @@ impl FolderManager {
pub async fn initialize_with_new_user(&self, user_id: &str, token: &str) -> FlowyResult<()> {
DefaultFolderBuilder::build(token, user_id, self.persistence.clone(), self.view_controller.clone()).await?;
self.initialize(user_id).await
self.initialize(user_id, token).await
}
pub async fn clear(&self) { *self.folder_editor.write().await = None; }

View File

@ -9,7 +9,7 @@ use flowy_collaboration::entities::{
};
use flowy_database::ConnectionPool;
use flowy_error::FlowyResult;
use flowy_sync::{RevisionCache, RevisionCloudService, RevisionManager, RevisionWebSocket, WSStateReceiver};
use flowy_sync::{RevisionCache, RevisionCloudService, RevisionManager, RevisionWebSocket};
use lib_infra::future::FutureResult;
use lib_ws::WSConnectState;
use std::{convert::TryInto, sync::Arc};
@ -53,8 +53,7 @@ impl FlowyDocumentManager {
}
pub fn init(&self) -> FlowyResult<()> {
let notify = self.web_socket.subscribe_state_changed();
listen_ws_state_changed(notify, self.ws_receivers.clone());
listen_ws_state_changed(self.web_socket.clone(), self.ws_receivers.clone());
Ok(())
}
@ -234,10 +233,11 @@ impl OpenDocCache {
}
}
#[tracing::instrument(level = "trace", skip(state_receiver, receivers))]
fn listen_ws_state_changed(mut state_receiver: WSStateReceiver, receivers: WebSocketDataReceivers) {
#[tracing::instrument(level = "trace", skip(web_socket, receivers))]
fn listen_ws_state_changed(web_socket: Arc<dyn RevisionWebSocket>, receivers: WebSocketDataReceivers) {
tokio::spawn(async move {
while let Ok(state) = state_receiver.recv().await {
let mut notify = web_socket.subscribe_state_changed().await;
while let Ok(state) = notify.recv().await {
for receiver in receivers.iter() {
receiver.value().connect_state_changed(state.clone());
}

View File

@ -1,6 +1,7 @@
use crate::ws::connection::{FlowyRawWebSocket, FlowyWebSocket};
use dashmap::DashMap;
use flowy_error::FlowyError;
use futures_util::future::BoxFuture;
use lib_infra::future::FutureResult;
use lib_ws::{WSChannel, WSConnectState, WSMessageReceiver, WebSocketRawMessage};
use parking_lot::RwLock;
@ -56,7 +57,10 @@ impl FlowyRawWebSocket for LocalWebSocket {
fn stop_connect(&self) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
fn subscribe_connect_state(&self) -> Receiver<WSConnectState> { self.state_sender.subscribe() }
fn subscribe_connect_state(&self) -> BoxFuture<Receiver<WSConnectState>> {
let subscribe = self.state_sender.subscribe();
Box::pin(async move { subscribe })
}
fn reconnect(&self, _count: usize) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
@ -66,8 +70,9 @@ impl FlowyRawWebSocket for LocalWebSocket {
Ok(())
}
fn ws_msg_sender(&self) -> Result<Arc<dyn FlowyWebSocket>, FlowyError> {
Ok(Arc::new(LocalWebSocketAdaptor(self.server_ws_sender.clone())))
fn ws_msg_sender(&self) -> FutureResult<Option<Arc<dyn FlowyWebSocket>>, FlowyError> {
let ws: Arc<dyn FlowyWebSocket> = Arc::new(LocalWebSocketAdaptor(self.server_ws_sender.clone()));
FutureResult::new(async move { Ok(Some(ws)) })
}
}

View File

@ -4,6 +4,7 @@ pub use flowy_error::FlowyError;
use lib_infra::future::FutureResult;
pub use lib_ws::{WSConnectState, WSMessageReceiver, WebSocketRawMessage};
use futures_util::future::BoxFuture;
use lib_ws::WSController;
use parking_lot::RwLock;
use std::sync::Arc;
@ -13,10 +14,10 @@ pub trait FlowyRawWebSocket: Send + Sync {
fn initialize(&self) -> FutureResult<(), FlowyError>;
fn start_connect(&self, addr: String, user_id: String) -> FutureResult<(), FlowyError>;
fn stop_connect(&self) -> FutureResult<(), FlowyError>;
fn subscribe_connect_state(&self) -> broadcast::Receiver<WSConnectState>;
fn subscribe_connect_state(&self) -> BoxFuture<broadcast::Receiver<WSConnectState>>;
fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError>;
fn add_msg_receiver(&self, receiver: Arc<dyn WSMessageReceiver>) -> Result<(), FlowyError>;
fn ws_msg_sender(&self) -> Result<Arc<dyn FlowyWebSocket>, FlowyError>;
fn ws_msg_sender(&self) -> FutureResult<Option<Arc<dyn FlowyWebSocket>>, FlowyError>;
}
pub trait FlowyWebSocket: Send + Sync {
@ -90,8 +91,8 @@ impl FlowyWebSocketConnect {
}
}
pub fn subscribe_websocket_state(&self) -> broadcast::Receiver<WSConnectState> {
self.inner.subscribe_connect_state()
pub async fn subscribe_websocket_state(&self) -> broadcast::Receiver<WSConnectState> {
self.inner.subscribe_connect_state().await
}
pub fn subscribe_network_ty(&self) -> broadcast::Receiver<NetworkType> { self.status_notifier.subscribe() }
@ -101,14 +102,16 @@ impl FlowyWebSocketConnect {
Ok(())
}
pub fn web_socket(&self) -> Result<Arc<dyn FlowyWebSocket>, FlowyError> { self.inner.ws_msg_sender() }
pub async fn web_socket(&self) -> Result<Option<Arc<dyn FlowyWebSocket>>, FlowyError> {
self.inner.ws_msg_sender().await
}
}
#[tracing::instrument(level = "debug", skip(ws_conn))]
pub fn listen_on_websocket(ws_conn: Arc<FlowyWebSocketConnect>) {
let raw_web_socket = ws_conn.inner.clone();
let mut notify = ws_conn.inner.subscribe_connect_state();
let _ = tokio::spawn(async move {
let mut notify = ws_conn.inner.subscribe_connect_state().await;
loop {
match notify.recv().await {
Ok(state) => {

View File

@ -5,6 +5,7 @@ use lib_infra::future::FutureResult;
pub use lib_ws::{WSConnectState, WSMessageReceiver, WebSocketRawMessage};
use lib_ws::{WSController, WSSender};
use futures_util::future::BoxFuture;
use std::sync::Arc;
use tokio::sync::broadcast::Receiver;
@ -27,7 +28,10 @@ impl FlowyRawWebSocket for Arc<WSController> {
})
}
fn subscribe_connect_state(&self) -> Receiver<WSConnectState> { self.subscribe_state() }
fn subscribe_connect_state(&self) -> BoxFuture<Receiver<WSConnectState>> {
let cloned_ws = self.clone();
Box::pin(async move { cloned_ws.subscribe_state().await })
}
fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError> {
let cloned_ws = self.clone();
@ -42,9 +46,17 @@ impl FlowyRawWebSocket for Arc<WSController> {
Ok(())
}
fn ws_msg_sender(&self) -> Result<Arc<dyn FlowyWebSocket>, FlowyError> {
let sender = self.ws_message_sender().map_err(internal_error)?;
Ok(sender)
fn ws_msg_sender(&self) -> FutureResult<Option<Arc<dyn FlowyWebSocket>>, FlowyError> {
let cloned_self = self.clone();
FutureResult::new(async move {
match cloned_self.ws_message_sender().await.map_err(internal_error)? {
None => Ok(None),
Some(sender) => {
let sender = sender as Arc<dyn FlowyWebSocket>;
Ok(Some(sender))
},
}
})
}
}

View File

@ -15,6 +15,8 @@ use flowy_net::{
};
use flowy_sync::{RevisionWebSocket, WSStateReceiver};
use flowy_user::services::UserSession;
use futures_core::future::BoxFuture;
use lib_infra::future::BoxResultFuture;
use lib_ws::{WSChannel, WSMessageReceiver, WebSocketRawMessage};
use std::{convert::TryInto, path::Path, sync::Arc};
@ -62,18 +64,28 @@ impl DocumentUser for DocumentUserImpl {
struct DocumentWebSocketImpl(Arc<FlowyWebSocketConnect>);
impl RevisionWebSocket for DocumentWebSocketImpl {
fn send(&self, data: ClientRevisionWSData) -> Result<(), FlowyError> {
fn send(&self, data: ClientRevisionWSData) -> BoxResultFuture<(), FlowyError> {
let bytes: Bytes = data.try_into().unwrap();
let msg = WebSocketRawMessage {
channel: WSChannel::Document,
data: bytes.to_vec(),
};
let sender = self.0.web_socket()?;
sender.send(msg).map_err(internal_error)?;
Ok(())
let ws_conn = self.0.clone();
Box::pin(async move {
match ws_conn.web_socket().await? {
None => {},
Some(sender) => {
sender.send(msg).map_err(internal_error)?;
},
}
Ok(())
})
}
fn subscribe_state_changed(&self) -> WSStateReceiver { self.0.subscribe_websocket_state() }
fn subscribe_state_changed(&self) -> BoxFuture<WSStateReceiver> {
let ws_conn = self.0.clone();
Box::pin(async move { ws_conn.subscribe_websocket_state().await })
}
}
struct DocumentWSMessageReceiverImpl(Arc<FlowyDocumentManager>);

View File

@ -15,6 +15,8 @@ use flowy_net::{
};
use flowy_sync::{RevisionWebSocket, WSStateReceiver};
use flowy_user::services::UserSession;
use futures_core::future::BoxFuture;
use lib_infra::future::BoxResultFuture;
use lib_ws::{WSChannel, WSMessageReceiver, WebSocketRawMessage};
use std::{convert::TryInto, sync::Arc};
@ -35,8 +37,23 @@ impl FolderDepsResolver {
Some(local_server) => local_server,
};
let folder_manager =
Arc::new(FolderManager::new(user, cloud_service, database, document_manager.clone(), web_socket).await);
let folder_manager = Arc::new(
FolderManager::new(
user.clone(),
cloud_service,
database,
document_manager.clone(),
web_socket,
)
.await,
);
if let (Ok(user_id), Ok(token)) = (user.user_id(), user.token()) {
match folder_manager.initialize(&user_id, &token).await {
Ok(_) => {},
Err(e) => tracing::error!("Initialize folder manager failed: {}", e),
}
}
let receiver = Arc::new(FolderWSMessageReceiverImpl(folder_manager.clone()));
ws_conn.add_ws_message_receiver(receiver).unwrap();
@ -61,18 +78,29 @@ impl WorkspaceUser for WorkspaceUserImpl {
struct FolderWebSocketImpl(Arc<FlowyWebSocketConnect>);
impl RevisionWebSocket for FolderWebSocketImpl {
fn send(&self, data: ClientRevisionWSData) -> Result<(), FlowyError> {
fn send(&self, data: ClientRevisionWSData) -> BoxResultFuture<(), FlowyError> {
let bytes: Bytes = data.try_into().unwrap();
let msg = WebSocketRawMessage {
channel: WSChannel::Folder,
data: bytes.to_vec(),
};
let sender = self.0.web_socket()?;
sender.send(msg).map_err(internal_error)?;
Ok(())
let ws_conn = self.0.clone();
Box::pin(async move {
match ws_conn.web_socket().await? {
None => {},
Some(sender) => {
sender.send(msg).map_err(internal_error)?;
},
}
Ok(())
})
}
fn subscribe_state_changed(&self) -> WSStateReceiver { self.0.subscribe_websocket_state() }
fn subscribe_state_changed(&self) -> BoxFuture<WSStateReceiver> {
let ws_conn = self.0.clone();
Box::pin(async move { ws_conn.subscribe_websocket_state().await })
}
}
struct FolderWSMessageReceiverImpl(Arc<FolderManager>);

View File

@ -22,7 +22,7 @@ use std::{
Arc,
},
};
use tokio::{runtime::Runtime, sync::broadcast};
use tokio::sync::broadcast;
static INIT_LOG: AtomicBool = AtomicBool::new(false);
@ -97,32 +97,37 @@ impl FlowySDK {
init_kv(&config.root);
tracing::debug!("🔥 {:?}", config);
let runtime = tokio_default_runtime().unwrap();
let ws_addr = config.server_config.ws_addr();
let (local_server, ws_conn) = if cfg!(feature = "http_server") {
let ws_conn = Arc::new(FlowyWebSocketConnect::new(ws_addr));
(None, ws_conn)
} else {
let context = flowy_net::local_server::build_server(&config.server_config);
let local_ws = Arc::new(context.local_ws);
let ws_conn = Arc::new(FlowyWebSocketConnect::from_local(ws_addr, local_ws));
(Some(Arc::new(context.local_server)), ws_conn)
};
let (local_server, ws_conn) = mk_local_server(&config.server_config);
let (user_session, document_manager, folder_manager, local_server) = runtime.block_on(async {
let user_session = mk_user_session(&config, &local_server, &config.server_config);
let document_manager = DocumentDepsResolver::resolve(
local_server.clone(),
ws_conn.clone(),
user_session.clone(),
&config.server_config,
);
let user_session = mk_user_session(&config, &local_server, &config.server_config);
let document_manager = mk_document(&local_server, &ws_conn, &user_session, &config.server_config);
let folder_manager = mk_folder_manager(
&runtime,
&local_server,
&user_session,
&document_manager,
&config.server_config,
&ws_conn,
);
let folder_manager = FolderDepsResolver::resolve(
local_server.clone(),
user_session.clone(),
&config.server_config,
&document_manager,
ws_conn.clone(),
)
.await;
if let Some(local_server) = local_server.as_ref() {
local_server.run();
}
ws_conn.init().await;
(user_session, document_manager, folder_manager, local_server)
});
let dispatcher = Arc::new(EventDispatcher::construct(runtime, || {
mk_modules(&ws_conn, &folder_manager, &user_session)
}));
_init(&local_server, &dispatcher, &ws_conn, &user_session, &folder_manager);
_start_listening(&dispatcher, &ws_conn, &user_session, &folder_manager);
Self {
config,
@ -138,8 +143,7 @@ impl FlowySDK {
pub fn dispatcher(&self) -> Arc<EventDispatcher> { self.dispatcher.clone() }
}
fn _init(
local_server: &Option<Arc<LocalServer>>,
fn _start_listening(
dispatch: &EventDispatcher,
ws_conn: &Arc<FlowyWebSocketConnect>,
user_session: &Arc<UserSession>,
@ -149,17 +153,11 @@ fn _init(
let subscribe_network_type = ws_conn.subscribe_network_ty();
let folder_manager = folder_manager.clone();
let cloned_folder_manager = folder_manager.clone();
let user_session = user_session.clone();
let ws_conn = ws_conn.clone();
let local_server = local_server.clone();
let user_session = user_session.clone();
dispatch.spawn(async move {
if let Some(local_server) = local_server.as_ref() {
local_server.run();
}
user_session.init();
ws_conn.init().await;
listen_on_websocket(ws_conn.clone());
_listen_user_status(ws_conn.clone(), subscribe_user_status, folder_manager.clone()).await;
});
@ -169,6 +167,21 @@ fn _init(
});
}
fn mk_local_server(
server_config: &ClientServerConfiguration,
) -> (Option<Arc<LocalServer>>, Arc<FlowyWebSocketConnect>) {
let ws_addr = server_config.ws_addr();
if cfg!(feature = "http_server") {
let ws_conn = Arc::new(FlowyWebSocketConnect::new(ws_addr));
(None, ws_conn)
} else {
let context = flowy_net::local_server::build_server(server_config);
let local_ws = Arc::new(context.local_ws);
let ws_conn = Arc::new(FlowyWebSocketConnect::from_local(ws_addr, local_ws));
(Some(Arc::new(context.local_server)), ws_conn)
}
}
async fn _listen_user_status(
ws_conn: Arc<FlowyWebSocketConnect>,
mut subscribe: broadcast::Receiver<UserStatus>,
@ -179,7 +192,7 @@ async fn _listen_user_status(
match status {
UserStatus::Login { token, user_id } => {
tracing::trace!("User did login");
let _ = folder_manager.initialize(&user_id).await?;
let _ = folder_manager.initialize(&user_id, &token).await?;
let _ = ws_conn.start(token, user_id).await?;
},
UserStatus::Logout { .. } => {
@ -244,37 +257,3 @@ fn mk_user_session(
let cloud_service = UserDepsResolver::resolve(local_server, server_config);
Arc::new(UserSession::new(user_config, cloud_service))
}
fn mk_folder_manager(
runtime: &Runtime,
local_server: &Option<Arc<LocalServer>>,
user_session: &Arc<UserSession>,
document_manager: &Arc<FlowyDocumentManager>,
server_config: &ClientServerConfiguration,
ws_conn: &Arc<FlowyWebSocketConnect>,
) -> Arc<FolderManager> {
runtime.block_on(async {
FolderDepsResolver::resolve(
local_server.clone(),
user_session.clone(),
server_config,
document_manager,
ws_conn.clone(),
)
.await
})
}
pub fn mk_document(
local_server: &Option<Arc<LocalServer>>,
ws_conn: &Arc<FlowyWebSocketConnect>,
user_session: &Arc<UserSession>,
server_config: &ClientServerConfiguration,
) -> Arc<FlowyDocumentManager> {
DocumentDepsResolver::resolve(
local_server.clone(),
ws_conn.clone(),
user_session.clone(),
server_config,
)
}

View File

@ -6,7 +6,7 @@ use flowy_collaboration::entities::{
ws_data::{ClientRevisionWSData, NewDocumentUser, ServerRevisionWSData, ServerRevisionWSDataType},
};
use flowy_error::{FlowyError, FlowyResult};
use futures_util::stream::StreamExt;
use futures_util::{future::BoxFuture, stream::StreamExt};
use lib_infra::future::{BoxResultFuture, FutureResult};
use lib_ws::WSConnectState;
use std::{collections::VecDeque, convert::TryFrom, fmt::Formatter, sync::Arc};
@ -36,8 +36,8 @@ pub trait RevisionWSSinkDataProvider: Send + Sync {
pub type WSStateReceiver = tokio::sync::broadcast::Receiver<WSConnectState>;
pub trait RevisionWebSocket: Send + Sync + 'static {
fn send(&self, data: ClientRevisionWSData) -> Result<(), FlowyError>;
fn subscribe_state_changed(&self) -> WSStateReceiver;
fn send(&self, data: ClientRevisionWSData) -> BoxResultFuture<(), FlowyError>;
fn subscribe_state_changed(&self) -> BoxFuture<WSStateReceiver>;
}
pub struct RevisionWebSocketManager {
@ -287,7 +287,7 @@ impl RevisionWSSink {
},
Some(data) => {
tracing::trace!("[{}]: send {}:{}-{:?}", self, data.object_id, data.id(), data.ty);
self.ws_sender.send(data)
self.ws_sender.send(data).await
},
}
}

View File

@ -35,7 +35,7 @@ impl std::default::Default for FlowySDKTest {
impl FlowySDKTest {
pub fn new(server_config: ClientServerConfiguration) -> Self {
let config = FlowySDKConfig::new(&root_dir(), server_config, &uuid_string()).log_filter("trace");
let sdk = FlowySDK::new(config);
let sdk = std::thread::spawn(|| FlowySDK::new(config)).join().unwrap();
std::mem::forget(sdk.dispatcher());
Self { inner: sdk }
}

View File

@ -88,7 +88,7 @@ impl UserSession {
#[tracing::instrument(level = "debug", skip(self))]
pub async fn sign_in(&self, params: SignInParams) -> Result<UserProfile, FlowyError> {
if self.is_login(&params.email) {
if self.is_user_login(&params.email) {
self.user_profile().await
} else {
let resp = self.cloud_service.sign_in(params).await?;
@ -103,7 +103,7 @@ impl UserSession {
#[tracing::instrument(level = "debug", skip(self))]
pub async fn sign_up(&self, params: SignUpParams) -> Result<UserProfile, FlowyError> {
if self.is_login(&params.email) {
if self.is_user_login(&params.email) {
self.user_profile().await
} else {
let resp = self.cloud_service.sign_up(params).await?;
@ -263,7 +263,7 @@ impl UserSession {
}
}
fn is_login(&self, email: &str) -> bool {
fn is_user_login(&self, email: &str) -> bool {
match self.get_session() {
Ok(session) => session.email == email,
Err(_) => false,

View File

@ -11,7 +11,6 @@ use dashmap::DashMap;
use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures_core::{ready, Stream};
use lib_infra::retry::{Action, FixedInterval, Retry};
use parking_lot::RwLock;
use pin_project::pin_project;
use std::{
convert::TryFrom,
@ -22,7 +21,7 @@ use std::{
task::{Context, Poll},
time::Duration,
};
use tokio::sync::{broadcast, oneshot};
use tokio::sync::{broadcast, oneshot, RwLock};
use tokio_tungstenite::tungstenite::{
protocol::{frame::coding::CloseCode, CloseFrame},
Message,
@ -39,19 +38,22 @@ pub trait WSMessageReceiver: Sync + Send + 'static {
pub struct WSController {
handlers: Handlers,
state_notify: Arc<broadcast::Sender<WSConnectState>>,
sender_ctrl: Arc<RwLock<WSSenderController>>,
addr: Arc<RwLock<Option<String>>>,
sender: Arc<RwLock<Option<Arc<WSSender>>>>,
conn_state_notify: Arc<RwLock<WSConnectStateNotifier>>,
}
impl std::fmt::Display for WSController {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("WebSocket") }
}
impl std::default::Default for WSController {
fn default() -> Self {
let (state_notify, _) = broadcast::channel(16);
Self {
handlers: DashMap::new(),
sender_ctrl: Arc::new(RwLock::new(WSSenderController::default())),
state_notify: Arc::new(state_notify),
addr: Arc::new(RwLock::new(None)),
sender: Arc::new(RwLock::new(None)),
conn_state_notify: Arc::new(RwLock::new(WSConnectStateNotifier::default())),
}
}
}
@ -62,36 +64,52 @@ impl WSController {
pub fn add_ws_message_receiver(&self, handler: Arc<dyn WSMessageReceiver>) -> Result<(), WSError> {
let source = handler.source();
if self.handlers.contains_key(&source) {
log::error!("WsSource's {:?} is already registered", source);
log::error!("{:?} is already registered", source);
}
self.handlers.insert(source, handler);
Ok(())
}
pub async fn start(&self, addr: String) -> Result<(), ServerError> {
*self.addr.write() = Some(addr.clone());
*self.addr.write().await = Some(addr.clone());
let strategy = FixedInterval::from_millis(5000).take(3);
self.connect(addr, strategy).await
}
pub async fn stop(&self) { self.sender_ctrl.write().set_state(WSConnectState::Disconnected); }
pub async fn stop(&self) {
if self.conn_state_notify.read().await.conn_state.is_connected() {
tracing::trace!("[{}] stop", self);
self.conn_state_notify
.write()
.await
.update_state(WSConnectState::Disconnected);
}
}
async fn connect<T, I>(&self, addr: String, strategy: T) -> Result<(), ServerError>
where
T: IntoIterator<IntoIter = I, Item = Duration>,
I: Iterator<Item = Duration> + Send + 'static,
{
let mut conn_state_notify = self.conn_state_notify.write().await;
let conn_state = conn_state_notify.conn_state.clone();
if conn_state.is_connected() || conn_state.is_connecting() {
return Ok(());
}
let (ret, rx) = oneshot::channel::<Result<(), ServerError>>();
*self.addr.write() = Some(addr.clone());
*self.addr.write().await = Some(addr.clone());
let action = WSConnectAction {
addr,
handlers: self.handlers.clone(),
};
let retry = Retry::spawn(strategy, action);
let sender_ctrl = self.sender_ctrl.clone();
sender_ctrl.write().set_state(WSConnectState::Connecting);
conn_state_notify.update_state(WSConnectState::Connecting);
drop(conn_state_notify);
let cloned_conn_state = self.conn_state_notify.clone();
let cloned_sender = self.sender.clone();
tracing::trace!("[{}] start connecting", self);
tokio::spawn(async move {
match retry.await {
Ok(result) => {
@ -100,30 +118,36 @@ impl WSController {
handlers_fut,
sender,
} = result;
sender_ctrl.write().set_sender(sender);
sender_ctrl.write().set_state(WSConnectState::Connected);
cloned_conn_state.write().await.update_state(WSConnectState::Connected);
*cloned_sender.write().await = Some(Arc::new(sender));
let _ = ret.send(Ok(()));
spawn_stream_and_handlers(stream, handlers_fut, sender_ctrl.clone()).await;
spawn_stream_and_handlers(stream, handlers_fut).await;
},
Err(e) => {
sender_ctrl.write().set_error(e.clone());
cloned_conn_state
.write()
.await
.update_state(WSConnectState::Disconnected);
let _ = ret.send(Err(ServerError::internal().context(e)));
},
}
});
rx.await?
}
pub async fn retry(&self, count: usize) -> Result<(), ServerError> {
if !self.sender_ctrl.read().is_disconnected() {
if !self.conn_state_notify.read().await.conn_state.is_disconnected() {
return Ok(());
}
tracing::trace!("[WebSocket]: retry connect...");
let strategy = FixedInterval::from_millis(5000).take(count);
let addr = self
.addr
.read()
.await
.as_ref()
.expect("Retry web socket connection failed, should call start_connect first")
.clone();
@ -131,25 +155,30 @@ impl WSController {
self.connect(addr, strategy).await
}
pub fn subscribe_state(&self) -> broadcast::Receiver<WSConnectState> { self.state_notify.subscribe() }
pub async fn subscribe_state(&self) -> broadcast::Receiver<WSConnectState> {
self.conn_state_notify.read().await.notify.subscribe()
}
pub fn ws_message_sender(&self) -> Result<Arc<WSSender>, WSError> {
match self.sender_ctrl.read().sender() {
None => Err(WSError::internal().context("WebSocket is not initialized, should call connect first")),
Some(sender) => Ok(sender),
pub async fn ws_message_sender(&self) -> Result<Option<Arc<WSSender>>, WSError> {
let sender = self.sender.read().await.clone();
match sender {
None => match self.conn_state_notify.read().await.conn_state {
WSConnectState::Disconnected => {
let msg = "WebSocket is disconnected";
Err(WSError::internal().context(msg))
},
_ => Ok(None),
},
Some(sender) => Ok(Some(sender)),
}
}
}
async fn spawn_stream_and_handlers(
stream: WSStream,
handlers: WSHandlerFuture,
sender_ctrl: Arc<RwLock<WSSenderController>>,
) {
async fn spawn_stream_and_handlers(stream: WSStream, handlers: WSHandlerFuture) {
tokio::select! {
result = stream => {
if let Err(e) = result {
sender_ctrl.write().set_error(e);
tracing::error!("WSStream error: {:?}", e);
}
},
result = handlers => tracing::debug!("handlers completed {:?}", result),
@ -201,15 +230,13 @@ impl Future for WSHandlerFuture {
}
#[derive(Debug, Clone)]
pub struct WSSender {
ws_tx: MsgSender,
}
pub struct WSSender(MsgSender);
impl WSSender {
pub fn send_msg<T: Into<WebSocketRawMessage>>(&self, msg: T) -> Result<(), WSError> {
let msg = msg.into();
let _ = self
.ws_tx
.0
.unbounded_send(msg.into())
.map_err(|e| WSError::internal().context(e))?;
Ok(())
@ -237,10 +264,7 @@ impl WSSender {
reason: reason.to_owned().into(),
};
let msg = Message::Close(Some(frame));
let _ = self
.ws_tx
.unbounded_send(msg)
.map_err(|e| WSError::internal().context(e))?;
let _ = self.0.unbounded_send(msg).map_err(|e| WSError::internal().context(e))?;
Ok(())
}
}
@ -291,7 +315,7 @@ impl WSConnectActionFut {
// └───────────────┘ └──────────────┘
let (msg_tx, msg_rx) = futures_channel::mpsc::unbounded();
let (ws_tx, ws_rx) = futures_channel::mpsc::unbounded();
let sender = WSSender { ws_tx };
let sender = WSSender(ws_tx);
let handlers_fut = WSHandlerFuture::new(handlers, msg_rx);
let conn = WSConnectionFuture::new(msg_tx, ws_rx, addr.clone());
Self {
@ -330,12 +354,20 @@ pub enum WSConnectState {
Disconnected,
}
impl WSConnectState {
fn is_connected(&self) -> bool { self == &WSConnectState::Connected }
fn is_connecting(&self) -> bool { self == &WSConnectState::Connecting }
fn is_disconnected(&self) -> bool { self == &WSConnectState::Disconnected || self == &WSConnectState::Init }
}
impl std::fmt::Display for WSConnectState {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
WSConnectState::Init => f.write_str("Init"),
WSConnectState::Connected => f.write_str("Connecting"),
WSConnectState::Connecting => f.write_str("Connected"),
WSConnectState::Connected => f.write_str("Connected"),
WSConnectState::Connecting => f.write_str("Connecting"),
WSConnectState::Disconnected => f.write_str("Disconnected"),
}
}
@ -345,44 +377,32 @@ impl std::fmt::Debug for WSConnectState {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str(&format!("{}", self)) }
}
struct WSSenderController {
state: WSConnectState,
state_notify: Arc<broadcast::Sender<WSConnectState>>,
sender: Option<Arc<WSSender>>,
struct WSConnectStateNotifier {
conn_state: WSConnectState,
notify: Arc<broadcast::Sender<WSConnectState>>,
}
impl WSSenderController {
fn set_sender(&mut self, sender: WSSender) { self.sender = Some(Arc::new(sender)); }
fn set_state(&mut self, state: WSConnectState) {
if state != WSConnectState::Connected {
self.sender = None;
}
self.state = state;
let _ = self.state_notify.send(self.state.clone());
}
fn set_error(&mut self, error: WSError) {
log::error!("{:?}", error);
self.set_state(WSConnectState::Disconnected);
}
fn sender(&self) -> Option<Arc<WSSender>> { self.sender.clone() }
#[allow(dead_code)]
fn is_connecting(&self) -> bool { self.state == WSConnectState::Connecting }
fn is_disconnected(&self) -> bool { self.state == WSConnectState::Disconnected }
}
impl std::default::Default for WSSenderController {
impl std::default::Default for WSConnectStateNotifier {
fn default() -> Self {
let (state_notify, _) = broadcast::channel(16);
WSSenderController {
state: WSConnectState::Init,
state_notify: Arc::new(state_notify),
sender: None,
Self {
conn_state: WSConnectState::Init,
notify: Arc::new(state_notify),
}
}
}
impl WSConnectStateNotifier {
fn update_state(&mut self, new_state: WSConnectState) {
if self.conn_state == new_state {
return;
}
tracing::debug!(
"WebSocket connect state did change: {} -> {}",
self.conn_state,
new_state
);
self.conn_state = new_state.clone();
let _ = self.notify.send(new_state);
}
}