feat: realtime user event (#3241)

* feat: update user profile after receiving realtime user event

* chore: logout if other deivce enable encyrption

* test: fix test

* chore: fix checkbox UI

* chore: fix tauri build

* chore: fix device id

* chore: fix duplicate run appflowy
This commit is contained in:
Nathan.fooo
2023-08-20 14:13:54 +08:00
committed by GitHub
parent c5719be7ae
commit a1647bee78
39 changed files with 814 additions and 865 deletions

View File

@ -2,7 +2,6 @@ use std::sync::Arc;
use collab_plugins::cloud_storage::{CollabObject, RemoteCollabStorage};
use parking_lot::RwLock;
use serde_json::Value;
use flowy_database_deps::cloud::DatabaseCloudService;
use flowy_document_deps::cloud::DocumentCloudService;
@ -36,13 +35,11 @@ where
pub trait AppFlowyServer: Send + Sync + 'static {
fn set_enable_sync(&self, _enable: bool) {}
fn set_sync_device_id(&self, _device_id: &str) {}
fn user_service(&self) -> Arc<dyn UserService>;
fn folder_service(&self) -> Arc<dyn FolderCloudService>;
fn database_service(&self) -> Arc<dyn DatabaseCloudService>;
fn document_service(&self) -> Arc<dyn DocumentCloudService>;
fn collab_storage(&self, collab_object: &CollabObject) -> Option<Arc<dyn RemoteCollabStorage>>;
fn handle_realtime_event(&self, _json: Value) {}
}
pub struct EncryptionImpl {

View File

@ -16,7 +16,8 @@ use flowy_database_deps::cloud::{CollabObjectUpdate, CollabObjectUpdateByOid};
use lib_infra::util::md5;
use crate::supabase::api::util::{
ExtendedResponse, InsertParamsBuilder, SupabaseBinaryColumnDecoder, SupabaseBinaryColumnEncoder,
BinaryColumnDecoder, ExtendedResponse, InsertParamsBuilder, SupabaseBinaryColumnDecoder,
SupabaseBinaryColumnEncoder,
};
use crate::supabase::api::PostgresWrapper;
use crate::supabase::define::*;
@ -220,7 +221,8 @@ fn parser_snapshot(
.and_then(|value| value.as_str()),
) {
(Some(encrypt), Some(value)) => {
SupabaseBinaryColumnDecoder::decode(value, encrypt as i32, secret).ok()
SupabaseBinaryColumnDecoder::decode::<_, BinaryColumnDecoder>(value, encrypt as i32, secret)
.ok()
},
_ => None,
}?;
@ -364,7 +366,11 @@ fn parser_update_from_json(
json.get("value").and_then(|value| value.as_str()),
) {
(Some(encrypt), Some(value)) => {
match SupabaseBinaryColumnDecoder::decode(value, encrypt as i32, encryption_secret) {
match SupabaseBinaryColumnDecoder::decode::<_, BinaryColumnDecoder>(
value,
encrypt as i32,
encryption_secret,
) {
Ok(value) => Some(value),
Err(err) => {
tracing::error!("Decode value column failed: {:?}", err);

View File

@ -1,8 +1,10 @@
use std::str::FromStr;
use std::sync::Arc;
use std::sync::{Arc, Weak};
use anyhow::Error;
use collab_plugins::cloud_storage::CollabObject;
use parking_lot::RwLock;
use serde_json::Value;
use tokio::sync::oneshot::channel;
use uuid::Uuid;
@ -13,20 +15,34 @@ use lib_infra::box_any::BoxAny;
use lib_infra::future::FutureResult;
use crate::supabase::api::request::FetchObjectUpdateAction;
use crate::supabase::api::util::{ExtendedResponse, InsertParamsBuilder};
use crate::supabase::api::util::{
ExtendedResponse, InsertParamsBuilder, RealtimeBinaryColumnDecoder, SupabaseBinaryColumnDecoder,
};
use crate::supabase::api::{send_update, PostgresWrapper, SupabaseServerService};
use crate::supabase::define::*;
use crate::supabase::entities::GetUserProfileParams;
use crate::supabase::entities::UidResponse;
use crate::supabase::entities::UserProfileResponse;
use crate::supabase::entities::{GetUserProfileParams, RealtimeUserEvent};
use crate::supabase::entities::{RealtimeCollabUpdateEvent, RealtimeEvent, UidResponse};
use crate::supabase::CollabUpdateSenderByOid;
use crate::AppFlowyEncryption;
pub struct SupabaseUserServiceImpl<T> {
server: T,
realtime_event_handlers: Vec<Box<dyn RealtimeEventHandler>>,
user_update_tx: Option<UserUpdateSender>,
}
impl<T> SupabaseUserServiceImpl<T> {
pub fn new(server: T) -> Self {
Self { server }
pub fn new(
server: T,
realtime_event_handlers: Vec<Box<dyn RealtimeEventHandler>>,
user_update_tx: Option<UserUpdateSender>,
) -> Self {
Self {
server,
realtime_event_handlers,
user_update_tx,
}
}
}
@ -67,7 +83,11 @@ where
}
// Query the user profile and workspaces
tracing::debug!("user uuid: {}", params.uuid);
tracing::debug!(
"user uuid: {}, device_id: {}",
params.uuid,
params.device_id
);
let user_profile =
get_user_profile(postgrest.clone(), GetUserProfileParams::Uuid(params.uuid))
.await?
@ -226,6 +246,26 @@ where
FutureResult::new(async { rx.await? })
}
fn receive_realtime_event(&self, json: Value) {
match serde_json::from_value::<RealtimeEvent>(json) {
Ok(event) => {
tracing::trace!("Realtime event: {}", event);
for handler in &self.realtime_event_handlers {
if event.table.as_str().starts_with(handler.table_name()) {
handler.handler_event(&event);
}
}
},
Err(e) => {
tracing::error!("parser realtime event error: {}", e);
},
}
}
fn subscribe_user_update(&self) -> Option<UserUpdateReceiver> {
self.user_update_tx.as_ref().map(|tx| tx.subscribe())
}
fn create_collab_object(
&self,
collab_object: &CollabObject,
@ -384,3 +424,95 @@ async fn check_user(
}
Ok(())
}
pub trait RealtimeEventHandler: Send + Sync + 'static {
fn table_name(&self) -> &str;
fn handler_event(&self, event: &RealtimeEvent);
}
pub struct RealtimeUserHandler(pub UserUpdateSender);
impl RealtimeEventHandler for RealtimeUserHandler {
fn table_name(&self) -> &str {
"af_user"
}
fn handler_event(&self, event: &RealtimeEvent) {
if let Ok(user_event) = serde_json::from_value::<RealtimeUserEvent>(event.new.clone()) {
let _ = self.0.send(UserUpdate {
uid: user_event.uid,
name: user_event.name,
email: user_event.email,
encryption_sign: user_event.encryption_sign,
});
}
}
}
pub struct RealtimeCollabUpdateHandler {
sender_by_oid: Weak<CollabUpdateSenderByOid>,
device_id: Arc<RwLock<String>>,
encryption: Weak<dyn AppFlowyEncryption>,
}
impl RealtimeCollabUpdateHandler {
pub fn new(
sender_by_oid: Weak<CollabUpdateSenderByOid>,
device_id: Arc<RwLock<String>>,
encryption: Weak<dyn AppFlowyEncryption>,
) -> Self {
Self {
sender_by_oid,
device_id,
encryption,
}
}
}
impl RealtimeEventHandler for RealtimeCollabUpdateHandler {
fn table_name(&self) -> &str {
"af_collab_update"
}
fn handler_event(&self, event: &RealtimeEvent) {
if let Ok(collab_update) =
serde_json::from_value::<RealtimeCollabUpdateEvent>(event.new.clone())
{
if let Some(sender_by_oid) = self.sender_by_oid.upgrade() {
if let Some(sender) = sender_by_oid.read().get(collab_update.oid.as_str()) {
tracing::trace!(
"current device: {}, event device: {}",
self.device_id.read(),
collab_update.did.as_str()
);
if *self.device_id.read() != collab_update.did.as_str() {
let encryption_secret = self
.encryption
.upgrade()
.and_then(|encryption| encryption.get_secret());
tracing::trace!(
"Parse collab update with len: {}, encrypt: {}",
collab_update.value.len(),
collab_update.encrypt,
);
match SupabaseBinaryColumnDecoder::decode::<_, RealtimeBinaryColumnDecoder>(
collab_update.value.as_str(),
collab_update.encrypt,
&encryption_secret,
) {
Ok(value) => {
if let Err(e) = sender.send(value) {
tracing::debug!("send realtime update error: {}", e);
}
},
Err(err) => {
tracing::error!("decode collab update error: {}", err);
},
}
}
}
}
}
}
}

View File

@ -171,7 +171,7 @@ impl SupabaseBinaryColumnDecoder {
/// # Returns
/// Returns an `Option` containing the decoded binary data if decoding is successful.
/// Otherwise, returns `None`.
pub fn decode<T: AsRef<str>>(
pub fn decode<T: AsRef<str>, D: HexDecoder>(
value: T,
encrypt: i32,
encryption_secret: &Option<String>,
@ -182,7 +182,7 @@ impl SupabaseBinaryColumnDecoder {
.ok_or(anyhow::anyhow!("Value is not start with: \\x",))?;
if encrypt == 0 {
let bytes = hex::decode(s)?;
let bytes = D::decode(s)?;
Ok(bytes)
} else {
match encryption_secret {
@ -190,7 +190,7 @@ impl SupabaseBinaryColumnDecoder {
"encryption_secret is None, but encrypt is 1"
)),
Some(encryption_secret) => {
let encrypt_data = hex::decode(s)?;
let encrypt_data = D::decode(s)?;
decrypt_bytes(encrypt_data, encryption_secret)
},
}
@ -198,15 +198,24 @@ impl SupabaseBinaryColumnDecoder {
}
}
/// A decoder specifically tailored for realtime event binary columns in Supabase.
///
pub struct SupabaseRealtimeEventBinaryColumnDecoder;
pub trait HexDecoder {
fn decode<T: AsRef<[u8]>>(data: T) -> Result<Vec<u8>, Error>;
}
impl SupabaseRealtimeEventBinaryColumnDecoder {
/// The realtime event binary column string is encoded twice. So it needs to be decoded twice.
pub fn decode<T: AsRef<str>>(value: T) -> Option<Vec<u8>> {
let s = value.as_ref().strip_prefix("\\x")?;
let bytes = hex::decode(s).ok()?;
hex::decode(bytes).ok()
pub struct RealtimeBinaryColumnDecoder;
impl HexDecoder for RealtimeBinaryColumnDecoder {
fn decode<T: AsRef<[u8]>>(data: T) -> Result<Vec<u8>, Error> {
// The realtime event binary column string is encoded twice. So it needs to be decoded twice.
let bytes = hex::decode(data)?;
let bytes = hex::decode(bytes)?;
Ok(bytes)
}
}
pub struct BinaryColumnDecoder;
impl HexDecoder for BinaryColumnDecoder {
fn decode<T: AsRef<[u8]>>(data: T) -> Result<Vec<u8>, Error> {
let bytes = hex::decode(data)?;
Ok(bytes)
}
}

View File

@ -1,11 +1,10 @@
use std::fmt;
use std::fmt::Display;
use serde::de::{Error, Visitor};
use serde::{Deserialize, Deserializer};
use serde::Deserialize;
use serde_json::Value;
use uuid::Uuid;
use crate::supabase::api::util::SupabaseRealtimeEventBinaryColumnDecoder;
use crate::util::deserialize_null_or_default;
pub enum GetUserProfileParams {
@ -40,16 +39,14 @@ pub(crate) struct UidResponse {
}
#[derive(Debug, Deserialize)]
pub struct RealtimeCollabUpdateEvent {
pub struct RealtimeEvent {
pub schema: String,
pub table: String,
#[serde(rename = "eventType")]
pub event_type: String,
#[serde(rename = "new")]
pub payload: RealtimeCollabUpdate,
pub new: Value,
}
impl Display for RealtimeCollabUpdateEvent {
impl Display for RealtimeEvent {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
@ -60,43 +57,23 @@ impl Display for RealtimeCollabUpdateEvent {
}
#[derive(Debug, Deserialize)]
pub struct RealtimeCollabUpdate {
pub struct RealtimeCollabUpdateEvent {
pub oid: String,
pub uid: i64,
pub key: i64,
pub did: String,
#[serde(deserialize_with = "deserialize_value")]
pub value: Vec<u8>,
pub value: String,
#[serde(default)]
pub encrypt: i32,
}
pub fn deserialize_value<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
where
D: Deserializer<'de>,
{
struct ValueVisitor();
impl<'de> Visitor<'de> for ValueVisitor {
type Value = Vec<u8>;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("Expect NodeBody")
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: Error,
{
Ok(SupabaseRealtimeEventBinaryColumnDecoder::decode(v).unwrap_or_default())
}
fn visit_string<E>(self, v: String) -> Result<Self::Value, E>
where
E: Error,
{
Ok(SupabaseRealtimeEventBinaryColumnDecoder::decode(v).unwrap_or_default())
}
}
deserializer.deserialize_any(ValueVisitor())
#[derive(Debug, Deserialize)]
pub struct RealtimeUserEvent {
pub uid: i64,
#[serde(deserialize_with = "deserialize_null_or_default")]
pub name: String,
#[serde(deserialize_with = "deserialize_null_or_default")]
pub email: String,
#[serde(deserialize_with = "deserialize_null_or_default")]
pub encryption_sign: String,
}

View File

@ -2,22 +2,19 @@ use std::collections::HashMap;
use std::sync::{Arc, Weak};
use collab_plugins::cloud_storage::{CollabObject, RemoteCollabStorage, RemoteUpdateSender};
use parking_lot::{Mutex, RwLock};
use serde_json::Value;
use parking_lot::RwLock;
use flowy_database_deps::cloud::DatabaseCloudService;
use flowy_document_deps::cloud::DocumentCloudService;
use flowy_encrypt::decrypt_bytes;
use flowy_folder_deps::cloud::FolderCloudService;
use flowy_server_config::supabase_config::SupabaseConfiguration;
use flowy_user_deps::cloud::UserService;
use crate::supabase::api::{
RESTfulPostgresServer, SupabaseCollabStorageImpl, SupabaseDatabaseServiceImpl,
SupabaseDocumentServiceImpl, SupabaseFolderServiceImpl, SupabaseServerServiceImpl,
SupabaseUserServiceImpl,
RESTfulPostgresServer, RealtimeCollabUpdateHandler, RealtimeEventHandler, RealtimeUserHandler,
SupabaseCollabStorageImpl, SupabaseDatabaseServiceImpl, SupabaseDocumentServiceImpl,
SupabaseFolderServiceImpl, SupabaseServerServiceImpl, SupabaseUserServiceImpl,
};
use crate::supabase::entities::RealtimeCollabUpdateEvent;
use crate::{AppFlowyEncryption, AppFlowyServer};
/// https://www.pgbouncer.org/features.html
@ -53,14 +50,16 @@ impl PgPoolMode {
matches!(self, PgPoolMode::Session)
}
}
pub type CollabUpdateSenderByOid = RwLock<HashMap<String, RemoteUpdateSender>>;
/// Supabase server is used to provide the implementation of the [AppFlowyServer] trait.
/// It contains the configuration of the supabase server and the postgres server.
pub struct SupabaseServer {
#[allow(dead_code)]
config: SupabaseConfiguration,
/// did represents as the device id is used to identify the device that is currently using the app.
did: Mutex<String>,
update_tx: RwLock<HashMap<String, RemoteUpdateSender>>,
device_id: Arc<RwLock<String>>,
collab_update_sender: Arc<CollabUpdateSenderByOid>,
restful_postgres: Arc<RwLock<Option<Arc<RESTfulPostgresServer>>>>,
encryption: Weak<dyn AppFlowyEncryption>,
}
@ -69,9 +68,10 @@ impl SupabaseServer {
pub fn new(
config: SupabaseConfiguration,
enable_sync: bool,
device_id: Arc<RwLock<String>>,
encryption: Weak<dyn AppFlowyEncryption>,
) -> Self {
let update_tx = RwLock::new(HashMap::new());
let collab_update_sender = Default::default();
let restful_postgres = if enable_sync {
Some(Arc::new(RESTfulPostgresServer::new(
config.clone(),
@ -82,8 +82,8 @@ impl SupabaseServer {
};
Self {
config,
did: Default::default(),
update_tx,
device_id,
collab_update_sender,
restful_postgres: Arc::new(RwLock::new(restful_postgres)),
encryption,
}
@ -108,14 +108,25 @@ impl AppFlowyServer for SupabaseServer {
self.set_enable_sync(enable);
}
fn set_sync_device_id(&self, device_id: &str) {
*self.did.lock() = device_id.to_string();
}
fn user_service(&self) -> Arc<dyn UserService> {
Arc::new(SupabaseUserServiceImpl::new(SupabaseServerServiceImpl(
self.restful_postgres.clone(),
)))
// handle the realtime collab update event.
let (user_update_tx, _) = tokio::sync::broadcast::channel(100);
let collab_update_handler = Box::new(RealtimeCollabUpdateHandler::new(
Arc::downgrade(&self.collab_update_sender),
self.device_id.clone(),
self.encryption.clone(),
));
// handle the realtime user event.
let user_handler = Box::new(RealtimeUserHandler(user_update_tx.clone()));
let handlers: Vec<Box<dyn RealtimeEventHandler>> = vec![collab_update_handler, user_handler];
Arc::new(SupabaseUserServiceImpl::new(
SupabaseServerServiceImpl(self.restful_postgres.clone()),
handlers,
Some(user_update_tx),
))
}
fn folder_service(&self) -> Arc<dyn FolderCloudService> {
@ -139,53 +150,14 @@ impl AppFlowyServer for SupabaseServer {
fn collab_storage(&self, collab_object: &CollabObject) -> Option<Arc<dyn RemoteCollabStorage>> {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
self
.update_tx
.collab_update_sender
.write()
.insert(collab_object.object_id.clone(), tx);
Some(Arc::new(SupabaseCollabStorageImpl::new(
SupabaseServerServiceImpl(self.restful_postgres.clone()),
Some(rx),
self.encryption.clone(),
)))
}
fn handle_realtime_event(&self, json: Value) {
match serde_json::from_value::<RealtimeCollabUpdateEvent>(json) {
Ok(event) => {
if let Some(tx) = self.update_tx.read().get(event.payload.oid.as_str()) {
tracing::trace!(
"current device: {}, event device: {}",
self.did.lock().as_str(),
event.payload.did.as_str()
);
if self.did.lock().as_str() != event.payload.did.as_str() {
tracing::trace!("Did receive realtime event: {}", event);
let value = if event.payload.encrypt == 1 {
match self
.encryption
.upgrade()
.and_then(|encryption| encryption.get_secret())
{
None => vec![],
Some(secret) => decrypt_bytes(event.payload.value, &secret).unwrap_or_default(),
}
} else {
event.payload.value
};
if !value.is_empty() {
tracing::trace!("Parse payload with len: {} success", value.len());
if let Err(e) = tx.send(value) {
tracing::trace!("send realtime update error: {}", e);
}
}
}
}
},
Err(e) => {
tracing::error!("parser realtime event error: {}", e);
},
}
}
}

View File

@ -48,7 +48,7 @@ pub fn database_service() -> Arc<dyn DatabaseCloudService> {
pub fn user_auth_service() -> Arc<dyn UserService> {
let (server, _encryption_impl) = appflowy_server(None);
Arc::new(SupabaseUserServiceImpl::new(server))
Arc::new(SupabaseUserServiceImpl::new(server, vec![], None))
}
pub fn folder_service() -> Arc<dyn FolderCloudService> {