chore: fix database test (#4893)

* chore: fix database test

* chore: clippy

* chore: fmt
This commit is contained in:
Nathan.fooo
2024-03-13 19:39:56 +08:00
committed by GitHub
parent bf70be1841
commit fd81d64a32
11 changed files with 113 additions and 141 deletions

View File

@ -48,6 +48,7 @@ tokio-stream = { workspace = true, features = ["sync"] }
client-api = { version = "0.1.0", features = ["collab-sync", "test_util"] }
lib-dispatch = { workspace = true }
yrs = "0.17.1"
rand = "0.8.5"
[dev-dependencies]
uuid.workspace = true

View File

@ -11,6 +11,7 @@ use client_api::ws::{
};
use client_api::{Client, ClientConfiguration};
use flowy_storage::ObjectStorageService;
use rand::Rng;
use tokio::sync::watch;
use tokio_stream::wrappers::WatchStream;
use tracing::{error, event, info, warn};
@ -253,16 +254,7 @@ fn spawn_ws_conn(
// Try to reconnect if the connection is timed out.
if let Some(api_client) = weak_api_client.upgrade() {
if enable_sync.load(Ordering::SeqCst) {
match api_client.ws_connect_info().await {
Ok(conn_info) => {
// sleep two seconds and then try to reconnect
tokio::time::sleep(Duration::from_secs(2)).await;
event!(tracing::Level::INFO, "🟢reconnecting websocket");
let _ = ws_client.connect(api_client.ws_addr(), conn_info).await;
},
Err(err) => error!("Failed to get ws url: {}, connect state:{:?}", err, state),
}
attempt_reconnect(&ws_client, &api_client, 2).await;
}
}
},
@ -308,6 +300,28 @@ fn spawn_ws_conn(
});
}
async fn attempt_reconnect(
ws_client: &Arc<WSClient>,
api_client: &Arc<Client>,
minimum_delay: u64,
) {
// Introduce randomness in the reconnection attempts to avoid thundering herd problem
let delay_seconds = rand::thread_rng().gen_range(minimum_delay..8);
tokio::time::sleep(Duration::from_secs(delay_seconds)).await;
event!(
tracing::Level::INFO,
"🟢 Attempting to reconnect websocket."
);
match api_client.ws_connect_info().await {
Ok(conn_info) => {
if let Err(e) = ws_client.connect(api_client.ws_addr(), conn_info).await {
error!("Failed to reconnect websocket: {}", e);
}
},
Err(err) => error!("Failed to get websocket URL: {}", err),
}
}
pub trait AFServer: Send + Sync + 'static {
fn get_client(&self) -> Option<Arc<AFCloudClient>>;
fn try_get_client(&self) -> Result<Arc<AFCloudClient>, Error>;