refactor: refactor some crates with http_server

This commit is contained in:
appflowy 2022-02-07 14:40:45 +08:00
parent 680d130986
commit 084e9c5f6f
33 changed files with 3172 additions and 1477 deletions

View File

@ -4,7 +4,6 @@
# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
Cargo.lock
# These are backup files generated by rustfmt
**/*.rs.bk

3119
frontend/rust-lib/Cargo.lock generated Executable file

File diff suppressed because it is too large Load Diff

View File

@ -17,7 +17,7 @@ derive_more = {version = "0.99", features = ["display"]}
lib-dispatch = { path = "../lib-dispatch" }
flowy-database = { path = "../flowy-database" }
flowy-sync = { path = "../flowy-sync" }
flowy-error = { path = "../flowy-error", features = ["collaboration", "ot", "backend", "serde", "db"] }
flowy-error = { path = "../flowy-error", features = ["collaboration", "ot", "http_server", "serde", "db"] }
dart-notify = { path = "../dart-notify" }
diesel = {version = "1.4.8", features = ["sqlite"]}

View File

@ -16,7 +16,7 @@ bytes = "1.0"
flowy-collaboration = { path = "../../../shared-lib/flowy-collaboration", optional = true}
lib-ot = { path = "../../../shared-lib/lib-ot", optional = true}
serde_json = {version = "1.0", optional = true}
backend-service = { path = "../../../shared-lib/backend-service", optional = true}
http-flowy = { git = "https://github.com/AppFlowy-IO/AppFlowy-Server", optional = true}
flowy-database = { path = "../flowy-database", optional = true}
r2d2 = { version = "0.8", optional = true}
lib-sqlite = { path = "../lib-sqlite", optional = true }
@ -25,5 +25,5 @@ lib-sqlite = { path = "../lib-sqlite", optional = true }
collaboration = ["flowy-collaboration"]
ot = ["lib-ot"]
serde = ["serde_json"]
backend = ["backend-service"]
http_server = ["http-flowy"]
db = ["flowy-database", "lib-sqlite", "r2d2"]

View File

@ -1,6 +1,6 @@
use crate::FlowyError;
use backend_service::errors::{ErrorCode as ServerErrorCode, ServerError};
use error_code::ErrorCode;
use http_flowy::errors::{ErrorCode as ServerErrorCode, ServerError};
impl std::convert::From<ServerError> for FlowyError {
fn from(error: ServerError) -> Self {

View File

@ -16,10 +16,10 @@ mod serde;
pub use serde::*;
//
#[cfg(feature = "backend")]
mod backend;
#[cfg(feature = "backend")]
pub use backend::*;
#[cfg(feature = "http_server")]
mod http_server;
#[cfg(feature = "http_server")]
pub use http_server::*;
#[cfg(feature = "db")]
mod database;

View File

@ -14,7 +14,7 @@ lib-infra = { path = "../../../shared-lib/lib-infra" }
flowy-document = { path = "../flowy-document" }
flowy-database = { path = "../flowy-database" }
flowy-error = { path = "../flowy-error", features = ["db", "backend"]}
flowy-error = { path = "../flowy-error", features = ["db", "http_server"]}
dart-notify = { path = "../dart-notify" }
lib-dispatch = { path = "../lib-dispatch" }
lib-sqlite = { path = "../lib-sqlite" }

View File

@ -7,10 +7,9 @@ edition = "2018"
[dependencies]
lib-dispatch = { path = "../lib-dispatch" }
flowy-error = { path = "../flowy-error", features = ["collaboration"] }
flowy-error = { path = "../flowy-error", features = ["collaboration", "http_server"] }
flowy-derive = { path = "../../../shared-lib/flowy-derive" }
flowy-collaboration = { path = "../../../shared-lib/flowy-collaboration"}
backend-service = { path = "../../../shared-lib/backend-service" }
flowy-folder-data-model = { path = "../../../shared-lib/flowy-folder-data-model" }
flowy-user-data-model = { path = "../../../shared-lib/flowy-user-data-model"}
flowy-folder = { path = "../flowy-folder" }
@ -30,5 +29,14 @@ tracing = { version = "0.1", features = ["log"] }
dashmap = {version = "4.0"}
async-stream = "0.3.2"
futures-util = "0.3.15"
http-flowy = { git = "https://github.com/AppFlowy-IO/AppFlowy-Server", features = ["with_reqwest"] }
serde-aux = "1.0.1"
reqwest = "0.11"
hyper = "0.14"
config = { version = "0.10.1", default-features = false, features = ["yaml"] }
log = "0.4.14"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
[features]
http_server = []

View File

@ -1,7 +1,6 @@
use config::FileFormat;
use serde_aux::field_attributes::deserialize_number_from_string;
use std::convert::{TryFrom, TryInto};
pub const HOST: &str = "localhost:8000";
pub const HEADER_TOKEN: &str = "token";
#[derive(serde::Deserialize, Clone, Debug)]
@ -89,6 +88,7 @@ pub enum Environment {
}
impl Environment {
#[allow(dead_code)]
pub fn as_str(&self) -> &'static str {
match self {
Environment::Local => "local",

View File

@ -1,11 +1,11 @@
use backend_service::{
use crate::{
configuration::*,
request::{HttpRequestBuilder, ResponseMiddleware},
response::FlowyResponse,
};
use flowy_collaboration::entities::document_info::{CreateDocParams, DocumentId, DocumentInfo, ResetDocumentParams};
use flowy_document::DocumentCloudService;
use flowy_error::FlowyError;
use http_flowy::response::FlowyResponse;
use lazy_static::lazy_static;
use lib_infra::future::FutureResult;
use std::sync::Arc;

View File

@ -1,8 +1,6 @@
use backend_service::{
use crate::{
configuration::{ClientServerConfiguration, HEADER_TOKEN},
errors::ServerError,
request::{HttpRequestBuilder, ResponseMiddleware},
response::FlowyResponse,
};
use flowy_error::FlowyError;
use flowy_folder_data_model::entities::{
@ -13,6 +11,8 @@ use flowy_folder_data_model::entities::{
};
use flowy_folder::event_map::FolderCouldServiceV1;
use http_flowy::errors::ServerError;
use http_flowy::response::FlowyResponse;
use lazy_static::lazy_static;
use lib_infra::future::FutureResult;
use std::sync::Arc;
@ -338,17 +338,17 @@ pub async fn read_trash_request(token: &str, url: &str) -> Result<RepeatedTrash,
}
lazy_static! {
static ref MIDDLEWARE: Arc<CoreResponseMiddleware> = Arc::new(CoreResponseMiddleware::new());
static ref MIDDLEWARE: Arc<FolderResponseMiddleware> = Arc::new(FolderResponseMiddleware::new());
}
pub struct CoreResponseMiddleware {
pub struct FolderResponseMiddleware {
invalid_token_sender: broadcast::Sender<String>,
}
impl CoreResponseMiddleware {
impl FolderResponseMiddleware {
fn new() -> Self {
let (sender, _) = broadcast::channel(10);
CoreResponseMiddleware {
FolderResponseMiddleware {
invalid_token_sender: sender,
}
}
@ -359,7 +359,7 @@ impl CoreResponseMiddleware {
}
}
impl ResponseMiddleware for CoreResponseMiddleware {
impl ResponseMiddleware for FolderResponseMiddleware {
fn receive_response(&self, token: &Option<String>, response: &FlowyResponse) {
if let Some(error) = &response.error {
if error.is_unauthorized() {

View File

@ -1,3 +1,3 @@
pub mod core;
pub mod document;
pub mod folder;
pub mod user;

View File

@ -1,9 +1,10 @@
use backend_service::{configuration::*, errors::ServerError, request::HttpRequestBuilder};
use crate::{configuration::*, request::HttpRequestBuilder};
use flowy_error::FlowyError;
use flowy_user::event_map::UserCloudService;
use flowy_user_data_model::entities::{
SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserParams, UserProfile,
};
use http_flowy::errors::ServerError;
use lib_infra::future::FutureResult;
pub struct UserHttpCloudService {

View File

@ -1,3 +1,4 @@
mod configuration;
pub mod entities;
mod event;
mod handlers;
@ -5,6 +6,7 @@ pub mod http_server;
pub mod local_server;
pub mod module;
pub mod protobuf;
mod request;
pub mod ws;
pub use backend_service::configuration::{get_client_server_configuration, ClientServerConfiguration};
pub use crate::configuration::{get_client_server_configuration, ClientServerConfiguration};

View File

@ -1,4 +1,4 @@
use backend_service::configuration::ClientServerConfiguration;
use crate::configuration::ClientServerConfiguration;
use tokio::sync::{broadcast, mpsc};
mod persistence;

View File

@ -1,5 +1,7 @@
use crate::{configuration::HEADER_TOKEN, errors::ServerError, response::FlowyResponse};
use crate::configuration::HEADER_TOKEN;
use bytes::Bytes;
use http_flowy::errors::ServerError;
use http_flowy::response::FlowyResponse;
use hyper::http;
use protobuf::ProtobufError;
use reqwest::{header::HeaderMap, Client, Method, Response};
@ -141,7 +143,7 @@ impl HttpRequestBuilder {
let method = self.method.clone();
let headers = self.headers.clone();
// reqwest client is not 'Sync' by channel is.
// reqwest client is not 'Sync' but channel is.
tokio::spawn(async move {
let client = default_client();
let mut builder = client.request(method.clone(), url).headers(headers);
@ -153,7 +155,10 @@ impl HttpRequestBuilder {
let _ = tx.send(response);
});
let response = rx.await??;
let response = rx.await.map_err(|e| {
let mag = format!("Receive http response channel error: {}", e);
ServerError::internal().context(mag)
})??;
tracing::trace!("Http Response: {:?}", response);
let flowy_response = flowy_response_from(response).await?;
let token = self.token();

View File

@ -9,7 +9,7 @@ use flowy_folder::{
};
use flowy_net::ClientServerConfiguration;
use flowy_net::{
http_server::core::FolderHttpCloudService, local_server::LocalServer, ws::connection::FlowyWebSocketConnect,
http_server::folder::FolderHttpCloudService, local_server::LocalServer, ws::connection::FlowyWebSocketConnect,
};
use flowy_sync::{RevisionWebSocket, WSStateReceiver};
use flowy_user::services::UserSession;

View File

@ -11,7 +11,7 @@ lib-ot = { path = "../../../shared-lib/lib-ot" }
lib-ws = { path = "../../../shared-lib/lib-ws" }
lib-infra = { path = "../../../shared-lib/lib-infra" }
flowy-database = { path = "../flowy-database" }
flowy-error = { path = "../flowy-error", features = ["collaboration", "ot", "backend", "serde", "db"] }
flowy-error = { path = "../flowy-error", features = ["collaboration", "ot", "http_server", "serde", "db"] }
diesel = {version = "1.4.8", features = ["sqlite"]}
diesel_derives = {version = "1.4.1", features = ["sqlite"]}
protobuf = {version = "2.18.0"}

View File

@ -14,7 +14,7 @@ derive_more = {version = "0.99", features = ["display"]}
flowy-database = { path = "../flowy-database" }
dart-notify = { path = "../dart-notify" }
lib-dispatch = { path = "../lib-dispatch" }
flowy-error = { path = "../flowy-error", features = ["db", "backend"] }
flowy-error = { path = "../flowy-error", features = ["db", "http_server"] }
lib-sqlite = { path = "../lib-sqlite" }
tracing = { version = "0.1", features = ["log"] }

View File

@ -34,6 +34,7 @@ private = true
script = [
"""
cd rust-lib/
rustup show
echo cargo build --package=dart-ffi --target ${RUST_COMPILE_TARGET} --features "${FEATURES}"
cargo build --package=dart-ffi --target ${RUST_COMPILE_TARGET} --features "${FEATURES}"
cd ../

1094
shared-lib/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -6,7 +6,6 @@ members = [
"lib-ot",
"lib-ws",
"lib-infra",
"backend-service",
"flowy-derive",
"flowy-ast",
"error-code",

View File

@ -1,33 +0,0 @@
[package]
name = "backend-service"
version = "0.1.0"
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
flowy-folder-data-model = { path = "../flowy-folder-data-model" }
flowy-user-data-model = { path = "../flowy-user-data-model" }
flowy-collaboration = { path = "../flowy-collaboration" }
log = "0.4.14"
lazy_static = "1.4.0"
tokio = { version = "1", features = ["rt"] }
anyhow = "1.0"
thiserror = "1.0.24"
bytes = { version = "1.0", features = ["serde"]}
reqwest = "0.11"
hyper = "0.14"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_repr = "0.1"
uuid = { version = "0.8", features = ["v4"] }
protobuf = {version = "2.18.0"}
derive_more = {version = "0.99", features = ["display"]}
tracing = { version = "0.1", features = ["log"] }
actix-web = {version = "4.0.0-beta.8", optional = true}
config = { version = "0.10.1", default-features = false, features = ["yaml"] }
serde-aux = "1.0.1"
[features]
http_server = ["actix-web"]

View File

@ -1,138 +0,0 @@
use crate::response::FlowyResponse;
use bytes::Bytes;
use serde::{Deserialize, Serialize, __private::Formatter};
use serde_repr::*;
use std::{fmt, fmt::Debug};
pub type Result<T> = std::result::Result<T, ServerError>;
use flowy_collaboration::errors::CollaborateError;
#[derive(thiserror::Error, Debug, Serialize, Deserialize, Clone)]
pub struct ServerError {
pub code: ErrorCode,
pub msg: String,
}
macro_rules! static_error {
($name:ident, $status:expr) => {
#[allow(non_snake_case, missing_docs)]
pub fn $name() -> ServerError {
ServerError {
code: $status,
msg: format!("{}", $status),
}
}
};
}
impl ServerError {
static_error!(internal, ErrorCode::InternalError);
static_error!(http, ErrorCode::HttpError);
static_error!(payload_none, ErrorCode::PayloadUnexpectedNone);
static_error!(unauthorized, ErrorCode::UserUnauthorized);
static_error!(password_not_match, ErrorCode::PasswordNotMatch);
static_error!(params_invalid, ErrorCode::ParamsInvalid);
static_error!(connect_timeout, ErrorCode::ConnectTimeout);
static_error!(connect_close, ErrorCode::ConnectClose);
static_error!(connect_cancel, ErrorCode::ConnectCancel);
static_error!(connect_refused, ErrorCode::ConnectRefused);
static_error!(record_not_found, ErrorCode::RecordNotFound);
pub fn new(msg: String, code: ErrorCode) -> Self {
Self { code, msg }
}
pub fn context<T: Debug>(mut self, error: T) -> Self {
self.msg = format!("{:?}", error);
self
}
pub fn is_record_not_found(&self) -> bool {
self.code == ErrorCode::RecordNotFound
}
pub fn is_unauthorized(&self) -> bool {
self.code == ErrorCode::UserUnauthorized
}
pub fn to_collaborate_error(&self) -> CollaborateError {
if self.is_record_not_found() {
CollaborateError::record_not_found()
} else {
CollaborateError::internal().context(self.msg.clone())
}
}
}
pub fn internal_error<T>(e: T) -> ServerError
where
T: std::fmt::Debug,
{
ServerError::internal().context(e)
}
pub fn invalid_params<T: Debug>(e: T) -> ServerError {
ServerError::params_invalid().context(e)
}
impl std::fmt::Display for ServerError {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let msg = format!("{:?}:{}", self.code, self.msg);
f.write_str(&msg)
}
}
impl std::convert::From<&ServerError> for FlowyResponse {
fn from(error: &ServerError) -> Self {
FlowyResponse {
data: Bytes::from(vec![]),
error: Some(error.clone()),
}
}
}
#[derive(Serialize_repr, Deserialize_repr, PartialEq, Debug, Clone, derive_more::Display)]
#[repr(u16)]
pub enum ErrorCode {
#[display(fmt = "Unauthorized")]
UserUnauthorized = 1,
#[display(fmt = "Payload too large")]
PayloadOverflow = 2,
#[display(fmt = "Payload deserialize failed")]
PayloadSerdeFail = 3,
#[display(fmt = "Unexpected empty payload")]
PayloadUnexpectedNone = 4,
#[display(fmt = "Params is invalid")]
ParamsInvalid = 5,
#[display(fmt = "Protobuf serde error")]
ProtobufError = 10,
#[display(fmt = "Json serde Error")]
SerdeError = 11,
#[display(fmt = "Email address already exists")]
EmailAlreadyExists = 50,
#[display(fmt = "Username and password do not match")]
PasswordNotMatch = 51,
#[display(fmt = "Connect refused")]
ConnectRefused = 100,
#[display(fmt = "Connection timeout")]
ConnectTimeout = 101,
#[display(fmt = "Connection closed")]
ConnectClose = 102,
#[display(fmt = "Connection canceled")]
ConnectCancel = 103,
#[display(fmt = "Sql error")]
SqlError = 200,
#[display(fmt = "Record not found")]
RecordNotFound = 201,
#[display(fmt = "Http request error")]
HttpError = 300,
#[display(fmt = "Internal error")]
InternalError = 1000,
}

View File

@ -1,4 +0,0 @@
pub mod configuration;
pub mod errors;
pub mod request;
pub mod response;

View File

@ -1,41 +0,0 @@
use crate::{request::ResponseMiddleware, response::FlowyResponse};
use lazy_static::lazy_static;
use std::sync::Arc;
use tokio::sync::broadcast;
lazy_static! {
pub static ref BACKEND_API_MIDDLEWARE: Arc<WorkspaceMiddleware> = Arc::new(WorkspaceMiddleware::new());
}
pub struct WorkspaceMiddleware {
invalid_token_sender: broadcast::Sender<String>,
}
impl WorkspaceMiddleware {
fn new() -> Self {
let (sender, _) = broadcast::channel(10);
WorkspaceMiddleware {
invalid_token_sender: sender,
}
}
pub fn invalid_token_subscribe(&self) -> broadcast::Receiver<String> {
self.invalid_token_sender.subscribe()
}
}
impl ResponseMiddleware for WorkspaceMiddleware {
fn receive_response(&self, token: &Option<String>, response: &FlowyResponse) {
if let Some(error) = &response.error {
if error.is_unauthorized() {
log::error!("user is unauthorized");
match token {
None => {}
Some(token) => match self.invalid_token_sender.send(token.clone()) {
Ok(_) => {}
Err(e) => log::error!("{:?}", e),
},
}
}
}
}
}

View File

@ -1,4 +0,0 @@
#![allow(clippy::module_inception)]
mod request;
pub use request::*;

View File

@ -1,7 +0,0 @@
#![allow(clippy::module_inception)]
mod response;
#[cfg(feature = "http_server")]
mod response_http;
pub use response::*;

View File

@ -1,100 +0,0 @@
use crate::errors::{ErrorCode, ServerError};
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::{convert::TryInto, error::Error, fmt::Debug};
use tokio::sync::oneshot::error::RecvError;
#[derive(Debug, Serialize, Deserialize)]
pub struct FlowyResponse {
pub data: Bytes,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<ServerError>,
}
impl FlowyResponse {
pub fn new(data: Bytes, error: Option<ServerError>) -> Self {
FlowyResponse { data, error }
}
pub fn success() -> Self {
Self::new(Bytes::new(), None)
}
pub fn data<T: TryInto<Bytes, Error = protobuf::ProtobufError>>(mut self, data: T) -> Result<Self, ServerError> {
let bytes: Bytes = data.try_into()?;
self.data = bytes;
Ok(self)
}
pub fn pb<T: ::protobuf::Message>(mut self, data: T) -> Result<Self, ServerError> {
let bytes: Bytes = Bytes::from(data.write_to_bytes()?);
self.data = bytes;
Ok(self)
}
}
impl std::convert::From<protobuf::ProtobufError> for ServerError {
fn from(e: protobuf::ProtobufError) -> Self {
ServerError::internal().context(e)
}
}
impl std::convert::From<RecvError> for ServerError {
fn from(error: RecvError) -> Self {
ServerError::internal().context(error)
}
}
impl std::convert::From<serde_json::Error> for ServerError {
fn from(e: serde_json::Error) -> Self {
ServerError::internal().context(e)
}
}
impl std::convert::From<anyhow::Error> for ServerError {
fn from(error: anyhow::Error) -> Self {
ServerError::internal().context(error)
}
}
impl std::convert::From<reqwest::Error> for ServerError {
fn from(error: reqwest::Error) -> Self {
if error.is_timeout() {
return ServerError::connect_timeout().context(error);
}
if error.is_request() {
let hyper_error: Option<&hyper::Error> = error.source().unwrap().downcast_ref();
return match hyper_error {
None => ServerError::connect_refused().context(error),
Some(hyper_error) => {
let mut code = ErrorCode::InternalError;
let msg = format!("{}", error);
if hyper_error.is_closed() {
code = ErrorCode::ConnectClose;
}
if hyper_error.is_connect() {
code = ErrorCode::ConnectRefused;
}
if hyper_error.is_canceled() {
code = ErrorCode::ConnectCancel;
}
if hyper_error.is_timeout() {}
ServerError { code, msg }
}
};
}
ServerError::internal().context(error)
}
}
impl std::convert::From<uuid::Error> for ServerError {
fn from(e: uuid::Error) -> Self {
ServerError::internal().context(e)
}
}

View File

@ -1,26 +0,0 @@
use crate::response::*;
use actix_web::{error::ResponseError, HttpResponse};
use crate::errors::ServerError;
use actix_web::body::AnyBody;
impl ResponseError for ServerError {
fn error_response(&self) -> HttpResponse {
let response: FlowyResponse = self.into();
response.into()
}
}
impl std::convert::Into<HttpResponse> for FlowyResponse {
fn into(self) -> HttpResponse {
HttpResponse::Ok().json(self)
}
}
impl std::convert::Into<AnyBody> for FlowyResponse {
fn into(self) -> AnyBody {
match serde_json::to_string(&self) {
Ok(body) => AnyBody::from(body),
Err(_) => AnyBody::Empty,
}
}
}