From 855d396122f91128b572fdc25af57ba3ce46b30c Mon Sep 17 00:00:00 2001 From: appflowy Date: Mon, 10 Jan 2022 23:45:59 +0800 Subject: [PATCH] refactor cloud service triat --- backend/Cargo.lock | 11 +- backend/src/services/document/ws_receiver.rs | 4 +- backend/tests/util/helper.rs | 11 +- frontend/rust-lib/flowy-core/Cargo.toml | 2 - frontend/rust-lib/flowy-core/src/context.rs | 35 +- .../rust-lib/flowy-core/src/event_handler.rs | 2 +- frontend/rust-lib/flowy-core/src/module.rs | 68 ++- .../flowy-core/src/services/app/controller.rs | 15 +- .../rust-lib/flowy-core/src/services/mod.rs | 1 - .../flowy-core/src/services/server/mod.rs | 69 --- .../src/services/server/server_api.rs | 170 ------- .../src/services/server/server_api_mock.rs | 116 ----- .../src/services/trash/controller.rs | 22 +- .../src/services/view/controller.rs | 15 +- .../src/services/workspace/controller.rs | 15 +- frontend/rust-lib/flowy-core/src/util.rs | 14 +- frontend/rust-lib/flowy-document/Cargo.toml | 4 - .../rust-lib/flowy-document/src/context.rs | 16 +- .../rust-lib/flowy-document/src/controller.rs | 14 +- frontend/rust-lib/flowy-document/src/lib.rs | 13 +- .../flowy-document/src/server/middleware.rs | 29 -- .../rust-lib/flowy-document/src/server/mod.rs | 31 -- .../flowy-document/src/server/server_api.rs | 67 --- .../src/server/server_api_mock.rs | 29 -- .../flowy-document/tests/editor/mod.rs | 3 +- frontend/rust-lib/flowy-net/Cargo.toml | 4 + frontend/rust-lib/flowy-net/src/cloud/core.rs | 476 ++++++++++++++++++ .../rust-lib/flowy-net/src/cloud/document.rs | 134 +++++ frontend/rust-lib/flowy-net/src/cloud/mod.rs | 3 + frontend/rust-lib/flowy-net/src/cloud/user.rs | 155 ++++++ frontend/rust-lib/flowy-net/src/lib.rs | 1 + .../src/services/local/local_server.rs | 4 +- .../src/services/local/persistence.rs | 2 +- .../flowy-sdk/src/deps_resolve/core_deps.rs | 211 ++++++++ .../src/deps_resolve/document_deps.rs | 124 +++-- .../flowy-sdk/src/deps_resolve/mod.rs | 6 +- .../flowy-sdk/src/deps_resolve/user_deps.rs | 58 +++ .../src/deps_resolve/workspace_deps.rs | 49 -- frontend/rust-lib/flowy-sdk/src/lib.rs | 35 +- frontend/rust-lib/flowy-user/Cargo.toml | 3 - frontend/rust-lib/flowy-user/src/lib.rs | 4 - frontend/rust-lib/flowy-user/src/module.rs | 18 +- .../rust-lib/flowy-user/src/services/mod.rs | 1 - .../flowy-user/src/services/server/mod.rs | 31 -- .../src/services/server/server_api.rs | 88 ---- .../src/services/server/server_api_mock.rs | 49 -- .../src/services/user/user_session.rs | 40 +- .../flowy-user/tests/event/auth_test.rs | 3 +- .../tests/event/user_profile_test.rs | 3 +- .../backend-service/src/http_request.rs | 219 -------- shared-lib/backend-service/src/lib.rs | 2 - shared-lib/backend-service/src/middleware.rs | 39 -- .../flowy-collaboration/src/sync/server.rs | 16 +- .../src/sync/synchronizer.rs | 16 +- shared-lib/lib-ot/src/core/delta/delta.rs | 2 +- 55 files changed, 1368 insertions(+), 1204 deletions(-) delete mode 100644 frontend/rust-lib/flowy-core/src/services/server/mod.rs delete mode 100644 frontend/rust-lib/flowy-core/src/services/server/server_api.rs delete mode 100644 frontend/rust-lib/flowy-core/src/services/server/server_api_mock.rs delete mode 100644 frontend/rust-lib/flowy-document/src/server/middleware.rs delete mode 100644 frontend/rust-lib/flowy-document/src/server/mod.rs delete mode 100644 frontend/rust-lib/flowy-document/src/server/server_api.rs delete mode 100644 frontend/rust-lib/flowy-document/src/server/server_api_mock.rs create mode 100644 frontend/rust-lib/flowy-net/src/cloud/core.rs create mode 100644 frontend/rust-lib/flowy-net/src/cloud/document.rs create mode 100644 frontend/rust-lib/flowy-net/src/cloud/mod.rs create mode 100644 frontend/rust-lib/flowy-net/src/cloud/user.rs create mode 100644 frontend/rust-lib/flowy-sdk/src/deps_resolve/core_deps.rs create mode 100644 frontend/rust-lib/flowy-sdk/src/deps_resolve/user_deps.rs delete mode 100644 frontend/rust-lib/flowy-sdk/src/deps_resolve/workspace_deps.rs delete mode 100644 frontend/rust-lib/flowy-user/src/services/server/mod.rs delete mode 100644 frontend/rust-lib/flowy-user/src/services/server/server_api.rs delete mode 100644 frontend/rust-lib/flowy-user/src/services/server/server_api_mock.rs delete mode 100644 shared-lib/backend-service/src/http_request.rs delete mode 100644 shared-lib/backend-service/src/middleware.rs diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 9d1a6cb773..324673e384 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -1247,7 +1247,6 @@ dependencies = [ name = "flowy-core" version = "0.1.0" dependencies = [ - "backend-service", "bincode", "bytes", "chrono", @@ -1263,7 +1262,6 @@ dependencies = [ "flowy-derive", "flowy-document", "flowy-error", - "flowy-net", "futures", "futures-core", "lazy_static", @@ -1327,7 +1325,6 @@ name = "flowy-document" version = "0.1.0" dependencies = [ "async-stream", - "backend-service", "bytecount", "byteorder", "bytes", @@ -1342,9 +1339,7 @@ dependencies = [ "flowy-derive", "flowy-error", "futures", - "futures-core", "futures-util", - "lazy_static", "lib-dispatch", "lib-infra", "lib-ot", @@ -1386,11 +1381,15 @@ name = "flowy-net" version = "0.1.0" dependencies = [ "anyhow", + "backend-service", "bytes", "dashmap", "flowy-collaboration", + "flowy-core-data-model", "flowy-derive", "flowy-error", + "flowy-user-data-model", + "lazy_static", "lib-dispatch", "lib-infra", "lib-ws", @@ -1455,7 +1454,6 @@ dependencies = [ name = "flowy-user" version = "0.1.0" dependencies = [ - "backend-service", "bytes", "dart-notify", "dashmap", @@ -1465,7 +1463,6 @@ dependencies = [ "flowy-database", "flowy-derive", "flowy-error", - "flowy-net", "flowy-user-data-model", "futures-core", "lazy_static", diff --git a/backend/src/services/document/ws_receiver.rs b/backend/src/services/document/ws_receiver.rs index b8baa0d19a..17cea410ce 100644 --- a/backend/src/services/document/ws_receiver.rs +++ b/backend/src/services/document/ws_receiver.rs @@ -18,7 +18,7 @@ use flowy_collaboration::{ RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB, }, - sync::{DocumentPersistence, ServerDocumentManager}, + sync::{ServerDocumentManager, ServerDocumentPersistence}, util::repeated_revision_from_repeated_revision_pb, }; use lib_infra::future::BoxResultFuture; @@ -81,7 +81,7 @@ impl Debug for HttpServerDocumentPersistence { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("DocumentPersistenceImpl") } } -impl DocumentPersistence for HttpServerDocumentPersistence { +impl ServerDocumentPersistence for HttpServerDocumentPersistence { fn read_document(&self, doc_id: &str) -> BoxResultFuture { let params = DocumentId { doc_id: doc_id.to_string(), diff --git a/backend/tests/util/helper.rs b/backend/tests/util/helper.rs index ee1b92b96a..819e868002 100644 --- a/backend/tests/util/helper.rs +++ b/backend/tests/util/helper.rs @@ -6,14 +6,17 @@ use backend::{ use backend_service::{ configuration::{get_client_server_configuration, ClientServerConfiguration}, errors::ServerError, - http_request::*, }; use flowy_collaboration::{ document::default::initial_delta_string, entities::doc::{CreateDocParams, DocumentId, DocumentInfo}, }; use flowy_core_data_model::entities::prelude::*; -use flowy_document::server::{create_doc_request, read_doc_request}; +use flowy_net::cloud::{ + core::*, + document::{create_document_request, read_document_request}, + user::*, +}; use flowy_user_data_model::entities::*; use lib_infra::uuid_string; use sqlx::{Connection, Executor, PgConnection, PgPool}; @@ -153,13 +156,13 @@ impl TestUserServer { pub async fn read_doc(&self, params: DocumentId) -> Option { let url = format!("{}/api/doc", self.http_addr()); - let doc = read_doc_request(self.user_token(), params, &url).await.unwrap(); + let doc = read_document_request(self.user_token(), params, &url).await.unwrap(); doc } pub async fn create_doc(&self, params: CreateDocParams) { let url = format!("{}/api/doc", self.http_addr()); - let _ = create_doc_request(self.user_token(), params, &url).await.unwrap(); + let _ = create_document_request(self.user_token(), params, &url).await.unwrap(); } pub async fn register_user(&self) -> SignUpResponse { diff --git a/frontend/rust-lib/flowy-core/Cargo.toml b/frontend/rust-lib/flowy-core/Cargo.toml index 3644c6ff20..9d61d2e119 100644 --- a/frontend/rust-lib/flowy-core/Cargo.toml +++ b/frontend/rust-lib/flowy-core/Cargo.toml @@ -10,13 +10,11 @@ flowy-core-data-model = { path = "../../../shared-lib/flowy-core-data-model" } flowy-collaboration = { path = "../../../shared-lib/flowy-collaboration" } flowy-derive = { path = "../../../shared-lib/flowy-derive" } lib-ot = { path = "../../../shared-lib/lib-ot" } -backend-service = { path = "../../../shared-lib/backend-service" } lib-infra = { path = "../../../shared-lib/lib-infra" } flowy-document = { path = "../flowy-document" } flowy-database = { path = "../flowy-database" } flowy-error = { path = "../flowy-error", features = ["db", "backend"]} -flowy-net = { path = "../flowy-net" } dart-notify = { path = "../dart-notify" } lib-dispatch = { path = "../lib-dispatch" } lib-sqlite = { path = "../lib-sqlite" } diff --git a/frontend/rust-lib/flowy-core/src/context.rs b/frontend/rust-lib/flowy-core/src/context.rs index 4df573515a..b28f264577 100644 --- a/frontend/rust-lib/flowy-core/src/context.rs +++ b/frontend/rust-lib/flowy-core/src/context.rs @@ -1,19 +1,16 @@ -use std::{collections::HashMap, sync::Arc}; - use chrono::Utc; -use lazy_static::lazy_static; -use parking_lot::RwLock; - use flowy_collaboration::document::default::{initial_delta, initial_read_me}; use flowy_core_data_model::{entities::view::CreateViewParams, user_default}; -use flowy_net::entities::NetworkType; +use lazy_static::lazy_static; +use parking_lot::RwLock; +use std::{collections::HashMap, sync::Arc}; use crate::{ entities::workspace::RepeatedWorkspace, errors::{FlowyError, FlowyResult}, - module::{WorkspaceDatabase, WorkspaceUser}, + module::{CoreCloudService, WorkspaceDatabase, WorkspaceUser}, notify::{send_dart_notification, WorkspaceNotification}, - services::{server::Server, AppController, TrashController, ViewController, WorkspaceController}, + services::{AppController, TrashController, ViewController, WorkspaceController}, }; lazy_static! { @@ -22,7 +19,7 @@ lazy_static! { pub struct CoreContext { pub user: Arc, - pub(crate) server: Server, + pub(crate) cloud_service: Arc, pub(crate) database: Arc, pub workspace_controller: Arc, pub(crate) app_controller: Arc, @@ -33,7 +30,7 @@ pub struct CoreContext { impl CoreContext { pub(crate) fn new( user: Arc, - server: Server, + cloud_service: Arc, database: Arc, workspace_controller: Arc, app_controller: Arc, @@ -46,7 +43,7 @@ impl CoreContext { Self { user, - server, + cloud_service, database, workspace_controller, app_controller, @@ -55,14 +52,14 @@ impl CoreContext { } } - pub fn network_state_changed(&self, new_type: NetworkType) { - match new_type { - NetworkType::UnknownNetworkType => {}, - NetworkType::Wifi => {}, - NetworkType::Cell => {}, - NetworkType::Ethernet => {}, - } - } + // pub fn network_state_changed(&self, new_type: NetworkType) { + // match new_type { + // NetworkType::UnknownNetworkType => {}, + // NetworkType::Wifi => {}, + // NetworkType::Cell => {}, + // NetworkType::Ethernet => {}, + // } + // } pub async fn user_did_sign_in(&self, token: &str) -> FlowyResult<()> { log::debug!("workspace initialize after sign in"); diff --git a/frontend/rust-lib/flowy-core/src/event_handler.rs b/frontend/rust-lib/flowy-core/src/event_handler.rs index 13d32013d3..fb71f993f7 100644 --- a/frontend/rust-lib/flowy-core/src/event_handler.rs +++ b/frontend/rust-lib/flowy-core/src/event_handler.rs @@ -66,7 +66,7 @@ fn read_workspaces_on_server( user_id: String, params: WorkspaceId, ) -> Result<(), FlowyError> { - let (token, server) = (core.user.token()?, core.server.clone()); + let (token, server) = (core.user.token()?, core.cloud_service.clone()); let app_ctrl = core.app_controller.clone(); let view_ctrl = core.view_controller.clone(); let conn = core.database.db_connection()?; diff --git a/frontend/rust-lib/flowy-core/src/module.rs b/frontend/rust-lib/flowy-core/src/module.rs index e22a2b3d5e..aea31edccc 100644 --- a/frontend/rust-lib/flowy-core/src/module.rs +++ b/frontend/rust-lib/flowy-core/src/module.rs @@ -1,13 +1,16 @@ -use std::sync::Arc; - use crate::{ context::CoreContext, + entities::{ + app::{App, AppId, CreateAppParams, UpdateAppParams}, + trash::{RepeatedTrash, RepeatedTrashId}, + view::{CreateViewParams, RepeatedViewId, UpdateViewParams, View, ViewId}, + workspace::{CreateWorkspaceParams, RepeatedWorkspace, UpdateWorkspaceParams, Workspace, WorkspaceId}, + }, errors::FlowyError, event::WorkspaceEvent, event_handler::*, services::{ app::event_handler::*, - server::construct_workspace_server, trash::event_handler::*, view::event_handler::*, workspace::event_handler::*, @@ -17,11 +20,12 @@ use crate::{ WorkspaceController, }, }; -use backend_service::configuration::ClientServerConfiguration; use flowy_database::DBConnection; use flowy_document::context::DocumentContext; use lib_dispatch::prelude::*; +use lib_infra::future::FutureResult; use lib_sqlite::ConnectionPool; +use std::sync::Arc; pub trait WorkspaceDeps: WorkspaceUser + WorkspaceDatabase {} @@ -44,16 +48,18 @@ pub fn init_core( user: Arc, database: Arc, flowy_document: Arc, - server_config: &ClientServerConfiguration, + cloud_service: Arc, ) -> Arc { - let server = construct_workspace_server(server_config); - - let trash_controller = Arc::new(TrashController::new(database.clone(), server.clone(), user.clone())); + let trash_controller = Arc::new(TrashController::new( + database.clone(), + cloud_service.clone(), + user.clone(), + )); let view_controller = Arc::new(ViewController::new( user.clone(), database.clone(), - server.clone(), + cloud_service.clone(), trash_controller.clone(), flowy_document, )); @@ -62,19 +68,19 @@ pub fn init_core( user.clone(), database.clone(), trash_controller.clone(), - server.clone(), + cloud_service.clone(), )); let workspace_controller = Arc::new(WorkspaceController::new( user.clone(), database.clone(), trash_controller.clone(), - server.clone(), + cloud_service.clone(), )); Arc::new(CoreContext::new( user, - server, + cloud_service, database, workspace_controller, app_controller, @@ -126,3 +132,41 @@ pub fn create(core: Arc) -> Module { module } + +pub trait CoreCloudService: Send + Sync { + fn init(&self); + + // Workspace + fn create_workspace(&self, token: &str, params: CreateWorkspaceParams) -> FutureResult; + + fn read_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult; + + fn update_workspace(&self, token: &str, params: UpdateWorkspaceParams) -> FutureResult<(), FlowyError>; + + fn delete_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult<(), FlowyError>; + + // View + fn create_view(&self, token: &str, params: CreateViewParams) -> FutureResult; + + fn read_view(&self, token: &str, params: ViewId) -> FutureResult, FlowyError>; + + fn delete_view(&self, token: &str, params: RepeatedViewId) -> FutureResult<(), FlowyError>; + + fn update_view(&self, token: &str, params: UpdateViewParams) -> FutureResult<(), FlowyError>; + + // App + fn create_app(&self, token: &str, params: CreateAppParams) -> FutureResult; + + fn read_app(&self, token: &str, params: AppId) -> FutureResult, FlowyError>; + + fn update_app(&self, token: &str, params: UpdateAppParams) -> FutureResult<(), FlowyError>; + + fn delete_app(&self, token: &str, params: AppId) -> FutureResult<(), FlowyError>; + + // Trash + fn create_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError>; + + fn delete_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError>; + + fn read_trash(&self, token: &str) -> FutureResult; +} diff --git a/frontend/rust-lib/flowy-core/src/services/app/controller.rs b/frontend/rust-lib/flowy-core/src/services/app/controller.rs index e533810f9d..7e4799a423 100644 --- a/frontend/rust-lib/flowy-core/src/services/app/controller.rs +++ b/frontend/rust-lib/flowy-core/src/services/app/controller.rs @@ -4,11 +4,10 @@ use crate::{ trash::TrashType, }, errors::*, - module::{WorkspaceDatabase, WorkspaceUser}, + module::{CoreCloudService, WorkspaceDatabase, WorkspaceUser}, notify::*, services::{ app::sql::{AppTable, AppTableChangeset, AppTableSql}, - server::Server, TrashController, TrashEvent, }, @@ -21,7 +20,7 @@ pub(crate) struct AppController { user: Arc, database: Arc, trash_can: Arc, - server: Server, + cloud_service: Arc, } impl AppController { @@ -29,13 +28,13 @@ impl AppController { user: Arc, database: Arc, trash_can: Arc, - server: Server, + cloud_service: Arc, ) -> Self { Self { user, database, trash_can, - server, + cloud_service, } } @@ -115,14 +114,14 @@ impl AppController { #[tracing::instrument(level = "debug", skip(self), err)] async fn create_app_on_server(&self, params: CreateAppParams) -> Result { let token = self.user.token()?; - let app = self.server.create_app(&token, params).await?; + let app = self.cloud_service.create_app(&token, params).await?; Ok(app) } #[tracing::instrument(level = "debug", skip(self), err)] fn update_app_on_server(&self, params: UpdateAppParams) -> Result<(), FlowyError> { let token = self.user.token()?; - let server = self.server.clone(); + let server = self.cloud_service.clone(); tokio::spawn(async move { match server.update_app(&token, params).await { Ok(_) => {}, @@ -138,7 +137,7 @@ impl AppController { #[tracing::instrument(level = "debug", skip(self), err)] fn read_app_on_server(&self, params: AppId) -> Result<(), FlowyError> { let token = self.user.token()?; - let server = self.server.clone(); + let server = self.cloud_service.clone(); let pool = self.database.db_pool()?; tokio::spawn(async move { // Opti: retry? diff --git a/frontend/rust-lib/flowy-core/src/services/mod.rs b/frontend/rust-lib/flowy-core/src/services/mod.rs index 5ce07dd23b..20e280ae63 100644 --- a/frontend/rust-lib/flowy-core/src/services/mod.rs +++ b/frontend/rust-lib/flowy-core/src/services/mod.rs @@ -4,7 +4,6 @@ pub(crate) use view::controller::*; pub(crate) use workspace::controller::*; pub(crate) mod app; -pub(crate) mod server; pub(crate) mod trash; pub(crate) mod view; pub(crate) mod workspace; diff --git a/frontend/rust-lib/flowy-core/src/services/server/mod.rs b/frontend/rust-lib/flowy-core/src/services/server/mod.rs deleted file mode 100644 index 36f0754fa4..0000000000 --- a/frontend/rust-lib/flowy-core/src/services/server/mod.rs +++ /dev/null @@ -1,69 +0,0 @@ -mod server_api; -mod server_api_mock; - -pub use server_api::*; -// TODO: ignore mock files in production -pub use server_api_mock::*; - -use crate::{ - entities::{ - app::{App, AppId, CreateAppParams, UpdateAppParams}, - trash::{RepeatedTrash, RepeatedTrashId}, - view::{CreateViewParams, RepeatedViewId, UpdateViewParams, View, ViewId}, - workspace::{CreateWorkspaceParams, RepeatedWorkspace, UpdateWorkspaceParams, Workspace, WorkspaceId}, - }, - errors::FlowyError, -}; -use backend_service::configuration::ClientServerConfiguration; -use lib_infra::future::FutureResult; -use std::sync::Arc; - -pub(crate) type Server = Arc; - -pub trait WorkspaceServerAPI { - fn init(&self); - - // Workspace - fn create_workspace(&self, token: &str, params: CreateWorkspaceParams) -> FutureResult; - - fn read_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult; - - fn update_workspace(&self, token: &str, params: UpdateWorkspaceParams) -> FutureResult<(), FlowyError>; - - fn delete_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult<(), FlowyError>; - - // View - fn create_view(&self, token: &str, params: CreateViewParams) -> FutureResult; - - fn read_view(&self, token: &str, params: ViewId) -> FutureResult, FlowyError>; - - fn delete_view(&self, token: &str, params: RepeatedViewId) -> FutureResult<(), FlowyError>; - - fn update_view(&self, token: &str, params: UpdateViewParams) -> FutureResult<(), FlowyError>; - - // App - fn create_app(&self, token: &str, params: CreateAppParams) -> FutureResult; - - fn read_app(&self, token: &str, params: AppId) -> FutureResult, FlowyError>; - - fn update_app(&self, token: &str, params: UpdateAppParams) -> FutureResult<(), FlowyError>; - - fn delete_app(&self, token: &str, params: AppId) -> FutureResult<(), FlowyError>; - - // Trash - fn create_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError>; - - fn delete_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError>; - - fn read_trash(&self, token: &str) -> FutureResult; -} - -pub(crate) fn construct_workspace_server( - config: &ClientServerConfiguration, -) -> Arc { - if cfg!(feature = "http_server") { - Arc::new(WorkspaceHttpServer::new(config.clone())) - } else { - Arc::new(WorkspaceServerMock {}) - } -} diff --git a/frontend/rust-lib/flowy-core/src/services/server/server_api.rs b/frontend/rust-lib/flowy-core/src/services/server/server_api.rs deleted file mode 100644 index 2b6b5eabf7..0000000000 --- a/frontend/rust-lib/flowy-core/src/services/server/server_api.rs +++ /dev/null @@ -1,170 +0,0 @@ -use crate::{ - entities::{ - app::{App, AppId, CreateAppParams, UpdateAppParams}, - trash::{RepeatedTrash, RepeatedTrashId}, - view::{CreateViewParams, RepeatedViewId, UpdateViewParams, View, ViewId}, - workspace::{CreateWorkspaceParams, RepeatedWorkspace, UpdateWorkspaceParams, Workspace, WorkspaceId}, - }, - errors::{ErrorCode, FlowyError}, - notify::{send_dart_notification, WorkspaceNotification}, - services::server::WorkspaceServerAPI, -}; -use backend_service::{configuration::ClientServerConfiguration, http_request::*, middleware::*}; -use lib_infra::future::FutureResult; - -pub struct WorkspaceHttpServer { - config: ClientServerConfiguration, -} - -impl WorkspaceHttpServer { - pub fn new(config: ClientServerConfiguration) -> WorkspaceHttpServer { Self { config } } -} - -impl WorkspaceServerAPI for WorkspaceHttpServer { - fn init(&self) { - let mut rx = BACKEND_API_MIDDLEWARE.invalid_token_subscribe(); - tokio::spawn(async move { - while let Ok(invalid_token) = rx.recv().await { - let error = FlowyError::new(ErrorCode::UserUnauthorized, ""); - send_dart_notification(&invalid_token, WorkspaceNotification::UserUnauthorized) - .error(error) - .send() - } - }); - } - - fn create_workspace(&self, token: &str, params: CreateWorkspaceParams) -> FutureResult { - let token = token.to_owned(); - let url = self.config.workspace_url(); - FutureResult::new(async move { - let workspace = create_workspace_request(&token, params, &url).await?; - Ok(workspace) - }) - } - - fn read_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult { - let token = token.to_owned(); - let url = self.config.workspace_url(); - FutureResult::new(async move { - let repeated_workspace = read_workspaces_request(&token, params, &url).await?; - Ok(repeated_workspace) - }) - } - - fn update_workspace(&self, token: &str, params: UpdateWorkspaceParams) -> FutureResult<(), FlowyError> { - let token = token.to_owned(); - let url = self.config.workspace_url(); - FutureResult::new(async move { - let _ = update_workspace_request(&token, params, &url).await?; - Ok(()) - }) - } - - fn delete_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult<(), FlowyError> { - let token = token.to_owned(); - let url = self.config.workspace_url(); - FutureResult::new(async move { - let _ = delete_workspace_request(&token, params, &url).await?; - Ok(()) - }) - } - - fn create_view(&self, token: &str, params: CreateViewParams) -> FutureResult { - let token = token.to_owned(); - let url = self.config.view_url(); - FutureResult::new(async move { - let view = create_view_request(&token, params, &url).await?; - Ok(view) - }) - } - - fn read_view(&self, token: &str, params: ViewId) -> FutureResult, FlowyError> { - let token = token.to_owned(); - let url = self.config.view_url(); - FutureResult::new(async move { - let view = read_view_request(&token, params, &url).await?; - Ok(view) - }) - } - - fn delete_view(&self, token: &str, params: RepeatedViewId) -> FutureResult<(), FlowyError> { - let token = token.to_owned(); - let url = self.config.view_url(); - FutureResult::new(async move { - let _ = delete_view_request(&token, params, &url).await?; - Ok(()) - }) - } - - fn update_view(&self, token: &str, params: UpdateViewParams) -> FutureResult<(), FlowyError> { - let token = token.to_owned(); - let url = self.config.view_url(); - FutureResult::new(async move { - let _ = update_view_request(&token, params, &url).await?; - Ok(()) - }) - } - - fn create_app(&self, token: &str, params: CreateAppParams) -> FutureResult { - let token = token.to_owned(); - let url = self.config.app_url(); - FutureResult::new(async move { - let app = create_app_request(&token, params, &url).await?; - Ok(app) - }) - } - - fn read_app(&self, token: &str, params: AppId) -> FutureResult, FlowyError> { - let token = token.to_owned(); - let url = self.config.app_url(); - FutureResult::new(async move { - let app = read_app_request(&token, params, &url).await?; - Ok(app) - }) - } - - fn update_app(&self, token: &str, params: UpdateAppParams) -> FutureResult<(), FlowyError> { - let token = token.to_owned(); - let url = self.config.app_url(); - FutureResult::new(async move { - let _ = update_app_request(&token, params, &url).await?; - Ok(()) - }) - } - - fn delete_app(&self, token: &str, params: AppId) -> FutureResult<(), FlowyError> { - let token = token.to_owned(); - let url = self.config.app_url(); - FutureResult::new(async move { - let _ = delete_app_request(&token, params, &url).await?; - Ok(()) - }) - } - - fn create_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError> { - let token = token.to_owned(); - let url = self.config.trash_url(); - FutureResult::new(async move { - let _ = create_trash_request(&token, params, &url).await?; - Ok(()) - }) - } - - fn delete_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError> { - let token = token.to_owned(); - let url = self.config.trash_url(); - FutureResult::new(async move { - let _ = delete_trash_request(&token, params, &url).await?; - Ok(()) - }) - } - - fn read_trash(&self, token: &str) -> FutureResult { - let token = token.to_owned(); - let url = self.config.trash_url(); - FutureResult::new(async move { - let repeated_trash = read_trash_request(&token, &url).await?; - Ok(repeated_trash) - }) - } -} diff --git a/frontend/rust-lib/flowy-core/src/services/server/server_api_mock.rs b/frontend/rust-lib/flowy-core/src/services/server/server_api_mock.rs deleted file mode 100644 index f4ca7f64ff..0000000000 --- a/frontend/rust-lib/flowy-core/src/services/server/server_api_mock.rs +++ /dev/null @@ -1,116 +0,0 @@ -use crate::{ - entities::{ - app::{App, AppId, CreateAppParams, RepeatedApp, UpdateAppParams}, - trash::{RepeatedTrash, RepeatedTrashId}, - view::{CreateViewParams, RepeatedView, RepeatedViewId, UpdateViewParams, View, ViewId}, - workspace::{CreateWorkspaceParams, RepeatedWorkspace, UpdateWorkspaceParams, Workspace, WorkspaceId}, - }, - errors::FlowyError, - services::server::WorkspaceServerAPI, -}; -use lib_infra::{future::FutureResult, timestamp, uuid_string}; - -pub struct WorkspaceServerMock {} - -impl WorkspaceServerAPI for WorkspaceServerMock { - fn init(&self) {} - - fn create_workspace(&self, _token: &str, params: CreateWorkspaceParams) -> FutureResult { - let time = timestamp(); - let workspace = Workspace { - id: uuid_string(), - name: params.name, - desc: params.desc, - apps: RepeatedApp::default(), - modified_time: time, - create_time: time, - }; - - FutureResult::new(async { Ok(workspace) }) - } - - fn read_workspace(&self, _token: &str, _params: WorkspaceId) -> FutureResult { - FutureResult::new(async { - let repeated_workspace = RepeatedWorkspace { items: vec![] }; - Ok(repeated_workspace) - }) - } - - fn update_workspace(&self, _token: &str, _params: UpdateWorkspaceParams) -> FutureResult<(), FlowyError> { - FutureResult::new(async { Ok(()) }) - } - - fn delete_workspace(&self, _token: &str, _params: WorkspaceId) -> FutureResult<(), FlowyError> { - FutureResult::new(async { Ok(()) }) - } - - fn create_view(&self, _token: &str, params: CreateViewParams) -> FutureResult { - let time = timestamp(); - let view = View { - id: params.view_id, - belong_to_id: params.belong_to_id, - name: params.name, - desc: params.desc, - view_type: params.view_type, - version: 0, - belongings: RepeatedView::default(), - modified_time: time, - create_time: time, - }; - FutureResult::new(async { Ok(view) }) - } - - fn read_view(&self, _token: &str, _params: ViewId) -> FutureResult, FlowyError> { - FutureResult::new(async { Ok(None) }) - } - - fn delete_view(&self, _token: &str, _params: RepeatedViewId) -> FutureResult<(), FlowyError> { - FutureResult::new(async { Ok(()) }) - } - - fn update_view(&self, _token: &str, _params: UpdateViewParams) -> FutureResult<(), FlowyError> { - FutureResult::new(async { Ok(()) }) - } - - fn create_app(&self, _token: &str, params: CreateAppParams) -> FutureResult { - let time = timestamp(); - let app = App { - id: uuid_string(), - workspace_id: params.workspace_id, - name: params.name, - desc: params.desc, - belongings: RepeatedView::default(), - version: 0, - modified_time: time, - create_time: time, - }; - FutureResult::new(async { Ok(app) }) - } - - fn read_app(&self, _token: &str, _params: AppId) -> FutureResult, FlowyError> { - FutureResult::new(async { Ok(None) }) - } - - fn update_app(&self, _token: &str, _params: UpdateAppParams) -> FutureResult<(), FlowyError> { - FutureResult::new(async { Ok(()) }) - } - - fn delete_app(&self, _token: &str, _params: AppId) -> FutureResult<(), FlowyError> { - FutureResult::new(async { Ok(()) }) - } - - fn create_trash(&self, _token: &str, _params: RepeatedTrashId) -> FutureResult<(), FlowyError> { - FutureResult::new(async { Ok(()) }) - } - - fn delete_trash(&self, _token: &str, _params: RepeatedTrashId) -> FutureResult<(), FlowyError> { - FutureResult::new(async { Ok(()) }) - } - - fn read_trash(&self, _token: &str) -> FutureResult { - FutureResult::new(async { - let repeated_trash = RepeatedTrash { items: vec![] }; - Ok(repeated_trash) - }) - } -} diff --git a/frontend/rust-lib/flowy-core/src/services/trash/controller.rs b/frontend/rust-lib/flowy-core/src/services/trash/controller.rs index ab4070574c..28c6079002 100644 --- a/frontend/rust-lib/flowy-core/src/services/trash/controller.rs +++ b/frontend/rust-lib/flowy-core/src/services/trash/controller.rs @@ -1,9 +1,9 @@ use crate::{ entities::trash::{RepeatedTrash, RepeatedTrashId, Trash, TrashId, TrashType}, errors::{FlowyError, FlowyResult}, - module::{WorkspaceDatabase, WorkspaceUser}, + module::{CoreCloudService, WorkspaceDatabase, WorkspaceUser}, notify::{send_anonymous_dart_notification, WorkspaceNotification}, - services::{server::Server, trash::sql::TrashTableSql}, + services::trash::sql::TrashTableSql, }; use crossbeam_utils::thread; use flowy_database::SqliteConnection; @@ -13,18 +13,22 @@ use tokio::sync::{broadcast, mpsc}; pub struct TrashController { pub database: Arc, notify: broadcast::Sender, - server: Server, + cloud_service: Arc, user: Arc, } impl TrashController { - pub fn new(database: Arc, server: Server, user: Arc) -> Self { + pub fn new( + database: Arc, + cloud_service: Arc, + user: Arc, + ) -> Self { let (tx, _) = broadcast::channel(10); Self { database, notify: tx, - server, + cloud_service, user, } } @@ -194,7 +198,7 @@ impl TrashController { fn create_trash_on_server>(&self, trash: T) -> FlowyResult<()> { let token = self.user.token()?; let trash_identifiers = trash.into(); - let server = self.server.clone(); + let server = self.cloud_service.clone(); // TODO: retry? let _ = tokio::spawn(async move { match server.create_trash(&token, trash_identifiers).await { @@ -209,7 +213,7 @@ impl TrashController { fn delete_trash_on_server>(&self, trash: T) -> FlowyResult<()> { let token = self.user.token()?; let trash_identifiers = trash.into(); - let server = self.server.clone(); + let server = self.cloud_service.clone(); let _ = tokio::spawn(async move { match server.delete_trash(&token, trash_identifiers).await { Ok(_) => {}, @@ -222,7 +226,7 @@ impl TrashController { #[tracing::instrument(level = "debug", skip(self), err)] fn read_trash_on_server(&self) -> FlowyResult<()> { let token = self.user.token()?; - let server = self.server.clone(); + let server = self.cloud_service.clone(); let pool = self.database.db_pool()?; tokio::spawn(async move { @@ -255,7 +259,7 @@ impl TrashController { #[tracing::instrument(level = "debug", skip(self), err)] async fn delete_all_trash_on_server(&self) -> FlowyResult<()> { let token = self.user.token()?; - let server = self.server.clone(); + let server = self.cloud_service.clone(); server.delete_trash(&token, RepeatedTrashId::all()).await } } diff --git a/frontend/rust-lib/flowy-core/src/services/view/controller.rs b/frontend/rust-lib/flowy-core/src/services/view/controller.rs index 57854cc10f..42ae8197d5 100644 --- a/frontend/rust-lib/flowy-core/src/services/view/controller.rs +++ b/frontend/rust-lib/flowy-core/src/services/view/controller.rs @@ -13,10 +13,9 @@ use crate::{ view::{CreateViewParams, RepeatedView, UpdateViewParams, View, ViewId}, }, errors::{FlowyError, FlowyResult}, - module::{WorkspaceDatabase, WorkspaceUser}, + module::{CoreCloudService, WorkspaceDatabase, WorkspaceUser}, notify::{send_dart_notification, WorkspaceNotification}, services::{ - server::Server, view::sql::{ViewTable, ViewTableChangeset, ViewTableSql}, TrashController, TrashEvent, @@ -31,7 +30,7 @@ const LATEST_VIEW_ID: &str = "latest_view_id"; pub(crate) struct ViewController { user: Arc, - server: Server, + cloud_service: Arc, database: Arc, trash_controller: Arc, document_ctx: Arc, @@ -41,13 +40,13 @@ impl ViewController { pub(crate) fn new( user: Arc, database: Arc, - server: Server, + cloud_service: Arc, trash_can: Arc, document_ctx: Arc, ) -> Self { Self { user, - server, + cloud_service, database, trash_controller: trash_can, document_ctx, @@ -238,14 +237,14 @@ impl ViewController { #[tracing::instrument(skip(self), err)] async fn create_view_on_server(&self, params: CreateViewParams) -> Result { let token = self.user.token()?; - let view = self.server.create_view(&token, params).await?; + let view = self.cloud_service.create_view(&token, params).await?; Ok(view) } #[tracing::instrument(skip(self), err)] fn update_view_on_server(&self, params: UpdateViewParams) -> Result<(), FlowyError> { let token = self.user.token()?; - let server = self.server.clone(); + let server = self.cloud_service.clone(); tokio::spawn(async move { match server.update_view(&token, params).await { Ok(_) => {}, @@ -261,7 +260,7 @@ impl ViewController { #[tracing::instrument(skip(self), err)] fn read_view_on_server(&self, params: ViewId) -> Result<(), FlowyError> { let token = self.user.token()?; - let server = self.server.clone(); + let server = self.cloud_service.clone(); let pool = self.database.db_pool()?; // TODO: Retry with RetryAction? tokio::spawn(async move { diff --git a/frontend/rust-lib/flowy-core/src/services/workspace/controller.rs b/frontend/rust-lib/flowy-core/src/services/workspace/controller.rs index c56f256856..2fb44869e0 100644 --- a/frontend/rust-lib/flowy-core/src/services/workspace/controller.rs +++ b/frontend/rust-lib/flowy-core/src/services/workspace/controller.rs @@ -1,10 +1,9 @@ use crate::{ errors::*, - module::{WorkspaceDatabase, WorkspaceUser}, + module::{CoreCloudService, WorkspaceDatabase, WorkspaceUser}, notify::*, services::{ read_local_workspace_apps, - server::Server, workspace::sql::{WorkspaceTable, WorkspaceTableChangeset, WorkspaceTableSql}, TrashController, }, @@ -17,7 +16,7 @@ pub struct WorkspaceController { pub user: Arc, pub(crate) database: Arc, pub(crate) trash_controller: Arc, - server: Server, + cloud_service: Arc, } impl WorkspaceController { @@ -25,13 +24,13 @@ impl WorkspaceController { user: Arc, database: Arc, trash_can: Arc, - server: Server, + cloud_service: Arc, ) -> Self { Self { user, database, trash_controller: trash_can, - server, + cloud_service, } } @@ -182,13 +181,13 @@ impl WorkspaceController { #[tracing::instrument(level = "debug", skip(self), err)] async fn create_workspace_on_server(&self, params: CreateWorkspaceParams) -> Result { let token = self.user.token()?; - let workspace = self.server.create_workspace(&token, params).await?; + let workspace = self.cloud_service.create_workspace(&token, params).await?; Ok(workspace) } #[tracing::instrument(level = "debug", skip(self), err)] fn update_workspace_on_server(&self, params: UpdateWorkspaceParams) -> Result<(), FlowyError> { - let (token, server) = (self.user.token()?, self.server.clone()); + let (token, server) = (self.user.token()?, self.cloud_service.clone()); tokio::spawn(async move { match server.update_workspace(&token, params).await { Ok(_) => {}, @@ -206,7 +205,7 @@ impl WorkspaceController { let params = WorkspaceId { workspace_id: Some(workspace_id.to_string()), }; - let (token, server) = (self.user.token()?, self.server.clone()); + let (token, server) = (self.user.token()?, self.cloud_service.clone()); tokio::spawn(async move { match server.delete_workspace(&token, params).await { Ok(_) => {}, diff --git a/frontend/rust-lib/flowy-core/src/util.rs b/frontend/rust-lib/flowy-core/src/util.rs index 4073106dde..ff1315155a 100644 --- a/frontend/rust-lib/flowy-core/src/util.rs +++ b/frontend/rust-lib/flowy-core/src/util.rs @@ -1,5 +1,5 @@ #![allow(clippy::type_complexity)] -use crate::{module::WorkspaceUser, services::server::Server}; +use crate::module::{CoreCloudService, WorkspaceUser}; use lib_infra::retry::Action; use pin_project::pin_project; use std::{ @@ -10,12 +10,12 @@ use std::{ task::{Context, Poll}, }; -pub(crate) type Builder = Box Fut + Send + Sync>; +pub(crate) type Builder = Box) -> Fut + Send + Sync>; #[allow(dead_code)] pub(crate) struct RetryAction { token: String, - server: Server, + cloud_service: Arc, user: Arc, builder: Builder, phantom: PhantomData<(T, E)>, @@ -23,15 +23,15 @@ pub(crate) struct RetryAction { impl RetryAction { #[allow(dead_code)] - pub(crate) fn new(server: Server, user: Arc, builder: F) -> Self + pub(crate) fn new(cloud_service: Arc, user: Arc, builder: F) -> Self where Fut: Future> + Send + Sync + 'static, - F: Fn(String, Server) -> Fut + Send + Sync + 'static, + F: Fn(String, Arc) -> Fut + Send + Sync + 'static, { let token = user.token().unwrap_or_else(|_| "".to_owned()); Self { token, - server, + cloud_service, user, builder: Box::new(builder), phantom: PhantomData, @@ -50,7 +50,7 @@ where type Error = E; fn run(&mut self) -> Self::Future { - let fut = (self.builder)(self.token.clone(), self.server.clone()); + let fut = (self.builder)(self.token.clone(), self.cloud_service.clone()); Box::pin(RetryActionFut { fut: Box::pin(fut) }) } } diff --git a/frontend/rust-lib/flowy-document/Cargo.toml b/frontend/rust-lib/flowy-document/Cargo.toml index 634cc7b2cc..a9023b409b 100644 --- a/frontend/rust-lib/flowy-document/Cargo.toml +++ b/frontend/rust-lib/flowy-document/Cargo.toml @@ -9,7 +9,6 @@ edition = "2018" [dependencies] flowy-collaboration = { path = "../../../shared-lib/flowy-collaboration" } flowy-derive = { path = "../../../shared-lib/flowy-derive" } -backend-service = { path = "../../../shared-lib/backend-service" } lib-ot = { path = "../../../shared-lib/lib-ot" } lib-ws = { path = "../../../shared-lib/lib-ws" } lib-infra = { path = "../../../shared-lib/lib-infra" } @@ -24,7 +23,6 @@ diesel = {version = "1.4.8", features = ["sqlite"]} diesel_derives = {version = "1.4.1", features = ["sqlite"]} protobuf = {version = "2.18.0"} unicode-segmentation = "1.8" -lazy_static = "1.4.0" log = "0.4.14" tokio = {version = "1", features = ["sync"]} tracing = { version = "0.1", features = ["log"] } @@ -38,7 +36,6 @@ url = "2.2" serde = { version = "1.0", features = ["derive"] } serde_json = {version = "1.0"} chrono = "0.4.19" -futures-core = { version = "0.3", default-features = false } futures-util = "0.3.15" byteorder = {version = "1.3.4"} async-stream = "0.3.2" @@ -52,7 +49,6 @@ flowy-net = { path = "../flowy-net" } color-eyre = { version = "0.5", default-features = false } criterion = "0.3" rand = "0.7.3" -env_logger = "0.8.2" [features] diff --git a/frontend/rust-lib/flowy-document/src/context.rs b/frontend/rust-lib/flowy-document/src/context.rs index 7a30967bbc..5a0931176c 100644 --- a/frontend/rust-lib/flowy-document/src/context.rs +++ b/frontend/rust-lib/flowy-document/src/context.rs @@ -1,10 +1,8 @@ -use crate::errors::FlowyError; -use backend_service::configuration::ClientServerConfiguration; - use crate::{ controller::DocumentController, core::{DocumentWSReceivers, DocumentWebSocket}, - server::construct_doc_server, + errors::FlowyError, + DocumentCloudService, }; use flowy_database::ConnectionPool; use std::sync::Arc; @@ -26,10 +24,14 @@ impl DocumentContext { user: Arc, ws_receivers: Arc, ws_sender: Arc, - server_config: &ClientServerConfiguration, + cloud_service: Arc, ) -> DocumentContext { - let server = construct_doc_server(server_config); - let doc_ctrl = Arc::new(DocumentController::new(server, user.clone(), ws_receivers, ws_sender)); + let doc_ctrl = Arc::new(DocumentController::new( + cloud_service, + user.clone(), + ws_receivers, + ws_sender, + )); Self { controller: doc_ctrl, user, diff --git a/frontend/rust-lib/flowy-document/src/controller.rs b/frontend/rust-lib/flowy-document/src/controller.rs index 1ff83d0d16..d3b9cb740c 100644 --- a/frontend/rust-lib/flowy-document/src/controller.rs +++ b/frontend/rust-lib/flowy-document/src/controller.rs @@ -8,7 +8,7 @@ use crate::{ WSStateReceiver, }, errors::FlowyError, - server::Server, + DocumentCloudService, }; use bytes::Bytes; use dashmap::DashMap; @@ -22,7 +22,7 @@ use lib_infra::future::FutureResult; use std::sync::Arc; pub struct DocumentController { - server: Server, + cloud_service: Arc, ws_receivers: Arc, ws_sender: Arc, open_cache: Arc, @@ -31,14 +31,14 @@ pub struct DocumentController { impl DocumentController { pub(crate) fn new( - server: Server, + cloud_service: Arc, user: Arc, ws_receivers: Arc, ws_sender: Arc, ) -> Self { let open_cache = Arc::new(OpenDocCache::new()); Self { - server, + cloud_service, ws_receivers, ws_sender, open_cache, @@ -119,7 +119,7 @@ impl DocumentController { let rev_manager = self.make_rev_manager(doc_id, pool.clone())?; let server = Arc::new(RevisionServerImpl { token, - server: self.server.clone(), + server: self.cloud_service.clone(), }); let doc_editor = ClientDocumentEditor::new(doc_id, user, rev_manager, self.ws_sender.clone(), server).await?; self.ws_receivers.add(doc_id, doc_editor.ws_handler()); @@ -136,7 +136,7 @@ impl DocumentController { struct RevisionServerImpl { token: String, - server: Server, + server: Arc, } impl RevisionServer for RevisionServerImpl { @@ -149,7 +149,7 @@ impl RevisionServer for RevisionServerImpl { let token = self.token.clone(); FutureResult::new(async move { - match server.read_doc(&token, params).await? { + match server.read_document(&token, params).await? { None => Err(FlowyError::record_not_found().context("Remote doesn't have this document")), Some(doc) => Ok(doc), } diff --git a/frontend/rust-lib/flowy-document/src/lib.rs b/frontend/rust-lib/flowy-document/src/lib.rs index 4079b04145..f845d5b331 100644 --- a/frontend/rust-lib/flowy-document/src/lib.rs +++ b/frontend/rust-lib/flowy-document/src/lib.rs @@ -3,7 +3,6 @@ pub(crate) mod controller; pub mod core; mod notify; pub mod protobuf; -pub mod server; mod ws_receivers; #[macro_use] @@ -12,3 +11,15 @@ extern crate flowy_database; pub mod errors { pub use flowy_error::{internal_error, ErrorCode, FlowyError}; } + +use crate::errors::FlowyError; +use flowy_collaboration::entities::doc::{CreateDocParams, DocumentId, DocumentInfo, ResetDocumentParams}; +use lib_infra::future::FutureResult; + +pub trait DocumentCloudService: Send + Sync { + fn create_document(&self, token: &str, params: CreateDocParams) -> FutureResult<(), FlowyError>; + + fn read_document(&self, token: &str, params: DocumentId) -> FutureResult, FlowyError>; + + fn update_document(&self, token: &str, params: ResetDocumentParams) -> FutureResult<(), FlowyError>; +} diff --git a/frontend/rust-lib/flowy-document/src/server/middleware.rs b/frontend/rust-lib/flowy-document/src/server/middleware.rs deleted file mode 100644 index c6cb85bb2d..0000000000 --- a/frontend/rust-lib/flowy-document/src/server/middleware.rs +++ /dev/null @@ -1,29 +0,0 @@ -use backend_service::{request::ResponseMiddleware, response::FlowyResponse}; -use lazy_static::lazy_static; -use std::sync::Arc; - -lazy_static! { - pub(crate) static ref MIDDLEWARE: Arc = Arc::new(DocMiddleware {}); -} - -pub(crate) struct DocMiddleware {} -impl ResponseMiddleware for DocMiddleware { - fn receive_response(&self, token: &Option, response: &FlowyResponse) { - if let Some(error) = &response.error { - if error.is_unauthorized() { - log::error!("document user is unauthorized"); - - match token { - None => {}, - Some(_token) => { - // let error = - // FlowyError::new(ErrorCode::UserUnauthorized, ""); - // observable(token, - // WorkspaceObservable::UserUnauthorized).error(error). - // build() - }, - } - } - } - } -} diff --git a/frontend/rust-lib/flowy-document/src/server/mod.rs b/frontend/rust-lib/flowy-document/src/server/mod.rs deleted file mode 100644 index 34a2a2dd31..0000000000 --- a/frontend/rust-lib/flowy-document/src/server/mod.rs +++ /dev/null @@ -1,31 +0,0 @@ -mod middleware; -mod server_api; -mod server_api_mock; - -pub use server_api::*; -// TODO: ignore mock files in production -use crate::errors::FlowyError; -use backend_service::configuration::ClientServerConfiguration; -use flowy_collaboration::entities::doc::{CreateDocParams, DocumentId, DocumentInfo, ResetDocumentParams}; -use lib_infra::future::FutureResult; -pub use server_api_mock::*; -use std::sync::Arc; - -pub(crate) type Server = Arc; -pub trait DocumentServerAPI { - fn create_doc(&self, token: &str, params: CreateDocParams) -> FutureResult<(), FlowyError>; - - fn read_doc(&self, token: &str, params: DocumentId) -> FutureResult, FlowyError>; - - fn update_doc(&self, token: &str, params: ResetDocumentParams) -> FutureResult<(), FlowyError>; -} - -pub(crate) fn construct_doc_server( - server_config: &ClientServerConfiguration, -) -> Arc { - if cfg!(feature = "http_server") { - Arc::new(DocServer::new(server_config.clone())) - } else { - Arc::new(DocServerMock {}) - } -} diff --git a/frontend/rust-lib/flowy-document/src/server/server_api.rs b/frontend/rust-lib/flowy-document/src/server/server_api.rs deleted file mode 100644 index 675cecfa08..0000000000 --- a/frontend/rust-lib/flowy-document/src/server/server_api.rs +++ /dev/null @@ -1,67 +0,0 @@ -use crate::{errors::FlowyError, server::DocumentServerAPI}; -use backend_service::{configuration::*, request::HttpRequestBuilder}; -use flowy_collaboration::entities::doc::{CreateDocParams, DocumentId, DocumentInfo, ResetDocumentParams}; -use lib_infra::future::FutureResult; - -pub struct DocServer { - config: ClientServerConfiguration, -} - -impl DocServer { - pub fn new(config: ClientServerConfiguration) -> Self { Self { config } } -} - -impl DocumentServerAPI for DocServer { - fn create_doc(&self, token: &str, params: CreateDocParams) -> FutureResult<(), FlowyError> { - let token = token.to_owned(); - let url = self.config.doc_url(); - FutureResult::new(async move { create_doc_request(&token, params, &url).await }) - } - - fn read_doc(&self, token: &str, params: DocumentId) -> FutureResult, FlowyError> { - let token = token.to_owned(); - let url = self.config.doc_url(); - FutureResult::new(async move { read_doc_request(&token, params, &url).await }) - } - - fn update_doc(&self, token: &str, params: ResetDocumentParams) -> FutureResult<(), FlowyError> { - let token = token.to_owned(); - let url = self.config.doc_url(); - FutureResult::new(async move { reset_doc_request(&token, params, &url).await }) - } -} - -pub(crate) fn request_builder() -> HttpRequestBuilder { - HttpRequestBuilder::new().middleware(super::middleware::MIDDLEWARE.clone()) -} - -pub async fn create_doc_request(token: &str, params: CreateDocParams, url: &str) -> Result<(), FlowyError> { - let _ = request_builder() - .post(&url.to_owned()) - .header(HEADER_TOKEN, token) - .protobuf(params)? - .send() - .await?; - Ok(()) -} - -pub async fn read_doc_request(token: &str, params: DocumentId, url: &str) -> Result, FlowyError> { - let doc = request_builder() - .get(&url.to_owned()) - .header(HEADER_TOKEN, token) - .protobuf(params)? - .option_response() - .await?; - - Ok(doc) -} - -pub async fn reset_doc_request(token: &str, params: ResetDocumentParams, url: &str) -> Result<(), FlowyError> { - let _ = request_builder() - .patch(&url.to_owned()) - .header(HEADER_TOKEN, token) - .protobuf(params)? - .send() - .await?; - Ok(()) -} diff --git a/frontend/rust-lib/flowy-document/src/server/server_api_mock.rs b/frontend/rust-lib/flowy-document/src/server/server_api_mock.rs deleted file mode 100644 index e4629e8a9e..0000000000 --- a/frontend/rust-lib/flowy-document/src/server/server_api_mock.rs +++ /dev/null @@ -1,29 +0,0 @@ -use flowy_collaboration::{ - document::default::initial_delta_string, - entities::doc::{CreateDocParams, DocumentId, DocumentInfo, ResetDocumentParams}, -}; -use lib_infra::future::FutureResult; - -use crate::{errors::FlowyError, server::DocumentServerAPI}; - -pub struct DocServerMock {} - -impl DocumentServerAPI for DocServerMock { - fn create_doc(&self, _token: &str, _params: CreateDocParams) -> FutureResult<(), FlowyError> { - FutureResult::new(async { Ok(()) }) - } - - fn read_doc(&self, _token: &str, params: DocumentId) -> FutureResult, FlowyError> { - let doc = DocumentInfo { - doc_id: params.doc_id, - text: initial_delta_string(), - rev_id: 0, - base_rev_id: 0, - }; - FutureResult::new(async { Ok(Some(doc)) }) - } - - fn update_doc(&self, _token: &str, _params: ResetDocumentParams) -> FutureResult<(), FlowyError> { - FutureResult::new(async { Ok(()) }) - } -} diff --git a/frontend/rust-lib/flowy-document/tests/editor/mod.rs b/frontend/rust-lib/flowy-document/tests/editor/mod.rs index e04443e9d1..6d50c99b51 100644 --- a/frontend/rust-lib/flowy-document/tests/editor/mod.rs +++ b/frontend/rust-lib/flowy-document/tests/editor/mod.rs @@ -91,9 +91,8 @@ impl TestBuilder { pub fn new() -> Self { static INIT: Once = Once::new(); INIT.call_once(|| { - color_eyre::install().unwrap(); + let _ = color_eyre::install(); std::env::set_var("RUST_LOG", LEVEL); - env_logger::init(); }); Self { diff --git a/frontend/rust-lib/flowy-net/Cargo.toml b/frontend/rust-lib/flowy-net/Cargo.toml index e0567c4bc0..0106c9a7e8 100644 --- a/frontend/rust-lib/flowy-net/Cargo.toml +++ b/frontend/rust-lib/flowy-net/Cargo.toml @@ -10,6 +10,10 @@ lib-dispatch = { path = "../lib-dispatch" } flowy-error = { path = "../flowy-error" } flowy-derive = { path = "../../../shared-lib/flowy-derive" } flowy-collaboration = { path = "../../../shared-lib/flowy-collaboration"} +backend-service = { path = "../../../shared-lib/backend-service" } +flowy-core-data-model = { path = "../../../shared-lib/flowy-core-data-model" } +flowy-user-data-model = { path = "../../../shared-lib/flowy-user-data-model"} +lazy_static = "1.4.0" lib-infra = { path = "../../../shared-lib/lib-infra" } protobuf = {version = "2.18.0"} lib-ws = { path = "../../../shared-lib/lib-ws" } diff --git a/frontend/rust-lib/flowy-net/src/cloud/core.rs b/frontend/rust-lib/flowy-net/src/cloud/core.rs new file mode 100644 index 0000000000..ac89975c55 --- /dev/null +++ b/frontend/rust-lib/flowy-net/src/cloud/core.rs @@ -0,0 +1,476 @@ +use backend_service::{ + configuration::{ClientServerConfiguration, HEADER_TOKEN}, + errors::ServerError, + request::{HttpRequestBuilder, ResponseMiddleware}, + response::FlowyResponse, +}; +use flowy_core_data_model::entities::{ + app::{App, AppId, CreateAppParams, RepeatedApp, UpdateAppParams}, + trash::{RepeatedTrash, RepeatedTrashId}, + view::{CreateViewParams, RepeatedView, RepeatedViewId, UpdateViewParams, View, ViewId}, + workspace::{CreateWorkspaceParams, RepeatedWorkspace, UpdateWorkspaceParams, Workspace, WorkspaceId}, +}; +use flowy_error::FlowyError; + +use lazy_static::lazy_static; +use lib_infra::{future::FutureResult, timestamp, uuid_string}; +use std::sync::Arc; +use tokio::sync::broadcast; + +pub struct CoreHttpCloudService { + config: ClientServerConfiguration, +} + +impl CoreHttpCloudService { + pub fn new(config: ClientServerConfiguration) -> CoreHttpCloudService { Self { config } } +} + +impl CoreHttpCloudService { + pub fn init(&self) {} + + pub fn create_workspace(&self, token: &str, params: CreateWorkspaceParams) -> FutureResult { + let token = token.to_owned(); + let url = self.config.workspace_url(); + FutureResult::new(async move { + let workspace = create_workspace_request(&token, params, &url).await?; + Ok(workspace) + }) + } + + pub fn read_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult { + let token = token.to_owned(); + let url = self.config.workspace_url(); + FutureResult::new(async move { + let repeated_workspace = read_workspaces_request(&token, params, &url).await?; + Ok(repeated_workspace) + }) + } + + pub fn update_workspace(&self, token: &str, params: UpdateWorkspaceParams) -> FutureResult<(), FlowyError> { + let token = token.to_owned(); + let url = self.config.workspace_url(); + FutureResult::new(async move { + let _ = update_workspace_request(&token, params, &url).await?; + Ok(()) + }) + } + + pub fn delete_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult<(), FlowyError> { + let token = token.to_owned(); + let url = self.config.workspace_url(); + FutureResult::new(async move { + let _ = delete_workspace_request(&token, params, &url).await?; + Ok(()) + }) + } + + pub fn create_view(&self, token: &str, params: CreateViewParams) -> FutureResult { + let token = token.to_owned(); + let url = self.config.view_url(); + FutureResult::new(async move { + let view = create_view_request(&token, params, &url).await?; + Ok(view) + }) + } + + pub fn read_view(&self, token: &str, params: ViewId) -> FutureResult, FlowyError> { + let token = token.to_owned(); + let url = self.config.view_url(); + FutureResult::new(async move { + let view = read_view_request(&token, params, &url).await?; + Ok(view) + }) + } + + pub fn delete_view(&self, token: &str, params: RepeatedViewId) -> FutureResult<(), FlowyError> { + let token = token.to_owned(); + let url = self.config.view_url(); + FutureResult::new(async move { + let _ = delete_view_request(&token, params, &url).await?; + Ok(()) + }) + } + + pub fn update_view(&self, token: &str, params: UpdateViewParams) -> FutureResult<(), FlowyError> { + let token = token.to_owned(); + let url = self.config.view_url(); + FutureResult::new(async move { + let _ = update_view_request(&token, params, &url).await?; + Ok(()) + }) + } + + pub fn create_app(&self, token: &str, params: CreateAppParams) -> FutureResult { + let token = token.to_owned(); + let url = self.config.app_url(); + FutureResult::new(async move { + let app = create_app_request(&token, params, &url).await?; + Ok(app) + }) + } + + pub fn read_app(&self, token: &str, params: AppId) -> FutureResult, FlowyError> { + let token = token.to_owned(); + let url = self.config.app_url(); + FutureResult::new(async move { + let app = read_app_request(&token, params, &url).await?; + Ok(app) + }) + } + + pub fn update_app(&self, token: &str, params: UpdateAppParams) -> FutureResult<(), FlowyError> { + let token = token.to_owned(); + let url = self.config.app_url(); + FutureResult::new(async move { + let _ = update_app_request(&token, params, &url).await?; + Ok(()) + }) + } + + pub fn delete_app(&self, token: &str, params: AppId) -> FutureResult<(), FlowyError> { + let token = token.to_owned(); + let url = self.config.app_url(); + FutureResult::new(async move { + let _ = delete_app_request(&token, params, &url).await?; + Ok(()) + }) + } + + pub fn create_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError> { + let token = token.to_owned(); + let url = self.config.trash_url(); + FutureResult::new(async move { + let _ = create_trash_request(&token, params, &url).await?; + Ok(()) + }) + } + + pub fn delete_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError> { + let token = token.to_owned(); + let url = self.config.trash_url(); + FutureResult::new(async move { + let _ = delete_trash_request(&token, params, &url).await?; + Ok(()) + }) + } + + pub fn read_trash(&self, token: &str) -> FutureResult { + let token = token.to_owned(); + let url = self.config.trash_url(); + FutureResult::new(async move { + let repeated_trash = read_trash_request(&token, &url).await?; + Ok(repeated_trash) + }) + } +} + +pub struct CoreLocalCloudService {} +impl CoreLocalCloudService { + pub fn new(_config: &ClientServerConfiguration) -> Self { CoreLocalCloudService {} } +} +impl CoreLocalCloudService { + pub fn init(&self) {} + + pub fn create_workspace(&self, _token: &str, params: CreateWorkspaceParams) -> FutureResult { + let time = timestamp(); + let workspace = Workspace { + id: uuid_string(), + name: params.name, + desc: params.desc, + apps: RepeatedApp::default(), + modified_time: time, + create_time: time, + }; + + FutureResult::new(async { Ok(workspace) }) + } + + pub fn read_workspace(&self, _token: &str, _params: WorkspaceId) -> FutureResult { + FutureResult::new(async { + let repeated_workspace = RepeatedWorkspace { items: vec![] }; + Ok(repeated_workspace) + }) + } + + pub fn update_workspace(&self, _token: &str, _params: UpdateWorkspaceParams) -> FutureResult<(), FlowyError> { + FutureResult::new(async { Ok(()) }) + } + + pub fn delete_workspace(&self, _token: &str, _params: WorkspaceId) -> FutureResult<(), FlowyError> { + FutureResult::new(async { Ok(()) }) + } + + pub fn create_view(&self, _token: &str, params: CreateViewParams) -> FutureResult { + let time = timestamp(); + let view = View { + id: params.view_id, + belong_to_id: params.belong_to_id, + name: params.name, + desc: params.desc, + view_type: params.view_type, + version: 0, + belongings: RepeatedView::default(), + modified_time: time, + create_time: time, + }; + FutureResult::new(async { Ok(view) }) + } + + pub fn read_view(&self, _token: &str, _params: ViewId) -> FutureResult, FlowyError> { + FutureResult::new(async { Ok(None) }) + } + + pub fn delete_view(&self, _token: &str, _params: RepeatedViewId) -> FutureResult<(), FlowyError> { + FutureResult::new(async { Ok(()) }) + } + + pub fn update_view(&self, _token: &str, _params: UpdateViewParams) -> FutureResult<(), FlowyError> { + FutureResult::new(async { Ok(()) }) + } + + pub fn create_app(&self, _token: &str, params: CreateAppParams) -> FutureResult { + let time = timestamp(); + let app = App { + id: uuid_string(), + workspace_id: params.workspace_id, + name: params.name, + desc: params.desc, + belongings: RepeatedView::default(), + version: 0, + modified_time: time, + create_time: time, + }; + FutureResult::new(async { Ok(app) }) + } + + pub fn read_app(&self, _token: &str, _params: AppId) -> FutureResult, FlowyError> { + FutureResult::new(async { Ok(None) }) + } + + pub fn update_app(&self, _token: &str, _params: UpdateAppParams) -> FutureResult<(), FlowyError> { + FutureResult::new(async { Ok(()) }) + } + + pub fn delete_app(&self, _token: &str, _params: AppId) -> FutureResult<(), FlowyError> { + FutureResult::new(async { Ok(()) }) + } + + pub fn create_trash(&self, _token: &str, _params: RepeatedTrashId) -> FutureResult<(), FlowyError> { + FutureResult::new(async { Ok(()) }) + } + + pub fn delete_trash(&self, _token: &str, _params: RepeatedTrashId) -> FutureResult<(), FlowyError> { + FutureResult::new(async { Ok(()) }) + } + + pub fn read_trash(&self, _token: &str) -> FutureResult { + FutureResult::new(async { + let repeated_trash = RepeatedTrash { items: vec![] }; + Ok(repeated_trash) + }) + } +} + +lazy_static! { + static ref MIDDLEWARE: Arc = Arc::new(CoreResponseMiddleware::new()); +} + +pub struct CoreResponseMiddleware { + invalid_token_sender: broadcast::Sender, +} + +impl CoreResponseMiddleware { + fn new() -> Self { + let (sender, _) = broadcast::channel(10); + CoreResponseMiddleware { + invalid_token_sender: sender, + } + } + + pub fn invalid_token_subscribe(&self) -> broadcast::Receiver { self.invalid_token_sender.subscribe() } +} + +impl ResponseMiddleware for CoreResponseMiddleware { + fn receive_response(&self, token: &Option, response: &FlowyResponse) { + if let Some(error) = &response.error { + if error.is_unauthorized() { + tracing::error!("user is unauthorized"); + match token { + None => {}, + Some(token) => match self.invalid_token_sender.send(token.clone()) { + Ok(_) => {}, + Err(e) => tracing::error!("{:?}", e), + }, + } + } + } + } +} + +fn request_builder() -> HttpRequestBuilder { HttpRequestBuilder::new().middleware(MIDDLEWARE.clone()) } + +pub async fn create_workspace_request( + token: &str, + params: CreateWorkspaceParams, + url: &str, +) -> Result { + let workspace = request_builder() + .post(&url.to_owned()) + .header(HEADER_TOKEN, token) + .protobuf(params)? + .response() + .await?; + Ok(workspace) +} + +pub async fn read_workspaces_request( + token: &str, + params: WorkspaceId, + url: &str, +) -> Result { + let repeated_workspace = request_builder() + .get(&url.to_owned()) + .header(HEADER_TOKEN, token) + .protobuf(params)? + .response::() + .await?; + + Ok(repeated_workspace) +} + +pub async fn update_workspace_request( + token: &str, + params: UpdateWorkspaceParams, + url: &str, +) -> Result<(), ServerError> { + let _ = request_builder() + .patch(&url.to_owned()) + .header(HEADER_TOKEN, token) + .protobuf(params)? + .send() + .await?; + Ok(()) +} + +pub async fn delete_workspace_request(token: &str, params: WorkspaceId, url: &str) -> Result<(), ServerError> { + let _ = request_builder() + .delete(url) + .header(HEADER_TOKEN, token) + .protobuf(params)? + .send() + .await?; + Ok(()) +} + +// App +pub async fn create_app_request(token: &str, params: CreateAppParams, url: &str) -> Result { + let app = request_builder() + .post(&url.to_owned()) + .header(HEADER_TOKEN, token) + .protobuf(params)? + .response() + .await?; + Ok(app) +} + +pub async fn read_app_request(token: &str, params: AppId, url: &str) -> Result, ServerError> { + let app = request_builder() + .get(&url.to_owned()) + .header(HEADER_TOKEN, token) + .protobuf(params)? + .option_response() + .await?; + + Ok(app) +} + +pub async fn update_app_request(token: &str, params: UpdateAppParams, url: &str) -> Result<(), ServerError> { + let _ = request_builder() + .patch(&url.to_owned()) + .header(HEADER_TOKEN, token) + .protobuf(params)? + .send() + .await?; + Ok(()) +} + +pub async fn delete_app_request(token: &str, params: AppId, url: &str) -> Result<(), ServerError> { + let _ = request_builder() + .delete(&url.to_owned()) + .header(HEADER_TOKEN, token) + .protobuf(params)? + .send() + .await?; + Ok(()) +} + +// View +pub async fn create_view_request(token: &str, params: CreateViewParams, url: &str) -> Result { + let view = request_builder() + .post(&url.to_owned()) + .header(HEADER_TOKEN, token) + .protobuf(params)? + .response() + .await?; + Ok(view) +} + +pub async fn read_view_request(token: &str, params: ViewId, url: &str) -> Result, ServerError> { + let view = request_builder() + .get(&url.to_owned()) + .header(HEADER_TOKEN, token) + .protobuf(params)? + .option_response() + .await?; + + Ok(view) +} + +pub async fn update_view_request(token: &str, params: UpdateViewParams, url: &str) -> Result<(), ServerError> { + let _ = request_builder() + .patch(&url.to_owned()) + .header(HEADER_TOKEN, token) + .protobuf(params)? + .send() + .await?; + Ok(()) +} + +pub async fn delete_view_request(token: &str, params: RepeatedViewId, url: &str) -> Result<(), ServerError> { + let _ = request_builder() + .delete(&url.to_owned()) + .header(HEADER_TOKEN, token) + .protobuf(params)? + .send() + .await?; + Ok(()) +} + +pub async fn create_trash_request(token: &str, params: RepeatedTrashId, url: &str) -> Result<(), ServerError> { + let _ = request_builder() + .post(&url.to_owned()) + .header(HEADER_TOKEN, token) + .protobuf(params)? + .send() + .await?; + Ok(()) +} + +pub async fn delete_trash_request(token: &str, params: RepeatedTrashId, url: &str) -> Result<(), ServerError> { + let _ = request_builder() + .delete(&url.to_owned()) + .header(HEADER_TOKEN, token) + .protobuf(params)? + .send() + .await?; + Ok(()) +} + +pub async fn read_trash_request(token: &str, url: &str) -> Result { + let repeated_trash = request_builder() + .get(&url.to_owned()) + .header(HEADER_TOKEN, token) + .response::() + .await?; + Ok(repeated_trash) +} diff --git a/frontend/rust-lib/flowy-net/src/cloud/document.rs b/frontend/rust-lib/flowy-net/src/cloud/document.rs new file mode 100644 index 0000000000..c4cf1f4ffc --- /dev/null +++ b/frontend/rust-lib/flowy-net/src/cloud/document.rs @@ -0,0 +1,134 @@ +use backend_service::{ + configuration::*, + request::{HttpRequestBuilder, ResponseMiddleware}, + response::FlowyResponse, +}; +use flowy_collaboration::{ + document::default::initial_delta_string, + entities::doc::{CreateDocParams, DocumentId, DocumentInfo, ResetDocumentParams}, +}; +use flowy_error::FlowyError; +use lazy_static::lazy_static; +use lib_infra::future::FutureResult; +use std::sync::Arc; + +pub struct DocumentHttpCloudService { + config: ClientServerConfiguration, +} + +impl DocumentHttpCloudService { + pub fn new(config: ClientServerConfiguration) -> Self { Self { config } } +} + +impl DocumentHttpCloudService { + pub fn create_document_request(&self, token: &str, params: CreateDocParams) -> FutureResult<(), FlowyError> { + let token = token.to_owned(); + let url = self.config.doc_url(); + FutureResult::new(async move { create_document_request(&token, params, &url).await }) + } + + pub fn read_document_request( + &self, + token: &str, + params: DocumentId, + ) -> FutureResult, FlowyError> { + let token = token.to_owned(); + let url = self.config.doc_url(); + FutureResult::new(async move { read_document_request(&token, params, &url).await }) + } + + pub fn update_document_request(&self, token: &str, params: ResetDocumentParams) -> FutureResult<(), FlowyError> { + let token = token.to_owned(); + let url = self.config.doc_url(); + FutureResult::new(async move { reset_doc_request(&token, params, &url).await }) + } +} + +pub struct DocumentLocalCloudService {} + +impl DocumentLocalCloudService { + pub fn create_document_request(&self, _token: &str, _params: CreateDocParams) -> FutureResult<(), FlowyError> { + FutureResult::new(async { Ok(()) }) + } + + pub fn read_document_request( + &self, + _token: &str, + params: DocumentId, + ) -> FutureResult, FlowyError> { + let doc = DocumentInfo { + doc_id: params.doc_id, + text: initial_delta_string(), + rev_id: 0, + base_rev_id: 0, + }; + FutureResult::new(async { Ok(Some(doc)) }) + } + + pub fn update_document_request(&self, _token: &str, _params: ResetDocumentParams) -> FutureResult<(), FlowyError> { + FutureResult::new(async { Ok(()) }) + } +} + +pub async fn create_document_request(token: &str, params: CreateDocParams, url: &str) -> Result<(), FlowyError> { + let _ = request_builder() + .post(&url.to_owned()) + .header(HEADER_TOKEN, token) + .protobuf(params)? + .send() + .await?; + Ok(()) +} + +pub async fn read_document_request( + token: &str, + params: DocumentId, + url: &str, +) -> Result, FlowyError> { + let doc = request_builder() + .get(&url.to_owned()) + .header(HEADER_TOKEN, token) + .protobuf(params)? + .option_response() + .await?; + + Ok(doc) +} + +pub async fn reset_doc_request(token: &str, params: ResetDocumentParams, url: &str) -> Result<(), FlowyError> { + let _ = request_builder() + .patch(&url.to_owned()) + .header(HEADER_TOKEN, token) + .protobuf(params)? + .send() + .await?; + Ok(()) +} + +fn request_builder() -> HttpRequestBuilder { HttpRequestBuilder::new().middleware(MIDDLEWARE.clone()) } + +lazy_static! { + pub(crate) static ref MIDDLEWARE: Arc = Arc::new(DocumentResponseMiddleware {}); +} + +pub(crate) struct DocumentResponseMiddleware {} +impl ResponseMiddleware for DocumentResponseMiddleware { + fn receive_response(&self, token: &Option, response: &FlowyResponse) { + if let Some(error) = &response.error { + if error.is_unauthorized() { + tracing::error!("document user is unauthorized"); + + match token { + None => {}, + Some(_token) => { + // let error = + // FlowyError::new(ErrorCode::UserUnauthorized, ""); + // observable(token, + // WorkspaceObservable::UserUnauthorized).error(error). + // build() + }, + } + } + } + } +} diff --git a/frontend/rust-lib/flowy-net/src/cloud/mod.rs b/frontend/rust-lib/flowy-net/src/cloud/mod.rs new file mode 100644 index 0000000000..ee2bdb2073 --- /dev/null +++ b/frontend/rust-lib/flowy-net/src/cloud/mod.rs @@ -0,0 +1,3 @@ +pub mod core; +pub mod document; +pub mod user; diff --git a/frontend/rust-lib/flowy-net/src/cloud/user.rs b/frontend/rust-lib/flowy-net/src/cloud/user.rs new file mode 100644 index 0000000000..c93543f841 --- /dev/null +++ b/frontend/rust-lib/flowy-net/src/cloud/user.rs @@ -0,0 +1,155 @@ +use backend_service::{configuration::*, errors::ServerError, request::HttpRequestBuilder}; +use flowy_error::FlowyError; +use flowy_user_data_model::entities::{ + SignInParams, + SignInResponse, + SignUpParams, + SignUpResponse, + UpdateUserParams, + UserProfile, +}; +use lib_infra::{future::FutureResult, uuid_string}; + +pub struct UserHttpCloudService { + config: ClientServerConfiguration, +} +impl UserHttpCloudService { + pub fn new(config: &ClientServerConfiguration) -> Self { Self { config: config.clone() } } +} + +impl UserHttpCloudService { + pub fn sign_up(&self, params: SignUpParams) -> FutureResult { + let url = self.config.sign_up_url(); + FutureResult::new(async move { + let resp = user_sign_up_request(params, &url).await?; + Ok(resp) + }) + } + + pub fn sign_in(&self, params: SignInParams) -> FutureResult { + let url = self.config.sign_in_url(); + FutureResult::new(async move { + let resp = user_sign_in_request(params, &url).await?; + Ok(resp) + }) + } + + pub fn sign_out(&self, token: &str) -> FutureResult<(), FlowyError> { + let token = token.to_owned(); + let url = self.config.sign_out_url(); + FutureResult::new(async move { + let _ = user_sign_out_request(&token, &url).await; + Ok(()) + }) + } + + pub fn update_user(&self, token: &str, params: UpdateUserParams) -> FutureResult<(), FlowyError> { + let token = token.to_owned(); + let url = self.config.user_profile_url(); + FutureResult::new(async move { + let _ = update_user_profile_request(&token, params, &url).await?; + Ok(()) + }) + } + + pub fn get_user(&self, token: &str) -> FutureResult { + let token = token.to_owned(); + let url = self.config.user_profile_url(); + FutureResult::new(async move { + let profile = get_user_profile_request(&token, &url).await?; + Ok(profile) + }) + } + + pub fn ws_addr(&self) -> String { self.config.ws_addr() } +} +pub struct UserLocalCloudService(); +impl UserLocalCloudService { + pub fn new(_config: &ClientServerConfiguration) -> Self { Self() } +} + +impl UserLocalCloudService { + pub fn sign_up(&self, params: SignUpParams) -> FutureResult { + let uid = uuid_string(); + FutureResult::new(async move { + Ok(SignUpResponse { + user_id: uid.clone(), + name: params.name, + email: params.email, + token: uid, + }) + }) + } + + pub fn sign_in(&self, params: SignInParams) -> FutureResult { + let user_id = uuid_string(); + FutureResult::new(async { + Ok(SignInResponse { + user_id: user_id.clone(), + name: params.name, + email: params.email, + token: user_id, + }) + }) + } + + pub fn sign_out(&self, _token: &str) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) } + + pub fn update_user(&self, _token: &str, _params: UpdateUserParams) -> FutureResult<(), FlowyError> { + FutureResult::new(async { Ok(()) }) + } + + pub fn get_user(&self, _token: &str) -> FutureResult { + FutureResult::new(async { Ok(UserProfile::default()) }) + } + + pub fn ws_addr(&self) -> String { "ws://localhost:8000/ws/".to_owned() } +} + +pub async fn user_sign_up_request(params: SignUpParams, url: &str) -> Result { + let response = request_builder() + .post(&url.to_owned()) + .protobuf(params)? + .response() + .await?; + Ok(response) +} + +pub async fn user_sign_in_request(params: SignInParams, url: &str) -> Result { + let response = request_builder() + .post(&url.to_owned()) + .protobuf(params)? + .response() + .await?; + Ok(response) +} + +pub async fn user_sign_out_request(token: &str, url: &str) -> Result<(), ServerError> { + let _ = request_builder() + .delete(&url.to_owned()) + .header(HEADER_TOKEN, token) + .send() + .await?; + Ok(()) +} + +pub async fn get_user_profile_request(token: &str, url: &str) -> Result { + let user_profile = request_builder() + .get(&url.to_owned()) + .header(HEADER_TOKEN, token) + .response() + .await?; + Ok(user_profile) +} + +pub async fn update_user_profile_request(token: &str, params: UpdateUserParams, url: &str) -> Result<(), ServerError> { + let _ = request_builder() + .patch(&url.to_owned()) + .header(HEADER_TOKEN, token) + .protobuf(params)? + .send() + .await?; + Ok(()) +} + +fn request_builder() -> HttpRequestBuilder { HttpRequestBuilder::new() } diff --git a/frontend/rust-lib/flowy-net/src/lib.rs b/frontend/rust-lib/flowy-net/src/lib.rs index 90aabe3fcc..c1ac5bafb9 100644 --- a/frontend/rust-lib/flowy-net/src/lib.rs +++ b/frontend/rust-lib/flowy-net/src/lib.rs @@ -1,3 +1,4 @@ +pub mod cloud; pub mod entities; mod event; mod handlers; diff --git a/frontend/rust-lib/flowy-net/src/services/local/local_server.rs b/frontend/rust-lib/flowy-net/src/services/local/local_server.rs index 35614b9b07..7283af9b1e 100644 --- a/frontend/rust-lib/flowy-net/src/services/local/local_server.rs +++ b/frontend/rust-lib/flowy-net/src/services/local/local_server.rs @@ -13,7 +13,7 @@ use tokio::sync::{mpsc, mpsc::UnboundedSender}; pub struct LocalDocumentServer { pub doc_manager: Arc, sender: mpsc::UnboundedSender, - persistence: Arc, + persistence: Arc, } impl LocalDocumentServer { @@ -64,7 +64,7 @@ impl LocalDocumentServer { struct LocalDocumentUser { user_id: String, ws_sender: mpsc::UnboundedSender, - persistence: Arc, + persistence: Arc, } impl RevisionUser for LocalDocumentUser { diff --git a/frontend/rust-lib/flowy-net/src/services/local/persistence.rs b/frontend/rust-lib/flowy-net/src/services/local/persistence.rs index 684d479a40..55de97fa4b 100644 --- a/frontend/rust-lib/flowy-net/src/services/local/persistence.rs +++ b/frontend/rust-lib/flowy-net/src/services/local/persistence.rs @@ -30,7 +30,7 @@ impl std::default::Default for LocalServerDocumentPersistence { } } -impl DocumentPersistence for LocalServerDocumentPersistence { +impl ServerDocumentPersistence for LocalServerDocumentPersistence { fn read_document(&self, doc_id: &str) -> BoxResultFuture { let inner = self.inner.clone(); let doc_id = doc_id.to_owned(); diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/core_deps.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/core_deps.rs new file mode 100644 index 0000000000..7f9450c202 --- /dev/null +++ b/frontend/rust-lib/flowy-sdk/src/deps_resolve/core_deps.rs @@ -0,0 +1,211 @@ +use backend_service::configuration::ClientServerConfiguration; +use flowy_core::{ + errors::FlowyError, + module::{CoreCloudService, WorkspaceDatabase, WorkspaceUser}, + prelude::{ + App, + AppId, + CreateAppParams, + CreateViewParams, + CreateWorkspaceParams, + RepeatedTrash, + RepeatedTrashId, + RepeatedViewId, + RepeatedWorkspace, + UpdateAppParams, + UpdateViewParams, + UpdateWorkspaceParams, + View, + ViewId, + Workspace, + WorkspaceId, + }, +}; +use flowy_database::ConnectionPool; +use flowy_net::cloud::core::{CoreHttpCloudService, CoreLocalCloudService}; +use flowy_user::services::user::UserSession; +use lib_infra::future::FutureResult; +use std::sync::Arc; + +pub struct CoreDepsResolver(); +impl CoreDepsResolver { + pub fn resolve( + user_session: Arc, + server_config: &ClientServerConfiguration, + ) -> ( + Arc, + Arc, + Arc, + ) { + let user: Arc = Arc::new(WorkspaceUserImpl(user_session.clone())); + let database: Arc = Arc::new(WorkspaceDatabaseImpl(user_session)); + let cloud_service = make_core_cloud_service(server_config); + (user, database, cloud_service) + } +} + +struct WorkspaceDatabaseImpl(Arc); +impl WorkspaceDatabase for WorkspaceDatabaseImpl { + fn db_pool(&self) -> Result, FlowyError> { + self.0.db_pool().map_err(|e| FlowyError::internal().context(e)) + } +} + +struct WorkspaceUserImpl(Arc); +impl WorkspaceUser for WorkspaceUserImpl { + fn user_id(&self) -> Result { self.0.user_id().map_err(|e| FlowyError::internal().context(e)) } + + fn token(&self) -> Result { self.0.token().map_err(|e| FlowyError::internal().context(e)) } +} + +fn make_core_cloud_service(config: &ClientServerConfiguration) -> Arc { + if cfg!(feature = "http_server") { + Arc::new(CoreHttpCloudServiceAdaptor::new(config)) + } else { + Arc::new(CoreLocalCloudServiceAdaptor::new(config)) + } +} + +struct CoreHttpCloudServiceAdaptor(CoreHttpCloudService); +impl CoreHttpCloudServiceAdaptor { + fn new(config: &ClientServerConfiguration) -> Self { Self(CoreHttpCloudService::new(config.clone())) } +} +impl CoreCloudService for CoreHttpCloudServiceAdaptor { + fn init(&self) { + // let mut rx = BACKEND_API_MIDDLEWARE.invalid_token_subscribe(); + // tokio::spawn(async move { + // while let Ok(invalid_token) = rx.recv().await { + // let error = FlowyError::new(ErrorCode::UserUnauthorized, ""); + // send_dart_notification(&invalid_token, + // WorkspaceNotification::UserUnauthorized) .error(error) + // .send() + // } + // }); + self.0.init() + } + + fn create_workspace(&self, token: &str, params: CreateWorkspaceParams) -> FutureResult { + self.0.create_workspace(token, params) + } + + fn read_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult { + self.0.read_workspace(token, params) + } + + fn update_workspace(&self, token: &str, params: UpdateWorkspaceParams) -> FutureResult<(), FlowyError> { + self.0.update_workspace(token, params) + } + + fn delete_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult<(), FlowyError> { + self.0.delete_workspace(token, params) + } + + fn create_view(&self, token: &str, params: CreateViewParams) -> FutureResult { + self.0.create_view(token, params) + } + + fn read_view(&self, token: &str, params: ViewId) -> FutureResult, FlowyError> { + self.0.read_view(token, params) + } + + fn delete_view(&self, token: &str, params: RepeatedViewId) -> FutureResult<(), FlowyError> { + self.0.delete_view(token, params) + } + + fn update_view(&self, token: &str, params: UpdateViewParams) -> FutureResult<(), FlowyError> { + self.0.update_view(token, params) + } + + fn create_app(&self, token: &str, params: CreateAppParams) -> FutureResult { + self.0.create_app(token, params) + } + + fn read_app(&self, token: &str, params: AppId) -> FutureResult, FlowyError> { + self.0.read_app(token, params) + } + + fn update_app(&self, token: &str, params: UpdateAppParams) -> FutureResult<(), FlowyError> { + self.0.update_app(token, params) + } + + fn delete_app(&self, token: &str, params: AppId) -> FutureResult<(), FlowyError> { + self.0.delete_app(token, params) + } + + fn create_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError> { + self.0.create_trash(token, params) + } + + fn delete_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError> { + self.0.delete_trash(token, params) + } + + fn read_trash(&self, token: &str) -> FutureResult { self.0.read_trash(token) } +} + +struct CoreLocalCloudServiceAdaptor(CoreLocalCloudService); +impl CoreLocalCloudServiceAdaptor { + fn new(config: &ClientServerConfiguration) -> Self { Self(CoreLocalCloudService::new(config)) } +} + +impl CoreCloudService for CoreLocalCloudServiceAdaptor { + fn init(&self) { self.0.init() } + + fn create_workspace(&self, token: &str, params: CreateWorkspaceParams) -> FutureResult { + self.0.create_workspace(token, params) + } + + fn read_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult { + self.0.read_workspace(token, params) + } + + fn update_workspace(&self, token: &str, params: UpdateWorkspaceParams) -> FutureResult<(), FlowyError> { + self.0.update_workspace(token, params) + } + + fn delete_workspace(&self, token: &str, params: WorkspaceId) -> FutureResult<(), FlowyError> { + self.0.delete_workspace(token, params) + } + + fn create_view(&self, token: &str, params: CreateViewParams) -> FutureResult { + self.0.create_view(token, params) + } + + fn read_view(&self, token: &str, params: ViewId) -> FutureResult, FlowyError> { + self.0.read_view(token, params) + } + + fn delete_view(&self, token: &str, params: RepeatedViewId) -> FutureResult<(), FlowyError> { + self.0.delete_view(token, params) + } + + fn update_view(&self, token: &str, params: UpdateViewParams) -> FutureResult<(), FlowyError> { + self.0.update_view(token, params) + } + + fn create_app(&self, token: &str, params: CreateAppParams) -> FutureResult { + self.0.create_app(token, params) + } + + fn read_app(&self, token: &str, params: AppId) -> FutureResult, FlowyError> { + self.0.read_app(token, params) + } + + fn update_app(&self, token: &str, params: UpdateAppParams) -> FutureResult<(), FlowyError> { + self.0.update_app(token, params) + } + + fn delete_app(&self, token: &str, params: AppId) -> FutureResult<(), FlowyError> { + self.0.delete_app(token, params) + } + + fn create_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError> { + self.0.create_trash(token, params) + } + + fn delete_trash(&self, token: &str, params: RepeatedTrashId) -> FutureResult<(), FlowyError> { + self.0.delete_trash(token, params) + } + + fn read_trash(&self, token: &str) -> FutureResult { self.0.read_trash(token) } +} diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs index b42e273c6f..7751257814 100644 --- a/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs +++ b/frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs @@ -1,50 +1,58 @@ +use backend_service::configuration::ClientServerConfiguration; use bytes::Bytes; -use flowy_collaboration::entities::ws::DocumentClientWSData; +use flowy_collaboration::entities::{ + doc::{CreateDocParams, DocumentId, DocumentInfo, ResetDocumentParams}, + ws::DocumentClientWSData, +}; use flowy_database::ConnectionPool; use flowy_document::{ context::DocumentUser, core::{DocumentWSReceivers, DocumentWebSocket, WSStateReceiver}, errors::{internal_error, FlowyError}, + DocumentCloudService, +}; +use flowy_net::{ + cloud::document::{DocumentHttpCloudService, DocumentLocalCloudService}, + services::ws_conn::FlowyWebSocketConnect, }; -use flowy_net::services::ws_conn::FlowyWebSocketConnect; use flowy_user::services::user::UserSession; +use lib_infra::future::FutureResult; use lib_ws::{WSMessageReceiver, WSModule, WebSocketRawMessage}; use std::{convert::TryInto, path::Path, sync::Arc}; +pub struct DocumentDependencies { + pub user: Arc, + pub ws_receivers: Arc, + pub ws_sender: Arc, + pub cloud_service: Arc, +} + pub struct DocumentDepsResolver(); impl DocumentDepsResolver { pub fn resolve( ws_conn: Arc, user_session: Arc, - ) -> ( - Arc, - Arc, - Arc, - ) { - let user = Arc::new(DocumentUserImpl { user: user_session }); - - let ws_sender = Arc::new(DocumentWebSocketAdapter { - ws_conn: ws_conn.clone(), - }); + server_config: &ClientServerConfiguration, + ) -> DocumentDependencies { + let user = Arc::new(DocumentUserImpl(user_session)); + let ws_sender = Arc::new(DocumentWebSocketImpl(ws_conn.clone())); let ws_receivers = Arc::new(DocumentWSReceivers::new()); - let receiver = Arc::new(WSMessageReceiverAdaptor(ws_receivers.clone())); + let receiver = Arc::new(WSMessageReceiverImpl(ws_receivers.clone())); ws_conn.add_ws_message_receiver(receiver).unwrap(); - (user, ws_receivers, ws_sender) + let cloud_service = make_document_cloud_service(server_config); + DocumentDependencies { + user, + ws_receivers, + ws_sender, + cloud_service, + } } } -struct DocumentUserImpl { - user: Arc, -} - -impl DocumentUserImpl {} - +struct DocumentUserImpl(Arc); impl DocumentUser for DocumentUserImpl { fn user_dir(&self) -> Result { - let dir = self - .user - .user_dir() - .map_err(|e| FlowyError::unauthorized().context(e))?; + let dir = self.0.user_dir().map_err(|e| FlowyError::unauthorized().context(e))?; let doc_dir = format!("{}/document", dir); if !Path::new(&doc_dir).exists() { @@ -53,35 +61,77 @@ impl DocumentUser for DocumentUserImpl { Ok(doc_dir) } - fn user_id(&self) -> Result { self.user.user_id() } + fn user_id(&self) -> Result { self.0.user_id() } - fn token(&self) -> Result { self.user.token() } + fn token(&self) -> Result { self.0.token() } - fn db_pool(&self) -> Result, FlowyError> { self.user.db_pool() } + fn db_pool(&self) -> Result, FlowyError> { self.0.db_pool() } } -struct DocumentWebSocketAdapter { - ws_conn: Arc, -} - -impl DocumentWebSocket for DocumentWebSocketAdapter { +struct DocumentWebSocketImpl(Arc); +impl DocumentWebSocket for DocumentWebSocketImpl { fn send(&self, data: DocumentClientWSData) -> Result<(), FlowyError> { let bytes: Bytes = data.try_into().unwrap(); let msg = WebSocketRawMessage { module: WSModule::Doc, data: bytes.to_vec(), }; - let sender = self.ws_conn.ws_sender()?; + let sender = self.0.ws_sender()?; sender.send(msg).map_err(internal_error)?; Ok(()) } - fn subscribe_state_changed(&self) -> WSStateReceiver { self.ws_conn.subscribe_websocket_state() } + fn subscribe_state_changed(&self) -> WSStateReceiver { self.0.subscribe_websocket_state() } } -struct WSMessageReceiverAdaptor(Arc); - -impl WSMessageReceiver for WSMessageReceiverAdaptor { +struct WSMessageReceiverImpl(Arc); +impl WSMessageReceiver for WSMessageReceiverImpl { fn source(&self) -> WSModule { WSModule::Doc } fn receive_message(&self, msg: WebSocketRawMessage) { self.0.did_receive_data(Bytes::from(msg.data)); } } + +fn make_document_cloud_service(server_config: &ClientServerConfiguration) -> Arc { + if cfg!(feature = "http_server") { + Arc::new(DocumentHttpCloudServiceAdaptor::new(server_config.clone())) + } else { + Arc::new(DocumentLocalCloudServiceAdaptor::new()) + } +} + +struct DocumentHttpCloudServiceAdaptor(DocumentHttpCloudService); +impl DocumentHttpCloudServiceAdaptor { + fn new(config: ClientServerConfiguration) -> Self { + DocumentHttpCloudServiceAdaptor(DocumentHttpCloudService::new(config)) + } +} +impl DocumentCloudService for DocumentHttpCloudServiceAdaptor { + fn create_document(&self, token: &str, params: CreateDocParams) -> FutureResult<(), FlowyError> { + self.0.create_document_request(token, params) + } + + fn read_document(&self, token: &str, params: DocumentId) -> FutureResult, FlowyError> { + self.0.read_document_request(token, params) + } + + fn update_document(&self, token: &str, params: ResetDocumentParams) -> FutureResult<(), FlowyError> { + self.0.update_document_request(token, params) + } +} + +struct DocumentLocalCloudServiceAdaptor(DocumentLocalCloudService); +impl DocumentLocalCloudServiceAdaptor { + fn new() -> Self { Self(DocumentLocalCloudService {}) } +} +impl DocumentCloudService for DocumentLocalCloudServiceAdaptor { + fn create_document(&self, token: &str, params: CreateDocParams) -> FutureResult<(), FlowyError> { + self.0.create_document_request(token, params) + } + + fn read_document(&self, token: &str, params: DocumentId) -> FutureResult, FlowyError> { + self.0.read_document_request(token, params) + } + + fn update_document(&self, token: &str, params: ResetDocumentParams) -> FutureResult<(), FlowyError> { + self.0.update_document_request(token, params) + } +} diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/mod.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/mod.rs index bf1e78d59b..6b391852a1 100644 --- a/frontend/rust-lib/flowy-sdk/src/deps_resolve/mod.rs +++ b/frontend/rust-lib/flowy-sdk/src/deps_resolve/mod.rs @@ -1,5 +1,7 @@ +mod core_deps; mod document_deps; -mod workspace_deps; +mod user_deps; +pub use core_deps::*; pub use document_deps::*; -pub use workspace_deps::*; +pub use user_deps::*; diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/user_deps.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/user_deps.rs new file mode 100644 index 0000000000..db1e790a83 --- /dev/null +++ b/frontend/rust-lib/flowy-sdk/src/deps_resolve/user_deps.rs @@ -0,0 +1,58 @@ +use crate::FlowyError; +use backend_service::configuration::ClientServerConfiguration; +use flowy_net::cloud::user::{UserHttpCloudService, UserLocalCloudService}; +use flowy_user::{ + entities::{SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserParams, UserProfile}, + module::UserCloudService, +}; +use lib_infra::future::FutureResult; +use std::sync::Arc; + +pub struct UserDepsResolver(); +impl UserDepsResolver { + pub fn resolve(server_config: &ClientServerConfiguration) -> Arc { + make_user_cloud_service(server_config) + } +} + +fn make_user_cloud_service(config: &ClientServerConfiguration) -> Arc { + if cfg!(feature = "http_server") { + Arc::new(UserHttpCloudServiceAdaptor(UserHttpCloudService::new(config))) + } else { + Arc::new(UserLocalCloudServiceAdaptor(UserLocalCloudService::new(config))) + } +} + +struct UserHttpCloudServiceAdaptor(UserHttpCloudService); +impl UserCloudService for UserHttpCloudServiceAdaptor { + fn sign_up(&self, params: SignUpParams) -> FutureResult { self.0.sign_up(params) } + + fn sign_in(&self, params: SignInParams) -> FutureResult { self.0.sign_in(params) } + + fn sign_out(&self, token: &str) -> FutureResult<(), FlowyError> { self.0.sign_out(token) } + + fn update_user(&self, token: &str, params: UpdateUserParams) -> FutureResult<(), FlowyError> { + self.0.update_user(token, params) + } + + fn get_user(&self, token: &str) -> FutureResult { self.0.get_user(token) } + + fn ws_addr(&self) -> String { self.0.ws_addr() } +} + +struct UserLocalCloudServiceAdaptor(UserLocalCloudService); +impl UserCloudService for UserLocalCloudServiceAdaptor { + fn sign_up(&self, params: SignUpParams) -> FutureResult { self.0.sign_up(params) } + + fn sign_in(&self, params: SignInParams) -> FutureResult { self.0.sign_in(params) } + + fn sign_out(&self, token: &str) -> FutureResult<(), FlowyError> { self.0.sign_out(token) } + + fn update_user(&self, token: &str, params: UpdateUserParams) -> FutureResult<(), FlowyError> { + self.0.update_user(token, params) + } + + fn get_user(&self, token: &str) -> FutureResult { self.0.get_user(token) } + + fn ws_addr(&self) -> String { self.0.ws_addr() } +} diff --git a/frontend/rust-lib/flowy-sdk/src/deps_resolve/workspace_deps.rs b/frontend/rust-lib/flowy-sdk/src/deps_resolve/workspace_deps.rs deleted file mode 100644 index f44b45568e..0000000000 --- a/frontend/rust-lib/flowy-sdk/src/deps_resolve/workspace_deps.rs +++ /dev/null @@ -1,49 +0,0 @@ -use flowy_core::{ - errors::FlowyError, - module::{WorkspaceDatabase, WorkspaceUser}, -}; -use flowy_database::ConnectionPool; -use flowy_user::services::user::UserSession; -use std::sync::Arc; - -pub struct WorkspaceDepsResolver { - inner: Arc, -} - -struct Resolver { - pub(crate) user_session: Arc, -} - -impl WorkspaceDepsResolver { - pub fn new(user_session: Arc) -> Self { - Self { - inner: Arc::new(Resolver { user_session }), - } - } - - pub fn split_into(self) -> (Arc, Arc) { - let user: Arc = self.inner.clone(); - let database: Arc = self.inner; - (user, database) - } -} - -impl WorkspaceDatabase for Resolver { - fn db_pool(&self) -> Result, FlowyError> { - self.user_session - .db_pool() - .map_err(|e| FlowyError::internal().context(e)) - } -} - -impl WorkspaceUser for Resolver { - fn user_id(&self) -> Result { - self.user_session - .user_id() - .map_err(|e| FlowyError::internal().context(e)) - } - - fn token(&self) -> Result { - self.user_session.token().map_err(|e| FlowyError::internal().context(e)) - } -} diff --git a/frontend/rust-lib/flowy-sdk/src/lib.rs b/frontend/rust-lib/flowy-sdk/src/lib.rs index 7d0ee297eb..3afe5a8c7a 100644 --- a/frontend/rust-lib/flowy-sdk/src/lib.rs +++ b/frontend/rust-lib/flowy-sdk/src/lib.rs @@ -1,6 +1,6 @@ mod deps_resolve; pub mod module; -use crate::deps_resolve::{DocumentDepsResolver, WorkspaceDepsResolver}; +use crate::deps_resolve::*; use backend_service::configuration::ClientServerConfiguration; use flowy_core::{context::CoreContext, errors::FlowyError, module::init_core}; use flowy_document::context::DocumentContext; @@ -12,7 +12,7 @@ use flowy_net::{ }, }; use flowy_user::{ - prelude::UserStatus, + entities::UserStatus, services::user::{UserSession, UserSessionConfig}, }; use lib_dispatch::prelude::*; @@ -103,7 +103,7 @@ impl FlowySDK { config.server_config.ws_addr(), default_web_socket(), )); - let user_session = mk_user_session(&config); + let user_session = mk_user_session(&config, &config.server_config); let flowy_document = mk_document(&ws_conn, &user_session, &config.server_config); let core_ctx = mk_core_context(&user_session, &flowy_document, &config.server_config); @@ -186,9 +186,9 @@ async fn _listen_user_status( } } -async fn _listen_network_status(mut subscribe: broadcast::Receiver, core: Arc) { - while let Ok(new_type) = subscribe.recv().await { - core.network_state_changed(new_type); +async fn _listen_network_status(mut subscribe: broadcast::Receiver, _core: Arc) { + while let Ok(_new_type) = subscribe.recv().await { + // core.network_state_changed(new_type); } } @@ -209,10 +209,11 @@ fn init_log(config: &FlowySDKConfig) { } } -fn mk_user_session(config: &FlowySDKConfig) -> Arc { +fn mk_user_session(config: &FlowySDKConfig, server_config: &ClientServerConfiguration) -> Arc { let session_cache_key = format!("{}_session_cache", &config.name); - let user_config = UserSessionConfig::new(&config.root, &config.server_config, &session_cache_key); - Arc::new(UserSession::new(user_config)) + let user_config = UserSessionConfig::new(&config.root, &session_cache_key); + let cloud_service = UserDepsResolver::resolve(server_config); + Arc::new(UserSession::new(user_config, cloud_service)) } fn mk_core_context( @@ -220,9 +221,8 @@ fn mk_core_context( flowy_document: &Arc, server_config: &ClientServerConfiguration, ) -> Arc { - let workspace_deps = WorkspaceDepsResolver::new(user_session.clone()); - let (user, database) = workspace_deps.split_into(); - init_core(user, database, flowy_document.clone(), server_config) + let (user, database, cloud_service) = CoreDepsResolver::resolve(user_session.clone(), server_config); + init_core(user, database, flowy_document.clone(), cloud_service) } fn default_web_socket() -> Arc { @@ -234,10 +234,15 @@ fn default_web_socket() -> Arc { } pub fn mk_document( - ws_manager: &Arc, + ws_conn: &Arc, user_session: &Arc, server_config: &ClientServerConfiguration, ) -> Arc { - let (user, ws_receivers, ws_sender) = DocumentDepsResolver::resolve(ws_manager.clone(), user_session.clone()); - Arc::new(DocumentContext::new(user, ws_receivers, ws_sender, server_config)) + let dependencies = DocumentDepsResolver::resolve(ws_conn.clone(), user_session.clone(), server_config); + Arc::new(DocumentContext::new( + dependencies.user, + dependencies.ws_receivers, + dependencies.ws_sender, + dependencies.cloud_service, + )) } diff --git a/frontend/rust-lib/flowy-user/Cargo.toml b/frontend/rust-lib/flowy-user/Cargo.toml index aaec7a42d9..01fea2fc7d 100644 --- a/frontend/rust-lib/flowy-user/Cargo.toml +++ b/frontend/rust-lib/flowy-user/Cargo.toml @@ -7,14 +7,11 @@ edition = "2018" [dependencies] flowy-user-data-model = { path = "../../../shared-lib/flowy-user-data-model" } -backend-service = { path = "../../../shared-lib/backend-service" } flowy-derive = { path = "../../../shared-lib/flowy-derive" } lib-infra = { path = "../../../shared-lib/lib-infra" } -flowy-collaboration = { path = "../../../shared-lib/flowy-collaboration", optional = true} derive_more = {version = "0.99", features = ["display"]} flowy-database = { path = "../flowy-database" } -flowy-net = { path = "../flowy-net" } dart-notify = { path = "../dart-notify" } lib-dispatch = { path = "../lib-dispatch" } flowy-error = { path = "../flowy-error", features = ["db", "backend"] } diff --git a/frontend/rust-lib/flowy-user/src/lib.rs b/frontend/rust-lib/flowy-user/src/lib.rs index 5484162948..0b344145ab 100644 --- a/frontend/rust-lib/flowy-user/src/lib.rs +++ b/frontend/rust-lib/flowy-user/src/lib.rs @@ -10,10 +10,6 @@ mod sql_tables; #[macro_use] extern crate flowy_database; -pub mod prelude { - pub use crate::{entities::*, services::server::*}; -} - pub mod errors { pub use flowy_error::{internal_error, ErrorCode, FlowyError}; } diff --git a/frontend/rust-lib/flowy-user/src/module.rs b/frontend/rust-lib/flowy-user/src/module.rs index 0660602068..5045d6fa2f 100644 --- a/frontend/rust-lib/flowy-user/src/module.rs +++ b/frontend/rust-lib/flowy-user/src/module.rs @@ -1,6 +1,13 @@ use lib_dispatch::prelude::*; -use crate::{event::UserEvent, handlers::*, services::user::UserSession}; +use crate::{ + entities::{SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserParams, UserProfile}, + errors::FlowyError, + event::UserEvent, + handlers::*, + services::user::UserSession, +}; +use lib_infra::future::FutureResult; use std::sync::Arc; pub fn create(user_session: Arc) -> Module { @@ -15,3 +22,12 @@ pub fn create(user_session: Arc) -> Module { .event(UserEvent::UpdateUser, update_user_handler) .event(UserEvent::CheckUser, check_user_handler) } + +pub trait UserCloudService: Send + Sync { + fn sign_up(&self, params: SignUpParams) -> FutureResult; + fn sign_in(&self, params: SignInParams) -> FutureResult; + fn sign_out(&self, token: &str) -> FutureResult<(), FlowyError>; + fn update_user(&self, token: &str, params: UpdateUserParams) -> FutureResult<(), FlowyError>; + fn get_user(&self, token: &str) -> FutureResult; + fn ws_addr(&self) -> String; +} diff --git a/frontend/rust-lib/flowy-user/src/services/mod.rs b/frontend/rust-lib/flowy-user/src/services/mod.rs index 4f7a828480..22d12a3827 100644 --- a/frontend/rust-lib/flowy-user/src/services/mod.rs +++ b/frontend/rust-lib/flowy-user/src/services/mod.rs @@ -1,2 +1 @@ -pub mod server; pub mod user; diff --git a/frontend/rust-lib/flowy-user/src/services/server/mod.rs b/frontend/rust-lib/flowy-user/src/services/server/mod.rs deleted file mode 100644 index 6178abd3dd..0000000000 --- a/frontend/rust-lib/flowy-user/src/services/server/mod.rs +++ /dev/null @@ -1,31 +0,0 @@ -mod server_api; -mod server_api_mock; - -pub use server_api::*; -pub use server_api_mock::*; - -use std::sync::Arc; -pub(crate) type Server = Arc; -use crate::{ - entities::{SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserParams, UserProfile}, - errors::FlowyError, -}; -use backend_service::configuration::ClientServerConfiguration; -use lib_infra::future::FutureResult; - -pub trait UserServerAPI { - fn sign_up(&self, params: SignUpParams) -> FutureResult; - fn sign_in(&self, params: SignInParams) -> FutureResult; - fn sign_out(&self, token: &str) -> FutureResult<(), FlowyError>; - fn update_user(&self, token: &str, params: UpdateUserParams) -> FutureResult<(), FlowyError>; - fn get_user(&self, token: &str) -> FutureResult; - fn ws_addr(&self) -> String; -} - -pub(crate) fn construct_user_server(config: &ClientServerConfiguration) -> Arc { - if cfg!(feature = "http_server") { - Arc::new(UserHttpServer::new(config.clone())) - } else { - Arc::new(UserServerMock {}) - } -} diff --git a/frontend/rust-lib/flowy-user/src/services/server/server_api.rs b/frontend/rust-lib/flowy-user/src/services/server/server_api.rs deleted file mode 100644 index 5161849b37..0000000000 --- a/frontend/rust-lib/flowy-user/src/services/server/server_api.rs +++ /dev/null @@ -1,88 +0,0 @@ -use crate::{ - entities::{SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserParams, UserProfile}, - errors::FlowyError, - services::server::UserServerAPI, -}; -use backend_service::{configuration::*, http_request::*}; -use lib_infra::future::FutureResult; - -pub struct UserHttpServer { - config: ClientServerConfiguration, -} -impl UserHttpServer { - pub fn new(config: ClientServerConfiguration) -> Self { Self { config } } -} - -impl UserServerAPI for UserHttpServer { - fn sign_up(&self, params: SignUpParams) -> FutureResult { - let url = self.config.sign_up_url(); - FutureResult::new(async move { - let resp = user_sign_up_request(params, &url).await?; - Ok(resp) - }) - } - - fn sign_in(&self, params: SignInParams) -> FutureResult { - let url = self.config.sign_in_url(); - FutureResult::new(async move { - let resp = user_sign_in_request(params, &url).await?; - Ok(resp) - }) - } - - fn sign_out(&self, token: &str) -> FutureResult<(), FlowyError> { - let token = token.to_owned(); - let url = self.config.sign_out_url(); - FutureResult::new(async move { - let _ = user_sign_out_request(&token, &url).await; - Ok(()) - }) - } - - fn update_user(&self, token: &str, params: UpdateUserParams) -> FutureResult<(), FlowyError> { - let token = token.to_owned(); - let url = self.config.user_profile_url(); - FutureResult::new(async move { - let _ = update_user_profile_request(&token, params, &url).await?; - Ok(()) - }) - } - - fn get_user(&self, token: &str) -> FutureResult { - let token = token.to_owned(); - let url = self.config.user_profile_url(); - FutureResult::new(async move { - let profile = get_user_profile_request(&token, &url).await?; - Ok(profile) - }) - } - - fn ws_addr(&self) -> String { self.config.ws_addr() } -} - -// use crate::notify::*; -// use backend_service::response::FlowyResponse; -// use flowy_user_data_model::errors::ErrorCode; - -// struct Middleware {} -// -// -// -// impl ResponseMiddleware for Middleware { -// fn receive_response(&self, token: &Option, response: -// &FlowyResponse) { if let Some(error) = &response.error { -// if error.is_unauthorized() { -// log::error!("user unauthorized"); -// match token { -// None => {}, -// Some(token) => { -// let error = -// FlowyError::new(ErrorCode::UserUnauthorized, ""); -// dart_notify(token, UserNotification::UserUnauthorized) -// .error(error) .send() -// }, -// } -// } -// } -// } -// } diff --git a/frontend/rust-lib/flowy-user/src/services/server/server_api_mock.rs b/frontend/rust-lib/flowy-user/src/services/server/server_api_mock.rs deleted file mode 100644 index bace6bc616..0000000000 --- a/frontend/rust-lib/flowy-user/src/services/server/server_api_mock.rs +++ /dev/null @@ -1,49 +0,0 @@ -use crate::{ - entities::{SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserParams, UserProfile}, - errors::FlowyError, -}; - -use crate::services::server::UserServerAPI; -use lib_infra::{future::FutureResult, uuid_string}; - -pub struct UserServerMock {} - -impl UserServerMock {} - -impl UserServerAPI for UserServerMock { - fn sign_up(&self, params: SignUpParams) -> FutureResult { - let uid = uuid_string(); - FutureResult::new(async move { - Ok(SignUpResponse { - user_id: uid.clone(), - name: params.name, - email: params.email, - token: uid, - }) - }) - } - - fn sign_in(&self, params: SignInParams) -> FutureResult { - let user_id = uuid_string(); - FutureResult::new(async { - Ok(SignInResponse { - user_id: user_id.clone(), - name: params.name, - email: params.email, - token: user_id, - }) - }) - } - - fn sign_out(&self, _token: &str) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) } - - fn update_user(&self, _token: &str, _params: UpdateUserParams) -> FutureResult<(), FlowyError> { - FutureResult::new(async { Ok(()) }) - } - - fn get_user(&self, _token: &str) -> FutureResult { - FutureResult::new(async { Ok(UserProfile::default()) }) - } - - fn ws_addr(&self) -> String { "ws://localhost:8000/ws/".to_owned() } -} diff --git a/frontend/rust-lib/flowy-user/src/services/user/user_session.rs b/frontend/rust-lib/flowy-user/src/services/user/user_session.rs index 5613d4cf92..1f5d59a3e8 100644 --- a/frontend/rust-lib/flowy-user/src/services/user/user_session.rs +++ b/frontend/rust-lib/flowy-user/src/services/user/user_session.rs @@ -1,9 +1,3 @@ -use parking_lot::RwLock; -use serde::{Deserialize, Serialize}; -use std::sync::Arc; -use tokio::sync::mpsc; - -use backend_service::configuration::ClientServerConfiguration; use flowy_database::{ kv::KV, query_dsl::*, @@ -14,29 +8,29 @@ use flowy_database::{ }; use flowy_user_data_model::entities::{SignInResponse, SignUpResponse}; use lib_sqlite::ConnectionPool; +use parking_lot::RwLock; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use tokio::sync::mpsc; use crate::{ entities::{SignInParams, SignUpParams, UpdateUserParams, UserProfile}, errors::{ErrorCode, FlowyError}, + module::UserCloudService, notify::*, - services::{ - server::{construct_user_server, Server}, - user::{database::UserDB, notifier::UserNotifier}, - }, + services::user::{database::UserDB, notifier::UserNotifier}, sql_tables::{UserTable, UserTableChangeset}, }; pub struct UserSessionConfig { root_dir: String, - server_config: ClientServerConfiguration, session_cache_key: String, } impl UserSessionConfig { - pub fn new(root_dir: &str, server_config: &ClientServerConfiguration, session_cache_key: &str) -> Self { + pub fn new(root_dir: &str, session_cache_key: &str) -> Self { Self { root_dir: root_dir.to_owned(), - server_config: server_config.clone(), session_cache_key: session_cache_key.to_owned(), } } @@ -45,21 +39,19 @@ impl UserSessionConfig { pub struct UserSession { database: UserDB, config: UserSessionConfig, - #[allow(dead_code)] - server: Server, + cloud_service: Arc, session: RwLock>, pub notifier: UserNotifier, } impl UserSession { - pub fn new(config: UserSessionConfig) -> Self { + pub fn new(config: UserSessionConfig, cloud_service: Arc) -> Self { let db = UserDB::new(&config.root_dir); - let server = construct_user_server(&config.server_config); let notifier = UserNotifier::new(); Self { database: db, config, - server, + cloud_service, session: RwLock::new(None), notifier, } @@ -92,7 +84,7 @@ impl UserSession { if self.is_login(¶ms.email) { self.user_profile().await } else { - let resp = self.server.sign_in(params).await?; + let resp = self.cloud_service.sign_in(params).await?; let session: Session = resp.clone().into(); let _ = self.set_session(Some(session))?; let user_table = self.save_user(resp.into()).await?; @@ -107,7 +99,7 @@ impl UserSession { if self.is_login(¶ms.email) { self.user_profile().await } else { - let resp = self.server.sign_up(params).await?; + let resp = self.cloud_service.sign_up(params).await?; let session: Session = resp.clone().into(); let _ = self.set_session(Some(session))?; let user_table = self.save_user(resp.into()).await?; @@ -180,7 +172,7 @@ impl UserSession { impl UserSession { fn read_user_profile_on_server(&self, token: &str) -> Result<(), FlowyError> { - let server = self.server.clone(); + let server = self.cloud_service.clone(); let token = token.to_owned(); tokio::spawn(async move { match server.get_user(&token).await { @@ -200,7 +192,7 @@ impl UserSession { } async fn update_user_on_server(&self, token: &str, params: UpdateUserParams) -> Result<(), FlowyError> { - let server = self.server.clone(); + let server = self.cloud_service.clone(); let token = token.to_owned(); let _ = tokio::spawn(async move { match server.update_user(&token, params).await { @@ -216,7 +208,7 @@ impl UserSession { } async fn sign_out_on_server(&self, token: &str) -> Result<(), FlowyError> { - let server = self.server.clone(); + let server = self.cloud_service.clone(); let token = token.to_owned(); let _ = tokio::spawn(async move { match server.sign_out(&token).await { @@ -273,7 +265,7 @@ impl UserSession { } pub async fn update_user( - _server: Server, + _cloud_service: Arc, pool: Arc, params: UpdateUserParams, ) -> Result<(), FlowyError> { diff --git a/frontend/rust-lib/flowy-user/tests/event/auth_test.rs b/frontend/rust-lib/flowy-user/tests/event/auth_test.rs index 899eb48731..57b25dcdb8 100644 --- a/frontend/rust-lib/flowy-user/tests/event/auth_test.rs +++ b/frontend/rust-lib/flowy-user/tests/event/auth_test.rs @@ -1,6 +1,7 @@ use crate::helper::*; use flowy_test::{event_builder::UserModuleEventBuilder, FlowySDKTest}; -use flowy_user::{errors::ErrorCode, event::UserEvent::*, prelude::*}; +use flowy_user::{errors::ErrorCode, event::UserEvent::*}; +use flowy_user_data_model::entities::{SignInRequest, SignUpRequest, UserProfile}; #[tokio::test] async fn sign_up_with_invalid_email() { diff --git a/frontend/rust-lib/flowy-user/tests/event/user_profile_test.rs b/frontend/rust-lib/flowy-user/tests/event/user_profile_test.rs index 9fef50600f..fad0ac7c35 100644 --- a/frontend/rust-lib/flowy-user/tests/event/user_profile_test.rs +++ b/frontend/rust-lib/flowy-user/tests/event/user_profile_test.rs @@ -1,6 +1,7 @@ use crate::helper::*; use flowy_test::{event_builder::UserModuleEventBuilder, FlowySDKTest}; -use flowy_user::{errors::ErrorCode, event::UserEvent::*, prelude::*}; +use flowy_user::{errors::ErrorCode, event::UserEvent::*}; +use flowy_user_data_model::entities::{UpdateUserRequest, UserProfile}; use lib_infra::uuid_string; use serial_test::*; diff --git a/shared-lib/backend-service/src/http_request.rs b/shared-lib/backend-service/src/http_request.rs deleted file mode 100644 index 015bd358bb..0000000000 --- a/shared-lib/backend-service/src/http_request.rs +++ /dev/null @@ -1,219 +0,0 @@ -use crate::{configuration::HEADER_TOKEN, errors::ServerError, request::HttpRequestBuilder}; -use flowy_core_data_model::entities::prelude::*; -use flowy_user_data_model::entities::prelude::*; - -pub(crate) fn request_builder() -> HttpRequestBuilder { - HttpRequestBuilder::new().middleware(crate::middleware::BACKEND_API_MIDDLEWARE.clone()) -} - -pub async fn user_sign_up_request(params: SignUpParams, url: &str) -> Result { - let response = request_builder() - .post(&url.to_owned()) - .protobuf(params)? - .response() - .await?; - Ok(response) -} - -pub async fn user_sign_in_request(params: SignInParams, url: &str) -> Result { - let response = request_builder() - .post(&url.to_owned()) - .protobuf(params)? - .response() - .await?; - Ok(response) -} - -pub async fn user_sign_out_request(token: &str, url: &str) -> Result<(), ServerError> { - let _ = request_builder() - .delete(&url.to_owned()) - .header(HEADER_TOKEN, token) - .send() - .await?; - Ok(()) -} - -pub async fn get_user_profile_request(token: &str, url: &str) -> Result { - let user_profile = request_builder() - .get(&url.to_owned()) - .header(HEADER_TOKEN, token) - .response() - .await?; - Ok(user_profile) -} - -pub async fn update_user_profile_request(token: &str, params: UpdateUserParams, url: &str) -> Result<(), ServerError> { - let _ = request_builder() - .patch(&url.to_owned()) - .header(HEADER_TOKEN, token) - .protobuf(params)? - .send() - .await?; - Ok(()) -} - -pub async fn create_workspace_request( - token: &str, - params: CreateWorkspaceParams, - url: &str, -) -> Result { - let workspace = request_builder() - .post(&url.to_owned()) - .header(HEADER_TOKEN, token) - .protobuf(params)? - .response() - .await?; - Ok(workspace) -} - -pub async fn read_workspaces_request( - token: &str, - params: WorkspaceId, - url: &str, -) -> Result { - let repeated_workspace = request_builder() - .get(&url.to_owned()) - .header(HEADER_TOKEN, token) - .protobuf(params)? - .response::() - .await?; - - Ok(repeated_workspace) -} - -pub async fn update_workspace_request( - token: &str, - params: UpdateWorkspaceParams, - url: &str, -) -> Result<(), ServerError> { - let _ = request_builder() - .patch(&url.to_owned()) - .header(HEADER_TOKEN, token) - .protobuf(params)? - .send() - .await?; - Ok(()) -} - -pub async fn delete_workspace_request(token: &str, params: WorkspaceId, url: &str) -> Result<(), ServerError> { - let _ = request_builder() - .delete(url) - .header(HEADER_TOKEN, token) - .protobuf(params)? - .send() - .await?; - Ok(()) -} - -// App -pub async fn create_app_request(token: &str, params: CreateAppParams, url: &str) -> Result { - let app = request_builder() - .post(&url.to_owned()) - .header(HEADER_TOKEN, token) - .protobuf(params)? - .response() - .await?; - Ok(app) -} - -pub async fn read_app_request(token: &str, params: AppId, url: &str) -> Result, ServerError> { - let app = request_builder() - .get(&url.to_owned()) - .header(HEADER_TOKEN, token) - .protobuf(params)? - .option_response() - .await?; - - Ok(app) -} - -pub async fn update_app_request(token: &str, params: UpdateAppParams, url: &str) -> Result<(), ServerError> { - let _ = request_builder() - .patch(&url.to_owned()) - .header(HEADER_TOKEN, token) - .protobuf(params)? - .send() - .await?; - Ok(()) -} - -pub async fn delete_app_request(token: &str, params: AppId, url: &str) -> Result<(), ServerError> { - let _ = request_builder() - .delete(&url.to_owned()) - .header(HEADER_TOKEN, token) - .protobuf(params)? - .send() - .await?; - Ok(()) -} - -// View -pub async fn create_view_request(token: &str, params: CreateViewParams, url: &str) -> Result { - let view = request_builder() - .post(&url.to_owned()) - .header(HEADER_TOKEN, token) - .protobuf(params)? - .response() - .await?; - Ok(view) -} - -pub async fn read_view_request(token: &str, params: ViewId, url: &str) -> Result, ServerError> { - let view = request_builder() - .get(&url.to_owned()) - .header(HEADER_TOKEN, token) - .protobuf(params)? - .option_response() - .await?; - - Ok(view) -} - -pub async fn update_view_request(token: &str, params: UpdateViewParams, url: &str) -> Result<(), ServerError> { - let _ = request_builder() - .patch(&url.to_owned()) - .header(HEADER_TOKEN, token) - .protobuf(params)? - .send() - .await?; - Ok(()) -} - -pub async fn delete_view_request(token: &str, params: RepeatedViewId, url: &str) -> Result<(), ServerError> { - let _ = request_builder() - .delete(&url.to_owned()) - .header(HEADER_TOKEN, token) - .protobuf(params)? - .send() - .await?; - Ok(()) -} - -pub async fn create_trash_request(token: &str, params: RepeatedTrashId, url: &str) -> Result<(), ServerError> { - let _ = request_builder() - .post(&url.to_owned()) - .header(HEADER_TOKEN, token) - .protobuf(params)? - .send() - .await?; - Ok(()) -} - -pub async fn delete_trash_request(token: &str, params: RepeatedTrashId, url: &str) -> Result<(), ServerError> { - let _ = request_builder() - .delete(&url.to_owned()) - .header(HEADER_TOKEN, token) - .protobuf(params)? - .send() - .await?; - Ok(()) -} - -pub async fn read_trash_request(token: &str, url: &str) -> Result { - let repeated_trash = request_builder() - .get(&url.to_owned()) - .header(HEADER_TOKEN, token) - .response::() - .await?; - Ok(repeated_trash) -} diff --git a/shared-lib/backend-service/src/lib.rs b/shared-lib/backend-service/src/lib.rs index 6bd5aa18c4..18e02d6b95 100644 --- a/shared-lib/backend-service/src/lib.rs +++ b/shared-lib/backend-service/src/lib.rs @@ -1,6 +1,4 @@ pub mod configuration; pub mod errors; -pub mod http_request; -pub mod middleware; pub mod request; pub mod response; diff --git a/shared-lib/backend-service/src/middleware.rs b/shared-lib/backend-service/src/middleware.rs deleted file mode 100644 index fee46765cc..0000000000 --- a/shared-lib/backend-service/src/middleware.rs +++ /dev/null @@ -1,39 +0,0 @@ -use crate::{request::ResponseMiddleware, response::FlowyResponse}; -use lazy_static::lazy_static; -use std::sync::Arc; -use tokio::sync::broadcast; -lazy_static! { - pub static ref BACKEND_API_MIDDLEWARE: Arc = Arc::new(WorkspaceMiddleware::new()); -} - -pub struct WorkspaceMiddleware { - invalid_token_sender: broadcast::Sender, -} - -impl WorkspaceMiddleware { - fn new() -> Self { - let (sender, _) = broadcast::channel(10); - WorkspaceMiddleware { - invalid_token_sender: sender, - } - } - - pub fn invalid_token_subscribe(&self) -> broadcast::Receiver { self.invalid_token_sender.subscribe() } -} - -impl ResponseMiddleware for WorkspaceMiddleware { - fn receive_response(&self, token: &Option, response: &FlowyResponse) { - if let Some(error) = &response.error { - if error.is_unauthorized() { - log::error!("user is unauthorized"); - match token { - None => {}, - Some(token) => match self.invalid_token_sender.send(token.clone()) { - Ok(_) => {}, - Err(e) => log::error!("{:?}", e), - }, - } - } - } - } -} diff --git a/shared-lib/flowy-collaboration/src/sync/server.rs b/shared-lib/flowy-collaboration/src/sync/server.rs index dc1c899e90..76a4276315 100644 --- a/shared-lib/flowy-collaboration/src/sync/server.rs +++ b/shared-lib/flowy-collaboration/src/sync/server.rs @@ -16,7 +16,7 @@ use tokio::{ task::spawn_blocking, }; -pub trait DocumentPersistence: Send + Sync + Debug { +pub trait ServerDocumentPersistence: Send + Sync + Debug { fn read_document(&self, doc_id: &str) -> BoxResultFuture; fn create_document( @@ -40,11 +40,11 @@ pub trait DocumentPersistence: Send + Sync + Debug { pub struct ServerDocumentManager { open_doc_map: Arc>>>, - persistence: Arc, + persistence: Arc, } impl ServerDocumentManager { - pub fn new(persistence: Arc) -> Self { + pub fn new(persistence: Arc) -> Self { Self { open_doc_map: Arc::new(RwLock::new(HashMap::new())), persistence, @@ -169,12 +169,12 @@ impl std::ops::Drop for ServerDocumentManager { struct OpenDocHandle { doc_id: String, sender: mpsc::Sender, - persistence: Arc, + persistence: Arc, users: DashMap>, } impl OpenDocHandle { - fn new(doc: DocumentInfo, persistence: Arc) -> Result { + fn new(doc: DocumentInfo, persistence: Arc) -> Result { let doc_id = doc.doc_id.clone(); let (sender, receiver) = mpsc::channel(100); let users = DashMap::new(); @@ -257,17 +257,17 @@ enum DocumentCommand { ApplyRevisions { user: Arc, repeated_revision: RepeatedRevisionPB, - persistence: Arc, + persistence: Arc, ret: oneshot::Sender>, }, Ping { user: Arc, - persistence: Arc, + persistence: Arc, rev_id: i64, ret: oneshot::Sender>, }, Reset { - persistence: Arc, + persistence: Arc, repeated_revision: RepeatedRevisionPB, ret: oneshot::Sender>, }, diff --git a/shared-lib/flowy-collaboration/src/sync/synchronizer.rs b/shared-lib/flowy-collaboration/src/sync/synchronizer.rs index e48e81d77e..43cb98a582 100644 --- a/shared-lib/flowy-collaboration/src/sync/synchronizer.rs +++ b/shared-lib/flowy-collaboration/src/sync/synchronizer.rs @@ -6,7 +6,7 @@ use crate::{ }, errors::CollaborateError, protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB}, - sync::DocumentPersistence, + sync::ServerDocumentPersistence, util::*, }; use lib_ot::{core::OperationTransformable, rich_text::RichTextDelta}; @@ -54,7 +54,7 @@ impl RevisionSynchronizer { &self, user: Arc, repeated_revision: RepeatedRevisionPB, - persistence: Arc, + persistence: Arc, ) -> Result<(), CollaborateError> { let doc_id = self.doc_id.clone(); if repeated_revision.get_items().is_empty() { @@ -115,7 +115,7 @@ impl RevisionSynchronizer { pub async fn pong( &self, user: Arc, - persistence: Arc, + persistence: Arc, client_rev_id: i64, ) -> Result<(), CollaborateError> { let doc_id = self.doc_id.clone(); @@ -144,7 +144,7 @@ impl RevisionSynchronizer { #[tracing::instrument(level = "debug", skip(self, repeated_revision, persistence), fields(doc_id), err)] pub async fn reset( &self, - persistence: Arc, + persistence: Arc, repeated_revision: RepeatedRevisionPB, ) -> Result<(), CollaborateError> { let doc_id = self.doc_id.clone(); @@ -191,7 +191,11 @@ impl RevisionSynchronizer { pub(crate) fn rev_id(&self) -> i64 { self.rev_id.load(SeqCst) } - async fn is_applied_before(&self, new_revision: &RevisionPB, persistence: &Arc) -> bool { + async fn is_applied_before( + &self, + new_revision: &RevisionPB, + persistence: &Arc, + ) -> bool { let rev_ids = Some(vec![new_revision.rev_id]); if let Ok(revisions) = persistence.read_revisions(&self.doc_id, rev_ids).await { if let Some(revision) = revisions.first() { @@ -207,7 +211,7 @@ impl RevisionSynchronizer { async fn push_revisions_to_user( &self, user: Arc, - persistence: Arc, + persistence: Arc, from: i64, to: i64, ) { diff --git a/shared-lib/lib-ot/src/core/delta/delta.rs b/shared-lib/lib-ot/src/core/delta/delta.rs index 85a2b51161..f0b5aa4478 100644 --- a/shared-lib/lib-ot/src/core/delta/delta.rs +++ b/shared-lib/lib-ot/src/core/delta/delta.rs @@ -446,7 +446,7 @@ fn invert_from_other( tracing::trace!("invert op: {} [{}:{}]", operation, start, end); let other_ops = DeltaIter::from_interval(other, Interval::new(start, end)).ops(); other_ops.into_iter().for_each(|other_op| match operation { - Operation::Delete(n) => { + Operation::Delete(_n) => { // tracing::trace!("invert delete: {} by add {}", n, other_op); base.add(other_op); },