fix: ws connect with invalid token (#5282)

* chore: bump client api

* chore: fix potential ws connect with invalid token

* fix: cargo clippy

---------

Co-authored-by: Lucas.Xu <lucas.xu@appflowy.io>
This commit is contained in:
Nathan.fooo
2024-05-07 17:37:11 +08:00
committed by GitHub
parent dc813d85a8
commit b4279f8004
12 changed files with 152 additions and 117 deletions

View File

@ -12,8 +12,10 @@ use client_api::ws::{
use client_api::{Client, ClientConfiguration};
use flowy_storage::ObjectStorageService;
use rand::Rng;
use tokio::sync::watch;
use tokio::select;
use tokio::sync::{watch, Mutex};
use tokio_stream::wrappers::WatchStream;
use tokio_util::sync::CancellationToken;
use tracing::{error, event, info, warn};
use uuid::Uuid;
@ -74,11 +76,22 @@ impl AppFlowyCloudServer {
let enable_sync = Arc::new(AtomicBool::new(enable_sync));
let network_reachable = Arc::new(AtomicBool::new(true));
let ws_client = WSClient::new(WSClientConfig::default(), api_client.clone());
let ws_client = WSClient::new(
WSClientConfig::default(),
api_client.clone(),
api_client.clone(),
);
let ws_client = Arc::new(ws_client);
let api_client = Arc::new(api_client);
let ws_connect_cancellation_token = Arc::new(Mutex::new(CancellationToken::new()));
spawn_ws_conn(token_state_rx, &ws_client, &api_client, &enable_sync);
spawn_ws_conn(
token_state_rx,
&ws_client,
ws_connect_cancellation_token,
&api_client,
&enable_sync,
);
Self {
config,
client: api_client,
@ -241,12 +254,14 @@ impl AppFlowyServer for AppFlowyCloudServer {
fn spawn_ws_conn(
mut token_state_rx: TokenStateReceiver,
ws_client: &Arc<WSClient>,
conn_cancellation_token: Arc<Mutex<CancellationToken>>,
api_client: &Arc<Client>,
enable_sync: &Arc<AtomicBool>,
) {
let weak_ws_client = Arc::downgrade(ws_client);
let weak_api_client = Arc::downgrade(api_client);
let enable_sync = enable_sync.clone();
let cloned_conn_cancellation_token = conn_cancellation_token.clone();
af_spawn(async move {
if let Some(ws_client) = weak_ws_client.upgrade() {
@ -256,15 +271,16 @@ fn spawn_ws_conn(
match state {
ConnectState::PingTimeout | ConnectState::Lost => {
// Try to reconnect if the connection is timed out.
if let Some(api_client) = weak_api_client.upgrade() {
if enable_sync.load(Ordering::SeqCst) {
attempt_reconnect(&ws_client, &api_client, 2).await;
}
if weak_api_client.upgrade().is_some() && enable_sync.load(Ordering::SeqCst) {
attempt_reconnect(&ws_client, 2, &cloned_conn_cancellation_token).await;
}
},
ConnectState::Unauthorized => {
if let Some(api_client) = weak_api_client.upgrade() {
if let Err(err) = api_client.refresh_token().await {
if let Err(err) = api_client
.refresh_token("websocket connect unauthorized")
.await
{
error!("Failed to refresh token: {}", err);
}
}
@ -276,21 +292,13 @@ fn spawn_ws_conn(
});
let weak_ws_client = Arc::downgrade(ws_client);
let weak_api_client = Arc::downgrade(api_client);
af_spawn(async move {
while let Ok(token_state) = token_state_rx.recv().await {
info!("🟢token state: {:?}", token_state);
match token_state {
TokenState::Refresh => {
if let (Some(api_client), Some(ws_client)) =
(weak_api_client.upgrade(), weak_ws_client.upgrade())
{
match api_client.ws_connect_info().await {
Ok(conn_info) => {
let _ = ws_client.connect(api_client.ws_addr(), conn_info).await;
},
Err(err) => error!("Failed to get ws url: {}", err),
}
if let Some(ws_client) = weak_ws_client.upgrade() {
attempt_reconnect(&ws_client, 5, &conn_cancellation_token).await;
}
},
TokenState::Invalid => {
@ -304,26 +312,43 @@ fn spawn_ws_conn(
});
}
/// Attempts to reconnect a WebSocket client with a randomized delay to mitigate the thundering herd problem.
///
/// This function cancels any existing reconnection attempt, sets up a new cancellation token, and then
/// attempts to reconnect after a randomized delay. The delay is drawn from the range
/// `minimum_delay_in_secs..10`, i.e. at least the specified minimum and strictly less than
/// 10 seconds, so the minimum must be below 10.
///
async fn attempt_reconnect(
ws_client: &Arc<WSClient>,
api_client: &Arc<Client>,
minimum_delay: u64,
minimum_delay_in_secs: u64,
conn_cancellation_token: &Arc<Mutex<CancellationToken>>,
) {
// Introduce randomness in the reconnection attempts to avoid thundering herd problem
let delay_seconds = rand::thread_rng().gen_range(minimum_delay..8);
tokio::time::sleep(Duration::from_secs(delay_seconds)).await;
event!(
tracing::Level::INFO,
"🟢 Attempting to reconnect websocket."
);
match api_client.ws_connect_info().await {
Ok(conn_info) => {
if let Err(e) = ws_client.connect(api_client.ws_addr(), conn_info).await {
error!("Failed to reconnect websocket: {}", e);
// Cancel the previous reconnection attempt
let mut cancel_token_lock = conn_cancellation_token.lock().await;
cancel_token_lock.cancel();
let new_cancel_token = CancellationToken::new();
*cancel_token_lock = new_cancel_token.clone();
drop(cancel_token_lock);
// randomness in the reconnection attempts to avoid thundering herd problem
let delay_seconds = rand::thread_rng().gen_range(minimum_delay_in_secs..10);
let ws_client = ws_client.clone();
tokio::spawn(async move {
select! {
_ = new_cancel_token.cancelled() => {
event!(
tracing::Level::TRACE,
"🟢websocket reconnection attempt cancelled."
);
},
_ = tokio::time::sleep(Duration::from_secs(delay_seconds)) => {
if let Err(e) = ws_client.connect().await {
error!("Failed to reconnect websocket: {}", e);
}
}
},
Err(err) => error!("Failed to get websocket URL: {}", err),
}
}
});
}
pub trait AFServer: Send + Sync + 'static {