trait for flowy-core persistence

This commit is contained in:
appflowy 2022-01-14 09:09:25 +08:00
parent dac86ef857
commit f9b552395b
25 changed files with 618 additions and 472 deletions

View File

@ -8,7 +8,7 @@ import 'package:flowy_sdk/protobuf/dart-notify/subject.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-core-data-model/app.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-core-data-model/view.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-core/observable.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-core/dart_notification.pb.dart';
import 'package:flowy_sdk/rust_stream.dart';
import 'helper.dart';

View File

@ -3,7 +3,7 @@ import 'package:flowy_sdk/protobuf/dart-notify/protobuf.dart';
import 'package:flowy_sdk/protobuf/flowy-user/protobuf.dart';
import 'package:dartz/dartz.dart';
import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-core/observable.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-core/dart_notification.pb.dart';
typedef UserNotificationCallback = void Function(UserNotification, Either<Uint8List, FlowyError>);

View File

@ -9,8 +9,14 @@ use crate::{
dart_notification::{send_dart_notification, WorkspaceNotification},
entities::workspace::RepeatedWorkspace,
errors::{FlowyError, FlowyResult},
module::{WorkspaceCloudService, WorkspaceDatabase, WorkspaceUser},
services::{AppController, TrashController, ViewController, WorkspaceController},
module::{WorkspaceCloudService, WorkspaceUser},
services::{
persistence::FlowyCorePersistence,
AppController,
TrashController,
ViewController,
WorkspaceController,
},
};
lazy_static! {
@ -20,7 +26,7 @@ lazy_static! {
pub struct CoreContext {
pub user: Arc<dyn WorkspaceUser>,
pub(crate) cloud_service: Arc<dyn WorkspaceCloudService>,
pub(crate) database: Arc<dyn WorkspaceDatabase>,
pub(crate) persistence: Arc<FlowyCorePersistence>,
pub workspace_controller: Arc<WorkspaceController>,
pub(crate) app_controller: Arc<AppController>,
pub(crate) view_controller: Arc<ViewController>,
@ -31,7 +37,7 @@ impl CoreContext {
pub(crate) fn new(
user: Arc<dyn WorkspaceUser>,
cloud_service: Arc<dyn WorkspaceCloudService>,
database: Arc<dyn WorkspaceDatabase>,
persistence: Arc<FlowyCorePersistence>,
workspace_controller: Arc<WorkspaceController>,
app_controller: Arc<AppController>,
view_controller: Arc<ViewController>,
@ -44,7 +50,7 @@ impl CoreContext {
Self {
user,
cloud_service,
database,
persistence,
workspace_controller,
app_controller,
view_controller,

View File

@ -10,6 +10,7 @@ use crate::{
event::WorkspaceEvent,
services::{
app::event_handler::*,
persistence::FlowyCorePersistence,
trash::event_handler::*,
view::event_handler::*,
workspace::event_handler::*,
@ -49,15 +50,17 @@ pub fn init_core(
flowy_document: Arc<DocumentContext>,
cloud_service: Arc<dyn WorkspaceCloudService>,
) -> Arc<CoreContext> {
let persistence = Arc::new(FlowyCorePersistence::new(database.clone()));
let trash_controller = Arc::new(TrashController::new(
database.clone(),
persistence.clone(),
cloud_service.clone(),
user.clone(),
));
let view_controller = Arc::new(ViewController::new(
user.clone(),
database.clone(),
persistence.clone(),
cloud_service.clone(),
trash_controller.clone(),
flowy_document,
@ -65,14 +68,14 @@ pub fn init_core(
let app_controller = Arc::new(AppController::new(
user.clone(),
database.clone(),
persistence.clone(),
trash_controller.clone(),
cloud_service.clone(),
));
let workspace_controller = Arc::new(WorkspaceController::new(
user.clone(),
database.clone(),
persistence.clone(),
trash_controller.clone(),
cloud_service.clone(),
));
@ -80,7 +83,7 @@ pub fn init_core(
Arc::new(CoreContext::new(
user,
cloud_service,
database,
persistence,
workspace_controller,
app_controller,
view_controller,

View File

@ -5,35 +5,35 @@ use crate::{
trash::TrashType,
},
errors::*,
module::{WorkspaceCloudService, WorkspaceDatabase, WorkspaceUser},
module::{WorkspaceCloudService, WorkspaceUser},
services::{
app::sql::{AppChangeset, AppTable, AppTableSql},
persistence::{AppChangeset, FlowyCorePersistence, FlowyCorePersistenceTransaction},
TrashController,
TrashEvent,
},
};
use flowy_database::SqliteConnection;
use futures::{FutureExt, StreamExt};
use std::{collections::HashSet, sync::Arc};
pub(crate) struct AppController {
user: Arc<dyn WorkspaceUser>,
database: Arc<dyn WorkspaceDatabase>,
trash_can: Arc<TrashController>,
persistence: Arc<FlowyCorePersistence>,
trash_controller: Arc<TrashController>,
cloud_service: Arc<dyn WorkspaceCloudService>,
}
impl AppController {
pub(crate) fn new(
user: Arc<dyn WorkspaceUser>,
database: Arc<dyn WorkspaceDatabase>,
persistence: Arc<FlowyCorePersistence>,
trash_can: Arc<TrashController>,
cloud_service: Arc<dyn WorkspaceCloudService>,
) -> Self {
Self {
user,
database,
trash_can,
persistence,
trash_controller: trash_can,
cloud_service,
}
}
@ -50,62 +50,52 @@ impl AppController {
}
pub(crate) async fn create_app_on_local(&self, app: App) -> Result<App, FlowyError> {
let conn = &*self.database.db_connection()?;
conn.immediate_transaction::<_, FlowyError, _>(|| {
let _ = self.save_app(app.clone(), &*conn)?;
let _ = notify_apps_changed(&app.workspace_id, self.trash_can.clone(), conn)?;
let _ = self.persistence.begin_transaction(|transaction| {
let _ = transaction.create_app(app.clone())?;
let _ = notify_apps_changed(&app.workspace_id, self.trash_controller.clone(), &transaction)?;
Ok(())
})?;
Ok(app)
}
pub(crate) fn save_app(&self, app: App, conn: &SqliteConnection) -> Result<(), FlowyError> {
let _ = AppTableSql::create_app(app, &*conn)?;
Ok(())
}
pub(crate) async fn read_app(&self, params: AppId) -> Result<App, FlowyError> {
let conn = self.database.db_connection()?;
let app_table = AppTableSql::read_app(&params.app_id, &*conn)?;
let trash_ids = self.trash_can.read_trash_ids(&conn)?;
if trash_ids.contains(&app_table.id) {
return Err(FlowyError::record_not_found());
}
let app = self.persistence.begin_transaction(|transaction| {
let app = transaction.read_app(&params.app_id)?;
let trash_ids = self.trash_controller.read_trash_ids(&transaction)?;
if trash_ids.contains(&app.id) {
return Err(FlowyError::record_not_found());
}
Ok(app)
})?;
let _ = self.read_app_on_server(params)?;
Ok(app_table.into())
Ok(app)
}
pub(crate) async fn update_app(&self, params: UpdateAppParams) -> Result<(), FlowyError> {
let changeset = AppChangeset::new(params.clone());
let app_id = changeset.id.clone();
let conn = &*self.database.db_connection()?;
conn.immediate_transaction::<_, FlowyError, _>(|| {
let _ = AppTableSql::update_app(changeset, conn)?;
let app: App = AppTableSql::read_app(&app_id, conn)?.into();
send_dart_notification(&app_id, WorkspaceNotification::AppUpdated)
.payload(app)
.send();
Ok(())
})?;
let app = self.persistence.begin_transaction(|transaction| {
let _ = transaction.update_app(changeset)?;
let app = transaction.read_app(&app_id)?;
Ok(app)
})?;
send_dart_notification(&app_id, WorkspaceNotification::AppUpdated)
.payload(app)
.send();
let _ = self.update_app_on_server(params)?;
Ok(())
}
pub(crate) fn read_app_tables(&self, ids: Vec<String>) -> Result<Vec<AppTable>, FlowyError> {
let conn = &*self.database.db_connection()?;
let mut app_tables = vec![];
conn.immediate_transaction::<_, FlowyError, _>(|| {
for app_id in ids {
app_tables.push(AppTableSql::read_app(&app_id, conn)?);
pub(crate) fn read_local_apps(&self, ids: Vec<String>) -> Result<Vec<App>, FlowyError> {
let apps = self.persistence.begin_transaction(|transaction| {
let mut apps = vec![];
for id in ids {
apps.push(transaction.read_app(&id)?);
}
Ok(())
Ok(apps)
})?;
Ok(app_tables)
Ok(apps)
}
}
@ -137,23 +127,18 @@ impl AppController {
fn read_app_on_server(&self, params: AppId) -> Result<(), FlowyError> {
let token = self.user.token()?;
let server = self.cloud_service.clone();
let pool = self.database.db_pool()?;
let persistence = self.persistence.clone();
tokio::spawn(async move {
// Opti: retry?
match server.read_app(&token, params).await {
Ok(Some(app)) => match pool.get() {
Ok(conn) => {
let result = AppTableSql::create_app(app.clone(), &*conn);
match result {
Ok(_) => {
send_dart_notification(&app.id, WorkspaceNotification::AppUpdated)
.payload(app)
.send();
},
Err(e) => log::error!("Save app failed: {:?}", e),
}
},
Err(e) => log::error!("Require db connection failed: {:?}", e),
Ok(Some(app)) => {
match persistence.begin_transaction(|transaction| transaction.create_app(app.clone())) {
Ok(_) => {
send_dart_notification(&app.id, WorkspaceNotification::AppUpdated)
.payload(app)
.send();
},
Err(e) => log::error!("Save app failed: {:?}", e),
}
},
Ok(None) => {},
Err(e) => log::error!("Read app failed: {:?}", e),
@ -163,9 +148,9 @@ impl AppController {
}
fn listen_trash_controller_event(&self) {
let mut rx = self.trash_can.subscribe();
let database = self.database.clone();
let trash_can = self.trash_can.clone();
let mut rx = self.trash_controller.subscribe();
let persistence = self.persistence.clone();
let trash_controller = self.trash_controller.clone();
let _ = tokio::spawn(async move {
loop {
let mut stream = Box::pin(rx.recv().into_stream().filter_map(|result| async move {
@ -175,76 +160,69 @@ impl AppController {
}
}));
if let Some(event) = stream.next().await {
handle_trash_event(database.clone(), trash_can.clone(), event).await
handle_trash_event(persistence.clone(), trash_controller.clone(), event).await
}
}
});
}
}
#[tracing::instrument(level = "trace", skip(database, trash_can))]
async fn handle_trash_event(database: Arc<dyn WorkspaceDatabase>, trash_can: Arc<TrashController>, event: TrashEvent) {
let db_result = database.db_connection();
#[tracing::instrument(level = "trace", skip(persistence, trash_controller))]
async fn handle_trash_event(
persistence: Arc<FlowyCorePersistence>,
trash_controller: Arc<TrashController>,
event: TrashEvent,
) {
match event {
TrashEvent::NewTrash(identifiers, ret) | TrashEvent::Putback(identifiers, ret) => {
let result = || {
let conn = &*db_result?;
let _ = conn.immediate_transaction::<_, FlowyError, _>(|| {
for identifier in identifiers.items {
let app_table = AppTableSql::read_app(&identifier.id, conn)?;
let _ = notify_apps_changed(&app_table.workspace_id, trash_can.clone(), conn)?;
}
Ok(())
})?;
Ok::<(), FlowyError>(())
};
let _ = ret.send(result()).await;
let result = persistence.begin_transaction(|transaction| {
for identifier in identifiers.items {
let app = transaction.read_app(&identifier.id)?;
let _ = notify_apps_changed(&app.workspace_id, trash_controller.clone(), &transaction)?;
}
Ok(())
});
let _ = ret.send(result).await;
},
TrashEvent::Delete(identifiers, ret) => {
let result = || {
let conn = &*db_result?;
let _ = conn.immediate_transaction::<_, FlowyError, _>(|| {
let mut notify_ids = HashSet::new();
for identifier in identifiers.items {
let app_table = AppTableSql::read_app(&identifier.id, conn)?;
let _ = AppTableSql::delete_app(&identifier.id, conn)?;
notify_ids.insert(app_table.workspace_id);
}
let result = persistence.begin_transaction(|transaction| {
let mut notify_ids = HashSet::new();
for identifier in identifiers.items {
let app = transaction.read_app(&identifier.id)?;
let _ = transaction.delete_app(&identifier.id)?;
notify_ids.insert(app.workspace_id);
}
for notify_id in notify_ids {
let _ = notify_apps_changed(&notify_id, trash_can.clone(), conn)?;
}
Ok(())
})?;
Ok::<(), FlowyError>(())
};
let _ = ret.send(result()).await;
for notify_id in notify_ids {
let _ = notify_apps_changed(&notify_id, trash_controller.clone(), &transaction)?;
}
Ok(())
});
let _ = ret.send(result).await;
},
}
}
#[tracing::instrument(skip(workspace_id, trash_can, conn), err)]
fn notify_apps_changed(
#[tracing::instrument(skip(workspace_id, trash_controller, transaction), err)]
fn notify_apps_changed<'a>(
workspace_id: &str,
trash_can: Arc<TrashController>,
conn: &SqliteConnection,
trash_controller: Arc<TrashController>,
transaction: &'a (dyn FlowyCorePersistenceTransaction + 'a),
) -> FlowyResult<()> {
let repeated_app = read_local_workspace_apps(workspace_id, trash_can, conn)?;
let repeated_app = read_local_workspace_apps(workspace_id, trash_controller, transaction)?;
send_dart_notification(workspace_id, WorkspaceNotification::WorkspaceAppsChanged)
.payload(repeated_app)
.send();
Ok(())
}
pub fn read_local_workspace_apps(
pub fn read_local_workspace_apps<'a>(
workspace_id: &str,
trash_controller: Arc<TrashController>,
conn: &SqliteConnection,
transaction: &'a (dyn FlowyCorePersistenceTransaction + 'a),
) -> Result<RepeatedApp, FlowyError> {
let mut app_tables = AppTableSql::read_workspace_apps(workspace_id, false, conn)?;
let trash_ids = trash_controller.read_trash_ids(conn)?;
app_tables.retain(|app_table| !trash_ids.contains(&app_table.id));
let apps = app_tables.into_iter().map(|table| table.into()).collect::<Vec<App>>();
let mut apps = transaction.read_workspace_apps(workspace_id)?;
let trash_ids = trash_controller.read_trash_ids(transaction)?;
apps.retain(|app| !trash_ids.contains(&app.id));
Ok(RepeatedApp { items: apps })
}

View File

@ -21,14 +21,14 @@ pub(crate) async fn create_app_handler(
pub(crate) async fn delete_app_handler(
data: Data<QueryAppRequest>,
view_controller: Unit<Arc<AppController>>,
app_controller: Unit<Arc<AppController>>,
trash_controller: Unit<Arc<TrashController>>,
) -> Result<(), FlowyError> {
let params: AppId = data.into_inner().try_into()?;
let trash = view_controller
.read_app_tables(vec![params.app_id])?
let trash = app_controller
.read_local_apps(vec![params.app_id])?
.into_iter()
.map(|view_table| view_table.into())
.map(|app| app.into())
.collect::<Vec<Trash>>();
let _ = trash_controller.add(trash).await?;

View File

@ -1,3 +1,2 @@
pub mod controller;
pub mod event_handler;
pub(crate) mod sql;

View File

@ -4,6 +4,7 @@ pub(crate) use view::controller::*;
pub(crate) use workspace::controller::*;
pub(crate) mod app;
pub(crate) mod persistence;
pub(crate) mod trash;
pub(crate) mod view;
pub(crate) mod workspace;

View File

@ -0,0 +1,76 @@
mod version_1;
use std::sync::Arc;
pub use version_1::{app_sql::*, trash_sql::*, v1_impl::V1Transaction, view_sql::*, workspace_sql::*};
use crate::module::WorkspaceDatabase;
use flowy_core_data_model::entities::{
app::App,
prelude::RepeatedTrash,
trash::Trash,
view::View,
workspace::Workspace,
};
use flowy_error::{FlowyError, FlowyResult};
pub trait FlowyCorePersistenceTransaction {
fn create_workspace(&self, user_id: &str, workspace: Workspace) -> FlowyResult<()>;
fn read_workspaces(&self, user_id: &str, workspace_id: Option<String>) -> FlowyResult<Vec<Workspace>>;
fn update_workspace(&self, changeset: WorkspaceChangeset) -> FlowyResult<()>;
fn delete_workspace(&self, workspace_id: &str) -> FlowyResult<()>;
fn create_app(&self, app: App) -> FlowyResult<()>;
fn update_app(&self, changeset: AppChangeset) -> FlowyResult<()>;
fn read_app(&self, app_id: &str) -> FlowyResult<App>;
fn read_workspace_apps(&self, workspace_id: &str) -> FlowyResult<Vec<App>>;
fn delete_app(&self, app_id: &str) -> FlowyResult<App>;
fn create_view(&self, view: View) -> FlowyResult<()>;
fn read_view(&self, view_id: &str) -> FlowyResult<View>;
fn read_views(&self, belong_to_id: &str) -> FlowyResult<Vec<View>>;
fn update_view(&self, changeset: ViewChangeset) -> FlowyResult<()>;
fn delete_view(&self, view_id: &str) -> FlowyResult<()>;
fn create_trash(&self, trashes: Vec<Trash>) -> FlowyResult<()>;
fn read_all_trash(&self) -> FlowyResult<RepeatedTrash>;
fn delete_all_trash(&self) -> FlowyResult<()>;
fn read_trash(&self, trash_id: &str) -> FlowyResult<Trash>;
fn delete_trash(&self, trash_ids: Vec<String>) -> FlowyResult<()>;
}
pub struct FlowyCorePersistence {
database: Arc<dyn WorkspaceDatabase>,
}
impl FlowyCorePersistence {
pub fn new(database: Arc<dyn WorkspaceDatabase>) -> Self { Self { database } }
pub fn begin_transaction<F, O>(&self, f: F) -> FlowyResult<O>
where
F: for<'a> FnOnce(Box<dyn FlowyCorePersistenceTransaction + 'a>) -> FlowyResult<O>,
{
//[[immediate_transaction]]
// https://sqlite.org/lang_transaction.html
// IMMEDIATE cause the database connection to start a new write immediately,
// without waiting for a write statement. The BEGIN IMMEDIATE might fail
// with SQLITE_BUSY if another write transaction is already active on another
// database connection.
//
// EXCLUSIVE is similar to IMMEDIATE in that a write transaction is started
// immediately. EXCLUSIVE and IMMEDIATE are the same in WAL mode, but in
// other journaling modes, EXCLUSIVE prevents other database connections from
// reading the database while the transaction is underway.
let conn = self.database.db_connection()?;
conn.immediate_transaction::<_, FlowyError, _>(|| f(Box::new(V1Transaction(&conn))))
}
// pub fn scope_transaction<F, O>(&self, f: F) -> FlowyResult<O>
// where
// F: for<'a> FnOnce(Box<dyn FlowyCorePersistenceTransaction + 'a>) ->
// FlowyResult<O>, {
// match thread::scope(|_s| self.begin_transaction(f)) {
// Ok(result) => result,
// Err(e) => Err(FlowyError::internal().context(e)),
// }
// }
}

View File

@ -1,10 +1,7 @@
use crate::{
entities::{
app::{App, ColorStyle, UpdateAppParams},
trash::{Trash, TrashType},
view::RepeatedView,
},
services::workspace::sql::WorkspaceTable,
use crate::entities::{
app::{App, ColorStyle, UpdateAppParams},
trash::{Trash, TrashType},
view::RepeatedView,
};
use diesel::sql_types::Binary;
use flowy_database::{
@ -15,10 +12,9 @@ use flowy_database::{
use serde::{Deserialize, Serialize, __private::TryFrom};
use std::convert::TryInto;
use crate::errors::FlowyError;
pub struct AppTableSql {}
use crate::{errors::FlowyError, services::persistence::version_1::workspace_sql::WorkspaceTable};
pub struct AppTableSql();
impl AppTableSql {
pub(crate) fn create_app(app: App, conn: &SqliteConnection) -> Result<(), FlowyError> {
let app_table = AppTable::new(app);
@ -161,7 +157,7 @@ impl_sql_binary_expression!(ColorStyleCol);
#[derive(AsChangeset, Identifiable, Default, Debug)]
#[table_name = "app_table"]
pub(crate) struct AppChangeset {
pub struct AppChangeset {
pub id: String,
pub name: Option<String>,
pub desc: Option<String>,

View File

@ -0,0 +1,5 @@
pub mod app_sql;
pub mod trash_sql;
pub mod v1_impl;
pub mod view_sql;
pub mod workspace_sql;

View File

@ -9,8 +9,7 @@ use flowy_database::{
SqliteConnection,
};
pub struct TrashTableSql {}
pub struct TrashTableSql();
impl TrashTableSql {
pub(crate) fn create_trash(trashes: Vec<Trash>, conn: &SqliteConnection) -> Result<(), FlowyError> {
for trash in trashes {

View File

@ -0,0 +1,163 @@
use crate::services::persistence::{
version_1::{
app_sql::{AppChangeset, AppTableSql},
view_sql::{ViewChangeset, ViewTableSql},
workspace_sql::{WorkspaceChangeset, WorkspaceTableSql},
},
FlowyCorePersistenceTransaction,
TrashTableSql,
};
use flowy_core_data_model::entities::{
app::App,
prelude::{RepeatedTrash, Trash, View, Workspace},
};
use flowy_error::FlowyResult;
use lib_sqlite::DBConnection;
pub struct V1Transaction<'a>(pub &'a DBConnection);
impl<'a> FlowyCorePersistenceTransaction for V1Transaction<'a> {
fn create_workspace(&self, user_id: &str, workspace: Workspace) -> FlowyResult<()> {
let _ = WorkspaceTableSql::create_workspace(user_id, workspace, &*self.0)?;
Ok(())
}
fn read_workspaces(&self, user_id: &str, workspace_id: Option<String>) -> FlowyResult<Vec<Workspace>> {
let tables = WorkspaceTableSql::read_workspaces(workspace_id, user_id, &*self.0)?;
let workspaces = tables.into_iter().map(Workspace::from).collect::<Vec<_>>();
Ok(workspaces)
}
fn update_workspace(&self, changeset: WorkspaceChangeset) -> FlowyResult<()> {
WorkspaceTableSql::update_workspace(changeset, &*self.0)
}
fn delete_workspace(&self, workspace_id: &str) -> FlowyResult<()> {
WorkspaceTableSql::delete_workspace(workspace_id, &*self.0)
}
fn create_app(&self, app: App) -> FlowyResult<()> {
let _ = AppTableSql::create_app(app, &*self.0)?;
Ok(())
}
fn update_app(&self, changeset: AppChangeset) -> FlowyResult<()> {
let _ = AppTableSql::update_app(changeset, &*self.0)?;
Ok(())
}
fn read_app(&self, app_id: &str) -> FlowyResult<App> {
let table = AppTableSql::read_app(app_id, &*self.0)?;
Ok(App::from(table))
}
fn read_workspace_apps(&self, workspace_id: &str) -> FlowyResult<Vec<App>> {
let tables = AppTableSql::read_workspace_apps(workspace_id, false, &*self.0)?;
let apps = tables.into_iter().map(App::from).collect::<Vec<_>>();
Ok(apps)
}
fn delete_app(&self, app_id: &str) -> FlowyResult<App> {
let table = AppTableSql::delete_app(app_id, &*self.0)?;
Ok(App::from(table))
}
fn create_view(&self, view: View) -> FlowyResult<()> {
let _ = ViewTableSql::create_view(view, &*self.0)?;
Ok(())
}
fn read_view(&self, view_id: &str) -> FlowyResult<View> {
let table = ViewTableSql::read_view(view_id, &*self.0)?;
Ok(View::from(table))
}
fn read_views(&self, belong_to_id: &str) -> FlowyResult<Vec<View>> {
let tables = ViewTableSql::read_views(belong_to_id, &*self.0)?;
let views = tables.into_iter().map(View::from).collect::<Vec<_>>();
Ok(views)
}
fn update_view(&self, changeset: ViewChangeset) -> FlowyResult<()> {
let _ = ViewTableSql::update_view(changeset, &*self.0)?;
Ok(())
}
fn delete_view(&self, view_id: &str) -> FlowyResult<()> {
let _ = ViewTableSql::delete_view(view_id, &*self.0)?;
Ok(())
}
fn create_trash(&self, trashes: Vec<Trash>) -> FlowyResult<()> {
let _ = TrashTableSql::create_trash(trashes, &*self.0)?;
Ok(())
}
fn read_all_trash(&self) -> FlowyResult<RepeatedTrash> { TrashTableSql::read_all(&*self.0) }
fn delete_all_trash(&self) -> FlowyResult<()> { TrashTableSql::delete_all(&*self.0) }
fn read_trash(&self, trash_id: &str) -> FlowyResult<Trash> {
let table = TrashTableSql::read(trash_id, &*self.0)?;
Ok(Trash::from(table))
}
fn delete_trash(&self, trash_ids: Vec<String>) -> FlowyResult<()> {
for trash_id in &trash_ids {
let _ = TrashTableSql::delete_trash(&trash_id, &*self.0)?;
}
Ok(())
}
}
// https://www.reddit.com/r/rust/comments/droxdg/why_arent_traits_impld_for_boxdyn_trait/
impl<T> FlowyCorePersistenceTransaction for Box<T>
where
T: FlowyCorePersistenceTransaction + ?Sized,
{
fn create_workspace(&self, user_id: &str, workspace: Workspace) -> FlowyResult<()> {
(**self).create_workspace(user_id, workspace)
}
fn read_workspaces(&self, user_id: &str, workspace_id: Option<String>) -> FlowyResult<Vec<Workspace>> {
(**self).read_workspaces(user_id, workspace_id)
}
fn update_workspace(&self, changeset: WorkspaceChangeset) -> FlowyResult<()> {
(**self).update_workspace(changeset)
}
fn delete_workspace(&self, workspace_id: &str) -> FlowyResult<()> { (**self).delete_workspace(workspace_id) }
fn create_app(&self, app: App) -> FlowyResult<()> { (**self).create_app(app) }
fn update_app(&self, changeset: AppChangeset) -> FlowyResult<()> { (**self).update_app(changeset) }
fn read_app(&self, app_id: &str) -> FlowyResult<App> { (**self).read_app(app_id) }
fn read_workspace_apps(&self, workspace_id: &str) -> FlowyResult<Vec<App>> {
(**self).read_workspace_apps(workspace_id)
}
fn delete_app(&self, app_id: &str) -> FlowyResult<App> { (**self).delete_app(app_id) }
fn create_view(&self, view: View) -> FlowyResult<()> { (**self).create_view(view) }
fn read_view(&self, view_id: &str) -> FlowyResult<View> { (**self).read_view(view_id) }
fn read_views(&self, belong_to_id: &str) -> FlowyResult<Vec<View>> { (**self).read_views(belong_to_id) }
fn update_view(&self, changeset: ViewChangeset) -> FlowyResult<()> { (**self).update_view(changeset) }
fn delete_view(&self, view_id: &str) -> FlowyResult<()> { (**self).delete_view(view_id) }
fn create_trash(&self, trashes: Vec<Trash>) -> FlowyResult<()> { (**self).create_trash(trashes) }
fn read_all_trash(&self) -> FlowyResult<RepeatedTrash> { (**self).read_all_trash() }
fn delete_all_trash(&self) -> FlowyResult<()> { (**self).delete_all_trash() }
fn read_trash(&self, trash_id: &str) -> FlowyResult<Trash> { (**self).read_trash(trash_id) }
fn delete_trash(&self, trash_ids: Vec<String>) -> FlowyResult<()> { (**self).delete_trash(trash_ids) }
}

View File

@ -4,7 +4,7 @@ use crate::{
view::{RepeatedView, UpdateViewParams, View, ViewType},
},
errors::FlowyError,
services::app::sql::AppTable,
services::persistence::version_1::app_sql::AppTable,
};
use diesel::sql_types::Integer;
use flowy_database::{
@ -14,8 +14,7 @@ use flowy_database::{
};
use lib_infra::timestamp;
pub struct ViewTableSql {}
pub struct ViewTableSql();
impl ViewTableSql {
pub(crate) fn create_view(view: View, conn: &SqliteConnection) -> Result<(), FlowyError> {
let view_table = ViewTable::new(view);
@ -182,7 +181,7 @@ impl std::convert::From<ViewTable> for Trash {
#[derive(AsChangeset, Identifiable, Clone, Default, Debug)]
#[table_name = "view_table"]
pub(crate) struct ViewChangeset {
pub struct ViewChangeset {
pub id: String,
pub name: Option<String>,
pub desc: Option<String>,

View File

@ -10,8 +10,7 @@ use flowy_database::{
prelude::*,
schema::{workspace_table, workspace_table::dsl},
};
pub(crate) struct WorkspaceTableSql {}
pub(crate) struct WorkspaceTableSql();
impl WorkspaceTableSql {
pub(crate) fn create_workspace(
user_id: &str,
@ -22,7 +21,7 @@ impl WorkspaceTableSql {
match diesel_record_count!(workspace_table, &table.id, conn) {
0 => diesel_insert_table!(workspace_table, &table, conn),
_ => {
let changeset = WorkspaceTableChangeset::from_table(table);
let changeset = WorkspaceChangeset::from_table(table);
diesel_update_table!(workspace_table, changeset, conn);
},
}
@ -49,10 +48,7 @@ impl WorkspaceTableSql {
}
#[allow(dead_code)]
pub(crate) fn update_workspace(
changeset: WorkspaceTableChangeset,
conn: &SqliteConnection,
) -> Result<(), FlowyError> {
pub(crate) fn update_workspace(changeset: WorkspaceChangeset, conn: &SqliteConnection) -> Result<(), FlowyError> {
diesel_update_table!(workspace_table, changeset, conn);
Ok(())
}
@ -106,15 +102,15 @@ impl std::convert::From<WorkspaceTable> for Workspace {
#[derive(AsChangeset, Identifiable, Clone, Default, Debug)]
#[table_name = "workspace_table"]
pub struct WorkspaceTableChangeset {
pub struct WorkspaceChangeset {
pub id: String,
pub name: Option<String>,
pub desc: Option<String>,
}
impl WorkspaceTableChangeset {
impl WorkspaceChangeset {
pub fn new(params: UpdateWorkspaceParams) -> Self {
WorkspaceTableChangeset {
WorkspaceChangeset {
id: params.id,
name: params.name,
desc: params.desc,
@ -122,7 +118,7 @@ impl WorkspaceTableChangeset {
}
pub(crate) fn from_table(table: WorkspaceTable) -> Self {
WorkspaceTableChangeset {
WorkspaceChangeset {
id: table.id,
name: Some(table.name),
desc: Some(table.desc),

View File

@ -2,16 +2,15 @@ use crate::{
dart_notification::{send_anonymous_dart_notification, WorkspaceNotification},
entities::trash::{RepeatedTrash, RepeatedTrashId, Trash, TrashId, TrashType},
errors::{FlowyError, FlowyResult},
module::{WorkspaceCloudService, WorkspaceDatabase, WorkspaceUser},
services::trash::sql::TrashTableSql,
module::{WorkspaceCloudService, WorkspaceUser},
services::persistence::{FlowyCorePersistence, FlowyCorePersistenceTransaction},
};
use crossbeam_utils::thread;
use flowy_database::SqliteConnection;
use std::{fmt::Formatter, sync::Arc};
use tokio::sync::{broadcast, mpsc};
pub struct TrashController {
pub database: Arc<dyn WorkspaceDatabase>,
persistence: Arc<FlowyCorePersistence>,
notify: broadcast::Sender<TrashEvent>,
cloud_service: Arc<dyn WorkspaceCloudService>,
user: Arc<dyn WorkspaceUser>,
@ -19,14 +18,13 @@ pub struct TrashController {
impl TrashController {
pub fn new(
database: Arc<dyn WorkspaceDatabase>,
persistence: Arc<FlowyCorePersistence>,
cloud_service: Arc<dyn WorkspaceCloudService>,
user: Arc<dyn WorkspaceUser>,
) -> Self {
let (tx, _) = broadcast::channel(10);
Self {
database,
persistence,
notify: tx,
cloud_service,
user,
@ -38,22 +36,16 @@ impl TrashController {
#[tracing::instrument(level = "debug", skip(self), fields(putback) err)]
pub async fn putback(&self, trash_id: &str) -> FlowyResult<()> {
let (tx, mut rx) = mpsc::channel::<FlowyResult<()>>(1);
let trash_table = TrashTableSql::read(trash_id, &*self.database.db_connection()?)?;
let _ = thread::scope(|_s| {
let conn = self.database.db_connection()?;
conn.immediate_transaction::<_, FlowyError, _>(|| {
let _ = TrashTableSql::delete_trash(trash_id, &*conn)?;
notify_trash_changed(TrashTableSql::read_all(&conn)?);
Ok(())
})?;
Ok::<(), FlowyError>(())
})
.unwrap()?;
let trash = self.persistence.begin_transaction(|transaction| {
let trash = transaction.read_trash(trash_id);
let _ = transaction.delete_trash(vec![trash_id.to_owned()])?;
notify_trash_changed(transaction.read_all_trash()?);
trash
})?;
let identifier = TrashId {
id: trash_table.id,
ty: trash_table.ty.into(),
id: trash.id,
ty: trash.ty,
};
let _ = self.delete_trash_on_server(RepeatedTrashId {
@ -69,15 +61,11 @@ impl TrashController {
#[tracing::instrument(level = "debug", skip(self) err)]
pub async fn restore_all(&self) -> FlowyResult<()> {
let repeated_trash = thread::scope(|_s| {
let conn = self.database.db_connection()?;
conn.immediate_transaction::<_, FlowyError, _>(|| {
let repeated_trash = TrashTableSql::read_all(&*conn)?;
let _ = TrashTableSql::delete_all(&*conn)?;
Ok(repeated_trash)
})
})
.unwrap()?;
let repeated_trash = self.persistence.begin_transaction(|transaction| {
let trash = transaction.read_all_trash();
let _ = transaction.delete_all_trash();
trash
})?;
let identifiers: RepeatedTrashId = repeated_trash.items.clone().into();
let (tx, mut rx) = mpsc::channel::<FlowyResult<()>>(1);
@ -91,7 +79,9 @@ impl TrashController {
#[tracing::instrument(level = "debug", skip(self), err)]
pub async fn delete_all(&self) -> FlowyResult<()> {
let repeated_trash = TrashTableSql::read_all(&*(self.database.db_connection()?))?;
let repeated_trash = self
.persistence
.begin_transaction(|transaction| transaction.read_all_trash())?;
let trash_identifiers: RepeatedTrashId = repeated_trash.items.clone().into();
let _ = self.delete_with_identifiers(trash_identifiers.clone()).await?;
@ -103,7 +93,10 @@ impl TrashController {
#[tracing::instrument(level = "debug", skip(self), err)]
pub async fn delete(&self, trash_identifiers: RepeatedTrashId) -> FlowyResult<()> {
let _ = self.delete_with_identifiers(trash_identifiers.clone()).await?;
notify_trash_changed(TrashTableSql::read_all(&*(self.database.db_connection()?))?);
let repeated_trash = self
.persistence
.begin_transaction(|transaction| transaction.read_all_trash())?;
notify_trash_changed(repeated_trash);
let _ = self.delete_trash_on_server(trash_identifiers)?;
Ok(())
@ -122,14 +115,15 @@ impl TrashController {
Err(e) => log::error!("{}", e),
},
}
let conn = self.database.db_connection()?;
conn.immediate_transaction::<_, FlowyError, _>(|| {
for trash_identifier in &trash_identifiers.items {
let _ = TrashTableSql::delete_trash(&trash_identifier.id, &conn)?;
}
Ok(())
let _ = self.persistence.begin_transaction(|transaction| {
let ids = trash_identifiers
.items
.into_iter()
.map(|item| item.id)
.collect::<Vec<_>>();
transaction.delete_trash(ids)
})?;
Ok(())
}
@ -156,19 +150,13 @@ impl TrashController {
)
.as_str(),
);
let _ = thread::scope(|_s| {
let conn = self.database.db_connection()?;
conn.immediate_transaction::<_, FlowyError, _>(|| {
let _ = TrashTableSql::create_trash(repeated_trash.clone(), &*conn)?;
let _ = self.create_trash_on_server(repeated_trash);
notify_trash_changed(TrashTableSql::read_all(&conn)?);
Ok(())
})?;
Ok::<(), FlowyError>(())
})
.unwrap()?;
let _ = self.persistence.begin_transaction(|transaction| {
let _ = transaction.create_trash(repeated_trash.clone())?;
let _ = self.create_trash_on_server(repeated_trash);
notify_trash_changed(transaction.read_all_trash()?);
Ok(())
})?;
let _ = self.notify.send(TrashEvent::NewTrash(identifiers.into(), tx));
let _ = rx.recv().await.unwrap()?;
@ -177,14 +165,20 @@ impl TrashController {
pub fn subscribe(&self) -> broadcast::Receiver<TrashEvent> { self.notify.subscribe() }
pub fn read_trash(&self, conn: &SqliteConnection) -> Result<RepeatedTrash, FlowyError> {
let repeated_trash = TrashTableSql::read_all(&*conn)?;
pub fn read_trash(&self) -> Result<RepeatedTrash, FlowyError> {
let repeated_trash = self
.persistence
.begin_transaction(|transaction| transaction.read_all_trash())?;
let _ = self.read_trash_on_server()?;
Ok(repeated_trash)
}
pub fn read_trash_ids(&self, conn: &SqliteConnection) -> Result<Vec<String>, FlowyError> {
let ids = TrashTableSql::read_all(&*conn)?
pub fn read_trash_ids<'a>(
&self,
transaction: &'a (dyn FlowyCorePersistenceTransaction + 'a),
) -> Result<Vec<String>, FlowyError> {
let ids = transaction
.read_all_trash()?
.into_inner()
.into_iter()
.map(|item| item.id)
@ -227,27 +221,22 @@ impl TrashController {
fn read_trash_on_server(&self) -> FlowyResult<()> {
let token = self.user.token()?;
let server = self.cloud_service.clone();
let pool = self.database.db_pool()?;
let persistence = self.persistence.clone();
tokio::spawn(async move {
match server.read_trash(&token).await {
Ok(repeated_trash) => {
tracing::debug!("Remote trash count: {}", repeated_trash.items.len());
match pool.get() {
Ok(conn) => {
let result = conn.immediate_transaction::<_, FlowyError, _>(|| {
let _ = TrashTableSql::create_trash(repeated_trash.items.clone(), &*conn)?;
TrashTableSql::read_all(&conn)
});
let result = persistence.begin_transaction(|transaction| {
let _ = transaction.create_trash(repeated_trash.items.clone())?;
transaction.read_all_trash()
});
match result {
Ok(repeated_trash) => {
notify_trash_changed(repeated_trash);
},
Err(e) => log::error!("Save trash failed: {:?}", e),
}
match result {
Ok(repeated_trash) => {
notify_trash_changed(repeated_trash);
},
Err(e) => log::error!("Require db connection failed: {:?}", e),
Err(e) => log::error!("Save trash failed: {:?}", e),
}
},
Err(e) => log::error!("Read trash failed: {:?}", e),

View File

@ -10,8 +10,7 @@ use std::sync::Arc;
pub(crate) async fn read_trash_handler(
controller: Unit<Arc<TrashController>>,
) -> DataResult<RepeatedTrash, FlowyError> {
let conn = controller.database.db_connection()?;
let repeated_trash = controller.read_trash(&conn)?;
let repeated_trash = controller.read_trash()?;
data_result(repeated_trash)
}

View File

@ -1,3 +1,2 @@
pub mod controller;
pub mod event_handler;
mod sql;

View File

@ -3,7 +3,7 @@ use flowy_collaboration::entities::{
doc::{DocumentDelta, DocumentId},
revision::{RepeatedRevision, Revision},
};
use flowy_database::SqliteConnection;
use futures::{FutureExt, StreamExt};
use std::{collections::HashSet, sync::Arc};
@ -14,9 +14,9 @@ use crate::{
view::{CreateViewParams, RepeatedView, UpdateViewParams, View, ViewId},
},
errors::{FlowyError, FlowyResult},
module::{WorkspaceCloudService, WorkspaceDatabase, WorkspaceUser},
module::{WorkspaceCloudService, WorkspaceUser},
services::{
view::sql::{ViewChangeset, ViewTable, ViewTableSql},
persistence::{FlowyCorePersistence, FlowyCorePersistenceTransaction, ViewChangeset},
TrashController,
TrashEvent,
},
@ -31,7 +31,7 @@ const LATEST_VIEW_ID: &str = "latest_view_id";
pub(crate) struct ViewController {
user: Arc<dyn WorkspaceUser>,
cloud_service: Arc<dyn WorkspaceCloudService>,
database: Arc<dyn WorkspaceDatabase>,
persistence: Arc<FlowyCorePersistence>,
trash_controller: Arc<TrashController>,
document_ctx: Arc<DocumentContext>,
}
@ -39,7 +39,7 @@ pub(crate) struct ViewController {
impl ViewController {
pub(crate) fn new(
user: Arc<dyn WorkspaceUser>,
database: Arc<dyn WorkspaceDatabase>,
persistence: Arc<FlowyCorePersistence>,
cloud_service: Arc<dyn WorkspaceCloudService>,
trash_can: Arc<TrashController>,
document_ctx: Arc<DocumentContext>,
@ -47,7 +47,7 @@ impl ViewController {
Self {
user,
cloud_service,
database,
persistence,
trash_controller: trash_can,
document_ctx,
}
@ -77,51 +77,37 @@ impl ViewController {
}
pub(crate) async fn create_view_on_local(&self, view: View) -> Result<(), FlowyError> {
let conn = &*self.database.db_connection()?;
let trash_can = self.trash_controller.clone();
conn.immediate_transaction::<_, FlowyError, _>(|| {
let trash_controller = self.trash_controller.clone();
self.persistence.begin_transaction(|transaction| {
let belong_to_id = view.belong_to_id.clone();
let _ = self.save_view(view, conn)?;
let _ = notify_views_changed(&belong_to_id, trash_can, &conn)?;
let _ = transaction.create_view(view)?;
let _ = notify_views_changed(&belong_to_id, trash_controller, &transaction)?;
Ok(())
})?;
Ok(())
}
pub(crate) fn save_view(&self, view: View, conn: &SqliteConnection) -> Result<(), FlowyError> {
let _ = ViewTableSql::create_view(view, conn)?;
Ok(())
})
}
#[tracing::instrument(skip(self, params), fields(view_id = %params.view_id), err)]
pub(crate) async fn read_view(&self, params: ViewId) -> Result<View, FlowyError> {
let conn = self.database.db_connection()?;
let view_table = ViewTableSql::read_view(&params.view_id, &*conn)?;
let trash_ids = self.trash_controller.read_trash_ids(&conn)?;
if trash_ids.contains(&view_table.id) {
return Err(FlowyError::record_not_found());
}
let view: View = view_table.into();
let view = self.persistence.begin_transaction(|transaction| {
let view = transaction.read_view(&params.view_id)?;
let trash_ids = self.trash_controller.read_trash_ids(&transaction)?;
if trash_ids.contains(&view.id) {
return Err(FlowyError::record_not_found());
}
Ok(view)
})?;
let _ = self.read_view_on_server(params);
Ok(view)
}
pub(crate) fn read_view_tables(&self, ids: Vec<String>) -> Result<Vec<ViewTable>, FlowyError> {
let conn = &*self.database.db_connection()?;
let mut view_tables = vec![];
conn.immediate_transaction::<_, FlowyError, _>(|| {
pub(crate) fn read_local_views(&self, ids: Vec<String>) -> Result<Vec<View>, FlowyError> {
self.persistence.begin_transaction(|transaction| {
let mut views = vec![];
for view_id in ids {
view_tables.push(ViewTableSql::read_view(&view_id, conn)?);
views.push(transaction.read_view(&view_id)?);
}
Ok(())
})?;
Ok(view_tables)
Ok(views)
})
}
#[tracing::instrument(level = "debug", skip(self, params), fields(doc_id = %params.doc_id), err)]
@ -156,7 +142,10 @@ impl ViewController {
#[tracing::instrument(level = "debug", skip(self, params), fields(doc_id = %params.doc_id), err)]
pub(crate) async fn duplicate_view(&self, params: DocumentId) -> Result<(), FlowyError> {
let view: View = ViewTableSql::read_view(&params.doc_id, &*self.database.db_connection()?)?.into();
let view = self
.persistence
.begin_transaction(|transaction| transaction.read_view(&params.doc_id))?;
let editor = self.document_ctx.controller.open_document(&params.doc_id).await?;
let document_json = editor.document_json().await?;
let duplicate_params = CreateViewParams {
@ -186,31 +175,27 @@ impl ViewController {
// belong_to_id will be the app_id or view_id.
#[tracing::instrument(level = "debug", skip(self), err)]
pub(crate) async fn read_views_belong_to(&self, belong_to_id: &str) -> Result<RepeatedView, FlowyError> {
// TODO: read from server
let conn = self.database.db_connection()?;
let repeated_view = read_belonging_views_on_local(belong_to_id, self.trash_controller.clone(), &conn)?;
Ok(repeated_view)
self.persistence.begin_transaction(|transaction| {
read_belonging_views_on_local(belong_to_id, self.trash_controller.clone(), &transaction)
})
}
#[tracing::instrument(level = "debug", skip(self, params), err)]
pub(crate) async fn update_view(&self, params: UpdateViewParams) -> Result<View, FlowyError> {
let conn = &*self.database.db_connection()?;
let changeset = ViewChangeset::new(params.clone());
let view_id = changeset.id.clone();
let updated_view = conn.immediate_transaction::<_, FlowyError, _>(|| {
let _ = ViewTableSql::update_view(changeset, conn)?;
let view: View = ViewTableSql::read_view(&view_id, conn)?.into();
let view = self.persistence.begin_transaction(|transaction| {
let _ = transaction.update_view(changeset)?;
let view = transaction.read_view(&view_id)?;
send_dart_notification(&view_id, WorkspaceNotification::ViewUpdated)
.payload(view.clone())
.send();
let _ = notify_views_changed(&view.belong_to_id, self.trash_controller.clone(), &transaction)?;
Ok(view)
})?;
send_dart_notification(&view_id, WorkspaceNotification::ViewUpdated)
.payload(updated_view.clone())
.send();
//
let _ = notify_views_changed(&updated_view.belong_to_id, self.trash_controller.clone(), conn)?;
let _ = self.update_view_on_server(params);
Ok(updated_view)
Ok(view)
}
pub(crate) async fn receive_document_delta(&self, params: DocumentDelta) -> Result<DocumentDelta, FlowyError> {
@ -222,9 +207,10 @@ impl ViewController {
match KV::get_str(LATEST_VIEW_ID) {
None => Ok(None),
Some(view_id) => {
let conn = self.database.db_connection()?;
let view_table = ViewTableSql::read_view(&view_id, &*conn)?;
Ok(Some(view_table.into()))
let view = self
.persistence
.begin_transaction(|transaction| transaction.read_view(&view_id))?;
Ok(Some(view))
},
}
}
@ -260,23 +246,19 @@ impl ViewController {
fn read_view_on_server(&self, params: ViewId) -> Result<(), FlowyError> {
let token = self.user.token()?;
let server = self.cloud_service.clone();
let pool = self.database.db_pool()?;
let persistence = self.persistence.clone();
// TODO: Retry with RetryAction?
tokio::spawn(async move {
match server.read_view(&token, params).await {
Ok(Some(view)) => match pool.get() {
Ok(conn) => {
let result = ViewTableSql::create_view(view.clone(), &conn);
match result {
Ok(_) => {
send_dart_notification(&view.id, WorkspaceNotification::ViewUpdated)
.payload(view.clone())
.send();
},
Err(e) => log::error!("Save view failed: {:?}", e),
}
},
Err(e) => log::error!("Require db connection failed: {:?}", e),
Ok(Some(view)) => {
match persistence.begin_transaction(|transaction| transaction.create_view(view.clone())) {
Ok(_) => {
send_dart_notification(&view.id, WorkspaceNotification::ViewUpdated)
.payload(view.clone())
.send();
},
Err(e) => log::error!("Save view failed: {:?}", e),
}
},
Ok(None) => {},
Err(e) => log::error!("Read view failed: {:?}", e),
@ -287,9 +269,9 @@ impl ViewController {
fn listen_trash_can_event(&self) {
let mut rx = self.trash_controller.subscribe();
let database = self.database.clone();
let persistence = self.persistence.clone();
let document = self.document_ctx.clone();
let trash_can = self.trash_controller.clone();
let trash_controller = self.trash_controller.clone();
let _ = tokio::spawn(async move {
loop {
let mut stream = Box::pin(rx.recv().into_stream().filter_map(|result| async move {
@ -300,96 +282,87 @@ impl ViewController {
}));
if let Some(event) = stream.next().await {
handle_trash_event(database.clone(), document.clone(), trash_can.clone(), event).await
handle_trash_event(persistence.clone(), document.clone(), trash_controller.clone(), event).await
}
}
});
}
}
#[tracing::instrument(level = "trace", skip(database, context, trash_can))]
#[tracing::instrument(level = "trace", skip(persistence, context, trash_can))]
async fn handle_trash_event(
database: Arc<dyn WorkspaceDatabase>,
persistence: Arc<FlowyCorePersistence>,
context: Arc<DocumentContext>,
trash_can: Arc<TrashController>,
event: TrashEvent,
) {
let db_result = database.db_connection();
match event {
TrashEvent::NewTrash(identifiers, ret) => {
let result = || {
let conn = &*db_result?;
let view_tables = read_view_tables(identifiers, conn)?;
for view_table in view_tables {
let _ = notify_views_changed(&view_table.belong_to_id, trash_can.clone(), conn)?;
notify_dart(view_table, WorkspaceNotification::ViewDeleted);
let result = persistence.begin_transaction(|transaction| {
let views = read_local_views_with_transaction(identifiers, &transaction)?;
for view in views {
let _ = notify_views_changed(&view.belong_to_id, trash_can.clone(), &transaction)?;
notify_dart(view, WorkspaceNotification::ViewDeleted);
}
Ok::<(), FlowyError>(())
};
let _ = ret.send(result()).await;
Ok(())
});
let _ = ret.send(result).await;
},
TrashEvent::Putback(identifiers, ret) => {
let result = || {
let conn = &*db_result?;
let view_tables = read_view_tables(identifiers, conn)?;
for view_table in view_tables {
let _ = notify_views_changed(&view_table.belong_to_id, trash_can.clone(), conn)?;
notify_dart(view_table, WorkspaceNotification::ViewRestored);
let result = persistence.begin_transaction(|transaction| {
let views = read_local_views_with_transaction(identifiers, &transaction)?;
for view in views {
let _ = notify_views_changed(&view.belong_to_id, trash_can.clone(), &transaction)?;
notify_dart(view, WorkspaceNotification::ViewRestored);
}
Ok::<(), FlowyError>(())
};
let _ = ret.send(result()).await;
Ok(())
});
let _ = ret.send(result).await;
},
TrashEvent::Delete(identifiers, ret) => {
let result = || {
let conn = &*db_result?;
let _ = conn.immediate_transaction::<_, FlowyError, _>(|| {
let mut notify_ids = HashSet::new();
for identifier in identifiers.items {
let view_table = ViewTableSql::read_view(&identifier.id, conn)?;
let _ = ViewTableSql::delete_view(&identifier.id, conn)?;
let _ = context.controller.delete(&identifier.id)?;
notify_ids.insert(view_table.belong_to_id);
}
let result = persistence.begin_transaction(|transaction| {
let mut notify_ids = HashSet::new();
for identifier in identifiers.items {
let view = transaction.read_view(&identifier.id)?;
let _ = transaction.delete_view(&identifier.id)?;
let _ = context.controller.delete(&identifier.id)?;
notify_ids.insert(view.belong_to_id);
}
for notify_id in notify_ids {
let _ = notify_views_changed(&notify_id, trash_can.clone(), conn)?;
}
for notify_id in notify_ids {
let _ = notify_views_changed(&notify_id, trash_can.clone(), &transaction)?;
}
Ok(())
})?;
Ok::<(), FlowyError>(())
};
let _ = ret.send(result()).await;
Ok(())
});
let _ = ret.send(result).await;
},
}
}
fn read_view_tables(identifiers: RepeatedTrashId, conn: &SqliteConnection) -> Result<Vec<ViewTable>, FlowyError> {
let mut view_tables = vec![];
let _ = conn.immediate_transaction::<_, FlowyError, _>(|| {
for identifier in identifiers.items {
let view_table = ViewTableSql::read_view(&identifier.id, conn)?;
view_tables.push(view_table);
}
Ok(())
})?;
Ok(view_tables)
fn read_local_views_with_transaction<'a>(
identifiers: RepeatedTrashId,
transaction: &'a (dyn FlowyCorePersistenceTransaction + 'a),
) -> Result<Vec<View>, FlowyError> {
let mut views = vec![];
for identifier in identifiers.items {
let view = transaction.read_view(&identifier.id)?;
views.push(view);
}
Ok(views)
}
fn notify_dart(view_table: ViewTable, notification: WorkspaceNotification) {
let view: View = view_table.into();
fn notify_dart(view: View, notification: WorkspaceNotification) {
send_dart_notification(&view.id, notification).payload(view).send();
}
#[tracing::instrument(skip(belong_to_id, trash_controller, conn), fields(view_count), err)]
fn notify_views_changed(
#[tracing::instrument(skip(belong_to_id, trash_controller, transaction), fields(view_count), err)]
fn notify_views_changed<'a>(
belong_to_id: &str,
trash_controller: Arc<TrashController>,
conn: &SqliteConnection,
transaction: &'a (dyn FlowyCorePersistenceTransaction + 'a),
) -> FlowyResult<()> {
let repeated_view = read_belonging_views_on_local(belong_to_id, trash_controller.clone(), conn)?;
let repeated_view = read_belonging_views_on_local(belong_to_id, trash_controller.clone(), transaction)?;
tracing::Span::current().record("view_count", &format!("{}", repeated_view.len()).as_str());
send_dart_notification(&belong_to_id, WorkspaceNotification::AppViewsChanged)
.payload(repeated_view)
@ -397,19 +370,14 @@ fn notify_views_changed(
Ok(())
}
fn read_belonging_views_on_local(
fn read_belonging_views_on_local<'a>(
belong_to_id: &str,
trash_controller: Arc<TrashController>,
conn: &SqliteConnection,
transaction: &'a (dyn FlowyCorePersistenceTransaction + 'a),
) -> FlowyResult<RepeatedView> {
let mut view_tables = ViewTableSql::read_views(belong_to_id, conn)?;
let trash_ids = trash_controller.read_trash_ids(conn)?;
view_tables.retain(|view_table| !trash_ids.contains(&view_table.id));
let views = view_tables
.into_iter()
.map(|view_table| view_table.into())
.collect::<Vec<View>>();
let mut views = transaction.read_views(belong_to_id)?;
let trash_ids = trash_controller.read_trash_ids(transaction)?;
views.retain(|view_table| !trash_ids.contains(&view_table.id));
Ok(RepeatedView { items: views })
}

View File

@ -70,9 +70,9 @@ pub(crate) async fn delete_view_handler(
}
let trash = view_controller
.read_view_tables(params.items)?
.read_local_views(params.items)?
.into_iter()
.map(|view_table| view_table.into())
.map(|view| view.into())
.collect::<Vec<Trash>>();
let _ = trash_controller.add(trash).await?;

View File

@ -1,3 +1,2 @@
pub mod controller;
pub mod event_handler;
mod sql;

View File

@ -1,20 +1,20 @@
use crate::{
dart_notification::*,
errors::*,
module::{WorkspaceCloudService, WorkspaceDatabase, WorkspaceUser},
module::{WorkspaceCloudService, WorkspaceUser},
services::{
persistence::{FlowyCorePersistence, FlowyCorePersistenceTransaction, WorkspaceChangeset},
read_local_workspace_apps,
workspace::sql::{WorkspaceTableChangeset, WorkspaceTableSql},
TrashController,
},
};
use flowy_core_data_model::entities::{app::RepeatedApp, workspace::*};
use flowy_database::{kv::KV, SqliteConnection};
use flowy_database::kv::KV;
use std::sync::Arc;
pub struct WorkspaceController {
pub user: Arc<dyn WorkspaceUser>,
pub(crate) database: Arc<dyn WorkspaceDatabase>,
persistence: Arc<FlowyCorePersistence>,
pub(crate) trash_controller: Arc<TrashController>,
cloud_service: Arc<dyn WorkspaceCloudService>,
}
@ -22,13 +22,13 @@ pub struct WorkspaceController {
impl WorkspaceController {
pub(crate) fn new(
user: Arc<dyn WorkspaceUser>,
database: Arc<dyn WorkspaceDatabase>,
persistence: Arc<FlowyCorePersistence>,
trash_can: Arc<TrashController>,
cloud_service: Arc<dyn WorkspaceCloudService>,
) -> Self {
Self {
user,
database,
persistence,
trash_controller: trash_can,
cloud_service,
}
@ -47,49 +47,31 @@ impl WorkspaceController {
pub(crate) async fn create_workspace_on_local(&self, workspace: Workspace) -> Result<Workspace, FlowyError> {
let user_id = self.user.user_id()?;
let token = self.user.token()?;
let conn = &*self.database.db_connection()?;
//[[immediate_transaction]]
// https://sqlite.org/lang_transaction.html
// IMMEDIATE cause the database connection to start a new write immediately,
// without waiting for a write statement. The BEGIN IMMEDIATE might fail
// with SQLITE_BUSY if another write transaction is already active on another
// database connection.
//
// EXCLUSIVE is similar to IMMEDIATE in that a write transaction is started
// immediately. EXCLUSIVE and IMMEDIATE are the same in WAL mode, but in
// other journaling modes, EXCLUSIVE prevents other database connections from
// reading the database while the transaction is underway.
conn.immediate_transaction::<_, FlowyError, _>(|| {
WorkspaceTableSql::create_workspace(&user_id, workspace.clone(), conn)?;
let repeated_workspace = self.read_local_workspaces(None, &user_id, conn)?;
send_dart_notification(&token, WorkspaceNotification::UserCreateWorkspace)
.payload(repeated_workspace)
.send();
Ok(())
let workspaces = self.persistence.begin_transaction(|transaction| {
let _ = transaction.create_workspace(&user_id, workspace.clone())?;
transaction.read_workspaces(&user_id, None)
})?;
let repeated_workspace = RepeatedWorkspace { items: workspaces };
send_dart_notification(&token, WorkspaceNotification::UserCreateWorkspace)
.payload(repeated_workspace)
.send();
set_current_workspace(&workspace.id);
Ok(workspace)
}
#[allow(dead_code)]
pub(crate) async fn update_workspace(&self, params: UpdateWorkspaceParams) -> Result<(), FlowyError> {
let changeset = WorkspaceTableChangeset::new(params.clone());
let changeset = WorkspaceChangeset::new(params.clone());
let workspace_id = changeset.id.clone();
let conn = &*self.database.db_connection()?;
conn.immediate_transaction::<_, FlowyError, _>(|| {
let _ = WorkspaceTableSql::update_workspace(changeset, conn)?;
let workspace = self.persistence.begin_transaction(|transaction| {
let _ = transaction.update_workspace(changeset)?;
let user_id = self.user.user_id()?;
let workspace = self.read_local_workspace(workspace_id.clone(), &user_id, conn)?;
send_dart_notification(&workspace_id, WorkspaceNotification::WorkspaceUpdated)
.payload(workspace)
.send();
Ok(())
self.read_local_workspace(workspace_id.clone(), &user_id, &transaction)
})?;
send_dart_notification(&workspace_id, WorkspaceNotification::WorkspaceUpdated)
.payload(workspace)
.send();
let _ = self.update_workspace_on_server(params)?;
Ok(())
@ -99,26 +81,23 @@ impl WorkspaceController {
pub(crate) async fn delete_workspace(&self, workspace_id: &str) -> Result<(), FlowyError> {
let user_id = self.user.user_id()?;
let token = self.user.token()?;
let conn = &*self.database.db_connection()?;
conn.immediate_transaction::<_, FlowyError, _>(|| {
let _ = WorkspaceTableSql::delete_workspace(workspace_id, conn)?;
let repeated_workspace = self.read_local_workspaces(None, &user_id, conn)?;
send_dart_notification(&token, WorkspaceNotification::UserDeleteWorkspace)
.payload(repeated_workspace)
.send();
Ok(())
let repeated_workspace = self.persistence.begin_transaction(|transaction| {
let _ = transaction.delete_workspace(workspace_id)?;
self.read_local_workspaces(None, &user_id, &transaction)
})?;
send_dart_notification(&token, WorkspaceNotification::UserDeleteWorkspace)
.payload(repeated_workspace)
.send();
let _ = self.delete_workspace_on_server(workspace_id)?;
Ok(())
}
pub(crate) async fn open_workspace(&self, params: WorkspaceId) -> Result<Workspace, FlowyError> {
let user_id = self.user.user_id()?;
let conn = self.database.db_connection()?;
if let Some(workspace_id) = params.workspace_id {
let workspace = self.read_local_workspace(workspace_id, &user_id, &*conn)?;
let workspace = self
.persistence
.begin_transaction(|transaction| self.read_local_workspace(workspace_id, &user_id, &transaction))?;
set_current_workspace(&workspace.id);
Ok(workspace)
} else {
@ -128,52 +107,39 @@ impl WorkspaceController {
pub(crate) async fn read_current_workspace_apps(&self) -> Result<RepeatedApp, FlowyError> {
let workspace_id = get_current_workspace()?;
let conn = self.database.db_connection()?;
let repeated_app = self.read_local_apps(&workspace_id, &*conn)?;
let repeated_app = self.persistence.begin_transaction(|transaction| {
read_local_workspace_apps(&workspace_id, self.trash_controller.clone(), &transaction)
})?;
// TODO: read from server
Ok(repeated_app)
}
#[tracing::instrument(level = "debug", skip(self, conn), err)]
pub(crate) fn read_local_workspaces(
#[tracing::instrument(level = "debug", skip(self, transaction), err)]
pub(crate) fn read_local_workspaces<'a>(
&self,
workspace_id: Option<String>,
user_id: &str,
conn: &SqliteConnection,
transaction: &'a (dyn FlowyCorePersistenceTransaction + 'a),
) -> Result<RepeatedWorkspace, FlowyError> {
let workspace_id = workspace_id.to_owned();
let workspace_tables = WorkspaceTableSql::read_workspaces(workspace_id, user_id, conn)?;
let mut workspaces = vec![];
for table in workspace_tables {
let workspace: Workspace = table.into();
workspaces.push(workspace);
}
let workspaces = transaction.read_workspaces(user_id, workspace_id)?;
Ok(RepeatedWorkspace { items: workspaces })
}
pub(crate) fn read_local_workspace(
pub(crate) fn read_local_workspace<'a>(
&self,
workspace_id: String,
user_id: &str,
conn: &SqliteConnection,
transaction: &'a (dyn FlowyCorePersistenceTransaction + 'a),
) -> Result<Workspace, FlowyError> {
// Opti: fetch single workspace from local db
let mut repeated_workspace = self.read_local_workspaces(Some(workspace_id.clone()), user_id, conn)?;
if repeated_workspace.is_empty() {
let mut workspaces = transaction.read_workspaces(user_id, Some(workspace_id.clone()))?;
if workspaces.is_empty() {
return Err(FlowyError::record_not_found().context(format!("{} workspace not found", workspace_id)));
}
debug_assert_eq!(repeated_workspace.len(), 1);
let workspace = repeated_workspace.drain(..1).collect::<Vec<Workspace>>().pop().unwrap();
debug_assert_eq!(workspaces.len(), 1);
let workspace = workspaces.drain(..1).collect::<Vec<Workspace>>().pop().unwrap();
Ok(workspace)
}
#[tracing::instrument(level = "debug", skip(self, conn), err)]
fn read_local_apps(&self, workspace_id: &str, conn: &SqliteConnection) -> Result<RepeatedApp, FlowyError> {
let repeated_app = read_local_workspace_apps(workspace_id, self.trash_controller.clone(), conn)?;
Ok(repeated_app)
}
}
impl WorkspaceController {

View File

@ -2,18 +2,14 @@ use crate::{
context::CoreContext,
dart_notification::{send_dart_notification, WorkspaceNotification},
errors::FlowyError,
services::{
get_current_workspace,
read_local_workspace_apps,
workspace::sql::WorkspaceTableSql,
WorkspaceController,
},
services::{get_current_workspace, read_local_workspace_apps, WorkspaceController},
};
use flowy_core_data_model::entities::{
app::RepeatedApp,
view::View,
workspace::{CurrentWorkspaceSetting, QueryWorkspaceRequest, RepeatedWorkspace, WorkspaceId, *},
};
use flowy_error::FlowyResult;
use lib_dispatch::prelude::{data_result, Data, DataResult, Unit};
use std::{convert::TryInto, sync::Arc};
@ -53,21 +49,19 @@ pub(crate) async fn read_workspaces_handler(
) -> DataResult<RepeatedWorkspace, FlowyError> {
let params: WorkspaceId = data.into_inner().try_into()?;
let user_id = core.user.user_id()?;
let conn = &*core.database.db_connection()?;
let workspace_controller = core.workspace_controller.clone();
let trash_controller = core.trash_controller.clone();
let workspaces = conn.immediate_transaction::<_, FlowyError, _>(|| {
let mut workspaces = workspace_controller.read_local_workspaces(params.workspace_id.clone(), &user_id, conn)?;
let workspaces = core.persistence.begin_transaction(|transaction| {
let mut workspaces =
workspace_controller.read_local_workspaces(params.workspace_id.clone(), &user_id, &transaction)?;
for workspace in workspaces.iter_mut() {
let apps = read_local_workspace_apps(&workspace.id, trash_controller.clone(), conn)?.into_inner();
let apps = read_local_workspace_apps(&workspace.id, trash_controller.clone(), &transaction)?.into_inner();
workspace.apps.items = apps;
}
Ok(workspaces)
})?;
let _ = read_workspaces_on_server(core, user_id, params);
data_result(workspaces)
}
@ -80,10 +74,11 @@ pub async fn read_cur_workspace_handler(
let params = WorkspaceId {
workspace_id: Some(workspace_id.clone()),
};
let conn = &*core.database.db_connection()?;
let workspace = core
.workspace_controller
.read_local_workspace(workspace_id, &user_id, conn)?;
let workspace = core.persistence.begin_transaction(|transaction| {
core.workspace_controller
.read_local_workspace(workspace_id, &user_id, &transaction)
})?;
let latest_view: Option<View> = core.view_controller.latest_visit_view().unwrap_or(None);
let setting = CurrentWorkspaceSetting { workspace, latest_view };
@ -98,30 +93,29 @@ fn read_workspaces_on_server(
params: WorkspaceId,
) -> Result<(), FlowyError> {
let (token, server) = (core.user.token()?, core.cloud_service.clone());
let app_ctrl = core.app_controller.clone();
let view_ctrl = core.view_controller.clone();
let conn = core.database.db_connection()?;
let _app_ctrl = core.app_controller.clone();
let _view_ctrl = core.view_controller.clone();
let persistence = core.persistence.clone();
tokio::spawn(async move {
// Opti: handle the error and retry?
let workspaces = server.read_workspace(&token, params).await?;
let _ = (&*conn).immediate_transaction::<_, FlowyError, _>(|| {
let _ = persistence.begin_transaction(|transaction| {
tracing::debug!("Save {} workspace", workspaces.len());
for workspace in &workspaces.items {
let m_workspace = workspace.clone();
let apps = m_workspace.apps.clone().into_inner();
let _ = WorkspaceTableSql::create_workspace(&user_id, m_workspace, &*conn)?;
let _ = transaction.create_workspace(&user_id, m_workspace)?;
tracing::debug!("Save {} apps", apps.len());
for app in apps {
let views = app.belongings.clone().into_inner();
match app_ctrl.save_app(app, &*conn) {
match transaction.create_app(app) {
Ok(_) => {},
Err(e) => log::error!("create app failed: {:?}", e),
}
tracing::debug!("Save {} views", views.len());
for view in views {
match view_ctrl.save_view(view, &*conn) {
match transaction.create_view(view) {
Ok(_) => {},
Err(e) => log::error!("create view failed: {:?}", e),
}

View File

@ -1,3 +1,2 @@
pub mod controller;
pub mod event_handler;
pub(crate) mod sql;

View File

@ -1,4 +1,4 @@
use crate::impl_def_and_def_mut;
use crate::{entities::app::App, impl_def_and_def_mut};
use flowy_derive::{ProtoBuf, ProtoBuf_Enum};
use std::fmt::Formatter;
@ -123,3 +123,15 @@ pub struct RepeatedTrash {
}
impl_def_and_def_mut!(RepeatedTrash, Trash);
impl std::convert::From<App> for Trash {
fn from(app: App) -> Self {
Trash {
id: app.id,
name: app.name,
modified_time: app.modified_time,
create_time: app.create_time,
ty: TrashType::App,
}
}
}