fix: open local folder when failing to init with remote server data (#4158)

This commit is contained in:
Nathan.fooo 2023-12-18 03:14:05 +08:00 committed by GitHub
parent 781fbf1b30
commit 5ef9d55dca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 443 additions and 357 deletions

View File

@ -139,7 +139,7 @@ checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6"
[[package]]
name = "app-error"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=9589054f3874c60063878f084af01905b182d537#9589054f3874c60063878f084af01905b182d537"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=69ed6ff4e80ab1476d73f7b587ae61e3e24f8894#69ed6ff4e80ab1476d73f7b587ae61e3e24f8894"
dependencies = [
"anyhow",
"reqwest",
@ -786,7 +786,7 @@ dependencies = [
[[package]]
name = "client-api"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=9589054f3874c60063878f084af01905b182d537#9589054f3874c60063878f084af01905b182d537"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=69ed6ff4e80ab1476d73f7b587ae61e3e24f8894#69ed6ff4e80ab1476d73f7b587ae61e3e24f8894"
dependencies = [
"anyhow",
"app-error",
@ -1471,7 +1471,7 @@ checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308"
[[package]]
name = "database-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=9589054f3874c60063878f084af01905b182d537#9589054f3874c60063878f084af01905b182d537"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=69ed6ff4e80ab1476d73f7b587ae61e3e24f8894#69ed6ff4e80ab1476d73f7b587ae61e3e24f8894"
dependencies = [
"anyhow",
"app-error",
@ -2830,7 +2830,7 @@ dependencies = [
[[package]]
name = "gotrue"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=9589054f3874c60063878f084af01905b182d537#9589054f3874c60063878f084af01905b182d537"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=69ed6ff4e80ab1476d73f7b587ae61e3e24f8894#69ed6ff4e80ab1476d73f7b587ae61e3e24f8894"
dependencies = [
"anyhow",
"futures-util",
@ -2846,7 +2846,7 @@ dependencies = [
[[package]]
name = "gotrue-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=9589054f3874c60063878f084af01905b182d537#9589054f3874c60063878f084af01905b182d537"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=69ed6ff4e80ab1476d73f7b587ae61e3e24f8894#69ed6ff4e80ab1476d73f7b587ae61e3e24f8894"
dependencies = [
"anyhow",
"app-error",
@ -3268,7 +3268,7 @@ dependencies = [
[[package]]
name = "infra"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=9589054f3874c60063878f084af01905b182d537#9589054f3874c60063878f084af01905b182d537"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=69ed6ff4e80ab1476d73f7b587ae61e3e24f8894#69ed6ff4e80ab1476d73f7b587ae61e3e24f8894"
dependencies = [
"anyhow",
"reqwest",
@ -5014,7 +5014,7 @@ dependencies = [
[[package]]
name = "realtime-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=9589054f3874c60063878f084af01905b182d537#9589054f3874c60063878f084af01905b182d537"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=69ed6ff4e80ab1476d73f7b587ae61e3e24f8894#69ed6ff4e80ab1476d73f7b587ae61e3e24f8894"
dependencies = [
"anyhow",
"bincode",
@ -5036,7 +5036,7 @@ dependencies = [
[[package]]
name = "realtime-protocol"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=9589054f3874c60063878f084af01905b182d537#9589054f3874c60063878f084af01905b182d537"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=69ed6ff4e80ab1476d73f7b587ae61e3e24f8894#69ed6ff4e80ab1476d73f7b587ae61e3e24f8894"
dependencies = [
"anyhow",
"bincode",
@ -5783,7 +5783,7 @@ dependencies = [
[[package]]
name = "shared_entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=9589054f3874c60063878f084af01905b182d537#9589054f3874c60063878f084af01905b182d537"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=69ed6ff4e80ab1476d73f7b587ae61e3e24f8894#69ed6ff4e80ab1476d73f7b587ae61e3e24f8894"
dependencies = [
"anyhow",
"app-error",
@ -7673,7 +7673,7 @@ dependencies = [
[[package]]
name = "workspace-template"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=9589054f3874c60063878f084af01905b182d537#9589054f3874c60063878f084af01905b182d537"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=69ed6ff4e80ab1476d73f7b587ae61e3e24f8894#69ed6ff4e80ab1476d73f7b587ae61e3e24f8894"
dependencies = [
"anyhow",
"async-trait",

View File

@ -57,7 +57,7 @@ custom-protocol = ["tauri/custom-protocol"]
# Run the script:
# scripts/tool/update_client_api_rev.sh new_rev_id
# ⚠️⚠️⚠️️
client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "9589054f3874c60063878f084af01905b182d537" }
client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "69ed6ff4e80ab1476d73f7b587ae61e3e24f8894" }
# Please use the following script to update collab.
# Working directory: frontend
#

View File

@ -125,7 +125,7 @@ checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6"
[[package]]
name = "app-error"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=9589054f3874c60063878f084af01905b182d537#9589054f3874c60063878f084af01905b182d537"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=69ed6ff4e80ab1476d73f7b587ae61e3e24f8894#69ed6ff4e80ab1476d73f7b587ae61e3e24f8894"
dependencies = [
"anyhow",
"reqwest",
@ -667,7 +667,7 @@ dependencies = [
[[package]]
name = "client-api"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=9589054f3874c60063878f084af01905b182d537#9589054f3874c60063878f084af01905b182d537"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=69ed6ff4e80ab1476d73f7b587ae61e3e24f8894#69ed6ff4e80ab1476d73f7b587ae61e3e24f8894"
dependencies = [
"anyhow",
"app-error",
@ -1277,7 +1277,7 @@ checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308"
[[package]]
name = "database-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=9589054f3874c60063878f084af01905b182d537#9589054f3874c60063878f084af01905b182d537"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=69ed6ff4e80ab1476d73f7b587ae61e3e24f8894#69ed6ff4e80ab1476d73f7b587ae61e3e24f8894"
dependencies = [
"anyhow",
"app-error",
@ -2471,7 +2471,7 @@ dependencies = [
[[package]]
name = "gotrue"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=9589054f3874c60063878f084af01905b182d537#9589054f3874c60063878f084af01905b182d537"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=69ed6ff4e80ab1476d73f7b587ae61e3e24f8894#69ed6ff4e80ab1476d73f7b587ae61e3e24f8894"
dependencies = [
"anyhow",
"futures-util",
@ -2487,7 +2487,7 @@ dependencies = [
[[package]]
name = "gotrue-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=9589054f3874c60063878f084af01905b182d537#9589054f3874c60063878f084af01905b182d537"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=69ed6ff4e80ab1476d73f7b587ae61e3e24f8894#69ed6ff4e80ab1476d73f7b587ae61e3e24f8894"
dependencies = [
"anyhow",
"app-error",
@ -2848,7 +2848,7 @@ dependencies = [
[[package]]
name = "infra"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=9589054f3874c60063878f084af01905b182d537#9589054f3874c60063878f084af01905b182d537"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=69ed6ff4e80ab1476d73f7b587ae61e3e24f8894#69ed6ff4e80ab1476d73f7b587ae61e3e24f8894"
dependencies = [
"anyhow",
"reqwest",
@ -4303,7 +4303,7 @@ dependencies = [
[[package]]
name = "realtime-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=9589054f3874c60063878f084af01905b182d537#9589054f3874c60063878f084af01905b182d537"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=69ed6ff4e80ab1476d73f7b587ae61e3e24f8894#69ed6ff4e80ab1476d73f7b587ae61e3e24f8894"
dependencies = [
"anyhow",
"bincode",
@ -4325,7 +4325,7 @@ dependencies = [
[[package]]
name = "realtime-protocol"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=9589054f3874c60063878f084af01905b182d537#9589054f3874c60063878f084af01905b182d537"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=69ed6ff4e80ab1476d73f7b587ae61e3e24f8894#69ed6ff4e80ab1476d73f7b587ae61e3e24f8894"
dependencies = [
"anyhow",
"bincode",
@ -4985,7 +4985,7 @@ dependencies = [
[[package]]
name = "shared_entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=9589054f3874c60063878f084af01905b182d537#9589054f3874c60063878f084af01905b182d537"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=69ed6ff4e80ab1476d73f7b587ae61e3e24f8894#69ed6ff4e80ab1476d73f7b587ae61e3e24f8894"
dependencies = [
"anyhow",
"app-error",
@ -6323,7 +6323,7 @@ dependencies = [
[[package]]
name = "workspace-template"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=9589054f3874c60063878f084af01905b182d537#9589054f3874c60063878f084af01905b182d537"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=69ed6ff4e80ab1476d73f7b587ae61e3e24f8894#69ed6ff4e80ab1476d73f7b587ae61e3e24f8894"
dependencies = [
"anyhow",
"async-trait",

View File

@ -99,7 +99,7 @@ incremental = false
# Run the script:
# scripts/tool/update_client_api_rev.sh new_rev_id
# ⚠️⚠️⚠️️
client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "9589054f3874c60063878f084af01905b182d537" }
client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "69ed6ff4e80ab1476d73f7b587ae61e3e24f8894" }
# Please use the following script to update collab.
# Working directory: frontend
#

View File

@ -9,6 +9,9 @@ pub mod protobuf;
mod user_default;
pub mod view_operation;
mod manager_init;
mod manager_observer;
pub mod share;
#[cfg(feature = "test_helper")]
mod test_helper;
mod util;

View File

@ -1,38 +1,34 @@
use std::collections::HashSet;
use std::fmt::{Display, Formatter};
use std::ops::Deref;
use std::sync::{Arc, Weak};
use collab::core::collab::{CollabRawData, MutexCollab};
use collab::core::collab_state::SyncState;
use collab_entity::CollabType;
use collab_folder::{
Folder, FolderData, FolderNotify, Section, SectionItem, TrashChange, TrashChangeReceiver,
TrashInfo, UserId, View, ViewChange, ViewChangeReceiver, ViewLayout, ViewUpdate, Workspace,
Folder, FolderData, Section, SectionItem, TrashInfo, View, ViewLayout, ViewUpdate, Workspace,
};
use parking_lot::{Mutex, RwLock};
use tokio_stream::wrappers::WatchStream;
use tokio_stream::StreamExt;
use tracing::{event, info, instrument, Level};
use collab_integrate::collab_builder::AppFlowyCollabBuilder;
use collab_integrate::{CollabPersistenceConfig, RocksCollabDB, YrsDocAction};
use collab_integrate::{CollabPersistenceConfig, RocksCollabDB};
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
use flowy_folder_deps::cloud::{gen_view_id, FolderCloudService};
use lib_dispatch::prelude::af_spawn;
use crate::entities::icon::UpdateViewIconParams;
use crate::entities::{
view_pb_with_child_views, view_pb_without_child_views, ChildViewUpdatePB, CreateViewParams,
CreateWorkspaceParams, DeletedViewPB, FolderSnapshotPB, FolderSnapshotStatePB, FolderSyncStatePB,
RepeatedTrashPB, RepeatedViewIdPB, RepeatedViewPB, UpdateViewParams, UserFolderPB, ViewPB,
WorkspacePB, WorkspaceSettingPB,
view_pb_with_child_views, view_pb_without_child_views, CreateViewParams, CreateWorkspaceParams,
DeletedViewPB, FolderSnapshotPB, RepeatedTrashPB, RepeatedViewIdPB, RepeatedViewPB,
UpdateViewParams, ViewPB, WorkspacePB, WorkspaceSettingPB,
};
use crate::manager_observer::{
notify_child_views_changed, notify_parent_view_did_change, ChildViewChangeReason,
};
use crate::notification::{
send_notification, send_workspace_setting_notification, FolderNotification,
};
use crate::share::ImportParams;
use crate::user_default::DefaultFolderBuilder;
use crate::util::{folder_not_init_error, workspace_data_not_sync_error};
use crate::view_operation::{create_view, FolderOperationHandler, FolderOperationHandlers};
/// [FolderUser] represents the user for folder.
@ -43,11 +39,11 @@ pub trait FolderUser: Send + Sync {
}
pub struct FolderManager {
workspace_id: RwLock<Option<String>>,
mutex_folder: Arc<MutexFolder>,
pub(crate) workspace_id: RwLock<Option<String>>,
pub(crate) mutex_folder: Arc<MutexFolder>,
collab_builder: Arc<AppFlowyCollabBuilder>,
user: Arc<dyn FolderUser>,
operation_handlers: FolderOperationHandlers,
pub(crate) user: Arc<dyn FolderUser>,
pub(crate) operation_handlers: FolderOperationHandlers,
pub cloud_service: Arc<dyn FolderCloudService>,
}
@ -125,105 +121,7 @@ impl FolderManager {
Ok(views)
}
/// Called immediately after the application launched if the user already sign in/sign up.
#[tracing::instrument(level = "info", skip(self, initial_data), err)]
pub async fn initialize(
&self,
uid: i64,
workspace_id: &str,
initial_data: FolderInitDataSource,
) -> FlowyResult<()> {
// Update the workspace id
event!(
Level::INFO,
"Init workspace: {} from: {}",
workspace_id,
initial_data
);
*self.workspace_id.write() = Some(workspace_id.to_string());
let workspace_id = workspace_id.to_string();
// Get the collab db for the user with given user id.
let collab_db = self.user.collab_db(uid)?;
let (view_tx, view_rx) = tokio::sync::broadcast::channel(100);
let (trash_tx, trash_rx) = tokio::sync::broadcast::channel(100);
let folder_notifier = FolderNotify {
view_change_tx: view_tx,
trash_change_tx: trash_tx,
};
let folder = match initial_data {
FolderInitDataSource::LocalDisk {
create_if_not_exist,
} => {
let is_exist = is_exist_in_local_disk(&self.user, &workspace_id).unwrap_or(false);
if is_exist {
event!(Level::INFO, "Init folder from local disk");
let collab = self
.collab_for_folder(uid, &workspace_id, collab_db, vec![])
.await?;
Folder::open(UserId::from(uid), collab, Some(folder_notifier))?
} else if create_if_not_exist {
// Currently, this branch is only used when the server type is supabase. For appflowy cloud,
// the default workspace is already created when the user sign up.
event!(Level::INFO, "Create folder with default folder builder");
let folder_data =
DefaultFolderBuilder::build(uid, workspace_id.to_string(), &self.operation_handlers)
.await;
let collab = self
.collab_for_folder(uid, &workspace_id, collab_db, vec![])
.await?;
Folder::create(
UserId::from(uid),
collab,
Some(folder_notifier),
folder_data,
)
} else {
return Err(FlowyError::new(
ErrorCode::RecordNotFound,
"Can't find any workspace data",
));
}
},
FolderInitDataSource::Cloud(raw_data) => {
event!(Level::INFO, "Restore folder from cloud service");
if raw_data.is_empty() {
return Err(workspace_data_not_sync_error(uid, &workspace_id));
}
let collab = self
.collab_for_folder(uid, &workspace_id, collab_db, raw_data)
.await?;
Folder::open(UserId::from(uid), collab, Some(folder_notifier))?
},
FolderInitDataSource::FolderData(folder_data) => {
event!(Level::INFO, "Restore folder with passed-in folder data");
let collab = self
.collab_for_folder(uid, &workspace_id, collab_db, vec![])
.await?;
Folder::create(
UserId::from(uid),
collab,
Some(folder_notifier),
folder_data,
)
},
};
let folder_state_rx = folder.subscribe_sync_state();
*self.mutex_folder.lock() = Some(folder);
let weak_mutex_folder = Arc::downgrade(&self.mutex_folder);
subscribe_folder_sync_state_changed(workspace_id.clone(), folder_state_rx, &weak_mutex_folder);
subscribe_folder_snapshot_state_changed(workspace_id, &weak_mutex_folder);
subscribe_folder_trash_changed(trash_rx, &weak_mutex_folder);
subscribe_folder_view_changed(view_rx, &weak_mutex_folder);
Ok(())
}
async fn collab_for_folder(
pub(crate) async fn collab_for_folder(
&self,
uid: i64,
workspace_id: &str,
@ -1095,124 +993,8 @@ impl FolderManager {
}
}
/// Listen on the [ViewChange] after create/delete/update events happened
fn subscribe_folder_view_changed(
mut rx: ViewChangeReceiver,
weak_mutex_folder: &Weak<MutexFolder>,
) {
let weak_mutex_folder = weak_mutex_folder.clone();
af_spawn(async move {
while let Ok(value) = rx.recv().await {
if let Some(folder) = weak_mutex_folder.upgrade() {
tracing::trace!("Did receive view change: {:?}", value);
match value {
ViewChange::DidCreateView { view } => {
notify_child_views_changed(
view_pb_without_child_views(Arc::new(view.clone())),
ChildViewChangeReason::DidCreateView,
);
notify_parent_view_did_change(folder.clone(), vec![view.parent_view_id]);
},
ViewChange::DidDeleteView { views } => {
for view in views {
notify_child_views_changed(
view_pb_without_child_views(view),
ChildViewChangeReason::DidDeleteView,
);
}
},
ViewChange::DidUpdate { view } => {
notify_child_views_changed(
view_pb_without_child_views(Arc::new(view.clone())),
ChildViewChangeReason::DidUpdateView,
);
notify_parent_view_did_change(folder.clone(), vec![view.parent_view_id]);
},
};
}
}
});
}
fn subscribe_folder_snapshot_state_changed(
workspace_id: String,
weak_mutex_folder: &Weak<MutexFolder>,
) {
let weak_mutex_folder = weak_mutex_folder.clone();
af_spawn(async move {
if let Some(mutex_folder) = weak_mutex_folder.upgrade() {
let stream = mutex_folder
.lock()
.as_ref()
.map(|folder| folder.subscribe_snapshot_state());
if let Some(mut state_stream) = stream {
while let Some(snapshot_state) = state_stream.next().await {
if let Some(new_snapshot_id) = snapshot_state.snapshot_id() {
tracing::debug!("Did create folder remote snapshot: {}", new_snapshot_id);
send_notification(
&workspace_id,
FolderNotification::DidUpdateFolderSnapshotState,
)
.payload(FolderSnapshotStatePB { new_snapshot_id })
.send();
}
}
}
}
});
}
fn subscribe_folder_sync_state_changed(
workspace_id: String,
mut folder_sync_state_rx: WatchStream<SyncState>,
_weak_mutex_folder: &Weak<MutexFolder>,
) {
af_spawn(async move {
while let Some(state) = folder_sync_state_rx.next().await {
send_notification(&workspace_id, FolderNotification::DidUpdateFolderSyncUpdate)
.payload(FolderSyncStatePB::from(state))
.send();
}
});
}
/// Listen on the [TrashChange]s and notify the frontend some views were changed.
fn subscribe_folder_trash_changed(
mut rx: TrashChangeReceiver,
weak_mutex_folder: &Weak<MutexFolder>,
) {
let weak_mutex_folder = weak_mutex_folder.clone();
af_spawn(async move {
while let Ok(value) = rx.recv().await {
if let Some(folder) = weak_mutex_folder.upgrade() {
let mut unique_ids = HashSet::new();
tracing::trace!("Did receive trash change: {:?}", value);
let ids = match value {
TrashChange::DidCreateTrash { ids } => ids,
TrashChange::DidDeleteTrash { ids } => ids,
};
if let Some(folder) = folder.lock().as_ref() {
let views = folder.views.get_views(&ids);
for view in views {
unique_ids.insert(view.parent_view_id.clone());
}
let repeated_trash: RepeatedTrashPB = folder.get_all_trash().into();
send_notification("trash", FolderNotification::DidUpdateTrash)
.payload(repeated_trash)
.send();
}
let parent_view_ids = unique_ids.into_iter().collect();
notify_parent_view_did_change(folder.clone(), parent_view_ids);
}
}
});
}
/// Return the views that belong to the workspace. The views are filtered by the trash.
fn get_workspace_view_pbs(_workspace_id: &str, folder: &Folder) -> Vec<ViewPB> {
pub(crate) fn get_workspace_view_pbs(_workspace_id: &str, folder: &Folder) -> Vec<ViewPB> {
let trash_ids = folder
.get_all_trash()
.into_iter()
@ -1236,91 +1018,6 @@ fn get_workspace_view_pbs(_workspace_id: &str, folder: &Folder) -> Vec<ViewPB> {
.collect()
}
fn notify_did_update_workspace(workspace_id: &str, folder: &Folder) {
let repeated_view: RepeatedViewPB = get_workspace_view_pbs(workspace_id, folder).into();
tracing::trace!("Did update workspace views: {:?}", repeated_view);
send_notification(workspace_id, FolderNotification::DidUpdateWorkspaceViews)
.payload(repeated_view)
.send();
}
/// Notify the the list of parent view ids that its child views were changed.
#[tracing::instrument(level = "debug", skip(folder, parent_view_ids))]
fn notify_parent_view_did_change<T: AsRef<str>>(
folder: Arc<MutexFolder>,
parent_view_ids: Vec<T>,
) -> Option<()> {
let folder = folder.lock();
let folder = folder.as_ref()?;
let workspace_id = folder.get_workspace_id();
let trash_ids = folder
.get_all_trash()
.into_iter()
.map(|trash| trash.id)
.collect::<Vec<String>>();
for parent_view_id in parent_view_ids {
let parent_view_id = parent_view_id.as_ref();
// if the view's parent id equal to workspace id. Then it will fetch the current
// workspace views. Because the the workspace is not a view stored in the views map.
if parent_view_id == workspace_id {
notify_did_update_workspace(&workspace_id, folder)
} else {
// Parent view can contain a list of child views. Currently, only get the first level
// child views.
let parent_view = folder.views.get_view(parent_view_id)?;
let mut child_views = folder.views.get_views_belong_to(parent_view_id);
child_views.retain(|view| !trash_ids.contains(&view.id));
event!(Level::DEBUG, child_views_count = child_views.len());
// Post the notification
let parent_view_pb = view_pb_with_child_views(parent_view, child_views);
send_notification(parent_view_id, FolderNotification::DidUpdateView)
.payload(parent_view_pb)
.send();
}
}
None
}
pub enum ChildViewChangeReason {
DidCreateView,
DidDeleteView,
DidUpdateView,
}
/// Notify the the list of parent view ids that its child views were changed.
#[tracing::instrument(level = "debug", skip_all)]
fn notify_child_views_changed(view_pb: ViewPB, reason: ChildViewChangeReason) {
let parent_view_id = view_pb.parent_view_id.clone();
let mut payload = ChildViewUpdatePB {
parent_view_id: view_pb.parent_view_id.clone(),
..Default::default()
};
match reason {
ChildViewChangeReason::DidCreateView => {
payload.create_child_views.push(view_pb);
},
ChildViewChangeReason::DidDeleteView => {
payload.delete_child_views.push(view_pb.id);
},
ChildViewChangeReason::DidUpdateView => {
payload.update_child_views.push(view_pb);
},
}
send_notification(&parent_view_id, FolderNotification::DidUpdateChildViews)
.payload(payload)
.send();
}
fn folder_not_init_error() -> FlowyError {
FlowyError::internal().with_context("Folder not initialized")
}
#[derive(Clone, Default)]
pub struct MutexFolder(Arc<Mutex<Option<Folder>>>);
impl Deref for MutexFolder {
@ -1351,20 +1048,3 @@ impl Display for FolderInitDataSource {
}
}
}
fn is_exist_in_local_disk(user: &Arc<dyn FolderUser>, doc_id: &str) -> FlowyResult<bool> {
let uid = user.user_id()?;
if let Some(collab_db) = user.collab_db(uid)?.upgrade() {
let read_txn = collab_db.read_txn();
Ok(read_txn.is_exist(uid, doc_id))
} else {
Ok(false)
}
}
fn workspace_data_not_sync_error(uid: i64, workspace_id: &str) -> FlowyError {
FlowyError::from(ErrorCode::WorkspaceDataNotSync).with_payload(UserFolderPB {
uid,
workspace_id: workspace_id.to_string(),
})
}

View File

@ -0,0 +1,158 @@
use std::sync::{Arc, Weak};
use collab_folder::{Folder, FolderNotify, UserId};
use tracing::{event, Level};
use collab_integrate::RocksCollabDB;
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
use crate::manager::{FolderInitDataSource, FolderManager};
use crate::manager_observer::{
subscribe_folder_snapshot_state_changed, subscribe_folder_sync_state_changed,
subscribe_folder_trash_changed, subscribe_folder_view_changed,
};
use crate::user_default::DefaultFolderBuilder;
use crate::util::is_exist_in_local_disk;
impl FolderManager {
/// Called immediately after the application launched if the user already sign in/sign up.
#[tracing::instrument(level = "info", skip(self, initial_data), err)]
pub async fn initialize(
&self,
uid: i64,
workspace_id: &str,
initial_data: FolderInitDataSource,
) -> FlowyResult<()> {
// Update the workspace id
event!(
Level::INFO,
"Init workspace: {} from: {}",
workspace_id,
initial_data
);
*self.workspace_id.write() = Some(workspace_id.to_string());
let workspace_id = workspace_id.to_string();
// Get the collab db for the user with given user id.
let collab_db = self.user.collab_db(uid)?;
let (view_tx, view_rx) = tokio::sync::broadcast::channel(100);
let (trash_tx, trash_rx) = tokio::sync::broadcast::channel(100);
let folder_notifier = FolderNotify {
view_change_tx: view_tx,
trash_change_tx: trash_tx,
};
let folder = match initial_data {
FolderInitDataSource::LocalDisk {
create_if_not_exist,
} => {
let is_exist = is_exist_in_local_disk(&self.user, &workspace_id).unwrap_or(false);
if is_exist {
self
.open_local_folder(uid, &workspace_id, collab_db, folder_notifier)
.await?
} else if create_if_not_exist {
// Currently, this branch is only used when the server type is supabase. For appflowy cloud,
// the default workspace is already created when the user sign up.
self
.create_default_folder(uid, &workspace_id, collab_db, folder_notifier)
.await?
} else {
return Err(FlowyError::new(
ErrorCode::RecordNotFound,
"Can't find any workspace data",
));
}
},
FolderInitDataSource::Cloud(raw_data) => {
if raw_data.is_empty() {
event!(Level::INFO, "remote folder data is empty, open from local");
self
.open_local_folder(uid, &workspace_id, collab_db, folder_notifier)
.await?
} else {
event!(Level::INFO, "Restore folder with remote data");
let result = self
.collab_for_folder(uid, &workspace_id, collab_db.clone(), raw_data)
.await;
// If failed to open folder with remote data, open from local disk. After open from the local
// disk. the data will be synced to the remote server.
match result {
Ok(collab) => Folder::open(UserId::from(uid), collab, Some(folder_notifier.clone()))?,
Err(err) => {
event!(
Level::ERROR,
"Open folder with remote data failed: {:?}, open from local disk",
err
);
self
.open_local_folder(uid, &workspace_id, collab_db, folder_notifier)
.await?
},
}
}
},
FolderInitDataSource::FolderData(folder_data) => {
event!(Level::INFO, "Restore folder with passed-in folder data");
let collab = self
.collab_for_folder(uid, &workspace_id, collab_db, vec![])
.await?;
Folder::create(
UserId::from(uid),
collab,
Some(folder_notifier),
folder_data,
)
},
};
let folder_state_rx = folder.subscribe_sync_state();
*self.mutex_folder.lock() = Some(folder);
let weak_mutex_folder = Arc::downgrade(&self.mutex_folder);
subscribe_folder_sync_state_changed(workspace_id.clone(), folder_state_rx, &weak_mutex_folder);
subscribe_folder_snapshot_state_changed(workspace_id, &weak_mutex_folder);
subscribe_folder_trash_changed(trash_rx, &weak_mutex_folder);
subscribe_folder_view_changed(view_rx, &weak_mutex_folder);
Ok(())
}
async fn create_default_folder(
&self,
uid: i64,
workspace_id: &String,
collab_db: Weak<RocksCollabDB>,
folder_notifier: FolderNotify,
) -> Result<Folder, FlowyError> {
event!(Level::INFO, "Create folder with default folder builder");
let folder_data =
DefaultFolderBuilder::build(uid, workspace_id.to_string(), &self.operation_handlers).await;
let collab = self
.collab_for_folder(uid, &workspace_id, collab_db, vec![])
.await?;
Ok(Folder::create(
UserId::from(uid),
collab,
Some(folder_notifier),
folder_data,
))
}
async fn open_local_folder(
&self,
uid: i64,
workspace_id: &String,
collab_db: Weak<RocksCollabDB>,
folder_notifier: FolderNotify,
) -> Result<Folder, FlowyError> {
event!(Level::INFO, "Init folder from local disk");
let collab = self
.collab_for_folder(uid, &workspace_id, collab_db, vec![])
.await?;
let folder = Folder::open(UserId::from(uid), collab, Some(folder_notifier))?;
Ok(folder)
}
}

View File

@ -0,0 +1,214 @@
use std::collections::HashSet;
use std::sync::{Arc, Weak};
use collab::core::collab_state::SyncState;
use collab_folder::{Folder, TrashChange, TrashChangeReceiver, ViewChange, ViewChangeReceiver};
use tokio_stream::wrappers::WatchStream;
use tokio_stream::StreamExt;
use tracing::{event, Level};
use lib_dispatch::prelude::af_spawn;
use crate::entities::{
view_pb_with_child_views, view_pb_without_child_views, ChildViewUpdatePB, FolderSnapshotStatePB,
FolderSyncStatePB, RepeatedTrashPB, RepeatedViewPB, ViewPB,
};
use crate::manager::{get_workspace_view_pbs, MutexFolder};
use crate::notification::{send_notification, FolderNotification};
/// Listen on the [ViewChange] after create/delete/update events happened.
///
/// Spawns a background task that forwards every view change to the frontend:
/// the child view itself via `notify_child_views_changed`, and (for create /
/// update) the parent view via `notify_parent_view_did_change`.
pub(crate) fn subscribe_folder_view_changed(
  mut rx: ViewChangeReceiver,
  weak_mutex_folder: &Weak<MutexFolder>,
) {
  let weak_mutex_folder = weak_mutex_folder.clone();
  af_spawn(async move {
    while let Ok(value) = rx.recv().await {
      // A failed upgrade means the folder (and its owner) was dropped; the
      // change is simply ignored and the loop ends when the channel closes.
      if let Some(folder) = weak_mutex_folder.upgrade() {
        tracing::trace!("Did receive view change: {:?}", value);
        match value {
          ViewChange::DidCreateView { view } => {
            notify_child_views_changed(
              view_pb_without_child_views(Arc::new(view.clone())),
              ChildViewChangeReason::DidCreateView,
            );
            notify_parent_view_did_change(folder.clone(), vec![view.parent_view_id]);
          },
          ViewChange::DidDeleteView { views } => {
            for view in views {
              notify_child_views_changed(
                view_pb_without_child_views(view),
                ChildViewChangeReason::DidDeleteView,
              );
            }
          },
          ViewChange::DidUpdate { view } => {
            notify_child_views_changed(
              view_pb_without_child_views(Arc::new(view.clone())),
              ChildViewChangeReason::DidUpdateView,
            );
            notify_parent_view_did_change(folder.clone(), vec![view.parent_view_id]);
          },
        };
      }
    }
  });
}
/// Forwards folder snapshot-state changes to the frontend.
///
/// Each new remote snapshot id is sent to `workspace_id` as a
/// `DidUpdateFolderSnapshotState` notification.
pub(crate) fn subscribe_folder_snapshot_state_changed(
  workspace_id: String,
  weak_mutex_folder: &Weak<MutexFolder>,
) {
  let weak_mutex_folder = weak_mutex_folder.clone();
  af_spawn(async move {
    if let Some(mutex_folder) = weak_mutex_folder.upgrade() {
      // Grab the stream inside the lock scope, then await outside of it so the
      // folder mutex is not held across await points.
      let stream = mutex_folder
        .lock()
        .as_ref()
        .map(|folder| folder.subscribe_snapshot_state());
      if let Some(mut state_stream) = stream {
        while let Some(snapshot_state) = state_stream.next().await {
          if let Some(new_snapshot_id) = snapshot_state.snapshot_id() {
            tracing::debug!("Did create folder remote snapshot: {}", new_snapshot_id);
            send_notification(
              &workspace_id,
              FolderNotification::DidUpdateFolderSnapshotState,
            )
            .payload(FolderSnapshotStatePB { new_snapshot_id })
            .send();
          }
        }
      }
    }
  });
}
/// Forwards folder sync-state changes to the frontend as
/// `DidUpdateFolderSyncUpdate` notifications on `workspace_id`.
pub(crate) fn subscribe_folder_sync_state_changed(
  workspace_id: String,
  mut folder_sync_state_rx: WatchStream<SyncState>,
  _weak_mutex_folder: &Weak<MutexFolder>,
) {
  af_spawn(async move {
    // Relay every sync-state update until the watch stream ends.
    while let Some(state) = folder_sync_state_rx.next().await {
      let payload = FolderSyncStatePB::from(state);
      send_notification(&workspace_id, FolderNotification::DidUpdateFolderSyncUpdate)
        .payload(payload)
        .send();
    }
  });
}
/// Listen on the [TrashChange]s and notify the frontend some views were changed.
pub(crate) fn subscribe_folder_trash_changed(
  mut rx: TrashChangeReceiver,
  weak_mutex_folder: &Weak<MutexFolder>,
) {
  let weak_mutex_folder = weak_mutex_folder.clone();
  af_spawn(async move {
    while let Ok(value) = rx.recv().await {
      if let Some(folder) = weak_mutex_folder.upgrade() {
        // Deduplicate parent ids so each parent view is refreshed only once.
        let mut unique_ids = HashSet::new();
        tracing::trace!("Did receive trash change: {:?}", value);
        // Both trash events carry the affected view ids; handle them alike.
        let ids = match value {
          TrashChange::DidCreateTrash { ids } => ids,
          TrashChange::DidDeleteTrash { ids } => ids,
        };
        // Inner scope: the folder lock is released before notifying parents.
        if let Some(folder) = folder.lock().as_ref() {
          let views = folder.views.get_views(&ids);
          for view in views {
            unique_ids.insert(view.parent_view_id.clone());
          }

          // Push the full, refreshed trash list to the frontend.
          let repeated_trash: RepeatedTrashPB = folder.get_all_trash().into();
          send_notification("trash", FolderNotification::DidUpdateTrash)
            .payload(repeated_trash)
            .send();
        }

        let parent_view_ids = unique_ids.into_iter().collect();
        notify_parent_view_did_change(folder.clone(), parent_view_ids);
      }
    }
  });
}
/// Notify the list of parent view ids that their child views were changed.
///
/// Returns `None` when the folder is not initialized or a parent view cannot
/// be found — the `?` operators below use the `Option<()>` return type purely
/// as an early-exit mechanism; callers ignore the result.
#[tracing::instrument(level = "debug", skip(folder, parent_view_ids))]
pub(crate) fn notify_parent_view_did_change<T: AsRef<str>>(
  folder: Arc<MutexFolder>,
  parent_view_ids: Vec<T>,
) -> Option<()> {
  let folder = folder.lock();
  let folder = folder.as_ref()?;
  let workspace_id = folder.get_workspace_id();
  // Trashed views are excluded from the child lists sent to the frontend.
  let trash_ids = folder
    .get_all_trash()
    .into_iter()
    .map(|trash| trash.id)
    .collect::<Vec<String>>();

  for parent_view_id in parent_view_ids {
    let parent_view_id = parent_view_id.as_ref();

    // If the view's parent id equals the workspace id, fetch the current
    // workspace views instead, because the workspace is not a view stored in
    // the views map.
    if parent_view_id == workspace_id {
      notify_did_update_workspace(&workspace_id, folder)
    } else {
      // Parent view can contain a list of child views. Currently, only get the first level
      // child views.
      let parent_view = folder.views.get_view(parent_view_id)?;
      let mut child_views = folder.views.get_views_belong_to(parent_view_id);
      child_views.retain(|view| !trash_ids.contains(&view.id));
      event!(Level::DEBUG, child_views_count = child_views.len());

      // Post the notification
      let parent_view_pb = view_pb_with_child_views(parent_view, child_views);
      send_notification(parent_view_id, FolderNotification::DidUpdateView)
        .payload(parent_view_pb)
        .send();
    }
  }
  None
}
pub(crate) fn notify_did_update_workspace(workspace_id: &str, folder: &Folder) {
let repeated_view: RepeatedViewPB = get_workspace_view_pbs(workspace_id, folder).into();
tracing::trace!("Did update workspace views: {:?}", repeated_view);
send_notification(workspace_id, FolderNotification::DidUpdateWorkspaceViews)
.payload(repeated_view)
.send();
}
/// Why a `DidUpdateChildViews` notification is being sent for a parent view.
pub enum ChildViewChangeReason {
  /// A child view was created under the parent.
  DidCreateView,
  /// A child view was deleted from the parent.
  DidDeleteView,
  /// A child view's properties were updated.
  DidUpdateView,
}
/// Notify the parent of `view_pb` that one of its child views changed.
///
/// Builds a `ChildViewUpdatePB` whose create/delete/update list is populated
/// according to `reason`, then sends it as a `DidUpdateChildViews`
/// notification addressed to the parent view id.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn notify_child_views_changed(view_pb: ViewPB, reason: ChildViewChangeReason) {
  let parent_view_id = view_pb.parent_view_id.clone();
  let mut payload = ChildViewUpdatePB {
    parent_view_id: parent_view_id.clone(),
    ..Default::default()
  };

  // Exactly one of the three lists carries the changed view (or its id).
  match reason {
    ChildViewChangeReason::DidCreateView => payload.create_child_views.push(view_pb),
    ChildViewChangeReason::DidDeleteView => payload.delete_child_views.push(view_pb.id),
    ChildViewChangeReason::DidUpdateView => payload.update_child_views.push(view_pb),
  }

  send_notification(&parent_view_id, FolderNotification::DidUpdateChildViews)
    .payload(payload)
    .send();
}

View File

@ -0,0 +1,31 @@
use std::sync::Arc;
use collab_integrate::YrsDocAction;
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
use crate::entities::UserFolderPB;
use crate::manager::FolderUser;
/// Error returned when a folder API is used before `initialize` has
/// completed successfully.
pub(crate) fn folder_not_init_error() -> FlowyError {
  FlowyError::internal().with_context("Folder not initialized")
}
/// Returns whether a collab document with `doc_id` already exists in the
/// user's local collab database.
///
/// Treats a dropped (non-upgradable) database handle as "not present".
pub(crate) fn is_exist_in_local_disk(
  user: &Arc<dyn FolderUser>,
  doc_id: &str,
) -> FlowyResult<bool> {
  let uid = user.user_id()?;
  match user.collab_db(uid)?.upgrade() {
    Some(collab_db) => Ok(collab_db.read_txn().is_exist(uid, doc_id)),
    None => Ok(false),
  }
}
/// Error returned when the workspace data for `uid`/`workspace_id` has not
/// been synced yet; carries a `UserFolderPB` payload identifying the folder.
pub(crate) fn workspace_data_not_sync_error(uid: i64, workspace_id: &str) -> FlowyError {
  let payload = UserFolderPB {
    uid,
    workspace_id: workspace_id.to_string(),
  };
  FlowyError::from(ErrorCode::WorkspaceDataNotSync).with_payload(payload)
}