feat: cloud workspace api (#4469)

* feat: workspace api

* feat: added cloud apis for add and delete workspace

* feat: add and delete workspace event handlers

* chore: rust fmt

* chore: save user workspace

* test: add test

* test: add test

* chore: add to gitignore

* feat: update api add name to workspace

* chore: cargo clippy and rename to create

* chore: add envrc and direnv to gitignore

* chore: change name to create workspace instead of add workspace

* chore: update client api rev

* feat: add create workspace impl

* chore: restore gitignore to original

* test: fix create workspace event test

* fix: change delete workspace input

* fix: compile

* fix: create workspace test

* feat: add error code for request payload too large

* chore: remove cargo backup files

* feat: add is async option for upload file handler

* chore: update client api version

---------

Co-authored-by: nathan <nathan@appflowy.io>
This commit is contained in:
Zack 2024-02-04 05:49:45 +08:00 committed by GitHub
parent 250f29f325
commit 08938b8c70
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
46 changed files with 457 additions and 128 deletions

View File

@ -1995,6 +1995,7 @@ dependencies = [
"collab",
"collab-document",
"collab-entity",
"collab-folder",
"collab-plugins",
"flowy-database-pub",
"flowy-document-pub",

View File

@ -1987,6 +1987,7 @@ dependencies = [
"collab",
"collab-document",
"collab-entity",
"collab-folder",
"collab-plugins",
"dotenv",
"flowy-database-pub",

View File

@ -18,6 +18,13 @@ use crate::event_builder::EventBuilder;
use crate::EventIntegrationTest;
impl EventIntegrationTest {
pub async fn create_document(&self, name: &str) -> ViewPB {
let current_workspace = self.get_current_workspace().await;
self
.create_and_open_document(&current_workspace.id, name.to_string(), vec![])
.await
}
pub async fn create_and_open_document(
&self,
parent_id: &str,

View File

@ -114,7 +114,7 @@ impl EventIntegrationTest {
let uid = self.get_user_profile().await?.id;
let doc_state = server
.folder_service()
.get_collab_doc_state_f(&workspace_id, uid, collay_type, oid)
.get_folder_doc_state(&workspace_id, uid, collay_type, oid)
.await?;
Ok(doc_state)

View File

@ -16,9 +16,9 @@ use flowy_server::af_cloud::define::{USER_DEVICE_ID, USER_EMAIL, USER_SIGN_IN_UR
use flowy_server_pub::af_cloud_config::AFCloudConfiguration;
use flowy_server_pub::AuthenticatorType;
use flowy_user::entities::{
AuthenticatorPB, CloudSettingPB, ImportAppFlowyDataPB, OauthSignInPB, SignInUrlPB,
SignInUrlPayloadPB, SignUpPayloadPB, UpdateCloudConfigPB, UpdateUserProfilePayloadPB,
UserProfilePB,
AuthenticatorPB, CloudSettingPB, CreateWorkspacePB, ImportAppFlowyDataPB, OauthSignInPB,
RepeatedUserWorkspacePB, SignInUrlPB, SignInUrlPayloadPB, SignUpPayloadPB, UpdateCloudConfigPB,
UpdateUserProfilePayloadPB, UserProfilePB, UserWorkspaceIdPB, UserWorkspacePB,
};
use flowy_user::errors::{FlowyError, FlowyResult};
use flowy_user::event_map::UserEvent;
@ -211,6 +211,48 @@ impl EventIntegrationTest {
None => Ok(()),
}
}
pub async fn create_workspace(&self, name: &str) -> UserWorkspacePB {
let payload = CreateWorkspacePB {
name: name.to_string(),
};
EventBuilder::new(self.clone())
.event(CreateWorkspace)
.payload(payload)
.async_send()
.await
.parse::<UserWorkspacePB>()
}
pub async fn get_all_workspaces(&self) -> RepeatedUserWorkspacePB {
EventBuilder::new(self.clone())
.event(GetAllWorkspace)
.async_send()
.await
.parse::<RepeatedUserWorkspacePB>()
}
pub async fn delete_workspace(&self, workspace_id: &str) {
let payload = UserWorkspaceIdPB {
workspace_id: workspace_id.to_string(),
};
EventBuilder::new(self.clone())
.event(DeleteWorkspace)
.payload(payload)
.async_send()
.await;
}
pub async fn open_workspace(&self, workspace_id: &str) {
let payload = UserWorkspaceIdPB {
workspace_id: workspace_id.to_string(),
};
EventBuilder::new(self.clone())
.event(OpenWorkspace)
.payload(payload)
.async_send()
.await;
}
}
#[derive(Clone)]

View File

@ -70,7 +70,7 @@ impl FlowySupabaseDatabaseTest {
let workspace_id = self.user_manager.workspace_id().unwrap();
let cloud_service = self.database_manager.get_cloud_service().clone();
cloud_service
.get_collab_doc_state_db(database_id, CollabType::Database, &workspace_id)
.get_database_object_doc_state(database_id, CollabType::Database, &workspace_id)
.await
.unwrap()
}

View File

@ -68,7 +68,7 @@ async fn update_parent_view_test() {
}
#[tokio::test]
async fn app_create_with_view() {
async fn create_sub_views_test() {
let mut test = FolderTest::new().await;
let mut app = test.parent_view.clone();
test

View File

@ -11,14 +11,14 @@ async fn create_workspace_event_test() {
name: "my second workspace".to_owned(),
desc: "".to_owned(),
};
let resp = EventBuilder::new(test)
let view_pb = EventBuilder::new(test)
.event(flowy_folder::event_map::FolderEvent::CreateWorkspace)
.payload(request)
.async_send()
.await
.error()
.unwrap();
assert_eq!(resp.code, ErrorCode::NotSupportYet);
.parse::<flowy_folder::entities::ViewPB>();
assert_eq!(view_pb.parent_view_id, "my second workspace".to_owned());
}
// #[tokio::test]

View File

@ -51,7 +51,7 @@ impl FlowySupabaseFolderTest {
pub async fn get_collab_update(&self, workspace_id: &str) -> Vec<u8> {
let cloud_service = self.folder_manager.get_cloud_service().clone();
cloud_service
.get_collab_doc_state_f(
.get_folder_doc_state(
workspace_id,
self.user_manager.user_id().unwrap(),
CollabType::Folder,

View File

@ -2,3 +2,4 @@ mod anon_user_test;
mod auth_test;
mod import_af_data_folder_test;
mod member_test;
mod workspace_test;

View File

@ -0,0 +1,52 @@
use std::time::Duration;
use event_integration::user_event::user_localhost_af_cloud;
use event_integration::EventIntegrationTest;
use flowy_user::entities::RepeatedUserWorkspacePB;
use flowy_user::protobuf::UserNotification;
use crate::util::receive_with_timeout;
#[tokio::test]
async fn af_cloud_create_workspace_test() {
user_localhost_af_cloud().await;
let test = EventIntegrationTest::new().await;
let user_profile_pb = test.af_cloud_sign_up().await;
let workspaces = test.get_all_workspaces().await.items;
assert_eq!(workspaces.len(), 1);
test.create_workspace("my second workspace").await;
let _workspaces = test.get_all_workspaces().await.items;
let a = user_profile_pb.id.to_string();
let rx = test
.notification_sender
.subscribe::<RepeatedUserWorkspacePB>(&a, UserNotification::DidUpdateUserWorkspaces as i32);
let workspaces = receive_with_timeout(rx, Duration::from_secs(30))
.await
.unwrap()
.items;
assert_eq!(workspaces.len(), 2);
assert_eq!(workspaces[1].name, "my second workspace".to_string());
}
#[tokio::test]
async fn af_cloud_open_workspace_test() {
user_localhost_af_cloud().await;
let test = EventIntegrationTest::new().await;
let _ = test.af_cloud_sign_up().await;
let workspace = test.create_workspace("my second workspace").await;
test.open_workspace(&workspace.workspace_id).await;
test.create_document("my first document").await;
test.create_document("my second document").await;
let views = test.get_all_workspace_views().await;
assert_eq!(views.len(), 3);
// the first view is the default get started view
assert_eq!(views[1].name, "my first document".to_string());
assert_eq!(views[2].name, "my second document".to_string());
}

View File

@ -1,13 +1,8 @@
use nanoid::nanoid;
use crate::user::local_test::helper::*;
use event_integration::{event_builder::EventBuilder, EventIntegrationTest};
use flowy_user::entities::{AuthenticatorPB, UpdateUserProfilePayloadPB, UserProfilePB};
use flowy_user::{errors::ErrorCode, event_map::UserEvent::*};
use crate::user::local_test::helper::*;
// use serial_test::*;
use nanoid::nanoid;
#[tokio::test]
async fn user_profile_get_failed() {
let sdk = EventIntegrationTest::new().await;

View File

@ -388,9 +388,8 @@ async fn migrate_anon_data_on_cloud_signup() {
json!("Row document")
);
}
assert!(cloud_service
.get_collab_doc_state_db(&database_id, CollabType::Database, &workspace_id)
.get_database_object_doc_state(&database_id, CollabType::Database, &workspace_id)
.await
.is_ok());
}

View File

@ -181,7 +181,7 @@ impl FolderCloudService for ServerProvider {
})
}
fn get_collab_doc_state_f(
fn get_folder_doc_state(
&self,
workspace_id: &str,
uid: i64,
@ -194,12 +194,12 @@ impl FolderCloudService for ServerProvider {
FutureResult::new(async move {
server?
.folder_service()
.get_collab_doc_state_f(&workspace_id, uid, collab_type, &object_id)
.get_folder_doc_state(&workspace_id, uid, collab_type, &object_id)
.await
})
}
fn batch_create_collab_object_f(
fn batch_create_folder_collab_objects(
&self,
workspace_id: &str,
objects: Vec<FolderCollabParams>,
@ -209,7 +209,7 @@ impl FolderCloudService for ServerProvider {
FutureResult::new(async move {
server?
.folder_service()
.batch_create_collab_object_f(&workspace_id, objects)
.batch_create_folder_collab_objects(&workspace_id, objects)
.await
})
}
@ -223,7 +223,7 @@ impl FolderCloudService for ServerProvider {
}
impl DatabaseCloudService for ServerProvider {
fn get_collab_doc_state_db(
fn get_database_object_doc_state(
&self,
object_id: &str,
collab_type: CollabType,
@ -235,12 +235,12 @@ impl DatabaseCloudService for ServerProvider {
FutureResult::new(async move {
server?
.database_service()
.get_collab_doc_state_db(&database_id, collab_type, &workspace_id)
.get_database_object_doc_state(&database_id, collab_type, &workspace_id)
.await
})
}
fn batch_get_collab_doc_state_db(
fn batch_get_database_object_doc_state(
&self,
object_ids: Vec<String>,
object_ty: CollabType,
@ -251,12 +251,12 @@ impl DatabaseCloudService for ServerProvider {
FutureResult::new(async move {
server?
.database_service()
.batch_get_collab_doc_state_db(object_ids, object_ty, &workspace_id)
.batch_get_database_object_doc_state(object_ids, object_ty, &workspace_id)
.await
})
}
fn get_collab_snapshots(
fn get_database_collab_object_snapshots(
&self,
object_id: &str,
limit: usize,
@ -266,7 +266,7 @@ impl DatabaseCloudService for ServerProvider {
FutureResult::new(async move {
server?
.database_service()
.get_collab_snapshots(&database_id, limit)
.get_database_collab_object_snapshots(&database_id, limit)
.await
})
}

View File

@ -154,7 +154,7 @@ impl UserStatusCallback for UserStatusCallbackImpl {
// for initializing a default workspace differs depending on the sign-up method used.
let data_source = match folder_manager
.cloud_service
.get_collab_doc_state_f(
.get_folder_doc_state(
&user_workspace.id,
user_profile.uid,
CollabType::Folder,

View File

@ -12,23 +12,21 @@ pub type CollabDocStateByOid = HashMap<String, CollabDocState>;
/// Each kind of server should implement this trait. Check out the [AppFlowyServerProvider] of
/// [flowy-server] crate for more information.
pub trait DatabaseCloudService: Send + Sync {
/// The suffix 'db' in the method name serves as a workaround to avoid naming conflicts with the existing method `get_collab_doc_state`.
fn get_collab_doc_state_db(
fn get_database_object_doc_state(
&self,
object_id: &str,
collab_type: CollabType,
workspace_id: &str,
) -> FutureResult<CollabDocState, Error>;
/// The suffix 'db' in the method name serves as a workaround to avoid naming conflicts with the existing method `get_collab_doc_state`.
fn batch_get_collab_doc_state_db(
fn batch_get_database_object_doc_state(
&self,
object_ids: Vec<String>,
object_ty: CollabType,
workspace_id: &str,
) -> FutureResult<CollabDocStateByOid, Error>;
fn get_collab_snapshots(
fn get_database_collab_object_snapshots(
&self,
object_id: &str,
limit: usize,

View File

@ -76,16 +76,21 @@ impl DatabaseManager {
}
}
/// When initialize with new workspace, all the resources will be cleared.
pub async fn initialize(
&self,
uid: i64,
workspace_id: String,
workspace_database_object_id: String,
) -> FlowyResult<()> {
// Clear all existing tasks
// 1. Clear all existing tasks
self.task_scheduler.write().await.clear_task();
// Release all existing editors
// 2. Release all existing editors
for (_, editor) in self.editors.lock().await.iter() {
editor.close().await;
}
self.editors.lock().await.clear();
// 3. Clear the workspace database
*self.workspace_database.write().await = None;
let collab_db = self.user.collab_db(uid)?;
@ -95,22 +100,22 @@ impl DatabaseManager {
cloud_service: self.cloud_service.clone(),
};
let config = CollabPersistenceConfig::new().snapshot_per_update(100);
let mut collab_raw_data = CollabDocState::default();
let mut workspace_database_doc_state = CollabDocState::default();
// If the workspace database not exist in disk, try to fetch from remote.
if !self.is_collab_exist(uid, &collab_db, &workspace_database_object_id) {
trace!("workspace database not exist, try to fetch from remote");
match self
.cloud_service
.get_collab_doc_state_db(
.get_database_object_doc_state(
&workspace_database_object_id,
CollabType::WorkspaceDatabase,
&workspace_id,
)
.await
{
Ok(updates) => {
collab_raw_data = updates;
Ok(remote_doc_state) => {
workspace_database_doc_state = remote_doc_state;
},
Err(err) => {
return Err(FlowyError::record_not_found().with_context(format!(
@ -132,13 +137,12 @@ impl DatabaseManager {
&workspace_database_object_id,
CollabType::WorkspaceDatabase,
collab_db.clone(),
collab_raw_data,
workspace_database_doc_state,
config.clone(),
);
let workspace_database =
WorkspaceDatabase::open(uid, collab, collab_db, config, collab_builder);
*self.workspace_database.write().await = Some(Arc::new(workspace_database));
Ok(())
}
@ -234,7 +238,7 @@ impl DatabaseManager {
if let Some(database_id) = database_id {
let mut editors = self.editors.lock().await;
if let Some(editor) = editors.get(&database_id) {
editor.close_view_editor(view_id).await;
editor.close_view(view_id).await;
}
}
@ -350,7 +354,7 @@ impl DatabaseManager {
let database_id = self.get_database_id_with_view_id(view_id).await?;
let snapshots = self
.cloud_service
.get_collab_snapshots(&database_id, limit)
.get_database_collab_object_snapshots(&database_id, limit)
.await?
.into_iter()
.map(|snapshot| DatabaseSnapshotPB {
@ -423,7 +427,7 @@ impl DatabaseCollabService for UserDatabaseCollabServiceImpl {
},
Some(cloud_service) => {
let updates = cloud_service
.get_collab_doc_state_db(&object_id, object_ty, &workspace_id)
.get_database_object_doc_state(&object_id, object_ty, &workspace_id)
.await?;
Ok(updates)
},
@ -446,7 +450,7 @@ impl DatabaseCollabService for UserDatabaseCollabServiceImpl {
},
Some(cloud_service) => {
let updates = cloud_service
.batch_get_collab_doc_state_db(object_ids, object_ty, &workspace_id)
.batch_get_database_object_doc_state(object_ids, object_ty, &workspace_id)
.await?;
Ok(updates)
},

View File

@ -115,12 +115,24 @@ impl DatabaseEditor {
/// Returns bool value indicating whether the database is empty.
///
#[tracing::instrument(level = "debug", skip_all)]
pub async fn close_view_editor(&self, view_id: &str) -> bool {
pub async fn close_view(&self, view_id: &str) -> bool {
// If the database is empty, flush the database to the disk.
if self.database_views.editors().await.len() == 1 {
if let Some(database) = self.database.try_lock() {
let _ = database.flush();
}
}
self.database_views.close_view(view_id).await
}
#[tracing::instrument(level = "debug", skip_all)]
pub async fn close(&self) {
if let Some(database) = self.database.try_lock() {
let _ = database.flush();
}
self.database_views.close_view(view_id).await
for view in self.database_views.editors().await {
view.close().await;
}
}
pub async fn get_layout_type(&self, view_id: &str) -> DatabaseLayout {

View File

@ -73,6 +73,9 @@ pub struct UploadFileParamsPB {
#[pb(index = 2)]
#[validate(custom = "required_valid_path")]
pub local_file_path: String,
#[pb(index = 3)]
pub is_async: bool,
}
#[derive(Default, ProtoBuf, Validate)]

View File

@ -411,10 +411,13 @@ pub(crate) async fn upload_file_handler(
let AFPluginData(UploadFileParamsPB {
workspace_id,
local_file_path,
is_async,
}) = params;
let manager = upgrade_document(manager)?;
let url = manager.upload_file(workspace_id, &local_file_path).await?;
let url = manager
.upload_file(workspace_id, &local_file_path, is_async)
.await?;
Ok(AFPluginData(UploadedFilePB {
url,

View File

@ -254,18 +254,25 @@ impl DocumentManager {
&self,
workspace_id: String,
local_file_path: &str,
is_async: bool,
) -> FlowyResult<String> {
let (object_identity, object_value) = object_from_disk(&workspace_id, local_file_path).await?;
let storage_service = self.storage_service_upgrade()?;
let url = storage_service.get_object_url(object_identity).await?;
// let the upload happen in the background
let clone_url = url.clone();
af_spawn(async move {
if let Err(e) = storage_service.put_object(clone_url, object_value).await {
error!("upload file failed: {}", e);
}
});
match is_async {
false => storage_service.put_object(clone_url, object_value).await?,
true => {
// let the upload happen in the background
af_spawn(async move {
if let Err(e) = storage_service.put_object(clone_url, object_value).await {
error!("upload file failed: {}", e);
}
});
},
}
Ok(url)
}

View File

@ -268,6 +268,9 @@ pub enum ErrorCode {
#[error("AppFlowy data folder import error")]
AppFlowyDataFolderImportError = 89,
#[error("Cloud request payload too large")]
CloudRequestPayloadTooLarge = 90,
}
impl ErrorCode {

View File

@ -20,6 +20,7 @@ impl From<AppResponseError> for FlowyError {
AppErrorCode::NotLoggedIn => ErrorCode::UserUnauthorized,
AppErrorCode::NotEnoughPermissions => ErrorCode::NotEnoughPermissions,
AppErrorCode::NetworkError => ErrorCode::HttpError,
AppErrorCode::PayloadTooLarge => ErrorCode::CloudRequestPayloadTooLarge,
_ => ErrorCode::Internal,
};

View File

@ -30,8 +30,7 @@ pub trait FolderCloudService: Send + Sync + 'static {
limit: usize,
) -> FutureResult<Vec<FolderSnapshot>, Error>;
/// The suffix 'f' in the method name serves as a workaround to avoid naming conflicts with the existing method `get_collab_doc_state`.
fn get_collab_doc_state_f(
fn get_folder_doc_state(
&self,
workspace_id: &str,
uid: i64,
@ -39,8 +38,7 @@ pub trait FolderCloudService: Send + Sync + 'static {
object_id: &str,
) -> FutureResult<CollabDocState, Error>;
/// The suffix 'f' in the method name serves as a workaround to avoid naming conflicts with the existing method `get_collab_doc_state`.
fn batch_create_collab_object_f(
fn batch_create_folder_collab_objects(
&self,
workspace_id: &str,
objects: Vec<FolderCollabParams>,

View File

@ -8,7 +8,7 @@ use collab_folder::{
Folder, FolderData, Section, SectionItem, TrashInfo, View, ViewLayout, ViewUpdate, Workspace,
};
use parking_lot::{Mutex, RwLock};
use tracing::{error, event, info, instrument, Level};
use tracing::{error, info, instrument};
use collab_integrate::collab_builder::{AppFlowyCollabBuilder, CollabBuilderConfig};
use collab_integrate::{CollabKVDB, CollabPersistenceConfig};
@ -156,16 +156,8 @@ impl FolderManager {
) -> FlowyResult<()> {
let folder_doc_state = self
.cloud_service
.get_collab_doc_state_f(workspace_id, user_id, CollabType::Folder, workspace_id)
.get_folder_doc_state(workspace_id, user_id, CollabType::Folder, workspace_id)
.await?;
event!(
Level::INFO,
"Get folder updates via {}, number of updates: {}",
self.cloud_service.service_name(),
folder_doc_state.len()
);
if let Err(err) = self
.initialize(
user_id,
@ -176,10 +168,7 @@ impl FolderManager {
{
// If failed to open folder with remote data, open from local disk. After open from the local
// disk. the data will be synced to the remote server.
error!(
"Failed to initialize folder with error {}, fallback to use local data",
err
);
error!("initialize folder with error {:?}, fallback local", err);
self
.initialize(
user_id,
@ -213,7 +202,7 @@ impl FolderManager {
// when the user signs up for the first time.
let result = self
.cloud_service
.get_collab_doc_state_f(workspace_id, user_id, CollabType::Folder, workspace_id)
.get_folder_doc_state(workspace_id, user_id, CollabType::Folder, workspace_id)
.await
.map_err(FlowyError::from);
@ -249,8 +238,13 @@ impl FolderManager {
pub async fn clear(&self, _user_id: i64) {}
#[tracing::instrument(level = "info", skip_all, err)]
pub async fn create_workspace(&self, _params: CreateWorkspaceParams) -> FlowyResult<Workspace> {
Err(FlowyError::not_support())
pub async fn create_workspace(&self, params: CreateWorkspaceParams) -> FlowyResult<Workspace> {
let uid = self.user.user_id()?;
let new_workspace = self
.cloud_service
.create_workspace(uid, &params.name)
.await?;
Ok(new_workspace)
}
#[tracing::instrument(level = "info", skip_all, err)]

View File

@ -29,6 +29,7 @@ collab = { version = "0.1.0" }
collab-plugins = { version = "0.1.0"}
collab-document = { version = "0.1.0" }
collab-entity = { version = "0.1.0" }
collab-folder = { version = "0.1.0" }
hex = "0.4.3"
postgrest = "1.0"
lib-infra = { workspace = true }

View File

@ -18,7 +18,7 @@ impl<T> DatabaseCloudService for AFCloudDatabaseCloudServiceImpl<T>
where
T: AFServer,
{
fn get_collab_doc_state_db(
fn get_database_object_doc_state(
&self,
object_id: &str,
collab_type: CollabType,
@ -48,7 +48,7 @@ where
})
}
fn batch_get_collab_doc_state_db(
fn batch_get_database_object_doc_state(
&self,
object_ids: Vec<String>,
object_ty: CollabType,
@ -90,7 +90,7 @@ where
})
}
fn get_collab_snapshots(
fn get_database_collab_object_snapshots(
&self,
_object_id: &str,
_limit: usize,

View File

@ -1,8 +1,11 @@
use anyhow::{anyhow, Error};
use client_api::entity::{CollabParams, QueryCollab, QueryCollabParams};
use anyhow::Error;
use client_api::entity::{
workspace_dto::CreateWorkspaceParam, CollabParams, QueryCollab, QueryCollabParams,
};
use collab::core::collab::CollabDocState;
use collab::core::origin::CollabOrigin;
use collab_entity::CollabType;
use collab_folder::RepeatedViewIdentifier;
use flowy_error::FlowyError;
use flowy_folder_pub::cloud::{
@ -19,8 +22,27 @@ impl<T> FolderCloudService for AFCloudFolderCloudServiceImpl<T>
where
T: AFServer,
{
fn create_workspace(&self, _uid: i64, _name: &str) -> FutureResult<Workspace, Error> {
FutureResult::new(async move { Err(anyhow!("Not support yet")) })
fn create_workspace(&self, _uid: i64, name: &str) -> FutureResult<Workspace, Error> {
let try_get_client = self.0.try_get_client();
let cloned_name = name.to_string();
FutureResult::new(async move {
let client = try_get_client?;
let new_workspace = client
.create_workspace(CreateWorkspaceParam {
workspace_name: Some(cloned_name),
})
.await?;
Ok(Workspace {
id: new_workspace.workspace_id.to_string(),
name: new_workspace.workspace_name,
created_at: new_workspace.created_at.timestamp(),
child_views: RepeatedViewIdentifier::new(vec![]),
created_by: Some(new_workspace.owner_uid),
last_edited_time: new_workspace.created_at.timestamp(),
last_edited_by: Some(new_workspace.owner_uid),
})
})
}
fn open_workspace(&self, workspace_id: &str) -> FutureResult<(), Error> {
@ -88,7 +110,7 @@ where
FutureResult::new(async move { Ok(vec![]) })
}
fn get_collab_doc_state_f(
fn get_folder_doc_state(
&self,
workspace_id: &str,
_uid: i64,
@ -116,7 +138,7 @@ where
})
}
fn batch_create_collab_object_f(
fn batch_create_folder_collab_objects(
&self,
workspace_id: &str,
objects: Vec<FolderCollabParams>,

View File

@ -2,7 +2,9 @@ use std::collections::HashMap;
use std::sync::Arc;
use anyhow::{anyhow, Error};
use client_api::entity::workspace_dto::{CreateWorkspaceMember, WorkspaceMemberChangeset};
use client_api::entity::workspace_dto::{
CreateWorkspaceMember, CreateWorkspaceParam, WorkspaceMemberChangeset,
};
use client_api::entity::{AFRole, AFWorkspace, AuthProvider, CollabParams, CreateCollabParams};
use client_api::{Client, ClientConfiguration};
use collab::core::collab::CollabDocState;
@ -294,6 +296,30 @@ where
Ok(())
})
}
fn create_workspace(&self, workspace_name: &str) -> FutureResult<UserWorkspace, FlowyError> {
let try_get_client = self.server.try_get_client();
let workspace_name_owned = workspace_name.to_owned();
FutureResult::new(async move {
let client = try_get_client?;
let new_workspace = client
.create_workspace(CreateWorkspaceParam {
workspace_name: Some(workspace_name_owned),
})
.await?;
Ok(to_user_workspace(new_workspace))
})
}
fn delete_workspace(&self, workspace_id: &str) -> FutureResult<(), FlowyError> {
let try_get_client = self.server.try_get_client();
let workspace_id_owned = workspace_id.to_owned();
FutureResult::new(async move {
let client = try_get_client?;
client.delete_workspace(&workspace_id_owned).await?;
Ok(())
})
}
}
async fn get_admin_client(client: &Arc<AFCloudClient>) -> FlowyResult<Client> {

View File

@ -8,7 +8,7 @@ use lib_infra::future::FutureResult;
pub(crate) struct LocalServerDatabaseCloudServiceImpl();
impl DatabaseCloudService for LocalServerDatabaseCloudServiceImpl {
fn get_collab_doc_state_db(
fn get_database_object_doc_state(
&self,
_object_id: &str,
_collab_type: CollabType,
@ -17,7 +17,7 @@ impl DatabaseCloudService for LocalServerDatabaseCloudServiceImpl {
FutureResult::new(async move { Ok(vec![]) })
}
fn batch_get_collab_doc_state_db(
fn batch_get_database_object_doc_state(
&self,
_object_ids: Vec<String>,
_object_ty: CollabType,
@ -26,7 +26,7 @@ impl DatabaseCloudService for LocalServerDatabaseCloudServiceImpl {
FutureResult::new(async move { Ok(CollabDocStateByOid::default()) })
}
fn get_collab_snapshots(
fn get_database_collab_object_snapshots(
&self,
_object_id: &str,
_limit: usize,

View File

@ -53,7 +53,7 @@ impl FolderCloudService for LocalServerFolderCloudServiceImpl {
FutureResult::new(async move { Ok(vec![]) })
}
fn get_collab_doc_state_f(
fn get_folder_doc_state(
&self,
_workspace_id: &str,
_uid: i64,
@ -67,7 +67,7 @@ impl FolderCloudService for LocalServerFolderCloudServiceImpl {
})
}
fn batch_create_collab_object_f(
fn batch_create_folder_collab_objects(
&self,
_workspace_id: &str,
_objects: Vec<FolderCollabParams>,

View File

@ -174,6 +174,24 @@ impl UserCloudService for LocalServerUserAuthServiceImpl {
) -> FutureResult<(), Error> {
FutureResult::new(async { Err(anyhow!("local server doesn't support create collab object")) })
}
fn create_workspace(&self, _workspace_name: &str) -> FutureResult<UserWorkspace, FlowyError> {
FutureResult::new(async {
Err(
FlowyError::local_version_not_support()
.with_context("local server doesn't support mulitple workspaces"),
)
})
}
fn delete_workspace(&self, _workspace_id: &str) -> FutureResult<(), FlowyError> {
FutureResult::new(async {
Err(
FlowyError::local_version_not_support()
.with_context("local server doesn't support mulitple workspaces"),
)
})
}
}
fn make_user_workspace() -> UserWorkspace {

View File

@ -1,9 +1,11 @@
use client_api::ws::ConnectState;
use client_api::ws::WSConnectStateReceiver;
use client_api::ws::WebSocketChannel;
use flowy_storage::ObjectStorageService;
use std::sync::Arc;
use anyhow::Error;
use client_api::collab_sync::collab_msg::CollabMessage;
use client_api::ws::{ConnectState, WSConnectStateReceiver, WebSocketChannel};
use parking_lot::RwLock;
use tokio_stream::wrappers::WatchStream;
#[cfg(feature = "enable_supabase")]

View File

@ -26,7 +26,7 @@ impl<T> DatabaseCloudService for SupabaseDatabaseServiceImpl<T>
where
T: SupabaseServerService,
{
fn get_collab_doc_state_db(
fn get_database_object_doc_state(
&self,
object_id: &str,
collab_type: CollabType,
@ -50,7 +50,7 @@ where
FutureResult::new(async { rx.await? })
}
fn batch_get_collab_doc_state_db(
fn batch_get_database_object_doc_state(
&self,
object_ids: Vec<String>,
object_ty: CollabType,
@ -72,7 +72,7 @@ where
FutureResult::new(async { rx.await? })
}
fn get_collab_snapshots(
fn get_database_collab_object_snapshots(
&self,
object_id: &str,
limit: usize,

View File

@ -131,7 +131,7 @@ where
})
}
fn get_collab_doc_state_f(
fn get_folder_doc_state(
&self,
_workspace_id: &str,
_uid: i64,
@ -154,7 +154,7 @@ where
FutureResult::new(async { rx.await? })
}
fn batch_create_collab_object_f(
fn batch_create_folder_collab_objects(
&self,
_workspace_id: &str,
_objects: Vec<FolderCollabParams>,

View File

@ -354,6 +354,24 @@ where
))
})
}
fn create_workspace(&self, _workspace_name: &str) -> FutureResult<UserWorkspace, FlowyError> {
FutureResult::new(async {
Err(
FlowyError::local_version_not_support()
.with_context("supabase server doesn't support mulitple workspaces"),
)
})
}
fn delete_workspace(&self, _workspace_id: &str) -> FutureResult<(), FlowyError> {
FutureResult::new(async {
Err(
FlowyError::local_version_not_support()
.with_context("supabase server doesn't support mulitple workspaces"),
)
})
}
}
pub struct CreateCollabAction {

View File

@ -45,7 +45,7 @@ async fn supabase_create_database_test() {
}
let updates_by_oid = database_service
.batch_get_collab_doc_state_db(row_ids, CollabType::DatabaseRow, "fake_workspace_id")
.batch_get_database_object_doc_state(row_ids, CollabType::DatabaseRow, "fake_workspace_id")
.await
.unwrap();

View File

@ -69,7 +69,7 @@ async fn supabase_get_folder_test() {
// let updates = collab_service.get_all_updates(&collab_object).await.unwrap();
let updates = folder_service
.get_collab_doc_state_f(
.get_folder_doc_state(
&user.latest_workspace.id,
user.user_id,
CollabType::Folder,
@ -86,7 +86,7 @@ async fn supabase_get_folder_test() {
.unwrap();
}
let updates = folder_service
.get_collab_doc_state_f(
.get_folder_doc_state(
&user.latest_workspace.id,
user.user_id,
CollabType::Folder,
@ -157,7 +157,7 @@ async fn supabase_duplicate_updates_test() {
.await
.unwrap();
let first_init_sync_update = folder_service
.get_collab_doc_state_f(
.get_folder_doc_state(
&user.latest_workspace.id,
user.user_id,
CollabType::Folder,
@ -179,7 +179,7 @@ async fn supabase_duplicate_updates_test() {
.await
.unwrap();
let second_init_sync_update = folder_service
.get_collab_doc_state_f(
.get_folder_doc_state(
&user.latest_workspace.id,
user.user_id,
CollabType::Folder,
@ -271,7 +271,7 @@ async fn supabase_diff_state_vector_test() {
let old_version_doc = Doc::new();
let map = { old_version_doc.get_or_insert_map("map") };
let doc_state = folder_service
.get_collab_doc_state_f(
.get_folder_doc_state(
&user.latest_workspace.id,
user.user_id,
CollabType::Folder,

View File

@ -176,9 +176,16 @@ pub trait UserCloudService: Send + Sync + 'static {
fn open_workspace(&self, workspace_id: &str) -> FutureResult<UserWorkspace, FlowyError>;
/// Return the all the workspaces of the user
/// Return the all the workspaces of the user
fn get_all_workspace(&self, uid: i64) -> FutureResult<Vec<UserWorkspace>, FlowyError>;
/// Creates a new workspace for the user.
/// Returns the new workspace if successful
fn create_workspace(&self, workspace_name: &str) -> FutureResult<UserWorkspace, FlowyError>;
/// Deletes a workspace owned by the user.
fn delete_workspace(&self, workspace_id: &str) -> FutureResult<(), FlowyError>;
fn add_workspace_member(
&self,
user_email: String,

View File

@ -4,7 +4,7 @@ pub use realtime::*;
pub use reminder::*;
pub use user_profile::*;
pub use user_setting::*;
pub use workspace_member::*;
pub use workspace::*;
pub mod auth;
pub mod date_time;
@ -14,4 +14,4 @@ pub mod realtime;
mod reminder;
mod user_profile;
mod user_setting;
mod workspace_member;
mod workspace;

View File

@ -109,3 +109,10 @@ pub struct UserWorkspaceIdPB {
#[validate(custom = "required_not_empty_str")]
pub workspace_id: String,
}
#[derive(ProtoBuf, Default, Clone, Validate)]
pub struct CreateWorkspacePB {
#[pb(index = 1)]
#[validate(custom = "required_not_empty_str")]
pub name: String,
}

View File

@ -466,7 +466,7 @@ pub async fn get_all_workspace_handler(
) -> DataResult<RepeatedUserWorkspacePB, FlowyError> {
let manager = upgrade_manager(manager)?;
let uid = manager.get_session()?.user_id;
let user_workspaces = manager.get_all_user_workspaces(uid)?;
let user_workspaces = manager.get_all_user_workspaces(uid).await?;
data_result_ok(user_workspaces.into())
}
@ -652,3 +652,25 @@ pub async fn update_workspace_member_handler(
.await?;
Ok(())
}
#[tracing::instrument(level = "debug", skip_all, err)]
pub async fn create_workspace_handler(
data: AFPluginData<CreateWorkspacePB>,
manager: AFPluginState<Weak<UserManager>>,
) -> DataResult<UserWorkspacePB, FlowyError> {
let data = data.try_into_inner()?;
let manager = upgrade_manager(manager)?;
let new_workspace = manager.add_workspace(&data.name).await?;
data_result_ok(new_workspace.into())
}
#[tracing::instrument(level = "debug", skip_all, err)]
pub async fn delete_workspace_handler(
delete_workspace_param: AFPluginData<UserWorkspaceIdPB>,
manager: AFPluginState<Weak<UserManager>>,
) -> Result<(), FlowyError> {
let workspace_id = delete_workspace_param.try_into_inner()?.workspace_id;
let manager = upgrade_manager(manager)?;
manager.delete_workspace(&workspace_id).await?;
Ok(())
}

View File

@ -38,7 +38,6 @@ pub fn init(user_manager: Weak<UserManager>) -> AFPlugin {
.event(UserEvent::OauthSignIn, oauth_sign_in_handler)
.event(UserEvent::GenerateSignInURL, gen_sign_in_url_handler)
.event(UserEvent::GetOauthURLWithProvider, sign_in_with_provider_handler)
.event(UserEvent::GetAllWorkspace, get_all_workspace_handler)
.event(UserEvent::OpenWorkspace, open_workspace_handler)
.event(UserEvent::UpdateNetworkState, update_network_state_handler)
.event(UserEvent::OpenAnonUser, open_anon_user_handler)
@ -59,6 +58,10 @@ pub fn init(user_manager: Weak<UserManager>) -> AFPlugin {
.event(UserEvent::RemoveWorkspaceMember, delete_workspace_member_handler)
.event(UserEvent::GetWorkspaceMember, get_workspace_member_handler)
.event(UserEvent::UpdateWorkspaceMember, update_workspace_member_handler)
// Workspace
.event(UserEvent::GetAllWorkspace, get_all_workspace_handler)
.event(UserEvent::CreateWorkspace, create_workspace_handler)
.event(UserEvent::DeleteWorkspace, delete_workspace_handler)
}
#[derive(Clone, Copy, PartialEq, Eq, Debug, Display, Hash, ProtoBuf_Enum, Flowy_Event)]
@ -191,6 +194,12 @@ pub enum UserEvent {
#[event(input = "ImportAppFlowyDataPB")]
ImportAppFlowyDataFolder = 41,
#[event(output = "CreateWorkspacePB")]
CreateWorkspace = 42,
#[event(input = "UserWorkspaceIdPB")]
DeleteWorkspace = 43,
}
pub trait UserStatusCallback: Send + Sync + 'static {

View File

@ -1,10 +1,11 @@
use std::convert::TryFrom;
use chrono::{TimeZone, Utc};
use diesel::{RunQueryDsl, SqliteConnection};
use flowy_error::FlowyError;
use flowy_sqlite::schema::user_workspace_table;
use flowy_sqlite::DBConnection;
use flowy_sqlite::{query_dsl::*, ExpressionMethods};
use flowy_user_pub::entities::UserWorkspace;
use std::convert::TryFrom;
#[derive(Clone, Default, Queryable, Identifiable, Insertable)]
#[diesel(table_name = user_workspace_table)]
@ -16,6 +17,62 @@ pub struct UserWorkspaceTable {
pub database_storage_id: String,
}
pub fn get_user_workspace_op(workspace_id: &str, mut conn: DBConnection) -> Option<UserWorkspace> {
user_workspace_table::dsl::user_workspace_table
.filter(user_workspace_table::id.eq(workspace_id))
.first::<UserWorkspaceTable>(&mut *conn)
.ok()
.map(UserWorkspace::from)
}
pub fn get_all_user_workspace_op(
user_id: i64,
mut conn: DBConnection,
) -> Result<Vec<UserWorkspace>, FlowyError> {
let rows = user_workspace_table::dsl::user_workspace_table
.filter(user_workspace_table::uid.eq(user_id))
.load::<UserWorkspaceTable>(&mut *conn)?;
Ok(rows.into_iter().map(UserWorkspace::from).collect())
}
/// Remove all existing workspaces for given user and insert the new ones.
///
#[allow(dead_code)]
pub fn save_user_workspaces_op(
uid: i64,
mut conn: DBConnection,
user_workspaces: &[UserWorkspace],
) -> Result<(), FlowyError> {
conn.immediate_transaction(|conn| {
delete_existing_workspaces(uid, conn)?;
insert_new_workspaces_op(uid, user_workspaces, conn)?;
Ok(())
})
}
#[allow(dead_code)]
fn delete_existing_workspaces(uid: i64, conn: &mut SqliteConnection) -> Result<(), FlowyError> {
diesel::delete(
user_workspace_table::dsl::user_workspace_table.filter(user_workspace_table::uid.eq(uid)),
)
.execute(conn)?;
Ok(())
}
pub fn insert_new_workspaces_op(
uid: i64,
user_workspaces: &[UserWorkspace],
conn: &mut SqliteConnection,
) -> Result<(), FlowyError> {
for user_workspace in user_workspaces {
let new_record = UserWorkspaceTable::try_from((uid, user_workspace))?;
diesel::insert_into(user_workspace_table::table)
.values(new_record)
.execute(conn)?;
}
Ok(())
}
impl TryFrom<(i64, &UserWorkspace)> for UserWorkspaceTable {
type Error = FlowyError;

View File

@ -16,7 +16,9 @@ use crate::entities::{RepeatedUserWorkspacePB, ResetWorkspacePB};
use crate::migrations::AnonUser;
use crate::notification::{send_notification, UserNotification};
use crate::services::data_import::{upload_collab_objects_data, ImportContext};
use crate::services::sqlite_sql::workspace_sql::UserWorkspaceTable;
use crate::services::sqlite_sql::workspace_sql::{
get_all_user_workspace_op, get_user_workspace_op, insert_new_workspaces_op, UserWorkspaceTable,
};
use crate::user_manager::UserManager;
use flowy_user_pub::session::Session;
@ -151,6 +153,29 @@ impl UserManager {
Ok(())
}
pub async fn add_workspace(&self, workspace_name: &str) -> FlowyResult<UserWorkspace> {
let new_workspace = self
.cloud_services
.get_user_service()?
.create_workspace(workspace_name)
.await?;
// save the workspace to sqlite db
let uid = self.user_id()?;
let mut conn = self.db_connection(uid)?;
insert_new_workspaces_op(uid, &[new_workspace.clone()], &mut conn)?;
Ok(new_workspace)
}
pub async fn delete_workspace(&self, workspace_id: &str) -> FlowyResult<()> {
self
.cloud_services
.get_user_service()?
.delete_workspace(workspace_id)
.await?;
Ok(())
}
pub async fn add_workspace_member(
&self,
user_email: String,
@ -204,19 +229,13 @@ impl UserManager {
}
pub fn get_user_workspace(&self, uid: i64, workspace_id: &str) -> Option<UserWorkspace> {
let mut conn = self.db_connection(uid).ok()?;
let row = user_workspace_table::dsl::user_workspace_table
.filter(user_workspace_table::id.eq(workspace_id))
.first::<UserWorkspaceTable>(&mut *conn)
.ok()?;
Some(UserWorkspace::from(row))
let conn = self.db_connection(uid).ok()?;
get_user_workspace_op(workspace_id, conn)
}
pub fn get_all_user_workspaces(&self, uid: i64) -> FlowyResult<Vec<UserWorkspace>> {
let mut conn = self.db_connection(uid)?;
let rows = user_workspace_table::dsl::user_workspace_table
.filter(user_workspace_table::uid.eq(uid))
.load::<UserWorkspaceTable>(&mut *conn)?;
pub async fn get_all_user_workspaces(&self, uid: i64) -> FlowyResult<Vec<UserWorkspace>> {
let conn = self.db_connection(uid)?;
let workspaces = get_all_user_workspace_op(uid, conn)?;
if let Ok(service) = self.cloud_services.get_user_service() {
if let Ok(pool) = self.db_pool(uid) {
@ -233,7 +252,7 @@ impl UserManager {
});
}
}
Ok(rows.into_iter().map(UserWorkspace::from).collect())
Ok(workspaces)
}
/// Reset the remote workspace using local workspace data. This is useful when a user wishes to

View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
# Ensure a new revision ID is provided
if [ "$#" -ne 1 ]; then