rename some structs

This commit is contained in:
appflowy 2021-12-03 22:40:56 +08:00
parent 6fa7b0632a
commit e7cc11cc44
17 changed files with 163 additions and 172 deletions

View File

@ -1,5 +1,5 @@
use crate::service::{ use crate::service::{
doc::doc::DocBiz, doc::manager::DocBiz,
ws::{WsBizHandlers, WsServer}, ws::{WsBizHandlers, WsServer},
}; };
use actix::Addr; use actix::Addr;

View File

@ -1,5 +1,5 @@
use crate::service::{ use crate::service::{
doc::edit::ServerEditDoc, doc::edit::ServerDocEditor,
ws::{entities::Socket, WsUser}, ws::{entities::Socket, WsUser},
}; };
use actix_web::web::Data; use actix_web::web::Data;
@ -48,13 +48,13 @@ pub enum EditMsg {
pub struct EditDocActor { pub struct EditDocActor {
receiver: Option<mpsc::Receiver<EditMsg>>, receiver: Option<mpsc::Receiver<EditMsg>>,
edit_doc: Arc<ServerEditDoc>, edit_doc: Arc<ServerDocEditor>,
pg_pool: Data<PgPool>, pg_pool: Data<PgPool>,
} }
impl EditDocActor { impl EditDocActor {
pub fn new(receiver: mpsc::Receiver<EditMsg>, doc: Doc, pg_pool: Data<PgPool>) -> Result<Self, ServerError> { pub fn new(receiver: mpsc::Receiver<EditMsg>, doc: Doc, pg_pool: Data<PgPool>) -> Result<Self, ServerError> {
let edit_doc = Arc::new(ServerEditDoc::new(doc)?); let edit_doc = Arc::new(ServerDocEditor::new(doc)?);
Ok(Self { Ok(Self {
receiver: Some(receiver), receiver: Some(receiver),
edit_doc, edit_doc,

View File

@ -24,14 +24,14 @@ use std::{
time::Duration, time::Duration,
}; };
pub struct ServerEditDoc { pub struct ServerDocEditor {
pub doc_id: String, pub doc_id: String,
pub rev_id: AtomicI64, pub rev_id: AtomicI64,
document: Arc<RwLock<Document>>, document: Arc<RwLock<Document>>,
users: DashMap<String, EditUser>, users: DashMap<String, EditUser>,
} }
impl ServerEditDoc { impl ServerDocEditor {
pub fn new(doc: Doc) -> Result<Self, ServerError> { pub fn new(doc: Doc) -> Result<Self, ServerError> {
let delta = Delta::from_bytes(&doc.data).map_err(internal_error)?; let delta = Delta::from_bytes(&doc.data).map_err(internal_error)?;
let document = Arc::new(RwLock::new(Document::from_delta(delta))); let document = Arc::new(RwLock::new(Document::from_delta(delta)));

View File

@ -1,7 +1,5 @@
mod edit_actor; pub(crate) mod edit_actor;
mod edit_doc; mod editor;
mod open_handle;
pub use edit_actor::*; pub use edit_actor::*;
pub use edit_doc::*; pub use editor::*;
pub use open_handle::*;

View File

@ -1,70 +0,0 @@
use crate::service::{
doc::edit::edit_actor::{EditDocActor, EditMsg},
ws::{entities::Socket, WsUser},
};
use actix_web::web::Data;
use backend_service::errors::{internal_error, Result as DocResult, ServerError};
use flowy_document_infra::protobuf::{Doc, Revision};
use sqlx::PgPool;
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};
pub struct DocHandle {
pub sender: mpsc::Sender<EditMsg>,
}
impl DocHandle {
pub fn new(doc: Doc, pg_pool: Data<PgPool>) -> Result<Self, ServerError> {
let (sender, receiver) = mpsc::channel(100);
let actor = EditDocActor::new(receiver, doc, pg_pool)?;
tokio::task::spawn(actor.run());
Ok(Self { sender })
}
pub async fn handle_new_user(&self, user: Arc<WsUser>, rev_id: i64, socket: Socket) -> Result<(), ServerError> {
let (ret, rx) = oneshot::channel();
let msg = EditMsg::NewDocUser {
user,
socket,
rev_id,
ret,
};
let _ = self.send(msg, rx).await?;
Ok(())
}
pub async fn apply_revision(
&self,
user: Arc<WsUser>,
socket: Socket,
revision: Revision,
) -> Result<(), ServerError> {
let (ret, rx) = oneshot::channel();
let msg = EditMsg::Revision {
user,
socket,
revision,
ret,
};
let _ = self.send(msg, rx).await?;
Ok(())
}
pub async fn document_json(&self) -> DocResult<String> {
let (ret, rx) = oneshot::channel();
let msg = EditMsg::DocumentJson { ret };
self.send(msg, rx).await?
}
pub async fn rev_id(&self) -> DocResult<i64> {
let (ret, rx) = oneshot::channel();
let msg = EditMsg::DocumentRevId { ret };
self.send(msg, rx).await?
}
pub(crate) async fn send<T>(&self, msg: EditMsg, rx: oneshot::Receiver<T>) -> DocResult<T> {
let _ = self.sender.send(msg).await.map_err(internal_error)?;
let result = rx.await?;
Ok(result)
}
}

View File

@ -1,15 +1,15 @@
use crate::service::{ use crate::service::{
doc::{ doc::{
edit::DocHandle, edit::edit_actor::{EditDocActor, EditMsg},
read_doc, read_doc,
ws_actor::{DocWsActor, DocWsMsg}, ws_actor::{DocWsActor, DocWsMsg},
}, },
ws::{WsBizHandler, WsClientData}, ws::{entities::Socket, WsBizHandler, WsClientData, WsUser},
}; };
use actix_web::web::Data; use actix_web::web::Data;
use backend_service::errors::{internal_error, ServerError}; use backend_service::errors::{internal_error, Result as DocResult, ServerError};
use dashmap::DashMap; use dashmap::DashMap;
use flowy_document_infra::protobuf::DocIdentifier; use flowy_document_infra::protobuf::{Doc, DocIdentifier, Revision};
use sqlx::PgPool; use sqlx::PgPool;
use std::sync::Arc; use std::sync::Arc;
use tokio::{ use tokio::{
@ -58,7 +58,7 @@ impl WsBizHandler for DocBiz {
} }
pub struct DocManager { pub struct DocManager {
docs_map: DashMap<String, Arc<DocHandle>>, docs_map: DashMap<String, Arc<DocOpenHandle>>,
} }
impl std::default::Default for DocManager { impl std::default::Default for DocManager {
@ -72,7 +72,7 @@ impl std::default::Default for DocManager {
impl DocManager { impl DocManager {
pub fn new() -> Self { DocManager::default() } pub fn new() -> Self { DocManager::default() }
pub async fn get(&self, doc_id: &str, pg_pool: Data<PgPool>) -> Result<Option<Arc<DocHandle>>, ServerError> { pub async fn get(&self, doc_id: &str, pg_pool: Data<PgPool>) -> Result<Option<Arc<DocOpenHandle>>, ServerError> {
match self.docs_map.get(doc_id) { match self.docs_map.get(doc_id) {
None => { None => {
let params = DocIdentifier { let params = DocIdentifier {
@ -80,7 +80,7 @@ impl DocManager {
..Default::default() ..Default::default()
}; };
let doc = read_doc(pg_pool.get_ref(), params).await?; let doc = read_doc(pg_pool.get_ref(), params).await?;
let handle = spawn_blocking(|| DocHandle::new(doc, pg_pool)) let handle = spawn_blocking(|| DocOpenHandle::new(doc, pg_pool))
.await .await
.map_err(internal_error)?; .map_err(internal_error)?;
let handle = Arc::new(handle?); let handle = Arc::new(handle?);
@ -91,3 +91,63 @@ impl DocManager {
} }
} }
} }
pub struct DocOpenHandle {
pub sender: mpsc::Sender<EditMsg>,
}
impl DocOpenHandle {
pub fn new(doc: Doc, pg_pool: Data<PgPool>) -> Result<Self, ServerError> {
let (sender, receiver) = mpsc::channel(100);
let actor = EditDocActor::new(receiver, doc, pg_pool)?;
tokio::task::spawn(actor.run());
Ok(Self { sender })
}
pub async fn add_user(&self, user: Arc<WsUser>, rev_id: i64, socket: Socket) -> Result<(), ServerError> {
let (ret, rx) = oneshot::channel();
let msg = EditMsg::NewDocUser {
user,
socket,
rev_id,
ret,
};
let _ = self.send(msg, rx).await?;
Ok(())
}
pub async fn apply_revision(
&self,
user: Arc<WsUser>,
socket: Socket,
revision: Revision,
) -> Result<(), ServerError> {
let (ret, rx) = oneshot::channel();
let msg = EditMsg::Revision {
user,
socket,
revision,
ret,
};
let _ = self.send(msg, rx).await?;
Ok(())
}
pub async fn document_json(&self) -> DocResult<String> {
let (ret, rx) = oneshot::channel();
let msg = EditMsg::DocumentJson { ret };
self.send(msg, rx).await?
}
pub async fn rev_id(&self) -> DocResult<i64> {
let (ret, rx) = oneshot::channel();
let msg = EditMsg::DocumentRevId { ret };
self.send(msg, rx).await?
}
pub(crate) async fn send<T>(&self, msg: EditMsg, rx: oneshot::Receiver<T>) -> DocResult<T> {
let _ = self.sender.send(msg).await.map_err(internal_error)?;
let result = rx.await?;
Ok(result)
}
}

View File

@ -3,7 +3,7 @@ pub(crate) use crud::*;
pub use router::*; pub use router::*;
pub mod crud; pub mod crud;
pub mod doc;
mod edit; mod edit;
pub mod manager;
pub mod router; pub mod router;
mod ws_actor; mod ws_actor;

View File

@ -1,5 +1,5 @@
use crate::service::{ use crate::service::{
doc::{doc::DocManager, edit::DocHandle}, doc::manager::{DocManager, DocOpenHandle},
util::{md5, parse_from_bytes}, util::{md5, parse_from_bytes},
ws::{entities::Socket, WsClientData, WsUser}, ws::{entities::Socket, WsClientData, WsUser},
}; };
@ -73,14 +73,14 @@ impl DocWsActor {
match document_data.ty { match document_data.ty {
WsDataType::Acked => Ok(()), WsDataType::Acked => Ok(()),
WsDataType::PushRev => self.handle_push_rev(user, socket, data, pool).await, WsDataType::PushRev => self.apply_pushed_rev(user, socket, data, pool).await,
WsDataType::NewDocUser => self.handle_new_doc_user(user, socket, data, pool).await, WsDataType::NewDocUser => self.add_doc_user(user, socket, data, pool).await,
WsDataType::PullRev => Ok(()), WsDataType::PullRev => Ok(()),
WsDataType::Conflict => Ok(()), WsDataType::Conflict => Ok(()),
} }
} }
async fn handle_new_doc_user( async fn add_doc_user(
&self, &self,
user: Arc<WsUser>, user: Arc<WsUser>,
socket: Socket, socket: Socket,
@ -93,13 +93,13 @@ impl DocWsActor {
}) })
.await .await
.map_err(internal_error)??; .map_err(internal_error)??;
if let Some(handle) = self.doc_handle(&doc_user.doc_id, pool).await { if let Some(handle) = self.find_doc_handle(&doc_user.doc_id, pool).await {
handle.handle_new_user(user, doc_user.rev_id, socket).await?; handle.add_user(user, doc_user.rev_id, socket).await?;
} }
Ok(()) Ok(())
} }
async fn handle_push_rev( async fn apply_pushed_rev(
&self, &self,
user: Arc<WsUser>, user: Arc<WsUser>,
socket: Socket, socket: Socket,
@ -113,13 +113,13 @@ impl DocWsActor {
}) })
.await .await
.map_err(internal_error)??; .map_err(internal_error)??;
if let Some(handle) = self.doc_handle(&revision.doc_id, pool).await { if let Some(handle) = self.find_doc_handle(&revision.doc_id, pool).await {
handle.apply_revision(user, socket, revision).await?; handle.apply_revision(user, socket, revision).await?;
} }
Ok(()) Ok(())
} }
async fn doc_handle(&self, doc_id: &str, pool: Data<PgPool>) -> Option<Arc<DocHandle>> { async fn find_doc_handle(&self, doc_id: &str, pool: Data<PgPool>) -> Option<Arc<DocOpenHandle>> {
match self.doc_manager.get(doc_id, pool).await { match self.doc_manager.get(doc_id, pool).await {
Ok(Some(edit_doc)) => Some(edit_doc), Ok(Some(edit_doc)) => Some(edit_doc),
Ok(None) => { Ok(None) => {

View File

@ -1,5 +1,5 @@
use crate::service::{ use crate::service::{
doc::doc::DocBiz, doc::manager::DocBiz,
user::LoggedUser, user::LoggedUser,
util::parse_from_payload, util::parse_from_payload,
view::{create_view, delete_view, read_view, sql_builder::check_view_ids, update_view}, view::{create_view, delete_view, read_view, sql_builder::check_view_ids, update_view},

View File

@ -1,7 +1,7 @@
use actix_web::web::Data; use actix_web::web::Data;
use backend::service::doc::{crud::update_doc, doc::DocManager}; use backend::service::doc::{crud::update_doc, doc::DocManager};
use backend_service::config::ServerConfig; use backend_service::config::ServerConfig;
use flowy_document::services::doc::ClientEditDoc as ClientEditDocContext; use flowy_document::services::doc::ClientDocEditor as ClientEditDocContext;
use flowy_test::{workspace::ViewTest, FlowyTest}; use flowy_test::{workspace::ViewTest, FlowyTest};
use flowy_user::services::user::UserSession; use flowy_user::services::user::UserSession;
use futures_util::{stream, stream::StreamExt}; use futures_util::{stream, stream::StreamExt};

View File

@ -1,7 +1,7 @@
use crate::{ use crate::{
errors::DocError, errors::DocError,
services::{ services::{
doc::{doc_controller::DocController, ClientEditDoc}, doc::{doc_controller::DocController, ClientDocEditor},
server::construct_doc_server, server::construct_doc_server,
ws::WsDocumentManager, ws::WsDocumentManager,
}, },
@ -44,7 +44,7 @@ impl FlowyDocument {
Ok(()) Ok(())
} }
pub async fn open(&self, params: DocIdentifier) -> Result<Arc<ClientEditDoc>, DocError> { pub async fn open(&self, params: DocIdentifier) -> Result<Arc<ClientDocEditor>, DocError> {
let edit_context = self.doc_ctrl.open(params, self.user.db_pool()?).await?; let edit_context = self.doc_ctrl.open(params, self.user.db_pool()?).await?;
Ok(edit_context) Ok(edit_context)
} }

View File

@ -4,25 +4,25 @@ use dashmap::DashMap;
use crate::{ use crate::{
errors::DocError, errors::DocError,
services::doc::{ClientEditDoc, DocId}, services::doc::{ClientDocEditor, DocId},
}; };
pub(crate) struct DocCache { pub(crate) struct DocCache {
inner: DashMap<DocId, Arc<ClientEditDoc>>, inner: DashMap<DocId, Arc<ClientDocEditor>>,
} }
impl DocCache { impl DocCache {
pub(crate) fn new() -> Self { Self { inner: DashMap::new() } } pub(crate) fn new() -> Self { Self { inner: DashMap::new() } }
#[allow(dead_code)] #[allow(dead_code)]
pub(crate) fn all_docs(&self) -> Vec<Arc<ClientEditDoc>> { pub(crate) fn all_docs(&self) -> Vec<Arc<ClientDocEditor>> {
self.inner self.inner
.iter() .iter()
.map(|kv| kv.value().clone()) .map(|kv| kv.value().clone())
.collect::<Vec<Arc<ClientEditDoc>>>() .collect::<Vec<Arc<ClientDocEditor>>>()
} }
pub(crate) fn set(&self, doc: Arc<ClientEditDoc>) { pub(crate) fn set(&self, doc: Arc<ClientDocEditor>) {
let doc_id = doc.doc_id.clone(); let doc_id = doc.doc_id.clone();
if self.inner.contains_key(&doc_id) { if self.inner.contains_key(&doc_id) {
log::warn!("Doc:{} already exists in cache", &doc_id); log::warn!("Doc:{} already exists in cache", &doc_id);
@ -32,7 +32,7 @@ impl DocCache {
pub(crate) fn contains(&self, doc_id: &str) -> bool { self.inner.get(doc_id).is_some() } pub(crate) fn contains(&self, doc_id: &str) -> bool { self.inner.get(doc_id).is_some() }
pub(crate) fn get(&self, doc_id: &str) -> Result<Arc<ClientEditDoc>, DocError> { pub(crate) fn get(&self, doc_id: &str) -> Result<Arc<ClientDocEditor>, DocError> {
if !self.contains(&doc_id) { if !self.contains(&doc_id) {
return Err(doc_not_found()); return Err(doc_not_found());
} }

View File

@ -4,7 +4,7 @@ use crate::{
services::{ services::{
cache::DocCache, cache::DocCache,
doc::{ doc::{
edit::{ClientEditDoc, EditDocWsHandler}, edit::{ClientDocEditor, EditDocWsHandler},
revision::RevisionServer, revision::RevisionServer,
}, },
server::Server, server::Server,
@ -45,7 +45,7 @@ impl DocController {
&self, &self,
params: DocIdentifier, params: DocIdentifier,
pool: Arc<ConnectionPool>, pool: Arc<ConnectionPool>,
) -> Result<Arc<ClientEditDoc>, DocError> { ) -> Result<Arc<ClientDocEditor>, DocError> {
if !self.cache.contains(&params.doc_id) { if !self.cache.contains(&params.doc_id) {
let edit_ctx = self.make_edit_context(&params.doc_id, pool.clone()).await?; let edit_ctx = self.make_edit_context(&params.doc_id, pool.clone()).await?;
return Ok(edit_ctx); return Ok(edit_ctx);
@ -91,7 +91,11 @@ impl DocController {
} }
impl DocController { impl DocController {
async fn make_edit_context(&self, doc_id: &str, pool: Arc<ConnectionPool>) -> Result<Arc<ClientEditDoc>, DocError> { async fn make_edit_context(
&self,
doc_id: &str,
pool: Arc<ConnectionPool>,
) -> Result<Arc<ClientDocEditor>, DocError> {
// Opti: require upgradable_read lock and then upgrade to write lock using // Opti: require upgradable_read lock and then upgrade to write lock using
// RwLockUpgradableReadGuard::upgrade(xx) of ws // RwLockUpgradableReadGuard::upgrade(xx) of ws
// let doc = self.read_doc(doc_id, pool.clone()).await?; // let doc = self.read_doc(doc_id, pool.clone()).await?;
@ -103,7 +107,7 @@ impl DocController {
server: self.server.clone(), server: self.server.clone(),
}); });
let edit_ctx = Arc::new(ClientEditDoc::new(doc_id, pool, ws, server, user).await?); let edit_ctx = Arc::new(ClientDocEditor::new(doc_id, pool, ws, server, user).await?);
let ws_handler = Arc::new(EditDocWsHandler(edit_ctx.clone())); let ws_handler = Arc::new(EditDocWsHandler(edit_ctx.clone()));
self.ws_manager.register_handler(doc_id, ws_handler); self.ws_manager.register_handler(doc_id, ws_handler);
self.cache.set(edit_ctx.clone()); self.cache.set(edit_ctx.clone());

View File

@ -10,14 +10,14 @@ use lib_ot::core::{Attribute, Delta, Interval, OperationTransformable};
use std::{convert::TryFrom, sync::Arc}; use std::{convert::TryFrom, sync::Arc};
use tokio::sync::{mpsc, oneshot, RwLock}; use tokio::sync::{mpsc, oneshot, RwLock};
pub struct DocumentActor { pub(crate) struct EditCommandQueue {
doc_id: String, doc_id: String,
document: Arc<RwLock<Document>>, document: Arc<RwLock<Document>>,
receiver: Option<mpsc::UnboundedReceiver<DocumentMsg>>, receiver: Option<mpsc::UnboundedReceiver<EditCommand>>,
} }
impl DocumentActor { impl EditCommandQueue {
pub fn new(doc_id: &str, delta: Delta, receiver: mpsc::UnboundedReceiver<DocumentMsg>) -> Self { pub(crate) fn new(doc_id: &str, delta: Delta, receiver: mpsc::UnboundedReceiver<EditCommand>) -> Self {
let document = Arc::new(RwLock::new(Document::from_delta(delta))); let document = Arc::new(RwLock::new(Document::from_delta(delta)));
Self { Self {
doc_id: doc_id.to_owned(), doc_id: doc_id.to_owned(),
@ -26,7 +26,7 @@ impl DocumentActor {
} }
} }
pub async fn run(mut self) { pub(crate) async fn run(mut self) {
let mut receiver = self.receiver.take().expect("Should only call once"); let mut receiver = self.receiver.take().expect("Should only call once");
let stream = stream! { let stream = stream! {
loop { loop {
@ -46,13 +46,13 @@ impl DocumentActor {
.await; .await;
} }
async fn handle_message(&self, msg: DocumentMsg) -> Result<(), DocumentError> { async fn handle_message(&self, msg: EditCommand) -> Result<(), DocumentError> {
match msg { match msg {
DocumentMsg::Delta { delta, ret } => { EditCommand::ComposeDelta { delta, ret } => {
let result = self.composed_delta(delta).await; let result = self.composed_delta(delta).await;
let _ = ret.send(result); let _ = ret.send(result);
}, },
DocumentMsg::RemoteRevision { bytes, ret } => { EditCommand::RemoteRevision { bytes, ret } => {
let revision = Revision::try_from(bytes)?; let revision = Revision::try_from(bytes)?;
let delta = Delta::from_bytes(&revision.delta_data)?; let delta = Delta::from_bytes(&revision.delta_data)?;
let rev_id: RevId = revision.rev_id.into(); let rev_id: RevId = revision.rev_id.into();
@ -64,15 +64,15 @@ impl DocumentActor {
}; };
let _ = ret.send(Ok(transform_delta)); let _ = ret.send(Ok(transform_delta));
}, },
DocumentMsg::Insert { index, data, ret } => { EditCommand::Insert { index, data, ret } => {
let delta = self.document.write().await.insert(index, data); let delta = self.document.write().await.insert(index, data);
let _ = ret.send(delta); let _ = ret.send(delta);
}, },
DocumentMsg::Delete { interval, ret } => { EditCommand::Delete { interval, ret } => {
let result = self.document.write().await.delete(interval); let result = self.document.write().await.delete(interval);
let _ = ret.send(result); let _ = ret.send(result);
}, },
DocumentMsg::Format { EditCommand::Format {
interval, interval,
attribute, attribute,
ret, ret,
@ -80,25 +80,25 @@ impl DocumentActor {
let result = self.document.write().await.format(interval, attribute); let result = self.document.write().await.format(interval, attribute);
let _ = ret.send(result); let _ = ret.send(result);
}, },
DocumentMsg::Replace { interval, data, ret } => { EditCommand::Replace { interval, data, ret } => {
let result = self.document.write().await.replace(interval, data); let result = self.document.write().await.replace(interval, data);
let _ = ret.send(result); let _ = ret.send(result);
}, },
DocumentMsg::CanUndo { ret } => { EditCommand::CanUndo { ret } => {
let _ = ret.send(self.document.read().await.can_undo()); let _ = ret.send(self.document.read().await.can_undo());
}, },
DocumentMsg::CanRedo { ret } => { EditCommand::CanRedo { ret } => {
let _ = ret.send(self.document.read().await.can_redo()); let _ = ret.send(self.document.read().await.can_redo());
}, },
DocumentMsg::Undo { ret } => { EditCommand::Undo { ret } => {
let result = self.document.write().await.undo(); let result = self.document.write().await.undo();
let _ = ret.send(result); let _ = ret.send(result);
}, },
DocumentMsg::Redo { ret } => { EditCommand::Redo { ret } => {
let result = self.document.write().await.redo(); let result = self.document.write().await.redo();
let _ = ret.send(result); let _ = ret.send(result);
}, },
DocumentMsg::Doc { ret } => { EditCommand::ReadDoc { ret } => {
let data = self.document.read().await.to_json(); let data = self.document.read().await.to_json();
let _ = ret.send(Ok(data)); let _ = ret.send(Ok(data));
}, },
@ -122,9 +122,9 @@ impl DocumentActor {
} }
} }
pub type Ret<T> = oneshot::Sender<Result<T, DocumentError>>; pub(crate) type Ret<T> = oneshot::Sender<Result<T, DocumentError>>;
pub enum DocumentMsg { pub(crate) enum EditCommand {
Delta { ComposeDelta {
delta: Delta, delta: Delta,
ret: Ret<()>, ret: Ret<()>,
}, },
@ -164,12 +164,12 @@ pub enum DocumentMsg {
Redo { Redo {
ret: Ret<UndoResult>, ret: Ret<UndoResult>,
}, },
Doc { ReadDoc {
ret: Ret<String>, ret: Ret<String>,
}, },
} }
pub struct TransformDeltas { pub(crate) struct TransformDeltas {
pub client_prime: Delta, pub client_prime: Delta,
pub server_prime: Delta, pub server_prime: Delta,
pub server_rev_id: RevId, pub server_rev_id: RevId,

View File

@ -2,7 +2,7 @@ use crate::{
errors::{internal_error, DocError, DocResult}, errors::{internal_error, DocError, DocResult},
module::DocumentUser, module::DocumentUser,
services::{ services::{
doc::{DocumentActor, DocumentMsg, OpenDocAction, RevisionManager, RevisionServer, TransformDeltas}, doc::{EditCommand, EditCommandQueue, OpenDocAction, RevisionManager, RevisionServer, TransformDeltas},
ws::{DocumentWebSocket, WsDocumentHandler}, ws::{DocumentWebSocket, WsDocumentHandler},
}, },
}; };
@ -24,15 +24,15 @@ use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
pub type DocId = String; pub type DocId = String;
pub struct ClientEditDoc { pub struct ClientDocEditor {
pub doc_id: DocId, pub doc_id: DocId,
rev_manager: Arc<RevisionManager>, rev_manager: Arc<RevisionManager>,
document: UnboundedSender<DocumentMsg>, edit_tx: UnboundedSender<EditCommand>,
ws: Arc<dyn DocumentWebSocket>, ws: Arc<dyn DocumentWebSocket>,
user: Arc<dyn DocumentUser>, user: Arc<dyn DocumentUser>,
} }
impl ClientEditDoc { impl ClientDocEditor {
pub(crate) async fn new( pub(crate) async fn new(
doc_id: &str, doc_id: &str,
pool: Arc<ConnectionPool>, pool: Arc<ConnectionPool>,
@ -45,13 +45,13 @@ impl ClientEditDoc {
spawn_rev_receiver(receiver, ws.clone()); spawn_rev_receiver(receiver, ws.clone());
let delta = rev_manager.load_document().await?; let delta = rev_manager.load_document().await?;
let document = spawn_doc_edit_actor(doc_id, delta, pool.clone()); let edit_queue_tx = spawn_edit_queue(doc_id, delta, pool.clone());
let doc_id = doc_id.to_string(); let doc_id = doc_id.to_string();
let rev_manager = Arc::new(rev_manager); let rev_manager = Arc::new(rev_manager);
let edit_doc = Self { let edit_doc = Self {
doc_id, doc_id,
rev_manager, rev_manager,
document, edit_tx: edit_queue_tx,
ws, ws,
user, user,
}; };
@ -61,12 +61,12 @@ impl ClientEditDoc {
pub async fn insert<T: ToString>(&self, index: usize, data: T) -> Result<(), DocError> { pub async fn insert<T: ToString>(&self, index: usize, data: T) -> Result<(), DocError> {
let (ret, rx) = oneshot::channel::<DocumentResult<Delta>>(); let (ret, rx) = oneshot::channel::<DocumentResult<Delta>>();
let msg = DocumentMsg::Insert { let msg = EditCommand::Insert {
index, index,
data: data.to_string(), data: data.to_string(),
ret, ret,
}; };
let _ = self.document.send(msg); let _ = self.edit_tx.send(msg);
let delta = rx.await.map_err(internal_error)??; let delta = rx.await.map_err(internal_error)??;
let _ = self.save_local_delta(delta).await?; let _ = self.save_local_delta(delta).await?;
Ok(()) Ok(())
@ -74,8 +74,8 @@ impl ClientEditDoc {
pub async fn delete(&self, interval: Interval) -> Result<(), DocError> { pub async fn delete(&self, interval: Interval) -> Result<(), DocError> {
let (ret, rx) = oneshot::channel::<DocumentResult<Delta>>(); let (ret, rx) = oneshot::channel::<DocumentResult<Delta>>();
let msg = DocumentMsg::Delete { interval, ret }; let msg = EditCommand::Delete { interval, ret };
let _ = self.document.send(msg); let _ = self.edit_tx.send(msg);
let delta = rx.await.map_err(internal_error)??; let delta = rx.await.map_err(internal_error)??;
let _ = self.save_local_delta(delta).await?; let _ = self.save_local_delta(delta).await?;
Ok(()) Ok(())
@ -83,12 +83,12 @@ impl ClientEditDoc {
pub async fn format(&self, interval: Interval, attribute: Attribute) -> Result<(), DocError> { pub async fn format(&self, interval: Interval, attribute: Attribute) -> Result<(), DocError> {
let (ret, rx) = oneshot::channel::<DocumentResult<Delta>>(); let (ret, rx) = oneshot::channel::<DocumentResult<Delta>>();
let msg = DocumentMsg::Format { let msg = EditCommand::Format {
interval, interval,
attribute, attribute,
ret, ret,
}; };
let _ = self.document.send(msg); let _ = self.edit_tx.send(msg);
let delta = rx.await.map_err(internal_error)??; let delta = rx.await.map_err(internal_error)??;
let _ = self.save_local_delta(delta).await?; let _ = self.save_local_delta(delta).await?;
Ok(()) Ok(())
@ -96,12 +96,12 @@ impl ClientEditDoc {
pub async fn replace<T: ToString>(&mut self, interval: Interval, data: T) -> Result<(), DocError> { pub async fn replace<T: ToString>(&mut self, interval: Interval, data: T) -> Result<(), DocError> {
let (ret, rx) = oneshot::channel::<DocumentResult<Delta>>(); let (ret, rx) = oneshot::channel::<DocumentResult<Delta>>();
let msg = DocumentMsg::Replace { let msg = EditCommand::Replace {
interval, interval,
data: data.to_string(), data: data.to_string(),
ret, ret,
}; };
let _ = self.document.send(msg); let _ = self.edit_tx.send(msg);
let delta = rx.await.map_err(internal_error)??; let delta = rx.await.map_err(internal_error)??;
let _ = self.save_local_delta(delta).await?; let _ = self.save_local_delta(delta).await?;
Ok(()) Ok(())
@ -109,38 +109,38 @@ impl ClientEditDoc {
pub async fn can_undo(&self) -> bool { pub async fn can_undo(&self) -> bool {
let (ret, rx) = oneshot::channel::<bool>(); let (ret, rx) = oneshot::channel::<bool>();
let msg = DocumentMsg::CanUndo { ret }; let msg = EditCommand::CanUndo { ret };
let _ = self.document.send(msg); let _ = self.edit_tx.send(msg);
rx.await.unwrap_or(false) rx.await.unwrap_or(false)
} }
pub async fn can_redo(&self) -> bool { pub async fn can_redo(&self) -> bool {
let (ret, rx) = oneshot::channel::<bool>(); let (ret, rx) = oneshot::channel::<bool>();
let msg = DocumentMsg::CanRedo { ret }; let msg = EditCommand::CanRedo { ret };
let _ = self.document.send(msg); let _ = self.edit_tx.send(msg);
rx.await.unwrap_or(false) rx.await.unwrap_or(false)
} }
pub async fn undo(&self) -> Result<UndoResult, DocError> { pub async fn undo(&self) -> Result<UndoResult, DocError> {
let (ret, rx) = oneshot::channel::<DocumentResult<UndoResult>>(); let (ret, rx) = oneshot::channel::<DocumentResult<UndoResult>>();
let msg = DocumentMsg::Undo { ret }; let msg = EditCommand::Undo { ret };
let _ = self.document.send(msg); let _ = self.edit_tx.send(msg);
let r = rx.await.map_err(internal_error)??; let r = rx.await.map_err(internal_error)??;
Ok(r) Ok(r)
} }
pub async fn redo(&self) -> Result<UndoResult, DocError> { pub async fn redo(&self) -> Result<UndoResult, DocError> {
let (ret, rx) = oneshot::channel::<DocumentResult<UndoResult>>(); let (ret, rx) = oneshot::channel::<DocumentResult<UndoResult>>();
let msg = DocumentMsg::Redo { ret }; let msg = EditCommand::Redo { ret };
let _ = self.document.send(msg); let _ = self.edit_tx.send(msg);
let r = rx.await.map_err(internal_error)??; let r = rx.await.map_err(internal_error)??;
Ok(r) Ok(r)
} }
pub async fn delta(&self) -> DocResult<DocDelta> { pub async fn delta(&self) -> DocResult<DocDelta> {
let (ret, rx) = oneshot::channel::<DocumentResult<String>>(); let (ret, rx) = oneshot::channel::<DocumentResult<String>>();
let msg = DocumentMsg::Doc { ret }; let msg = EditCommand::ReadDoc { ret };
let _ = self.document.send(msg); let _ = self.edit_tx.send(msg);
let data = rx.await.map_err(internal_error)??; let data = rx.await.map_err(internal_error)??;
Ok(DocDelta { Ok(DocDelta {
@ -162,11 +162,11 @@ impl ClientEditDoc {
pub(crate) async fn composing_local_delta(&self, data: Bytes) -> Result<(), DocError> { pub(crate) async fn composing_local_delta(&self, data: Bytes) -> Result<(), DocError> {
let delta = Delta::from_bytes(&data)?; let delta = Delta::from_bytes(&data)?;
let (ret, rx) = oneshot::channel::<DocumentResult<()>>(); let (ret, rx) = oneshot::channel::<DocumentResult<()>>();
let msg = DocumentMsg::Delta { let msg = EditCommand::ComposeDelta {
delta: delta.clone(), delta: delta.clone(),
ret, ret,
}; };
let _ = self.document.send(msg); let _ = self.edit_tx.send(msg);
let _ = rx.await.map_err(internal_error)??; let _ = rx.await.map_err(internal_error)??;
let _ = self.save_local_delta(delta).await?; let _ = self.save_local_delta(delta).await?;
@ -176,8 +176,8 @@ impl ClientEditDoc {
#[cfg(feature = "flowy_test")] #[cfg(feature = "flowy_test")]
pub async fn doc_json(&self) -> DocResult<String> { pub async fn doc_json(&self) -> DocResult<String> {
let (ret, rx) = oneshot::channel::<DocumentResult<String>>(); let (ret, rx) = oneshot::channel::<DocumentResult<String>>();
let msg = DocumentMsg::Doc { ret }; let msg = EditCommand::ReadDoc { ret };
let _ = self.document.send(msg); let _ = self.edit_tx.send(msg);
let s = rx.await.map_err(internal_error)??; let s = rx.await.map_err(internal_error)??;
Ok(s) Ok(s)
} }
@ -208,7 +208,7 @@ impl ClientEditDoc {
async fn handle_push_rev(&self, bytes: Bytes) -> DocResult<()> { async fn handle_push_rev(&self, bytes: Bytes) -> DocResult<()> {
// Transform the revision // Transform the revision
let (ret, rx) = oneshot::channel::<DocumentResult<TransformDeltas>>(); let (ret, rx) = oneshot::channel::<DocumentResult<TransformDeltas>>();
let _ = self.document.send(DocumentMsg::RemoteRevision { bytes, ret }); let _ = self.edit_tx.send(EditCommand::RemoteRevision { bytes, ret });
let TransformDeltas { let TransformDeltas {
client_prime, client_prime,
server_prime, server_prime,
@ -222,11 +222,11 @@ impl ClientEditDoc {
// compose delta // compose delta
let (ret, rx) = oneshot::channel::<DocumentResult<()>>(); let (ret, rx) = oneshot::channel::<DocumentResult<()>>();
let msg = DocumentMsg::Delta { let msg = EditCommand::ComposeDelta {
delta: client_prime.clone(), delta: client_prime.clone(),
ret, ret,
}; };
let _ = self.document.send(msg); let _ = self.edit_tx.send(msg);
let _ = rx.await.map_err(internal_error)??; let _ = rx.await.map_err(internal_error)??;
// update rev id // update rev id
@ -278,7 +278,7 @@ impl ClientEditDoc {
} }
} }
pub struct EditDocWsHandler(pub Arc<ClientEditDoc>); pub struct EditDocWsHandler(pub Arc<ClientDocEditor>);
impl WsDocumentHandler for EditDocWsHandler { impl WsDocumentHandler for EditDocWsHandler {
fn receive(&self, doc_data: WsDocumentData) { fn receive(&self, doc_data: WsDocumentData) {
@ -313,9 +313,9 @@ fn spawn_rev_receiver(mut receiver: mpsc::UnboundedReceiver<Revision>, ws: Arc<d
}); });
} }
fn spawn_doc_edit_actor(doc_id: &str, delta: Delta, _pool: Arc<ConnectionPool>) -> UnboundedSender<DocumentMsg> { fn spawn_edit_queue(doc_id: &str, delta: Delta, _pool: Arc<ConnectionPool>) -> UnboundedSender<EditCommand> {
let (sender, receiver) = mpsc::unbounded_channel::<DocumentMsg>(); let (sender, receiver) = mpsc::unbounded_channel::<EditCommand>();
let actor = DocumentActor::new(doc_id, delta, receiver); let actor = EditCommandQueue::new(doc_id, delta, receiver);
tokio::spawn(actor.run()); tokio::spawn(actor.run());
sender sender
} }

View File

@ -1,7 +1,7 @@
mod doc_actor; mod doc_actor;
mod edit_doc; mod editor;
mod model; mod model;
pub(crate) use doc_actor::*; pub(crate) use doc_actor::*;
pub use edit_doc::*; pub use editor::*;
pub(crate) use model::*; pub(crate) use model::*;

View File

@ -5,7 +5,6 @@ use crate::{
use flowy_database::ConnectionPool; use flowy_database::ConnectionPool;
use flowy_document_infra::entities::doc::{Revision, RevisionRange}; use flowy_document_infra::entities::doc::{Revision, RevisionRange};
use lib_infra::future::ResultFuture; use lib_infra::future::ResultFuture;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::broadcast; use tokio::sync::broadcast;