mirror of
https://github.com/AppFlowy-IO/AppFlowy.git
synced 2024-08-30 18:12:39 +00:00
fix: only encrypt if enable (#3236)
* fix: error page display issue * fix: override document with empty data * chore: add logs * fix: encrypt errors * fix: encrypt errors
This commit is contained in:
@ -38,9 +38,10 @@ where
|
||||
tx.send(
|
||||
async move {
|
||||
let postgrest = try_get_postgrest?;
|
||||
FetchObjectUpdateAction::new(object_id.to_string(), object_ty, postgrest)
|
||||
let updates = FetchObjectUpdateAction::new(object_id.to_string(), object_ty, postgrest)
|
||||
.run_with_fix_interval(5, 10)
|
||||
.await
|
||||
.await?;
|
||||
Ok(updates)
|
||||
}
|
||||
.await,
|
||||
)
|
||||
|
@ -6,6 +6,7 @@ use collab_plugins::cloud_storage::CollabType;
|
||||
use tokio::sync::oneshot::channel;
|
||||
|
||||
use flowy_document_deps::cloud::{DocumentCloudService, DocumentSnapshot};
|
||||
use flowy_error::FlowyError;
|
||||
use lib_infra::future::FutureResult;
|
||||
|
||||
use crate::supabase::api::request::{get_snapshots_from_server, FetchObjectUpdateAction};
|
||||
@ -34,7 +35,11 @@ where
|
||||
async move {
|
||||
let postgrest = try_get_postgrest?;
|
||||
let action = FetchObjectUpdateAction::new(document_id, CollabType::Document, postgrest);
|
||||
action.run_with_fix_interval(5, 10).await
|
||||
let updates = action.run_with_fix_interval(5, 10).await?;
|
||||
if updates.is_empty() {
|
||||
return Err(FlowyError::collab_not_sync().into());
|
||||
}
|
||||
Ok(updates)
|
||||
}
|
||||
.await,
|
||||
)
|
||||
|
@ -65,9 +65,12 @@ impl Action for FetchObjectUpdateAction {
|
||||
Box::pin(async move {
|
||||
match weak_postgres.upgrade() {
|
||||
None => Ok(vec![]),
|
||||
Some(postgrest) => {
|
||||
let items = get_updates_from_server(&object_id, &object_ty, postgrest).await?;
|
||||
Ok(items.into_iter().map(|item| item.value).collect())
|
||||
Some(postgrest) => match get_updates_from_server(&object_id, &object_ty, postgrest).await {
|
||||
Ok(items) => Ok(items.into_iter().map(|item| item.value).collect()),
|
||||
Err(err) => {
|
||||
tracing::error!("Get {} updates failed with error: {:?}", object_id, err);
|
||||
Err(err)
|
||||
},
|
||||
},
|
||||
}
|
||||
})
|
||||
@ -112,7 +115,19 @@ impl Action for BatchFetchObjectUpdateAction {
|
||||
Box::pin(async move {
|
||||
match weak_postgrest.upgrade() {
|
||||
None => Ok(CollabObjectUpdateByOid::default()),
|
||||
Some(server) => batch_get_updates_from_server(object_ids, &object_ty, server).await,
|
||||
Some(server) => {
|
||||
match batch_get_updates_from_server(object_ids.clone(), &object_ty, server).await {
|
||||
Ok(updates_by_oid) => Ok(updates_by_oid),
|
||||
Err(err) => {
|
||||
tracing::error!(
|
||||
"Batch get object with given ids:{:?} failed with error: {:?}",
|
||||
object_ids,
|
||||
err
|
||||
);
|
||||
Err(err)
|
||||
},
|
||||
}
|
||||
},
|
||||
}
|
||||
})
|
||||
}
|
||||
@ -349,7 +364,13 @@ fn parser_update_from_json(
|
||||
json.get("value").and_then(|value| value.as_str()),
|
||||
) {
|
||||
(Some(encrypt), Some(value)) => {
|
||||
SupabaseBinaryColumnDecoder::decode(value, encrypt as i32, encryption_secret).ok()
|
||||
match SupabaseBinaryColumnDecoder::decode(value, encrypt as i32, encryption_secret) {
|
||||
Ok(value) => Some(value),
|
||||
Err(err) => {
|
||||
tracing::error!("Decode value column failed: {:?}", err);
|
||||
None
|
||||
},
|
||||
}
|
||||
},
|
||||
_ => None,
|
||||
};
|
||||
@ -371,9 +392,12 @@ fn parser_update_from_json(
|
||||
}
|
||||
Ok(UpdateItem { key, value })
|
||||
} else {
|
||||
let keys = json
|
||||
.as_object()
|
||||
.map(|map| map.iter().map(|(key, _)| key).collect::<Vec<&String>>());
|
||||
Err(anyhow::anyhow!(
|
||||
"missing key or value column in json: {:?}",
|
||||
json
|
||||
"missing key or value column. Current keys:: {:?}",
|
||||
keys
|
||||
))
|
||||
}
|
||||
}
|
||||
|
@ -152,17 +152,24 @@ impl AppFlowyServer for SupabaseServer {
|
||||
fn handle_realtime_event(&self, json: Value) {
|
||||
match serde_json::from_value::<RealtimeCollabUpdateEvent>(json) {
|
||||
Ok(event) => {
|
||||
if let (Some(tx), Some(secret)) = (
|
||||
self.update_tx.read().get(event.payload.oid.as_str()),
|
||||
self
|
||||
.encryption
|
||||
.upgrade()
|
||||
.and_then(|encryption| encryption.get_secret()),
|
||||
) {
|
||||
if let Some(tx) = self.update_tx.read().get(event.payload.oid.as_str()) {
|
||||
tracing::trace!(
|
||||
"current device: {}, event device: {}",
|
||||
self.did.lock().as_str(),
|
||||
event.payload.did.as_str()
|
||||
);
|
||||
|
||||
if self.did.lock().as_str() != event.payload.did.as_str() {
|
||||
tracing::trace!("Did receive realtime event: {}", event);
|
||||
let value = if event.payload.encrypt == 1 {
|
||||
decrypt_bytes(event.payload.value, &secret).unwrap_or_default()
|
||||
match self
|
||||
.encryption
|
||||
.upgrade()
|
||||
.and_then(|encryption| encryption.get_secret())
|
||||
{
|
||||
None => vec![],
|
||||
Some(secret) => decrypt_bytes(event.payload.value, &secret).unwrap_or_default(),
|
||||
}
|
||||
} else {
|
||||
event.payload.value
|
||||
};
|
||||
|
Reference in New Issue
Block a user