fix wanrings

This commit is contained in:
appflowy
2021-12-04 23:54:14 +08:00
parent 31efacb274
commit ce8805aff8
35 changed files with 179 additions and 104 deletions

View File

@ -1,5 +1,7 @@
#![allow(clippy::all)]
#![cfg_attr(rustfmt, rustfmt::skip)]
use actix_web::web::Data; use actix_web::web::Data;
use backend::service::doc::{crud::update_doc, doc::DocManager}; use backend::service::doc::{crud::update_doc, manager::DocManager};
use backend_service::config::ServerConfig; use backend_service::config::ServerConfig;
use flowy_document::services::doc::ClientDocEditor as ClientEditDocContext; use flowy_document::services::doc::ClientDocEditor as ClientEditDocContext;
use flowy_test::{workspace::ViewTest, FlowyTest}; use flowy_test::{workspace::ViewTest, FlowyTest};

View File

@ -1,3 +1,4 @@
#![cfg_attr(rustfmt, rustfmt::skip)]
// Auto-generated, do not edit // Auto-generated, do not edit
mod model; mod model;
pub use model::*; pub use model::*;

View File

@ -1,3 +1,4 @@
#![cfg_attr(rustfmt, rustfmt::skip)]
// Auto-generated, do not edit // Auto-generated, do not edit
mod ffi_response; mod ffi_response;

View File

@ -1,3 +1,4 @@
#![cfg_attr(rustfmt, rustfmt::skip)]
// Auto-generated, do not edit // Auto-generated, do not edit
mod model; mod model;
pub use model::*; pub use model::*;

View File

@ -1,3 +1,4 @@
#![cfg_attr(rustfmt, rustfmt::skip)]
// Auto-generated, do not edit // Auto-generated, do not edit
mod subject; mod subject;

View File

@ -1,3 +1,4 @@
#![cfg_attr(rustfmt, rustfmt::skip)]
// Auto-generated, do not edit // Auto-generated, do not edit
mod model; mod model;
pub use model::*; pub use model::*;

View File

@ -1,3 +1,4 @@
#![cfg_attr(rustfmt, rustfmt::skip)]
// Auto-generated, do not edit // Auto-generated, do not edit
mod observable; mod observable;

View File

@ -4,7 +4,11 @@ pub mod module;
use crate::deps_resolve::WorkspaceDepsResolver; use crate::deps_resolve::WorkspaceDepsResolver;
use backend_service::config::ServerConfig; use backend_service::config::ServerConfig;
use flowy_document::module::FlowyDocument; use flowy_document::module::FlowyDocument;
use flowy_user::services::user::{UserSession, UserSessionBuilder, UserStatus}; use flowy_user::{
notify::{NetworkState, NetworkType},
prelude::UserStatus,
services::user::{UserSession, UserSessionConfig},
};
use flowy_workspace::{errors::WorkspaceError, prelude::WorkspaceController}; use flowy_workspace::{errors::WorkspaceError, prelude::WorkspaceController};
use lib_dispatch::prelude::*; use lib_dispatch::prelude::*;
use module::mk_modules; use module::mk_modules;
@ -73,11 +77,9 @@ impl FlowySDK {
tracing::debug!("🔥 {:?}", config); tracing::debug!("🔥 {:?}", config);
let session_cache_key = format!("{}_session_cache", &config.name); let session_cache_key = format!("{}_session_cache", &config.name);
let user_session = Arc::new(
UserSessionBuilder::new() let user_config = UserSessionConfig::new(&config.root, &config.server_config, &session_cache_key);
.root_dir(&config.root, &config.server_config, &session_cache_key) let user_session = Arc::new(UserSession::new(user_config));
.build(),
);
let flowy_document = mk_document_module(user_session.clone(), &config.server_config); let flowy_document = mk_document_module(user_session.clone(), &config.server_config);
let workspace_ctrl = let workspace_ctrl =
mk_workspace_controller(user_session.clone(), flowy_document.clone(), &config.server_config); mk_workspace_controller(user_session.clone(), flowy_document.clone(), &config.server_config);
@ -98,10 +100,12 @@ impl FlowySDK {
} }
fn _init(dispatch: &EventDispatcher, user_session: Arc<UserSession>, workspace_controller: Arc<WorkspaceController>) { fn _init(dispatch: &EventDispatcher, user_session: Arc<UserSession>, workspace_controller: Arc<WorkspaceController>) {
let subscribe = user_session.status_subscribe(); let user_status_subscribe = user_session.notifier.user_status_subscribe();
let network_status_subscribe = user_session.notifier.network_status_subscribe();
dispatch.spawn(async move { dispatch.spawn(async move {
user_session.init(); user_session.init();
_listen_user_status(subscribe, workspace_controller).await; _listen_user_status(user_status_subscribe, workspace_controller.clone()).await;
_listen_network_status(network_status_subscribe, workspace_controller).await;
}); });
} }
@ -109,31 +113,51 @@ async fn _listen_user_status(
mut subscribe: broadcast::Receiver<UserStatus>, mut subscribe: broadcast::Receiver<UserStatus>,
workspace_controller: Arc<WorkspaceController>, workspace_controller: Arc<WorkspaceController>,
) { ) {
loop { while let Ok(status) = subscribe.recv().await {
if let Ok(status) = subscribe.recv().await { let result = || async {
let result = || async { match status {
match status { UserStatus::Login { token } => {
UserStatus::Login { token } => { let _ = workspace_controller.user_did_sign_in(&token).await?;
let _ = workspace_controller.user_did_sign_in(&token).await?; },
}, UserStatus::Logout { .. } => {
UserStatus::Logout { .. } => { workspace_controller.user_did_logout().await;
workspace_controller.user_did_logout().await; },
}, UserStatus::Expired { .. } => {
UserStatus::Expired { .. } => { workspace_controller.user_session_expired().await;
workspace_controller.user_session_expired().await; },
}, UserStatus::SignUp { profile, ret } => {
UserStatus::SignUp { profile, ret } => { let _ = workspace_controller.user_did_sign_up(&profile.token).await?;
let _ = workspace_controller.user_did_sign_up(&profile.token).await?; let _ = ret.send(());
let _ = ret.send(()); },
},
}
Ok::<(), WorkspaceError>(())
};
match result().await {
Ok(_) => {},
Err(e) => log::error!("{}", e),
} }
Ok::<(), WorkspaceError>(())
};
match result().await {
Ok(_) => {},
Err(e) => log::error!("{}", e),
}
}
}
async fn _listen_network_status(
mut subscribe: broadcast::Receiver<NetworkState>,
_workspace_controller: Arc<WorkspaceController>,
) {
while let Ok(status) = subscribe.recv().await {
let result = || async {
match status.ty {
NetworkType::UnknownNetworkType => {},
NetworkType::Wifi => {},
NetworkType::Cell => {},
NetworkType::Ethernet => {},
}
Ok::<(), WorkspaceError>(())
};
match result().await {
Ok(_) => {},
Err(e) => log::error!("{}", e),
} }
} }
} }

View File

@ -0,0 +1,4 @@
mod status;
pub use flowy_user_infra::entities::*;
pub use status::*;

View File

@ -0,0 +1,19 @@
use crate::entities::UserProfile;
use tokio::sync::mpsc;
#[derive(Clone)]
pub enum UserStatus {
Login {
token: String,
},
Logout {
token: String,
},
Expired {
token: String,
},
SignUp {
profile: UserProfile,
ret: mpsc::Sender<()>,
},
}

View File

@ -1,12 +1,12 @@
mod handlers; mod handlers;
mod sql_tables; mod sql_tables;
pub use flowy_user_infra::entities;
pub mod errors; pub mod errors;
pub mod entities;
pub mod event; pub mod event;
pub mod module; pub mod module;
mod notify; pub mod notify;
pub mod protobuf; pub mod protobuf;
pub mod services; pub mod services;

View File

@ -1,2 +1,2 @@
mod observable; mod observable;
pub(crate) use observable::*; pub use observable::*;

View File

@ -1,6 +1,5 @@
use flowy_derive::{ProtoBuf, ProtoBuf_Enum};
use dart_notify::DartNotifyBuilder; use dart_notify::DartNotifyBuilder;
use flowy_derive::{ProtoBuf, ProtoBuf_Enum};
const OBSERVABLE_CATEGORY: &str = "User"; const OBSERVABLE_CATEGORY: &str = "User";
@ -25,7 +24,7 @@ pub(crate) fn dart_notify(id: &str, ty: UserNotification) -> DartNotifyBuilder {
DartNotifyBuilder::new(id, ty, OBSERVABLE_CATEGORY) DartNotifyBuilder::new(id, ty, OBSERVABLE_CATEGORY)
} }
#[derive(ProtoBuf_Enum, Debug)] #[derive(ProtoBuf_Enum, Debug, Clone)]
pub enum NetworkType { pub enum NetworkType {
UnknownNetworkType = 0, UnknownNetworkType = 0,
Wifi = 1, Wifi = 1,
@ -37,7 +36,7 @@ impl std::default::Default for NetworkType {
fn default() -> Self { NetworkType::UnknownNetworkType } fn default() -> Self { NetworkType::UnknownNetworkType }
} }
#[derive(ProtoBuf, Debug, Default)] #[derive(ProtoBuf, Debug, Default, Clone)]
pub struct NetworkState { pub struct NetworkState {
#[pb(index = 1)] #[pb(index = 1)]
pub ty: NetworkType, pub ty: NetworkType,

View File

@ -1,3 +1,4 @@
#![cfg_attr(rustfmt, rustfmt::skip)]
// Auto-generated, do not edit // Auto-generated, do not edit
mod model; mod model;
pub use model::*; pub use model::*;

View File

@ -1,3 +1,4 @@
#![cfg_attr(rustfmt, rustfmt::skip)]
// Auto-generated, do not edit // Auto-generated, do not edit
mod observable; mod observable;

View File

@ -1,24 +0,0 @@
use crate::services::user::{UserSession, UserSessionConfig};
use backend_service::config::ServerConfig;
pub struct UserSessionBuilder {
config: Option<UserSessionConfig>,
}
impl std::default::Default for UserSessionBuilder {
fn default() -> Self { Self { config: None } }
}
impl UserSessionBuilder {
pub fn new() -> Self { UserSessionBuilder::default() }
pub fn root_dir(mut self, dir: &str, server_config: &ServerConfig, session_cache_key: &str) -> Self {
self.config = Some(UserSessionConfig::new(dir, server_config, session_cache_key));
self
}
pub fn build(mut self) -> UserSession {
let config = self.config.take().unwrap();
UserSession::new(config)
}
}

View File

@ -1,6 +1,5 @@
pub use builder::*;
pub use user_session::*; pub use user_session::*;
mod builder;
pub mod database; pub mod database;
mod notifier;
mod user_session; mod user_session;

View File

@ -0,0 +1,52 @@
use crate::{
entities::{UserProfile, UserStatus},
notify::NetworkState,
};
use tokio::sync::{broadcast, mpsc};
pub struct UserNotifier {
user_status_notifier: broadcast::Sender<UserStatus>,
network_status_notifier: broadcast::Sender<NetworkState>,
}
impl std::default::Default for UserNotifier {
fn default() -> Self {
let (user_status_notifier, _) = broadcast::channel(10);
let (network_status_notifier, _) = broadcast::channel(10);
UserNotifier {
user_status_notifier,
network_status_notifier,
}
}
}
impl UserNotifier {
pub(crate) fn new() -> Self { UserNotifier::default() }
pub(crate) fn notify_login(&self, token: &str) {
let _ = self.user_status_notifier.send(UserStatus::Login {
token: token.to_owned(),
});
}
pub(crate) fn notify_sign_up(&self, ret: mpsc::Sender<()>, user_profile: &UserProfile) {
let _ = self.user_status_notifier.send(UserStatus::SignUp {
profile: user_profile.clone(),
ret,
});
}
pub(crate) fn notify_logout(&self, token: &str) {
let _ = self.user_status_notifier.send(UserStatus::Logout {
token: token.to_owned(),
});
}
pub fn update_network_state(&self, state: NetworkState) { let _ = self.network_status_notifier.send(state); }
pub fn user_status_subscribe(&self) -> broadcast::Receiver<UserStatus> { self.user_status_notifier.subscribe() }
pub fn network_status_subscribe(&self) -> broadcast::Receiver<NetworkState> {
self.network_status_notifier.subscribe()
}
}

View File

@ -7,7 +7,10 @@ use crate::{
use crate::{ use crate::{
notify::*, notify::*,
services::server::{construct_user_server, Server}, services::{
server::{construct_user_server, Server},
user::notifier::UserNotifier,
},
}; };
use backend_service::config::ServerConfig; use backend_service::config::ServerConfig;
use flowy_database::{ use flowy_database::{
@ -23,24 +26,7 @@ use lib_ws::{WsController, WsMessageHandler, WsState};
use parking_lot::RwLock; use parking_lot::RwLock;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{broadcast, mpsc}; use tokio::sync::mpsc;
#[derive(Clone)]
pub enum UserStatus {
Login {
token: String,
},
Logout {
token: String,
},
Expired {
token: String,
},
SignUp {
profile: UserProfile,
ret: mpsc::Sender<()>,
},
}
pub struct UserSessionConfig { pub struct UserSessionConfig {
root_dir: String, root_dir: String,
@ -65,7 +51,7 @@ pub struct UserSession {
server: Server, server: Server,
session: RwLock<Option<Session>>, session: RwLock<Option<Session>>,
pub ws_controller: Arc<WsController>, pub ws_controller: Arc<WsController>,
status_notifier: broadcast::Sender<UserStatus>, pub notifier: UserNotifier,
} }
impl UserSession { impl UserSession {
@ -73,25 +59,23 @@ impl UserSession {
let db = UserDB::new(&config.root_dir); let db = UserDB::new(&config.root_dir);
let server = construct_user_server(&config.server_config); let server = construct_user_server(&config.server_config);
let ws_controller = Arc::new(WsController::new()); let ws_controller = Arc::new(WsController::new());
let (status_notifier, _) = broadcast::channel(10); let notifier = UserNotifier::new();
Self { Self {
database: db, database: db,
config, config,
server, server,
session: RwLock::new(None), session: RwLock::new(None),
ws_controller, ws_controller,
status_notifier, notifier,
} }
} }
pub fn init(&self) { pub fn init(&self) {
if let Ok(session) = self.get_session() { if let Ok(session) = self.get_session() {
let _ = self.status_notifier.send(UserStatus::Login { token: session.token }); self.notifier.notify_login(&session.token);
} }
} }
pub fn status_subscribe(&self) -> broadcast::Receiver<UserStatus> { self.status_notifier.subscribe() }
pub fn db_connection(&self) -> Result<DBConnection, UserError> { pub fn db_connection(&self) -> Result<DBConnection, UserError> {
let user_id = self.get_session()?.user_id; let user_id = self.get_session()?.user_id;
self.database.get_connection(&user_id) self.database.get_connection(&user_id)
@ -118,9 +102,7 @@ impl UserSession {
let _ = self.set_session(Some(session))?; let _ = self.set_session(Some(session))?;
let user_table = self.save_user(resp.into()).await?; let user_table = self.save_user(resp.into()).await?;
let user_profile: UserProfile = user_table.into(); let user_profile: UserProfile = user_table.into();
let _ = self.status_notifier.send(UserStatus::Login { self.notifier.notify_login(&user_profile.token);
token: user_profile.token.clone(),
});
Ok(user_profile) Ok(user_profile)
} }
} }
@ -136,10 +118,7 @@ impl UserSession {
let user_table = self.save_user(resp.into()).await?; let user_table = self.save_user(resp.into()).await?;
let user_profile: UserProfile = user_table.into(); let user_profile: UserProfile = user_table.into();
let (ret, mut tx) = mpsc::channel(1); let (ret, mut tx) = mpsc::channel(1);
let _ = self.status_notifier.send(UserStatus::SignUp { self.notifier.notify_sign_up(ret, &user_profile);
profile: user_profile.clone(),
ret,
});
let _ = tx.recv().await; let _ = tx.recv().await;
Ok(user_profile) Ok(user_profile)
@ -153,9 +132,7 @@ impl UserSession {
diesel::delete(dsl::user_table.filter(dsl::id.eq(&session.user_id))).execute(&*(self.db_connection()?))?; diesel::delete(dsl::user_table.filter(dsl::id.eq(&session.user_id))).execute(&*(self.db_connection()?))?;
let _ = self.database.close_user_db(&session.user_id)?; let _ = self.database.close_user_db(&session.user_id)?;
let _ = self.set_session(None)?; let _ = self.set_session(None)?;
let _ = self.status_notifier.send(UserStatus::Logout { self.notifier.notify_logout(&session.token);
token: session.token.clone(),
});
let _ = self.sign_out_on_server(&session.token).await?; let _ = self.sign_out_on_server(&session.token).await?;
Ok(()) Ok(())

View File

@ -1,3 +1,4 @@
#![cfg_attr(rustfmt, rustfmt::skip)]
// Auto-generated, do not edit // Auto-generated, do not edit
mod model; mod model;
pub use model::*; pub use model::*;

View File

@ -1,3 +1,4 @@
#![cfg_attr(rustfmt, rustfmt::skip)]
// Auto-generated, do not edit // Auto-generated, do not edit
mod observable; mod observable;

View File

@ -1,3 +1,4 @@
#![cfg_attr(rustfmt, rustfmt::skip)]
// Auto-generated, do not edit // Auto-generated, do not edit
mod model; mod model;
pub use model::*; pub use model::*;

View File

@ -1,3 +1,4 @@
#![cfg_attr(rustfmt, rustfmt::skip)]
// Auto-generated, do not edit // Auto-generated, do not edit
mod kv; mod kv;

View File

@ -53,6 +53,8 @@ fn write_rust_crate_mod_file(crate_infos: &[CrateProtoInfo]) {
{ {
Ok(ref mut file) => { Ok(ref mut file) => {
let mut mod_file_content = String::new(); let mut mod_file_content = String::new();
mod_file_content.push_str("#![cfg_attr(rustfmt, rustfmt::skip)]\n");
mod_file_content.push_str("// Auto-generated, do not edit\n"); mod_file_content.push_str("// Auto-generated, do not edit\n");
walk_dir( walk_dir(
crate_info.inner.proto_file_output_dir().as_ref(), crate_info.inner.proto_file_output_dir().as_ref(),

View File

@ -17,7 +17,8 @@ impl CrateProtoInfo {
// mod model; // mod model;
// pub use model::*; // pub use model::*;
let mod_file_path = format!("{}/mod.rs", self.inner.protobuf_crate_name()); let mod_file_path = format!("{}/mod.rs", self.inner.protobuf_crate_name());
let mut content = format!("// Auto-generated, do not edit\n"); let mut content = "#![cfg_attr(rustfmt, rustfmt::skip)]\n".to_owned();
content.push_str("// Auto-generated, do not edit\n");
content.push_str("mod model;\npub use model::*;"); content.push_str("mod model;\npub use model::*;");
match OpenOptions::new() match OpenOptions::new()
.create(true) .create(true)

View File

@ -1,4 +1,4 @@
#[rustfmt::skip] #![cfg_attr(rustfmt, rustfmt::skip)]
pub enum TypeCategory { pub enum TypeCategory {
Array, Array,
Map, Map,

View File

@ -1,4 +1,4 @@
#[rustfmt::skip] #![cfg_attr(rustfmt, rustfmt::skip)]
pub enum TypeCategory { pub enum TypeCategory {
Array, Array,
Map, Map,

View File

@ -1,3 +1,4 @@
#![cfg_attr(rustfmt, rustfmt::skip)]
// Auto-generated, do not edit // Auto-generated, do not edit
mod model; mod model;
pub use model::*; pub use model::*;

View File

@ -1,3 +1,4 @@
#![cfg_attr(rustfmt, rustfmt::skip)]
// Auto-generated, do not edit // Auto-generated, do not edit
mod ws; mod ws;

View File

@ -1,3 +1,4 @@
#![cfg_attr(rustfmt, rustfmt::skip)]
// Auto-generated, do not edit // Auto-generated, do not edit
mod model; mod model;
pub use model::*; pub use model::*;

View File

@ -1,3 +1,4 @@
#![cfg_attr(rustfmt, rustfmt::skip)]
// Auto-generated, do not edit // Auto-generated, do not edit
mod errors; mod errors;

View File

@ -1,3 +1,4 @@
#![cfg_attr(rustfmt, rustfmt::skip)]
// Auto-generated, do not edit // Auto-generated, do not edit
mod model; mod model;
pub use model::*; pub use model::*;

View File

@ -1,3 +1,4 @@
#![cfg_attr(rustfmt, rustfmt::skip)]
// Auto-generated, do not edit // Auto-generated, do not edit
mod view_update; mod view_update;

View File

@ -1,3 +1,4 @@
#![cfg_attr(rustfmt, rustfmt::skip)]
// Auto-generated, do not edit // Auto-generated, do not edit
mod model; mod model;
pub use model::*; pub use model::*;

View File

@ -1,3 +1,4 @@
#![cfg_attr(rustfmt, rustfmt::skip)]
// Auto-generated, do not edit // Auto-generated, do not edit
mod errors; mod errors;