chore: fix database row sync (#4964)

* chore: fix database row sync

* ci: fix test

* ci: fix web build

* chore: bump collab
This commit is contained in:
Nathan.fooo
2024-03-23 09:18:47 +08:00
committed by GitHub
parent c0642d3ff3
commit b307312a71
45 changed files with 364 additions and 347 deletions

View File

@ -2,7 +2,7 @@ use anyhow::Error;
use client_api::entity::QueryCollabResult::{Failed, Success};
use client_api::entity::{QueryCollab, QueryCollabParams};
use client_api::error::ErrorCode::RecordNotFound;
use collab::core::collab::CollabDocState;
use collab::core::collab::DocStateSource;
use collab::core::collab_plugin::EncodedCollab;
use collab_entity::CollabType;
use tracing::error;
@ -23,7 +23,7 @@ where
object_id: &str,
collab_type: CollabType,
workspace_id: &str,
) -> FutureResult<CollabDocState, Error> {
) -> FutureResult<Vec<u8>, Error> {
let workspace_id = workspace_id.to_string();
let object_id = object_id.to_string();
let try_get_client = self.0.try_get_client();
@ -73,7 +73,10 @@ where
.flat_map(|(object_id, result)| match result {
Success { encode_collab_v1 } => {
match EncodedCollab::decode_from_bytes(&encode_collab_v1) {
Ok(encode) => Some((object_id, encode.doc_state.to_vec())),
Ok(encode) => Some((
object_id,
DocStateSource::FromDocState(encode.doc_state.to_vec()),
)),
Err(err) => {
error!("Failed to decode collab: {}", err);
None

View File

@ -1,6 +1,6 @@
use anyhow::Error;
use client_api::entity::{QueryCollab, QueryCollabParams};
use collab::core::collab::CollabDocState;
use collab::core::collab::DocStateSource;
use collab::core::origin::CollabOrigin;
use collab_document::document::Document;
use collab_entity::CollabType;
@ -21,7 +21,7 @@ where
&self,
document_id: &str,
workspace_id: &str,
) -> FutureResult<CollabDocState, FlowyError> {
) -> FutureResult<Vec<u8>, FlowyError> {
let workspace_id = workspace_id.to_string();
let try_get_client = self.0.try_get_client();
let document_id = document_id.to_string();
@ -74,8 +74,12 @@ where
.map_err(FlowyError::from)?
.doc_state
.to_vec();
let document =
Document::from_doc_state(CollabOrigin::Empty, doc_state, &document_id, vec![])?;
let document = Document::from_doc_state(
CollabOrigin::Empty,
DocStateSource::FromDocState(doc_state),
&document_id,
vec![],
)?;
Ok(document.get_document_data().ok())
})
}

View File

@ -2,7 +2,7 @@ use anyhow::Error;
use client_api::entity::{
workspace_dto::CreateWorkspaceParam, CollabParams, QueryCollab, QueryCollabParams,
};
use collab::core::collab::CollabDocState;
use collab::core::collab::DocStateSource;
use collab::core::origin::CollabOrigin;
use collab_entity::CollabType;
use collab_folder::RepeatedViewIdentifier;
@ -96,8 +96,13 @@ where
.map_err(FlowyError::from)?
.doc_state
.to_vec();
let folder =
Folder::from_collab_doc_state(uid, CollabOrigin::Empty, doc_state, &workspace_id, vec![])?;
let folder = Folder::from_collab_doc_state(
uid,
CollabOrigin::Empty,
DocStateSource::FromDocState(doc_state),
&workspace_id,
vec![],
)?;
Ok(folder.get_folder_data())
})
}
@ -116,7 +121,7 @@ where
_uid: i64,
collab_type: CollabType,
object_id: &str,
) -> FutureResult<CollabDocState, Error> {
) -> FutureResult<Vec<u8>, Error> {
let object_id = object_id.to_string();
let workspace_id = workspace_id.to_string();
let try_get_client = self.0.try_get_client();

View File

@ -7,7 +7,6 @@ use client_api::entity::workspace_dto::{
};
use client_api::entity::{AFRole, AFWorkspace, AuthProvider, CollabParams, CreateCollabParams};
use client_api::{Client, ClientConfiguration};
use collab::core::collab::CollabDocState;
use collab_entity::CollabObject;
use parking_lot::RwLock;
@ -239,7 +238,7 @@ where
})
}
fn get_user_awareness_doc_state(&self, _uid: i64) -> FutureResult<CollabDocState, FlowyError> {
fn get_user_awareness_doc_state(&self, _uid: i64) -> FutureResult<Vec<u8>, FlowyError> {
FutureResult::new(async { Ok(vec![]) })
}

View File

@ -25,7 +25,6 @@ use flowy_server_pub::af_cloud_config::AFCloudConfiguration;
use flowy_user_pub::cloud::{UserCloudService, UserUpdate};
use flowy_user_pub::entities::UserTokenState;
use lib_dispatch::prelude::af_spawn;
use lib_infra::future::FutureResult;
use crate::af_cloud::impls::{
AFCloudDatabaseCloudServiceImpl, AFCloudDocumentCloudServiceImpl, AFCloudFileStorageServiceImpl,
@ -196,7 +195,7 @@ impl AppFlowyServer for AppFlowyCloudServer {
fn collab_ws_channel(
&self,
_object_id: &str,
) -> FutureResult<
) -> Result<
Option<(
Arc<WebSocketChannel<ServerCollabMessage>>,
WSConnectStateReceiver,
@ -204,22 +203,10 @@ impl AppFlowyServer for AppFlowyCloudServer {
)>,
Error,
> {
if self.enable_sync.load(Ordering::SeqCst) {
let object_id = _object_id.to_string();
let weak_ws_client = Arc::downgrade(&self.ws_client);
FutureResult::new(async move {
match weak_ws_client.upgrade() {
None => Ok(None),
Some(ws_client) => {
let channel = ws_client.subscribe_collab(object_id).ok();
let connect_state_recv = ws_client.subscribe_connect_state();
Ok(channel.map(|c| (c, connect_state_recv, ws_client.is_connected())))
},
}
})
} else {
FutureResult::new(async { Ok(None) })
}
let object_id = _object_id.to_string();
let channel = self.ws_client.subscribe_collab(object_id).ok();
let connect_state_recv = self.ws_client.subscribe_connect_state();
Ok(channel.map(|c| (c, connect_state_recv, self.ws_client.is_connected())))
}
fn file_storage(&self) -> Option<Arc<dyn ObjectStorageService>> {

View File

@ -1,5 +1,4 @@
use anyhow::Error;
use collab::core::collab::CollabDocState;
use collab_entity::CollabType;
use flowy_database_pub::cloud::{CollabDocStateByOid, DatabaseCloudService, DatabaseSnapshot};
@ -13,7 +12,7 @@ impl DatabaseCloudService for LocalServerDatabaseCloudServiceImpl {
_object_id: &str,
_collab_type: CollabType,
_workspace_id: &str,
) -> FutureResult<CollabDocState, Error> {
) -> FutureResult<Vec<u8>, Error> {
FutureResult::new(async move { Ok(vec![]) })
}

View File

@ -1,5 +1,4 @@
use anyhow::Error;
use collab::core::collab::CollabDocState;
use flowy_document_pub::cloud::*;
use flowy_error::{ErrorCode, FlowyError};
@ -12,7 +11,7 @@ impl DocumentCloudService for LocalServerDocumentCloudServiceImpl {
&self,
document_id: &str,
_workspace_id: &str,
) -> FutureResult<CollabDocState, FlowyError> {
) -> FutureResult<Vec<u8>, FlowyError> {
let document_id = document_id.to_string();
FutureResult::new(async move {
Err(FlowyError::new(

View File

@ -1,7 +1,6 @@
use std::sync::Arc;
use anyhow::{anyhow, Error};
use collab::core::collab::CollabDocState;
use collab_entity::CollabType;
use flowy_folder_pub::cloud::{
@ -59,7 +58,7 @@ impl FolderCloudService for LocalServerFolderCloudServiceImpl {
_uid: i64,
_collab_type: CollabType,
_object_id: &str,
) -> FutureResult<CollabDocState, Error> {
) -> FutureResult<Vec<u8>, Error> {
FutureResult::new(async {
Err(anyhow!(
"Local server doesn't support get collab doc state from remote"

View File

@ -1,6 +1,5 @@
use std::sync::Arc;
use collab::core::collab::CollabDocState;
use collab_entity::CollabObject;
use lazy_static::lazy_static;
use parking_lot::Mutex;
@ -149,7 +148,7 @@ impl UserCloudService for LocalServerUserAuthServiceImpl {
FutureResult::new(async { Ok(vec![]) })
}
fn get_user_awareness_doc_state(&self, _uid: i64) -> FutureResult<CollabDocState, FlowyError> {
fn get_user_awareness_doc_state(&self, _uid: i64) -> FutureResult<Vec<u8>, FlowyError> {
FutureResult::new(async { Ok(vec![]) })
}

View File

@ -16,7 +16,6 @@ use flowy_document_pub::cloud::DocumentCloudService;
use flowy_folder_pub::cloud::FolderCloudService;
use flowy_user_pub::cloud::UserCloudService;
use flowy_user_pub::entities::UserTokenState;
use lib_infra::future::FutureResult;
pub trait AppFlowyEncryption: Send + Sync + 'static {
fn get_secret(&self) -> Option<String>;
@ -123,7 +122,7 @@ pub trait AppFlowyServer: Send + Sync + 'static {
fn collab_ws_channel(
&self,
_object_id: &str,
) -> FutureResult<
) -> Result<
Option<(
Arc<WebSocketChannel<ServerCollabMessage>>,
WSConnectStateReceiver,
@ -131,7 +130,7 @@ pub trait AppFlowyServer: Send + Sync + 'static {
)>,
anyhow::Error,
> {
FutureResult::new(async { Ok(None) })
Ok(None)
}
fn file_storage(&self) -> Option<Arc<dyn ObjectStorageService>>;

View File

@ -4,7 +4,7 @@ use std::sync::{Arc, Weak};
use anyhow::Error;
use chrono::{DateTime, Utc};
use client_api::collab_sync::collab_msg::MsgId;
use collab::core::collab::CollabDocState;
use collab::core::collab::DocStateSource;
use collab::preclude::merge_updates_v1;
use collab_entity::CollabObject;
use collab_plugins::cloud_storage::{
@ -62,7 +62,7 @@ where
true
}
async fn get_doc_state(&self, object: &CollabObject) -> Result<CollabDocState, Error> {
async fn get_doc_state(&self, object: &CollabObject) -> Result<DocStateSource, Error> {
let postgrest = self.server.try_get_weak_postgrest()?;
let action = FetchObjectUpdateAction::new(
object.object_id.clone(),
@ -70,7 +70,7 @@ where
postgrest,
);
let doc_state = action.run().await?;
Ok(doc_state)
Ok(DocStateSource::FromDocState(doc_state))
}
async fn get_snapshots(&self, object_id: &str, limit: usize) -> Vec<RemoteCollabSnapshot> {

View File

@ -1,5 +1,4 @@
use anyhow::Error;
use collab::core::collab::CollabDocState;
use collab_entity::CollabType;
use tokio::sync::oneshot::channel;
@ -31,7 +30,7 @@ where
object_id: &str,
collab_type: CollabType,
_workspace_id: &str,
) -> FutureResult<CollabDocState, Error> {
) -> FutureResult<Vec<u8>, Error> {
let try_get_postgrest = self.server.try_get_weak_postgrest();
let object_id = object_id.to_string();
let (tx, rx) = channel();

View File

@ -1,5 +1,5 @@
use anyhow::Error;
use collab::core::collab::CollabDocState;
use collab::core::collab::DocStateSource;
use collab::core::origin::CollabOrigin;
use collab_document::blocks::DocumentData;
use collab_document::document::Document;
@ -33,7 +33,7 @@ where
&self,
document_id: &str,
workspace_id: &str,
) -> FutureResult<CollabDocState, FlowyError> {
) -> FutureResult<Vec<u8>, FlowyError> {
let try_get_postgrest = self.server.try_get_weak_postgrest();
let document_id = document_id.to_string();
let (tx, rx) = channel();
@ -94,8 +94,12 @@ where
let action =
FetchObjectUpdateAction::new(document_id.clone(), CollabType::Document, postgrest);
let doc_state = action.run_with_fix_interval(5, 10).await?;
let document =
Document::from_doc_state(CollabOrigin::Empty, doc_state, &document_id, vec![])?;
let document = Document::from_doc_state(
CollabOrigin::Empty,
DocStateSource::FromDocState(doc_state),
&document_id,
vec![],
)?;
Ok(document.get_document_data().ok())
}
.await,

View File

@ -2,7 +2,7 @@ use std::str::FromStr;
use anyhow::{anyhow, Error};
use chrono::{DateTime, Utc};
use collab::core::collab::CollabDocState;
use collab::core::collab::DocStateSource;
use collab::core::origin::CollabOrigin;
use collab_entity::CollabType;
use serde_json::Value;
@ -102,8 +102,13 @@ where
let doc_state = merge_updates_v1(&updates)
.map_err(|err| anyhow::anyhow!("merge updates failed: {:?}", err))?;
let folder =
Folder::from_collab_doc_state(uid, CollabOrigin::Empty, doc_state, &workspace_id, vec![])?;
let folder = Folder::from_collab_doc_state(
uid,
CollabOrigin::Empty,
DocStateSource::FromDocState(doc_state),
&workspace_id,
vec![],
)?;
Ok(folder.get_folder_data())
})
}
@ -137,7 +142,7 @@ where
_uid: i64,
collab_type: CollabType,
object_id: &str,
) -> FutureResult<CollabDocState, Error> {
) -> FutureResult<Vec<u8>, Error> {
let try_get_postgrest = self.server.try_get_weak_postgrest();
let object_id = object_id.to_string();
let (tx, rx) = channel();

View File

@ -7,7 +7,7 @@ use std::time::Duration;
use anyhow::Error;
use chrono::{DateTime, Utc};
use collab::core::collab::CollabDocState;
use collab::core::collab::DocStateSource;
use collab_entity::{CollabObject, CollabType};
use collab_plugins::cloud_storage::RemoteCollabSnapshot;
use serde_json::Value;
@ -60,7 +60,7 @@ impl FetchObjectUpdateAction {
impl Action for FetchObjectUpdateAction {
type Future = Pin<Box<dyn Future<Output = Result<Self::Item, Self::Error>> + Send>>;
type Item = CollabDocState;
type Item = Vec<u8>;
type Error = anyhow::Error;
fn run(&mut self) -> Self::Future {
@ -284,7 +284,7 @@ pub async fn batch_get_updates_from_server(
match parser_updates_form_json(record.clone(), &postgrest.secret()) {
Ok(items) => {
if items.is_empty() {
updates_by_oid.insert(oid.to_string(), vec![]);
updates_by_oid.insert(oid.to_string(), DocStateSource::FromDocState(vec![]));
} else {
let updates = items
.iter()
@ -293,7 +293,7 @@ pub async fn batch_get_updates_from_server(
let doc_state = merge_updates_v1(&updates)
.map_err(|err| anyhow::anyhow!("merge updates failed: {:?}", err))?;
updates_by_oid.insert(oid.to_string(), doc_state);
updates_by_oid.insert(oid.to_string(), DocStateSource::FromDocState(doc_state));
}
},
Err(e) => {

View File

@ -6,7 +6,7 @@ use std::sync::{Arc, Weak};
use std::time::Duration;
use anyhow::Error;
use collab::core::collab::{CollabDocState, MutexCollab};
use collab::core::collab::MutexCollab;
use collab::core::origin::CollabOrigin;
use collab_entity::{CollabObject, CollabType};
use parking_lot::RwLock;
@ -249,7 +249,7 @@ where
})
}
fn get_user_awareness_doc_state(&self, uid: i64) -> FutureResult<CollabDocState, FlowyError> {
fn get_user_awareness_doc_state(&self, uid: i64) -> FutureResult<Vec<u8>, FlowyError> {
let try_get_postgrest = self.server.try_get_weak_postgrest();
let awareness_id = uid.to_string();
let (tx, rx) = channel();

View File

@ -1,3 +1,4 @@
use collab::core::collab::DocStateSource;
use collab_entity::{CollabObject, CollabType};
use uuid::Uuid;
@ -50,7 +51,12 @@ async fn supabase_create_database_test() {
.unwrap();
assert_eq!(updates_by_oid.len(), 3);
for (_, update) in updates_by_oid {
assert_eq!(update.len(), 2);
for (_, source) in updates_by_oid {
match source {
DocStateSource::FromDisk => panic!("should not be from disk"),
DocStateSource::FromDocState(doc_state) => {
assert_eq!(doc_state.len(), 2);
},
}
}
}

View File

@ -2,7 +2,7 @@ use flowy_storage::ObjectStorageService;
use std::collections::HashMap;
use std::sync::Arc;
use collab::core::collab::MutexCollab;
use collab::core::collab::{DocStateSource, MutexCollab};
use collab::core::origin::CollabOrigin;
use collab_plugins::cloud_storage::RemoteCollabStorage;
use uuid::Uuid;
@ -122,8 +122,14 @@ pub async fn print_encryption_folder_snapshot(
.pop()
.unwrap();
let collab = Arc::new(
MutexCollab::new_with_doc_state(CollabOrigin::Empty, folder_id, snapshot.blob, vec![], false)
.unwrap(),
MutexCollab::new_with_doc_state(
CollabOrigin::Empty,
folder_id,
DocStateSource::FromDocState(snapshot.blob),
vec![],
false,
)
.unwrap(),
);
let folder_data = Folder::open(uid, collab, None)
.unwrap()