refactor: rename structs

This commit is contained in:
appflowy
2022-02-26 11:03:42 +08:00
parent 6078e46d3d
commit 01985848f9
14 changed files with 72 additions and 69 deletions

View File

@ -1,8 +1,8 @@
use crate::queue::DocumentRevisionCompact;
use crate::web_socket::{make_document_ws_manager, EditorCommandSender};
use crate::queue::BlockRevisionCompact;
use crate::web_socket::{make_block_ws_manager, EditorCommandSender};
use crate::{
errors::FlowyError,
queue::{EditorCommand, EditorCommandQueue},
queue::{EditBlockQueue, EditorCommand},
BlockUser,
};
use bytes::Bytes;
@ -41,7 +41,7 @@ impl ClientBlockEditor {
cloud_service: Arc<dyn RevisionCloudService>,
) -> FlowyResult<Arc<Self>> {
let document_info = rev_manager
.load::<BlockInfoBuilder, DocumentRevisionCompact>(cloud_service)
.load::<BlockInfoBuilder, BlockRevisionCompact>(cloud_service)
.await?;
let delta = document_info.delta()?;
let rev_manager = Arc::new(rev_manager);
@ -49,7 +49,7 @@ impl ClientBlockEditor {
let user_id = user.user_id()?;
let edit_cmd_tx = spawn_edit_queue(user, rev_manager.clone(), delta);
let ws_manager = make_document_ws_manager(
let ws_manager = make_block_ws_manager(
doc_id.clone(),
user_id.clone(),
edit_cmd_tx.clone(),
@ -176,7 +176,7 @@ impl ClientBlockEditor {
impl std::ops::Drop for ClientBlockEditor {
fn drop(&mut self) {
tracing::trace!("{} ClientDocumentEditor was dropped", self.doc_id)
tracing::trace!("{} ClientBlockEditor was dropped", self.doc_id)
}
}
@ -187,8 +187,8 @@ fn spawn_edit_queue(
delta: RichTextDelta,
) -> EditorCommandSender {
let (sender, receiver) = mpsc::channel(1000);
let actor = EditorCommandQueue::new(user, rev_manager, delta, receiver);
tokio::spawn(actor.run());
let edit_queue = EditBlockQueue::new(user, rev_manager, delta, receiver);
tokio::spawn(edit_queue.run());
sender
}

View File

@ -1,4 +1,4 @@
pub mod editor;
pub mod block_editor;
pub mod manager;
mod queue;
mod web_socket;

View File

@ -1,4 +1,4 @@
use crate::{editor::ClientBlockEditor, errors::FlowyError, BlockCloudService};
use crate::{block_editor::ClientBlockEditor, errors::FlowyError, BlockCloudService};
use bytes::Bytes;
use dashmap::DashMap;
use flowy_collaboration::entities::{
@ -22,27 +22,27 @@ pub trait BlockUser: Send + Sync {
pub struct BlockManager {
cloud_service: Arc<dyn BlockCloudService>,
rev_web_socket: Arc<dyn RevisionWebSocket>,
block_handlers: Arc<BlockEditorHandlers>,
document_user: Arc<dyn BlockUser>,
block_editors: Arc<BlockEditors>,
block_user: Arc<dyn BlockUser>,
}
impl BlockManager {
pub fn new(
cloud_service: Arc<dyn BlockCloudService>,
document_user: Arc<dyn BlockUser>,
block_user: Arc<dyn BlockUser>,
rev_web_socket: Arc<dyn RevisionWebSocket>,
) -> Self {
let block_handlers = Arc::new(BlockEditorHandlers::new());
let block_handlers = Arc::new(BlockEditors::new());
Self {
cloud_service,
rev_web_socket,
block_handlers,
document_user,
block_editors: block_handlers,
block_user,
}
}
pub fn init(&self) -> FlowyResult<()> {
listen_ws_state_changed(self.rev_web_socket.clone(), self.block_handlers.clone());
listen_ws_state_changed(self.rev_web_socket.clone(), self.block_editors.clone());
Ok(())
}
@ -58,7 +58,7 @@ impl BlockManager {
pub fn close_block<T: AsRef<str>>(&self, block_id: T) -> Result<(), FlowyError> {
let block_id = block_id.as_ref();
tracing::Span::current().record("block_id", &block_id);
self.block_handlers.remove(block_id);
self.block_editors.remove(block_id);
Ok(())
}
@ -66,7 +66,7 @@ impl BlockManager {
pub fn delete<T: AsRef<str>>(&self, doc_id: T) -> Result<(), FlowyError> {
let doc_id = doc_id.as_ref();
tracing::Span::current().record("doc_id", &doc_id);
self.block_handlers.remove(doc_id);
self.block_editors.remove(doc_id);
Ok(())
}
@ -83,7 +83,7 @@ impl BlockManager {
pub async fn reset_with_revisions<T: AsRef<str>>(&self, doc_id: T, revisions: RepeatedRevision) -> FlowyResult<()> {
let doc_id = doc_id.as_ref().to_owned();
let db_pool = self.document_user.db_pool()?;
let db_pool = self.block_user.db_pool()?;
let rev_manager = self.make_rev_manager(&doc_id, db_pool)?;
let _ = rev_manager.reset_object(revisions).await?;
Ok(())
@ -92,7 +92,7 @@ impl BlockManager {
pub async fn receive_ws_data(&self, data: Bytes) {
let result: Result<ServerRevisionWSData, protobuf::ProtobufError> = data.try_into();
match result {
Ok(data) => match self.block_handlers.get(&data.object_id) {
Ok(data) => match self.block_editors.get(&data.object_id) {
None => tracing::error!("Can't find any source handler for {:?}-{:?}", data.object_id, data.ty),
Some(block_editor) => match block_editor.receive_ws_data(data).await {
Ok(_) => {}
@ -108,9 +108,9 @@ impl BlockManager {
impl BlockManager {
async fn get_block_editor(&self, block_id: &str) -> FlowyResult<Arc<ClientBlockEditor>> {
match self.block_handlers.get(block_id) {
match self.block_editors.get(block_id) {
None => {
let db_pool = self.document_user.db_pool()?;
let db_pool = self.block_user.db_pool()?;
self.make_block_editor(block_id, db_pool).await
}
Some(editor) => Ok(editor),
@ -122,32 +122,32 @@ impl BlockManager {
block_id: &str,
pool: Arc<ConnectionPool>,
) -> Result<Arc<ClientBlockEditor>, FlowyError> {
let user = self.document_user.clone();
let token = self.document_user.token()?;
let user = self.block_user.clone();
let token = self.block_user.token()?;
let rev_manager = self.make_rev_manager(block_id, pool.clone())?;
let cloud_service = Arc::new(DocumentRevisionCloudServiceImpl {
let cloud_service = Arc::new(BlockRevisionCloudService {
token,
server: self.cloud_service.clone(),
});
let doc_editor =
ClientBlockEditor::new(block_id, user, rev_manager, self.rev_web_socket.clone(), cloud_service).await?;
self.block_handlers.insert(block_id, &doc_editor);
self.block_editors.insert(block_id, &doc_editor);
Ok(doc_editor)
}
fn make_rev_manager(&self, doc_id: &str, pool: Arc<ConnectionPool>) -> Result<RevisionManager, FlowyError> {
let user_id = self.document_user.user_id()?;
let user_id = self.block_user.user_id()?;
let rev_persistence = Arc::new(RevisionPersistence::new(&user_id, doc_id, pool));
Ok(RevisionManager::new(&user_id, doc_id, rev_persistence))
}
}
struct DocumentRevisionCloudServiceImpl {
struct BlockRevisionCloudService {
token: String,
server: Arc<dyn BlockCloudService>,
}
impl RevisionCloudService for DocumentRevisionCloudServiceImpl {
impl RevisionCloudService for BlockRevisionCloudService {
#[tracing::instrument(level = "trace", skip(self))]
fn fetch_object(&self, user_id: &str, object_id: &str) -> FutureResult<Vec<Revision>, FlowyError> {
let params: BlockId = object_id.to_string().into();
@ -170,11 +170,11 @@ impl RevisionCloudService for DocumentRevisionCloudServiceImpl {
}
}
pub struct BlockEditorHandlers {
pub struct BlockEditors {
inner: DashMap<String, Arc<ClientBlockEditor>>,
}
impl BlockEditorHandlers {
impl BlockEditors {
fn new() -> Self {
Self { inner: DashMap::new() }
}
@ -207,7 +207,7 @@ impl BlockEditorHandlers {
}
#[tracing::instrument(level = "trace", skip(web_socket, handlers))]
fn listen_ws_state_changed(web_socket: Arc<dyn RevisionWebSocket>, handlers: Arc<BlockEditorHandlers>) {
fn listen_ws_state_changed(web_socket: Arc<dyn RevisionWebSocket>, handlers: Arc<BlockEditors>) {
tokio::spawn(async move {
let mut notify = web_socket.subscribe_state_changed().await;
while let Ok(state) = notify.recv().await {

View File

@ -19,14 +19,14 @@ use tokio::sync::{oneshot, RwLock};
// The EditorCommandQueue executes each command that will alter the document in
// serial.
pub(crate) struct EditorCommandQueue {
pub(crate) struct EditBlockQueue {
document: Arc<RwLock<ClientDocument>>,
user: Arc<dyn BlockUser>,
rev_manager: Arc<RevisionManager>,
receiver: Option<EditorCommandReceiver>,
}
impl EditorCommandQueue {
impl EditBlockQueue {
pub(crate) fn new(
user: Arc<dyn BlockUser>,
rev_manager: Arc<RevisionManager>,
@ -187,17 +187,17 @@ impl EditorCommandQueue {
);
let _ = self
.rev_manager
.add_local_revision::<DocumentRevisionCompact>(&revision)
.add_local_revision::<BlockRevisionCompact>(&revision)
.await?;
Ok(rev_id.into())
}
}
pub(crate) struct DocumentRevisionCompact();
impl RevisionCompact for DocumentRevisionCompact {
pub(crate) struct BlockRevisionCompact();
impl RevisionCompact for BlockRevisionCompact {
fn compact_revisions(user_id: &str, object_id: &str, mut revisions: Vec<Revision>) -> FlowyResult<Revision> {
if revisions.is_empty() {
return Err(FlowyError::internal().context("Can't compact the empty document's revisions"));
return Err(FlowyError::internal().context("Can't compact the empty block's revisions"));
}
if revisions.len() == 1 {

View File

@ -22,7 +22,7 @@ use tokio::sync::{
pub(crate) type EditorCommandSender = Sender<EditorCommand>;
pub(crate) type EditorCommandReceiver = Receiver<EditorCommand>;
pub(crate) async fn make_document_ws_manager(
pub(crate) async fn make_block_ws_manager(
doc_id: String,
user_id: String,
edit_cmd_tx: EditorCommandSender,
@ -41,7 +41,7 @@ pub(crate) async fn make_document_ws_manager(
let ws_data_sink = Arc::new(BlockWSDataSink(ws_data_provider));
let ping_duration = Duration::from_millis(DOCUMENT_SYNC_INTERVAL_IN_MILLIS);
let ws_manager = Arc::new(RevisionWebSocketManager::new(
"Document",
"Block",
&doc_id,
rev_web_socket,
ws_data_sink,

View File

@ -1,5 +1,5 @@
use flowy_collaboration::entities::revision::RevisionState;
use flowy_document::editor::ClientBlockEditor;
use flowy_document::block_editor::ClientBlockEditor;
use flowy_document::DOCUMENT_SYNC_INTERVAL_IN_MILLIS;
use flowy_test::{helper::ViewTest, FlowySDKTest};
use lib_ot::{core::Interval, rich_text::RichTextDelta};