feat: af cloud database (#3684)

* chore: forece user to logout when token is invalid

* chore: update client api rev
This commit is contained in:
Nathan.fooo 2023-10-12 20:25:00 +08:00 committed by GitHub
parent b2c80981cc
commit ec80f72c8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 119 additions and 57 deletions

View File

@ -10,6 +10,7 @@ import 'deps_resolver.dart';
import 'entry_point.dart';
import 'launch_configuration.dart';
import 'plugin/plugin.dart';
import 'tasks/appflowy_cloud_task.dart';
import 'tasks/prelude.dart';
final getIt = GetIt.instance;
@ -71,6 +72,7 @@ class FlowyRunner {
if (!mode.isUnitTest) ...[
const HotKeyTask(),
InitSupabaseTask(),
InitAppFlowyCloudTask(),
const InitAppWidgetTask(),
const InitPlatformServiceTask()
],

View File

@ -0,0 +1,29 @@
import 'package:appflowy/env/env.dart';
import 'package:appflowy/startup/startup.dart';
import 'package:appflowy/user/application/auth/auth_service.dart';
import 'package:appflowy/user/application/user_auth_listener.dart';
class InitAppFlowyCloudTask extends LaunchTask {
final _authStateListener = UserAuthStateListener();
bool isLoggingOut = false;
@override
Future<void> initialize(LaunchContext context) async {
if (!isAppFlowyCloudEnabled) {
return;
}
_authStateListener.start(
didSignIn: () {
isLoggingOut = false;
},
onInvalidAuth: (message) async {
await getIt<AuthService>().signOut();
// TODO(nathan): Show a dialog to notify the user that the session is expired.
if (!isLoggingOut) {
await runAppFlowy();
}
},
);
}
}

View File

@ -35,7 +35,7 @@ class SupbaseRealtimeService {
_subscribeTablesChanges();
isLoggingOut = false;
},
onForceLogout: (message) async {
onInvalidAuth: (message) async {
await getIt<AuthService>().signOut();
channel?.unsubscribe();
channel = null;

View File

@ -11,16 +11,16 @@ import 'package:appflowy_backend/protobuf/flowy-user/notification.pb.dart'
import 'package:appflowy_backend/rust_stream.dart';
class UserAuthStateListener {
void Function(String)? _onForceLogout;
void Function(String)? _onInvalidAuth;
void Function()? _didSignIn;
StreamSubscription<SubscribeObject>? _subscription;
UserNotificationParser? _userParser;
void start({
void Function(String)? onForceLogout,
void Function(String)? onInvalidAuth,
void Function()? didSignIn,
}) {
_onForceLogout = onForceLogout;
_onInvalidAuth = onInvalidAuth;
_didSignIn = didSignIn;
_userParser = UserNotificationParser(
@ -35,7 +35,7 @@ class UserAuthStateListener {
Future<void> stop() async {
_userParser = null;
await _subscription?.cancel();
_onForceLogout = null;
_onInvalidAuth = null;
}
void _userNotificationCallback(
@ -51,8 +51,8 @@ class UserAuthStateListener {
case AuthStatePB.AuthStateSignIn:
_didSignIn?.call();
break;
case AuthStatePB.AuthStateForceSignOut:
_onForceLogout?.call("");
case AuthStatePB.InvalidAuth:
_onInvalidAuth?.call("");
break;
default:
break;

View File

@ -762,7 +762,7 @@ dependencies = [
[[package]]
name = "client-api"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5a231fda#5a231fdadba514d891e914e95c90bfcca7e042b7"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=52b82e68#52b82e68f6e61e9e38606b9110d782870ee911e4"
dependencies = [
"anyhow",
"bytes",
@ -1291,7 +1291,7 @@ dependencies = [
"cssparser-macros",
"dtoa-short",
"itoa 1.0.6",
"phf 0.8.0",
"phf 0.11.2",
"smallvec",
]
@ -1437,7 +1437,7 @@ checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308"
[[package]]
name = "database-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5a231fda#5a231fdadba514d891e914e95c90bfcca7e042b7"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=52b82e68#52b82e68f6e61e9e38606b9110d782870ee911e4"
dependencies = [
"anyhow",
"chrono",
@ -2778,7 +2778,7 @@ dependencies = [
[[package]]
name = "gotrue"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5a231fda#5a231fdadba514d891e914e95c90bfcca7e042b7"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=52b82e68#52b82e68f6e61e9e38606b9110d782870ee911e4"
dependencies = [
"anyhow",
"futures-util",
@ -2794,7 +2794,7 @@ dependencies = [
[[package]]
name = "gotrue-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5a231fda#5a231fdadba514d891e914e95c90bfcca7e042b7"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=52b82e68#52b82e68f6e61e9e38606b9110d782870ee911e4"
dependencies = [
"anyhow",
"reqwest",
@ -3227,7 +3227,7 @@ dependencies = [
[[package]]
name = "infra"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5a231fda#5a231fdadba514d891e914e95c90bfcca7e042b7"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=52b82e68#52b82e68f6e61e9e38606b9110d782870ee911e4"
dependencies = [
"anyhow",
"reqwest",
@ -4268,6 +4268,7 @@ version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc"
dependencies = [
"phf_macros 0.11.2",
"phf_shared 0.11.2",
]
@ -4359,6 +4360,19 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "phf_macros"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3444646e286606587e49f3bcf1679b8cef1dc2c5ecc29ddacaffc305180d464b"
dependencies = [
"phf_generator 0.11.2",
"phf_shared 0.11.2",
"proc-macro2",
"quote",
"syn 2.0.29",
]
[[package]]
name = "phf_shared"
version = "0.8.0"
@ -4862,7 +4876,7 @@ dependencies = [
[[package]]
name = "realtime-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5a231fda#5a231fdadba514d891e914e95c90bfcca7e042b7"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=52b82e68#52b82e68f6e61e9e38606b9110d782870ee911e4"
dependencies = [
"bytes",
"collab",
@ -5584,7 +5598,7 @@ dependencies = [
[[package]]
name = "shared_entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5a231fda#5a231fdadba514d891e914e95c90bfcca7e042b7"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=52b82e68#52b82e68f6e61e9e38606b9110d782870ee911e4"
dependencies = [
"anyhow",
"database-entity",

View File

@ -38,7 +38,7 @@ custom-protocol = ["tauri/custom-protocol"]
# Run the script:
# scripts/tool/update_client_api_rev.sh new_rev_id
# ⚠️⚠️⚠️️
client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "5a231fda" }
client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "52b82e68" }
# Please use the following script to update collab.
# Working directory: frontend
#

View File

@ -461,7 +461,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4114279215a005bc675e386011e594e1d9b800918cea18fcadadcce864a2046b"
dependencies = [
"borsh-derive",
"hashbrown 0.12.3",
"hashbrown 0.13.2",
]
[[package]]
@ -660,7 +660,7 @@ dependencies = [
[[package]]
name = "client-api"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5a231fda#5a231fdadba514d891e914e95c90bfcca7e042b7"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=52b82e68#52b82e68f6e61e9e38606b9110d782870ee911e4"
dependencies = [
"anyhow",
"bytes",
@ -1264,7 +1264,7 @@ checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308"
[[package]]
name = "database-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5a231fda#5a231fdadba514d891e914e95c90bfcca7e042b7"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=52b82e68#52b82e68f6e61e9e38606b9110d782870ee911e4"
dependencies = [
"anyhow",
"chrono",
@ -1935,7 +1935,6 @@ dependencies = [
"serde_json",
"serde_repr",
"thiserror",
"tokio",
"tokio-postgres",
"url",
]
@ -2438,7 +2437,7 @@ dependencies = [
[[package]]
name = "gotrue"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5a231fda#5a231fdadba514d891e914e95c90bfcca7e042b7"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=52b82e68#52b82e68f6e61e9e38606b9110d782870ee911e4"
dependencies = [
"anyhow",
"futures-util",
@ -2454,7 +2453,7 @@ dependencies = [
[[package]]
name = "gotrue-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5a231fda#5a231fdadba514d891e914e95c90bfcca7e042b7"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=52b82e68#52b82e68f6e61e9e38606b9110d782870ee911e4"
dependencies = [
"anyhow",
"reqwest",
@ -2812,7 +2811,7 @@ dependencies = [
[[package]]
name = "infra"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5a231fda#5a231fdadba514d891e914e95c90bfcca7e042b7"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=52b82e68#52b82e68f6e61e9e38606b9110d782870ee911e4"
dependencies = [
"anyhow",
"reqwest",
@ -4203,7 +4202,7 @@ dependencies = [
[[package]]
name = "realtime-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5a231fda#5a231fdadba514d891e914e95c90bfcca7e042b7"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=52b82e68#52b82e68f6e61e9e38606b9110d782870ee911e4"
dependencies = [
"bytes",
"collab",
@ -4824,7 +4823,7 @@ dependencies = [
[[package]]
name = "shared_entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5a231fda#5a231fdadba514d891e914e95c90bfcca7e042b7"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=52b82e68#52b82e68f6e61e9e38606b9110d782870ee911e4"
dependencies = [
"anyhow",
"database-entity",

View File

@ -82,7 +82,7 @@ incremental = false
# Run the script:
# scripts/tool/update_client_api_rev.sh new_rev_id
# ⚠️⚠️⚠️️
client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "5a231fda" }
client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "52b82e68" }
# Please use the following script to update collab.
# Working directory: frontend
#

View File

@ -26,17 +26,16 @@ url = { version = "2.2", optional = true }
collab-database = { version = "0.1.0", optional = true }
collab-document = { version = "0.1.0", optional = true }
tokio-postgres = { version = "0.7.8", optional = true }
tokio = { version = "1.0", optional = true }
client-api = { version = "0.1.0", optional = true }
[features]
default = ["impl_from_appflowy_cloud", "impl_from_collab", "impl_from_reqwest", "impl_from_serde"]
impl_from_dispatch_error = ["lib-dispatch"]
impl_from_serde = ["serde_json"]
impl_from_reqwest = ["reqwest"]
impl_from_sqlite = ["flowy-sqlite", "r2d2"]
impl_from_collab = ["collab-database", "collab-document", "impl_from_reqwest"]
impl_from_postgres = ["tokio-postgres"]
impl_from_tokio = ["tokio"]
impl_from_url = ["url"]
impl_from_appflowy_cloud = ["client-api"]
dart = ["flowy-codegen/dart"]

View File

@ -19,9 +19,6 @@ pub mod collab;
#[cfg(feature = "impl_from_postgres")]
mod postgres;
#[cfg(feature = "impl_from_tokio")]
mod tokio;
#[cfg(feature = "impl_from_appflowy_cloud")]
mod cloud;
#[cfg(feature = "impl_from_url")]

View File

@ -1,7 +0,0 @@
use crate::FlowyError;
// impl<T> std::convert::From<tokio::sync::mpsc::error::SendError<T>> for FlowyError {
// fn from(error: tokio::sync::mpsc::error::SendError<T>) -> Self {
// FlowyError::internal().context(error)
// }
// }

View File

@ -104,7 +104,7 @@ where
fn get_user_profile(
&self,
_credential: UserCredentials,
) -> FutureResult<Option<UserProfile>, Error> {
) -> FutureResult<Option<UserProfile>, FlowyError> {
let try_get_client = self.server.try_get_client();
FutureResult::new(async move {
let client = try_get_client?;

View File

@ -33,7 +33,7 @@ pub struct AFCloudServer {
#[allow(dead_code)]
pub(crate) config: AFCloudConfiguration,
pub(crate) client: Arc<AFCloudClient>,
enable_sync: AtomicBool,
enable_sync: Arc<AtomicBool>,
#[allow(dead_code)]
device_id: Arc<parking_lot::RwLock<String>>,
ws_client: Arc<WSClient>,
@ -47,7 +47,7 @@ impl AFCloudServer {
) -> Self {
let api_client = AFCloudClient::new(&config.base_url, &config.ws_base_url, &config.gotrue_url);
let token_state_rx = api_client.subscribe_token_state();
let enable_sync = AtomicBool::new(enable_sync);
let enable_sync = Arc::new(AtomicBool::new(enable_sync));
let ws_client = WSClient::new(WSClientConfig {
buffer_capacity: 100,
@ -57,7 +57,13 @@ impl AFCloudServer {
let ws_client = Arc::new(ws_client);
let api_client = Arc::new(api_client);
spawn_ws_conn(&device_id, token_state_rx, &ws_client, &api_client);
spawn_ws_conn(
&device_id,
token_state_rx,
&ws_client,
&api_client,
&enable_sync,
);
Self {
config,
client: api_client,
@ -172,10 +178,12 @@ fn spawn_ws_conn(
mut token_state_rx: TokenStateReceiver,
ws_client: &Arc<WSClient>,
api_client: &Arc<Client>,
enable_sync: &Arc<AtomicBool>,
) {
let weak_device_id = Arc::downgrade(device_id);
let weak_ws_client = Arc::downgrade(ws_client);
let weak_api_client = Arc::downgrade(api_client);
let enable_sync = enable_sync.clone();
tokio::spawn(async move {
if let Some(ws_client) = weak_ws_client.upgrade() {
@ -189,10 +197,15 @@ fn spawn_ws_conn(
if let (Some(api_client), Some(device_id)) =
(weak_api_client.upgrade(), weak_device_id.upgrade())
{
let device_id = device_id.read().clone();
if let Ok(ws_addr) = api_client.ws_url(&device_id) {
info!("🟢WebSocket Reconnecting");
let _ = ws_client.connect(ws_addr).await;
if enable_sync.load(Ordering::SeqCst) {
info!("🟢websocket state: {:?}, reconnecting", state);
let device_id = device_id.read().clone();
match api_client.ws_url(&device_id) {
Ok(ws_addr) => {
let _ = ws_client.connect(ws_addr).await;
},
Err(err) => error!("Failed to get ws url: {}", err),
}
}
}
}
@ -204,7 +217,6 @@ fn spawn_ws_conn(
let weak_api_client = Arc::downgrade(api_client);
tokio::spawn(async move {
while let Ok(token_state) = token_state_rx.recv().await {
info!("🟢Token state: {:?}", token_state);
match token_state {
TokenState::Refresh => {
if let (Some(api_client), Some(ws_client), Some(device_id)) = (
@ -213,14 +225,18 @@ fn spawn_ws_conn(
weak_device_id.upgrade(),
) {
let device_id = device_id.read().clone();
if let Ok(ws_addr) = api_client.ws_url(&device_id) {
let _ = ws_client.connect(ws_addr).await;
match api_client.ws_url(&device_id) {
Ok(ws_addr) => {
info!("🟢token state: {:?}, reconnecting websocket", token_state);
let _ = ws_client.connect(ws_addr).await;
},
Err(err) => error!("Failed to get ws url: {}", err),
}
}
},
TokenState::Invalid => {
if let Some(ws_client) = weak_ws_client.upgrade() {
info!("🟡WebSocket Disconnecting");
info!("🟢token state: {:?}, disconnect websocket", token_state);
ws_client.disconnect().await;
}
},

View File

@ -5,6 +5,7 @@ use collab_entity::CollabObject;
use lazy_static::lazy_static;
use parking_lot::Mutex;
use flowy_error::FlowyError;
use flowy_user_deps::cloud::UserCloudService;
use flowy_user_deps::entities::*;
use flowy_user_deps::DEFAULT_USER_NAME;
@ -99,7 +100,7 @@ impl UserCloudService for LocalServerUserAuthServiceImpl {
fn get_user_profile(
&self,
_credential: UserCredentials,
) -> FutureResult<Option<UserProfile>, Error> {
) -> FutureResult<Option<UserProfile>, FlowyError> {
FutureResult::new(async { Ok(None) })
}

View File

@ -17,6 +17,7 @@ use tokio_retry::strategy::FixedInterval;
use tokio_retry::{Action, RetryIf};
use uuid::Uuid;
use flowy_error::FlowyError;
use flowy_folder_deps::cloud::{Folder, Workspace};
use flowy_user_deps::cloud::*;
use flowy_user_deps::entities::*;
@ -197,7 +198,7 @@ where
fn get_user_profile(
&self,
credential: UserCredentials,
) -> FutureResult<Option<UserProfile>, Error> {
) -> FutureResult<Option<UserProfile>, FlowyError> {
let try_get_postgrest = self.server.try_get_postgrest();
let uid = credential
.uid

View File

@ -92,7 +92,7 @@ pub trait UserCloudService: Send + Sync + 'static {
fn get_user_profile(
&self,
credential: UserCredentials,
) -> FutureResult<Option<UserProfile>, Error>;
) -> FutureResult<Option<UserProfile>, FlowyError>;
/// Return the all the workspaces of the user
fn get_user_workspaces(&self, uid: i64) -> FutureResult<Vec<UserWorkspace>, Error>;
@ -146,3 +146,10 @@ pub fn uuid_from_map(map: &HashMap<String, String>) -> Result<Uuid, Error> {
let uuid = Uuid::from_str(uuid)?;
Ok(uuid)
}
pub type UserTokenStateReceiver = tokio::sync::broadcast::Receiver<UserTokenState>;
#[derive(Debug, Clone)]
pub enum UserTokenState {
Refresh,
Invalid,
}

View File

@ -248,7 +248,7 @@ pub enum AuthStatePB {
AuthStateUnknown = 0,
AuthStateSignIn = 1,
AuthStateSignOut = 2,
AuthStateForceSignOut = 3,
InvalidAuth = 3,
}
impl Default for AuthStatePB {

View File

@ -146,7 +146,12 @@ impl UserManager {
}
}
},
UserTokenState::Invalid => {},
UserTokenState::Invalid => {
send_auth_state_notification(AuthStateChangedPB {
state: AuthStatePB::InvalidAuth,
})
.send();
},
}
}
});
@ -629,7 +634,6 @@ impl UserManager {
if session.user_id == user_update.uid {
debug!("Receive user update: {:?}", user_update);
let user_profile = self.get_user_profile(user_update.uid).await?;
if !is_user_encryption_sign_valid(&user_profile, &user_update.encryption_sign) {
return Ok(());
}
@ -681,7 +685,7 @@ fn is_user_encryption_sign_valid(user_profile: &UserProfile, encryption_sign: &s
let is_valid = user_profile.encryption_type.sign() == encryption_sign;
if !is_valid {
send_auth_state_notification(AuthStateChangedPB {
state: AuthStatePB::AuthStateForceSignOut,
state: AuthStatePB::InvalidAuth,
})
.send();
}