mirror of
https://github.com/AppFlowy-IO/AppFlowy.git
synced 2024-08-30 18:12:39 +00:00
fix: import document from database row (#4295)
* fix: import document from database row * chore: update test * chore: fix test * chore: fix test * chore: fix test * chore: fix local user on appflowy cloud error * chore: clippy * chore: bump pubspec version
This commit is contained in:
parent
de08c01c4c
commit
eac878d563
2
frontend/.vscode/launch.json
vendored
2
frontend/.vscode/launch.json
vendored
@ -12,7 +12,7 @@
|
||||
"program": "./lib/main.dart",
|
||||
"type": "dart",
|
||||
"env": {
|
||||
"RUST_LOG": "debug",
|
||||
"RUST_LOG": "trace",
|
||||
"RUST_BACKTRACE": "1"
|
||||
},
|
||||
// uncomment the following line to testing performance.
|
||||
|
@ -15,7 +15,7 @@ publish_to: "none" # Remove this line if you wish to publish to pub.dev
|
||||
# In iOS, build-name is used as CFBundleShortVersionString while build-number used as CFBundleVersion.
|
||||
# Read more about iOS versioning at
|
||||
# https://developer.apple.com/library/archive/documentation/General/Reference/InfoPlistKeyReference/Articles/CoreFoundationKeys.html
|
||||
version: 0.4.0
|
||||
version: 0.4.1
|
||||
|
||||
environment:
|
||||
flutter: ">=3.18.0-0.2.pre"
|
||||
|
@ -53,10 +53,8 @@ impl EventIntegrationTest {
|
||||
let path = path_buf.to_str().unwrap().to_string();
|
||||
let device_id = uuid::Uuid::new_v4().to_string();
|
||||
|
||||
let level = "trace";
|
||||
std::env::set_var("RUST_LOG", level);
|
||||
let config = AppFlowyCoreConfig::new(path.clone(), path, device_id, name).log_filter(
|
||||
level,
|
||||
"trace",
|
||||
vec![
|
||||
"flowy_test".to_string(),
|
||||
"tokio".to_string(),
|
||||
@ -79,27 +77,16 @@ impl EventIntegrationTest {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_appflowy_cloud_server(&self) -> Arc<dyn AppFlowyServer> {
|
||||
self
|
||||
.appflowy_core
|
||||
.server_provider
|
||||
.get_appflowy_cloud_server()
|
||||
.unwrap()
|
||||
pub fn get_server(&self) -> Arc<dyn AppFlowyServer> {
|
||||
self.appflowy_core.server_provider.get_server().unwrap()
|
||||
}
|
||||
|
||||
pub async fn wait_ws_connected(&self) {
|
||||
if self
|
||||
.get_appflowy_cloud_server()
|
||||
.get_ws_state()
|
||||
.is_connected()
|
||||
{
|
||||
if self.get_server().get_ws_state().is_connected() {
|
||||
return;
|
||||
}
|
||||
|
||||
let mut ws_state = self
|
||||
.get_appflowy_cloud_server()
|
||||
.subscribe_ws_state()
|
||||
.unwrap();
|
||||
let mut ws_state = self.get_server().subscribe_ws_state().unwrap();
|
||||
loop {
|
||||
select! {
|
||||
_ = sleep(Duration::from_secs(20)) => {
|
||||
@ -121,7 +108,7 @@ impl EventIntegrationTest {
|
||||
oid: &str,
|
||||
collay_type: CollabType,
|
||||
) -> Result<CollabDocState, FlowyError> {
|
||||
let server = self.server_provider.get_appflowy_cloud_server().unwrap();
|
||||
let server = self.server_provider.get_server().unwrap();
|
||||
let workspace_id = self.get_current_workspace().await.id;
|
||||
let uid = self.get_user_profile().await?.id;
|
||||
let doc_state = server
|
||||
|
Binary file not shown.
@ -1,5 +1,6 @@
|
||||
use crate::util::unzip_history_user_db;
|
||||
use assert_json_diff::assert_json_include;
|
||||
use collab_database::rows::database_row_document_id_from_row_id;
|
||||
use collab_entity::CollabType;
|
||||
use event_integration::user_event::user_localhost_af_cloud;
|
||||
use event_integration::{document_data_from_document_doc_state, EventIntegrationTest};
|
||||
@ -45,6 +46,9 @@ async fn import_appflowy_data_folder_into_new_view_test() {
|
||||
assert_eq!(views.len(), 2);
|
||||
assert_eq!(views[1].name, import_container_name);
|
||||
|
||||
// the 040_local should be an empty document, so try to get the document data
|
||||
let _ = test.get_document_data(&views[1].id).await;
|
||||
|
||||
let local_child_views = test.get_view(&views[1].id).await.child_views;
|
||||
assert_eq!(local_child_views.len(), 1);
|
||||
assert_eq!(local_child_views[0].name, "Document1");
|
||||
@ -61,6 +65,14 @@ async fn import_appflowy_data_folder_into_new_view_test() {
|
||||
assert_eq!(document2_child_views[0].name, "Grid1");
|
||||
assert_eq!(document2_child_views[1].name, "Grid2");
|
||||
|
||||
let rows = test.get_database(&document2_child_views[1].id).await.rows;
|
||||
assert_eq!(rows.len(), 3);
|
||||
|
||||
// In the 040_local, only the first row has a document with content
|
||||
let row_document_id = database_row_document_id_from_row_id(&rows[0].id);
|
||||
let row_document_data = test.get_document_data(&row_document_id).await;
|
||||
assert_json_include!(actual: json!(row_document_data), expected: expected_row_doc_json());
|
||||
|
||||
drop(cleaner);
|
||||
}
|
||||
|
||||
@ -370,3 +382,46 @@ fn expected_doc_2_json() -> Value {
|
||||
"page_id": "ZVogdaK9yO"
|
||||
})
|
||||
}
|
||||
|
||||
fn expected_row_doc_json() -> Value {
|
||||
json!( {
|
||||
"blocks": {
|
||||
"eSBQHZ28e0": {
|
||||
"children": "RbLAaE9UDJ",
|
||||
"data": {},
|
||||
"external_id": null,
|
||||
"external_type": null,
|
||||
"id": "eSBQHZ28e0",
|
||||
"parent": "",
|
||||
"ty": "page"
|
||||
},
|
||||
"eUIL6qjgj3": {
|
||||
"children": "fUnGRcvPEA",
|
||||
"data": {
|
||||
"delta": [
|
||||
{
|
||||
"insert": "document in database row"
|
||||
}
|
||||
]
|
||||
},
|
||||
"external_id": "-DliEUjHr2",
|
||||
"external_type": "text",
|
||||
"id": "eUIL6qjgj3",
|
||||
"parent": "eSBQHZ28e0",
|
||||
"ty": "paragraph"
|
||||
}
|
||||
},
|
||||
"meta": {
|
||||
"children_map": {
|
||||
"RbLAaE9UDJ": [
|
||||
"eUIL6qjgj3"
|
||||
],
|
||||
"fUnGRcvPEA": []
|
||||
},
|
||||
"text_map": {
|
||||
"-DliEUjHr2": "[{\"insert\":\"document in database row\"}]"
|
||||
}
|
||||
},
|
||||
"page_id": "eSBQHZ28e0"
|
||||
})
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
use event_integration::EventIntegrationTest;
|
||||
use flowy_core::DEFAULT_NAME;
|
||||
use flowy_folder::entities::ViewLayoutPB;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::util::unzip_history_user_db;
|
||||
|
||||
@ -140,10 +141,7 @@ async fn collab_db_backup_test() {
|
||||
assert_eq!(backups.len(), 1);
|
||||
assert_eq!(
|
||||
backups[0],
|
||||
format!(
|
||||
"collab_db_{}",
|
||||
chrono::Local::now().format("%Y%m%d").to_string()
|
||||
)
|
||||
format!("collab_db_{}", chrono::Local::now().format("%Y%m%d"))
|
||||
);
|
||||
drop(cleaner);
|
||||
}
|
||||
@ -157,7 +155,15 @@ async fn delete_outdated_collab_db_backup_test() {
|
||||
EventIntegrationTest::new_with_user_data_path(user_db_path, DEFAULT_NAME.to_string()).await;
|
||||
|
||||
let uid = test.get_user_profile().await.unwrap().id;
|
||||
// saving the backup is a background task, so we need to wait for it to finish
|
||||
// 2 seconds should be enough for the background task to finish
|
||||
tokio::time::sleep(Duration::from_secs(2)).await;
|
||||
let backups = test.user_manager.get_collab_backup_list(uid);
|
||||
|
||||
if backups.len() != 10 {
|
||||
dbg!("backups: {:?}", backups.clone());
|
||||
}
|
||||
|
||||
assert_eq!(backups.len(), 10);
|
||||
assert_eq!(backups[0], "collab_db_0.4.0_20231202");
|
||||
assert_eq!(backups[1], "collab_db_0.4.0_20231203");
|
||||
@ -170,10 +176,7 @@ async fn delete_outdated_collab_db_backup_test() {
|
||||
assert_eq!(backups[8], "collab_db_0.4.0_20231210");
|
||||
assert_eq!(
|
||||
backups[9],
|
||||
format!(
|
||||
"collab_db_{}",
|
||||
chrono::Local::now().format("%Y%m%d").to_string()
|
||||
)
|
||||
format!("collab_db_{}", chrono::Local::now().format("%Y%m%d"))
|
||||
);
|
||||
drop(cleaner);
|
||||
}
|
||||
|
@ -27,7 +27,7 @@ use flowy_user::entities::{AuthenticatorPB, UpdateUserProfilePayloadPB};
|
||||
use flowy_user::errors::FlowyError;
|
||||
|
||||
use flowy_user::event_map::UserEvent::*;
|
||||
use flowy_user_deps::cloud::{UserCloudService, UserCloudServiceProvider};
|
||||
use flowy_user_deps::cloud::UserCloudService;
|
||||
use flowy_user_deps::entities::Authenticator;
|
||||
|
||||
pub fn get_supabase_config() -> Option<SupabaseConfiguration> {
|
||||
|
@ -19,7 +19,7 @@ pub(crate) fn create_log_filter(level: String, with_crates: Vec<String>) -> Stri
|
||||
.map(|crate_name| format!("{}={}", crate_name, level))
|
||||
.collect::<Vec<String>>();
|
||||
filters.push(format!("flowy_core={}", level));
|
||||
filters.push(format!("flowy_folder2={}", level));
|
||||
filters.push(format!("flowy_folder={}", level));
|
||||
filters.push(format!("collab_sync={}", level));
|
||||
filters.push(format!("collab_folder={}", level));
|
||||
filters.push(format!("collab_persistence={}", level));
|
||||
@ -28,7 +28,7 @@ pub(crate) fn create_log_filter(level: String, with_crates: Vec<String>) -> Stri
|
||||
filters.push(format!("collab_integrate={}", level));
|
||||
filters.push(format!("collab={}", level));
|
||||
filters.push(format!("flowy_user={}", level));
|
||||
filters.push(format!("flowy_document2={}", level));
|
||||
filters.push(format!("flowy_document={}", level));
|
||||
filters.push(format!("flowy_database2={}", level));
|
||||
filters.push(format!("flowy_server={}", level));
|
||||
filters.push(format!("flowy_notification={}", "info"));
|
||||
|
@ -33,6 +33,12 @@ pub enum Server {
|
||||
Supabase = 2,
|
||||
}
|
||||
|
||||
impl Server {
|
||||
pub fn is_local(&self) -> bool {
|
||||
matches!(self, Server::Local)
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for Server {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
@ -49,7 +55,6 @@ impl Display for Server {
|
||||
/// Each server implements the [AppFlowyServer] trait, which provides the [UserCloudService], etc.
|
||||
pub struct ServerProvider {
|
||||
config: AppFlowyCoreConfig,
|
||||
server: RwLock<Server>,
|
||||
providers: RwLock<HashMap<Server, Arc<dyn AppFlowyServer>>>,
|
||||
pub(crate) encryption: RwLock<Arc<dyn AppFlowyEncryption>>,
|
||||
#[allow(dead_code)]
|
||||
@ -57,7 +62,7 @@ pub struct ServerProvider {
|
||||
pub(crate) user_enable_sync: RwLock<bool>,
|
||||
|
||||
/// The authenticator type of the user.
|
||||
pub(crate) user_authenticator: RwLock<Authenticator>,
|
||||
authenticator: RwLock<Authenticator>,
|
||||
pub(crate) uid: Arc<RwLock<Option<i64>>>,
|
||||
}
|
||||
|
||||
@ -70,10 +75,9 @@ impl ServerProvider {
|
||||
let encryption = EncryptionImpl::new(None);
|
||||
Self {
|
||||
config,
|
||||
server: RwLock::new(server),
|
||||
providers: RwLock::new(HashMap::new()),
|
||||
user_enable_sync: RwLock::new(true),
|
||||
user_authenticator: RwLock::new(Authenticator::Local),
|
||||
authenticator: RwLock::new(Authenticator::from(server)),
|
||||
encryption: RwLock::new(Arc::new(encryption)),
|
||||
store_preferences,
|
||||
uid: Default::default(),
|
||||
@ -81,30 +85,32 @@ impl ServerProvider {
|
||||
}
|
||||
|
||||
pub fn get_server_type(&self) -> Server {
|
||||
self.server.read().clone()
|
||||
match &*self.authenticator.read() {
|
||||
Authenticator::Local => Server::Local,
|
||||
Authenticator::AppFlowyCloud => Server::AppFlowyCloud,
|
||||
Authenticator::Supabase => Server::Supabase,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_server_type(&self, server_type: Server) {
|
||||
let old_server_type = self.server.read().clone();
|
||||
if server_type != old_server_type {
|
||||
pub fn set_authenticator(&self, authenticator: Authenticator) {
|
||||
let old_server_type = self.get_server_type();
|
||||
*self.authenticator.write() = authenticator;
|
||||
let new_server_type = self.get_server_type();
|
||||
|
||||
if old_server_type != new_server_type {
|
||||
self.providers.write().remove(&old_server_type);
|
||||
}
|
||||
|
||||
*self.server.write() = server_type;
|
||||
}
|
||||
|
||||
pub fn get_user_authenticator(&self) -> Authenticator {
|
||||
self.user_authenticator.read().clone()
|
||||
}
|
||||
|
||||
pub fn get_appflowy_cloud_server(&self) -> FlowyResult<Arc<dyn AppFlowyServer>> {
|
||||
let server = self.get_server(&Server::AppFlowyCloud)?;
|
||||
Ok(server)
|
||||
pub fn get_authenticator(&self) -> Authenticator {
|
||||
self.authenticator.read().clone()
|
||||
}
|
||||
|
||||
/// Returns a [AppFlowyServer] trait implementation base on the provider_type.
|
||||
pub fn get_server(&self, server_type: &Server) -> FlowyResult<Arc<dyn AppFlowyServer>> {
|
||||
if let Some(provider) = self.providers.read().get(server_type) {
|
||||
pub fn get_server(&self) -> FlowyResult<Arc<dyn AppFlowyServer>> {
|
||||
let server_type = self.get_server_type();
|
||||
|
||||
if let Some(provider) = self.providers.read().get(&server_type) {
|
||||
return Ok(provider.clone());
|
||||
}
|
||||
|
||||
|
@ -32,7 +32,7 @@ use crate::integrate::server::{Server, ServerProvider};
|
||||
|
||||
impl FileStorageService for ServerProvider {
|
||||
fn create_object(&self, object: StorageObject) -> FutureResult<String, FlowyError> {
|
||||
let server = self.get_server(&self.get_server_type());
|
||||
let server = self.get_server();
|
||||
FutureResult::new(async move {
|
||||
let storage = server?.file_storage().ok_or(FlowyError::internal())?;
|
||||
storage.create_object(object).await
|
||||
@ -40,7 +40,7 @@ impl FileStorageService for ServerProvider {
|
||||
}
|
||||
|
||||
fn delete_object_by_url(&self, object_url: String) -> FutureResult<(), FlowyError> {
|
||||
let server = self.get_server(&self.get_server_type());
|
||||
let server = self.get_server();
|
||||
FutureResult::new(async move {
|
||||
let storage = server?.file_storage().ok_or(FlowyError::internal())?;
|
||||
storage.delete_object_by_url(object_url).await
|
||||
@ -48,7 +48,7 @@ impl FileStorageService for ServerProvider {
|
||||
}
|
||||
|
||||
fn get_object_by_url(&self, object_url: String) -> FutureResult<Bytes, FlowyError> {
|
||||
let server = self.get_server(&self.get_server_type());
|
||||
let server = self.get_server();
|
||||
FutureResult::new(async move {
|
||||
let storage = server?.file_storage().ok_or(FlowyError::internal())?;
|
||||
storage.get_object_by_url(object_url).await
|
||||
@ -58,42 +58,40 @@ impl FileStorageService for ServerProvider {
|
||||
|
||||
impl UserCloudServiceProvider for ServerProvider {
|
||||
fn set_token(&self, token: &str) -> Result<(), FlowyError> {
|
||||
let server = self.get_server(&self.get_server_type())?;
|
||||
let server = self.get_server()?;
|
||||
server.set_token(token)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn subscribe_token_state(&self) -> Option<WatchStream<UserTokenState>> {
|
||||
let server = self.get_server(&self.get_server_type()).ok()?;
|
||||
let server = self.get_server().ok()?;
|
||||
server.subscribe_token_state()
|
||||
}
|
||||
|
||||
fn set_enable_sync(&self, uid: i64, enable_sync: bool) {
|
||||
if let Ok(server) = self.get_server(&self.get_server_type()) {
|
||||
if let Ok(server) = self.get_server() {
|
||||
server.set_enable_sync(uid, enable_sync);
|
||||
*self.user_enable_sync.write() = enable_sync;
|
||||
*self.uid.write() = Some(uid);
|
||||
}
|
||||
}
|
||||
|
||||
fn set_user_authenticator(&self, authenticator: &Authenticator) {
|
||||
debug!("set user authenticator: {:?}", authenticator);
|
||||
*self.user_authenticator.write() = authenticator.clone();
|
||||
}
|
||||
|
||||
/// When user login, the provider type is set by the [Authenticator] and save to disk for next use.
|
||||
///
|
||||
/// Each [Authenticator] has a corresponding [Server]. The [Server] is used
|
||||
/// to create a new [AppFlowyServer] if it doesn't exist. Once the [Server] is set,
|
||||
/// it will be used when user open the app again.
|
||||
///
|
||||
fn set_authenticator(&self, authenticator: Authenticator) {
|
||||
let server_type: Server = authenticator.into();
|
||||
self.set_server_type(server_type.clone());
|
||||
fn set_user_authenticator(&self, authenticator: &Authenticator) {
|
||||
self.set_authenticator(authenticator.clone());
|
||||
}
|
||||
|
||||
fn get_user_authenticator(&self) -> Authenticator {
|
||||
self.get_authenticator()
|
||||
}
|
||||
|
||||
fn set_network_reachable(&self, reachable: bool) {
|
||||
if let Ok(server) = self.get_server(&self.get_server_type()) {
|
||||
if let Ok(server) = self.get_server() {
|
||||
server.set_network_reachable(reachable);
|
||||
}
|
||||
}
|
||||
@ -103,16 +101,10 @@ impl UserCloudServiceProvider for ServerProvider {
|
||||
self.encryption.write().set_secret(secret);
|
||||
}
|
||||
|
||||
fn get_authenticator(&self) -> Authenticator {
|
||||
let server_type = self.get_server_type();
|
||||
Authenticator::from(server_type)
|
||||
}
|
||||
|
||||
/// Returns the [UserCloudService] base on the current [Server].
|
||||
/// Creates a new [AppFlowyServer] if it doesn't exist.
|
||||
fn get_user_service(&self) -> Result<Arc<dyn UserCloudService>, FlowyError> {
|
||||
let server_type = self.get_server_type();
|
||||
let user_service = self.get_server(&server_type)?.user_service();
|
||||
let user_service = self.get_server()?.user_service();
|
||||
Ok(user_service)
|
||||
}
|
||||
|
||||
@ -131,19 +123,19 @@ impl UserCloudServiceProvider for ServerProvider {
|
||||
|
||||
impl FolderCloudService for ServerProvider {
|
||||
fn create_workspace(&self, uid: i64, name: &str) -> FutureResult<Workspace, Error> {
|
||||
let server = self.get_server(&self.get_server_type());
|
||||
let server = self.get_server();
|
||||
let name = name.to_string();
|
||||
FutureResult::new(async move { server?.folder_service().create_workspace(uid, &name).await })
|
||||
}
|
||||
|
||||
fn open_workspace(&self, workspace_id: &str) -> FutureResult<(), Error> {
|
||||
let workspace_id = workspace_id.to_string();
|
||||
let server = self.get_server(&self.get_server_type());
|
||||
let server = self.get_server();
|
||||
FutureResult::new(async move { server?.folder_service().open_workspace(&workspace_id).await })
|
||||
}
|
||||
|
||||
fn get_all_workspace(&self) -> FutureResult<Vec<WorkspaceRecord>, Error> {
|
||||
let server = self.get_server(&self.get_server_type());
|
||||
let server = self.get_server();
|
||||
FutureResult::new(async move { server?.folder_service().get_all_workspace().await })
|
||||
}
|
||||
|
||||
@ -153,7 +145,7 @@ impl FolderCloudService for ServerProvider {
|
||||
uid: &i64,
|
||||
) -> FutureResult<Option<FolderData>, Error> {
|
||||
let uid = *uid;
|
||||
let server = self.get_server(&self.get_server_type());
|
||||
let server = self.get_server();
|
||||
let workspace_id = workspace_id.to_string();
|
||||
FutureResult::new(async move {
|
||||
server?
|
||||
@ -169,7 +161,7 @@ impl FolderCloudService for ServerProvider {
|
||||
limit: usize,
|
||||
) -> FutureResult<Vec<FolderSnapshot>, Error> {
|
||||
let workspace_id = workspace_id.to_string();
|
||||
let server = self.get_server(&self.get_server_type());
|
||||
let server = self.get_server();
|
||||
FutureResult::new(async move {
|
||||
server?
|
||||
.folder_service()
|
||||
@ -187,7 +179,7 @@ impl FolderCloudService for ServerProvider {
|
||||
) -> FutureResult<CollabDocState, Error> {
|
||||
let object_id = object_id.to_string();
|
||||
let workspace_id = workspace_id.to_string();
|
||||
let server = self.get_server(&self.get_server_type());
|
||||
let server = self.get_server();
|
||||
FutureResult::new(async move {
|
||||
server?
|
||||
.folder_service()
|
||||
@ -202,7 +194,7 @@ impl FolderCloudService for ServerProvider {
|
||||
objects: Vec<FolderCollabParams>,
|
||||
) -> FutureResult<(), Error> {
|
||||
let workspace_id = workspace_id.to_string();
|
||||
let server = self.get_server(&self.get_server_type());
|
||||
let server = self.get_server();
|
||||
FutureResult::new(async move {
|
||||
server?
|
||||
.folder_service()
|
||||
@ -213,7 +205,7 @@ impl FolderCloudService for ServerProvider {
|
||||
|
||||
fn service_name(&self) -> String {
|
||||
self
|
||||
.get_server(&self.get_server_type())
|
||||
.get_server()
|
||||
.map(|provider| provider.folder_service().service_name())
|
||||
.unwrap_or_default()
|
||||
}
|
||||
@ -227,7 +219,7 @@ impl DatabaseCloudService for ServerProvider {
|
||||
workspace_id: &str,
|
||||
) -> FutureResult<CollabDocState, Error> {
|
||||
let workspace_id = workspace_id.to_string();
|
||||
let server = self.get_server(&self.get_server_type());
|
||||
let server = self.get_server();
|
||||
let database_id = object_id.to_string();
|
||||
FutureResult::new(async move {
|
||||
server?
|
||||
@ -244,7 +236,7 @@ impl DatabaseCloudService for ServerProvider {
|
||||
workspace_id: &str,
|
||||
) -> FutureResult<CollabDocStateByOid, Error> {
|
||||
let workspace_id = workspace_id.to_string();
|
||||
let server = self.get_server(&self.get_server_type());
|
||||
let server = self.get_server();
|
||||
FutureResult::new(async move {
|
||||
server?
|
||||
.database_service()
|
||||
@ -258,7 +250,7 @@ impl DatabaseCloudService for ServerProvider {
|
||||
object_id: &str,
|
||||
limit: usize,
|
||||
) -> FutureResult<Vec<DatabaseSnapshot>, Error> {
|
||||
let server = self.get_server(&self.get_server_type());
|
||||
let server = self.get_server();
|
||||
let database_id = object_id.to_string();
|
||||
FutureResult::new(async move {
|
||||
server?
|
||||
@ -277,7 +269,7 @@ impl DocumentCloudService for ServerProvider {
|
||||
) -> FutureResult<CollabDocState, FlowyError> {
|
||||
let workspace_id = workspace_id.to_string();
|
||||
let document_id = document_id.to_string();
|
||||
let server = self.get_server(&self.get_server_type());
|
||||
let server = self.get_server();
|
||||
FutureResult::new(async move {
|
||||
server?
|
||||
.document_service()
|
||||
@ -293,7 +285,7 @@ impl DocumentCloudService for ServerProvider {
|
||||
workspace_id: &str,
|
||||
) -> FutureResult<Vec<DocumentSnapshot>, Error> {
|
||||
let workspace_id = workspace_id.to_string();
|
||||
let server = self.get_server(&self.get_server_type());
|
||||
let server = self.get_server();
|
||||
let document_id = document_id.to_string();
|
||||
FutureResult::new(async move {
|
||||
server?
|
||||
@ -309,7 +301,7 @@ impl DocumentCloudService for ServerProvider {
|
||||
workspace_id: &str,
|
||||
) -> FutureResult<Option<DocumentData>, Error> {
|
||||
let workspace_id = workspace_id.to_string();
|
||||
let server = self.get_server(&self.get_server_type());
|
||||
let server = self.get_server();
|
||||
let document_id = document_id.to_string();
|
||||
FutureResult::new(async move {
|
||||
server?
|
||||
@ -328,7 +320,7 @@ impl CollabCloudPluginProvider for ServerProvider {
|
||||
#[instrument(level = "debug", skip(self, context), fields(server_type = %self.get_server_type()))]
|
||||
fn get_plugins(&self, context: CollabPluginProviderContext) -> Fut<Vec<Arc<dyn CollabPlugin>>> {
|
||||
// If the user is local, we don't need to create a sync plugin.
|
||||
if self.user_authenticator.read().is_local() {
|
||||
if self.get_server_type().is_local() {
|
||||
debug!(
|
||||
"User authenticator is local, skip create sync plugin for: {}",
|
||||
context
|
||||
@ -343,7 +335,7 @@ impl CollabCloudPluginProvider for ServerProvider {
|
||||
collab_object,
|
||||
local_collab,
|
||||
} => {
|
||||
if let Ok(server) = self.get_server(&Server::AppFlowyCloud) {
|
||||
if let Ok(server) = self.get_server() {
|
||||
to_fut(async move {
|
||||
let mut plugins: Vec<Arc<dyn CollabPlugin>> = vec![];
|
||||
|
||||
@ -394,7 +386,7 @@ impl CollabCloudPluginProvider for ServerProvider {
|
||||
} => {
|
||||
let mut plugins: Vec<Arc<dyn CollabPlugin>> = vec![];
|
||||
if let Some(remote_collab_storage) = self
|
||||
.get_server(&Server::Supabase)
|
||||
.get_server()
|
||||
.ok()
|
||||
.and_then(|provider| provider.collab_storage(&collab_object))
|
||||
{
|
||||
|
@ -8,7 +8,7 @@ use collab::core::origin::CollabOrigin;
|
||||
use collab::preclude::Collab;
|
||||
use collab_document::blocks::DocumentData;
|
||||
use collab_document::document::Document;
|
||||
use collab_document::document_data::{default_document_collab_data, default_document_data};
|
||||
use collab_document::document_data::default_document_data;
|
||||
use collab_document::YrsDocAction;
|
||||
use collab_entity::CollabType;
|
||||
use lru::LruCache;
|
||||
@ -86,25 +86,45 @@ impl DocumentManager {
|
||||
///
|
||||
/// if the document already exists, return the existing document.
|
||||
/// if the data is None, will create a document with default data.
|
||||
#[instrument(level = "info", skip(self, data))]
|
||||
pub async fn create_document(
|
||||
&self,
|
||||
uid: i64,
|
||||
doc_id: &str,
|
||||
data: Option<DocumentData>,
|
||||
) -> FlowyResult<()> {
|
||||
tracing::trace!("create a document: {:?}", doc_id);
|
||||
if self.is_doc_exist(doc_id).unwrap_or(false) {
|
||||
Err(FlowyError::new(
|
||||
ErrorCode::RecordAlreadyExists,
|
||||
format!("document {} already exists", doc_id),
|
||||
))
|
||||
} else {
|
||||
let encoded_collab_v1 =
|
||||
doc_state_from_document_data(doc_id, data.unwrap_or_else(default_document_data))?;
|
||||
let collab = self
|
||||
.collab_for_document(uid, doc_id, encoded_collab_v1.doc_state.to_vec(), false)
|
||||
.await?;
|
||||
collab.lock().flush();
|
||||
let result: Result<CollabDocState, FlowyError> = self
|
||||
.cloud_service
|
||||
.get_document_doc_state(doc_id, &self.user.workspace_id()?)
|
||||
.await;
|
||||
|
||||
match result {
|
||||
Ok(data) => {
|
||||
let collab = self.collab_for_document(uid, doc_id, data, false).await?;
|
||||
collab.lock().flush();
|
||||
},
|
||||
Err(err) => {
|
||||
if err.is_record_not_found() {
|
||||
let doc_state =
|
||||
doc_state_from_document_data(doc_id, data.unwrap_or_else(default_document_data))?
|
||||
.doc_state
|
||||
.to_vec();
|
||||
let collab = self
|
||||
.collab_for_document(uid, doc_id, doc_state, false)
|
||||
.await?;
|
||||
collab.lock().flush();
|
||||
} else {
|
||||
return Err(err);
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@ -119,29 +139,10 @@ impl DocumentManager {
|
||||
let mut doc_state = vec![];
|
||||
if !self.is_doc_exist(doc_id)? {
|
||||
// Try to get the document from the cloud service
|
||||
let result: Result<CollabDocState, FlowyError> = self
|
||||
doc_state = self
|
||||
.cloud_service
|
||||
.get_document_doc_state(doc_id, &self.user.workspace_id()?)
|
||||
.await;
|
||||
|
||||
doc_state = match result {
|
||||
Ok(data) => data,
|
||||
Err(err) => {
|
||||
if err.is_record_not_found() {
|
||||
// The document's ID exists in the cloud, but its content does not.
|
||||
// This occurs when user A's document hasn't finished syncing and user B tries to open it.
|
||||
// As a result, a blank document is created for user B.
|
||||
event!(
|
||||
tracing::Level::INFO,
|
||||
"can't find the document in the cloud, doc_id: {}",
|
||||
doc_id
|
||||
);
|
||||
default_document_collab_data(doc_id).doc_state.to_vec()
|
||||
} else {
|
||||
return Err(err);
|
||||
}
|
||||
},
|
||||
}
|
||||
.await?;
|
||||
}
|
||||
|
||||
let uid = self.user.user_id()?;
|
||||
|
@ -21,7 +21,7 @@ use collab_integrate::RocksCollabDB;
|
||||
use flowy_document::document::MutexDocument;
|
||||
use flowy_document::manager::{DocumentManager, DocumentUser};
|
||||
use flowy_document_deps::cloud::*;
|
||||
use flowy_error::FlowyError;
|
||||
use flowy_error::{ErrorCode, FlowyError};
|
||||
use flowy_storage::{FileStorageService, StorageObject};
|
||||
use lib_infra::async_trait::async_trait;
|
||||
use lib_infra::future::{to_fut, Fut, FutureResult};
|
||||
@ -135,10 +135,16 @@ pub struct LocalTestDocumentCloudServiceImpl();
|
||||
impl DocumentCloudService for LocalTestDocumentCloudServiceImpl {
|
||||
fn get_document_doc_state(
|
||||
&self,
|
||||
_document_id: &str,
|
||||
document_id: &str,
|
||||
_workspace_id: &str,
|
||||
) -> FutureResult<CollabDocState, FlowyError> {
|
||||
FutureResult::new(async move { Ok(vec![]) })
|
||||
let document_id = document_id.to_string();
|
||||
FutureResult::new(async move {
|
||||
Err(FlowyError::new(
|
||||
ErrorCode::RecordNotFound,
|
||||
format!("Document {} not found", document_id),
|
||||
))
|
||||
})
|
||||
}
|
||||
|
||||
fn get_document_snapshots(
|
||||
|
@ -398,6 +398,7 @@ impl FolderManager {
|
||||
params: CreateViewParams,
|
||||
) -> FlowyResult<View> {
|
||||
let view_layout: ViewLayout = params.layout.clone().into();
|
||||
// TODO(nathan): remove orphan view. Just use for create document in row
|
||||
let handler = self.get_handler(&view_layout)?;
|
||||
let user_id = self.user.user_id()?;
|
||||
handler
|
||||
|
@ -2,7 +2,7 @@ use anyhow::Error;
|
||||
use collab::core::collab::CollabDocState;
|
||||
|
||||
use flowy_document_deps::cloud::*;
|
||||
use flowy_error::FlowyError;
|
||||
use flowy_error::{ErrorCode, FlowyError};
|
||||
use lib_infra::future::FutureResult;
|
||||
|
||||
pub(crate) struct LocalServerDocumentCloudServiceImpl();
|
||||
@ -10,10 +10,16 @@ pub(crate) struct LocalServerDocumentCloudServiceImpl();
|
||||
impl DocumentCloudService for LocalServerDocumentCloudServiceImpl {
|
||||
fn get_document_doc_state(
|
||||
&self,
|
||||
_document_id: &str,
|
||||
document_id: &str,
|
||||
_workspace_id: &str,
|
||||
) -> FutureResult<CollabDocState, FlowyError> {
|
||||
FutureResult::new(async move { Ok(vec![]) })
|
||||
let document_id = document_id.to_string();
|
||||
FutureResult::new(async move {
|
||||
Err(FlowyError::new(
|
||||
ErrorCode::RecordNotFound,
|
||||
format!("Document {} not found", document_id),
|
||||
))
|
||||
})
|
||||
}
|
||||
|
||||
fn get_document_snapshots(
|
||||
|
@ -86,16 +86,15 @@ pub trait UserCloudServiceProvider: Send + Sync + 'static {
|
||||
/// * `enable_sync`: A boolean indicating whether synchronization should be enabled or disabled.
|
||||
fn set_enable_sync(&self, uid: i64, enable_sync: bool);
|
||||
|
||||
/// Sets the authentication type for a user. The authentication type is the type when user sign in or sign up.
|
||||
fn set_user_authenticator(&self, authenticator: &Authenticator);
|
||||
|
||||
/// Sets the authenticator when user sign in or sign up.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `authenticator`: An `Authenticator` object.
|
||||
fn set_authenticator(&self, authenticator: Authenticator);
|
||||
fn set_user_authenticator(&self, authenticator: &Authenticator);
|
||||
|
||||
/// Sets the network reachability status.
|
||||
fn get_user_authenticator(&self) -> Authenticator;
|
||||
|
||||
/// Sets the network reachability statset_user_authenticatorus.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `reachable`: A boolean indicating whether the network is reachable.
|
||||
@ -107,12 +106,6 @@ pub trait UserCloudServiceProvider: Send + Sync + 'static {
|
||||
/// * `secret`: A `String` representing the encryption secret.
|
||||
fn set_encrypt_secret(&self, secret: String);
|
||||
|
||||
/// Retrieves the current authenticator.
|
||||
///
|
||||
/// # Returns
|
||||
/// The current `Authenticator` object.
|
||||
fn get_authenticator(&self) -> Authenticator;
|
||||
|
||||
/// Retrieves the user-specific cloud service.
|
||||
///
|
||||
/// # Returns
|
||||
|
@ -43,8 +43,16 @@ pub async fn sign_in_with_email_password_handler(
|
||||
let params: SignInParams = data.into_inner().try_into()?;
|
||||
let auth_type = params.auth_type.clone();
|
||||
|
||||
let user_profile: UserProfilePB = manager.sign_in(params, auth_type).await?.into();
|
||||
data_result_ok(user_profile)
|
||||
let old_authenticator = manager.cloud_services.get_user_authenticator();
|
||||
match manager.sign_in(params, auth_type).await {
|
||||
Ok(profile) => data_result_ok(UserProfilePB::from(profile)),
|
||||
Err(err) => {
|
||||
manager
|
||||
.cloud_services
|
||||
.set_user_authenticator(&old_authenticator);
|
||||
return Err(err);
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(
|
||||
@ -63,10 +71,18 @@ pub async fn sign_up(
|
||||
) -> DataResult<UserProfilePB, FlowyError> {
|
||||
let manager = upgrade_manager(manager)?;
|
||||
let params: SignUpParams = data.into_inner().try_into()?;
|
||||
let auth_type = params.auth_type.clone();
|
||||
let authenticator = params.auth_type.clone();
|
||||
|
||||
let user_profile = manager.sign_up(auth_type, BoxAny::new(params)).await?;
|
||||
data_result_ok(user_profile.into())
|
||||
let old_authenticator = manager.cloud_services.get_user_authenticator();
|
||||
match manager.sign_up(authenticator, BoxAny::new(params)).await {
|
||||
Ok(profile) => data_result_ok(UserProfilePB::from(profile)),
|
||||
Err(err) => {
|
||||
manager
|
||||
.cloud_services
|
||||
.set_user_authenticator(&old_authenticator);
|
||||
return Err(err);
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(manager))]
|
||||
@ -135,7 +151,6 @@ pub async fn set_appearance_setting(
|
||||
if setting.theme.is_empty() {
|
||||
setting.theme = APPEARANCE_DEFAULT_THEME.to_string();
|
||||
}
|
||||
|
||||
store_preferences.set_object(APPEARANCE_SETTING_CACHE_KEY, setting)?;
|
||||
Ok(())
|
||||
}
|
||||
|
@ -287,7 +287,8 @@ impl UserManager {
|
||||
params: SignInParams,
|
||||
authenticator: Authenticator,
|
||||
) -> Result<UserProfile, FlowyError> {
|
||||
self.update_authenticator(&authenticator).await;
|
||||
self.cloud_services.set_user_authenticator(&authenticator);
|
||||
|
||||
let response: AuthResponse = self
|
||||
.cloud_services
|
||||
.get_user_service()?
|
||||
@ -326,10 +327,6 @@ impl UserManager {
|
||||
Ok(user_profile)
|
||||
}
|
||||
|
||||
pub(crate) async fn update_authenticator(&self, authenticator: &Authenticator) {
|
||||
self.cloud_services.set_authenticator(authenticator.clone());
|
||||
}
|
||||
|
||||
/// Manages the user sign-up process, potentially migrating data if necessary.
|
||||
///
|
||||
/// This asynchronous function interacts with an external authentication service to register and sign up a user
|
||||
@ -346,7 +343,7 @@ impl UserManager {
|
||||
// sign out the current user if there is one
|
||||
let migration_user = self.get_migration_user(&authenticator).await;
|
||||
|
||||
self.update_authenticator(&authenticator).await;
|
||||
self.cloud_services.set_user_authenticator(&authenticator);
|
||||
let auth_service = self.cloud_services.get_user_service()?;
|
||||
let response: AuthResponse = auth_service.sign_up(params).await?;
|
||||
let new_user_profile = UserProfile::from((&response, &authenticator));
|
||||
@ -722,7 +719,7 @@ impl UserManager {
|
||||
authenticator: &Authenticator,
|
||||
email: &str,
|
||||
) -> Result<String, FlowyError> {
|
||||
self.update_authenticator(authenticator).await;
|
||||
self.cloud_services.set_user_authenticator(authenticator);
|
||||
|
||||
let auth_service = self.cloud_services.get_user_service()?;
|
||||
let url = auth_service
|
||||
@ -737,8 +734,8 @@ impl UserManager {
|
||||
oauth_provider: &str,
|
||||
) -> Result<String, FlowyError> {
|
||||
self
|
||||
.update_authenticator(&Authenticator::AppFlowyCloud)
|
||||
.await;
|
||||
.cloud_services
|
||||
.set_user_authenticator(&Authenticator::AppFlowyCloud);
|
||||
let auth_service = self.cloud_services.get_user_service()?;
|
||||
let url = auth_service
|
||||
.generate_oauth_url_with_provider(oauth_provider)
|
||||
|
@ -135,7 +135,6 @@ pub(crate) fn import_appflowy_data_folder(
|
||||
&mut all_imported_object_ids,
|
||||
&imported_collab_by_oid,
|
||||
&row_object_ids,
|
||||
&document_object_ids,
|
||||
)?;
|
||||
|
||||
// the object ids now only contains the document collab object ids
|
||||
@ -143,7 +142,7 @@ pub(crate) fn import_appflowy_data_folder(
|
||||
if let Some(imported_collab) = imported_collab_by_oid.get(object_id) {
|
||||
let new_object_id = old_to_new_id_map.lock().renew_id(object_id);
|
||||
document_object_ids.lock().insert(new_object_id.clone());
|
||||
tracing::debug!("import from: {}, to: {}", object_id, new_object_id,);
|
||||
debug!("import from: {}, to: {}", object_id, new_object_id,);
|
||||
import_collab_object(
|
||||
imported_collab,
|
||||
session.user_id,
|
||||
@ -184,6 +183,9 @@ pub(crate) fn import_appflowy_data_folder(
|
||||
collab_write_txn,
|
||||
)?;
|
||||
|
||||
document_object_ids
|
||||
.lock()
|
||||
.insert(import_container_view_id.clone());
|
||||
let import_container_view =
|
||||
ViewBuilder::new(session.user_id, session.user_workspace.id.clone())
|
||||
.with_view_id(import_container_view_id)
|
||||
@ -191,6 +193,7 @@ pub(crate) fn import_appflowy_data_folder(
|
||||
.with_name(name)
|
||||
.with_child_views(child_views)
|
||||
.build();
|
||||
|
||||
Ok(vec![import_container_view])
|
||||
},
|
||||
}
|
||||
@ -256,7 +259,6 @@ fn migrate_databases<'a, W>(
|
||||
imported_object_ids: &mut Vec<String>,
|
||||
imported_collab_by_oid: &HashMap<String, Collab>,
|
||||
row_object_ids: &Mutex<HashSet<String>>,
|
||||
document_object_ids: &Mutex<HashSet<String>>,
|
||||
) -> Result<(), PersistenceError>
|
||||
where
|
||||
W: YrsDocAction<'a>,
|
||||
@ -265,7 +267,6 @@ where
|
||||
// Migrate databases
|
||||
let mut database_object_ids = vec![];
|
||||
let imported_database_row_object_ids = RwLock::new(HashSet::new());
|
||||
let imported_database_row_document_object_ids = RwLock::new(HashSet::new());
|
||||
|
||||
for object_id in &mut *imported_object_ids {
|
||||
if let Some(database_collab) = imported_collab_by_oid.get(object_id) {
|
||||
@ -298,9 +299,6 @@ where
|
||||
|
||||
row_order.id = RowId::from(new_row_id);
|
||||
imported_database_row_object_ids.write().insert(old_row_id);
|
||||
imported_database_row_document_object_ids
|
||||
.write()
|
||||
.insert(old_row_document_id);
|
||||
});
|
||||
|
||||
// collect the ids
|
||||
@ -310,13 +308,7 @@ where
|
||||
.map(|order| order.id.clone().into_inner())
|
||||
.collect::<Vec<String>>();
|
||||
|
||||
let new_row_document_ids = new_row_ids
|
||||
.iter()
|
||||
.map(|id| database_row_document_id_from_row_id(id))
|
||||
.collect::<Vec<String>>();
|
||||
|
||||
row_object_ids.lock().extend(new_row_ids);
|
||||
document_object_ids.lock().extend(new_row_document_ids);
|
||||
});
|
||||
|
||||
let new_object_id = old_to_new_id_map.lock().renew_id(object_id);
|
||||
@ -333,22 +325,10 @@ where
|
||||
}
|
||||
}
|
||||
let imported_database_row_object_ids = imported_database_row_object_ids.read();
|
||||
let imported_database_row_document_object_ids = imported_database_row_document_object_ids.read();
|
||||
|
||||
debug!(
|
||||
"imported_database_row_object_ids: {:?}",
|
||||
imported_database_row_object_ids
|
||||
);
|
||||
|
||||
debug!(
|
||||
"imported_database_row_document_object_ids: {:?}",
|
||||
imported_database_row_document_object_ids
|
||||
);
|
||||
|
||||
// remove the database object ids from the object ids
|
||||
imported_object_ids.retain(|id| !database_object_ids.contains(id));
|
||||
imported_object_ids.retain(|id| !imported_database_row_object_ids.contains(id));
|
||||
imported_object_ids.retain(|id| !imported_database_row_document_object_ids.contains(id));
|
||||
|
||||
for imported_row_id in &*imported_database_row_object_ids {
|
||||
if let Some(imported_collab) = imported_collab_by_oid.get(imported_row_id) {
|
||||
@ -369,18 +349,15 @@ where
|
||||
}
|
||||
|
||||
let imported_row_document_id = database_row_document_id_from_row_id(imported_row_id);
|
||||
if let Some(imported_collab) = imported_collab_by_oid.get(&imported_row_document_id) {
|
||||
if imported_collab_by_oid
|
||||
.get(&imported_row_document_id)
|
||||
.is_some()
|
||||
{
|
||||
let new_row_document_id = old_to_new_id_map.lock().renew_id(&imported_row_document_id);
|
||||
info!(
|
||||
"import database row document from: {}, to: {}",
|
||||
"map row document from: {}, to: {}",
|
||||
imported_row_document_id, new_row_document_id,
|
||||
);
|
||||
import_collab_object(
|
||||
imported_collab,
|
||||
session.user_id,
|
||||
&new_row_document_id,
|
||||
collab_write_txn,
|
||||
);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
@ -258,7 +258,9 @@ impl CollabDBZipBackup {
|
||||
}
|
||||
|
||||
// Clean up old backups
|
||||
self.clean_old_backups()?;
|
||||
if let Err(err) = self.clean_old_backups() {
|
||||
error!("Clean up old backups failed: {:?}", err);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@ -292,7 +294,7 @@ impl CollabDBZipBackup {
|
||||
let path = entry.path();
|
||||
if path.is_file() && path.extension().and_then(|s| s.to_str()) == Some("zip") {
|
||||
if let Some(file_name) = path.file_stem().and_then(|s| s.to_str()) {
|
||||
if let Some(timestamp_str) = file_name.split("_").last() {
|
||||
if let Some(timestamp_str) = file_name.split('_').last() {
|
||||
match latest_zip {
|
||||
Some((latest_timestamp, _)) if timestamp_str > latest_timestamp.as_str() => {
|
||||
latest_zip = Some((timestamp_str.to_string(), path));
|
||||
@ -343,6 +345,7 @@ impl CollabDBZipBackup {
|
||||
// Remove backups older than 10 days
|
||||
let threshold_str = threshold_date.format(zip_time_format()).to_string();
|
||||
|
||||
info!("Current backup: {:?}", backups.len());
|
||||
// If there are more than 10 backups, remove the oldest ones
|
||||
while backups.len() > 10 {
|
||||
if let Some((date_str, path)) = backups.first() {
|
||||
|
Loading…
Reference in New Issue
Block a user