This commit is contained in:
appflowy 2021-12-10 11:05:23 +08:00
parent 45d9a0918f
commit 34441ee076
11 changed files with 102 additions and 100 deletions

View File

@ -59,7 +59,7 @@ pub fn run(listener: TcpListener, app_ctx: AppContext) -> Result<Server, std::io
.app_data(app_ctx.ws_server.clone())
.app_data(app_ctx.pg_pool.clone())
.app_data(app_ctx.ws_bizs.clone())
.app_data(app_ctx.doc_biz.clone())
.app_data(app_ctx.document_core.clone())
})
.listen(listener)?
.run();

View File

@ -1,5 +1,5 @@
use crate::{
services::doc::manager::DocBiz,
services::doc::manager::DocumentCore,
web_socket::{WsBizHandlers, WsServer},
};
use actix::Addr;
@ -13,7 +13,7 @@ pub struct AppContext {
pub ws_server: Data<Addr<WsServer>>,
pub pg_pool: Data<PgPool>,
pub ws_bizs: Data<WsBizHandlers>,
pub doc_biz: Data<Arc<DocBiz>>,
pub document_core: Data<Arc<DocumentCore>>,
}
impl AppContext {
@ -22,14 +22,14 @@ impl AppContext {
let pg_pool = Data::new(db_pool);
let mut ws_bizs = WsBizHandlers::new();
let doc_biz = Arc::new(DocBiz::new(pg_pool.clone()));
ws_bizs.register(WsModule::Doc, doc_biz.clone());
let document_core = Arc::new(DocumentCore::new(pg_pool.clone()));
ws_bizs.register(WsModule::Doc, document_core.clone());
AppContext {
ws_server,
pg_pool,
ws_bizs: Data::new(ws_bizs),
doc_biz: Data::new(doc_biz),
document_core: Data::new(document_core),
}
}
}

View File

@ -18,30 +18,34 @@ use tokio::{
task::spawn_blocking,
};
pub struct DocBiz {
#[rustfmt::skip]
// ┌──────────────┐ ┌────────────┐ 1 n ┌───────────────┐
// │ DocumentCore │────▶│ DocManager │─────▶│ OpenDocHandle │
// └──────────────┘ └────────────┘ └───────────────┘
pub struct DocumentCore {
pub manager: Arc<DocManager>,
sender: mpsc::Sender<DocWsMsg>,
ws_sender: mpsc::Sender<DocWsMsg>,
pg_pool: Data<PgPool>,
}
impl DocBiz {
impl DocumentCore {
pub fn new(pg_pool: Data<PgPool>) -> Self {
let manager = Arc::new(DocManager::new());
let (tx, rx) = mpsc::channel(100);
let (ws_sender, rx) = mpsc::channel(100);
let actor = DocWsActor::new(rx, manager.clone());
tokio::task::spawn(actor.run());
Self {
manager,
sender: tx,
ws_sender,
pg_pool,
}
}
}
impl WsBizHandler for DocBiz {
impl WsBizHandler for DocumentCore {
fn receive_data(&self, client_data: WsClientData) {
let (ret, rx) = oneshot::channel();
let sender = self.sender.clone();
let sender = self.ws_sender.clone();
let pool = self.pg_pool.clone();
actix_rt::spawn(async move {
@ -58,14 +62,25 @@ impl WsBizHandler for DocBiz {
}
}
#[rustfmt::skip]
// EditDocActor
// ┌────────────────────────────────────┐
// │ ServerDocEditor │
// │ ┌──────────────────────────────┐ │
// ┌────────────┐ 1 n ┌───────────────┐ │ │ ┌──────────┐ ┌──────────┐ │ │
// │ DocManager │─────▶│ OpenDocHandle │──────▶│ │ │ Document │ │ Users │ │ │
// └────────────┘ └───────────────┘ │ │ └──────────┘ └──────────┘ │ │
// │ └──────────────────────────────┘ │
// │ │
// └────────────────────────────────────┘
pub struct DocManager {
docs_map: DashMap<String, Arc<DocOpenHandle>>,
open_doc_map: DashMap<String, Arc<OpenDocHandle>>,
}
impl std::default::Default for DocManager {
fn default() -> Self {
Self {
docs_map: DashMap::new(),
open_doc_map: DashMap::new(),
}
}
}
@ -73,19 +88,19 @@ impl std::default::Default for DocManager {
impl DocManager {
pub fn new() -> Self { DocManager::default() }
pub async fn get(&self, doc_id: &str, pg_pool: Data<PgPool>) -> Result<Option<Arc<DocOpenHandle>>, ServerError> {
match self.docs_map.get(doc_id) {
pub async fn get(&self, doc_id: &str, pg_pool: Data<PgPool>) -> Result<Option<Arc<OpenDocHandle>>, ServerError> {
match self.open_doc_map.get(doc_id) {
None => {
let params = DocIdentifier {
doc_id: doc_id.to_string(),
..Default::default()
};
let doc = read_doc(pg_pool.get_ref(), params).await?;
let handle = spawn_blocking(|| DocOpenHandle::new(doc, pg_pool))
let handle = spawn_blocking(|| OpenDocHandle::new(doc, pg_pool))
.await
.map_err(internal_error)?;
let handle = Arc::new(handle?);
self.docs_map.insert(doc_id.to_string(), handle.clone());
self.open_doc_map.insert(doc_id.to_string(), handle.clone());
Ok(Some(handle))
},
Some(ctx) => Ok(Some(ctx.clone())),
@ -93,11 +108,11 @@ impl DocManager {
}
}
pub struct DocOpenHandle {
pub struct OpenDocHandle {
pub sender: mpsc::Sender<EditMsg>,
}
impl DocOpenHandle {
impl OpenDocHandle {
pub fn new(doc: Doc, pg_pool: Data<PgPool>) -> Result<Self, ServerError> {
let (sender, receiver) = mpsc::channel(100);
let actor = EditDocActor::new(receiver, doc, pg_pool)?;

View File

@ -1,6 +1,6 @@
use crate::{
services::{
doc::manager::{DocManager, DocOpenHandle},
doc::manager::{DocManager, OpenDocHandle},
util::{md5, parse_from_bytes},
},
web_socket::{entities::Socket, WsClientData, WsUser},
@ -122,7 +122,7 @@ impl DocWsActor {
Ok(())
}
async fn find_doc_handle(&self, doc_id: &str, pool: Data<PgPool>) -> Option<Arc<DocOpenHandle>> {
async fn find_doc_handle(&self, doc_id: &str, pool: Data<PgPool>) -> Option<Arc<OpenDocHandle>> {
match self.doc_manager.get(doc_id, pool).await {
Ok(Some(edit_doc)) => Some(edit_doc),
Ok(None) => {

View File

@ -1,5 +1,5 @@
use crate::services::{
doc::manager::DocBiz,
doc::manager::DocumentCore,
user::LoggedUser,
util::parse_from_payload,
view::{create_view, delete_view, read_view, sql_builder::check_view_ids, update_view},
@ -23,7 +23,7 @@ use std::sync::Arc;
pub async fn create_handler(
payload: Payload,
pool: Data<PgPool>,
_doc_biz: Data<Arc<DocBiz>>,
_doc_biz: Data<Arc<DocumentCore>>,
) -> Result<HttpResponse, ServerError> {
let params: CreateViewParams = parse_from_payload(payload).await?;
let mut transaction = pool

View File

@ -1,53 +0,0 @@
use std::sync::Arc;
use dashmap::DashMap;
use crate::{
errors::DocError,
services::doc::edit::{ClientDocEditor, DocId},
};
pub(crate) struct DocCache {
inner: DashMap<DocId, Arc<ClientDocEditor>>,
}
impl DocCache {
pub(crate) fn new() -> Self { Self { inner: DashMap::new() } }
#[allow(dead_code)]
pub(crate) fn all_docs(&self) -> Vec<Arc<ClientDocEditor>> {
self.inner
.iter()
.map(|kv| kv.value().clone())
.collect::<Vec<Arc<ClientDocEditor>>>()
}
pub(crate) fn set(&self, doc: Arc<ClientDocEditor>) {
let doc_id = doc.doc_id.clone();
if self.inner.contains_key(&doc_id) {
log::warn!("Doc:{} already exists in cache", &doc_id);
}
self.inner.insert(doc_id, doc);
}
pub(crate) fn contains(&self, doc_id: &str) -> bool { self.inner.get(doc_id).is_some() }
pub(crate) fn get(&self, doc_id: &str) -> Result<Arc<ClientDocEditor>, DocError> {
if !self.contains(&doc_id) {
return Err(doc_not_found());
}
let opened_doc = self.inner.get(doc_id).unwrap();
Ok(opened_doc.clone())
}
pub(crate) fn remove(&self, id: &str) {
let doc_id: DocId = id.into();
match self.get(id) {
Ok(editor) => editor.stop_sync(),
Err(e) => log::error!("{}", e),
}
self.inner.remove(&doc_id);
}
}
fn doc_not_found() -> DocError { DocError::doc_not_found().context("Doc is close or you should call open first") }

View File

@ -2,7 +2,6 @@ use crate::{
errors::{DocError, DocResult},
module::DocumentUser,
services::{
cache::DocCache,
doc::{
edit::{ClientDocEditor, EditDocWsHandler},
revision::{RevisionCache, RevisionManager, RevisionServer},
@ -12,6 +11,7 @@ use crate::{
},
};
use bytes::Bytes;
use dashmap::DashMap;
use flowy_database::ConnectionPool;
use flowy_document_infra::entities::doc::{Doc, DocDelta, DocIdentifier};
use lib_infra::future::{wrap_future, FnFuture, ResultFuture};
@ -21,18 +21,18 @@ use tokio::time::{interval, Duration};
pub(crate) struct DocController {
server: Server,
ws_manager: Arc<WsDocumentManager>,
cache: Arc<DocCache>,
open_cache: Arc<OpenDocCache>,
user: Arc<dyn DocumentUser>,
}
impl DocController {
pub(crate) fn new(server: Server, user: Arc<dyn DocumentUser>, ws: Arc<WsDocumentManager>) -> Self {
let cache = Arc::new(DocCache::new());
let open_cache = Arc::new(OpenDocCache::new());
Self {
server,
user,
ws_manager: ws,
cache,
open_cache,
}
}
@ -46,18 +46,18 @@ impl DocController {
params: DocIdentifier,
pool: Arc<ConnectionPool>,
) -> Result<Arc<ClientDocEditor>, DocError> {
if !self.cache.contains(&params.doc_id) {
if !self.open_cache.contains(&params.doc_id) {
let edit_ctx = self.make_edit_context(&params.doc_id, pool.clone()).await?;
return Ok(edit_ctx);
}
let edit_doc_ctx = self.cache.get(&params.doc_id)?;
let edit_doc_ctx = self.open_cache.get(&params.doc_id)?;
Ok(edit_doc_ctx)
}
pub(crate) fn close(&self, doc_id: &str) -> Result<(), DocError> {
log::debug!("Close doc {}", doc_id);
self.cache.remove(doc_id);
self.open_cache.remove(doc_id);
self.ws_manager.remove_handler(doc_id);
Ok(())
}
@ -65,7 +65,7 @@ impl DocController {
#[tracing::instrument(level = "debug", skip(self), err)]
pub(crate) fn delete(&self, params: DocIdentifier) -> Result<(), DocError> {
let doc_id = &params.doc_id;
self.cache.remove(doc_id);
self.open_cache.remove(doc_id);
self.ws_manager.remove_handler(doc_id);
Ok(())
}
@ -80,12 +80,12 @@ impl DocController {
delta: DocDelta,
db_pool: Arc<ConnectionPool>,
) -> Result<DocDelta, DocError> {
if !self.cache.contains(&delta.doc_id) {
if !self.open_cache.contains(&delta.doc_id) {
let doc_identifier: DocIdentifier = delta.doc_id.clone().into();
let _ = self.open(doc_identifier, db_pool).await?;
}
let edit_doc_ctx = self.cache.get(&delta.doc_id)?;
let edit_doc_ctx = self.open_cache.get(&delta.doc_id)?;
let _ = edit_doc_ctx.composing_local_delta(Bytes::from(delta.data)).await?;
Ok(edit_doc_ctx.delta().await?)
}
@ -102,7 +102,7 @@ impl DocController {
let edit_ctx = ClientDocEditor::new(doc_id, user, pool, rev_manager, self.ws_manager.ws()).await?;
let ws_handler = Arc::new(EditDocWsHandler(edit_ctx.clone()));
self.ws_manager.register_handler(doc_id, ws_handler);
self.cache.set(edit_ctx.clone());
self.open_cache.set(edit_ctx.clone());
Ok(edit_ctx)
}
@ -145,13 +145,39 @@ impl RevisionServer for RevisionServerImpl {
}
}
#[allow(dead_code)]
fn event_loop(_cache: Arc<DocCache>) -> FnFuture<()> {
let mut i = interval(Duration::from_secs(3));
wrap_future(async move {
loop {
// cache.all_docs().iter().for_each(|doc| doc.tick());
i.tick().await;
}
})
pub struct OpenDocCache {
inner: DashMap<String, Arc<ClientDocEditor>>,
}
impl OpenDocCache {
fn new() -> Self { Self { inner: DashMap::new() } }
pub(crate) fn set(&self, doc: Arc<ClientDocEditor>) {
let doc_id = doc.doc_id.clone();
if self.inner.contains_key(&doc_id) {
log::warn!("Doc:{} already exists in cache", &doc_id);
}
self.inner.insert(doc_id, doc);
}
pub(crate) fn contains(&self, doc_id: &str) -> bool { self.inner.get(doc_id).is_some() }
pub(crate) fn get(&self, doc_id: &str) -> Result<Arc<ClientDocEditor>, DocError> {
if !self.contains(&doc_id) {
return Err(doc_not_found());
}
let opened_doc = self.inner.get(doc_id).unwrap();
Ok(opened_doc.clone())
}
pub(crate) fn remove(&self, id: &str) {
let doc_id = id.to_string();
match self.get(id) {
Ok(editor) => editor.stop_sync(),
Err(e) => log::error!("{}", e),
}
self.inner.remove(&doc_id);
}
}
fn doc_not_found() -> DocError { DocError::doc_not_found().context("Doc is close or you should call open first") }

View File

@ -129,6 +129,7 @@ impl EditCommandQueue {
}
pub(crate) type Ret<T> = oneshot::Sender<Result<T, DocumentError>>;
#[allow(dead_code)]
pub(crate) enum EditCommand {
ComposeDelta {
delta: RichTextDelta,

View File

@ -1,4 +1,3 @@
mod cache;
pub mod doc;
pub mod server;
pub mod ws;

View File

@ -1,3 +1,4 @@
use flowy_document_infra::core::{Document, FlowyDoc};
use flowy_test::editor::{EditorScript::*, *};
use lib_ot::{revision::RevState, rich_text::RichTextDeltaBuilder};
@ -50,3 +51,16 @@ async fn doc_push_test() {
];
EditorTest::new().await.run_scripts(scripts).await;
}
#[tokio::test]
async fn doc_push_test2() {
let mut document = Document::new::<FlowyDoc>();
let delta_1 = document.insert(0, "123").unwrap();
let json = document.to_json();
let scripts = vec![
SimulatePushRevisionMessageWithDelta(delta_1),
AssertJson(r#"[{"insert":"\n123"}]"#),
];
EditorTest::new().await.run_scripts(scripts).await;
}

View File

@ -47,7 +47,7 @@ impl EditorTest {
self.run_script(script).await;
}
sleep(Duration::from_secs(5)).await;
sleep(Duration::from_secs(10)).await;
}
async fn run_script(&mut self, script: EditorScript) {