add doc persistence trait

This commit is contained in:
appflowy 2021-12-12 16:02:58 +08:00
parent 90e3ba14f1
commit 0231ad3adf
8 changed files with 50 additions and 33 deletions

View File

@ -14,13 +14,13 @@ use sqlx::PgPool;
use std::sync::Arc; use std::sync::Arc;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct DocUser { pub struct ServerDocUser {
pub user: Arc<WsUser>, pub user: Arc<WsUser>,
pub(crate) socket: Socket, pub(crate) socket: Socket,
pub pg_pool: Data<PgPool>, pub pg_pool: Data<PgPool>,
} }
impl RevisionUser for DocUser { impl RevisionUser for ServerDocUser {
fn user_id(&self) -> String { self.user.id().to_string() } fn user_id(&self) -> String { self.user.id().to_string() }
fn recv(&self, resp: SyncResponse) { fn recv(&self, resp: SyncResponse) {

View File

@ -3,20 +3,25 @@ use crate::{
web_socket::{WsBizHandler, WsClientData}, web_socket::{WsBizHandler, WsClientData},
}; };
use actix_web::web::Data; use actix_web::web::Data;
use flowy_collaboration::core::sync::DocManager; use flowy_collaboration::{
core::sync::{ServerDocManager, ServerDocPersistence},
entities::doc::Doc,
errors::CollaborateResult,
};
use lib_ot::rich_text::RichTextDelta;
use sqlx::PgPool; use sqlx::PgPool;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
pub struct DocumentCore { pub struct DocumentCore {
pub manager: Arc<DocManager>, pub manager: Arc<ServerDocManager>,
ws_sender: mpsc::Sender<DocWsMsg>, ws_sender: mpsc::Sender<DocWsMsg>,
pg_pool: Data<PgPool>, pg_pool: Data<PgPool>,
} }
impl DocumentCore { impl DocumentCore {
pub fn new(pg_pool: Data<PgPool>) -> Self { pub fn new(pg_pool: Data<PgPool>) -> Self {
let manager = Arc::new(DocManager::new()); let manager = Arc::new(ServerDocManager::new(Arc::new(DocPersistenceImpl())));
let (ws_sender, rx) = mpsc::channel(100); let (ws_sender, rx) = mpsc::channel(100);
let actor = DocWsActor::new(rx, manager.clone()); let actor = DocWsActor::new(rx, manager.clone());
tokio::task::spawn(actor.run()); tokio::task::spawn(actor.run());
@ -51,3 +56,12 @@ impl WsBizHandler for DocumentCore {
}); });
} }
} }
struct DocPersistenceImpl();
impl ServerDocPersistence for DocPersistenceImpl {
fn create_doc(&self, doc_id: &str, delta: RichTextDelta) -> CollaborateResult<()> { unimplemented!() }
fn update_doc(&self, doc_id: &str, delta: RichTextDelta) -> CollaborateResult<()> { unimplemented!() }
fn read_doc(&self, doc_id: &str) -> CollaborateResult<Doc> { unimplemented!() }
}

View File

@ -1,6 +1,6 @@
use crate::{ use crate::{
services::{ services::{
doc::{editor::DocUser, read_doc}, doc::{editor::ServerDocUser, read_doc},
util::{md5, parse_from_bytes}, util::{md5, parse_from_bytes},
}, },
web_socket::{entities::Socket, WsClientData, WsUser}, web_socket::{entities::Socket, WsClientData, WsUser},
@ -10,7 +10,7 @@ use actix_web::web::Data;
use async_stream::stream; use async_stream::stream;
use backend_service::errors::{internal_error, Result as DocResult, ServerError}; use backend_service::errors::{internal_error, Result as DocResult, ServerError};
use flowy_collaboration::{ use flowy_collaboration::{
core::sync::{DocManager, OpenDocHandle}, core::sync::{OpenDocHandle, ServerDocManager},
protobuf::{DocIdentifier, NewDocUser, WsDataType, WsDocumentData}, protobuf::{DocIdentifier, NewDocUser, WsDataType, WsDocumentData},
}; };
use futures::stream::StreamExt; use futures::stream::StreamExt;
@ -29,11 +29,11 @@ pub enum DocWsMsg {
pub struct DocWsActor { pub struct DocWsActor {
receiver: Option<mpsc::Receiver<DocWsMsg>>, receiver: Option<mpsc::Receiver<DocWsMsg>>,
doc_manager: Arc<DocManager>, doc_manager: Arc<ServerDocManager>,
} }
impl DocWsActor { impl DocWsActor {
pub fn new(receiver: mpsc::Receiver<DocWsMsg>, manager: Arc<DocManager>) -> Self { pub fn new(receiver: mpsc::Receiver<DocWsMsg>, manager: Arc<ServerDocManager>) -> Self {
Self { Self {
receiver: Some(receiver), receiver: Some(receiver),
doc_manager: manager, doc_manager: manager,
@ -100,7 +100,7 @@ impl DocWsActor {
.await .await
.map_err(internal_error)??; .map_err(internal_error)??;
if let Some(handle) = self.get_doc_handle(&doc_user.doc_id, pg_pool.clone()).await { if let Some(handle) = self.get_doc_handle(&doc_user.doc_id, pg_pool.clone()).await {
let user = Arc::new(DocUser { user, socket, pg_pool }); let user = Arc::new(ServerDocUser { user, socket, pg_pool });
handle.add_user(user, doc_user.rev_id).await.map_err(internal_error)?; handle.add_user(user, doc_user.rev_id).await.map_err(internal_error)?;
} }
Ok(()) Ok(())
@ -121,7 +121,7 @@ impl DocWsActor {
.await .await
.map_err(internal_error)??; .map_err(internal_error)??;
if let Some(handle) = self.get_doc_handle(&revision.doc_id, pg_pool.clone()).await { if let Some(handle) = self.get_doc_handle(&revision.doc_id, pg_pool.clone()).await {
let user = Arc::new(DocUser { user, socket, pg_pool }); let user = Arc::new(ServerDocUser { user, socket, pg_pool });
let revision = (&mut revision).try_into().map_err(internal_error).unwrap(); let revision = (&mut revision).try_into().map_err(internal_error).unwrap();
handle.apply_revision(user, revision).await.map_err(internal_error)?; handle.apply_revision(user, revision).await.map_err(internal_error)?;
} }

View File

@ -15,7 +15,7 @@ use flowy_collaboration::{entities::doc::DocIdentifier, protobuf::UpdateDocParam
use lib_ot::rich_text::{RichTextAttribute, RichTextDelta}; use lib_ot::rich_text::{RichTextAttribute, RichTextDelta};
use parking_lot::RwLock; use parking_lot::RwLock;
use lib_ot::core::Interval; use lib_ot::core::Interval;
use flowy_collaboration::core::sync::DocManager; use flowy_collaboration::core::sync::ServerDocManager;
pub struct DocumentTest { pub struct DocumentTest {
server: TestServer, server: TestServer,
@ -53,7 +53,7 @@ struct ScriptContext {
client_edit_context: Option<Arc<ClientEditDocContext>>, client_edit_context: Option<Arc<ClientEditDocContext>>,
client_sdk: FlowySDKTest, client_sdk: FlowySDKTest,
client_user_session: Arc<UserSession>, client_user_session: Arc<UserSession>,
server_doc_manager: Arc<DocManager>, server_doc_manager: Arc<ServerDocManager>,
server_pg_pool: Data<PgPool>, server_pg_pool: Data<PgPool>,
doc_id: String, doc_id: String,
} }

View File

@ -20,4 +20,5 @@ bytes = { version = "1.0" }
pin-project = "1.0" pin-project = "1.0"
futures-core = { version = "0.3", default-features = false } futures-core = { version = "0.3", default-features = false }
tokio = { version = "1.0", features = ["time", "rt"] } tokio = { version = "1.0", features = ["time", "rt"] }
rand = "0.8.3" rand = "0.8.3"

View File

@ -1,5 +1,5 @@
mod rev_sync;
mod server_editor; mod server_editor;
mod synchronizer;
pub use rev_sync::*;
pub use server_editor::*; pub use server_editor::*;
pub use synchronizer::*;

View File

@ -19,12 +19,17 @@ use tokio::{
task::spawn_blocking, task::spawn_blocking,
}; };
pub trait ServerDocPersistence: Send + Sync {
fn create_doc(&self, doc_id: &str, delta: RichTextDelta) -> CollaborateResult<()>;
fn update_doc(&self, doc_id: &str, delta: RichTextDelta) -> CollaborateResult<()>;
fn read_doc(&self, doc_id: &str) -> CollaborateResult<Doc>;
}
#[rustfmt::skip] #[rustfmt::skip]
// ┌────────────┐ // ─────────────────┐
// │ DocManager │ // │ServerDocManager │
// └────────────┘ // ─────────────────┘
// │ 1 // │ 1
// │
// ▼ n // ▼ n
// ┌───────────────┐ // ┌───────────────┐
// │ OpenDocHandle │ // │ OpenDocHandle │
@ -33,29 +38,26 @@ use tokio::{
// ▼ // ▼
// ┌──────────────────┐ // ┌──────────────────┐
// │ DocCommandQueue │ // │ DocCommandQueue │
// └──────────────────┘ ┌──────────────────────┐ ┌────────────┐ // └──────────────────┘
// │ ┌────▶│ RevisionSynchronizer │────▶│ Document │ // │ ┌──────────────────────┐ ┌────────────┐
// ▼ │ └──────────────────────┘ └────────────┘ // ▼ ┌────▶│ RevisionSynchronizer │────▶│ Document │
// ┌────────────────┐ │ // ┌────────────────┐ │ └──────────────────────┘ └────────────┘
// │ServerDocEditor │─────┤ // │ServerDocEditor │─────┤
// └────────────────┘ │ // └────────────────┘ │ ┌────────┐ ┌────────────┐
// │
// │ ┌────────┐ ┌────────────┐
// └────▶│ Users │◆──────│RevisionUser│ // └────▶│ Users │◆──────│RevisionUser│
// └────────┘ └────────────┘ // └────────┘ └────────────┘
pub struct DocManager { pub struct ServerDocManager {
open_doc_map: DashMap<String, Arc<OpenDocHandle>>, open_doc_map: DashMap<String, Arc<OpenDocHandle>>,
persistence: Arc<dyn ServerDocPersistence>,
} }
impl std::default::Default for DocManager { impl ServerDocManager {
fn default() -> Self { pub fn new(persistence: Arc<dyn ServerDocPersistence>) -> Self {
Self { Self {
open_doc_map: DashMap::new(), open_doc_map: DashMap::new(),
persistence,
} }
} }
}
impl DocManager {
pub fn new() -> Self { DocManager::default() }
pub fn get(&self, doc_id: &str) -> Option<Arc<OpenDocHandle>> { pub fn get(&self, doc_id: &str) -> Option<Arc<OpenDocHandle>> {
self.open_doc_map.get(doc_id).map(|ctx| ctx.clone()) self.open_doc_map.get(doc_id).map(|ctx| ctx.clone())