Refactor/crate directory (#1621)

* chore: fix wanrings

* chore: remove protobuf ref in flowy-error-code

* chore: remove protobuf ref in lib-ws

* refactor: remove protobuf trait in flowy http model

* refactor: remove flowy-error-code crate

Co-authored-by: nathan <nathan@appflowy.io>
This commit is contained in:
Nathan.fooo
2022-12-30 11:16:47 +08:00
committed by GitHub
parent aae8259f63
commit aa5f052ecf
167 changed files with 655 additions and 923 deletions

View File

@ -8,8 +8,8 @@ edition = "2018"
[dependencies]
lib-dispatch = { path = "../lib-dispatch" }
flowy-error = { path = "../flowy-error", features = ["collaboration", "http_server"] }
flowy-derive = { path = "../../../shared-lib/flowy-derive" }
flowy-sync = { path = "../../../shared-lib/flowy-sync"}
flowy-derive = { path = "../flowy-derive" }
flowy-sync = { path = "../flowy-sync"}
flowy-http-model = { path = "../../../shared-lib/flowy-http-model"}
folder-rev-model = { path = "../../../shared-lib/folder-rev-model"}
flowy-folder = { path = "../flowy-folder" }
@ -48,4 +48,4 @@ dart = [
]
[build-dependencies]
flowy-codegen = { path = "../../../shared-lib/flowy-codegen"}
flowy-codegen = { path = "../flowy-codegen"}

View File

@ -4,7 +4,7 @@ use crate::{
};
use flowy_document::DocumentCloudService;
use flowy_error::FlowyError;
use flowy_http_model::document::{CreateDocumentParams, DocumentIdPB, DocumentPayloadPB, ResetDocumentParams};
use flowy_http_model::document::{CreateDocumentParams, DocumentId, DocumentPayload, ResetDocumentParams};
use http_flowy::response::FlowyResponse;
use lazy_static::lazy_static;
use lib_infra::future::FutureResult;
@ -27,7 +27,7 @@ impl DocumentCloudService for DocumentCloudServiceImpl {
FutureResult::new(async move { create_document_request(&token, params, &url).await })
}
fn fetch_document(&self, token: &str, params: DocumentIdPB) -> FutureResult<Option<DocumentPayloadPB>, FlowyError> {
fn fetch_document(&self, token: &str, params: DocumentId) -> FutureResult<Option<DocumentPayload>, FlowyError> {
let token = token.to_owned();
let url = self.config.doc_url();
FutureResult::new(async move { read_document_request(&token, params, &url).await })
@ -44,7 +44,7 @@ pub async fn create_document_request(token: &str, params: CreateDocumentParams,
let _ = request_builder()
.post(url)
.header(HEADER_TOKEN, token)
.protobuf(params)?
.json(params)?
.send()
.await?;
Ok(())
@ -52,14 +52,14 @@ pub async fn create_document_request(token: &str, params: CreateDocumentParams,
pub async fn read_document_request(
token: &str,
params: DocumentIdPB,
params: DocumentId,
url: &str,
) -> Result<Option<DocumentPayloadPB>, FlowyError> {
) -> Result<Option<DocumentPayload>, FlowyError> {
let doc = request_builder()
.get(url)
.header(HEADER_TOKEN, token)
.protobuf(params)?
.option_response()
.json(params)?
.option_json_response()
.await?;
Ok(doc)
@ -69,7 +69,7 @@ pub async fn reset_doc_request(token: &str, params: ResetDocumentParams, url: &s
let _ = request_builder()
.patch(url)
.header(HEADER_TOKEN, token)
.protobuf(params)?
.json(params)?
.send()
.await?;
Ok(())

View File

@ -1,6 +1,6 @@
use flowy_http_model::document::DocumentPayloadPB;
use flowy_http_model::document::DocumentPayload;
use flowy_http_model::folder::FolderInfo;
use flowy_http_model::revision::{RepeatedRevision, Revision};
use flowy_http_model::revision::Revision;
use flowy_sync::{
errors::CollaborateError,
server_document::*,
@ -16,18 +16,14 @@ use std::{
// For the moment, we use memory to cache the data, it will be implemented with
// other storage. Like the Firestore,Dropbox.etc.
pub trait RevisionCloudStorage: Send + Sync {
fn set_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError>;
fn set_revisions(&self, revisions: Vec<Revision>) -> BoxResultFuture<(), CollaborateError>;
fn get_revisions(
&self,
object_id: &str,
rev_ids: Option<Vec<i64>>,
) -> BoxResultFuture<RepeatedRevision, CollaborateError>;
) -> BoxResultFuture<Vec<Revision>, CollaborateError>;
fn reset_object(
&self,
object_id: &str,
repeated_revision: RepeatedRevision,
) -> BoxResultFuture<(), CollaborateError>;
fn reset_object(&self, object_id: &str, revisions: Vec<Revision>) -> BoxResultFuture<(), CollaborateError>;
}
pub(crate) struct LocalDocumentCloudPersistence {
@ -53,8 +49,8 @@ impl FolderCloudPersistence for LocalDocumentCloudPersistence {
let storage = self.storage.clone();
let folder_id = folder_id.to_owned();
Box::pin(async move {
let repeated_revision = storage.get_revisions(&folder_id, None).await?;
match make_folder_from_revisions_pb(&folder_id, repeated_revision)? {
let revisions = storage.get_revisions(&folder_id, None).await?;
match make_folder_from_revisions_pb(&folder_id, revisions)? {
Some(folder_info) => Ok(folder_info),
None => Err(CollaborateError::record_not_found()),
}
@ -65,20 +61,20 @@ impl FolderCloudPersistence for LocalDocumentCloudPersistence {
&self,
_user_id: &str,
folder_id: &str,
repeated_revision: RepeatedRevision,
revisions: Vec<Revision>,
) -> BoxResultFuture<Option<FolderInfo>, CollaborateError> {
let folder_id = folder_id.to_owned();
let storage = self.storage.clone();
Box::pin(async move {
let _ = storage.set_revisions(repeated_revision.clone()).await?;
make_folder_from_revisions_pb(&folder_id, repeated_revision)
let _ = storage.set_revisions(revisions.clone()).await?;
make_folder_from_revisions_pb(&folder_id, revisions)
})
}
fn save_folder_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError> {
fn save_folder_revisions(&self, revisions: Vec<Revision>) -> BoxResultFuture<(), CollaborateError> {
let storage = self.storage.clone();
Box::pin(async move {
let _ = storage.set_revisions(repeated_revision).await?;
let _ = storage.set_revisions(revisions).await?;
Ok(())
})
}
@ -90,28 +86,21 @@ impl FolderCloudPersistence for LocalDocumentCloudPersistence {
) -> BoxResultFuture<Vec<Revision>, CollaborateError> {
let folder_id = folder_id.to_owned();
let storage = self.storage.clone();
Box::pin(async move {
let repeated_revision = storage.get_revisions(&folder_id, rev_ids).await?;
Ok(repeated_revision.into_inner())
})
Box::pin(async move { storage.get_revisions(&folder_id, rev_ids).await })
}
fn reset_folder(
&self,
folder_id: &str,
repeated_revision: RepeatedRevision,
) -> BoxResultFuture<(), CollaborateError> {
fn reset_folder(&self, folder_id: &str, revisions: Vec<Revision>) -> BoxResultFuture<(), CollaborateError> {
let storage = self.storage.clone();
let folder_id = folder_id.to_owned();
Box::pin(async move {
let _ = storage.reset_object(&folder_id, repeated_revision).await?;
let _ = storage.reset_object(&folder_id, revisions).await?;
Ok(())
})
}
}
impl DocumentCloudPersistence for LocalDocumentCloudPersistence {
fn read_document(&self, doc_id: &str) -> BoxResultFuture<DocumentPayloadPB, CollaborateError> {
fn read_document(&self, doc_id: &str) -> BoxResultFuture<DocumentPayload, CollaborateError> {
let storage = self.storage.clone();
let doc_id = doc_id.to_owned();
Box::pin(async move {
@ -126,13 +115,13 @@ impl DocumentCloudPersistence for LocalDocumentCloudPersistence {
fn create_document(
&self,
doc_id: &str,
repeated_revision: RepeatedRevision,
) -> BoxResultFuture<Option<DocumentPayloadPB>, CollaborateError> {
revisions: Vec<Revision>,
) -> BoxResultFuture<Option<DocumentPayload>, CollaborateError> {
let doc_id = doc_id.to_owned();
let storage = self.storage.clone();
Box::pin(async move {
let _ = storage.set_revisions(repeated_revision.clone()).await?;
make_document_from_revision_pbs(&doc_id, repeated_revision)
let _ = storage.set_revisions(revisions.clone()).await?;
make_document_from_revision_pbs(&doc_id, revisions)
})
}
@ -143,21 +132,18 @@ impl DocumentCloudPersistence for LocalDocumentCloudPersistence {
) -> BoxResultFuture<Vec<Revision>, CollaborateError> {
let doc_id = doc_id.to_owned();
let storage = self.storage.clone();
Box::pin(async move {
let repeated_revision = storage.get_revisions(&doc_id, rev_ids).await?;
Ok(repeated_revision.into_inner())
})
Box::pin(async move { storage.get_revisions(&doc_id, rev_ids).await })
}
fn save_document_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError> {
fn save_document_revisions(&self, revisions: Vec<Revision>) -> BoxResultFuture<(), CollaborateError> {
let storage = self.storage.clone();
Box::pin(async move {
let _ = storage.set_revisions(repeated_revision).await?;
let _ = storage.set_revisions(revisions).await?;
Ok(())
})
}
fn reset_document(&self, doc_id: &str, revisions: RepeatedRevision) -> BoxResultFuture<(), CollaborateError> {
fn reset_document(&self, doc_id: &str, revisions: Vec<Revision>) -> BoxResultFuture<(), CollaborateError> {
let storage = self.storage.clone();
let doc_id = doc_id.to_owned();
Box::pin(async move {
@ -170,26 +156,19 @@ impl DocumentCloudPersistence for LocalDocumentCloudPersistence {
#[derive(Default)]
struct MemoryDocumentCloudStorage {}
impl RevisionCloudStorage for MemoryDocumentCloudStorage {
fn set_revisions(&self, _repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError> {
fn set_revisions(&self, _revisions: Vec<Revision>) -> BoxResultFuture<(), CollaborateError> {
Box::pin(async move { Ok(()) })
}
fn get_revisions(
&self,
_doc_id: &str,
_object_id: &str,
_rev_ids: Option<Vec<i64>>,
) -> BoxResultFuture<RepeatedRevision, CollaborateError> {
Box::pin(async move {
let repeated_revisions = RepeatedRevision::default();
Ok(repeated_revisions)
})
) -> BoxResultFuture<Vec<Revision>, CollaborateError> {
Box::pin(async move { Ok(vec![]) })
}
fn reset_object(
&self,
_doc_id: &str,
_repeated_revision: RepeatedRevision,
) -> BoxResultFuture<(), CollaborateError> {
fn reset_object(&self, _object_id: &str, _revisions: Vec<Revision>) -> BoxResultFuture<(), CollaborateError> {
Box::pin(async move { Ok(()) })
}
}

View File

@ -131,7 +131,7 @@ impl LocalWebSocketRunner {
tracing::trace!(
"[LocalFolderServer] receive: {}:{}-{:?} ",
client_data.object_id,
client_data.id(),
client_data.rev_id,
client_data.ty,
);
let client_ws_sender = self.client_ws_sender.clone();
@ -141,19 +141,12 @@ impl LocalWebSocketRunner {
channel: WSChannel::Folder,
});
let ty = client_data.ty.clone();
let document_client_data: ClientRevisionWSDataPB = client_data.try_into().unwrap();
match ty {
ClientRevisionWSDataType::ClientPushRev => {
let _ = self
.folder_manager
.handle_client_revisions(user, document_client_data)
.await?;
let _ = self.folder_manager.handle_client_revisions(user, client_data).await?;
}
ClientRevisionWSDataType::ClientPing => {
let _ = self
.folder_manager
.handle_client_ping(user, document_client_data)
.await?;
let _ = self.folder_manager.handle_client_ping(user, client_data).await?;
}
}
Ok(())
@ -167,7 +160,7 @@ impl LocalWebSocketRunner {
tracing::trace!(
"[LocalDocumentServer] receive: {}:{}-{:?} ",
client_data.object_id,
client_data.id(),
client_data.rev_id,
client_data.ty,
);
let client_ws_sender = self.client_ws_sender.clone();
@ -177,16 +170,12 @@ impl LocalWebSocketRunner {
channel: WSChannel::Document,
});
let ty = client_data.ty.clone();
let document_client_data: ClientRevisionWSDataPB = client_data.try_into().unwrap();
match ty {
ClientRevisionWSDataType::ClientPushRev => {
let _ = self
.doc_manager
.handle_client_revisions(user, document_client_data)
.await?;
let _ = self.doc_manager.handle_client_revisions(user, client_data).await?;
}
ClientRevisionWSDataType::ClientPing => {
let _ = self.doc_manager.handle_client_ping(user, document_client_data).await?;
let _ = self.doc_manager.handle_client_ping(user, client_data).await?;
}
}
Ok(())
@ -253,8 +242,7 @@ use flowy_folder::entities::{
view::{CreateViewParams, RepeatedViewIdPB, UpdateViewParams, ViewIdPB},
workspace::{CreateWorkspaceParams, UpdateWorkspaceParams, WorkspaceIdPB},
};
use flowy_http_model::document::{CreateDocumentParams, DocumentIdPB, DocumentPayloadPB, ResetDocumentParams};
use flowy_http_model::protobuf::ClientRevisionWSData as ClientRevisionWSDataPB;
use flowy_http_model::document::{CreateDocumentParams, DocumentId, DocumentPayload, ResetDocumentParams};
use flowy_http_model::ws_data::{ClientRevisionWSData, ClientRevisionWSDataType};
use flowy_user::entities::{
SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserProfileParams, UserProfilePB,
@ -414,11 +402,7 @@ impl DocumentCloudService for LocalServer {
FutureResult::new(async { Ok(()) })
}
fn fetch_document(
&self,
_token: &str,
_params: DocumentIdPB,
) -> FutureResult<Option<DocumentPayloadPB>, FlowyError> {
fn fetch_document(&self, _token: &str, _params: DocumentId) -> FutureResult<Option<DocumentPayload>, FlowyError> {
FutureResult::new(async { Ok(None) })
}

View File

@ -88,6 +88,14 @@ impl HttpRequestBuilder {
self.bytes(body)
}
pub fn json<T>(self, body: T) -> Result<Self, ServerError>
where
T: serde::Serialize,
{
let bytes = Bytes::from(serde_json::to_vec(&body)?);
self.bytes(bytes)
}
pub fn bytes(mut self, body: Bytes) -> Result<Self, ServerError> {
self.body = Some(body);
Ok(self)
@ -109,7 +117,8 @@ impl HttpRequestBuilder {
}
}
pub async fn option_response<T>(self) -> Result<Option<T>, ServerError>
#[allow(dead_code)]
pub async fn option_protobuf_response<T>(self) -> Result<Option<T>, ServerError>
where
T: TryFrom<Bytes, Error = ProtobufError>,
{
@ -126,6 +135,23 @@ impl HttpRequestBuilder {
}
}
pub async fn option_json_response<T>(self) -> Result<Option<T>, ServerError>
where
T: serde::de::DeserializeOwned + 'static,
{
let result = self.inner_send().await;
match result {
Ok(builder) => match builder.response {
None => Err(unexpected_empty_payload(&builder.url)),
Some(data) => Ok(Some(serde_json::from_slice(&data)?)),
},
Err(error) => match error.is_record_not_found() {
true => Ok(None),
false => Err(error),
},
}
}
fn token(&self) -> Option<String> {
match self.headers.get(HEADER_TOKEN) {
None => None,