feat: ai billing (#5741)

* feat: start on AI plan+billing UI

* chore: enable plan and billing

* feat: cache workspace subscription + minor fixes (#5705)

* feat: update api from billing

* feat: add api for workspace subscription info (#5717)

* feat: refactor and start integrating AI plans

* feat: refine UI and add business logic for AI

* feat: complete UIUX for AI and limits

* chore: remove resolved todo

* chore: localize remove addon dialog

* chore: fix spacing issue for usage

* fix: interpret subscription + usage on action

* chore: update api for billing (#5735)

* chore: update revisions

* fix: remove subscription cache

* fix: copy improvements + use consistent dialog

* chore: update to the latest client api

* feat: support updating billing period

* Feat/ai billing cancel reason (#5752)

* chore: add cancellation reason field

* fix: ci add one retry for concurrent sign up

* chore: merge with main

* chore: half merge

* chore: fix conflict

* chore: observer error

* chore: remove unneeded protobuf and remove unwrap

* feat: added subscription plan details

* chore: check error code and update sidebar toast

* chore: periodically check billing state

* chore: editor ai error

* chore: return file upload error

* chore: fmt

* chore: clippy

* chore: disable upload image when exceed storage limitation

* chore: remove todo

* chore: remove openai i18n

* chore: update log

* chore: update client-api to fix stream error

* chore: clippy

* chore: fix language file

* chore: disable billing UI

---------

Co-authored-by: Zack Fu Zi Xiang <speed2exe@live.com.sg>
Co-authored-by: nathan <nathan@appflowy.io>
This commit is contained in:
Mathias Mogensen
2024-07-22 09:43:48 +02:00
committed by GitHub
parent 864768b3ba
commit 620e027c3e
121 changed files with 4141 additions and 1422 deletions

View File

@ -19,8 +19,18 @@ mime_guess = "2.0.4"
fxhash = "0.2.1"
anyhow = "1.0.86"
chrono = "0.4.33"
flowy-notification = { workspace = true }
flowy-derive.workspace = true
protobuf = { workspace = true }
[dev-dependencies]
tokio = { workspace = true, features = ["full"] }
uuid = "1.6.1"
rand = { version = "0.8", features = ["std_rng"] }
[features]
dart = ["flowy-codegen/dart", "flowy-notification/dart"]
tauri_ts = ["flowy-codegen/ts", "flowy-notification/tauri_ts"]
[build-dependencies]
flowy-codegen.workspace = true

View File

@ -0,0 +1,2 @@
# Check out the FlowyConfig (located in flowy_toml.rs) for more details.
proto_input = ["src/notification.rs"]

View File

@ -0,0 +1,23 @@
fn main() {
#[cfg(feature = "dart")]
{
flowy_codegen::protobuf_file::dart_gen(env!("CARGO_PKG_NAME"));
flowy_codegen::dart_event::gen(env!("CARGO_PKG_NAME"));
}
#[cfg(feature = "tauri_ts")]
{
flowy_codegen::ts_event::gen(env!("CARGO_PKG_NAME"), flowy_codegen::Project::Tauri);
flowy_codegen::protobuf_file::ts_gen(
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_NAME"),
flowy_codegen::Project::Tauri,
);
flowy_codegen::ts_event::gen(env!("CARGO_PKG_NAME"), flowy_codegen::Project::TauriApp);
flowy_codegen::protobuf_file::ts_gen(
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_NAME"),
flowy_codegen::Project::TauriApp,
);
}
}

View File

@ -1,4 +1,6 @@
mod file_cache;
pub mod manager;
mod notification;
mod protobuf;
pub mod sqlite_sql;
mod uploader;

View File

@ -1,4 +1,5 @@
use crate::file_cache::FileTempStorage;
use crate::notification::{make_notification, StorageNotification};
use crate::sqlite_sql::{
batch_select_upload_file, delete_upload_file, insert_upload_file, insert_upload_part,
select_upload_file, select_upload_parts, update_upload_file_upload_id, UploadFilePartTable,
@ -6,7 +7,7 @@ use crate::sqlite_sql::{
};
use crate::uploader::{FileUploader, FileUploaderRunner, Signal, UploadTask, UploadTaskQueue};
use async_trait::async_trait;
use flowy_error::{FlowyError, FlowyResult};
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
use flowy_sqlite::DBConnection;
use flowy_storage_pub::chunked_byte::{ChunkedBytes, MIN_CHUNK_SIZE};
use flowy_storage_pub::cloud::{ObjectIdentity, ObjectValue, StorageCloudService};
@ -19,6 +20,7 @@ use lib_infra::future::FutureResult;
use lib_infra::util::timestamp;
use std::io::ErrorKind;
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
@ -35,7 +37,7 @@ pub trait StorageUserService: Send + Sync + 'static {
pub struct StorageManager {
pub storage_service: Arc<dyn StorageService>,
uploader: Arc<FileUploader>,
broadcast: tokio::sync::broadcast::Sender<UploadResult>,
upload_status_notifier: tokio::sync::broadcast::Sender<UploadResult>,
}
impl Drop for StorageManager {
@ -49,23 +51,29 @@ impl StorageManager {
cloud_service: Arc<dyn StorageCloudService>,
user_service: Arc<dyn StorageUserService>,
) -> Self {
let is_exceed_storage_limit = Arc::new(AtomicBool::new(false));
let temp_storage_path = PathBuf::from(format!(
"{}/cache_files",
user_service.get_application_root_dir()
));
let temp_storage = Arc::new(FileTempStorage::new(temp_storage_path));
let (notifier, notifier_rx) = watch::channel(Signal::Proceed);
let (broadcast, _) = tokio::sync::broadcast::channel::<UploadResult>(100);
let (upload_status_notifier, _) = tokio::sync::broadcast::channel::<UploadResult>(100);
let task_queue = Arc::new(UploadTaskQueue::new(notifier));
let storage_service = Arc::new(StorageServiceImpl {
cloud_service,
user_service: user_service.clone(),
temp_storage,
task_queue: task_queue.clone(),
upload_status_notifier: broadcast.clone(),
upload_status_notifier: upload_status_notifier.clone(),
is_exceed_storage_limit: is_exceed_storage_limit.clone(),
});
let uploader = Arc::new(FileUploader::new(storage_service.clone(), task_queue));
let uploader = Arc::new(FileUploader::new(
storage_service.clone(),
task_queue,
is_exceed_storage_limit,
));
tokio::spawn(FileUploaderRunner::run(
Arc::downgrade(&uploader),
notifier_rx,
@ -85,7 +93,7 @@ impl StorageManager {
Self {
storage_service,
uploader,
broadcast,
upload_status_notifier,
}
}
@ -97,8 +105,18 @@ impl StorageManager {
}
}
pub fn disable_storage_write_access(&self) {
// when storage is purchased, resume the uploader
self.uploader.disable_storage_write();
}
pub fn enable_storage_write_access(&self) {
// when storage is purchased, resume the uploader
self.uploader.enable_storage_write();
}
pub fn subscribe_upload_result(&self) -> tokio::sync::broadcast::Receiver<UploadResult> {
self.broadcast.subscribe()
self.upload_status_notifier.subscribe()
}
}
@ -130,6 +148,7 @@ pub struct StorageServiceImpl {
temp_storage: Arc<FileTempStorage>,
task_queue: Arc<UploadTaskQueue>,
upload_status_notifier: tokio::sync::broadcast::Sender<UploadResult>,
is_exceed_storage_limit: Arc<AtomicBool>,
}
#[async_trait]
@ -239,8 +258,18 @@ impl StorageService for StorageServiceImpl {
let task_queue = self.task_queue.clone();
let user_service = self.user_service.clone();
let cloud_service = self.cloud_service.clone();
let is_exceed_storage_limit = self.is_exceed_storage_limit.clone();
FutureResult::new(async move {
let is_exceed_limit = is_exceed_storage_limit.load(std::sync::atomic::Ordering::Relaxed);
if is_exceed_limit {
make_notification(StorageNotification::FileStorageLimitExceeded)
.payload(FlowyError::file_storage_limit())
.send();
return Err(FlowyError::file_storage_limit());
}
let local_file_path = temp_storage
.create_temp_file_from_existing(Path::new(&file_path))
.await
@ -256,25 +285,34 @@ impl StorageService for StorageServiceImpl {
// 2. save the record to sqlite
let conn = user_service.sqlite_connection(user_service.user_id()?)?;
insert_upload_file(conn, &record)?;
// 3. generate url for given file
let url = cloud_service.get_object_url_v1(
&record.workspace_id,
&record.parent_dir,
&record.file_id,
)?;
let file_id = record.file_id.clone();
match insert_upload_file(conn, &record) {
Ok(_) => {
// 3. generate url for given file
task_queue
.queue_task(UploadTask::Task {
chunks,
record,
retry_count: 0,
})
.await;
task_queue
.queue_task(UploadTask::Task {
chunks,
record,
retry_count: 0,
})
.await;
Ok::<_, FlowyError>(CreatedUpload { url, file_id })
Ok::<_, FlowyError>(CreatedUpload { url, file_id })
},
Err(err) => {
if matches!(err.code, ErrorCode::DuplicateSqliteRecord) {
info!("upload record already exists, skip creating new upload task");
Ok::<_, FlowyError>(CreatedUpload { url, file_id })
} else {
Err(err)
}
},
}
})
}
@ -392,14 +430,23 @@ async fn start_upload(
upload_file.file_id
);
let create_upload_resp = cloud_service
let create_upload_resp_result = cloud_service
.create_upload(
&upload_file.workspace_id,
&upload_file.parent_dir,
&upload_file.file_id,
&upload_file.content_type,
)
.await?;
.await;
if let Err(err) = create_upload_resp_result.as_ref() {
if err.is_file_limit_exceeded() {
make_notification(StorageNotification::FileStorageLimitExceeded)
.payload(err.clone())
.send();
}
}
let create_upload_resp = create_upload_resp_result?;
// 2. update upload_id
let conn = user_service.sqlite_connection(user_service.user_id()?)?;
update_upload_file_upload_id(
@ -468,6 +515,12 @@ async fn start_upload(
});
},
Err(err) => {
if err.is_file_limit_exceeded() {
make_notification(StorageNotification::FileStorageLimitExceeded)
.payload(err.clone())
.send();
}
error!("[File] {} upload part failed: {}", upload_file.file_id, err);
return Err(err);
},
@ -475,7 +528,7 @@ async fn start_upload(
}
// mark it as completed
complete_upload(
let complete_upload_result = complete_upload(
cloud_service,
user_service,
temp_storage,
@ -483,7 +536,14 @@ async fn start_upload(
completed_parts,
notifier,
)
.await?;
.await;
if let Err(err) = complete_upload_result {
if err.is_file_limit_exceeded() {
make_notification(StorageNotification::FileStorageLimitExceeded)
.payload(err.clone())
.send();
}
}
trace!("[File] {} upload completed", upload_file.file_id);
Ok(())

View File

@ -0,0 +1,21 @@
use flowy_derive::ProtoBuf_Enum;
use flowy_notification::NotificationBuilder;
const OBSERVABLE_SOURCE: &str = "storage";
#[derive(ProtoBuf_Enum, Debug, Default)]
pub(crate) enum StorageNotification {
#[default]
FileStorageLimitExceeded = 0,
}
impl std::convert::From<StorageNotification> for i32 {
fn from(notification: StorageNotification) -> Self {
notification as i32
}
}
#[tracing::instrument(level = "trace")]
pub(crate) fn make_notification(ty: StorageNotification) -> NotificationBuilder {
NotificationBuilder::new("appflowy_file_storage_notification", ty, OBSERVABLE_SOURCE)
}

View File

@ -1,4 +1,6 @@
use flowy_error::{FlowyError, FlowyResult};
use flowy_sqlite::result::DatabaseErrorKind;
use flowy_sqlite::result::Error::DatabaseError;
use flowy_sqlite::schema::{upload_file_part, upload_file_table};
use flowy_sqlite::{
diesel, AsChangeset, BoolExpressionMethods, DBConnection, ExpressionMethods, Identifiable,
@ -52,10 +54,17 @@ pub fn insert_upload_file(
mut conn: DBConnection,
upload_file: &UploadFileTable,
) -> FlowyResult<()> {
diesel::insert_into(upload_file_table::table)
match diesel::insert_into(upload_file_table::table)
.values(upload_file)
.execute(&mut *conn)?;
Ok(())
.execute(&mut *conn)
{
Ok(_) => Ok(()),
Err(DatabaseError(DatabaseErrorKind::UniqueViolation, _)) => Err(FlowyError::new(
flowy_error::ErrorCode::DuplicateSqliteRecord,
"Upload file already exists",
)),
Err(e) => Err(e.into()),
}
}
pub fn update_upload_file_upload_id(

View File

@ -10,7 +10,7 @@ use std::sync::atomic::{AtomicBool, AtomicU8};
use std::sync::{Arc, Weak};
use std::time::Duration;
use tokio::sync::{watch, RwLock};
use tracing::{info, trace};
use tracing::{error, info, trace};
#[derive(Clone)]
pub enum Signal {
@ -44,6 +44,7 @@ pub struct FileUploader {
max_uploads: u8,
current_uploads: AtomicU8,
pause_sync: AtomicBool,
has_exceeded_limit: Arc<AtomicBool>,
}
impl Drop for FileUploader {
@ -53,13 +54,18 @@ impl Drop for FileUploader {
}
impl FileUploader {
pub fn new(storage_service: Arc<dyn StorageService>, queue: Arc<UploadTaskQueue>) -> Self {
pub fn new(
storage_service: Arc<dyn StorageService>,
queue: Arc<UploadTaskQueue>,
is_exceed_limit: Arc<AtomicBool>,
) -> Self {
Self {
storage_service,
queue,
max_uploads: 3,
current_uploads: Default::default(),
pause_sync: Default::default(),
has_exceeded_limit: is_exceed_limit,
}
}
@ -77,7 +83,25 @@ impl FileUploader {
.store(true, std::sync::atomic::Ordering::SeqCst);
}
pub fn disable_storage_write(&self) {
self
.has_exceeded_limit
.store(true, std::sync::atomic::Ordering::SeqCst);
self.pause();
}
pub fn enable_storage_write(&self) {
self
.has_exceeded_limit
.store(false, std::sync::atomic::Ordering::SeqCst);
self.resume();
}
pub fn resume(&self) {
if self.pause_sync.load(std::sync::atomic::Ordering::Relaxed) {
return;
}
self
.pause_sync
.store(false, std::sync::atomic::Ordering::SeqCst);
@ -108,6 +132,14 @@ impl FileUploader {
return None;
}
if self
.has_exceeded_limit
.load(std::sync::atomic::Ordering::SeqCst)
{
// If the storage limitation is enabled, do not proceed.
return None;
}
let task = self.queue.tasks.write().await.pop()?;
if task.retry_count() > 5 {
// If the task has been retried more than 5 times, we should not retry it anymore.
@ -128,6 +160,11 @@ impl FileUploader {
} => {
let record = BoxAny::new(record);
if let Err(err) = self.storage_service.start_upload(&chunks, &record).await {
if err.is_file_limit_exceeded() {
error!("Failed to upload file: {}", err);
self.disable_storage_write();
}
info!(
"Failed to upload file: {}, retry_count:{}",
err, retry_count
@ -154,6 +191,11 @@ impl FileUploader {
.resume_upload(&workspace_id, &parent_dir, &file_id)
.await
{
if err.is_file_limit_exceeded() {
error!("Failed to upload file: {}", err);
self.disable_storage_write();
}
info!(
"Failed to resume upload file: {}, retry_count:{}",
err, retry_count