rename some structs

This commit is contained in:
appflowy 2022-01-10 17:39:32 +08:00
parent 08a43c03d4
commit 46a3eb57fa
18 changed files with 144 additions and 129 deletions

View File

@ -7,7 +7,7 @@ use actix_web::web::Data;
use crate::services::document::{
persistence::DocumentKVPersistence,
ws_receiver::{make_document_ws_receiver, DocumentPersistenceImpl},
ws_receiver::{make_document_ws_receiver, HttpServerDocumentPersistence},
};
use flowy_collaboration::sync::ServerDocumentManager;
use lib_ws::WSModule;
@ -30,7 +30,7 @@ impl AppContext {
let kv_store = make_document_kv_store(pg_pool.clone());
let persistence = Arc::new(FlowyPersistence { pg_pool, kv_store });
let document_persistence = Arc::new(DocumentPersistenceImpl(persistence.clone()));
let document_persistence = Arc::new(HttpServerDocumentPersistence(persistence.clone()));
let document_manager = Arc::new(ServerDocumentManager::new(document_persistence));
let document_ws_receiver = make_document_ws_receiver(persistence.clone(), document_manager.clone());

View File

@ -83,7 +83,7 @@ impl DocumentWebSocketActor {
document_client_data.ty
);
let user = Arc::new(ServerDocUser {
let user = Arc::new(HttpDocumentUser {
user,
socket,
persistence,
@ -119,13 +119,13 @@ fn verify_md5(revision: &RevisionPB) -> Result<()> {
}
#[derive(Clone)]
pub struct ServerDocUser {
pub struct HttpDocumentUser {
pub user: Arc<WSUser>,
pub(crate) socket: Socket,
pub persistence: Arc<FlowyPersistence>,
}
impl std::fmt::Debug for ServerDocUser {
impl std::fmt::Debug for HttpDocumentUser {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("ServerDocUser")
.field("user", &self.user)
@ -134,7 +134,7 @@ impl std::fmt::Debug for ServerDocUser {
}
}
impl RevisionUser for ServerDocUser {
impl RevisionUser for HttpDocumentUser {
fn user_id(&self) -> String { self.user.id().to_string() }
fn receive(&self, resp: SyncResponse) {

View File

@ -76,13 +76,13 @@ impl WebSocketReceiver for DocumentWebSocketReceiver {
}
}
pub struct DocumentPersistenceImpl(pub Arc<FlowyPersistence>);
impl Debug for DocumentPersistenceImpl {
pub struct HttpServerDocumentPersistence(pub Arc<FlowyPersistence>);
impl Debug for HttpServerDocumentPersistence {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("DocumentPersistenceImpl") }
}
impl DocumentPersistence for DocumentPersistenceImpl {
fn read_doc(&self, doc_id: &str) -> BoxResultFuture<DocumentInfo, CollaborateError> {
impl DocumentPersistence for HttpServerDocumentPersistence {
fn read_document(&self, doc_id: &str) -> BoxResultFuture<DocumentInfo, CollaborateError> {
let params = DocumentId {
doc_id: doc_id.to_string(),
..Default::default()
@ -99,7 +99,7 @@ impl DocumentPersistence for DocumentPersistenceImpl {
})
}
fn create_doc(
fn create_document(
&self,
doc_id: &str,
repeated_revision: RepeatedRevisionPB,
@ -120,23 +120,24 @@ impl DocumentPersistence for DocumentPersistenceImpl {
})
}
fn get_revisions(&self, doc_id: &str, rev_ids: Vec<i64>) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError> {
let kv_store = self.0.kv_store();
let doc_id = doc_id.to_owned();
let f = || async move {
let mut repeated_revision = kv_store.batch_get_revisions(&doc_id, rev_ids).await?;
Ok(repeated_revision.take_items().into())
};
Box::pin(async move { f().await.map_err(server_error_to_collaborate_error) })
}
fn get_doc_revisions(&self, doc_id: &str) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError> {
fn read_revisions(
&self,
doc_id: &str,
rev_ids: Option<Vec<i64>>,
) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError> {
let kv_store = self.0.kv_store();
let doc_id = doc_id.to_owned();
let f = || async move {
match rev_ids {
None => {
let mut repeated_revision = kv_store.get_doc_revisions(&doc_id).await?;
Ok(repeated_revision.take_items().into())
},
Some(rev_ids) => {
let mut repeated_revision = kv_store.batch_get_revisions(&doc_id, rev_ids).await?;
Ok(repeated_revision.take_items().into())
},
}
};
Box::pin(async move { f().await.map_err(server_error_to_collaborate_error) })

View File

@ -121,7 +121,7 @@ impl RevisionTableSql {
.filter(dsl::doc_id.eq(changeset.doc_id));
let _ = update(filter).set(dsl::state.eq(changeset.state)).execute(conn)?;
tracing::debug!(
"[[RevisionTable]] Save:{} state to {:?}",
"[RevisionTable] update revision:{} state:to {:?}",
changeset.rev_id,
changeset.state
);

View File

@ -175,7 +175,6 @@ impl RevisionSyncSequence {
return Err(FlowyError::internal().context(desc));
}
tracing::trace!("{} revision finish synchronizing", pop_rev_id);
self.revs_map.remove(&pop_rev_id);
let _ = self.local_revs.write().await.pop_front();
}

View File

@ -0,0 +1,3 @@
pub use http_ws::*;
mod http_ws;

View File

@ -1,3 +0,0 @@
pub use http_ws_impl::*;
mod http_ws_impl;

View File

@ -1,38 +1,30 @@
use crate::services::local::persistence::LocalServerDocumentPersistence;
use bytes::Bytes;
use dashmap::DashMap;
use flowy_collaboration::{
entities::{
doc::DocumentInfo,
ws::{DocumentClientWSData, DocumentClientWSDataType},
},
entities::ws::{DocumentClientWSData, DocumentClientWSDataType},
errors::CollaborateError,
protobuf::{
DocumentClientWSData as DocumentClientWSDataPB,
RepeatedRevision as RepeatedRevisionPB,
Revision as RevisionPB,
},
protobuf::DocumentClientWSData as DocumentClientWSDataPB,
sync::*,
util::repeated_revision_from_repeated_revision_pb,
};
use lib_infra::future::BoxResultFuture;
use lib_ws::{WSModule, WebSocketRawMessage};
use std::{
convert::TryInto,
fmt::{Debug, Formatter},
sync::Arc,
};
use std::{convert::TryInto, fmt::Debug, sync::Arc};
use tokio::sync::{mpsc, mpsc::UnboundedSender};
pub struct LocalDocumentServer {
pub doc_manager: Arc<ServerDocumentManager>,
sender: mpsc::UnboundedSender<WebSocketRawMessage>,
persistence: Arc<dyn DocumentPersistence>,
}
impl LocalDocumentServer {
pub fn new(sender: mpsc::UnboundedSender<WebSocketRawMessage>) -> Self {
let persistence = Arc::new(LocalDocServerPersistence::default());
let doc_manager = Arc::new(ServerDocumentManager::new(persistence));
LocalDocumentServer { doc_manager, sender }
let persistence = Arc::new(LocalServerDocumentPersistence::default());
let doc_manager = Arc::new(ServerDocumentManager::new(persistence.clone()));
LocalDocumentServer {
doc_manager,
sender,
persistence,
}
}
pub async fn handle_client_data(
@ -49,6 +41,7 @@ impl LocalDocumentServer {
let user = Arc::new(LocalDocumentUser {
user_id,
ws_sender: self.sender.clone(),
persistence: self.persistence.clone(),
});
let ty = client_data.ty.clone();
let document_client_data: DocumentClientWSDataPB = client_data.try_into().unwrap();
@ -67,66 +60,11 @@ impl LocalDocumentServer {
}
}
struct LocalDocServerPersistence {
inner: Arc<DashMap<String, DocumentInfo>>,
}
impl Debug for LocalDocServerPersistence {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("LocalDocServerPersistence") }
}
impl std::default::Default for LocalDocServerPersistence {
fn default() -> Self {
LocalDocServerPersistence {
inner: Arc::new(DashMap::new()),
}
}
}
impl DocumentPersistence for LocalDocServerPersistence {
fn read_doc(&self, doc_id: &str) -> BoxResultFuture<DocumentInfo, CollaborateError> {
let inner = self.inner.clone();
let doc_id = doc_id.to_owned();
Box::pin(async move {
match inner.get(&doc_id) {
None => Err(CollaborateError::record_not_found()),
Some(val) => {
//
Ok(val.value().clone())
},
}
})
}
fn create_doc(
&self,
doc_id: &str,
repeated_revision: RepeatedRevisionPB,
) -> BoxResultFuture<DocumentInfo, CollaborateError> {
let doc_id = doc_id.to_owned();
Box::pin(async move {
let repeated_revision = repeated_revision_from_repeated_revision_pb(repeated_revision)?;
DocumentInfo::from_revisions(&doc_id, repeated_revision.into_inner())
})
}
fn get_revisions(&self, _doc_id: &str, _rev_ids: Vec<i64>) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError> {
Box::pin(async move { Ok(vec![]) })
}
fn get_doc_revisions(&self, _doc_id: &str) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError> {
unimplemented!()
}
fn reset_document(&self, _doc_id: &str, _revisions: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> {
unimplemented!()
}
}
#[derive(Debug)]
struct LocalDocumentUser {
user_id: String,
ws_sender: mpsc::UnboundedSender<WebSocketRawMessage>,
persistence: Arc<dyn DocumentPersistence>,
}
impl RevisionUser for LocalDocumentUser {

View File

@ -1,14 +1,13 @@
use crate::services::{
local::local_server::LocalDocumentServer,
ws_conn::{FlowyRawWebSocket, FlowyWSSender},
};
use bytes::Bytes;
use dashmap::DashMap;
use flowy_collaboration::entities::ws::*;
use flowy_error::{internal_error, FlowyError};
use lib_infra::future::FutureResult;
use lib_ws::{WSConnectState, WSMessageReceiver, WSModule, WebSocketRawMessage};
use crate::services::{
local_ws::local_server::LocalDocumentServer,
ws_conn::{FlowyRawWebSocket, FlowyWSSender},
};
use parking_lot::RwLock;
use std::{convert::TryFrom, sync::Arc};
use tokio::sync::{broadcast, broadcast::Receiver, mpsc, mpsc::UnboundedReceiver};

View File

@ -0,0 +1,5 @@
mod local_server;
mod local_ws;
mod persistence;
pub use local_ws::*;

View File

@ -0,0 +1,74 @@
use dashmap::DashMap;
use flowy_collaboration::{
entities::doc::DocumentInfo,
errors::CollaborateError,
protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB},
sync::*,
util::repeated_revision_from_repeated_revision_pb,
};
use lib_infra::future::BoxResultFuture;
use std::{
fmt::{Debug, Formatter},
sync::Arc,
};
pub(crate) struct LocalServerDocumentPersistence {
// For the moment, we use memory to cache the data, it will be implemented with other storage.
// Like the Firestore,Dropbox.etc.
inner: Arc<DashMap<String, DocumentInfo>>,
}
impl Debug for LocalServerDocumentPersistence {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("LocalDocServerPersistence") }
}
impl std::default::Default for LocalServerDocumentPersistence {
fn default() -> Self {
LocalServerDocumentPersistence {
inner: Arc::new(DashMap::new()),
}
}
}
impl DocumentPersistence for LocalServerDocumentPersistence {
fn read_document(&self, doc_id: &str) -> BoxResultFuture<DocumentInfo, CollaborateError> {
let inner = self.inner.clone();
let doc_id = doc_id.to_owned();
Box::pin(async move {
match inner.get(&doc_id) {
None => Err(CollaborateError::record_not_found()),
Some(val) => {
//
Ok(val.value().clone())
},
}
})
}
fn create_document(
&self,
doc_id: &str,
repeated_revision: RepeatedRevisionPB,
) -> BoxResultFuture<DocumentInfo, CollaborateError> {
let doc_id = doc_id.to_owned();
let inner = self.inner.clone();
Box::pin(async move {
let repeated_revision = repeated_revision_from_repeated_revision_pb(repeated_revision)?;
let document_info = DocumentInfo::from_revisions(&doc_id, repeated_revision.into_inner())?;
inner.insert(doc_id, document_info.clone());
Ok(document_info)
})
}
fn read_revisions(
&self,
_doc_id: &str,
_rev_ids: Option<Vec<i64>>,
) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError> {
Box::pin(async move { Ok(vec![]) })
}
fn reset_document(&self, _doc_id: &str, _revisions: RepeatedRevisionPB) -> BoxResultFuture<(), CollaborateError> {
unimplemented!()
}
}

View File

@ -1,4 +0,0 @@
mod local_server;
mod local_ws_impl;
pub use local_ws_impl::*;

View File

@ -1,3 +1,3 @@
pub mod http_ws;
pub mod local_ws;
pub mod http;
pub mod local;
pub mod ws_conn;

View File

@ -7,7 +7,7 @@ use flowy_document::context::DocumentContext;
use flowy_net::{
entities::NetworkType,
services::{
local_ws::LocalWebSocket,
local::LocalWebSocket,
ws_conn::{listen_on_websocket, FlowyRawWebSocket, FlowyWebSocketConnect},
},
};

View File

@ -17,17 +17,19 @@ use tokio::{
};
pub trait DocumentPersistence: Send + Sync + Debug {
fn read_doc(&self, doc_id: &str) -> BoxResultFuture<DocumentInfo, CollaborateError>;
fn read_document(&self, doc_id: &str) -> BoxResultFuture<DocumentInfo, CollaborateError>;
fn create_doc(
fn create_document(
&self,
doc_id: &str,
repeated_revision: RepeatedRevisionPB,
) -> BoxResultFuture<DocumentInfo, CollaborateError>;
fn get_revisions(&self, doc_id: &str, rev_ids: Vec<i64>) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError>;
fn get_doc_revisions(&self, doc_id: &str) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError>;
fn read_revisions(
&self,
doc_id: &str,
rev_ids: Option<Vec<i64>>,
) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError>;
fn reset_document(
&self,
@ -123,7 +125,7 @@ impl ServerDocumentManager {
}
let mut write_guard = self.open_doc_map.write().await;
match self.persistence.read_doc(doc_id).await {
match self.persistence.read_document(doc_id).await {
Ok(doc) => {
let handler = self.create_document_handler(doc).await.map_err(internal_error).unwrap();
write_guard.insert(doc_id.to_owned(), handler.clone());
@ -140,7 +142,7 @@ impl ServerDocumentManager {
doc_id: &str,
repeated_revision: RepeatedRevisionPB,
) -> Result<Arc<OpenDocHandle>, CollaborateError> {
let doc = self.persistence.create_doc(doc_id, repeated_revision).await?;
let doc = self.persistence.create_document(doc_id, repeated_revision).await?;
let handler = self.create_document_handler(doc).await?;
self.open_doc_map
.write()

View File

@ -59,7 +59,7 @@ impl RevisionSynchronizer {
let doc_id = self.doc_id.clone();
if repeated_revision.get_items().is_empty() {
// Return all the revisions to client
let revisions = persistence.get_doc_revisions(&doc_id).await?;
let revisions = persistence.read_revisions(&doc_id, None).await?;
let repeated_revision = repeated_revision_from_revision_pbs(revisions)?;
let data = DocumentServerWSDataBuilder::build_push_message(&doc_id, repeated_revision);
user.receive(SyncResponse::Push(data));
@ -192,7 +192,8 @@ impl RevisionSynchronizer {
pub(crate) fn rev_id(&self) -> i64 { self.rev_id.load(SeqCst) }
async fn is_applied_before(&self, new_revision: &RevisionPB, persistence: &Arc<dyn DocumentPersistence>) -> bool {
if let Ok(revisions) = persistence.get_revisions(&self.doc_id, vec![new_revision.rev_id]).await {
let rev_ids = Some(vec![new_revision.rev_id]);
if let Ok(revisions) = persistence.read_revisions(&self.doc_id, rev_ids).await {
if let Some(revision) = revisions.first() {
if revision.md5 == new_revision.md5 {
return true;
@ -211,7 +212,7 @@ impl RevisionSynchronizer {
to: i64,
) {
let rev_ids: Vec<i64> = (from..=to).collect();
let revisions = match persistence.get_revisions(&self.doc_id, rev_ids).await {
let revisions = match persistence.read_revisions(&self.doc_id, Some(rev_ids)).await {
Ok(revisions) => {
assert_eq!(
revisions.is_empty(),

View File

@ -447,7 +447,7 @@ fn invert_from_other<T: Attributes>(
let other_ops = DeltaIter::from_interval(other, Interval::new(start, end)).ops();
other_ops.into_iter().for_each(|other_op| match operation {
Operation::Delete(n) => {
tracing::trace!("invert delete: {} by add {}", n, other_op);
// tracing::trace!("invert delete: {} by add {}", n, other_op);
base.add(other_op);
},
Operation::Retain(_retain) => {