mirror of
https://github.com/AppFlowy-IO/AppFlowy.git
synced 2024-08-30 18:12:39 +00:00
chore: impl view data processor
This commit is contained in:
@ -42,7 +42,7 @@ bytes = { version = "1.0" }
|
||||
crossbeam = "0.8"
|
||||
crossbeam-utils = "0.8"
|
||||
chrono = "0.4"
|
||||
async-trait = "0.1.52"
|
||||
dashmap = "4.0"
|
||||
|
||||
[dev-dependencies]
|
||||
serial_test = "0.5.1"
|
||||
|
@ -8,17 +8,18 @@ use crate::{
|
||||
TrashController, ViewController, WorkspaceController,
|
||||
},
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use chrono::Utc;
|
||||
use flowy_block::BlockManager;
|
||||
use flowy_collaboration::client_document::default::{initial_quill_delta, initial_quill_delta_string, initial_read_me};
|
||||
use flowy_collaboration::entities::revision::{RepeatedRevision, Revision};
|
||||
|
||||
use flowy_collaboration::client_document::default::{initial_quill_delta_string, initial_read_me};
|
||||
use flowy_collaboration::entities::revision::RepeatedRevision;
|
||||
use flowy_collaboration::{client_folder::FolderPad, entities::ws_data::ServerRevisionWSData};
|
||||
use flowy_error::FlowyError;
|
||||
use flowy_folder_data_model::entities::view::ViewDataType;
|
||||
use flowy_folder_data_model::user_default;
|
||||
use flowy_sync::RevisionWebSocket;
|
||||
use lazy_static::lazy_static;
|
||||
use lib_infra::future::FutureResult;
|
||||
use std::{collections::HashMap, convert::TryInto, fmt::Formatter, sync::Arc};
|
||||
use tokio::sync::RwLock as TokioRwLock;
|
||||
lazy_static! {
|
||||
@ -62,6 +63,7 @@ pub struct FolderManager {
|
||||
pub(crate) trash_controller: Arc<TrashController>,
|
||||
web_socket: Arc<dyn RevisionWebSocket>,
|
||||
folder_editor: Arc<TokioRwLock<Option<Arc<ClientFolderEditor>>>>,
|
||||
data_processors: ViewDataProcessorMap,
|
||||
}
|
||||
|
||||
impl FolderManager {
|
||||
@ -69,8 +71,7 @@ impl FolderManager {
|
||||
user: Arc<dyn WorkspaceUser>,
|
||||
cloud_service: Arc<dyn FolderCouldServiceV1>,
|
||||
database: Arc<dyn WorkspaceDatabase>,
|
||||
data_processors: DataProcessorMap,
|
||||
block_manager: Arc<BlockManager>,
|
||||
data_processors: ViewDataProcessorMap,
|
||||
web_socket: Arc<dyn RevisionWebSocket>,
|
||||
) -> Self {
|
||||
if let Ok(user_id) = user.user_id() {
|
||||
@ -93,8 +94,7 @@ impl FolderManager {
|
||||
persistence.clone(),
|
||||
cloud_service.clone(),
|
||||
trash_controller.clone(),
|
||||
data_processors,
|
||||
block_manager,
|
||||
data_processors.clone(),
|
||||
));
|
||||
|
||||
let app_controller = Arc::new(AppController::new(
|
||||
@ -121,6 +121,7 @@ impl FolderManager {
|
||||
trash_controller,
|
||||
web_socket,
|
||||
folder_editor,
|
||||
data_processors,
|
||||
}
|
||||
}
|
||||
|
||||
@ -167,6 +168,11 @@ impl FolderManager {
|
||||
|
||||
let _ = self.app_controller.initialize()?;
|
||||
let _ = self.view_controller.initialize()?;
|
||||
|
||||
self.data_processors.iter().for_each(|(_, processor)| {
|
||||
processor.initialize();
|
||||
});
|
||||
|
||||
write_guard.insert(user_id.to_owned(), true);
|
||||
Ok(())
|
||||
}
|
||||
@ -201,7 +207,9 @@ impl DefaultFolderBuilder {
|
||||
initial_quill_delta_string()
|
||||
};
|
||||
view_controller.set_latest_view(view);
|
||||
let _ = view_controller.create_view(&view.id, Bytes::from(view_data)).await?;
|
||||
let _ = view_controller
|
||||
.create_view(&view.id, ViewDataType::RichText, Bytes::from(view_data))
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
let folder = FolderPad::new(vec![workspace.clone()], vec![])?;
|
||||
@ -222,13 +230,20 @@ impl FolderManager {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait ViewDataProcessor {
|
||||
async fn create_container(&self, view_id: &str, repeated_revision: RepeatedRevision) -> FlowyResult<()>;
|
||||
async fn delete_container(&self, view_id: &str) -> FlowyResult<()>;
|
||||
async fn close_container(&self, view_id: &str) -> FlowyResult<()>;
|
||||
async fn delta_str(&self, view_id: &str) -> FlowyResult<String>;
|
||||
fn initialize(&self) -> FutureResult<(), FlowyError>;
|
||||
|
||||
fn create_container(&self, view_id: &str, repeated_revision: RepeatedRevision) -> FutureResult<(), FlowyError>;
|
||||
|
||||
fn delete_container(&self, view_id: &str) -> FutureResult<(), FlowyError>;
|
||||
|
||||
fn close_container(&self, view_id: &str) -> FutureResult<(), FlowyError>;
|
||||
|
||||
fn delta_str(&self, view_id: &str) -> FutureResult<String, FlowyError>;
|
||||
|
||||
fn default_view_data(&self) -> String;
|
||||
|
||||
fn data_type(&self) -> ViewDataType;
|
||||
}
|
||||
|
||||
pub type DataProcessorMap = Arc<HashMap<ViewDataType, Arc<dyn ViewDataProcessor + Send + Sync>>>;
|
||||
pub type ViewDataProcessorMap = Arc<HashMap<ViewDataType, Arc<dyn ViewDataProcessor + Send + Sync>>>;
|
||||
|
@ -1,15 +1,4 @@
|
||||
use bytes::Bytes;
|
||||
use flowy_collaboration::entities::{
|
||||
document_info::{BlockDelta, BlockId},
|
||||
revision::{RepeatedRevision, Revision},
|
||||
};
|
||||
|
||||
use flowy_collaboration::client_document::default::initial_quill_delta_string;
|
||||
use futures::{FutureExt, StreamExt};
|
||||
use std::collections::HashMap;
|
||||
use std::{collections::HashSet, sync::Arc};
|
||||
|
||||
use crate::manager::DataProcessorMap;
|
||||
use crate::manager::{ViewDataProcessor, ViewDataProcessorMap};
|
||||
use crate::{
|
||||
dart_notification::{send_dart_notification, FolderNotification},
|
||||
entities::{
|
||||
@ -23,10 +12,16 @@ use crate::{
|
||||
TrashController, TrashEvent,
|
||||
},
|
||||
};
|
||||
use flowy_block::BlockManager;
|
||||
use bytes::Bytes;
|
||||
use flowy_collaboration::entities::{
|
||||
document_info::{BlockDelta, BlockId},
|
||||
revision::{RepeatedRevision, Revision},
|
||||
};
|
||||
use flowy_database::kv::KV;
|
||||
use flowy_folder_data_model::entities::view::ViewDataType;
|
||||
use futures::{FutureExt, StreamExt};
|
||||
use lib_infra::uuid;
|
||||
use std::{collections::HashSet, sync::Arc};
|
||||
|
||||
const LATEST_VIEW_ID: &str = "latest_view_id";
|
||||
|
||||
@ -35,8 +30,7 @@ pub(crate) struct ViewController {
|
||||
cloud_service: Arc<dyn FolderCouldServiceV1>,
|
||||
persistence: Arc<FolderPersistence>,
|
||||
trash_controller: Arc<TrashController>,
|
||||
data_processors: DataProcessorMap,
|
||||
block_manager: Arc<BlockManager>,
|
||||
data_processors: ViewDataProcessorMap,
|
||||
}
|
||||
|
||||
impl ViewController {
|
||||
@ -45,8 +39,7 @@ impl ViewController {
|
||||
persistence: Arc<FolderPersistence>,
|
||||
cloud_service: Arc<dyn FolderCouldServiceV1>,
|
||||
trash_controller: Arc<TrashController>,
|
||||
data_processors: DataProcessorMap,
|
||||
block_manager: Arc<BlockManager>,
|
||||
data_processors: ViewDataProcessorMap,
|
||||
) -> Self {
|
||||
Self {
|
||||
user,
|
||||
@ -54,38 +47,48 @@ impl ViewController {
|
||||
persistence,
|
||||
trash_controller,
|
||||
data_processors,
|
||||
block_manager,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn initialize(&self) -> Result<(), FlowyError> {
|
||||
let _ = self.block_manager.init()?;
|
||||
self.listen_trash_can_event();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "trace", skip(self, params), fields(name = %params.name), err)]
|
||||
pub(crate) async fn create_view_from_params(&self, params: CreateViewParams) -> Result<View, FlowyError> {
|
||||
let view_data = if params.data.is_empty() {
|
||||
initial_quill_delta_string()
|
||||
pub(crate) async fn create_view_from_params(&self, mut params: CreateViewParams) -> Result<View, FlowyError> {
|
||||
let processor = self.get_data_processor(¶ms.data_type)?;
|
||||
let content = if params.data.is_empty() {
|
||||
let default_view_data = processor.default_view_data();
|
||||
params.data = default_view_data.clone();
|
||||
default_view_data
|
||||
} else {
|
||||
params.data.clone()
|
||||
};
|
||||
|
||||
let _ = self.create_view(¶ms.view_id, Bytes::from(view_data)).await?;
|
||||
let delta_data = Bytes::from(content);
|
||||
let _ = self
|
||||
.create_view(¶ms.view_id, params.data_type.clone(), delta_data)
|
||||
.await?;
|
||||
let view = self.create_view_on_server(params).await?;
|
||||
let _ = self.create_view_on_local(view.clone()).await?;
|
||||
Ok(view)
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self, view_id, delta_data), err)]
|
||||
pub(crate) async fn create_view(&self, view_id: &str, delta_data: Bytes) -> Result<(), FlowyError> {
|
||||
pub(crate) async fn create_view(
|
||||
&self,
|
||||
view_id: &str,
|
||||
data_type: ViewDataType,
|
||||
delta_data: Bytes,
|
||||
) -> Result<(), FlowyError> {
|
||||
if delta_data.is_empty() {
|
||||
return Err(FlowyError::internal().context("The content of the view should not be empty"));
|
||||
}
|
||||
let user_id = self.user.user_id()?;
|
||||
let repeated_revision: RepeatedRevision = Revision::initial_revision(&user_id, view_id, delta_data).into();
|
||||
let _ = self.block_manager.create_block(view_id, repeated_revision).await?;
|
||||
let processor = self.get_data_processor(&data_type)?;
|
||||
let _ = processor.create_container(view_id, repeated_revision).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -132,8 +135,8 @@ impl ViewController {
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self), err)]
|
||||
pub(crate) async fn open_view(&self, view_id: &str) -> Result<BlockDelta, FlowyError> {
|
||||
let editor = self.block_manager.open_block(view_id).await?;
|
||||
let delta_str = editor.delta_str().await?;
|
||||
let processor = self.get_data_processor_from_view_id(view_id).await?;
|
||||
let delta_str = processor.delta_str(view_id).await?;
|
||||
KV::set_str(LATEST_VIEW_ID, view_id.to_owned());
|
||||
Ok(BlockDelta {
|
||||
block_id: view_id.to_string(),
|
||||
@ -142,8 +145,9 @@ impl ViewController {
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self), err)]
|
||||
pub(crate) async fn close_view(&self, doc_id: &str) -> Result<(), FlowyError> {
|
||||
let _ = self.block_manager.close_block(doc_id)?;
|
||||
pub(crate) async fn close_view(&self, view_id: &str) -> Result<(), FlowyError> {
|
||||
let processor = self.get_data_processor_from_view_id(view_id).await?;
|
||||
let _ = processor.close_container(view_id).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -154,7 +158,8 @@ impl ViewController {
|
||||
let _ = KV::remove(LATEST_VIEW_ID);
|
||||
}
|
||||
}
|
||||
let _ = self.block_manager.close_block(¶ms.value)?;
|
||||
let processor = self.get_data_processor_from_view_id(¶ms.value).await?;
|
||||
let _ = processor.close_container(¶ms.value).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -165,8 +170,8 @@ impl ViewController {
|
||||
.begin_transaction(|transaction| transaction.read_view(view_id))
|
||||
.await?;
|
||||
|
||||
let editor = self.block_manager.open_block(view_id).await?;
|
||||
let delta_str = editor.delta_str().await?;
|
||||
let processor = self.get_data_processor(&view.data_type)?;
|
||||
let delta_str = processor.delta_str(view_id).await?;
|
||||
let duplicate_params = CreateViewParams {
|
||||
belong_to_id: view.belong_to_id.clone(),
|
||||
name: format!("{} (copy)", &view.name),
|
||||
@ -287,7 +292,7 @@ impl ViewController {
|
||||
fn listen_trash_can_event(&self) {
|
||||
let mut rx = self.trash_controller.subscribe();
|
||||
let persistence = self.persistence.clone();
|
||||
let block_manager = self.block_manager.clone();
|
||||
let data_processors = self.data_processors.clone();
|
||||
let trash_controller = self.trash_controller.clone();
|
||||
let _ = tokio::spawn(async move {
|
||||
loop {
|
||||
@ -301,7 +306,7 @@ impl ViewController {
|
||||
if let Some(event) = stream.next().await {
|
||||
handle_trash_event(
|
||||
persistence.clone(),
|
||||
block_manager.clone(),
|
||||
data_processors.clone(),
|
||||
trash_controller.clone(),
|
||||
event,
|
||||
)
|
||||
@ -310,12 +315,34 @@ impl ViewController {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async fn get_data_processor_from_view_id(
|
||||
&self,
|
||||
view_id: &str,
|
||||
) -> FlowyResult<Arc<dyn ViewDataProcessor + Send + Sync>> {
|
||||
let view = self
|
||||
.persistence
|
||||
.begin_transaction(|transaction| transaction.read_view(view_id))
|
||||
.await?;
|
||||
self.get_data_processor(&view.data_type)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn get_data_processor(&self, data_type: &ViewDataType) -> FlowyResult<Arc<dyn ViewDataProcessor + Send + Sync>> {
|
||||
match self.data_processors.get(data_type) {
|
||||
None => Err(FlowyError::internal().context(format!(
|
||||
"Get data processor failed. Unknown view data type: {:?}",
|
||||
data_type
|
||||
))),
|
||||
Some(processor) => Ok(processor.clone()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "trace", skip(persistence, block_manager, trash_can))]
|
||||
#[tracing::instrument(level = "trace", skip(persistence, data_processors, trash_can))]
|
||||
async fn handle_trash_event(
|
||||
persistence: Arc<FolderPersistence>,
|
||||
block_manager: Arc<BlockManager>,
|
||||
data_processors: ViewDataProcessorMap,
|
||||
trash_can: Arc<TrashController>,
|
||||
event: TrashEvent,
|
||||
) {
|
||||
@ -347,28 +374,54 @@ async fn handle_trash_event(
|
||||
let _ = ret.send(result).await;
|
||||
}
|
||||
TrashEvent::Delete(identifiers, ret) => {
|
||||
let result = persistence
|
||||
.begin_transaction(|transaction| {
|
||||
let mut notify_ids = HashSet::new();
|
||||
for identifier in identifiers.items {
|
||||
let view = transaction.read_view(&identifier.id)?;
|
||||
let _ = transaction.delete_view(&identifier.id)?;
|
||||
let _ = block_manager.delete_block(&identifier.id)?;
|
||||
notify_ids.insert(view.belong_to_id);
|
||||
}
|
||||
let result = || async {
|
||||
let views = persistence
|
||||
.begin_transaction(|transaction| {
|
||||
let mut notify_ids = HashSet::new();
|
||||
let mut views = vec![];
|
||||
for identifier in identifiers.items {
|
||||
let view = transaction.read_view(&identifier.id)?;
|
||||
let _ = transaction.delete_view(&view.id)?;
|
||||
notify_ids.insert(view.belong_to_id.clone());
|
||||
views.push(view);
|
||||
}
|
||||
for notify_id in notify_ids {
|
||||
let _ = notify_views_changed(¬ify_id, trash_can.clone(), &transaction)?;
|
||||
}
|
||||
Ok(views)
|
||||
})
|
||||
.await?;
|
||||
|
||||
for notify_id in notify_ids {
|
||||
let _ = notify_views_changed(¬ify_id, trash_can.clone(), &transaction)?;
|
||||
for view in views {
|
||||
match get_data_processor(data_processors.clone(), &view.data_type) {
|
||||
Ok(processor) => {
|
||||
let _ = processor.close_container(&view.id).await?;
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("{}", e)
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.await;
|
||||
let _ = ret.send(result).await;
|
||||
}
|
||||
Ok(())
|
||||
};
|
||||
let _ = ret.send(result().await).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn get_data_processor(
|
||||
data_processors: ViewDataProcessorMap,
|
||||
data_type: &ViewDataType,
|
||||
) -> FlowyResult<Arc<dyn ViewDataProcessor + Send + Sync>> {
|
||||
match data_processors.get(data_type) {
|
||||
None => Err(FlowyError::internal().context(format!(
|
||||
"Get data processor failed. Unknown view data type: {:?}",
|
||||
data_type
|
||||
))),
|
||||
Some(processor) => Ok(processor.clone()),
|
||||
}
|
||||
}
|
||||
|
||||
fn read_local_views_with_transaction<'a>(
|
||||
identifiers: RepeatedTrashId,
|
||||
transaction: &'a (dyn FolderPersistenceTransaction + 'a),
|
||||
|
Reference in New Issue
Block a user