Merge pull request #656 from AppFlowy-IO/feat/remove_filter_feature_flag

chore: remove feature flag: filter
Nathan.fooo 2022-07-20 19:29:37 +08:00 committed by GitHub
commit 78b756d65d
14 changed files with 200 additions and 30 deletions

View File

@@ -11,9 +11,9 @@ use flowy_database::ConnectionPool;
 use flowy_error::{FlowyError, FlowyResult};
 use flowy_folder_data_model::revision::{AppRevision, TrashRevision, ViewRevision, WorkspaceRevision};
 use flowy_revision::disk::{RevisionRecord, RevisionState};
-use flowy_revision::mk_revision_disk_cache;
-use flowy_sync::client_folder::initial_folder_delta;
+use flowy_revision::mk_text_block_revision_disk_cache;
 use flowy_sync::{client_folder::FolderPad, entities::revision::Revision};
+use lib_ot::core::PlainTextDeltaBuilder;
 use std::sync::Arc;
 use tokio::sync::RwLock;
 pub use version_1::{app_sql::*, trash_sql::*, v1_impl::V1Transaction, view_sql::*, workspace_sql::*};
@@ -109,16 +109,16 @@ impl FolderPersistence {
     pub async fn save_folder(&self, user_id: &str, folder_id: &FolderId, folder: FolderPad) -> FlowyResult<()> {
         let pool = self.database.db_pool()?;
-        let delta_data = initial_folder_delta(&folder)?.to_delta_bytes();
-        let md5 = folder.md5();
-        let revision = Revision::new(folder_id.as_ref(), 0, 0, delta_data, user_id, md5);
+        let json = folder.to_json()?;
+        let delta_data = PlainTextDeltaBuilder::new().insert(&json).build().to_delta_bytes();
+        let revision = Revision::initial_revision(user_id, folder_id.as_ref(), delta_data);
         let record = RevisionRecord {
             revision,
             state: RevisionState::Sync,
             write_to_disk: true,
         };
-        let disk_cache = mk_revision_disk_cache(user_id, pool);
+        let disk_cache = mk_text_block_revision_disk_cache(user_id, pool);
         disk_cache.delete_and_insert_records(folder_id.as_ref(), None, vec![record])
     }
 }
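For context: the old code assembled the first revision by hand, passing base and current rev ids of 0 together with the folder's md5, while the new call delegates that to Revision::initial_revision. A minimal sketch of what such a constructor likely does, assuming a Bytes payload and an md5-style checksum helper (both are assumptions, not taken from this commit):

impl Revision {
    /// Sketch only: convenience constructor for the very first revision of an object.
    pub fn initial_revision(user_id: &str, object_id: &str, delta_data: Bytes) -> Revision {
        // The first revision has no predecessor, so base_rev_id and rev_id are both 0.
        let md5 = md5_of_bytes(&delta_data); // hypothetical checksum helper
        Revision::new(object_id, 0, 0, delta_data, user_id, md5)
    }
}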

View File

@@ -50,7 +50,6 @@ lib-infra = { path = "../../../shared-lib/lib-infra", features = ["protobuf_file
 [features]
-default = ["filter"]
+default = []
 dart = ["lib-infra/dart"]
-filter = []
 flowy_unit_test = ["flowy-revision/flowy_unit_test"]

View File

@@ -2,6 +2,7 @@ use crate::services::block_revision_editor::GridBlockRevisionCompactor;
 use crate::services::grid_editor::{GridRevisionCompactor, GridRevisionEditor};
 use crate::services::persistence::block_index::BlockIndexCache;
 use crate::services::persistence::kv::GridKVPersistence;
+use crate::services::persistence::migration::GridMigration;
 use crate::services::persistence::GridDatabase;
 use crate::services::tasks::GridTaskScheduler;
 use bytes::Bytes;
@@ -31,6 +32,7 @@ pub struct GridManager {
     #[allow(dead_code)]
     kv_persistence: Arc<GridKVPersistence>,
     task_scheduler: GridTaskSchedulerRwLock,
+    migration: GridMigration,
 }
 
 impl GridManager {
@@ -41,17 +43,27 @@ impl GridManager {
     ) -> Self {
         let grid_editors = Arc::new(DashMap::new());
         let kv_persistence = Arc::new(GridKVPersistence::new(database.clone()));
-        let block_index_cache = Arc::new(BlockIndexCache::new(database));
+        let block_index_cache = Arc::new(BlockIndexCache::new(database.clone()));
         let task_scheduler = GridTaskScheduler::new();
+        let migration = GridMigration::new(grid_user.clone(), database);
         Self {
             grid_editors,
             grid_user,
             kv_persistence,
             block_index_cache,
             task_scheduler,
+            migration,
         }
     }
 
+    pub async fn initialize_with_new_user(&self, _user_id: &str, _token: &str) -> FlowyResult<()> {
+        Ok(())
+    }
+
+    pub async fn initialize(&self, _user_id: &str, _token: &str) -> FlowyResult<()> {
+        Ok(())
+    }
+
     #[tracing::instrument(level = "debug", skip_all, err)]
     pub async fn create_grid<T: AsRef<str>>(&self, grid_id: T, revisions: RepeatedRevision) -> FlowyResult<()> {
         let grid_id = grid_id.as_ref();
@@ -74,6 +86,7 @@ impl GridManager {
     pub async fn open_grid<T: AsRef<str>>(&self, grid_id: T) -> FlowyResult<Arc<GridRevisionEditor>> {
         let grid_id = grid_id.as_ref();
         tracing::Span::current().record("grid_id", &grid_id);
+        let _ = self.migration.migration_grid_if_need(grid_id).await;
         self.get_or_create_grid_editor(grid_id).await
     }

View File

@@ -643,8 +643,8 @@ pub struct GridPadBuilder();
 impl RevisionObjectBuilder for GridPadBuilder {
     type Output = GridRevisionPad;
 
-    fn build_object(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
-        let pad = GridRevisionPad::from_revisions(object_id, revisions)?;
+    fn build_object(_object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
+        let pad = GridRevisionPad::from_revisions(revisions)?;
         Ok(pad)
     }
 }

View File

@@ -7,6 +7,7 @@ use flowy_database::{
 use flowy_error::FlowyResult;
 use std::sync::Arc;
 
+/// Allow getting the block id from row id.
 pub struct BlockIndexCache {
     database: Arc<dyn GridDatabase>,
 }

View File

@@ -0,0 +1,113 @@
+use crate::manager::GridUser;
+use crate::services::persistence::GridDatabase;
+use flowy_database::kv::KV;
+use flowy_error::FlowyResult;
+use flowy_grid_data_model::revision::GridRevision;
+use flowy_revision::disk::{RevisionRecord, SQLiteGridRevisionPersistence};
+use flowy_revision::{mk_grid_block_revision_disk_cache, RevisionLoader, RevisionPersistence};
+use flowy_sync::client_grid::{make_grid_rev_json_str, GridRevisionPad};
+use flowy_sync::entities::revision::Revision;
+use lib_ot::core::PlainTextDeltaBuilder;
+use serde::{Deserialize, Serialize};
+use std::str::FromStr;
+use std::sync::Arc;
+
+pub(crate) struct GridMigration {
+    user: Arc<dyn GridUser>,
+    database: Arc<dyn GridDatabase>,
+}
+
+impl GridMigration {
+    pub fn new(user: Arc<dyn GridUser>, database: Arc<dyn GridDatabase>) -> Self {
+        Self { user, database }
+    }
+
+    pub async fn migration_grid_if_need(&self, grid_id: &str) -> FlowyResult<()> {
+        match KV::get_str(grid_id) {
+            None => {
+                let _ = self.reset_grid_rev(grid_id).await?;
+                let _ = self.save_migrate_record(grid_id)?;
+            }
+            Some(s) => {
+                let mut record = MigrationGridRecord::from_str(&s)?;
+                let empty_json = self.empty_grid_rev_json()?;
+                if record.len < empty_json.len() {
+                    let _ = self.reset_grid_rev(grid_id).await?;
+                    record.len = empty_json.len();
+                    KV::set_str(grid_id, record.to_string());
+                }
+            }
+        }
+        Ok(())
+    }
+
+    async fn reset_grid_rev(&self, grid_id: &str) -> FlowyResult<()> {
+        let user_id = self.user.user_id()?;
+        let pool = self.database.db_pool()?;
+        let grid_rev_pad = self.get_grid_revision_pad(grid_id).await?;
+        let json = grid_rev_pad.json_str()?;
+        let delta_data = PlainTextDeltaBuilder::new().insert(&json).build().to_delta_bytes();
+        let revision = Revision::initial_revision(&user_id, grid_id, delta_data);
+        let record = RevisionRecord::new(revision);
+        //
+        let disk_cache = mk_grid_block_revision_disk_cache(&user_id, pool);
+        let _ = disk_cache.delete_and_insert_records(grid_id, None, vec![record]);
+        Ok(())
+    }
+
+    fn save_migrate_record(&self, grid_id: &str) -> FlowyResult<()> {
+        let empty_json_str = self.empty_grid_rev_json()?;
+        let record = MigrationGridRecord {
+            grid_id: grid_id.to_owned(),
+            len: empty_json_str.len(),
+        };
+        KV::set_str(grid_id, record.to_string());
+        Ok(())
+    }
+
+    fn empty_grid_rev_json(&self) -> FlowyResult<String> {
+        let empty_grid_rev = GridRevision::default();
+        let empty_json = make_grid_rev_json_str(&empty_grid_rev)?;
+        Ok(empty_json)
+    }
+
+    async fn get_grid_revision_pad(&self, grid_id: &str) -> FlowyResult<GridRevisionPad> {
+        let pool = self.database.db_pool()?;
+        let user_id = self.user.user_id()?;
+        let disk_cache = SQLiteGridRevisionPersistence::new(&user_id, pool);
+        let rev_persistence = Arc::new(RevisionPersistence::new(&user_id, grid_id, disk_cache));
+        let (revisions, _) = RevisionLoader {
+            object_id: grid_id.to_owned(),
+            user_id,
+            cloud: None,
+            rev_persistence,
+        }
+        .load()
+        .await?;
+
+        let pad = GridRevisionPad::from_revisions(revisions)?;
+        Ok(pad)
+    }
+}
+
+#[derive(Serialize, Deserialize)]
+struct MigrationGridRecord {
+    grid_id: String,
+    len: usize,
+}
+
+impl FromStr for MigrationGridRecord {
+    type Err = serde_json::Error;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        serde_json::from_str::<MigrationGridRecord>(s)
+    }
+}
+
+impl ToString for MigrationGridRecord {
+    fn to_string(&self) -> String {
+        serde_json::to_string(self).unwrap_or_else(|_| "".to_string())
+    }
+}
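The KV marker stored per grid records the length of an empty GridRevision's JSON at the time the grid was last rewritten; if a later schema change makes that empty JSON longer (as this commit does, since `setting` is now always serialized), migration_grid_if_need rewrites the grid once more. An illustrative round-trip of the marker, with a made-up grid id and length (assumes `use std::str::FromStr;` is in scope, as it is in this file):

// Hypothetical values, for illustration only.
let record = MigrationGridRecord {
    grid_id: "a1b2c3".to_owned(),
    len: 62,
};
// ToString serializes via serde_json; FromStr parses it back.
let s = record.to_string();
assert_eq!(s, r#"{"grid_id":"a1b2c3","len":62}"#);
let parsed = MigrationGridRecord::from_str(&s).unwrap();
assert_eq!(parsed.len, 62);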

View File

@@ -4,6 +4,7 @@ use std::sync::Arc;
 pub mod block_index;
 pub mod kv;
+pub mod migration;
 
 pub trait GridDatabase: Send + Sync {
     fn db_pool(&self) -> Result<Arc<ConnectionPool>, FlowyError>;

View File

@@ -53,6 +53,14 @@ pub struct RevisionRecord {
 }
 
 impl RevisionRecord {
+    pub fn new(revision: Revision) -> Self {
+        Self {
+            revision,
+            state: RevisionState::Sync,
+            write_to_disk: true,
+        }
+    }
+
     pub fn ack(&mut self) {
         self.state = RevisionState::Ack;
     }
@@ -64,6 +72,8 @@ pub struct RevisionChangeset {
     pub(crate) state: RevisionState,
 }
 
+/// Sync: revision is not synced to the server
+/// Ack: revision is synced to the server
 #[derive(Debug, Clone, Eq, PartialEq)]
 pub enum RevisionState {
     Sync = 0,
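RevisionRecord::new fixes the state to Sync and marks the record for disk persistence, which is what call sites such as save_folder above and the grid migration were spelling out as a struct literal. A trivial usage sketch, assuming `revision` was built elsewhere (e.g. via Revision::initial_revision):

let record = RevisionRecord::new(revision);
assert_eq!(record.state, RevisionState::Sync);
assert!(record.write_to_disk);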

View File

@@ -2,7 +2,7 @@ use crate::cache::{
     disk::{RevisionChangeset, RevisionDiskCache, SQLiteTextBlockRevisionPersistence},
     memory::RevisionMemoryCacheDelegate,
 };
-use crate::disk::{RevisionRecord, RevisionState};
+use crate::disk::{RevisionRecord, RevisionState, SQLiteGridBlockRevisionPersistence};
 use crate::memory::RevisionMemoryCache;
 use crate::RevisionCompactor;
 use flowy_database::ConnectionPool;
@@ -214,13 +214,20 @@ impl RevisionPersistence {
     }
 }
 
-pub fn mk_revision_disk_cache(
+pub fn mk_text_block_revision_disk_cache(
     user_id: &str,
     pool: Arc<ConnectionPool>,
 ) -> Arc<dyn RevisionDiskCache<Error = FlowyError>> {
     Arc::new(SQLiteTextBlockRevisionPersistence::new(user_id, pool))
 }
 
+pub fn mk_grid_block_revision_disk_cache(
+    user_id: &str,
+    pool: Arc<ConnectionPool>,
+) -> Arc<dyn RevisionDiskCache<Error = FlowyError>> {
+    Arc::new(SQLiteGridBlockRevisionPersistence::new(user_id, pool))
+}
+
 impl RevisionMemoryCacheDelegate for Arc<dyn RevisionDiskCache<Error = FlowyError>> {
     fn checkpoint_tick(&self, mut records: Vec<RevisionRecord>) -> FlowyResult<()> {
         records.retain(|record| record.write_to_disk);

View File

@@ -16,14 +16,23 @@ use std::sync::Arc;
 pub struct GridDepsResolver();
 
 impl GridDepsResolver {
-    pub fn resolve(ws_conn: Arc<FlowyWebSocketConnect>, user_session: Arc<UserSession>) -> Arc<GridManager> {
+    pub async fn resolve(ws_conn: Arc<FlowyWebSocketConnect>, user_session: Arc<UserSession>) -> Arc<GridManager> {
         let user = Arc::new(GridUserImpl(user_session.clone()));
         let rev_web_socket = Arc::new(GridWebSocket(ws_conn));
-        Arc::new(GridManager::new(
-            user,
+        let grid_manager = Arc::new(GridManager::new(
+            user.clone(),
             rev_web_socket,
             Arc::new(GridDatabaseImpl(user_session)),
-        ))
+        ));
+
+        if let (Ok(user_id), Ok(token)) = (user.user_id(), user.token()) {
+            match grid_manager.initialize(&user_id, &token).await {
+                Ok(_) => {}
+                Err(e) => tracing::error!("Initialize grid manager failed: {}", e),
+            }
+        }
+
+        grid_manager
     }
 }

View File

@@ -112,7 +112,7 @@ impl FlowySDK {
             &config.server_config,
         );
 
-        let grid_manager = GridDepsResolver::resolve(ws_conn.clone(), user_session.clone());
+        let grid_manager = GridDepsResolver::resolve(ws_conn.clone(), user_session.clone()).await;
 
         let folder_manager = FolderDepsResolver::resolve(
             local_server.clone(),
@@ -147,7 +147,7 @@ impl FlowySDK {
             )
         }));
 
-        _start_listening(&dispatcher, &ws_conn, &user_session, &folder_manager);
+        _start_listening(&dispatcher, &ws_conn, &user_session, &folder_manager, &grid_manager);
 
         Self {
             config,
@@ -171,10 +171,12 @@ fn _start_listening(
     ws_conn: &Arc<FlowyWebSocketConnect>,
     user_session: &Arc<UserSession>,
     folder_manager: &Arc<FolderManager>,
+    grid_manager: &Arc<GridManager>,
 ) {
     let subscribe_user_status = user_session.notifier.subscribe_user_status();
     let subscribe_network_type = ws_conn.subscribe_network_ty();
     let folder_manager = folder_manager.clone();
+    let grid_manager = grid_manager.clone();
     let cloned_folder_manager = folder_manager.clone();
     let ws_conn = ws_conn.clone();
     let user_session = user_session.clone();
@@ -182,7 +184,13 @@ fn _start_listening(
     dispatch.spawn(async move {
         user_session.init();
         listen_on_websocket(ws_conn.clone());
-        _listen_user_status(ws_conn.clone(), subscribe_user_status, folder_manager.clone()).await;
+        _listen_user_status(
+            ws_conn.clone(),
+            subscribe_user_status,
+            folder_manager.clone(),
+            grid_manager.clone(),
+        )
+        .await;
     });
 
     dispatch.spawn(async move {
@@ -209,6 +217,7 @@ async fn _listen_user_status(
     ws_conn: Arc<FlowyWebSocketConnect>,
     mut subscribe: broadcast::Receiver<UserStatus>,
     folder_manager: Arc<FolderManager>,
+    grid_manager: Arc<GridManager>,
 ) {
     while let Ok(status) = subscribe.recv().await {
         let result = || async {
@@ -216,6 +225,7 @@ async fn _listen_user_status(
                 UserStatus::Login { token, user_id } => {
                     tracing::trace!("User did login");
                     let _ = folder_manager.initialize(&user_id, &token).await?;
+                    let _ = grid_manager.initialize(&user_id, &token).await?;
                     let _ = ws_conn.start(token, user_id).await?;
                 }
                 UserStatus::Logout { .. } => {
@@ -233,6 +243,11 @@ async fn _listen_user_status(
                     let _ = folder_manager
                         .initialize_with_new_user(&profile.id, &profile.token)
                         .await?;
+                    let _ = grid_manager
+                        .initialize_with_new_user(&profile.id, &profile.token)
+                        .await?;
+
                     let _ = ws_conn.start(profile.token.clone(), profile.id.clone()).await?;
                     let _ = ret.send(());
                 }

View File

@@ -31,13 +31,8 @@ pub struct GridRevision {
     pub fields: Vec<Arc<FieldRevision>>,
     pub blocks: Vec<Arc<GridBlockMetaRevision>>,
 
-    #[cfg(feature = "filter")]
     #[serde(default)]
     pub setting: GridSettingRevision,
-
-    #[cfg(not(feature = "filter"))]
-    #[serde(default, skip)]
-    pub setting: GridSettingRevision,
 }
 
 impl GridRevision {
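With the cfg branches gone, `setting` is always serialized, while `#[serde(default)]` still lets JSON written by the old schema (which omitted the field) deserialize; the grid migration above exists so stored grids get rewritten into the new shape. A sketch of that behavior, assuming GridSettingRevision implements Default/Serialize/Deserialize and defaults to layout 0 with no filters:

// JSON written before this change carries no "setting" key.
let old_json = r#"{"grid_id":"1","fields":[],"blocks":[]}"#;
let grid: GridRevision = serde_json::from_str(old_json).unwrap(); // `setting` falls back to its Default

// Re-serializing now always includes the setting, matching the updated serde test below.
assert_eq!(
    serde_json::to_string(&grid).unwrap(),
    r#"{"grid_id":"1","fields":[],"blocks":[],"setting":{"layout":0,"filters":[]}}"#
);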

View File

@@ -6,5 +6,8 @@ fn grid_default_serde_test() {
     let grid = GridRevision::new(&grid_id);
     let json = serde_json::to_string(&grid).unwrap();
-    assert_eq!(json, r#"{"grid_id":"1","fields":[],"blocks":[]}"#)
+    assert_eq!(
+        json,
+        r#"{"grid_id":"1","fields":[],"blocks":[],"setting":{"layout":0,"filters":[]}}"#
+    )
 }

View File

@@ -62,7 +62,7 @@ impl GridRevisionPad {
         })
     }
 
-    pub fn from_revisions(_grid_id: &str, revisions: Vec<Revision>) -> CollaborateResult<Self> {
+    pub fn from_revisions(revisions: Vec<Revision>) -> CollaborateResult<Self> {
         let grid_delta: GridRevisionDelta = make_delta_from_revisions::<PlainTextAttributes>(revisions)?;
         Self::from_delta(grid_delta)
     }
@@ -480,8 +480,8 @@ impl GridRevisionPad {
         match f(Arc::make_mut(&mut self.grid_rev))? {
             None => Ok(None),
             Some(_) => {
-                let old = json_from_grid(&cloned_grid)?;
-                let new = json_from_grid(&self.grid_rev)?;
+                let old = make_grid_rev_json_str(&cloned_grid)?;
+                let new = self.json_str()?;
                 match cal_diff::<PlainTextAttributes>(old, new) {
                     None => Ok(None),
                     Some(delta) => {
@@ -528,9 +528,13 @@ impl GridRevisionPad {
             },
         )
     }
+
+    pub fn json_str(&self) -> CollaborateResult<String> {
+        make_grid_rev_json_str(&self.grid_rev)
+    }
 }
 
-fn json_from_grid(grid: &Arc<GridRevision>) -> CollaborateResult<String> {
+pub fn make_grid_rev_json_str(grid: &GridRevision) -> CollaborateResult<String> {
     let json = serde_json::to_string(grid)
         .map_err(|err| internal_error(format!("Serialize grid to json str failed. {:?}", err)))?;
     Ok(json)