refactor backend dir hierarchi

This commit is contained in:
appflowy 2021-12-06 21:47:21 +08:00
parent 6298a0d96d
commit 8457682092
62 changed files with 247 additions and 314 deletions

View File

@ -1,9 +1,8 @@
use std::{net::TcpListener, time::Duration};
use actix::Actor;
use actix_identity::{CookieIdentityPolicy, IdentityService};
use actix_web::{dev::Server, middleware, web, web::Data, App, HttpServer, Scope};
use sqlx::{postgres::PgPoolOptions, PgPool};
use std::{net::TcpListener, time::Duration};
use tokio::time::interval;
use crate::{
@ -13,16 +12,15 @@ use crate::{
Settings,
},
context::AppContext,
service::{
services::{
app::router as app,
doc::router as doc,
trash::router as trash,
user::router as user,
view::router as view,
workspace::router as workspace,
ws,
ws::WsServer,
},
web_socket::WsServer,
};
pub struct Application {
@ -75,7 +73,7 @@ async fn period_check(_pool: Data<PgPool>) {
}
}
fn ws_scope() -> Scope { web::scope("/ws").service(ws::router::establish_ws_connection) }
fn ws_scope() -> Scope { web::scope("/ws").service(crate::web_socket::router::establish_ws_connection) }
fn user_scope() -> Scope {
// https://developer.mozilla.org/en-US/docs/Web/HTTP
@ -124,6 +122,9 @@ fn user_scope() -> Scope {
.route(web::delete().to(trash::delete_handler))
.route(web::get().to(trash::read_handler))
)
.service(web::resource("/sync")
.route(web::post().to(trash::create_handler))
)
// password
.service(web::resource("/password_change")
.route(web::post().to(user::change_password))
@ -131,7 +132,7 @@ fn user_scope() -> Scope {
}
pub async fn init_app_context(configuration: &Settings) -> AppContext {
let _ = crate::service::log::Builder::new("flowy-server")
let _ = crate::services::log::Builder::new("flowy-server")
.env_filter("Trace")
.build();
let pg_pool = get_connection_pool(&configuration.database)

View File

@ -1,6 +1,6 @@
use crate::service::{
doc::manager::DocBiz,
ws::{WsBizHandlers, WsServer},
use crate::{
services::doc::manager::DocBiz,
web_socket::{WsBizHandlers, WsServer},
};
use actix::Addr;
use actix_web::web::Data;

View File

@ -74,7 +74,7 @@ impl Token {
}
}
use crate::service::user::EXPIRED_DURATION_DAYS;
use crate::services::user::EXPIRED_DURATION_DAYS;
use actix_web::{dev::Payload, FromRequest, HttpRequest};
use backend_service::configuration::HEADER_TOKEN;
use futures::future::{ready, Ready};

View File

@ -3,5 +3,6 @@ pub mod config;
pub mod context;
mod entities;
mod middleware;
pub mod service;
pub mod services;
mod sqlx_ext;
pub mod web_socket;

View File

@ -1,4 +1,4 @@
use crate::service::user::{LoggedUser, AUTHORIZED_USERS};
use crate::services::user::{LoggedUser, AUTHORIZED_USERS};
use actix_service::{Service, Transform};
use actix_web::{
dev::{ServiceRequest, ServiceResponse},

View File

@ -1,10 +1,10 @@
use crate::{
entities::workspace::{AppTable, APP_TABLE},
service::{app::sql_builder::*, user::LoggedUser, view::read_view_belong_to_id},
services::{app::sql_builder::*, user::LoggedUser, view::read_view_belong_to_id},
sqlx_ext::{map_sqlx_error, DBTransaction, SqlBuilder},
};
use crate::service::trash::read_trash_ids;
use crate::services::trash::read_trash_ids;
use backend_service::errors::{invalid_params, ServerError};
use chrono::Utc;
use flowy_workspace_infra::{

View File

@ -7,7 +7,7 @@ use flowy_workspace_infra::protobuf::{AppIdentifier, CreateAppParams, UpdateAppP
use protobuf::Message;
use sqlx::PgPool;
use crate::service::{
use crate::services::{
app::{
app::{create_app, delete_app, read_app, update_app},
sql_builder::check_app_id,

View File

@ -1,6 +1,6 @@
use crate::service::{
doc::edit::ServerDocEditor,
ws::{entities::Socket, WsUser},
use crate::{
services::doc::edit::ServerDocEditor,
web_socket::{entities::Socket, WsUser},
};
use actix_web::web::Data;
use async_stream::stream;

View File

@ -1,7 +1,9 @@
use crate::service::{
doc::{edit::edit_actor::EditUser, update_doc},
util::md5,
ws::{entities::Socket, WsMessageAdaptor},
use crate::{
services::{
doc::{edit::edit_actor::EditUser, update_doc},
util::md5,
},
web_socket::{entities::Socket, WsMessageAdaptor},
};
use actix_web::web::Data;
use backend_service::errors::{internal_error, ServerError};

View File

@ -1,10 +1,10 @@
use crate::service::{
doc::{
use crate::{
services::doc::{
edit::edit_actor::{EditDocActor, EditMsg},
read_doc,
ws_actor::{DocWsActor, DocWsMsg},
},
ws::{entities::Socket, WsBizHandler, WsClientData, WsUser},
web_socket::{entities::Socket, WsBizHandler, WsClientData, WsUser},
};
use actix_web::web::Data;
use backend_service::errors::{internal_error, Result as DocResult, ServerError};

View File

@ -1,4 +1,4 @@
use crate::service::{
use crate::services::{
doc::{create_doc, read_doc, update_doc},
util::parse_from_payload,
};

View File

@ -1,7 +1,9 @@
use crate::service::{
doc::manager::{DocManager, DocOpenHandle},
util::{md5, parse_from_bytes},
ws::{entities::Socket, WsClientData, WsUser},
use crate::{
services::{
doc::manager::{DocManager, DocOpenHandle},
util::{md5, parse_from_bytes},
},
web_socket::{entities::Socket, WsClientData, WsUser},
};
use actix_rt::task::spawn_blocking;
use actix_web::web::Data;

View File

@ -6,4 +6,3 @@ pub mod user;
pub(crate) mod util;
pub mod view;
pub mod workspace;
pub mod ws;

View File

@ -1,4 +1,4 @@
use crate::service::{
use crate::services::{
trash::{create_trash, delete_all_trash, delete_trash, read_trash},
user::LoggedUser,
util::parse_from_payload,

View File

@ -1,6 +1,6 @@
use crate::{
entities::workspace::{TrashTable, TRASH_TABLE},
service::{
services::{
app::app::{delete_app, read_app_table},
user::LoggedUser,
view::{delete_view, read_view_table},

View File

@ -1,6 +1,6 @@
use crate::{
entities::{token::Token, user::UserTable},
service::user::{hash_password, verify_password, LoggedUser},
services::user::{hash_password, verify_password, LoggedUser},
sqlx_ext::{map_sqlx_error, DBTransaction, SqlBuilder},
};
use anyhow::Context;
@ -16,7 +16,7 @@ use flowy_user_infra::{
use sqlx::{PgPool, Postgres};
use super::AUTHORIZED_USERS;
use crate::service::user::user_default::create_default_workspace;
use crate::services::user::user_default::create_default_workspace;
pub async fn sign_in(pool: &PgPool, params: SignInParams) -> Result<SignInResponse, ServerError> {
let email = UserEmail::parse(params.email).map_err(|e| ServerError::params_invalid().context(e))?;
@ -68,7 +68,6 @@ pub async fn register_user(pool: &PgPool, params: SignUpParams) -> Result<FlowyR
let logged_user = LoggedUser::new(&response_data.user_id);
AUTHORIZED_USERS.store_auth(logged_user, true);
let _ = create_default_workspace(&mut transaction, response_data.get_user_id()).await?;
transaction
.commit()

View File

@ -11,7 +11,7 @@ use flowy_user_infra::protobuf::{SignInParams, SignUpParams, UpdateUserParams};
use crate::{
entities::token::Token,
service::{
services::{
user::{get_user_profile, register_user, set_user_profile, sign_in, sign_out, LoggedUser},
util::parse_from_payload,
},

View File

@ -1,18 +1,19 @@
use crate::{
service::{
services::{
app::sql_builder::NewAppSqlBuilder as AppBuilder,
workspace::sql_builder::NewWorkspaceBuilder as WorkspaceBuilder,
},
sqlx_ext::{map_sqlx_error, DBTransaction},
};
use crate::service::view::{create_view_with_args, sql_builder::NewViewSqlBuilder};
use crate::services::view::{create_view_with_args, sql_builder::NewViewSqlBuilder};
use backend_service::errors::ServerError;
use chrono::Utc;
use flowy_document_infra::user_default::doc_initial_string;
use flowy_workspace_infra::protobuf::Workspace;
use std::convert::TryInto;
#[allow(dead_code)]
pub async fn create_default_workspace(
transaction: &mut DBTransaction<'_>,
user_id: &str,

View File

@ -1,4 +1,4 @@
use crate::service::{
use crate::services::{
doc::manager::DocBiz,
user::LoggedUser,
util::parse_from_payload,

View File

@ -1,6 +1,6 @@
use crate::{
entities::workspace::{ViewTable, VIEW_TABLE},
service::{
services::{
doc::{create_doc, delete_doc},
trash::read_trash_ids,
user::LoggedUser,

View File

@ -1,4 +1,4 @@
use crate::service::{
use crate::services::{
user::LoggedUser,
util::parse_from_payload,
workspace::{

View File

@ -1,7 +1,7 @@
use super::sql_builder::NewWorkspaceBuilder;
use crate::{
entities::workspace::{AppTable, WorkspaceTable, WORKSPACE_TABLE},
service::{app::app::read_app, user::LoggedUser, workspace::sql_builder::*},
services::{app::app::read_app, user::LoggedUser, workspace::sql_builder::*},
sqlx_ext::*,
};
use anyhow::Context;

View File

@ -1,5 +1,4 @@
use crate::service::ws::WsClientData;
use crate::web_socket::WsClientData;
use lib_ws::WsModule;
use std::{collections::HashMap, sync::Arc};

View File

@ -1,4 +1,4 @@
use crate::service::ws::WsMessageAdaptor;
use crate::web_socket::WsMessageAdaptor;
use actix::{Message, Recipient};
use backend_service::errors::ServerError;
use serde::{Deserialize, Serialize};

View File

@ -1,6 +1,6 @@
use crate::service::{
user::LoggedUser,
ws::{WsBizHandlers, WsClient, WsServer, WsUser},
use crate::{
services::user::LoggedUser,
web_socket::{WsBizHandlers, WsClient, WsServer, WsUser},
};
use actix::Addr;
use actix_web::{

View File

@ -1,13 +1,11 @@
use crate::{
config::{HEARTBEAT_INTERVAL, PING_TIMEOUT},
service::{
user::LoggedUser,
ws::{
entities::{Connect, Disconnect, Socket},
WsBizHandlers,
WsMessageAdaptor,
WsServer,
},
services::user::LoggedUser,
web_socket::{
entities::{Connect, Disconnect, Socket},
WsBizHandlers,
WsMessageAdaptor,
WsServer,
},
};
use actix::*;

View File

@ -1,4 +1,4 @@
use crate::service::ws::{
use crate::web_socket::{
entities::{Connect, Disconnect, Session, SessionId},
WsMessageAdaptor,
};

View File

@ -0,0 +1,3 @@
mod workspace_task;
pub use workspace_task::*;

View File

@ -0,0 +1,60 @@
use crate::{
core::CoreContext,
errors::WorkspaceError,
notify::{send_dart_notification, WorkspaceNotification},
services::workspace::sql::{WorkspaceTable, WorkspaceTableSql},
};
use flowy_workspace_infra::entities::workspace::WorkspaceIdentifier;
use lib_dispatch::prelude::Unit;
use std::sync::Arc;
#[tracing::instrument(level = "debug", skip(core), err)]
pub fn read_workspaces_on_server(
core: Unit<Arc<CoreContext>>,
user_id: String,
params: WorkspaceIdentifier,
) -> Result<(), WorkspaceError> {
let (token, server) = (core.user.token()?, core.server.clone());
let app_ctrl = core.app_controller.clone();
let view_ctrl = core.view_controller.clone();
let conn = core.database.db_connection()?;
tokio::spawn(async move {
// Opti: handle the error and retry?
let workspaces = server.read_workspace(&token, params).await?;
let _ = (&*conn).immediate_transaction::<_, WorkspaceError, _>(|| {
tracing::debug!("Save {} workspace", workspaces.len());
for workspace in &workspaces.items {
let m_workspace = workspace.clone();
let apps = m_workspace.apps.clone().into_inner();
let workspace_table = WorkspaceTable::new(m_workspace, &user_id);
let _ = WorkspaceTableSql::create_workspace(workspace_table, &*conn)?;
tracing::debug!("Save {} apps", apps.len());
for app in apps {
let views = app.belongings.clone().into_inner();
match app_ctrl.save_app(app, &*conn) {
Ok(_) => {},
Err(e) => log::error!("create app failed: {:?}", e),
}
tracing::debug!("Save {} views", views.len());
for view in views {
match view_ctrl.save_view(view, &*conn) {
Ok(_) => {},
Err(e) => log::error!("create view failed: {:?}", e),
}
}
}
}
Ok(())
})?;
send_dart_notification(&token, WorkspaceNotification::WorkspaceListUpdated)
.payload(workspaces)
.send();
Result::<(), WorkspaceError>::Ok(())
});
Ok(())
}

View File

@ -1,7 +1,7 @@
use crate::{
entities::workspace::RepeatedWorkspace,
errors::{WorkspaceError, WorkspaceResult},
module::WorkspaceUser,
module::{WorkspaceDatabase, WorkspaceUser},
notify::{send_dart_notification, WorkspaceNotification},
services::{server::Server, AppController, TrashController, ViewController, WorkspaceController},
};
@ -12,24 +12,25 @@ use lazy_static::lazy_static;
use lib_infra::entities::network_state::NetworkType;
use parking_lot::RwLock;
use std::{collections::HashMap, sync::Arc};
lazy_static! {
static ref INIT_WORKSPACE: RwLock<HashMap<String, bool>> = RwLock::new(HashMap::new());
}
pub struct FlowyCore {
pub struct CoreContext {
pub user: Arc<dyn WorkspaceUser>,
server: Server,
pub(crate) server: Server,
pub(crate) database: Arc<dyn WorkspaceDatabase>,
pub workspace_controller: Arc<WorkspaceController>,
pub(crate) app_controller: Arc<AppController>,
pub(crate) view_controller: Arc<ViewController>,
pub(crate) trash_controller: Arc<TrashController>,
}
impl FlowyCore {
impl CoreContext {
pub(crate) fn new(
user: Arc<dyn WorkspaceUser>,
server: Server,
database: Arc<dyn WorkspaceDatabase>,
workspace_controller: Arc<WorkspaceController>,
app_controller: Arc<AppController>,
view_controller: Arc<ViewController>,
@ -42,6 +43,7 @@ impl FlowyCore {
Self {
user,
server,
database,
workspace_controller,
app_controller,
view_controller,
@ -49,24 +51,6 @@ impl FlowyCore {
}
}
async fn init(&self, token: &str) -> Result<(), WorkspaceError> {
if let Some(is_init) = INIT_WORKSPACE.read().get(token) {
if *is_init {
return Ok(());
}
}
log::debug!("Start initializing flowy core");
INIT_WORKSPACE.write().insert(token.to_owned(), true);
let _ = self.server.init();
let _ = self.workspace_controller.init()?;
let _ = self.app_controller.init()?;
let _ = self.view_controller.init()?;
let _ = self.trash_controller.init()?;
log::debug!("Finish initializing core");
Ok(())
}
pub fn network_state_changed(&self, new_type: NetworkType) {
match new_type {
NetworkType::UnknownNetworkType => {},
@ -77,8 +61,6 @@ impl FlowyCore {
}
pub async fn user_did_sign_in(&self, token: &str) -> WorkspaceResult<()> {
// TODO: (nathan) do something here
log::debug!("workspace initialize after sign in");
let _ = self.init(token).await?;
Ok(())
@ -131,4 +113,21 @@ impl FlowyCore {
let _ = self.init(&token).await?;
Ok(())
}
async fn init(&self, token: &str) -> Result<(), WorkspaceError> {
if let Some(is_init) = INIT_WORKSPACE.read().get(token) {
if *is_init {
return Ok(());
}
}
log::debug!("Start initializing flowy core");
INIT_WORKSPACE.write().insert(token.to_owned(), true);
let _ = self.workspace_controller.init()?;
let _ = self.app_controller.init()?;
let _ = self.view_controller.init()?;
let _ = self.trash_controller.init()?;
log::debug!("Finish initializing core");
Ok(())
}
}

View File

@ -0,0 +1,56 @@
use crate::{
core::{aggregate_tasks::read_workspaces_on_server, CoreContext},
errors::WorkspaceError,
services::{get_current_workspace, read_local_workspace_apps},
};
use flowy_workspace_infra::entities::{
view::View,
workspace::{CurrentWorkspaceSetting, QueryWorkspaceRequest, RepeatedWorkspace, WorkspaceIdentifier},
};
use lib_dispatch::prelude::{data_result, Data, DataResult, Unit};
use std::{convert::TryInto, sync::Arc};
#[tracing::instrument(skip(data, core), err)]
pub(crate) async fn read_workspaces_handler(
data: Data<QueryWorkspaceRequest>,
core: Unit<Arc<CoreContext>>,
) -> DataResult<RepeatedWorkspace, WorkspaceError> {
let params: WorkspaceIdentifier = data.into_inner().try_into()?;
let user_id = core.user.user_id()?;
let conn = &*core.database.db_connection()?;
let workspace_controller = core.workspace_controller.clone();
let trash_controller = core.trash_controller.clone();
let workspaces = conn.immediate_transaction::<_, WorkspaceError, _>(|| {
let mut workspaces = workspace_controller.read_local_workspaces(params.workspace_id.clone(), &user_id, conn)?;
for workspace in workspaces.iter_mut() {
let apps = read_local_workspace_apps(&workspace.id, trash_controller.clone(), conn)?.into_inner();
workspace.apps.items = apps;
}
Ok(workspaces)
})?;
let _ = read_workspaces_on_server(core, user_id, params);
data_result(workspaces)
}
#[tracing::instrument(skip(core), err)]
pub async fn read_cur_workspace_handler(
core: Unit<Arc<CoreContext>>,
) -> DataResult<CurrentWorkspaceSetting, WorkspaceError> {
let workspace_id = get_current_workspace()?;
let user_id = core.user.user_id()?;
let params = WorkspaceIdentifier {
workspace_id: Some(workspace_id.clone()),
};
let conn = &*core.database.db_connection()?;
let workspace = core
.workspace_controller
.read_local_workspace(workspace_id, &user_id, conn)?;
let latest_view: Option<View> = core.view_controller.latest_visit_view().unwrap_or(None);
let setting = CurrentWorkspaceSetting { workspace, latest_view };
let _ = read_workspaces_on_server(core, user_id, params);
data_result(setting)
}

View File

@ -1,4 +1,5 @@
mod flowy_core;
mod task;
mod aggregate_tasks;
mod core_context;
pub use flowy_core::*;
pub mod event_handler;
pub use core_context::*;

View File

@ -1,69 +0,0 @@
use crate::{errors::WorkspaceError, services::WorkspaceController};
use flowy_workspace_infra::entities::workspace::{QueryWorkspaceRequest, RepeatedWorkspace, WorkspaceIdentifier};
use lib_dispatch::prelude::{data_result, Data, DataResult, Unit};
use std::{convert::TryInto, sync::Arc};
#[tracing::instrument(skip(data, controller), err)]
pub(crate) async fn read_workspaces_handler(
data: Data<QueryWorkspaceRequest>,
controller: Unit<Arc<WorkspaceController>>,
) -> DataResult<RepeatedWorkspace, WorkspaceError> {
let params: WorkspaceIdentifier = data.into_inner().try_into()?;
let user_id = controller.user.user_id()?;
let workspaces = controller.read_local_workspaces(
params.workspace_id.clone(),
&user_id,
&*controller.database.db_connection()?,
)?;
let _ = controller.read_workspaces_on_server(user_id, params);
data_result(workspaces)
}
// #[tracing::instrument(level = "debug", skip(self), err)]
// fn read_workspaces_on_server(&self, user_id: String, params:
// WorkspaceIdentifier) -> Result<(), WorkspaceError> { let (token, server)
// = self.token_with_server()?; let workspace_sql =
// self.workspace_sql.clone(); let app_ctrl = self.app_controller.clone();
// let view_ctrl = self.view_controller.clone();
// let conn = self.database.db_connection()?;
// tokio::spawn(async move {
// // Opti: handle the error and retry?
// let workspaces = server.read_workspace(&token, params).await?;
// let _ = (&*conn).immediate_transaction::<_, WorkspaceError, _>(|| {
// tracing::debug!("Save {} workspace", workspaces.len());
// for workspace in &workspaces.items {
// let m_workspace = workspace.clone();
// let apps = m_workspace.apps.clone().into_inner();
// let workspace_table = WorkspaceTable::new(m_workspace,
// &user_id);
//
// let _ = workspace_sql.create_workspace(workspace_table,
// &*conn)?; tracing::debug!("Save {} apps", apps.len());
// for app in apps {
// let views = app.belongings.clone().into_inner();
// match app_ctrl.save_app(app, &*conn) {
// Ok(_) => {},
// Err(e) => log::error!("create app failed: {:?}", e),
// }
//
// tracing::debug!("Save {} views", views.len());
// for view in views {
// match view_ctrl.save_view(view, &*conn) {
// Ok(_) => {},
// Err(e) => log::error!("create view failed: {:?}",
// e), }
// }
// }
// }
// Ok(())
// })?;
//
// send_dart_notification(&token,
// WorkspaceNotification::WorkspaceListUpdated) .payload(workspaces)
// .send();
// Result::<(), WorkspaceError>::Ok(())
// });
//
// Ok(())
// }

View File

@ -7,7 +7,7 @@ use lib_dispatch::prelude::*;
use lib_sqlite::ConnectionPool;
use crate::{
core::FlowyCore,
core::{event_handler::*, CoreContext},
errors::WorkspaceError,
event::WorkspaceEvent,
services::{
@ -45,7 +45,7 @@ pub fn init_core(
database: Arc<dyn WorkspaceDatabase>,
flowy_document: Arc<FlowyDocument>,
server_config: &ClientServerConfiguration,
) -> Arc<FlowyCore> {
) -> Arc<CoreContext> {
let server = construct_workspace_server(server_config);
let trash_controller = Arc::new(TrashController::new(database.clone(), server.clone(), user.clone()));
@ -68,15 +68,14 @@ pub fn init_core(
let workspace_controller = Arc::new(WorkspaceController::new(
user.clone(),
database.clone(),
app_controller.clone(),
view_controller.clone(),
trash_controller.clone(),
server.clone(),
));
Arc::new(FlowyCore::new(
Arc::new(CoreContext::new(
user,
server,
database,
workspace_controller,
app_controller,
view_controller,
@ -84,13 +83,14 @@ pub fn init_core(
))
}
pub fn create(core: Arc<FlowyCore>) -> Module {
pub fn create(core: Arc<CoreContext>) -> Module {
let mut module = Module::new()
.name("Flowy-Workspace")
.data(core.workspace_controller.clone())
.data(core.app_controller.clone())
.data(core.view_controller.clone())
.data(core.trash_controller.clone());
.data(core.trash_controller.clone())
.data(core.clone());
module = module
.event(WorkspaceEvent::CreateWorkspace, create_workspace_handler)

View File

@ -241,11 +241,11 @@ fn notify_apps_changed(
pub fn read_local_workspace_apps(
workspace_id: &str,
trash_can: Arc<TrashController>,
trash_controller: Arc<TrashController>,
conn: &SqliteConnection,
) -> Result<RepeatedApp, WorkspaceError> {
let mut app_tables = AppTableSql::read_workspace_apps(workspace_id, false, conn)?;
let trash_ids = trash_can.trash_ids(conn)?;
let trash_ids = trash_controller.trash_ids(conn)?;
app_tables.retain(|app_table| !trash_ids.contains(&app_table.id));
let apps = app_tables.into_iter().map(|table| table.into()).collect::<Vec<App>>();

View File

@ -66,7 +66,7 @@ pub(crate) fn construct_workspace_server(
config: &ClientServerConfiguration,
) -> Arc<dyn WorkspaceServerAPI + Send + Sync> {
if cfg!(feature = "http_server") {
Arc::new(WorkspaceServer::new(config.clone()))
Arc::new(WorkspaceHttpServer::new(config.clone()))
} else {
Arc::new(WorkspaceServerMock {})
}

View File

@ -13,15 +13,15 @@ use backend_service::{configuration::ClientServerConfiguration, middleware::*, w
use flowy_workspace_infra::errors::ErrorCode;
use lib_infra::future::ResultFuture;
pub struct WorkspaceServer {
pub struct WorkspaceHttpServer {
config: ClientServerConfiguration,
}
impl WorkspaceServer {
pub fn new(config: ClientServerConfiguration) -> WorkspaceServer { Self { config } }
impl WorkspaceHttpServer {
pub fn new(config: ClientServerConfiguration) -> WorkspaceHttpServer { Self { config } }
}
impl WorkspaceServerAPI for WorkspaceServer {
impl WorkspaceServerAPI for WorkspaceHttpServer {
fn init(&self) {
let mut rx = BACKEND_API_MIDDLEWARE.invalid_token_subscribe();
tokio::spawn(async move {

View File

@ -6,22 +6,17 @@ use crate::{
read_local_workspace_apps,
server::Server,
workspace::sql::{WorkspaceTable, WorkspaceTableChangeset, WorkspaceTableSql},
AppController,
TrashController,
ViewController,
},
};
use flowy_database::SqliteConnection;
use flowy_workspace_infra::entities::{app::RepeatedApp, view::View, workspace::*};
use flowy_workspace_infra::entities::{app::RepeatedApp, workspace::*};
use lib_infra::kv::KV;
use std::sync::Arc;
pub struct WorkspaceController {
pub user: Arc<dyn WorkspaceUser>,
pub(crate) workspace_sql: Arc<WorkspaceTableSql>,
pub(crate) view_controller: Arc<ViewController>,
pub(crate) database: Arc<dyn WorkspaceDatabase>,
pub(crate) app_controller: Arc<AppController>,
pub(crate) trash_controller: Arc<TrashController>,
server: Server,
}
@ -30,18 +25,12 @@ impl WorkspaceController {
pub(crate) fn new(
user: Arc<dyn WorkspaceUser>,
database: Arc<dyn WorkspaceDatabase>,
app_controller: Arc<AppController>,
view_controller: Arc<ViewController>,
trash_can: Arc<TrashController>,
server: Server,
) -> Self {
let workspace_sql = Arc::new(WorkspaceTableSql {});
Self {
user,
workspace_sql,
view_controller,
database,
app_controller,
trash_controller: trash_can,
server,
}
@ -74,7 +63,7 @@ impl WorkspaceController {
// other journaling modes, EXCLUSIVE prevents other database connections from
// reading the database while the transaction is underway.
conn.immediate_transaction::<_, WorkspaceError, _>(|| {
self.workspace_sql.create_workspace(workspace_table, conn)?;
WorkspaceTableSql::create_workspace(workspace_table, conn)?;
let repeated_workspace = self.read_local_workspaces(None, &user_id, conn)?;
send_dart_notification(&token, WorkspaceNotification::UserCreateWorkspace)
.payload(repeated_workspace)
@ -94,7 +83,7 @@ impl WorkspaceController {
let workspace_id = changeset.id.clone();
let conn = &*self.database.db_connection()?;
conn.immediate_transaction::<_, WorkspaceError, _>(|| {
let _ = self.workspace_sql.update_workspace(changeset, conn)?;
let _ = WorkspaceTableSql::update_workspace(changeset, conn)?;
let user_id = self.user.user_id()?;
let workspace = self.read_local_workspace(workspace_id.clone(), &user_id, conn)?;
send_dart_notification(&workspace_id, WorkspaceNotification::WorkspaceUpdated)
@ -115,7 +104,7 @@ impl WorkspaceController {
let token = self.user.token()?;
let conn = &*self.database.db_connection()?;
conn.immediate_transaction::<_, WorkspaceError, _>(|| {
let _ = self.workspace_sql.delete_workspace(workspace_id, conn)?;
let _ = WorkspaceTableSql::delete_workspace(workspace_id, conn)?;
let repeated_workspace = self.read_local_workspaces(None, &user_id, conn)?;
send_dart_notification(&token, WorkspaceNotification::UserDeleteWorkspace)
.payload(repeated_workspace)
@ -140,31 +129,6 @@ impl WorkspaceController {
}
}
pub(crate) async fn read_workspaces(
&self,
params: WorkspaceIdentifier,
) -> Result<RepeatedWorkspace, WorkspaceError> {
let user_id = self.user.user_id()?;
let workspaces =
self.read_local_workspaces(params.workspace_id.clone(), &user_id, &*self.database.db_connection()?)?;
let _ = self.read_workspaces_on_server(user_id, params);
Ok(workspaces)
}
pub(crate) async fn read_current_workspace(&self) -> Result<CurrentWorkspaceSetting, WorkspaceError> {
let workspace_id = get_current_workspace()?;
let user_id = self.user.user_id()?;
let params = WorkspaceIdentifier {
workspace_id: Some(workspace_id.clone()),
};
let workspace = self.read_local_workspace(workspace_id, &user_id, &*self.database.db_connection()?)?;
let latest_view: Option<View> = self.view_controller.latest_visit_view().unwrap_or(None);
let setting = CurrentWorkspaceSetting { workspace, latest_view };
let _ = self.read_workspaces_on_server(user_id, params)?;
Ok(setting)
}
pub(crate) async fn read_current_workspace_apps(&self) -> Result<RepeatedApp, WorkspaceError> {
let workspace_id = get_current_workspace()?;
let conn = self.database.db_connection()?;
@ -181,19 +145,17 @@ impl WorkspaceController {
conn: &SqliteConnection,
) -> Result<RepeatedWorkspace, WorkspaceError> {
let workspace_id = workspace_id.to_owned();
let workspace_tables = self.workspace_sql.read_workspaces(workspace_id, user_id, conn)?;
let workspace_tables = WorkspaceTableSql::read_workspaces(workspace_id, user_id, conn)?;
let mut workspaces = vec![];
for table in workspace_tables {
let apps = self.read_local_apps(&table.id, conn)?.into_inner();
let mut workspace: Workspace = table.into();
workspace.apps.items = apps;
let workspace: Workspace = table.into();
workspaces.push(workspace);
}
Ok(RepeatedWorkspace { items: workspaces })
}
fn read_local_workspace(
pub(crate) fn read_local_workspace(
&self,
workspace_id: String,
user_id: &str,
@ -218,12 +180,6 @@ impl WorkspaceController {
}
impl WorkspaceController {
fn token_with_server(&self) -> Result<(String, Server), WorkspaceError> {
let token = self.user.token()?;
let server = self.server.clone();
Ok((token, server))
}
#[tracing::instrument(level = "debug", skip(self), err)]
async fn create_workspace_on_server(&self, params: CreateWorkspaceParams) -> Result<Workspace, WorkspaceError> {
let token = self.user.token()?;
@ -233,7 +189,7 @@ impl WorkspaceController {
#[tracing::instrument(level = "debug", skip(self), err)]
fn update_workspace_on_server(&self, params: UpdateWorkspaceParams) -> Result<(), WorkspaceError> {
let (token, server) = self.token_with_server()?;
let (token, server) = (self.user.token()?, self.server.clone());
tokio::spawn(async move {
match server.update_workspace(&token, params).await {
Ok(_) => {},
@ -251,7 +207,7 @@ impl WorkspaceController {
let params = WorkspaceIdentifier {
workspace_id: Some(workspace_id.to_string()),
};
let (token, server) = self.token_with_server()?;
let (token, server) = (self.user.token()?, self.server.clone());
tokio::spawn(async move {
match server.delete_workspace(&token, params).await {
Ok(_) => {},
@ -263,64 +219,13 @@ impl WorkspaceController {
});
Ok(())
}
#[tracing::instrument(level = "debug", skip(self), err)]
pub(crate) fn read_workspaces_on_server(
&self,
user_id: String,
params: WorkspaceIdentifier,
) -> Result<(), WorkspaceError> {
let (token, server) = self.token_with_server()?;
let workspace_sql = self.workspace_sql.clone();
let app_ctrl = self.app_controller.clone();
let view_ctrl = self.view_controller.clone();
let conn = self.database.db_connection()?;
tokio::spawn(async move {
// Opti: handle the error and retry?
let workspaces = server.read_workspace(&token, params).await?;
let _ = (&*conn).immediate_transaction::<_, WorkspaceError, _>(|| {
tracing::debug!("Save {} workspace", workspaces.len());
for workspace in &workspaces.items {
let m_workspace = workspace.clone();
let apps = m_workspace.apps.clone().into_inner();
let workspace_table = WorkspaceTable::new(m_workspace, &user_id);
let _ = workspace_sql.create_workspace(workspace_table, &*conn)?;
tracing::debug!("Save {} apps", apps.len());
for app in apps {
let views = app.belongings.clone().into_inner();
match app_ctrl.save_app(app, &*conn) {
Ok(_) => {},
Err(e) => log::error!("create app failed: {:?}", e),
}
tracing::debug!("Save {} views", views.len());
for view in views {
match view_ctrl.save_view(view, &*conn) {
Ok(_) => {},
Err(e) => log::error!("create view failed: {:?}", e),
}
}
}
}
Ok(())
})?;
send_dart_notification(&token, WorkspaceNotification::WorkspaceListUpdated)
.payload(workspaces)
.send();
Result::<(), WorkspaceError>::Ok(())
});
Ok(())
}
}
const CURRENT_WORKSPACE_ID: &str = "current_workspace_id";
fn set_current_workspace(workspace_id: &str) { KV::set_str(CURRENT_WORKSPACE_ID, workspace_id.to_owned()); }
fn get_current_workspace() -> Result<String, WorkspaceError> {
pub fn get_current_workspace() -> Result<String, WorkspaceError> {
match KV::get_str(CURRENT_WORKSPACE_ID) {
None => Err(WorkspaceError::record_not_found()
.context("Current workspace not found or should call open workspace first")),

View File

@ -15,14 +15,6 @@ pub(crate) async fn create_workspace_handler(
data_result(detail)
}
#[tracing::instrument(skip(controller), err)]
pub(crate) async fn read_cur_workspace_handler(
controller: Unit<Arc<WorkspaceController>>,
) -> DataResult<CurrentWorkspaceSetting, WorkspaceError> {
let workspace = controller.read_current_workspace().await?;
data_result(workspace)
}
#[tracing::instrument(skip(controller), err)]
pub(crate) async fn read_workspace_apps_handler(
controller: Unit<Arc<WorkspaceController>>,
@ -31,16 +23,6 @@ pub(crate) async fn read_workspace_apps_handler(
data_result(repeated_app)
}
#[tracing::instrument(skip(data, controller), err)]
pub(crate) async fn read_workspaces_handler(
data: Data<QueryWorkspaceRequest>,
controller: Unit<Arc<WorkspaceController>>,
) -> DataResult<RepeatedWorkspace, WorkspaceError> {
let params: WorkspaceIdentifier = data.into_inner().try_into()?;
let workspaces = controller.read_workspaces(params).await?;
data_result(workspaces)
}
#[tracing::instrument(skip(data, controller), err)]
pub(crate) async fn open_workspace_handler(
data: Data<QueryWorkspaceRequest>,

View File

@ -13,11 +13,7 @@ use flowy_database::{
pub(crate) struct WorkspaceTableSql {}
impl WorkspaceTableSql {
pub(crate) fn create_workspace(
&self,
table: WorkspaceTable,
conn: &SqliteConnection,
) -> Result<(), WorkspaceError> {
pub(crate) fn create_workspace(table: WorkspaceTable, conn: &SqliteConnection) -> Result<(), WorkspaceError> {
match diesel_record_count!(workspace_table, &table.id, conn) {
0 => diesel_insert_table!(workspace_table, &table, conn),
_ => {
@ -29,7 +25,6 @@ impl WorkspaceTableSql {
}
pub(crate) fn read_workspaces(
&self,
workspace_id: Option<String>,
user_id: &str,
conn: &SqliteConnection,
@ -50,7 +45,6 @@ impl WorkspaceTableSql {
#[allow(dead_code)]
pub(crate) fn update_workspace(
&self,
changeset: WorkspaceTableChangeset,
conn: &SqliteConnection,
) -> Result<(), WorkspaceError> {
@ -59,7 +53,7 @@ impl WorkspaceTableSql {
}
#[allow(dead_code)]
pub(crate) fn delete_workspace(&self, workspace_id: &str, conn: &SqliteConnection) -> Result<(), WorkspaceError> {
pub(crate) fn delete_workspace(workspace_id: &str, conn: &SqliteConnection) -> Result<(), WorkspaceError> {
diesel_delete_table!(workspace_table, workspace_id, conn);
Ok(())
}

View File

@ -3,7 +3,7 @@ mod deps_resolve;
pub mod module;
use crate::deps_resolve::WorkspaceDepsResolver;
use backend_service::configuration::ClientServerConfiguration;
use flowy_core::{errors::WorkspaceError, module::init_core, prelude::FlowyCore};
use flowy_core::{errors::WorkspaceError, module::init_core, prelude::CoreContext};
use flowy_document::module::FlowyDocument;
use flowy_user::{
prelude::UserStatus,
@ -66,7 +66,7 @@ pub struct FlowySDK {
config: FlowySDKConfig,
pub user_session: Arc<UserSession>,
pub flowy_document: Arc<FlowyDocument>,
pub core: Arc<FlowyCore>,
pub core: Arc<CoreContext>,
pub dispatcher: Arc<EventDispatcher>,
}
@ -99,7 +99,7 @@ impl FlowySDK {
pub fn dispatcher(&self) -> Arc<EventDispatcher> { self.dispatcher.clone() }
}
fn _init(dispatch: &EventDispatcher, user_session: Arc<UserSession>, core: Arc<FlowyCore>) {
fn _init(dispatch: &EventDispatcher, user_session: Arc<UserSession>, core: Arc<CoreContext>) {
let user_status_subscribe = user_session.notifier.user_status_subscribe();
let network_status_subscribe = user_session.notifier.network_type_subscribe();
let cloned_core = core.clone();
@ -113,7 +113,7 @@ fn _init(dispatch: &EventDispatcher, user_session: Arc<UserSession>, core: Arc<F
});
}
async fn _listen_user_status(mut subscribe: broadcast::Receiver<UserStatus>, core: Arc<FlowyCore>) {
async fn _listen_user_status(mut subscribe: broadcast::Receiver<UserStatus>, core: Arc<CoreContext>) {
while let Ok(status) = subscribe.recv().await {
let result = || async {
match status {
@ -141,7 +141,7 @@ async fn _listen_user_status(mut subscribe: broadcast::Receiver<UserStatus>, cor
}
}
async fn _listen_network_status(mut subscribe: broadcast::Receiver<NetworkType>, core: Arc<FlowyCore>) {
async fn _listen_network_status(mut subscribe: broadcast::Receiver<NetworkType>, core: Arc<CoreContext>) {
while let Ok(new_type) = subscribe.recv().await {
core.network_state_changed(new_type);
}
@ -168,7 +168,7 @@ fn mk_core(
user_session: Arc<UserSession>,
flowy_document: Arc<FlowyDocument>,
server_config: &ClientServerConfiguration,
) -> Arc<FlowyCore> {
) -> Arc<CoreContext> {
let workspace_deps = WorkspaceDepsResolver::new(user_session);
let (user, database) = workspace_deps.split_into();
init_core(user, database, flowy_document, server_config)

View File

@ -1,19 +1,19 @@
use crate::deps_resolve::DocumentDepsResolver;
use backend_service::configuration::ClientServerConfiguration;
use flowy_core::prelude::FlowyCore;
use flowy_core::prelude::CoreContext;
use flowy_document::module::FlowyDocument;
use flowy_user::services::user::UserSession;
use lib_dispatch::prelude::Module;
use std::sync::Arc;
pub fn mk_modules(core: Arc<FlowyCore>, user_session: Arc<UserSession>) -> Vec<Module> {
pub fn mk_modules(core: Arc<CoreContext>, user_session: Arc<UserSession>) -> Vec<Module> {
let user_module = mk_user_module(user_session);
let workspace_module = mk_core_module(core);
vec![user_module, workspace_module]
}
fn mk_user_module(user_session: Arc<UserSession>) -> Module { flowy_user::module::create(user_session) }
fn mk_core_module(core: Arc<FlowyCore>) -> Module { flowy_core::module::create(core) }
fn mk_core_module(core: Arc<CoreContext>) -> Module { flowy_core::module::create(core) }
pub fn mk_document_module(
user_session: Arc<UserSession>,

View File

@ -23,7 +23,7 @@ pub trait UserServerAPI {
pub(crate) fn construct_user_server(config: &ClientServerConfiguration) -> Arc<dyn UserServerAPI + Send + Sync> {
if cfg!(feature = "http_server") {
Arc::new(UserServer::new(config.clone()))
Arc::new(UserHttpServer::new(config.clone()))
} else {
Arc::new(UserServerMock {})
}

View File

@ -6,14 +6,14 @@ use crate::{
use backend_service::{configuration::*, user_request::*};
use lib_infra::future::ResultFuture;
pub struct UserServer {
pub struct UserHttpServer {
config: ClientServerConfiguration,
}
impl UserServer {
impl UserHttpServer {
pub fn new(config: ClientServerConfiguration) -> Self { Self { config } }
}
impl UserServerAPI for UserServer {
impl UserServerAPI for UserHttpServer {
fn sign_up(&self, params: SignUpParams) -> ResultFuture<SignUpResponse, UserError> {
let url = self.config.sign_up_url();
ResultFuture::new(async move {