From d7300dd7e228aeee5ebe4a9b939df10a08e0471a Mon Sep 17 00:00:00 2001 From: appflowy Date: Sun, 3 Oct 2021 14:05:15 +0800 Subject: [PATCH] retry if ws send new_doc_user message failed --- backend/src/service/doc/ws_actor.rs | 4 +- .../src/services/doc/revision/actor.rs | 2 +- .../src/services/doc/revision/manager.rs | 40 ++-- .../src/services/doc/revision/util.rs | 47 ++++- rust-lib/flowy-infra/Cargo.toml | 6 +- rust-lib/flowy-infra/src/lib.rs | 1 + rust-lib/flowy-infra/src/retry/future.rs | 189 ++++++++++++++++++ rust-lib/flowy-infra/src/retry/mod.rs | 5 + .../src/retry/strategy/exponential_backoff.rs | 127 ++++++++++++ .../src/retry/strategy/fixed_interval.rs | 35 ++++ .../flowy-infra/src/retry/strategy/jitter.rs | 5 + .../flowy-infra/src/retry/strategy/mod.rs | 7 + rust-lib/flowy-ws/Cargo.toml | 1 + rust-lib/flowy-ws/src/ws.rs | 4 +- 14 files changed, 446 insertions(+), 27 deletions(-) create mode 100644 rust-lib/flowy-infra/src/retry/future.rs create mode 100644 rust-lib/flowy-infra/src/retry/mod.rs create mode 100644 rust-lib/flowy-infra/src/retry/strategy/exponential_backoff.rs create mode 100644 rust-lib/flowy-infra/src/retry/strategy/fixed_interval.rs create mode 100644 rust-lib/flowy-infra/src/retry/strategy/jitter.rs create mode 100644 rust-lib/flowy-infra/src/retry/strategy/mod.rs diff --git a/backend/src/service/doc/ws_actor.rs b/backend/src/service/doc/ws_actor.rs index 52607aedd1..5d8565b960 100644 --- a/backend/src/service/doc/ws_actor.rs +++ b/backend/src/service/doc/ws_actor.rs @@ -80,8 +80,8 @@ impl DocWsActor { } } - async fn handle_new_doc_user(&self, socket: Socket, data: Vec) -> DocResult<()> { - let user = spawn_blocking(move || { + async fn handle_new_doc_user(&self, _socket: Socket, data: Vec) -> DocResult<()> { + let _user = spawn_blocking(move || { let user: NewDocUser = parse_from_bytes(&data)?; DocResult::Ok(user) }) diff --git a/rust-lib/flowy-document/src/services/doc/revision/actor.rs b/rust-lib/flowy-document/src/services/doc/revision/actor.rs index 2040cd1a43..08d1c3ddc2 100644 --- a/rust-lib/flowy-document/src/services/doc/revision/actor.rs +++ b/rust-lib/flowy-document/src/services/doc/revision/actor.rs @@ -2,7 +2,7 @@ use crate::{ entities::doc::{RevId, Revision, RevisionRange}, errors::{internal_error, DocError, DocResult}, services::doc::revision::{util::RevisionOperation, DocRevision, RevisionServer}, - sql_tables::{DocTableSql, RevState, RevTableSql}, + sql_tables::{RevState, RevTableSql}, }; use async_stream::stream; use dashmap::DashMap; diff --git a/rust-lib/flowy-document/src/services/doc/revision/manager.rs b/rust-lib/flowy-document/src/services/doc/revision/manager.rs index 0748b108bf..43ee987e86 100644 --- a/rust-lib/flowy-document/src/services/doc/revision/manager.rs +++ b/rust-lib/flowy-document/src/services/doc/revision/manager.rs @@ -1,19 +1,19 @@ use crate::{ - entities::doc::{RevType, Revision, RevisionRange}, - errors::DocError, + entities::doc::{RevId, RevType, Revision, RevisionRange}, + errors::{DocError, DocResult}, services::{ - doc::revision::actor::{RevisionCmd, RevisionStoreActor}, + doc::revision::{ + actor::{RevisionCmd, RevisionStoreActor}, + util::NotifyOpenDocAction, + }, util::RevIdCounter, ws::WsDocumentSender, }, }; - -use crate::{ - entities::doc::{NewDocUser, RevId}, - errors::DocResult, +use flowy_infra::{ + future::ResultFuture, + retry::{ExponentialBackoff, Retry}, }; -use flowy_database::ConnectionPool; -use flowy_infra::future::ResultFuture; use flowy_ot::core::Delta; use parking_lot::RwLock; use std::{collections::VecDeque, sync::Arc}; @@ -112,15 +112,17 @@ impl RevisionManager { } } +// FIXME: +// user_id may be invalid if the user switch to another account while +// theNotifyOpenDocAction is flying fn notify_open_doc(ws: &Arc, user_id: &str, doc_id: &str, rev_id: &RevId) { - let new_doc_user = NewDocUser { - user_id: user_id.to_string(), - rev_id: rev_id.clone().into(), - doc_id: doc_id.to_string(), - }; - - match ws.send(new_doc_user.into()) { - Ok(_) => {}, - Err(e) => log::error!("Send new_doc_user failed: {:?}", e), - } + let action = NotifyOpenDocAction::new(user_id, doc_id, rev_id, ws); + let strategy = ExponentialBackoff::from_millis(50).take(3); + let retry = Retry::spawn(strategy, action); + tokio::spawn(async move { + match retry.await { + Ok(_) => {}, + Err(e) => log::error!("Notify open doc failed: {}", e), + } + }); } diff --git a/rust-lib/flowy-document/src/services/doc/revision/util.rs b/rust-lib/flowy-document/src/services/doc/revision/util.rs index fbb0e5a618..6922f832a7 100644 --- a/rust-lib/flowy-document/src/services/doc/revision/util.rs +++ b/rust-lib/flowy-document/src/services/doc/revision/util.rs @@ -1,4 +1,12 @@ -use crate::{entities::doc::Revision, errors::DocResult, sql_tables::RevState}; +use crate::{ + entities::doc::{NewDocUser, RevId, Revision}, + errors::{DocError, DocResult}, + services::ws::WsDocumentSender, + sql_tables::RevState, +}; +use flowy_infra::retry::Action; +use futures::future::BoxFuture; +use std::{future, sync::Arc}; use tokio::sync::oneshot; pub type Sender = oneshot::Sender>; @@ -41,3 +49,40 @@ impl std::ops::Deref for RevisionOperation { fn deref(&self) -> &Self::Target { &self.inner } } + +pub(crate) struct NotifyOpenDocAction { + user_id: String, + rev_id: RevId, + doc_id: String, + ws: Arc, +} + +impl NotifyOpenDocAction { + pub(crate) fn new(user_id: &str, doc_id: &str, rev_id: &RevId, ws: &Arc) -> Self { + Self { + user_id: user_id.to_owned(), + rev_id: rev_id.clone(), + doc_id: doc_id.to_owned(), + ws: ws.clone(), + } + } +} + +impl Action for NotifyOpenDocAction { + type Future = BoxFuture<'static, Result>; + type Item = (); + type Error = DocError; + + fn run(&mut self) -> Self::Future { + let new_doc_user = NewDocUser { + user_id: self.user_id.clone(), + rev_id: self.rev_id.clone().into(), + doc_id: self.doc_id.clone(), + }; + + match self.ws.send(new_doc_user.into()) { + Ok(_) => Box::pin(future::ready(Ok::<(), DocError>(()))), + Err(e) => Box::pin(future::ready(Err::<(), DocError>(e))), + } + } +} diff --git a/rust-lib/flowy-infra/Cargo.toml b/rust-lib/flowy-infra/Cargo.toml index 06d6364f48..d96339652d 100644 --- a/rust-lib/flowy-infra/Cargo.toml +++ b/rust-lib/flowy-infra/Cargo.toml @@ -17,5 +17,7 @@ protobuf = {version = "2.18.0"} log = "0.4.14" chrono = "0.4.19" bytes = { version = "1.0" } -pin-project = "1.0.0" -futures-core = { version = "0.3", default-features = false } \ No newline at end of file +pin-project = "1.0" +futures-core = { version = "0.3", default-features = false } +tokio = { version = "1.0", features = ["time"] } +rand = "0.8.3" \ No newline at end of file diff --git a/rust-lib/flowy-infra/src/lib.rs b/rust-lib/flowy-infra/src/lib.rs index 22529deec3..89faf8e814 100644 --- a/rust-lib/flowy-infra/src/lib.rs +++ b/rust-lib/flowy-infra/src/lib.rs @@ -7,6 +7,7 @@ extern crate diesel_derives; pub mod future; pub mod kv; mod protobuf; +pub mod retry; #[allow(dead_code)] pub fn uuid() -> String { uuid::Uuid::new_v4().to_string() } diff --git a/rust-lib/flowy-infra/src/retry/future.rs b/rust-lib/flowy-infra/src/retry/future.rs new file mode 100644 index 0000000000..f314c23976 --- /dev/null +++ b/rust-lib/flowy-infra/src/retry/future.rs @@ -0,0 +1,189 @@ +use pin_project::pin_project; +use std::{ + future::Future, + iter::{IntoIterator, Iterator}, + pin::Pin, + task::{Context, Poll}, +}; +use tokio::time::{sleep_until, Duration, Instant, Sleep}; + +#[pin_project(project = RetryStateProj)] +enum RetryState +where + A: Action, +{ + Running(#[pin] A::Future), + Sleeping(#[pin] Sleep), +} + +impl RetryState { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> RetryFuturePoll { + match self.project() { + RetryStateProj::Running(future) => RetryFuturePoll::Running(future.poll(cx)), + RetryStateProj::Sleeping(future) => RetryFuturePoll::Sleeping(future.poll(cx)), + } + } +} + +enum RetryFuturePoll +where + A: Action, +{ + Running(Poll>), + Sleeping(Poll<()>), +} + +/// Future that drives multiple attempts at an action via a retry strategy. +#[pin_project] +pub struct Retry +where + I: Iterator, + A: Action, +{ + #[pin] + retry_if: RetryIf bool>, +} + +impl Retry +where + I: Iterator, + A: Action, +{ + pub fn spawn>(strategy: T, action: A) -> Retry { + Retry { + retry_if: RetryIf::spawn(strategy, action, (|_| true) as fn(&A::Error) -> bool), + } + } +} + +impl Future for Retry +where + I: Iterator, + A: Action, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.project(); + this.retry_if.poll(cx) + } +} + +/// Future that drives multiple attempts at an action via a retry strategy. +/// Retries are only attempted if the `Error` returned by the future satisfies a +/// given condition. +#[pin_project] +pub struct RetryIf +where + I: Iterator, + A: Action, + C: Condition, +{ + strategy: I, + #[pin] + state: RetryState, + action: A, + condition: C, +} + +impl RetryIf +where + I: Iterator, + A: Action, + C: Condition, +{ + pub fn spawn>( + strategy: T, + mut action: A, + condition: C, + ) -> RetryIf { + RetryIf { + strategy: strategy.into_iter(), + state: RetryState::Running(action.run()), + action, + condition, + } + } + + fn attempt(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let future = { + let this = self.as_mut().project(); + this.action.run() + }; + self.as_mut().project().state.set(RetryState::Running(future)); + self.poll(cx) + } + + fn retry( + mut self: Pin<&mut Self>, + err: A::Error, + cx: &mut Context, + ) -> Result>, A::Error> { + match self.as_mut().project().strategy.next() { + None => Err(err), + Some(duration) => { + let deadline = Instant::now() + duration; + let future = sleep_until(deadline); + self.as_mut().project().state.set(RetryState::Sleeping(future)); + Ok(self.poll(cx)) + }, + } + } +} + +impl Future for RetryIf +where + I: Iterator, + A: Action, + C: Condition, +{ + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + match self.as_mut().project().state.poll(cx) { + RetryFuturePoll::Running(poll_result) => match poll_result { + Poll::Ready(Ok(ok)) => Poll::Ready(Ok(ok)), + Poll::Pending => Poll::Pending, + Poll::Ready(Err(err)) => { + if self.as_mut().project().condition.should_retry(&err) { + match self.retry(err, cx) { + Ok(poll) => poll, + Err(err) => Poll::Ready(Err(err)), + } + } else { + Poll::Ready(Err(err)) + } + }, + }, + RetryFuturePoll::Sleeping(poll_result) => match poll_result { + Poll::Pending => Poll::Pending, + Poll::Ready(_) => self.attempt(cx), + }, + } + } +} + +/// An action can be run multiple times and produces a future. +pub trait Action { + type Future: Future>; + type Item; + type Error; + + fn run(&mut self) -> Self::Future; +} + +impl>, F: FnMut() -> T> Action for F { + type Future = T; + type Item = R; + type Error = E; + + fn run(&mut self) -> Self::Future { self() } +} + +pub trait Condition { + fn should_retry(&mut self, error: &E) -> bool; +} + +impl bool> Condition for F { + fn should_retry(&mut self, error: &E) -> bool { self(error) } +} diff --git a/rust-lib/flowy-infra/src/retry/mod.rs b/rust-lib/flowy-infra/src/retry/mod.rs new file mode 100644 index 0000000000..617a22f34e --- /dev/null +++ b/rust-lib/flowy-infra/src/retry/mod.rs @@ -0,0 +1,5 @@ +mod future; +mod strategy; + +pub use future::*; +pub use strategy::*; diff --git a/rust-lib/flowy-infra/src/retry/strategy/exponential_backoff.rs b/rust-lib/flowy-infra/src/retry/strategy/exponential_backoff.rs new file mode 100644 index 0000000000..7eb996d469 --- /dev/null +++ b/rust-lib/flowy-infra/src/retry/strategy/exponential_backoff.rs @@ -0,0 +1,127 @@ +use std::{iter::Iterator, time::Duration}; +/// A retry strategy driven by exponential back-off. +/// +/// The power corresponds to the number of past attempts. +#[derive(Debug, Clone)] +pub struct ExponentialBackoff { + current: u64, + base: u64, + factor: u64, + max_delay: Option, +} + +impl ExponentialBackoff { + /// Constructs a new exponential back-off strategy, + /// given a base duration in milliseconds. + /// + /// The resulting duration is calculated by taking the base to the `n`-th + /// power, where `n` denotes the number of past attempts. + pub fn from_millis(base: u64) -> ExponentialBackoff { + ExponentialBackoff { + current: base, + base, + factor: 1u64, + max_delay: None, + } + } + + /// A multiplicative factor that will be applied to the retry delay. + /// + /// For example, using a factor of `1000` will make each delay in units of + /// seconds. + /// + /// Default factor is `1`. + pub fn factor(mut self, factor: u64) -> ExponentialBackoff { + self.factor = factor; + self + } + + /// Apply a maximum delay. No retry delay will be longer than this + /// `Duration`. + pub fn max_delay(mut self, duration: Duration) -> ExponentialBackoff { + self.max_delay = Some(duration); + self + } +} + +impl Iterator for ExponentialBackoff { + type Item = Duration; + + fn next(&mut self) -> Option { + // set delay duration by applying factor + let duration = if let Some(duration) = self.current.checked_mul(self.factor) { + Duration::from_millis(duration) + } else { + Duration::from_millis(u64::MAX) + }; + + // check if we reached max delay + if let Some(ref max_delay) = self.max_delay { + if duration > *max_delay { + return Some(*max_delay); + } + } + + if let Some(next) = self.current.checked_mul(self.base) { + self.current = next; + } else { + self.current = u64::MAX; + } + + Some(duration) + } +} + +#[test] +fn returns_some_exponential_base_10() { + let mut s = ExponentialBackoff::from_millis(10); + + assert_eq!(s.next(), Some(Duration::from_millis(10))); + assert_eq!(s.next(), Some(Duration::from_millis(100))); + assert_eq!(s.next(), Some(Duration::from_millis(1000))); +} + +#[test] +fn returns_some_exponential_base_2() { + let mut s = ExponentialBackoff::from_millis(2); + + assert_eq!(s.next(), Some(Duration::from_millis(2))); + assert_eq!(s.next(), Some(Duration::from_millis(4))); + assert_eq!(s.next(), Some(Duration::from_millis(8))); +} + +#[test] +fn saturates_at_maximum_value() { + let mut s = ExponentialBackoff::from_millis(u64::MAX - 1); + + assert_eq!(s.next(), Some(Duration::from_millis(u64::MAX - 1))); + assert_eq!(s.next(), Some(Duration::from_millis(u64::MAX))); + assert_eq!(s.next(), Some(Duration::from_millis(u64::MAX))); +} + +#[test] +fn can_use_factor_to_get_seconds() { + let factor = 1000; + let mut s = ExponentialBackoff::from_millis(2).factor(factor); + + assert_eq!(s.next(), Some(Duration::from_secs(2))); + assert_eq!(s.next(), Some(Duration::from_secs(4))); + assert_eq!(s.next(), Some(Duration::from_secs(8))); +} + +#[test] +fn stops_increasing_at_max_delay() { + let mut s = ExponentialBackoff::from_millis(2).max_delay(Duration::from_millis(4)); + + assert_eq!(s.next(), Some(Duration::from_millis(2))); + assert_eq!(s.next(), Some(Duration::from_millis(4))); + assert_eq!(s.next(), Some(Duration::from_millis(4))); +} + +#[test] +fn returns_max_when_max_less_than_base() { + let mut s = ExponentialBackoff::from_millis(20).max_delay(Duration::from_millis(10)); + + assert_eq!(s.next(), Some(Duration::from_millis(10))); + assert_eq!(s.next(), Some(Duration::from_millis(10))); +} diff --git a/rust-lib/flowy-infra/src/retry/strategy/fixed_interval.rs b/rust-lib/flowy-infra/src/retry/strategy/fixed_interval.rs new file mode 100644 index 0000000000..9ec18e584a --- /dev/null +++ b/rust-lib/flowy-infra/src/retry/strategy/fixed_interval.rs @@ -0,0 +1,35 @@ +use std::{iter::Iterator, time::Duration}; + +/// A retry strategy driven by a fixed interval. +#[derive(Debug, Clone)] +pub struct FixedInterval { + duration: Duration, +} + +impl FixedInterval { + /// Constructs a new fixed interval strategy. + pub fn new(duration: Duration) -> FixedInterval { FixedInterval { duration } } + + /// Constructs a new fixed interval strategy, + /// given a duration in milliseconds. + pub fn from_millis(millis: u64) -> FixedInterval { + FixedInterval { + duration: Duration::from_millis(millis), + } + } +} + +impl Iterator for FixedInterval { + type Item = Duration; + + fn next(&mut self) -> Option { Some(self.duration) } +} + +#[test] +fn returns_some_fixed() { + let mut s = FixedInterval::new(Duration::from_millis(123)); + + assert_eq!(s.next(), Some(Duration::from_millis(123))); + assert_eq!(s.next(), Some(Duration::from_millis(123))); + assert_eq!(s.next(), Some(Duration::from_millis(123))); +} diff --git a/rust-lib/flowy-infra/src/retry/strategy/jitter.rs b/rust-lib/flowy-infra/src/retry/strategy/jitter.rs new file mode 100644 index 0000000000..c11ae8ed87 --- /dev/null +++ b/rust-lib/flowy-infra/src/retry/strategy/jitter.rs @@ -0,0 +1,5 @@ +use std::time::Duration; + +pub fn jitter(duration: Duration) -> Duration { + duration.mul_f64(rand::random::()) +} diff --git a/rust-lib/flowy-infra/src/retry/strategy/mod.rs b/rust-lib/flowy-infra/src/retry/strategy/mod.rs new file mode 100644 index 0000000000..8f3a618457 --- /dev/null +++ b/rust-lib/flowy-infra/src/retry/strategy/mod.rs @@ -0,0 +1,7 @@ +mod exponential_backoff; +mod fixed_interval; +mod jitter; + +pub use exponential_backoff::*; +pub use fixed_interval::*; +pub use jitter::*; diff --git a/rust-lib/flowy-ws/Cargo.toml b/rust-lib/flowy-ws/Cargo.toml index 0d3bbfd675..4c0b60d78b 100644 --- a/rust-lib/flowy-ws/Cargo.toml +++ b/rust-lib/flowy-ws/Cargo.toml @@ -8,6 +8,7 @@ edition = "2018" [dependencies] flowy-derive = { path = "../flowy-derive" } flowy-net = { path = "../flowy-net" } +flowy-infra = { path = "../flowy-infra" } tokio-tungstenite = "0.15" futures-util = "0.3.17" diff --git a/rust-lib/flowy-ws/src/ws.rs b/rust-lib/flowy-ws/src/ws.rs index 2b0aa73e5c..ce16f9260c 100644 --- a/rust-lib/flowy-ws/src/ws.rs +++ b/rust-lib/flowy-ws/src/ws.rs @@ -96,7 +96,7 @@ impl WsController { Ok(stream) => { let _ = state_notify.send(WsState::Connected(sender)); let _ = ret.send(Ok(())); - spawn_steam_and_handlers(stream, handlers, state_notify).await; + spawn_stream_and_handlers(stream, handlers, state_notify).await; }, Err(e) => { let _ = state_notify.send(WsState::Disconnected(e.clone())); @@ -128,7 +128,7 @@ impl WsController { } } -async fn spawn_steam_and_handlers( +async fn spawn_stream_and_handlers( stream: WsStream, handlers: WsHandlerFuture, state_notify: Arc>,