feat: enable collaboration update synchronization between different devices (#3169)

* feat: bypass realtime event

* chore: use user device id

* chore: send realtime update

* chore: setup realtime recever

* chore: setup realtime recever

* chore: clippy

* chore: update collab rev

* chore: update realtime subscription

* chore: fix test

* chore: fmt

* test: fix flutter test
This commit is contained in:
Nathan.fooo
2023-08-12 17:36:31 +08:00
committed by GitHub
parent 764b4db166
commit 9063b40e06
45 changed files with 622 additions and 560 deletions

View File

@ -1,6 +1,7 @@
use std::sync::Arc;
use collab_plugins::cloud_storage::RemoteCollabStorage;
use collab_plugins::cloud_storage::{CollabObject, RemoteCollabStorage};
use serde_json::Value;
use flowy_database_deps::cloud::DatabaseCloudService;
use flowy_document_deps::cloud::DocumentCloudService;
@ -16,9 +17,11 @@ pub mod util;
pub trait AppFlowyServer: Send + Sync + 'static {
fn enable_sync(&self, _enable: bool) {}
fn set_sync_device_id(&self, _device_id: &str) {}
fn user_service(&self) -> Arc<dyn UserService>;
fn folder_service(&self) -> Arc<dyn FolderCloudService>;
fn database_service(&self) -> Arc<dyn DatabaseCloudService>;
fn document_service(&self) -> Arc<dyn DocumentCloudService>;
fn collab_storage(&self) -> Option<Arc<dyn RemoteCollabStorage>>;
fn collab_storage(&self, collab_object: &CollabObject) -> Option<Arc<dyn RemoteCollabStorage>>;
fn handle_realtime_event(&self, _json: Value) {}
}

View File

@ -1,6 +1,6 @@
use anyhow::Error;
use std::sync::Arc;
use anyhow::Error;
use lazy_static::lazy_static;
use parking_lot::Mutex;
@ -42,6 +42,7 @@ impl UserService for LocalServerUserAuthServiceImpl {
is_new: true,
email: Some(params.email),
token: None,
device_id: params.device_id,
})
})
}
@ -50,10 +51,7 @@ impl UserService for LocalServerUserAuthServiceImpl {
let db = self.db.clone();
FutureResult::new(async move {
let params: SignInParams = params.unbox_or_error::<SignInParams>()?;
let uid = match params.uid {
None => ID_GEN.lock().next_id(),
Some(uid) => uid,
};
let uid = ID_GEN.lock().next_id();
let user_workspace = db
.get_user_workspace(uid)?
@ -65,6 +63,7 @@ impl UserService for LocalServerUserAuthServiceImpl {
user_workspaces: vec![user_workspace],
email: Some(params.email),
token: None,
device_id: params.device_id,
})
})
}

View File

@ -1,6 +1,6 @@
use std::sync::Arc;
use collab_plugins::cloud_storage::RemoteCollabStorage;
use collab_plugins::cloud_storage::{CollabObject, RemoteCollabStorage};
use parking_lot::RwLock;
use tokio::sync::mpsc;
@ -68,7 +68,7 @@ impl AppFlowyServer for LocalServer {
Arc::new(LocalServerDocumentCloudServiceImpl())
}
fn collab_storage(&self) -> Option<Arc<dyn RemoteCollabStorage>> {
fn collab_storage(&self, _collab_object: &CollabObject) -> Option<Arc<dyn RemoteCollabStorage>> {
None
}
}

View File

@ -1,6 +1,6 @@
use std::sync::Arc;
use collab_plugins::cloud_storage::RemoteCollabStorage;
use collab_plugins::cloud_storage::{CollabObject, RemoteCollabStorage};
use flowy_database_deps::cloud::DatabaseCloudService;
use flowy_document_deps::cloud::DocumentCloudService;
@ -41,7 +41,7 @@ impl AppFlowyServer for SelfHostServer {
Arc::new(SelfHostedDocumentCloudServiceImpl())
}
fn collab_storage(&self) -> Option<Arc<dyn RemoteCollabStorage>> {
fn collab_storage(&self, _collab_object: &CollabObject) -> Option<Arc<dyn RemoteCollabStorage>> {
None
}
}

View File

@ -8,6 +8,7 @@ use collab_plugins::cloud_storage::{
CollabObject, MsgId, RemoteCollabSnapshot, RemoteCollabState, RemoteCollabStorage,
RemoteUpdateReceiver,
};
use parking_lot::Mutex;
use tokio::task::spawn_blocking;
use lib_infra::async_trait::async_trait;
@ -17,15 +18,23 @@ use crate::supabase::api::request::{
create_snapshot, get_latest_snapshot_from_server, get_updates_from_server,
FetchObjectUpdateAction, UpdateItem,
};
use crate::supabase::api::util::{ExtendedResponse, InsertParamsBuilder};
use crate::supabase::api::util::{
ExtendedResponse, InsertParamsBuilder, SupabaseBinaryColumnEncoder,
};
use crate::supabase::api::{PostgresWrapper, SupabaseServerService};
use crate::supabase::define::*;
pub struct SupabaseCollabStorageImpl<T>(T);
pub struct SupabaseCollabStorageImpl<T> {
server: T,
rx: Mutex<Option<RemoteUpdateReceiver>>,
}
impl<T> SupabaseCollabStorageImpl<T> {
pub fn new(server: T) -> Self {
Self(server)
pub fn new(server: T, rx: Option<RemoteUpdateReceiver>) -> Self {
Self {
server,
rx: Mutex::new(rx),
}
}
}
@ -39,21 +48,22 @@ where
}
async fn get_all_updates(&self, object: &CollabObject) -> Result<Vec<Vec<u8>>, Error> {
let postgrest = self.0.try_get_weak_postgrest()?;
let action = FetchObjectUpdateAction::new(object.id.clone(), object.ty.clone(), postgrest);
let postgrest = self.server.try_get_weak_postgrest()?;
let action =
FetchObjectUpdateAction::new(object.object_id.clone(), object.ty.clone(), postgrest);
let updates = action.run().await?;
Ok(updates)
}
async fn get_latest_snapshot(&self, object_id: &str) -> Option<RemoteCollabSnapshot> {
let postgrest = self.0.try_get_postgrest().ok()?;
let postgrest = self.server.try_get_postgrest().ok()?;
get_latest_snapshot_from_server(object_id, postgrest)
.await
.ok()?
}
async fn get_collab_state(&self, object_id: &str) -> Result<Option<RemoteCollabState>, Error> {
let postgrest = self.0.try_get_postgrest()?;
let postgrest = self.server.try_get_postgrest()?;
let json = postgrest
.from("af_collab_state")
.select("*")
@ -92,7 +102,7 @@ where
}
async fn create_snapshot(&self, object: &CollabObject, snapshot: Vec<u8>) -> Result<i64, Error> {
let postgrest = self.0.try_get_postgrest()?;
let postgrest = self.server.try_get_postgrest()?;
create_snapshot(&postgrest, object, snapshot).await
}
@ -102,7 +112,7 @@ where
_id: MsgId,
update: Vec<u8>,
) -> Result<(), Error> {
if let Some(postgrest) = self.0.get_postgrest() {
if let Some(postgrest) = self.server.get_postgrest() {
let workspace_id = object
.get_workspace_id()
.ok_or(anyhow::anyhow!("Invalid workspace id"))?;
@ -118,12 +128,13 @@ where
_id: MsgId,
init_update: Vec<u8>,
) -> Result<(), Error> {
let postgrest = self.0.try_get_postgrest()?;
let postgrest = self.server.try_get_postgrest()?;
let workspace_id = object
.get_workspace_id()
.ok_or(anyhow::anyhow!("Invalid workspace id"))?;
let update_items = get_updates_from_server(&object.id, &object.ty, postgrest.clone()).await?;
let update_items =
get_updates_from_server(&object.object_id, &object.ty, postgrest.clone()).await?;
// If the update_items is empty, we can send the init_update directly
if update_items.is_empty() {
@ -132,14 +143,12 @@ where
// 2.Merge the updates into one and then delete the merged updates
let merge_result = spawn_blocking(move || merge_updates(update_items, init_update)).await??;
tracing::trace!("Merged updates count: {}", merge_result.merged_keys.len());
let override_key = merge_result.merged_keys.last().cloned().unwrap();
let value_size = merge_result.new_update.len() as i32;
let md5 = md5(&merge_result.new_update);
let new_update = format!("\\x{}", hex::encode(merge_result.new_update));
let params = InsertParamsBuilder::new()
.insert("oid", object.id.clone())
.insert("new_key", override_key)
.insert("oid", object.object_id.clone())
.insert("new_value", new_update)
.insert("md5", md5)
.insert("value_size", value_size)
@ -147,10 +156,11 @@ where
.insert("uid", object.uid)
.insert("workspace_id", workspace_id)
.insert("removed_keys", merge_result.merged_keys)
.insert("did", object.get_device_id())
.build();
postgrest
.rpc("flush_collab_updates", params)
.rpc("flush_collab_updates_v2", params)
.execute()
.await?
.success()
@ -159,8 +169,12 @@ where
Ok(())
}
async fn subscribe_remote_updates(&self, _object: &CollabObject) -> Option<RemoteUpdateReceiver> {
todo!()
fn subscribe_remote_updates(&self, _object: &CollabObject) -> Option<RemoteUpdateReceiver> {
let rx = self.rx.lock().take();
if rx.is_none() {
tracing::warn!("The receiver is already taken");
}
rx
}
}
@ -172,14 +186,15 @@ async fn send_update(
) -> Result<(), Error> {
let value_size = update.len() as i32;
let md5 = md5(&update);
let update = format!("\\x{}", hex::encode(update));
let update = SupabaseBinaryColumnEncoder::encode(update);
let builder = InsertParamsBuilder::new()
.insert("oid", object.id.clone())
.insert("oid", object.object_id.clone())
.insert("partition_key", partition_key(&object.ty))
.insert("value", update)
.insert("uid", object.uid)
.insert("md5", md5)
.insert("workspace_id", workspace_id)
.insert("did", object.get_device_id())
.insert("value_size", value_size);
let params = builder.build();

View File

@ -12,4 +12,4 @@ mod folder;
mod postgres_server;
mod request;
mod user;
mod util;
pub mod util;

View File

@ -15,7 +15,9 @@ use tokio_retry::{Action, Condition, RetryIf};
use flowy_database_deps::cloud::{CollabObjectUpdate, CollabObjectUpdateByOid};
use lib_infra::util::md5;
use crate::supabase::api::util::{ExtendedResponse, InsertParamsBuilder};
use crate::supabase::api::util::{
ExtendedResponse, InsertParamsBuilder, SupabaseBinaryColumnDecoder,
};
use crate::supabase::api::PostgresWrapper;
use crate::supabase::define::*;
@ -127,7 +129,7 @@ pub async fn create_snapshot(
.from(AF_COLLAB_SNAPSHOT_TABLE)
.insert(
InsertParamsBuilder::new()
.insert(AF_COLLAB_SNAPSHOT_OID_COLUMN, object.id.clone())
.insert(AF_COLLAB_SNAPSHOT_OID_COLUMN, object.object_id.clone())
.insert("name", object.ty.to_string())
.insert(AF_COLLAB_SNAPSHOT_BLOB_COLUMN, snapshot)
.insert(AF_COLLAB_SNAPSHOT_BLOB_SIZE_COLUMN, value_size)
@ -168,7 +170,7 @@ pub async fn get_latest_snapshot_from_server(
let blob = value
.get("blob")
.and_then(|blob| blob.as_str())
.and_then(decode_hex_string)?;
.and_then(SupabaseBinaryColumnDecoder::decode)?;
let sid = value.get("sid").and_then(|id| id.as_i64())?;
let created_at = value.get("created_at").and_then(|created_at| {
created_at
@ -272,7 +274,7 @@ fn parser_update_from_json(json: &Value) -> Result<UpdateItem, Error> {
let some_record = json
.get("value")
.and_then(|value| value.as_str())
.and_then(decode_hex_string);
.and_then(SupabaseBinaryColumnDecoder::decode);
let some_key = json.get("key").and_then(|value| value.as_i64());
if let (Some(value), Some(key)) = (some_record, some_key) {
@ -301,11 +303,6 @@ pub struct UpdateItem {
pub value: Vec<u8>,
}
fn decode_hex_string(s: &str) -> Option<Vec<u8>> {
let s = s.strip_prefix("\\x")?;
hex::decode(s).ok()
}
pub struct RetryCondition(Weak<PostgresWrapper>);
impl Condition<anyhow::Error> for RetryCondition {
fn should_retry(&mut self, _error: &anyhow::Error) -> bool {

View File

@ -89,6 +89,7 @@ where
is_new: is_new_user,
email: Some(user_profile.email),
token: None,
device_id: params.device_id,
})
})
}
@ -115,6 +116,7 @@ where
user_workspaces,
email: None,
token: None,
device_id: params.device_id,
})
})
}

View File

@ -5,15 +5,14 @@ use serde_json::Value;
use flowy_error::{ErrorCode, FlowyError};
use lib_infra::future::{to_fut, Fut};
#[derive(Default)]
pub struct InsertParamsBuilder {
map: serde_json::Map<String, Value>,
}
impl InsertParamsBuilder {
pub fn new() -> Self {
Self {
map: serde_json::Map::new(),
}
Self::default()
}
pub fn insert<T: serde::Serialize>(mut self, key: &str, value: T) -> Self {
@ -126,3 +125,60 @@ async fn parse_response_as_error(response: Response) -> FlowyError {
),
)
}
/// An encoder for binary columns in Supabase.
///
/// Provides utilities to encode binary data into a format suitable for Supabase columns.
pub struct SupabaseBinaryColumnEncoder;
impl SupabaseBinaryColumnEncoder {
/// Encodes the given binary data into a Supabase-friendly string representation.
///
/// # Parameters
/// - `value`: The binary data to encode.
///
/// # Returns
/// Returns the encoded string in the format: `\\xHEX_ENCODED_STRING`
pub fn encode<T: AsRef<[u8]>>(value: T) -> String {
format!("\\x{}", hex::encode(value))
}
}
/// A decoder for binary columns in Supabase.
///
/// Provides utilities to decode a string from Supabase columns back into binary data.
pub struct SupabaseBinaryColumnDecoder;
impl SupabaseBinaryColumnDecoder {
/// Decodes a Supabase binary column string into binary data.
///
/// # Parameters
/// - `value`: The string representation from a Supabase binary column.
///
/// # Returns
/// Returns an `Option` containing the decoded binary data if decoding is successful.
/// Otherwise, returns `None`.
pub fn decode<T: AsRef<str>>(value: T) -> Option<Vec<u8>> {
let s = value.as_ref().strip_prefix("\\x")?;
hex::decode(s).ok()
}
}
/// A decoder specifically tailored for realtime event binary columns in Supabase.
///
/// Decodes the realtime event binary column data using the standard Supabase binary column decoder.
pub struct SupabaseRealtimeEventBinaryColumnDecoder;
impl SupabaseRealtimeEventBinaryColumnDecoder {
/// Decodes a realtime event binary column string from Supabase into binary data.
///
/// # Parameters
/// - `value`: The string representation from a Supabase realtime event binary column.
///
/// # Returns
/// Returns an `Option` containing the decoded binary data if decoding is successful.
/// Otherwise, returns `None`.
pub fn decode<T: AsRef<str>>(value: T) -> Option<Vec<u8>> {
let bytes = SupabaseBinaryColumnDecoder::decode(value)?;
hex::decode(bytes).ok()
}
}

View File

@ -1,4 +1,4 @@
use collab_plugins::cloud_storage::CollabType;
pub use collab_plugins::cloud_storage::CollabType;
pub const AF_COLLAB_UPDATE_TABLE: &str = "af_collab_update";
pub const AF_COLLAB_KEY_COLUMN: &str = "key";

View File

@ -1,6 +1,11 @@
use serde::Deserialize;
use std::fmt;
use std::fmt::Display;
use serde::de::{Error, Visitor};
use serde::{Deserialize, Deserializer};
use uuid::Uuid;
use crate::supabase::api::util::SupabaseRealtimeEventBinaryColumnDecoder;
use crate::util::deserialize_null_or_default;
pub enum GetUserProfileParams {
@ -30,3 +35,63 @@ pub(crate) struct UidResponse {
#[allow(dead_code)]
pub uid: i64,
}
#[derive(Debug, Deserialize)]
pub struct RealtimeCollabUpdateEvent {
pub schema: String,
pub table: String,
#[serde(rename = "eventType")]
pub event_type: String,
#[serde(rename = "new")]
pub payload: RealtimeCollabUpdate,
}
impl Display for RealtimeCollabUpdateEvent {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"schema: {}, table: {}, event_type: {}",
self.schema, self.table, self.event_type
)
}
}
#[derive(Debug, Deserialize)]
pub struct RealtimeCollabUpdate {
pub oid: String,
pub uid: i64,
pub key: i64,
pub did: String,
#[serde(deserialize_with = "deserialize_value")]
pub value: Vec<u8>,
}
pub fn deserialize_value<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
where
D: Deserializer<'de>,
{
struct ValueVisitor();
impl<'de> Visitor<'de> for ValueVisitor {
type Value = Vec<u8>;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("Expect NodeBody")
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: Error,
{
Ok(SupabaseRealtimeEventBinaryColumnDecoder::decode(v).unwrap_or_default())
}
fn visit_string<E>(self, v: String) -> Result<Self::Value, E>
where
E: Error,
{
Ok(SupabaseRealtimeEventBinaryColumnDecoder::decode(v).unwrap_or_default())
}
}
deserializer.deserialize_any(ValueVisitor())
}

View File

@ -1,7 +1,9 @@
use std::collections::HashMap;
use std::sync::Arc;
use collab_plugins::cloud_storage::RemoteCollabStorage;
use parking_lot::RwLock;
use collab_plugins::cloud_storage::{CollabObject, RemoteCollabStorage, RemoteUpdateSender};
use parking_lot::{Mutex, RwLock};
use serde_json::Value;
use flowy_database_deps::cloud::DatabaseCloudService;
use flowy_document_deps::cloud::DocumentCloudService;
@ -14,6 +16,7 @@ use crate::supabase::api::{
SupabaseDatabaseServiceImpl, SupabaseDocumentServiceImpl, SupabaseFolderServiceImpl,
SupabaseServerServiceImpl,
};
use crate::supabase::entities::RealtimeCollabUpdateEvent;
use crate::AppFlowyServer;
/// https://www.pgbouncer.org/features.html
@ -54,11 +57,14 @@ impl PgPoolMode {
pub struct SupabaseServer {
#[allow(dead_code)]
config: SupabaseConfiguration,
device_id: Mutex<String>,
update_tx: RwLock<HashMap<String, RemoteUpdateSender>>,
restful_postgres: Arc<RwLock<Option<Arc<RESTfulPostgresServer>>>>,
}
impl SupabaseServer {
pub fn new(config: SupabaseConfiguration) -> Self {
let update_tx = RwLock::new(HashMap::new());
let restful_postgres = if config.enable_sync {
Some(Arc::new(RESTfulPostgresServer::new(config.clone())))
} else {
@ -66,6 +72,8 @@ impl SupabaseServer {
};
Self {
config,
device_id: Default::default(),
update_tx,
restful_postgres: Arc::new(RwLock::new(restful_postgres)),
}
}
@ -89,6 +97,10 @@ impl AppFlowyServer for SupabaseServer {
self.set_enable_sync(enable);
}
fn set_sync_device_id(&self, device_id: &str) {
*self.device_id.lock() = device_id.to_string();
}
fn user_service(&self) -> Arc<dyn UserService> {
Arc::new(RESTfulSupabaseUserAuthServiceImpl::new(
SupabaseServerServiceImpl(self.restful_postgres.clone()),
@ -113,9 +125,32 @@ impl AppFlowyServer for SupabaseServer {
)))
}
fn collab_storage(&self) -> Option<Arc<dyn RemoteCollabStorage>> {
fn collab_storage(&self, collab_object: &CollabObject) -> Option<Arc<dyn RemoteCollabStorage>> {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
self
.update_tx
.write()
.insert(collab_object.object_id.clone(), tx);
Some(Arc::new(SupabaseCollabStorageImpl::new(
SupabaseServerServiceImpl(self.restful_postgres.clone()),
Some(rx),
)))
}
fn handle_realtime_event(&self, json: Value) {
match serde_json::from_value::<RealtimeCollabUpdateEvent>(json) {
Ok(event) => {
if let Some(tx) = self.update_tx.read().get(event.payload.oid.as_str()) {
if self.device_id.lock().as_str() != event.payload.did.as_str() {
if let Err(e) = tx.send(event.payload.value) {
tracing::trace!("send realtime update error: {}", e);
}
}
}
},
Err(e) => {
tracing::error!("parser realtime event error: {}", e);
},
}
}
}

View File

@ -1,10 +1,12 @@
use collab_plugins::cloud_storage::{CollabObject, CollabType};
use uuid::Uuid;
use flowy_user_deps::entities::SignUpResponse;
use lib_infra::box_any::BoxAny;
use crate::supabase_test::util::{
collab_service, database_service, get_supabase_config, sign_up_param, user_auth_service,
};
use collab_plugins::cloud_storage::{CollabObject, CollabType};
use flowy_user_deps::entities::SignUpResponse;
use lib_infra::box_any::BoxAny;
use uuid::Uuid;
#[tokio::test]
async fn supabase_create_workspace_test() {
@ -25,7 +27,7 @@ async fn supabase_create_workspace_test() {
let row_id = uuid::Uuid::new_v4().to_string();
row_ids.push(row_id.clone());
let collab_object = CollabObject {
id: row_id,
object_id: row_id,
uid: user.user_id,
ty: CollabType::DatabaseRow,
meta: Default::default(),

View File

@ -41,7 +41,7 @@ async fn supabase_get_folder_test() {
let user: SignUpResponse = user_service.sign_up(BoxAny::new(params)).await.unwrap();
let collab_object = CollabObject {
id: user.latest_workspace.id.clone(),
object_id: user.latest_workspace.id.clone(),
uid: user.user_id,
ty: CollabType::Folder,
meta: Default::default(),
@ -124,7 +124,7 @@ async fn supabase_duplicate_updates_test() {
let user: SignUpResponse = user_service.sign_up(BoxAny::new(params)).await.unwrap();
let collab_object = CollabObject {
id: user.latest_workspace.id.clone(),
object_id: user.latest_workspace.id.clone(),
uid: user.user_id,
ty: CollabType::Folder,
meta: Default::default(),
@ -220,7 +220,7 @@ async fn supabase_diff_state_vec_test() {
let user: SignUpResponse = user_service.sign_up(BoxAny::new(params)).await.unwrap();
let collab_object = CollabObject {
id: user.latest_workspace.id.clone(),
object_id: user.latest_workspace.id.clone(),
uid: user.user_id,
ty: CollabType::Folder,
meta: Default::default(),

View File

@ -27,6 +27,7 @@ pub fn collab_service() -> Arc<dyn RemoteCollabStorage> {
let server = Arc::new(RESTfulPostgresServer::new(config));
Arc::new(SupabaseCollabStorageImpl::new(
SupabaseServerServiceImpl::new(server),
None,
))
}