chore: node transform path test

This commit is contained in:
appflowy
2022-09-13 20:23:56 +08:00
parent 9ff0975f7c
commit fa2cfd7c20
26 changed files with 515 additions and 227 deletions

View File

@ -4,7 +4,7 @@ use crate::{
};
use flowy_error::FlowyError;
use flowy_sync::entities::text_block::{CreateTextBlockParams, DocumentPB, ResetTextBlockParams, TextBlockIdPB};
use flowy_text_block::BlockCloudService;
use flowy_text_block::TextEditorCloudService;
use http_flowy::response::FlowyResponse;
use lazy_static::lazy_static;
use lib_infra::future::FutureResult;
@ -20,20 +20,20 @@ impl BlockHttpCloudService {
}
}
impl BlockCloudService for BlockHttpCloudService {
fn create_block(&self, token: &str, params: CreateTextBlockParams) -> FutureResult<(), FlowyError> {
impl TextEditorCloudService for BlockHttpCloudService {
fn create_text_block(&self, token: &str, params: CreateTextBlockParams) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.doc_url();
FutureResult::new(async move { create_document_request(&token, params, &url).await })
}
fn read_block(&self, token: &str, params: TextBlockIdPB) -> FutureResult<Option<DocumentPB>, FlowyError> {
fn read_text_block(&self, token: &str, params: TextBlockIdPB) -> FutureResult<Option<DocumentPB>, FlowyError> {
let token = token.to_owned();
let url = self.config.doc_url();
FutureResult::new(async move { read_document_request(&token, params, &url).await })
}
fn update_block(&self, token: &str, params: ResetTextBlockParams) -> FutureResult<(), FlowyError> {
fn update_text_block(&self, token: &str, params: ResetTextBlockParams) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.doc_url();
FutureResult::new(async move { reset_doc_request(&token, params, &url).await })

View File

@ -261,7 +261,7 @@ use flowy_folder::entities::{
use flowy_folder_data_model::revision::{
gen_app_id, gen_workspace_id, AppRevision, TrashRevision, ViewRevision, WorkspaceRevision,
};
use flowy_text_block::BlockCloudService;
use flowy_text_block::TextEditorCloudService;
use flowy_user::entities::{
SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserProfileParams, UserProfilePB,
};
@ -414,12 +414,12 @@ impl UserCloudService for LocalServer {
}
}
impl BlockCloudService for LocalServer {
fn create_block(&self, _token: &str, _params: CreateTextBlockParams) -> FutureResult<(), FlowyError> {
impl TextEditorCloudService for LocalServer {
fn create_text_block(&self, _token: &str, _params: CreateTextBlockParams) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
fn read_block(&self, _token: &str, params: TextBlockIdPB) -> FutureResult<Option<DocumentPB>, FlowyError> {
fn read_text_block(&self, _token: &str, params: TextBlockIdPB) -> FutureResult<Option<DocumentPB>, FlowyError> {
let doc = DocumentPB {
block_id: params.value,
text: initial_quill_delta_string(),
@ -429,7 +429,7 @@ impl BlockCloudService for LocalServer {
FutureResult::new(async { Ok(Some(doc)) })
}
fn update_block(&self, _token: &str, _params: ResetTextBlockParams) -> FutureResult<(), FlowyError> {
fn update_text_block(&self, _token: &str, _params: ResetTextBlockParams) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
}

View File

@ -18,7 +18,7 @@ use flowy_revision::{RevisionWebSocket, WSStateReceiver};
use flowy_sync::client_document::default::initial_quill_delta_string;
use flowy_sync::entities::revision::{RepeatedRevision, Revision};
use flowy_sync::entities::ws_data::ClientRevisionWSData;
use flowy_text_block::TextBlockManager;
use flowy_text_block::TextEditorManager;
use flowy_user::services::UserSession;
use futures_core::future::BoxFuture;
use lib_infra::future::{BoxResultFuture, FutureResult};
@ -34,7 +34,7 @@ impl FolderDepsResolver {
user_session: Arc<UserSession>,
server_config: &ClientServerConfiguration,
ws_conn: &Arc<FlowyWebSocketConnect>,
text_block_manager: &Arc<TextBlockManager>,
text_block_manager: &Arc<TextEditorManager>,
grid_manager: &Arc<GridManager>,
) -> Arc<FolderManager> {
let user: Arc<dyn WorkspaceUser> = Arc::new(WorkspaceUserImpl(user_session.clone()));
@ -63,7 +63,7 @@ impl FolderDepsResolver {
}
fn make_view_data_processor(
text_block_manager: Arc<TextBlockManager>,
text_block_manager: Arc<TextEditorManager>,
grid_manager: Arc<GridManager>,
) -> ViewDataProcessorMap {
let mut map: HashMap<ViewDataTypePB, Arc<dyn ViewDataProcessor + Send + Sync>> = HashMap::new();
@ -135,7 +135,7 @@ impl WSMessageReceiver for FolderWSMessageReceiverImpl {
}
}
struct TextBlockViewDataProcessor(Arc<TextBlockManager>);
struct TextBlockViewDataProcessor(Arc<TextEditorManager>);
impl ViewDataProcessor for TextBlockViewDataProcessor {
fn initialize(&self) -> FutureResult<(), FlowyError> {
let manager = self.0.clone();
@ -147,7 +147,7 @@ impl ViewDataProcessor for TextBlockViewDataProcessor {
let view_id = view_id.to_string();
let manager = self.0.clone();
FutureResult::new(async move {
let _ = manager.create_block(view_id, repeated_revision).await?;
let _ = manager.create_text_block(view_id, repeated_revision).await?;
Ok(())
})
}
@ -156,7 +156,7 @@ impl ViewDataProcessor for TextBlockViewDataProcessor {
let manager = self.0.clone();
let view_id = view_id.to_string();
FutureResult::new(async move {
let _ = manager.delete_block(view_id)?;
let _ = manager.close_text_editor(view_id)?;
Ok(())
})
}
@ -165,7 +165,7 @@ impl ViewDataProcessor for TextBlockViewDataProcessor {
let manager = self.0.clone();
let view_id = view_id.to_string();
FutureResult::new(async move {
let _ = manager.close_block(view_id)?;
let _ = manager.close_text_editor(view_id)?;
Ok(())
})
}
@ -174,7 +174,7 @@ impl ViewDataProcessor for TextBlockViewDataProcessor {
let view_id = view_id.to_string();
let manager = self.0.clone();
FutureResult::new(async move {
let editor = manager.open_block(view_id).await?;
let editor = manager.open_text_editor(view_id).await?;
let delta_bytes = Bytes::from(editor.delta_str().await?);
Ok(delta_bytes)
})
@ -195,7 +195,7 @@ impl ViewDataProcessor for TextBlockViewDataProcessor {
let delta_data = Bytes::from(view_data);
let repeated_revision: RepeatedRevision =
Revision::initial_revision(&user_id, &view_id, delta_data.clone()).into();
let _ = manager.create_block(view_id, repeated_revision).await?;
let _ = manager.create_text_block(view_id, repeated_revision).await?;
Ok(delta_data)
})
}

View File

@ -8,7 +8,7 @@ use flowy_revision::{RevisionWebSocket, WSStateReceiver};
use flowy_sync::entities::ws_data::ClientRevisionWSData;
use flowy_text_block::{
errors::{internal_error, FlowyError},
BlockCloudService, TextBlockManager, TextBlockUser,
TextEditorCloudService, TextEditorManager, TextEditorUser,
};
use flowy_user::services::UserSession;
use futures_core::future::BoxFuture;
@ -23,15 +23,15 @@ impl TextBlockDepsResolver {
ws_conn: Arc<FlowyWebSocketConnect>,
user_session: Arc<UserSession>,
server_config: &ClientServerConfiguration,
) -> Arc<TextBlockManager> {
) -> Arc<TextEditorManager> {
let user = Arc::new(BlockUserImpl(user_session));
let rev_web_socket = Arc::new(TextBlockWebSocket(ws_conn.clone()));
let cloud_service: Arc<dyn BlockCloudService> = match local_server {
let cloud_service: Arc<dyn TextEditorCloudService> = match local_server {
None => Arc::new(BlockHttpCloudService::new(server_config.clone())),
Some(local_server) => local_server,
};
let manager = Arc::new(TextBlockManager::new(cloud_service, user, rev_web_socket));
let manager = Arc::new(TextEditorManager::new(cloud_service, user, rev_web_socket));
let receiver = Arc::new(DocumentWSMessageReceiverImpl(manager.clone()));
ws_conn.add_ws_message_receiver(receiver).unwrap();
@ -40,7 +40,7 @@ impl TextBlockDepsResolver {
}
struct BlockUserImpl(Arc<UserSession>);
impl TextBlockUser for BlockUserImpl {
impl TextEditorUser for BlockUserImpl {
fn user_dir(&self) -> Result<String, FlowyError> {
let dir = self.0.user_dir().map_err(|e| FlowyError::unauthorized().context(e))?;
@ -90,7 +90,7 @@ impl RevisionWebSocket for TextBlockWebSocket {
}
}
struct DocumentWSMessageReceiverImpl(Arc<TextBlockManager>);
struct DocumentWSMessageReceiverImpl(Arc<TextEditorManager>);
impl WSMessageReceiver for DocumentWSMessageReceiverImpl {
fn source(&self) -> WSChannel {
WSChannel::Document

View File

@ -11,7 +11,7 @@ use flowy_net::{
local_server::LocalServer,
ws::connection::{listen_on_websocket, FlowyWebSocketConnect},
};
use flowy_text_block::TextBlockManager;
use flowy_text_block::TextEditorManager;
use flowy_user::services::{notifier::UserStatus, UserSession, UserSessionConfig};
use lib_dispatch::prelude::*;
use lib_dispatch::runtime::tokio_default_runtime;
@ -89,7 +89,7 @@ pub struct FlowySDK {
#[allow(dead_code)]
config: FlowySDKConfig,
pub user_session: Arc<UserSession>,
pub text_block_manager: Arc<TextBlockManager>,
pub text_block_manager: Arc<TextEditorManager>,
pub folder_manager: Arc<FolderManager>,
pub grid_manager: Arc<GridManager>,
pub dispatcher: Arc<EventDispatcher>,

View File

@ -1,7 +1,7 @@
use flowy_folder::manager::FolderManager;
use flowy_grid::manager::GridManager;
use flowy_net::ws::connection::FlowyWebSocketConnect;
use flowy_text_block::TextBlockManager;
use flowy_text_block::TextEditorManager;
use flowy_user::services::UserSession;
use lib_dispatch::prelude::Module;
use std::sync::Arc;
@ -11,7 +11,7 @@ pub fn mk_modules(
folder_manager: &Arc<FolderManager>,
grid_manager: &Arc<GridManager>,
user_session: &Arc<UserSession>,
text_block_manager: &Arc<TextBlockManager>,
text_block_manager: &Arc<TextEditorManager>,
) -> Vec<Module> {
let user_module = mk_user_module(user_session.clone());
let folder_module = mk_folder_module(folder_manager.clone());
@ -43,6 +43,6 @@ fn mk_grid_module(grid_manager: Arc<GridManager>) -> Module {
flowy_grid::event_map::create(grid_manager)
}
fn mk_text_block_module(text_block_manager: Arc<TextBlockManager>) -> Module {
fn mk_text_block_module(text_block_manager: Arc<TextEditorManager>) -> Module {
flowy_text_block::event_map::create(text_block_manager)
}

View File

@ -2,7 +2,7 @@ use crate::web_socket::EditorCommandSender;
use crate::{
errors::FlowyError,
queue::{EditBlockQueue, EditorCommand},
TextBlockUser,
TextEditorUser,
};
use bytes::Bytes;
use flowy_error::{internal_error, FlowyResult};
@ -24,7 +24,6 @@ use tokio::sync::{mpsc, oneshot};
pub struct TextBlockEditor {
pub doc_id: String,
#[allow(dead_code)]
rev_manager: Arc<RevisionManager>,
#[cfg(feature = "sync")]
ws_manager: Arc<flowy_revision::RevisionWebSocketManager>,
@ -35,7 +34,7 @@ impl TextBlockEditor {
#[allow(unused_variables)]
pub(crate) async fn new(
doc_id: &str,
user: Arc<dyn TextBlockUser>,
user: Arc<dyn TextEditorUser>,
mut rev_manager: RevisionManager,
rev_web_socket: Arc<dyn RevisionWebSocket>,
cloud_service: Arc<dyn RevisionCloudService>,
@ -194,7 +193,7 @@ impl std::ops::Drop for TextBlockEditor {
// The edit queue will exit after the EditorCommandSender was dropped.
fn spawn_edit_queue(
user: Arc<dyn TextBlockUser>,
user: Arc<dyn TextEditorUser>,
rev_manager: Arc<RevisionManager>,
delta: TextDelta,
) -> EditorCommandSender {

View File

@ -29,6 +29,52 @@ impl std::convert::From<i32> for ExportType {
}
}
#[derive(Default, ProtoBuf)]
pub struct EditPayloadPB {
#[pb(index = 1)]
pub text_block_id: String,
// Encode in JSON format
#[pb(index = 2)]
pub operations: String,
// Encode in JSON format
#[pb(index = 3)]
pub delta: String,
}
#[derive(Default)]
pub struct EditParams {
pub text_block_id: String,
// Encode in JSON format
pub operations: String,
// Encode in JSON format
pub delta: String,
}
impl TryInto<EditParams> for EditPayloadPB {
type Error = ErrorCode;
fn try_into(self) -> Result<EditParams, Self::Error> {
Ok(EditParams {
text_block_id: self.text_block_id,
operations: self.operations,
delta: self.delta,
})
}
}
#[derive(Default, ProtoBuf)]
pub struct TextBlockPB {
#[pb(index = 1)]
pub text_block_id: String,
/// Encode in JSON format
#[pb(index = 2)]
pub snapshot: String,
}
#[derive(Default, ProtoBuf)]
pub struct ExportPayloadPB {
#[pb(index = 1)]

View File

@ -1,39 +1,40 @@
use crate::entities::{ExportDataPB, ExportParams, ExportPayloadPB};
use crate::TextBlockManager;
use crate::entities::{EditParams, EditPayloadPB, ExportDataPB, ExportParams, ExportPayloadPB, TextBlockPB};
use crate::TextEditorManager;
use flowy_error::FlowyError;
use flowy_sync::entities::text_block::{TextBlockDeltaPB, TextBlockIdPB};
use lib_dispatch::prelude::{data_result, AppData, Data, DataResult};
use std::convert::TryInto;
use std::sync::Arc;
pub(crate) async fn get_block_data_handler(
pub(crate) async fn get_text_block_handler(
data: Data<TextBlockIdPB>,
manager: AppData<Arc<TextBlockManager>>,
) -> DataResult<TextBlockDeltaPB, FlowyError> {
let block_id: TextBlockIdPB = data.into_inner();
let editor = manager.open_block(&block_id).await?;
manager: AppData<Arc<TextEditorManager>>,
) -> DataResult<TextBlockPB, FlowyError> {
let text_block_id: TextBlockIdPB = data.into_inner();
let editor = manager.open_text_editor(&text_block_id).await?;
let delta_str = editor.delta_str().await?;
data_result(TextBlockDeltaPB {
block_id: block_id.into(),
delta_str,
data_result(TextBlockPB {
text_block_id: text_block_id.into(),
snapshot: delta_str,
})
}
pub(crate) async fn apply_delta_handler(
data: Data<TextBlockDeltaPB>,
manager: AppData<Arc<TextBlockManager>>,
) -> DataResult<TextBlockDeltaPB, FlowyError> {
let block_delta = manager.receive_local_delta(data.into_inner()).await?;
data_result(block_delta)
pub(crate) async fn apply_edit_handler(
data: Data<EditPayloadPB>,
manager: AppData<Arc<TextEditorManager>>,
) -> Result<(), FlowyError> {
let params: EditParams = data.into_inner().try_into()?;
let _ = manager.apply_edit(params).await?;
Ok(())
}
#[tracing::instrument(level = "debug", skip(data, manager), err)]
pub(crate) async fn export_handler(
data: Data<ExportPayloadPB>,
manager: AppData<Arc<TextBlockManager>>,
manager: AppData<Arc<TextEditorManager>>,
) -> DataResult<ExportDataPB, FlowyError> {
let params: ExportParams = data.into_inner().try_into()?;
let editor = manager.open_block(&params.view_id).await?;
let editor = manager.open_text_editor(&params.view_id).await?;
let delta_json = editor.delta_str().await?;
data_result(ExportDataPB {
data: delta_json,

View File

@ -1,16 +1,16 @@
use crate::event_handler::*;
use crate::TextBlockManager;
use crate::TextEditorManager;
use flowy_derive::{Flowy_Event, ProtoBuf_Enum};
use lib_dispatch::prelude::Module;
use std::sync::Arc;
use strum_macros::Display;
pub fn create(block_manager: Arc<TextBlockManager>) -> Module {
pub fn create(block_manager: Arc<TextEditorManager>) -> Module {
let mut module = Module::new().name(env!("CARGO_PKG_NAME")).data(block_manager);
module = module
.event(TextBlockEvent::GetBlockData, get_block_data_handler)
.event(TextBlockEvent::ApplyDelta, apply_delta_handler)
.event(TextBlockEvent::GetTextBlock, get_text_block_handler)
.event(TextBlockEvent::ApplyEdit, apply_edit_handler)
.event(TextBlockEvent::ExportDocument, export_handler);
module
@ -19,11 +19,11 @@ pub fn create(block_manager: Arc<TextBlockManager>) -> Module {
#[derive(Clone, Copy, PartialEq, Eq, Debug, Display, Hash, ProtoBuf_Enum, Flowy_Event)]
#[event_err = "FlowyError"]
pub enum TextBlockEvent {
#[event(input = "TextBlockIdPB", output = "TextBlockDeltaPB")]
GetBlockData = 0,
#[event(input = "TextBlockIdPB", output = "TextBlockPB")]
GetTextBlock = 0,
#[event(input = "TextBlockDeltaPB", output = "TextBlockDeltaPB")]
ApplyDelta = 1,
#[event(input = "EditPayloadPB")]
ApplyEdit = 1,
#[event(input = "ExportPayloadPB", output = "ExportDataPB")]
ExportDocument = 2,

View File

@ -18,10 +18,10 @@ use crate::errors::FlowyError;
use flowy_sync::entities::text_block::{CreateTextBlockParams, DocumentPB, ResetTextBlockParams, TextBlockIdPB};
use lib_infra::future::FutureResult;
pub trait BlockCloudService: Send + Sync {
fn create_block(&self, token: &str, params: CreateTextBlockParams) -> FutureResult<(), FlowyError>;
pub trait TextEditorCloudService: Send + Sync {
fn create_text_block(&self, token: &str, params: CreateTextBlockParams) -> FutureResult<(), FlowyError>;
fn read_block(&self, token: &str, params: TextBlockIdPB) -> FutureResult<Option<DocumentPB>, FlowyError>;
fn read_text_block(&self, token: &str, params: TextBlockIdPB) -> FutureResult<Option<DocumentPB>, FlowyError>;
fn update_block(&self, token: &str, params: ResetTextBlockParams) -> FutureResult<(), FlowyError>;
fn update_text_block(&self, token: &str, params: ResetTextBlockParams) -> FutureResult<(), FlowyError>;
}

View File

@ -1,5 +1,6 @@
use crate::entities::{EditParams, EditPayloadPB};
use crate::queue::TextBlockRevisionCompactor;
use crate::{editor::TextBlockEditor, errors::FlowyError, BlockCloudService};
use crate::{editor::TextBlockEditor, errors::FlowyError, TextEditorCloudService};
use bytes::Bytes;
use dashmap::DashMap;
use flowy_database::ConnectionPool;
@ -16,30 +17,30 @@ use flowy_sync::entities::{
use lib_infra::future::FutureResult;
use std::{convert::TryInto, sync::Arc};
pub trait TextBlockUser: Send + Sync {
pub trait TextEditorUser: Send + Sync {
fn user_dir(&self) -> Result<String, FlowyError>;
fn user_id(&self) -> Result<String, FlowyError>;
fn token(&self) -> Result<String, FlowyError>;
fn db_pool(&self) -> Result<Arc<ConnectionPool>, FlowyError>;
}
pub struct TextBlockManager {
cloud_service: Arc<dyn BlockCloudService>,
pub struct TextEditorManager {
cloud_service: Arc<dyn TextEditorCloudService>,
rev_web_socket: Arc<dyn RevisionWebSocket>,
editor_map: Arc<TextBlockEditorMap>,
user: Arc<dyn TextBlockUser>,
editor_map: Arc<TextEditorMap>,
user: Arc<dyn TextEditorUser>,
}
impl TextBlockManager {
impl TextEditorManager {
pub fn new(
cloud_service: Arc<dyn BlockCloudService>,
text_block_user: Arc<dyn TextBlockUser>,
cloud_service: Arc<dyn TextEditorCloudService>,
text_block_user: Arc<dyn TextEditorUser>,
rev_web_socket: Arc<dyn RevisionWebSocket>,
) -> Self {
Self {
cloud_service,
rev_web_socket,
editor_map: Arc::new(TextBlockEditorMap::new()),
editor_map: Arc::new(TextEditorMap::new()),
user: text_block_user,
}
}
@ -50,45 +51,47 @@ impl TextBlockManager {
Ok(())
}
#[tracing::instrument(level = "trace", skip(self, block_id), fields(block_id), err)]
pub async fn open_block<T: AsRef<str>>(&self, block_id: T) -> Result<Arc<TextBlockEditor>, FlowyError> {
let block_id = block_id.as_ref();
tracing::Span::current().record("block_id", &block_id);
self.get_block_editor(block_id).await
#[tracing::instrument(level = "trace", skip(self, editor_id), fields(editor_id), err)]
pub async fn open_text_editor<T: AsRef<str>>(&self, editor_id: T) -> Result<Arc<TextBlockEditor>, FlowyError> {
let editor_id = editor_id.as_ref();
tracing::Span::current().record("editor_id", &editor_id);
self.get_text_editor(editor_id).await
}
#[tracing::instrument(level = "trace", skip(self, block_id), fields(block_id), err)]
pub fn close_block<T: AsRef<str>>(&self, block_id: T) -> Result<(), FlowyError> {
let block_id = block_id.as_ref();
tracing::Span::current().record("block_id", &block_id);
self.editor_map.remove(block_id);
#[tracing::instrument(level = "trace", skip(self, editor_id), fields(editor_id), err)]
pub fn close_text_editor<T: AsRef<str>>(&self, editor_id: T) -> Result<(), FlowyError> {
let editor_id = editor_id.as_ref();
tracing::Span::current().record("editor_id", &editor_id);
self.editor_map.remove(editor_id);
Ok(())
}
#[tracing::instrument(level = "debug", skip(self, doc_id), fields(doc_id), err)]
pub fn delete_block<T: AsRef<str>>(&self, doc_id: T) -> Result<(), FlowyError> {
let doc_id = doc_id.as_ref();
tracing::Span::current().record("doc_id", &doc_id);
self.editor_map.remove(doc_id);
Ok(())
}
#[tracing::instrument(level = "debug", skip(self, delta), fields(doc_id = %delta.block_id), err)]
#[tracing::instrument(level = "debug", skip(self, delta), err)]
pub async fn receive_local_delta(&self, delta: TextBlockDeltaPB) -> Result<TextBlockDeltaPB, FlowyError> {
let editor = self.get_block_editor(&delta.block_id).await?;
let editor = self.get_text_editor(&delta.text_block_id).await?;
let _ = editor.compose_local_delta(Bytes::from(delta.delta_str)).await?;
let document_json = editor.delta_str().await?;
let delta_str = editor.delta_str().await?;
Ok(TextBlockDeltaPB {
block_id: delta.block_id.clone(),
delta_str: document_json,
text_block_id: delta.text_block_id.clone(),
delta_str,
})
}
pub async fn create_block<T: AsRef<str>>(&self, doc_id: T, revisions: RepeatedRevision) -> FlowyResult<()> {
let doc_id = doc_id.as_ref().to_owned();
pub async fn apply_edit(&self, params: EditParams) -> FlowyResult<()> {
let editor = self.get_text_editor(&params.text_block_id).await?;
let _ = editor.compose_local_delta(Bytes::from(params.delta)).await?;
Ok(())
}
pub async fn create_text_block<T: AsRef<str>>(
&self,
text_block_id: T,
revisions: RepeatedRevision,
) -> FlowyResult<()> {
let doc_id = text_block_id.as_ref().to_owned();
let db_pool = self.user.db_pool()?;
// Maybe we could save the block to disk without creating the RevisionManager
let rev_manager = self.make_rev_manager(&doc_id, db_pool)?;
let rev_manager = self.make_text_block_rev_manager(&doc_id, db_pool)?;
let _ = rev_manager.reset_object(revisions).await?;
Ok(())
}
@ -110,26 +113,26 @@ impl TextBlockManager {
}
}
impl TextBlockManager {
async fn get_block_editor(&self, block_id: &str) -> FlowyResult<Arc<TextBlockEditor>> {
impl TextEditorManager {
async fn get_text_editor(&self, block_id: &str) -> FlowyResult<Arc<TextBlockEditor>> {
match self.editor_map.get(block_id) {
None => {
let db_pool = self.user.db_pool()?;
self.make_text_block_editor(block_id, db_pool).await
self.make_text_editor(block_id, db_pool).await
}
Some(editor) => Ok(editor),
}
}
#[tracing::instrument(level = "trace", skip(self, pool), err)]
async fn make_text_block_editor(
async fn make_text_editor(
&self,
block_id: &str,
pool: Arc<ConnectionPool>,
) -> Result<Arc<TextBlockEditor>, FlowyError> {
let user = self.user.clone();
let token = self.user.token()?;
let rev_manager = self.make_rev_manager(block_id, pool.clone())?;
let rev_manager = self.make_text_block_rev_manager(block_id, pool.clone())?;
let cloud_service = Arc::new(TextBlockRevisionCloudService {
token,
server: self.cloud_service.clone(),
@ -140,7 +143,11 @@ impl TextBlockManager {
Ok(doc_editor)
}
fn make_rev_manager(&self, doc_id: &str, pool: Arc<ConnectionPool>) -> Result<RevisionManager, FlowyError> {
fn make_text_block_rev_manager(
&self,
doc_id: &str,
pool: Arc<ConnectionPool>,
) -> Result<RevisionManager, FlowyError> {
let user_id = self.user.user_id()?;
let disk_cache = SQLiteTextBlockRevisionPersistence::new(&user_id, pool.clone());
let rev_persistence = RevisionPersistence::new(&user_id, doc_id, disk_cache);
@ -161,7 +168,7 @@ impl TextBlockManager {
struct TextBlockRevisionCloudService {
token: String,
server: Arc<dyn BlockCloudService>,
server: Arc<dyn TextEditorCloudService>,
}
impl RevisionCloudService for TextBlockRevisionCloudService {
@ -173,7 +180,7 @@ impl RevisionCloudService for TextBlockRevisionCloudService {
let user_id = user_id.to_string();
FutureResult::new(async move {
match server.read_block(&token, params).await? {
match server.read_text_block(&token, params).await? {
None => Err(FlowyError::record_not_found().context("Remote doesn't have this document")),
Some(doc) => {
let delta_data = Bytes::from(doc.text.clone());
@ -193,36 +200,36 @@ impl RevisionCloudService for TextBlockRevisionCloudService {
}
}
pub struct TextBlockEditorMap {
pub struct TextEditorMap {
inner: DashMap<String, Arc<TextBlockEditor>>,
}
impl TextBlockEditorMap {
impl TextEditorMap {
fn new() -> Self {
Self { inner: DashMap::new() }
}
pub(crate) fn insert(&self, block_id: &str, doc: &Arc<TextBlockEditor>) {
if self.inner.contains_key(block_id) {
log::warn!("Doc:{} already exists in cache", block_id);
pub(crate) fn insert(&self, editor_id: &str, doc: &Arc<TextBlockEditor>) {
if self.inner.contains_key(editor_id) {
log::warn!("Doc:{} already exists in cache", editor_id);
}
self.inner.insert(block_id.to_string(), doc.clone());
self.inner.insert(editor_id.to_string(), doc.clone());
}
pub(crate) fn get(&self, block_id: &str) -> Option<Arc<TextBlockEditor>> {
Some(self.inner.get(block_id)?.clone())
pub(crate) fn get(&self, editor_id: &str) -> Option<Arc<TextBlockEditor>> {
Some(self.inner.get(editor_id)?.clone())
}
pub(crate) fn remove(&self, block_id: &str) {
if let Some(editor) = self.get(block_id) {
pub(crate) fn remove(&self, editor_id: &str) {
if let Some(editor) = self.get(editor_id) {
editor.stop()
}
self.inner.remove(block_id);
self.inner.remove(editor_id);
}
}
#[tracing::instrument(level = "trace", skip(web_socket, handlers))]
fn listen_ws_state_changed(web_socket: Arc<dyn RevisionWebSocket>, handlers: Arc<TextBlockEditorMap>) {
fn listen_ws_state_changed(web_socket: Arc<dyn RevisionWebSocket>, handlers: Arc<TextEditorMap>) {
tokio::spawn(async move {
let mut notify = web_socket.subscribe_state_changed().await;
while let Ok(state) = notify.recv().await {

View File

@ -1,5 +1,5 @@
use crate::web_socket::EditorCommandReceiver;
use crate::TextBlockUser;
use crate::TextEditorUser;
use async_stream::stream;
use bytes::Bytes;
use flowy_error::{FlowyError, FlowyResult};
@ -23,14 +23,14 @@ use tokio::sync::{oneshot, RwLock};
// serial.
pub(crate) struct EditBlockQueue {
document: Arc<RwLock<ClientDocument>>,
user: Arc<dyn TextBlockUser>,
user: Arc<dyn TextEditorUser>,
rev_manager: Arc<RevisionManager>,
receiver: Option<EditorCommandReceiver>,
}
impl EditBlockQueue {
pub(crate) fn new(
user: Arc<dyn TextBlockUser>,
user: Arc<dyn TextEditorUser>,
rev_manager: Arc<RevisionManager>,
delta: TextDelta,
receiver: EditorCommandReceiver,

View File

@ -27,7 +27,7 @@ impl TextBlockEditorTest {
let sdk = FlowySDKTest::default();
let _ = sdk.init_user().await;
let test = ViewTest::new_text_block_view(&sdk).await;
let editor = sdk.text_block_manager.open_block(&test.view.id).await.unwrap();
let editor = sdk.text_block_manager.open_text_editor(&test.view.id).await.unwrap();
Self { sdk, editor }
}